from __future__ import annotations

import base64
import json
import mimetypes
import os
import re
import shutil
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, unquote, urlparse, urlunparse

import click
import requests

# Lower-cased names of request headers that carry credentials; _safe_headers
# drops any header whose lower-cased name is in this set.
SENSITIVE_HEADERS = {"cookie", "authorization", "x-csrf-token", "x-xsrf-token"}
# Substrings searched for in a URL's network location (see _host_matches) to
# decide whether a HAR entry is in scope.
QIANNIU_HOST_HINTS = ("qianniu", "taobao", "tmall", "alimama", "sycm", "alibaba")
# Keywords that flag a HAR entry as a list/template request. The "%E5..." items
# are the percent-encoded forms of the Chinese keywords listed before them.
LIST_HINTS = (
    "template", "report", "qushu", "fetch", "query", "list", "取数", "模板", "报表",
    "%E5%8F%96%E6%95%B0", "%E6%A8%A1%E6%9D%BF", "%E6%8A%A5%E8%A1%A8",
)
# Keywords that flag a HAR entry as a download/export request. The "%E5..." /
# "%E4..." items are the percent-encoded forms of the Chinese keywords.
DOWNLOAD_HINTS = (
    "download", "export", "excel", "xlsx", "xls", "csv", "导出", "下载",
    "%E5%AF%BC%E5%87%BA", "%E4%B8%8B%E8%BD%BD",
)
# MIME types treated as "Excel-like" when classifying HAR entries and when
# checking live responses.
EXCEL_MIMES = (
    "application/vnd.ms-excel",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/octet-stream",
    "text/csv",
)


@dataclass
class Candidate:
    """A request inferred to be a list/template or download endpoint.

    Instances are produced while scanning a HAR file or while expanding a
    template list, and are serialised into the manifest via ``to_json``.
    """

    # "download" or "list_or_template".
    kind: str
    # HTTP method of the request.
    method: str
    # Full request URL.
    url: str
    # Human-readable explanation of why the request was selected.
    reason: str
    # Request headers to replay with the request.
    request_headers: dict[str, str]
    # Request body text, if any.
    post_data: str | None = None
    # MIME type of the response.
    response_mime: str | None = None
    # HTTP status of the response.
    response_status: int | None = None
    # File name proposed for the downloaded content.
    suggested_filename: str | None = None

    def to_json(self) -> dict[str, Any]:
        """Return the fields as a plain dict, keyed and ordered as declared."""
        field_order = (
            "kind",
            "method",
            "url",
            "reason",
            "request_headers",
            "post_data",
            "response_mime",
            "response_status",
            "suggested_filename",
        )
        return {field_name: getattr(self, field_name) for field_name in field_order}


def _json_print(payload: Any) -> None:
    """Echo *payload* as 2-space-indented JSON, keeping non-ASCII characters as-is."""
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    click.echo(rendered)


def _load_json(path: str | Path) -> Any:
    """Read the UTF-8 file at *path* (with ``~`` expanded) and parse it as JSON."""
    source = Path(path).expanduser()
    raw_text = source.read_text(encoding="utf-8")
    return json.loads(raw_text)


def _safe_headers(headers: list[dict[str, str]] | dict[str, str]) -> dict[str, str]:
    """Return the headers as a dict with credential and pseudo headers removed.

    *headers* is either a name->value dict or a list of ``{"name", "value"}``
    dicts. Entries with an empty name, a name listed in ``SENSITIVE_HEADERS``
    (compared case-insensitively), or a name starting with ``:`` are dropped.
    """
    if isinstance(headers, dict):
        pairs = list(headers.items())
    else:
        pairs = [(h.get("name", ""), h.get("value", "")) for h in headers]
    return {
        name: value
        for name, value in pairs
        if name
        and name.lower() not in SENSITIVE_HEADERS
        and not name.lower().startswith(":")
    }


def _raw_request_headers(headers: list[dict[str, str]] | dict[str, str]) -> dict[str, str]:
    """Return the headers as a dict, dropping only empty names and ``:``-prefixed names.

    Unlike the filtered variant, credential headers are kept.
    """
    if isinstance(headers, dict):
        items = list(headers.items())
    else:
        items = [(h.get("name", ""), h.get("value", "")) for h in headers]
    result = {}
    for name, value in items:
        if not name or name.lower().startswith(":"):
            continue
        result[name] = value
    return result


def _header_value(headers: list[dict[str, str]] | dict[str, str], name: str) -> str | None:
    """Return the value of the first header whose name equals *name* ignoring case.

    *headers* is either a name->value dict or a list of ``{"name", "value"}``
    dicts. Returns ``None`` when no header matches.
    """
    wanted = name.lower()
    if isinstance(headers, dict):
        return next((value for key, value in headers.items() if key.lower() == wanted), None)
    for header in headers:
        if header.get("name", "").lower() == wanted:
            return header.get("value")
    return None


def _entry_text(entry: dict[str, Any]) -> str:
    """Build the lower-cased text that keyword hints are matched against.

    The text joins, one per line: the request URL, its percent-decoded form,
    the request body text, its percent-decoded form, the response MIME type,
    and the response ``content-disposition`` header.
    """
    request = entry.get("request", {})
    response = entry.get("response", {})
    body_text = str(request.get("postData", {}).get("text", ""))
    mime_type = str(response.get("content", {}).get("mimeType", ""))
    disposition = str(_header_value(response.get("headers", []), "content-disposition") or "")
    pieces = [
        request.get("url", ""),
        unquote(request.get("url", "")),
        body_text,
        unquote(body_text),
        mime_type,
        disposition,
    ]
    return "\n".join(pieces).lower()


def _host_matches(url: str) -> bool:
    """Return True when the URL's lower-cased netloc contains any ``QIANNIU_HOST_HINTS`` item."""
    netloc = urlparse(url).netloc.lower()
    for hint in QIANNIU_HOST_HINTS:
        if hint in netloc:
            return True
    return False


def _infer_filename(entry: dict[str, Any], index: int) -> str:
    """Propose a file name for the spreadsheet a HAR entry would download.

    Resolution order:

    1. The file name in the response ``content-disposition`` header, sanitized.
    2. ``qianniu_export_<index>`` plus the URL path's suffix when that suffix
       is ``.xls``, ``.xlsx`` or ``.csv``.
    3. ``qianniu_export_<index>`` plus a suffix guessed from the response MIME
       type, but only when the guess is one of the three spreadsheet suffixes;
       otherwise ``.xlsx``.

    Args:
        entry: One HAR ``entries`` item.
        index: 1-based position of the entry, zero-padded to three digits.

    Returns:
        The proposed file name.
    """
    spreadsheet_suffixes = {".xls", ".xlsx", ".csv"}
    req = entry.get("request", {})
    res = entry.get("response", {})
    disposition = _header_value(res.get("headers", []), "content-disposition") or ""
    match = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)', disposition, flags=re.I)
    if match:
        name = unquote(match.group(1)).strip().strip('"')
        if name:
            return _sanitize_filename(name)

    suffix = Path(urlparse(req.get("url", "")).path).suffix
    if suffix.lower() not in spreadsheet_suffixes:
        # A missing/None mimeType must not crash the scan.
        mime = str(res.get("content", {}).get("mimeType", "") or "")
        guessed = mimetypes.guess_extension(mime.split(";")[0].strip().lower()) or ""
        if guessed == ".xls" and "spreadsheetml" in mime:
            guessed = ".xlsx"
        # Entries flagged as downloads by keyword often answer with JSON or
        # octet-stream; a guessed ".json"/".bin" would mislabel the spreadsheet
        # that is eventually saved under this name, so fall back to ".xlsx".
        suffix = guessed if guessed in spreadsheet_suffixes else ".xlsx"
    return f"qianniu_export_{index:03d}{suffix}"


def _sanitize_filename(name: str) -> str:
    """Make *name* usable as a file name.

    Runs of ``\\ / : * ? " < > |`` become a single ``_``, surrounding
    whitespace is stripped, and the result is cut to 180 characters. An empty
    result is replaced by ``qianniu_export.xlsx``.
    """
    cleaned = re.sub(r"[\\/:*?\"<>|]+", "_", name).strip()
    shortened = cleaned[:180]
    if shortened:
        return shortened
    return "qianniu_export.xlsx"


def _classify_entry(entry: dict[str, Any], index: int) -> Candidate | None:
    """Turn one HAR entry into a ``Candidate``, or ``None`` when it is out of scope.

    An entry is skipped when it has no URL, when its host matches none of the
    host hints, or when it shows neither download nor list signals. Download
    signals are a download keyword in the entry text, an Excel-like response
    MIME type, or a ``content-disposition`` header mentioning an attachment or
    spreadsheet extension. A download signal takes precedence over a list
    signal when choosing ``kind``.
    """
    request = entry.get("request", {})
    response = entry.get("response", {})
    url = request.get("url", "")
    if not url or not _host_matches(url):
        return None

    haystack = _entry_text(entry)
    mime = response.get("content", {}).get("mimeType", "")
    disposition = _header_value(response.get("headers", []), "content-disposition") or ""

    # Download evidence is collected in a fixed order so the reason string is stable.
    download_reasons = []
    if any(hint.lower() in haystack for hint in DOWNLOAD_HINTS):
        download_reasons.append("download/export keyword")
    if (mime or "").lower().startswith(EXCEL_MIMES):
        download_reasons.append(f"excel-like mime {mime}")
    if re.search(r"\.xlsx?|\.csv|attachment", disposition, flags=re.I):
        download_reasons.append("content-disposition attachment/excel")

    looks_like_download = bool(download_reasons)
    looks_like_list = any(hint.lower() in haystack for hint in LIST_HINTS)
    if not looks_like_download and not looks_like_list:
        return None

    reasons = list(download_reasons)
    if looks_like_list:
        reasons.append("template/report/list keyword")

    if looks_like_download:
        kind = "download"
        filename = _infer_filename(entry, index)
    else:
        kind = "list_or_template"
        filename = None

    return Candidate(
        kind=kind,
        method=request.get("method", "GET"),
        url=url,
        reason="; ".join(reasons),
        request_headers=_safe_headers(request.get("headers", [])),
        post_data=request.get("postData", {}).get("text"),
        response_mime=mime,
        response_status=response.get("status"),
        suggested_filename=filename,
    )


def scan_har_file(path: str | Path) -> dict[str, Any]:
    """Scan the HAR file at *path* and return a manifest of candidate requests.

    Every item of ``log.entries`` is classified with its 1-based position; the
    manifest reports totals per kind plus the serialised candidates.
    """
    entries = _load_json(path).get("log", {}).get("entries", [])
    classified = (_classify_entry(entry, position) for position, entry in enumerate(entries, 1))
    candidates: list[Candidate] = [candidate for candidate in classified if candidate]
    kinds = [candidate.kind for candidate in candidates]
    return {
        "source_har": str(Path(path).expanduser()),
        "total_entries": len(entries),
        "candidate_count": len(candidates),
        "download_count": kinds.count("download"),
        "list_or_template_count": kinds.count("list_or_template"),
        "candidates": [candidate.to_json() for candidate in candidates],
        "notes": [
            "敏感 Cookie/Authorization 请求头不会写入 manifest；导出时请单独传 cookie 文件。",
            "manifest 是从浏览器 HAR 推断出的候选接口，需要用 dry-run 或小批量导出验证。",
        ],
    }


# Candidate key names, in priority order, for locating fields inside a
# template-list item (see _pick_field and _score_template_item).
# Keys that may hold the template/report identifier.
TEMPLATE_ID_FIELDS = ("templateId", "template_id", "id", "reportId", "report_id", "taskId", "task_id")
# Keys that may hold the template's display name.
TEMPLATE_NAME_FIELDS = ("templateName", "template_name", "name", "reportName", "report_name", "title", "displayName")
# Keys that may hold a ready-to-use download URL.
DOWNLOAD_URL_FIELDS = ("downloadUrl", "download_url", "exportUrl", "export_url", "fileUrl", "file_url", "url")


def _walk_dict_lists(payload: Any) -> list[list[dict[str, Any]]]:
    """Collect every JSON array that has dict members, in traversal order.

    For each list, its dict members (non-dict members are filtered out) are
    recorded first, then every member is searched recursively. For each dict,
    the common API envelope keys are searched first, then the remaining keys in
    insertion order. Scalars contribute nothing.
    """
    envelope_keys = ("data", "result", "records", "items", "list", "rows", "templateList", "template_list")
    collected: list[list[dict[str, Any]]] = []

    if isinstance(payload, list):
        dict_members = [member for member in payload if isinstance(member, dict)]
        if dict_members:
            collected.append(dict_members)
        for member in payload:
            collected += _walk_dict_lists(member)
        return collected

    if isinstance(payload, dict):
        visit_order = [key for key in envelope_keys if key in payload]
        visit_order += [key for key in payload if key not in envelope_keys]
        for key in visit_order:
            collected += _walk_dict_lists(payload[key])
    return collected


def _score_template_item(item: dict[str, Any]) -> int:
    """Score how much *item* looks like a template record, judged by its keys.

    +3 for any ID field, +2 for any name field, +2 for any download-URL field,
    and +1 when any key contains "template" or "report" (case-insensitive).
    """
    key_set = set(item)
    weighted_groups = (
        (TEMPLATE_ID_FIELDS, 3),
        (TEMPLATE_NAME_FIELDS, 2),
        (DOWNLOAD_URL_FIELDS, 2),
    )
    total = sum(weight for fields, weight in weighted_groups if key_set.intersection(fields))
    for key in key_set:
        lowered = key.lower()
        if "template" in lowered or "report" in lowered:
            total += 1
            break
    return total


def _extract_template_items(payload: Any) -> list[dict[str, Any]]:
    """Pick the dict array in *payload* that most resembles a template list.

    Arrays are ranked by the summed item score, then by length; ties go to the
    array found first. Items scoring zero are dropped unless that would leave
    nothing, in which case the whole array is returned. Returns ``[]`` when the
    payload contains no dict array.
    """
    arrays = _walk_dict_lists(payload)
    if not arrays:
        return []

    def rank(array: list[dict[str, Any]]) -> tuple[int, int]:
        return (sum(_score_template_item(entry) for entry in array), len(array))

    # max() returns the first maximal element, matching a stable descending sort.
    best = max(arrays, key=rank)
    relevant = [entry for entry in best if _score_template_item(entry) > 0]
    return relevant if relevant else best


def _pick_field(item: dict[str, Any], explicit: str | None, candidates: tuple[str, ...]) -> str | None:
    """Choose which key of *item* to read.

    When *explicit* is given it is the only option: it is returned if present
    in *item*, else ``None``. Otherwise the first candidate with a value other
    than ``None``/``""`` wins, trying exact key matches before case-insensitive
    ones.
    """
    if explicit:
        if explicit in item:
            return explicit
        return None

    def has_value(key: str) -> bool:
        return item.get(key) not in (None, "")

    for name in candidates:
        if name in item and has_value(name):
            return name

    # Second pass: same candidates, ignoring key case.
    actual_by_lower = {key.lower(): key for key in item}
    for name in candidates:
        folded = name.lower()
        if folded in actual_by_lower and has_value(actual_by_lower[folded]):
            return actual_by_lower[folded]
    return None


def _replace_query_param(url: str, param: str, value: Any) -> str:
    """Return *url* with query parameter *param* set to ``str(value)``.

    Every existing occurrence of *param* is overwritten in place; when there is
    none, the parameter is appended. The query string is re-encoded.
    """
    parts = urlparse(url)
    pairs = parse_qsl(parts.query, keep_blank_values=True)
    if any(key == param for key, _ in pairs):
        rebuilt = [(key, str(value) if key == param else current) for key, current in pairs]
    else:
        rebuilt = pairs + [(param, str(value))]
    return urlunparse(parts._replace(query=urlencode(rebuilt)))


def _infer_download_id_param(example_url: str) -> str:
    """Guess which query parameter of *example_url* carries the template ID.

    Well-known names are preferred in a fixed order; failing that the first
    query parameter is used, and ``"id"`` when the URL has no query at all.
    """
    names = [key for key, _ in parse_qsl(urlparse(example_url).query, keep_blank_values=True)]
    for preferred in ("id", "templateId", "template_id", "reportId", "taskId"):
        if preferred in names:
            return preferred
    if names:
        return names[0]
    return "id"


def _first_download_example(manifest: dict[str, Any]) -> dict[str, Any] | None:
    """Return the first manifest candidate of kind ``download`` with a non-empty URL, else ``None``."""
    usable = (
        entry
        for entry in manifest.get("candidates", [])
        if entry.get("kind") == "download" and entry.get("url")
    )
    return next(usable, None)


def _manifest_from_template_items(
    items: list[dict[str, Any]],
    *,
    download_example: dict[str, Any] | None = None,
    id_field: str | None = None,
    name_field: str | None = None,
    download_url_field: str | None = None,
    download_id_param: str | None = None,
) -> dict[str, Any]:
    """Build a download manifest with one candidate per template item.

    For each item the download URL is either read from a download-URL field of
    the item, or derived from the example URL by substituting the item's ID
    into the ID query parameter. Items offering neither are listed under
    ``skipped``. Method, headers and (for non-GET methods) body are copied from
    *download_example* when one is given.

    Args:
        items: Template records taken from the list JSON.
        download_example: A captured download candidate used as a request template.
        id_field: Item key holding the template ID; auto-detected when omitted.
        name_field: Item key holding the template name; auto-detected when omitted.
        download_url_field: Item key holding a direct download URL; auto-detected when omitted.
        download_id_param: Query parameter that receives the ID; inferred from
            the example URL when omitted, defaulting to ``"id"``.

    Returns:
        The manifest dict (candidates, counts, skipped items and notes).
    """
    example_url = download_example.get("url") if download_example else None
    if download_id_param:
        id_param = download_id_param
    elif example_url:
        id_param = _infer_download_id_param(example_url)
    else:
        id_param = "id"

    built = []
    skipped = []
    for position, item in enumerate(items, 1):
        url_key = _pick_field(item, download_url_field, DOWNLOAD_URL_FIELDS)
        id_key = _pick_field(item, id_field, TEMPLATE_ID_FIELDS)
        name_key = _pick_field(item, name_field, TEMPLATE_NAME_FIELDS)
        template_id = item.get(id_key) if id_key else None
        template_name = item.get(name_key) if name_key else None

        # A URL carried by the item itself wins over one derived from the example.
        if url_key:
            url = str(item[url_key])
            origin = " direct download URL field"
        elif example_url and template_id not in (None, ""):
            url = _replace_query_param(str(example_url), id_param, template_id)
            origin = " and captured download example"
        else:
            skipped.append({"index": position, "reason": "no download url field and no usable id/example URL", "item_keys": sorted(item.keys())})
            continue

        filename = _sanitize_filename(str(template_name or template_id or f"template_{position:03d}"))
        if not Path(filename).suffix:
            filename += ".xlsx"

        if download_example:
            method = download_example.get("method") or "GET"
            headers = dict(download_example.get("request_headers") or {})
            # The captured body is only replayed for non-GET requests.
            body = download_example.get("post_data") if method.upper() != "GET" else None
        else:
            method = "GET"
            headers = {}
            body = None

        candidate = Candidate(
            kind="download",
            method=method,
            url=url,
            reason="built from template list JSON" + origin,
            request_headers=headers,
            post_data=body,
            response_mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            response_status=None,
            suggested_filename=filename,
        )
        built.append(candidate.to_json() | {"template_id": template_id, "template_name": template_name})

    return {
        "source": "template_list_json",
        "item_count": len(items),
        "candidate_count": len(built),
        "download_count": len(built),
        "list_or_template_count": 0,
        "candidates": built,
        "skipped_count": len(skipped),
        "skipped": skipped,
        "notes": [
            "由模板列表 JSON 生成下载 manifest；首次真实使用请先 --limit 1 验证。",
            "如果接口要求 CSRF/sign 参数，需在 HAR 中保留对应非敏感请求头或重新采集。",
        ],
    }


def _load_cookie_header(cookie_file: str | None) -> str | None:
    """Read *cookie_file* and return a ``Cookie`` header value.

    Accepted contents:

    * plain text that is not valid JSON - returned stripped, as-is;
    * a JSON object with a string ``"Cookie"`` or ``"cookie"`` entry - that string;
    * a JSON object whose values are all strings - joined as ``name=value`` pairs;
    * a JSON array of ``{"name": ..., "value": ...}`` objects - joined likewise,
      skipping items without a name or with a ``None`` value.

    Returns ``None`` when no file is given or the file is blank.

    Raises:
        click.ClickException: The file holds JSON in none of the shapes above.
    """
    if not cookie_file:
        return None
    text = Path(cookie_file).expanduser().read_text(encoding="utf-8").strip()
    if not text:
        return None

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON: treat the file as a raw Cookie header.
        return text

    if isinstance(data, dict):
        direct = next((data[key] for key in ("Cookie", "cookie") if isinstance(data.get(key), str)), None)
        if direct is not None:
            return direct
        if all(isinstance(value, str) for value in data.values()):
            return "; ".join(f"{name}={value}" for name, value in data.items())
    elif isinstance(data, list):
        return "; ".join(
            f"{cookie['name']}={cookie['value']}"
            for cookie in data
            if isinstance(cookie, dict) and cookie.get("name") and cookie.get("value") is not None
        )
    raise click.ClickException(f"Unsupported cookie file format: {cookie_file}")


def _is_excel_response(resp: requests.Response) -> bool:
    """Decide whether *resp* looks like a spreadsheet download.

    True when the content type contains one of ``EXCEL_MIMES``, when the
    ``content-disposition`` header mentions ``attachment``, or when the body
    starts with the ``PK`` or ``D0 CF 11 E0`` signature and its first 200 bytes
    do not contain ``<html``.
    """
    content_type = resp.headers.get("content-type", "").lower()
    disposition = resp.headers.get("content-disposition", "").lower()
    body = resp.content
    leading = body[:200].lower()

    if any(mime in content_type for mime in EXCEL_MIMES):
        return True
    if "attachment" in disposition:
        return True
    if b"<html" in leading:
        return False
    return body[:2] == b"PK" or body[:8].startswith(b"\xd0\xcf\x11\xe0")


def _extract_async_download_url(payload: Any) -> str | None:
    """Find a file URL of the form ``{"data": {"url": "http..."}}`` anywhere in *payload*.

    A dict is checked for a direct ``data.url`` string starting with ``http``
    before its values are searched depth-first; lists are searched item by
    item. Returns ``None`` when nothing matches.
    """
    if isinstance(payload, dict):
        inner = payload.get("data")
        if isinstance(inner, dict):
            direct = inner.get("url")
            if isinstance(direct, str) and direct.startswith("http"):
                return direct
        children = payload.values()
    elif isinstance(payload, list):
        children = payload
    else:
        return None

    for child in children:
        hit = _extract_async_download_url(child)
        if hit:
            return hit
    return None


def _derive_poll_url(download_url: str) -> str | None:
    """Map a ``download.json`` URL to its ``queryDownloadUrl.json`` polling URL.

    Only the first ``download.json`` occurrence is replaced. A URL that already
    contains ``queryDownloadUrl.json`` is returned unchanged; any other URL
    yields ``None``.
    """
    trigger_name = "download.json"
    poll_name = "queryDownloadUrl.json"
    if trigger_name in download_url:
        return download_url.replace(trigger_name, poll_name, 1)
    return download_url if poll_name in download_url else None


def _filename_from_response_or_url(resp: requests.Response, fallback: str) -> str:
    """Choose a sanitized file name for a downloaded response.

    Preference order: the file name in the ``content-disposition`` header, the
    last path segment of the response URL (percent-decoded), then *fallback*.
    """
    header = resp.headers.get("content-disposition", "")
    found = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)', header, flags=re.I)
    from_header = unquote(found.group(1)).strip().strip('"') if found else ""
    if from_header:
        return _sanitize_filename(from_header)

    url_name = Path(urlparse(resp.url).path).name
    chosen = unquote(url_name) if url_name else fallback
    return _sanitize_filename(chosen)


# Root click command group; subcommands attach to it via @main.command(...).
# Both -h and --help show help, and --version is provided by click.
# The docstring below is the help text click prints for the group, so it is
# kept in the CLI's user-facing language.
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.version_option()
def main() -> None:
    """千牛“数据-自助分析-取数报表-取数模板”Excel 导出 CLI harness."""


@main.command()
def doctor() -> None:
    """检查当前 CLI 运行环境。"""
    # Print a JSON report of the runtime: Python version, working directory
    # and the installed requests version.
    python_version = sys.version.split()[0]
    report = {
        "ok": True,
        "python": python_version,
        "cwd": os.getcwd(),
        "requests": requests.__version__,
        "note": "本 CLI 不直接保存账号密码；登录态通过外部 cookie 文件或 HAR 推断流程提供。",
    }
    _json_print(report)


@main.command("operator-brief")
@click.option("--out", "out_path", type=click.Path(path_type=Path), help="写出给 visual-operator 的操作简报。")
def operator_brief(out_path: Path | None) -> None:
    """生成采集千牛页面/HAR 的操作简报；IT profile 不执行本机 GUI 点击。"""
    # The brief is a fixed Markdown document. Without --out it is echoed to
    # stdout; with --out it is written to that file and the path is echoed.
    text = """# visual-operator 操作简报：千牛取数模板导出接口采集\n\n目标：在已登录千牛/生意参谋账号中进入：数据 → 自助分析 → 取数报表 → 取数模板。\n\n允许动作：\n1. 打开对应页面并确认已登录。\n2. 打开浏览器开发者工具 Network，启用 Preserve log。\n3. 在页面中刷新/翻页/搜索，使所有“取数模板”列表请求出现。\n4. 手动点击 1~2 个模板的“导出/下载 Excel”，确认下载能成功。\n5. 导出 HAR 文件，命名为 qianniu_templates.har。\n6. 如页面原生支持批量下载，可优先使用页面功能下载全部 Excel。\n\n禁止/风险：\n- 不修改模板、不删除、不新建报表。\n- 不把 Cookie/Token 贴到聊天里；HAR 和 Cookie 文件只放到项目工作区受控目录。\n- 若出现验证码、短信、账号风控，停止并回报。\n\n交付给 IT：\n- HAR 文件路径。\n- 已下载 Excel 目录路径（如有）。\n- 页面中模板总数、是否分页、导出按钮文案。\n"""
    if not out_path:
        click.echo(text)
        return
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(text, encoding="utf-8")
    click.echo(str(out_path))


@main.command("scan-har")
@click.argument("har", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--out", "out_path", type=click.Path(path_type=Path), help="输出 manifest JSON 路径。")
def scan_har(har: Path, out_path: Path | None) -> None:
    """扫描浏览器 HAR，推断取数模板列表/下载候选接口。"""
    # Scan the HAR into a manifest. Without --out the manifest is printed;
    # with --out it is written there and only the path is echoed.
    manifest = scan_har_file(har)
    if not out_path:
        _json_print(manifest)
        return
    out_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(manifest, ensure_ascii=False, indent=2)
    out_path.write_text(serialized, encoding="utf-8")
    click.echo(str(out_path))


@main.command("fetch-list")
@click.argument("manifest", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--cookie-file", type=click.Path(exists=True, dir_okay=False), help="Cookie header 文本或 JSON cookie 文件。")
@click.option("--out", "out_path", type=click.Path(path_type=Path), required=True, help="保存列表接口返回 JSON。")
@click.option("--candidate-index", type=int, default=1, show_default=True, help="使用第几个 list/template 候选接口。")
def fetch_list(manifest: Path, cookie_file: str | None, out_path: Path, candidate_index: int) -> None:
    """重放 HAR 中识别到的列表接口，获取真实取数模板列表 JSON。"""
    # Replay one "list_or_template" candidate from the manifest (chosen by its
    # 1-based --candidate-index), save the JSON response to --out, and print a
    # summary that includes a guess at how many template items it contains.
    manifest_data = _load_json(manifest)
    list_candidates = [
        entry for entry in manifest_data.get("candidates", []) if entry.get("kind") == "list_or_template"
    ]
    if not list_candidates:
        raise click.ClickException("manifest 中没有 list/template 候选；请重新采集包含页面加载/刷新列表的 HAR。")
    if not 1 <= candidate_index <= len(list_candidates):
        raise click.ClickException(f"candidate-index 超出范围：1..{len(list_candidates)}")
    chosen = list_candidates[candidate_index - 1]

    cookie_header = _load_cookie_header(cookie_file)
    request_headers = dict(chosen.get("request_headers") or {})
    if cookie_header:
        # Credentials are never stored in the manifest; they come from the cookie file.
        request_headers["Cookie"] = cookie_header

    http_method = (chosen.get("method") or "GET").upper()
    resp = requests.request(
        http_method,
        chosen["url"],
        headers=request_headers,
        data=chosen.get("post_data"),
        timeout=90,
    )
    try:
        payload = resp.json()
    except Exception as exc:  # noqa: BLE001
        raise click.ClickException(f"列表接口没有返回 JSON：status={resp.status_code}, content-type={resp.headers.get('content-type')}, error={exc}") from exc

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    guessed_items = _extract_template_items(payload)
    summary = {
        "status": resp.status_code,
        "out": str(out_path),
        "template_item_count_guess": len(guessed_items),
        "candidate_url": chosen.get("url"),
        "note": "已保存列表 JSON；下一步用 build-downloads 生成下载 manifest。",
    }
    _json_print(summary)


@main.command("build-downloads")
@click.argument("list_json", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--download-example-manifest", type=click.Path(exists=True, dir_okay=False, path_type=Path), help="包含一次真实下载请求的 scan-har manifest，用来推断下载 URL。")
@click.option("--download-example-url", help="手动指定下载 URL 模板；会替换其中的 id/templateId 等查询参数。")
@click.option("--download-id-param", help="下载 URL 里承载模板 ID 的查询参数名，默认自动推断。")
@click.option("--id-field", help="列表 JSON 中模板 ID 字段名，默认自动推断。")
@click.option("--name-field", help="列表 JSON 中模板名字段名，默认自动推断。")
@click.option("--download-url-field", help="列表 JSON 中直接下载 URL 字段名，默认自动推断。")
@click.option("--out", "out_path", type=click.Path(path_type=Path), required=True, help="输出下载 manifest。")
def build_downloads(
    list_json: Path,
    download_example_manifest: Path | None,
    download_example_url: str | None,
    download_id_param: str | None,
    id_field: str | None,
    name_field: str | None,
    download_url_field: str | None,
    out_path: Path,
) -> None:
    """从模板列表 JSON 生成逐个下载 Excel 的 manifest。"""
    # Expand the template list into one download candidate per template,
    # write the resulting manifest to --out, and print a short summary.
    items = _extract_template_items(_load_json(list_json))
    if not items:
        raise click.ClickException("未能在列表 JSON 中识别模板数组；请用 --id-field/--name-field 或检查返回结构。")

    download_example = None
    if download_example_manifest:
        example_manifest = _load_json(download_example_manifest)
        download_example = _first_download_example(example_manifest)
        if not download_example:
            raise click.ClickException("download-example-manifest 中没有 download 候选。")
    if download_example_url:
        # An explicit URL replaces any example taken from the manifest: it is
        # replayed as a plain GET with no captured headers or body.
        download_example = dict(
            method="GET",
            url=download_example_url,
            request_headers={},
            post_data=None,
        )

    result = _manifest_from_template_items(
        items,
        download_example=download_example,
        id_field=id_field,
        name_field=name_field,
        download_url_field=download_url_field,
        download_id_param=download_id_param,
    )
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    summary = {"out": str(out_path)}
    for count_key in ("item_count", "download_count", "skipped_count"):
        summary[count_key] = result[count_key]
    summary["note"] = "下一步用 export-all --dry-run 预览，再 --limit 1 小批量验证。"
    _json_print(summary)


@main.command("collect-downloads")
@click.option("--source-dir", type=click.Path(exists=True, file_okay=False, path_type=Path), default=Path("/Users/bot1/Downloads/My WangWang"), show_default=True, help="千牛默认下载目录。")
@click.option("--out-dir", type=click.Path(path_type=Path), default=Path("deliverables/operator_downloads"), show_default=True, help="收集到的项目目录。")
@click.option("--since-minutes", type=int, default=120, show_default=True, help="只收集最近 N 分钟修改的 Excel/CSV；0 表示不限制。")
@click.option("--copy/--move", "copy_mode", default=True, show_default=True, help="默认复制，不移动用户下载目录里的原文件。")
def collect_downloads(source_dir: Path, out_dir: Path, since_minutes: int, copy_mode: bool) -> None:
    """收集千牛默认下载目录里的 Excel/CSV 到项目 deliverables。"""
    # Copy (or move) .xls/.xlsx/.csv files from source_dir into out_dir,
    # oldest first, optionally limited to files modified within the last
    # since_minutes minutes, then print a JSON summary.
    wanted_suffixes = {".xls", ".xlsx", ".csv"}
    started_at = time.time()
    out_dir.mkdir(parents=True, exist_ok=True)
    collected = []
    skipped = []

    def modified_time(candidate: Path) -> float:
        # Entries that no longer exist sort first instead of raising.
        return candidate.stat().st_mtime if candidate.exists() else 0

    for src in sorted(source_dir.iterdir(), key=modified_time):
        if not (src.is_file() and src.suffix.lower() in wanted_suffixes):
            continue
        age_minutes = (started_at - src.stat().st_mtime) / 60
        if since_minutes and age_minutes > since_minutes:
            skipped.append({"file": str(src), "reason": f"older than {since_minutes} minutes"})
            continue
        dst = out_dir / src.name
        if copy_mode:
            shutil.copy2(src, dst)
        else:
            shutil.move(str(src), dst)
        action = "copied" if copy_mode else "moved"
        collected.append({"action": action, "source": str(src), "file": str(dst), "bytes": dst.stat().st_size})

    summary = {
        "source_dir": str(source_dir),
        "out_dir": str(out_dir),
        "collected_count": len(collected),
        "collected": collected,
        "skipped_count": len(skipped),
    }
    _json_print(summary)


@main.command("export-all")
@click.argument("manifest", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--cookie-file", type=click.Path(exists=True, dir_okay=False), help="Cookie header 文本或 JSON cookie 文件。")
@click.option("--out-dir", type=click.Path(path_type=Path), default=Path("exports"), show_default=True)
@click.option("--dry-run", is_flag=True, help="只列出将要请求的下载接口，不实际下载。")
@click.option("--limit", type=int, default=0, show_default=True, help="最多导出 N 个；0 表示不限制。")
@click.option("--poll-attempts", type=int, default=18, show_default=True, help="异步导出任务最多轮询次数。")
@click.option("--poll-seconds", type=float, default=5.0, show_default=True, help="异步导出任务轮询间隔秒数。")
def export_all(
    manifest: Path,
    cookie_file: str | None,
    out_dir: Path,
    dry_run: bool,
    limit: int,
    poll_attempts: int,
    poll_seconds: float,
) -> None:
    """按 manifest 中的下载候选逐个请求并保存 Excel，支持生意参谋异步下载轮询。"""
    # Request every "download" candidate of the manifest and save the result.
    #
    # Per candidate:
    #   1. Send the captured request. A 200 response that looks like a
    #      spreadsheet is saved directly ("direct" mode).
    #   2. Otherwise look for an async file URL in a JSON response; failing
    #      that, poll the candidate's poll URL until one appears or the
    #      attempts run out.
    #   3. Download the resolved file URL and save it ("async" mode).
    # Each candidate ends up in exactly one of `saved` / `failed`; an exception
    # for one candidate is recorded and does not stop the batch.
    #
    # Raises click.ClickException when the manifest has no download candidate
    # (not raised in --dry-run mode, which only prints the plan).
    data = _load_json(manifest)
    downloads = [c for c in data.get("candidates", []) if c.get("kind") == "download"]
    if limit and limit > 0:
        downloads = downloads[:limit]
    cookie_header = _load_cookie_header(cookie_file)
    plan = [
        {"index": idx, "method": item.get("method"), "url": item.get("url"), "filename": item.get("suggested_filename")}
        for idx, item in enumerate(downloads, 1)
    ]
    if dry_run:
        _json_print({"dry_run": True, "count": len(plan), "plan": plan})
        return
    if not downloads:
        raise click.ClickException("manifest 中没有 download 候选；请重新采集 HAR，至少手动导出一次 Excel。")
    out_dir.mkdir(parents=True, exist_ok=True)
    session = requests.Session()
    if cookie_header:
        # NOTE(review): a session-level Cookie header is also sent when the
        # resolved async file URL is fetched, even if that URL is on another
        # host - confirm this is intended.
        session.headers.update({"Cookie": cookie_header})
    saved = []
    failed = []
    for idx, item in enumerate(downloads, 1):
        headers = dict(item.get("request_headers") or {})
        if cookie_header:
            headers["Cookie"] = cookie_header
        method = (item.get("method") or "GET").upper()
        url = item["url"]
        try:
            resp = session.request(method, url, headers=headers, data=item.get("post_data"), timeout=90)
            filename = _sanitize_filename(item.get("suggested_filename") or f"qianniu_export_{idx:03d}.xlsx")
            target = out_dir / filename
            if resp.status_code == 200 and _is_excel_response(resp):
                target.write_bytes(resp.content)
                saved.append({"index": idx, "file": str(target), "bytes": target.stat().st_size, "mode": "direct"})
                continue

            # Not a spreadsheet: the endpoint may have started an async export.
            async_url = None
            json_payload: Any = None
            ctype = resp.headers.get("content-type", "")
            if "json" in ctype.lower():
                try:
                    json_payload = resp.json()
                    async_url = _extract_async_download_url(json_payload)
                except Exception:
                    # Best effort: an unparsable body just means no async URL here.
                    json_payload = None

            poll_url = item.get("poll_url") or _derive_poll_url(url)
            poll_failure = None
            if not async_url and poll_url:
                for attempt in range(1, max(1, poll_attempts) + 1):
                    poll_resp = session.get(poll_url, headers=headers, timeout=90)
                    if "json" not in poll_resp.headers.get("content-type", "").lower():
                        poll_failure = {
                            "index": idx,
                            "url": url,
                            "poll_url": poll_url,
                            "status": poll_resp.status_code,
                            "content_type": poll_resp.headers.get("content-type"),
                            "message": "poll response is not JSON",
                        }
                        break
                    poll_payload = poll_resp.json()
                    async_url = _extract_async_download_url(poll_payload)
                    if async_url:
                        break
                    if attempt < poll_attempts:
                        time.sleep(max(0.1, poll_seconds))

            if poll_failure:
                # Record the polling failure once and move on. Previously the
                # code also fell through to the generic failure below, so the
                # same item was reported twice and failed_count was inflated.
                failed.append(poll_failure)
                continue

            if async_url:
                file_resp = session.get(async_url, timeout=180)
                if file_resp.status_code != 200 or not _is_excel_response(file_resp):
                    failed.append({
                        "index": idx,
                        "url": url,
                        "status": file_resp.status_code,
                        "content_type": file_resp.headers.get("content-type"),
                        "message": "resolved async URL did not return Excel",
                    })
                    continue
                # A manifest-provided name wins; otherwise derive one from the file response.
                final_name = filename if item.get("suggested_filename") else _filename_from_response_or_url(file_resp, filename)
                target = out_dir / _sanitize_filename(final_name)
                target.write_bytes(file_resp.content)
                saved.append({"index": idx, "file": str(target), "bytes": target.stat().st_size, "mode": "async"})
                continue

            failed.append({
                "index": idx,
                "url": url,
                "status": resp.status_code,
                "content_type": resp.headers.get("content-type"),
                "message": "response does not look like Excel and no async download URL was produced; login/session/anti-bot may be required",
            })
        except Exception as exc:  # noqa: BLE001 - CLI should report and continue
            failed.append({"index": idx, "url": url, "error": repr(exc)})
    _json_print({"saved_count": len(saved), "failed_count": len(failed), "saved": saved, "failed": failed})


# Run the click command group when the module is executed as a script.
if __name__ == "__main__":
    main()
