#!/usr/bin/env python3
"""Import validated Liangzhu monthly-report supplemental export rows into PostgreSQL.

Purpose:
- keep monthly-report sidecar/export data as durable monthly snapshots;
- preserve raw payloads so next-year YoY can use the same monthly source rows even if
  a platform later stops exposing a report;
- keep this separate from the daily import tables so the daily pipeline's metric definitions (口径) stay unchanged.

This script runs locally on bot1 and uses ssh/scp to execute the actual DB importer
under postgres on wwyl-cloud. It prints no secrets.
"""
from __future__ import annotations

import argparse
import calendar
import hashlib
import json
import shlex
import subprocess
import tempfile
import textwrap
from datetime import datetime
from pathlib import Path
from typing import Any

# Monthly-report workspace on bot1; every input export and every generated file lives under WORK.
BASE = Path('/Users/bot1/Volumes/root_for_ai/AI工作区/良渚文化_月报_2026年5月_20260601_1123')
WORK = BASE.joinpath('work')

# Report section -> exported JSON file. build_stage walks the sections in this insertion order.
SOURCE_FILES = dict(
    store=WORK / 'store_monthly.json',
    product=WORK / 'product_monthly.json',
    promotion=WORK / 'promo_plan_rows.json',
    traffic_source=WORK / 'flow_source_summary.json',
)

# Platform label (the export's 平台 column) -> (store_code, default store display name).
STORE_MAP = {
    '天猫': ('liangzhu_culture_flagship', '良渚文化旗舰店'),
    '淘宝': ('liangzhu_wenchuang_product_ops', '良渚博物院官方店'),
    '京东': ('liangzhu_jd', '良渚京东店'),
}


def run(cmd: list[str], timeout: int = 300) -> str:
    """Execute *cmd* (argv list, no shell) and return its captured stdout.

    Raises:
        RuntimeError: the process exited with a non-zero status; the message
            carries the shell-quoted command and the last 3000 characters of
            both stdout and stderr.
        subprocess.TimeoutExpired: *timeout* seconds elapsed first.
    """
    completed = subprocess.run(cmd, text=True, capture_output=True, timeout=timeout)
    stdout_text = completed.stdout or ''
    if completed.returncode == 0:
        return stdout_text
    stderr_text = completed.stderr or ''
    message = (
        f"command failed rc={completed.returncode}: {shlex.join(cmd)}\n"
        f"STDOUT:\n{stdout_text[-3000:]}\nSTDERR:\n{stderr_text[-3000:]}"
    )
    raise RuntimeError(message)


def n(v: Any) -> float | None:
    if v is None:
        return None
    s = str(v).strip().replace(',', '')
    if not s or s == '-' or s.upper() == 'NULL':
        return None
    pct = s.endswith('%')
    if pct:
        s = s[:-1]
    try:
        x = float(s)
        return x / 100 if pct else x
    except Exception:
        return None


def month_start(month: str) -> str:
    """Return the first day of a ``YYYY-MM`` month as an ISO date string (``YYYY-MM-01``).

    Raises ValueError if *month* does not match ``%Y-%m``.
    """
    # strptime fills in day=1 when the format has no day field.
    first_day = datetime.strptime(month, '%Y-%m')
    return first_day.date().isoformat()


def month_end(month: str) -> str:
    """Return the last calendar day of a ``YYYY-MM`` month as ``YYYY-MM-DD``.

    Leap years are handled via ``calendar.monthrange``. Raises ValueError if
    *month* does not match ``%Y-%m``.
    """
    parsed = datetime.strptime(month, '%Y-%m')
    _, days_in_month = calendar.monthrange(parsed.year, parsed.month)
    return parsed.replace(day=days_in_month).date().isoformat()


def stable_value(v: Any) -> Any:
    """Normalize a payload value: integral floats become ``int`` (``5.0`` -> ``5``).

    Every other value, including non-integral floats, is returned unchanged.
    """
    integral_float = isinstance(v, float) and v.is_integer()
    return int(v) if integral_float else v


def row_key(section: str, r: dict[str, Any], row_number: int) -> str:
    platform = str(r.get('平台') or '').strip()
    month = str(r.get('月份') or '').strip()
    if section == 'store':
        return '|'.join([section, month, platform, str(r.get('店铺名称') or STORE_MAP.get(platform, ('', ''))[1] or '')])
    if section == 'product':
        return '|'.join([section, month, platform, str(r.get('商品ID') or ''), str(r.get('商品名称') or '')[:80], str(r.get('来源文件') or '')])
    if section == 'promotion':
        return '|'.join([section, month, platform, str(r.get('场景名字') or ''), str(r.get('计划名字') or ''), str(r.get('来源文件') or ''), str(row_number)])
    if section == 'traffic_source':
        return '|'.join([section, month, platform, str(r.get('一级来源') or ''), str(r.get('二级来源') or '')])
    return '|'.join([section, month, platform, str(row_number)])


def build_stage(batch_key: str, out_path: Path) -> dict[str, Any]:
    """Normalize every SOURCE_FILES export into one JSONL staging file.

    Each input file must hold a JSON list of row objects that carry at least
    平台 (platform) and 月份 (month, ``YYYY-MM``). For every row this:

    * normalizes the payload values with ``stable_value``;
    * derives the snapshot month range;
    * resolves store code and name via STORE_MAP;
    * builds the natural ``row_key``;
    * computes a SHA-256 ``row_hash`` over (batch_key, section, row_key, payload).

    Each row is then written as one JSON object per line to *out_path*; parent
    directories are created.

    Returns a manifest dict: batch key, stage path, total rows, per-section
    counts, sorted months, and a creation timestamp.

    Raises:
        RuntimeError: a source file is not a JSON list, a row is not an object,
            a row lacks platform/month, or two rows of the same section yield
            the same row_key. The remote table enforces
            UNIQUE(batch_key, section, row_key), so such a batch would
            otherwise pass a dry-run and only fail at apply time.
    """
    rows_out: list[dict[str, Any]] = []
    counts: dict[str, int] = {}
    months: set[str] = set()
    # (section, row_key) -> row number of its first occurrence, for duplicate reporting.
    seen_keys: dict[tuple[str, str], int] = {}
    for section, path in SOURCE_FILES.items():
        data = json.loads(path.read_text(encoding='utf-8'))
        if not isinstance(data, list):
            raise RuntimeError(f'{path} is not a list')
        for i, src in enumerate(data, 1):
            if not isinstance(src, dict):
                raise RuntimeError(f'{section} row {i} in {path} is not an object: {src!r}')
            r = {k: stable_value(v) for k, v in src.items()}
            platform = str(r.get('平台') or '').strip()
            month = str(r.get('月份') or '').strip()
            if not platform or not month:
                raise RuntimeError(f'missing platform/month in {section} row {i}: {r}')
            store_code, default_store = STORE_MAP.get(platform, ('unknown', platform))
            store_name = str(r.get('店铺名称') or default_store)
            rk = row_key(section, r, i)
            first_row = seen_keys.get((section, rk))
            if first_row is not None:
                raise RuntimeError(
                    f'duplicate row_key in {section} ({path.name}) rows {first_row} and {i}: {rk}'
                )
            seen_keys[(section, rk)] = i
            material = json.dumps({'batch_key': batch_key, 'section': section, 'row_key': rk, 'payload': r}, ensure_ascii=False, sort_keys=True)
            row_hash = hashlib.sha256(material.encode()).hexdigest()
            rows_out.append({
                'batch_key': batch_key,
                'section': section,
                'snapshot_month': month_start(month),
                'period_end': month_end(month),
                'source_platform': platform,
                'store_code': store_code,
                'store_name': store_name,
                'source_file': str(r.get('来源文件') or path.name),
                'row_number': i,
                'row_key': rk,
                'row_hash': row_hash,
                'payload': r,
            })
            counts[section] = counts.get(section, 0) + 1
            months.add(month)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open('w', encoding='utf-8') as f:
        for r in rows_out:
            f.write(json.dumps(r, ensure_ascii=False) + '\n')
    return {
        'batch_key': batch_key,
        'stage_path': str(out_path),
        'rows': len(rows_out),
        'counts': counts,
        'months': sorted(months),
        'created_at': datetime.now().isoformat(),
    }


# Source code of the importer that remote_import ships to wwyl-cloud and runs as
# `python3 <importer> <stage.jsonl> <mode>`; per its __main__ guard, only the literal mode
# 'apply' writes to the database, and any other value is a dry-run.
# This is a raw string sent verbatim to the remote host: editing it changes remote behavior.
#
# What the embedded script does:
# - Reads the JSONL stage, rejects an empty file or mixed batch_key values, and builds a
#   summary (rows per section, months covered, inserted counts per fact table).
# - Dry-run: prints the summary and exits without connecting to PostgreSQL.
# - Apply: connects to database `liangzhu_qianniu` as `postgres`, runs the
#   CREATE ... IF NOT EXISTS DDL, then replaces the batch:
#   - deletes existing rows for the same batch_key from the four fact.monthly_*_snapshot
#     tables and from raw.monthly_report_export_row;
#   - re-inserts every raw row;
#   - inserts one typed fact row per raw row, chosen by its section.
#   All of this runs in a single transaction (autocommit off, rollback on any exception).
#   The summary is printed as the last stdout line only after commit.
# - Rows whose section is not store/product/promotion/traffic_source are kept in the raw
#   table but get no fact row.
# - Fact columns are read from the Chinese export column names via `g`, which returns the
#   first present, non-empty value.
# - Numeric cells go through `num`, which strips commas, maps '', '-' and 'NULL' to NULL,
#   and divides a trailing-'%' value by 100.
# NOTE(review): the DDL does not create the `raw` / `fact` schemas; it presumably relies on
# them already existing from the daily pipeline — confirm.
# NOTE(review): a missing 商品ID is stored as '' rather than NULL in product_id.
REMOTE_IMPORTER = r"""
#!/usr/bin/env python3
import json, sys
from pathlib import Path
import psycopg2
from psycopg2.extras import Json

DDL = '''
CREATE TABLE IF NOT EXISTS raw.monthly_report_export_row (
  raw_monthly_row_id BIGSERIAL PRIMARY KEY,
  batch_key TEXT NOT NULL,
  snapshot_month DATE NOT NULL,
  period_end DATE,
  section TEXT NOT NULL,
  source_platform TEXT NOT NULL,
  store_code TEXT,
  store_name TEXT,
  source_file TEXT,
  row_number INTEGER,
  row_key TEXT NOT NULL,
  row_hash TEXT NOT NULL UNIQUE,
  payload JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE(batch_key, section, row_key)
);
CREATE INDEX IF NOT EXISTS idx_monthly_export_month_section ON raw.monthly_report_export_row(snapshot_month, section, source_platform);
CREATE INDEX IF NOT EXISTS idx_monthly_export_payload_gin ON raw.monthly_report_export_row USING GIN(payload);

CREATE TABLE IF NOT EXISTS fact.monthly_store_snapshot (
  fact_id BIGSERIAL PRIMARY KEY,
  raw_monthly_row_id BIGINT UNIQUE REFERENCES raw.monthly_report_export_row(raw_monthly_row_id) ON DELETE CASCADE,
  batch_key TEXT NOT NULL,
  snapshot_month DATE NOT NULL,
  period_end DATE,
  source_platform TEXT NOT NULL,
  store_code TEXT,
  store_name TEXT,
  visitor_count NUMERIC,
  page_view_count NUMERIC,
  payment_amount NUMERIC,
  transaction_amount NUMERIC,
  refund_amount NUMERIC,
  net_payment_amount NUMERIC,
  payment_buyer_count NUMERIC,
  payment_item_count NUMERIC,
  payment_conversion_rate NUMERIC,
  customer_unit_price NUMERIC,
  add_cart_user_count NUMERIC,
  favorite_count NUMERIC,
  source_file TEXT,
  payload JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_monthly_store_snapshot_month ON fact.monthly_store_snapshot(snapshot_month, source_platform, store_code);

CREATE TABLE IF NOT EXISTS fact.monthly_product_snapshot (
  fact_id BIGSERIAL PRIMARY KEY,
  raw_monthly_row_id BIGINT UNIQUE REFERENCES raw.monthly_report_export_row(raw_monthly_row_id) ON DELETE CASCADE,
  batch_key TEXT NOT NULL,
  snapshot_month DATE NOT NULL,
  period_end DATE,
  source_platform TEXT NOT NULL,
  store_code TEXT,
  store_name TEXT,
  product_id TEXT,
  product_name TEXT,
  sku_or_article_no TEXT,
  new_product_category TEXT,
  visitor_count NUMERIC,
  page_view_count NUMERIC,
  favorite_user_count NUMERIC,
  add_cart_item_count NUMERIC,
  add_cart_user_count NUMERIC,
  payment_buyer_count NUMERIC,
  payment_item_count NUMERIC,
  payment_amount NUMERIC,
  transaction_amount NUMERIC,
  refund_amount NUMERIC,
  net_payment_amount NUMERIC,
  payment_conversion_rate NUMERIC,
  source_file TEXT,
  payload JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_monthly_product_snapshot_month ON fact.monthly_product_snapshot(snapshot_month, source_platform, store_code);
CREATE INDEX IF NOT EXISTS idx_monthly_product_snapshot_product ON fact.monthly_product_snapshot(product_id, snapshot_month);

CREATE TABLE IF NOT EXISTS fact.monthly_promotion_snapshot (
  fact_id BIGSERIAL PRIMARY KEY,
  raw_monthly_row_id BIGINT UNIQUE REFERENCES raw.monthly_report_export_row(raw_monthly_row_id) ON DELETE CASCADE,
  batch_key TEXT NOT NULL,
  snapshot_month DATE NOT NULL,
  period_end DATE,
  source_platform TEXT NOT NULL,
  store_code TEXT,
  store_name TEXT,
  scene_name TEXT,
  plan_name TEXT,
  spend_amount NUMERIC,
  gross_transaction_amount NUMERIC,
  net_transaction_amount NUMERIC,
  transaction_count NUMERIC,
  click_count NUMERIC,
  impression_count NUMERIC,
  favorite_add_cart_count NUMERIC,
  source_file TEXT,
  payload JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_monthly_promotion_snapshot_month ON fact.monthly_promotion_snapshot(snapshot_month, source_platform, scene_name);

CREATE TABLE IF NOT EXISTS fact.monthly_traffic_source_snapshot (
  fact_id BIGSERIAL PRIMARY KEY,
  raw_monthly_row_id BIGINT UNIQUE REFERENCES raw.monthly_report_export_row(raw_monthly_row_id) ON DELETE CASCADE,
  batch_key TEXT NOT NULL,
  snapshot_month DATE NOT NULL,
  period_end DATE,
  source_platform TEXT NOT NULL,
  store_code TEXT,
  store_name TEXT,
  source_l1 TEXT,
  source_l2 TEXT,
  visitor_count NUMERIC,
  payment_amount NUMERIC,
  payment_buyer_count NUMERIC,
  payment_conversion_rate NUMERIC,
  uv_value NUMERIC,
  source_file TEXT,
  payload JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_monthly_traffic_snapshot_month ON fact.monthly_traffic_source_snapshot(snapshot_month, source_platform, source_l1, source_l2);
'''

def num(v):
    if v is None: return None
    s=str(v).strip().replace(',', '')
    if not s or s=='-' or s.upper()=='NULL': return None
    pct=s.endswith('%')
    if pct: s=s[:-1]
    try:
        x=float(s); return x/100 if pct else x
    except Exception: return None

def g(p,*names):
    for name in names:
        if name in p and p.get(name) not in (None, ''):
            return p.get(name)
    return None

def main(stage_path, apply):
    rows=[json.loads(line) for line in Path(stage_path).read_text(encoding='utf-8').splitlines() if line.strip()]
    if not rows:
        raise SystemExit('no rows')
    batch_key=rows[0]['batch_key']
    if any(r['batch_key'] != batch_key for r in rows):
        raise SystemExit('mixed batch_key not allowed')
    summary={'batch_key':batch_key,'apply':bool(apply),'stage_rows':len(rows),'sections':{},'months':sorted(set(r['snapshot_month'][:7] for r in rows)),'inserted':{}}
    for r in rows:
        summary['sections'][r['section']]=summary['sections'].get(r['section'],0)+1
    if not apply:
        print(json.dumps(summary, ensure_ascii=False)); return
    conn=psycopg2.connect(dbname='liangzhu_qianniu', user='postgres')
    conn.autocommit=False
    cur=conn.cursor()
    try:
        cur.execute(DDL)
        # Idempotent monthly snapshot import: re-importing the same batch replaces only that batch.
        for table in ['fact.monthly_store_snapshot','fact.monthly_product_snapshot','fact.monthly_promotion_snapshot','fact.monthly_traffic_source_snapshot']:
            cur.execute(f'DELETE FROM {table} WHERE batch_key=%s', (batch_key,))
        cur.execute('DELETE FROM raw.monthly_report_export_row WHERE batch_key=%s', (batch_key,))
        raw_ids=[]
        for r in rows:
            cur.execute('''INSERT INTO raw.monthly_report_export_row(batch_key,snapshot_month,period_end,section,source_platform,store_code,store_name,source_file,row_number,row_key,row_hash,payload)
                           VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) RETURNING raw_monthly_row_id''',
                        (r['batch_key'],r['snapshot_month'],r.get('period_end'),r['section'],r['source_platform'],r.get('store_code'),r.get('store_name'),r.get('source_file'),r.get('row_number'),r['row_key'],r['row_hash'],Json(r['payload'])))
            raw_ids.append(cur.fetchone()[0])
        for raw_id, r in zip(raw_ids, rows):
            p=r['payload']; sec=r['section']
            common=dict(raw=raw_id,batch=r['batch_key'],m=r['snapshot_month'],end=r.get('period_end'),plat=r['source_platform'],code=r.get('store_code'),store=r.get('store_name'),src=r.get('source_file'),payload=Json(p))
            if sec=='store':
                vals=dict(visitor_count=num(g(p,'访客数')),page_view_count=num(g(p,'浏览量')),payment_amount=num(g(p,'支付金额')),transaction_amount=num(g(p,'成交金额')),refund_amount=num(g(p,'退款金额')),net_payment_amount=num(g(p,'实际成交金额')),payment_buyer_count=num(g(p,'支付买家数')),payment_item_count=num(g(p,'支付件数')),payment_conversion_rate=num(g(p,'转化率')),customer_unit_price=num(g(p,'客单价')),add_cart_user_count=num(g(p,'加购人数')),favorite_count=num(g(p,'收藏次数')))
                cur.execute('''INSERT INTO fact.monthly_store_snapshot(raw_monthly_row_id,batch_key,snapshot_month,period_end,source_platform,store_code,store_name,visitor_count,page_view_count,payment_amount,transaction_amount,refund_amount,net_payment_amount,payment_buyer_count,payment_item_count,payment_conversion_rate,customer_unit_price,add_cart_user_count,favorite_count,source_file,payload)
                    VALUES (%(raw)s,%(batch)s,%(m)s,%(end)s,%(plat)s,%(code)s,%(store)s,%(visitor_count)s,%(page_view_count)s,%(payment_amount)s,%(transaction_amount)s,%(refund_amount)s,%(net_payment_amount)s,%(payment_buyer_count)s,%(payment_item_count)s,%(payment_conversion_rate)s,%(customer_unit_price)s,%(add_cart_user_count)s,%(favorite_count)s,%(src)s,%(payload)s)''', {**common,**vals})
                summary['inserted']['monthly_store_snapshot']=summary['inserted'].get('monthly_store_snapshot',0)+1
            elif sec=='product':
                vals=dict(product_id=str(g(p,'商品ID') or ''),product_name=g(p,'商品名称'),sku_or_article_no=g(p,'货号'),new_product_category=g(p,'新品类别'),visitor_count=num(g(p,'访客数')),page_view_count=num(g(p,'浏览量')),favorite_user_count=num(g(p,'收藏人数')),add_cart_item_count=num(g(p,'加购件数')),add_cart_user_count=num(g(p,'加购人数')),payment_buyer_count=num(g(p,'支付买家数')),payment_item_count=num(g(p,'支付件数')),payment_amount=num(g(p,'支付金额')),transaction_amount=num(g(p,'成交金额')),refund_amount=num(g(p,'退款金额')),net_payment_amount=num(g(p,'实际成交金额')),payment_conversion_rate=num(g(p,'转化率')))
                cur.execute('''INSERT INTO fact.monthly_product_snapshot(raw_monthly_row_id,batch_key,snapshot_month,period_end,source_platform,store_code,store_name,product_id,product_name,sku_or_article_no,new_product_category,visitor_count,page_view_count,favorite_user_count,add_cart_item_count,add_cart_user_count,payment_buyer_count,payment_item_count,payment_amount,transaction_amount,refund_amount,net_payment_amount,payment_conversion_rate,source_file,payload)
                    VALUES (%(raw)s,%(batch)s,%(m)s,%(end)s,%(plat)s,%(code)s,%(store)s,%(product_id)s,%(product_name)s,%(sku_or_article_no)s,%(new_product_category)s,%(visitor_count)s,%(page_view_count)s,%(favorite_user_count)s,%(add_cart_item_count)s,%(add_cart_user_count)s,%(payment_buyer_count)s,%(payment_item_count)s,%(payment_amount)s,%(transaction_amount)s,%(refund_amount)s,%(net_payment_amount)s,%(payment_conversion_rate)s,%(src)s,%(payload)s)''', {**common,**vals})
                summary['inserted']['monthly_product_snapshot']=summary['inserted'].get('monthly_product_snapshot',0)+1
            elif sec=='promotion':
                vals=dict(scene_name=g(p,'场景名字'),plan_name=g(p,'计划名字'),spend_amount=num(g(p,'花费')),gross_transaction_amount=num(g(p,'总成交金额')),net_transaction_amount=num(g(p,'净成交金额')),transaction_count=num(g(p,'成交笔数')),click_count=num(g(p,'点击量')),impression_count=num(g(p,'展现量')),favorite_add_cart_count=num(g(p,'收藏加购')))
                cur.execute('''INSERT INTO fact.monthly_promotion_snapshot(raw_monthly_row_id,batch_key,snapshot_month,period_end,source_platform,store_code,store_name,scene_name,plan_name,spend_amount,gross_transaction_amount,net_transaction_amount,transaction_count,click_count,impression_count,favorite_add_cart_count,source_file,payload)
                    VALUES (%(raw)s,%(batch)s,%(m)s,%(end)s,%(plat)s,%(code)s,%(store)s,%(scene_name)s,%(plan_name)s,%(spend_amount)s,%(gross_transaction_amount)s,%(net_transaction_amount)s,%(transaction_count)s,%(click_count)s,%(impression_count)s,%(favorite_add_cart_count)s,%(src)s,%(payload)s)''', {**common,**vals})
                summary['inserted']['monthly_promotion_snapshot']=summary['inserted'].get('monthly_promotion_snapshot',0)+1
            elif sec=='traffic_source':
                vals=dict(source_l1=g(p,'一级来源'),source_l2=g(p,'二级来源'),visitor_count=num(g(p,'访客数')),payment_amount=num(g(p,'支付金额')),payment_buyer_count=num(g(p,'支付买家数')),payment_conversion_rate=num(g(p,'支付转化率')),uv_value=num(g(p,'UV价值')))
                cur.execute('''INSERT INTO fact.monthly_traffic_source_snapshot(raw_monthly_row_id,batch_key,snapshot_month,period_end,source_platform,store_code,store_name,source_l1,source_l2,visitor_count,payment_amount,payment_buyer_count,payment_conversion_rate,uv_value,source_file,payload)
                    VALUES (%(raw)s,%(batch)s,%(m)s,%(end)s,%(plat)s,%(code)s,%(store)s,%(source_l1)s,%(source_l2)s,%(visitor_count)s,%(payment_amount)s,%(payment_buyer_count)s,%(payment_conversion_rate)s,%(uv_value)s,%(src)s,%(payload)s)''', {**common,**vals})
                summary['inserted']['monthly_traffic_source_snapshot']=summary['inserted'].get('monthly_traffic_source_snapshot',0)+1
        conn.commit()
        print(json.dumps(summary, ensure_ascii=False))
    except Exception:
        conn.rollback(); raise
    finally:
        cur.close(); conn.close()

if __name__=='__main__':
    main(sys.argv[1], sys.argv[2]=='apply')
"""


def remote_import(stage: Path, apply: bool) -> dict[str, Any]:
    """Ship the stage file and REMOTE_IMPORTER to wwyl-cloud and run it as postgres.

    Steps:

    * Create a fresh ``mktemp -d`` directory on the remote host. Fixed,
      guessable names directly under the shared ``/tmp`` could be pre-created
      or swapped by another user before they are executed as ``postgres``.
    * Copy both files into that directory.
    * Run ``sudo -u postgres python3 <importer> <stage> apply|dry-run``.
    * Remove the directory afterwards, on a best-effort basis, so staged
      business data does not linger on the server.

    Returns the JSON summary printed on the importer's last stdout line.

    Raises:
        RuntimeError: an ssh/scp step fails (see ``run``), mktemp returns an
            unexpected path, or the importer prints nothing.
        subprocess.TimeoutExpired: an ssh/scp step exceeds its timeout.
    """
    import sys  # only needed for the cleanup warning below

    host = 'wwyl-cloud'
    ssh = ['ssh', '-o', 'BatchMode=yes', host]
    remote_dir = run(ssh + ['mktemp -d /tmp/liangzhu_monthly.XXXXXX'], timeout=60).strip()
    # Validate before this path is ever used in `rm -rf`.
    if not remote_dir.startswith('/tmp/liangzhu_monthly.') or '/' in remote_dir[len('/tmp/'):]:
        raise RuntimeError(f'unexpected remote temp dir from mktemp: {remote_dir!r}')
    try:
        # mktemp -d creates the directory as 0700. postgres (via sudo) must read the
        # files, while only the ssh user may add or replace entries.
        run(ssh + [f'chmod 755 {shlex.quote(remote_dir)}'], timeout=60)
        remote_stage = f'{remote_dir}/stage.jsonl'
        remote_imp = f'{remote_dir}/liangzhu_monthly_snapshot_remote_importer.py'
        with tempfile.TemporaryDirectory() as td:
            local_imp = Path(td) / 'liangzhu_monthly_snapshot_remote_importer.py'
            local_imp.write_text(REMOTE_IMPORTER, encoding='utf-8')
            run(['scp', '-o', 'BatchMode=yes', str(stage), f'{host}:{remote_stage}'], timeout=300)
            run(['scp', '-o', 'BatchMode=yes', str(local_imp), f'{host}:{remote_imp}'], timeout=300)
        mode = 'apply' if apply else 'dry-run'
        # ssh hands the remote command to the remote user's shell, so quote it as one string.
        remote_cmd = shlex.join(['sudo', '-u', 'postgres', 'python3', remote_imp, remote_stage, mode])
        out = run(ssh + [remote_cmd], timeout=900)
    finally:
        try:
            run(ssh + [f'rm -rf -- {shlex.quote(remote_dir)}'], timeout=60)
        except (RuntimeError, subprocess.TimeoutExpired) as err:
            # Cleanup is best-effort: report it, but never mask the import's own outcome.
            print(f'warning: failed to remove remote temp dir {remote_dir}: {err}', file=sys.stderr)
    lines = out.strip().splitlines()
    if not lines:
        raise RuntimeError('remote importer produced no output')
    return json.loads(lines[-1])


def main() -> int:
    """Command-line entry point: stage the exports, import remotely, record the result.

    Always writes ``<batch-key>_stage.jsonl`` and ``<batch-key>_manifest.json``
    under WORK. It then calls ``remote_import``, which runs in apply mode only
    when ``--apply`` is given. The combined ``{'local', 'remote'}`` result is
    written to ``<batch-key>_import_result.json`` and echoed to stdout.

    Returns 0 on success.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--batch-key', default='monthly_report_2026_05_validation_v1')
    parser.add_argument('--apply', action='store_true')
    opts = parser.parse_args()
    key = opts.batch_key

    stage_path = WORK / f'{key}_stage.jsonl'
    manifest_path = WORK / f'{key}_manifest.json'
    result_path = WORK / f'{key}_import_result.json'

    local_manifest = build_stage(key, stage_path)
    manifest_path.write_text(json.dumps(local_manifest, ensure_ascii=False, indent=2), encoding='utf-8')

    outcome = {'local': local_manifest, 'remote': remote_import(stage_path, opts.apply)}
    rendered = json.dumps(outcome, ensure_ascii=False, indent=2)
    result_path.write_text(rendered, encoding='utf-8')
    print(rendered)
    return 0


if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
