from __future__ import annotations

import hmac
import os
from pathlib import Path
from typing import Any

from fastapi import Depends, FastAPI, Header, HTTPException, Query, Response
from fastapi.encoders import jsonable_encoder
from fastapi.staticfiles import StaticFiles

from .db import fetch_all, fetch_one
from .models import (
    PartyIn, ProductIn, SourceFileIn, PurchaseOrderIn, StockMovementIn,
    ShipmentSignoffIn, ValidationIssueIn, ReconciliationRunIn,
)
from .write_service import (
    create_party, create_product, register_source_file, create_purchase_order,
    add_stock_movement, create_shipment_signoff, create_validation_issue,
    run_reconciliation,
)
from . import admin_service

# ASGI application object; every route handler in this module registers on it.
app = FastAPI(
    title='Goods Ledger API',
    version='0.2.0',
    description='最小闭环：货品台账只读查询 + 受控业务写入层。写入接口需要 API Key，不开放裸 SQL。',
)

# Upper bound that _limit_offset applies to a caller-supplied ``limit``.
READ_LIMIT_MAX = 500


def _limit_offset(limit: int, offset: int) -> tuple[int, int]:
    """Clamp paging arguments to safe values.

    ``limit`` is forced into the range 1..READ_LIMIT_MAX and ``offset`` is
    floored at 0. Returns the pair ``(limit, offset)``.
    """
    if limit < 1:
        bounded_limit = 1
    elif limit > READ_LIMIT_MAX:
        bounded_limit = READ_LIMIT_MAX
    else:
        bounded_limit = limit
    bounded_offset = offset if offset > 0 else 0
    return bounded_limit, bounded_offset


def _require_write_key(x_ledger_api_key: str | None = Header(default=None)) -> None:
    """FastAPI dependency that rejects requests lacking a valid write API key.

    Two keys are accepted: bots/scripts use ``LEDGER_API_KEY``; the dashboard UI
    has ``LEDGER_UI_API_KEY`` injected via nginx (so it can be rotated
    independently).

    Raises:
        HTTPException: 503 when neither environment variable is set,
            401 when the presented key is missing or matches no configured key.
    """
    accepted = {
        key
        for key in (os.environ.get('LEDGER_API_KEY'), os.environ.get('LEDGER_UI_API_KEY'))
        if key
    }
    if not accepted:
        raise HTTPException(status_code=503, detail='write API key is not configured')
    if not x_ledger_api_key:
        raise HTTPException(status_code=401, detail='invalid write API key')
    # Compare as bytes: hmac.compare_digest only accepts ASCII-only str, and
    # 'surrogatepass' keeps the encoding total for any str value.
    presented = x_ledger_api_key.encode('utf-8', 'surrogatepass')
    # Constant-time comparison so response timing does not depend on key contents.
    # A list (not a generator) makes sure every configured key is always compared.
    matches = [
        hmac.compare_digest(presented, key.encode('utf-8', 'surrogatepass'))
        for key in accepted
    ]
    if not any(matches):
        raise HTTPException(status_code=401, detail='invalid write API key')


def _actor(x_ledger_actor: str | None = Header(default=None)) -> str:
    """Return the actor name supplied in the request header, or ``'api'`` when it is missing or empty."""
    if x_ledger_actor:
        return x_ledger_actor
    return 'api'


@app.get('/health')
def health() -> dict[str, Any]:
    """Liveness probe: runs a trivial query and echoes the database name and server time."""
    db_info = fetch_one("SELECT current_database() AS database, now() AS server_time")
    payload: dict[str, Any] = {'ok': True}
    # A missing row simply leaves the payload as {'ok': True}.
    payload.update(db_info or {})
    return payload


@app.get('/api/summary')
def summary() -> list[dict[str, Any]]:
    """Return every (metric, value) row of ``v_api_summary``, ordered by metric."""
    metric_rows = fetch_all('SELECT metric, value FROM v_api_summary ORDER BY metric')
    return jsonable_encoder(metric_rows)


@app.get('/api/source-files')
def source_files(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List registered source files, newest first, one page at a time."""
    page_size, skip = _limit_offset(limit, offset)
    file_rows = fetch_all('''
        SELECT id, original_filename, stored_path, material_type, received_at, extraction_status,
               source_platform, received_status, file_exists, notes
        FROM source_files ORDER BY created_at DESC LIMIT %s OFFSET %s
    ''', (page_size, skip))
    return jsonable_encoder(file_rows)


@app.get('/api/products')
def products(q: str | None = None, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List products ordered by standard name.

    When ``q`` is given, only products whose standard name, default raw name or
    SKU code contains it (case-insensitive) are returned.
    """
    page_size, skip = _limit_offset(limit, offset)
    if q:
        pattern = f'%{q}%'
        matched = fetch_all('''
            SELECT id, sku_code, standard_name, raw_name_default, category, spec, default_unit,
                   barcode_69, guobo_code, brand, series, color_style, package_spec, status
            FROM products
            WHERE standard_name ILIKE %s OR raw_name_default ILIKE %s OR sku_code ILIKE %s
            ORDER BY standard_name LIMIT %s OFFSET %s
        ''', (pattern, pattern, pattern, page_size, skip))
        return jsonable_encoder(matched)
    listing = fetch_all('''
        SELECT id, sku_code, standard_name, raw_name_default, category, spec, default_unit,
               barcode_69, guobo_code, brand, series, color_style, package_spec, status
        FROM products ORDER BY standard_name LIMIT %s OFFSET %s
    ''', (page_size, skip))
    return jsonable_encoder(listing)


@app.get('/api/orders')
def orders(order_side: str | None = None, limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List purchase orders (newest first) with contract and buyer/seller names attached.

    ``order_side`` optionally restricts the result to a single side.
    """
    page_size, skip = _limit_offset(limit, offset)
    if order_side:
        one_side = fetch_all('''
            SELECT po.*, bd.contract_no, bd.document_type, bp.display_name AS buyer_name, sp.display_name AS seller_name
            FROM purchase_orders po
            LEFT JOIN business_documents bd ON bd.id = po.document_id
            LEFT JOIN parties bp ON bp.id = po.buyer_party_id
            LEFT JOIN parties sp ON sp.id = po.seller_party_id
            WHERE po.order_side = %s
            ORDER BY po.created_at DESC LIMIT %s OFFSET %s
        ''', (order_side, page_size, skip))
        return jsonable_encoder(one_side)
    every_side = fetch_all('''
        SELECT po.*, bd.contract_no, bd.document_type, bp.display_name AS buyer_name, sp.display_name AS seller_name
        FROM purchase_orders po
        LEFT JOIN business_documents bd ON bd.id = po.document_id
        LEFT JOIN parties bp ON bp.id = po.buyer_party_id
        LEFT JOIN parties sp ON sp.id = po.seller_party_id
        ORDER BY po.created_at DESC LIMIT %s OFFSET %s
    ''', (page_size, skip))
    return jsonable_encoder(every_side)


@app.get('/api/orders/{order_id}/lines')
def order_lines(order_id: str) -> list[dict[str, Any]]:
    """Return the expanded lines of one order, sorted by line number."""
    line_rows = fetch_all('''
        SELECT * FROM v_order_lines_expanded WHERE order_id = %s ORDER BY line_no
    ''', (order_id,))
    return jsonable_encoder(line_rows)


@app.get('/api/channel-allocations')
def channel_allocations(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List channel allocations (newest first) together with the raw product name and ordered quantity."""
    page_size, skip = _limit_offset(limit, offset)
    allocation_rows = fetch_all('''
        SELECT ca.*, ol.product_name_raw, ol.quantity AS order_quantity
        FROM channel_allocations ca
        JOIN order_lines ol ON ol.id = ca.order_line_id
        ORDER BY ca.created_at DESC LIMIT %s OFFSET %s
    ''', (page_size, skip))
    return jsonable_encoder(allocation_rows)


@app.get('/api/stock/current')
def current_stock(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """Return one page of ``v_current_stock`` ordered by display name."""
    page_size, skip = _limit_offset(limit, offset)
    sql = 'SELECT * FROM v_current_stock ORDER BY display_name LIMIT %s OFFSET %s'
    stock_rows = fetch_all(sql, (page_size, skip))
    return jsonable_encoder(stock_rows)


@app.get('/api/stock/movements')
def stock_movements(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List stock movements, latest movement date first, with the stock item name attached."""
    page_size, skip = _limit_offset(limit, offset)
    movement_rows = fetch_all('''
        SELECT sm.*, si.display_name AS stock_item_name
        FROM stock_movements sm JOIN stock_items si ON si.id = sm.stock_item_id
        ORDER BY movement_date DESC, created_at DESC LIMIT %s OFFSET %s
    ''', (page_size, skip))
    return jsonable_encoder(movement_rows)


@app.get('/api/shipments')
def shipments(limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List shipment sign-offs (newest first) with shipper and receiver party names."""
    page_size, skip = _limit_offset(limit, offset)
    signoff_rows = fetch_all('''
        SELECT ss.*, sp.display_name AS shipper_name, rp.display_name AS receiver_party_name
        FROM shipment_signoffs ss
        LEFT JOIN parties sp ON sp.id = ss.shipper_party_id
        LEFT JOIN parties rp ON rp.id = ss.receiver_party_id
        ORDER BY ss.created_at DESC LIMIT %s OFFSET %s
    ''', (page_size, skip))
    return jsonable_encoder(signoff_rows)


@app.get('/api/reconciliation/issues')
def validation_issues(status: str = 'open', limit: int = 100, offset: int = 0) -> list[dict[str, Any]]:
    """List validation issues with the given status, most severe first, then newest first.

    Args:
        status: Value matched against ``validation_issues.status`` (default ``'open'``).
        limit: Page size, clamped by ``_limit_offset``.
        offset: Rows to skip, floored at 0.
    """
    limit, offset = _limit_offset(limit, offset)
    # Rank severity explicitly (error, warning, everything else), the same ranking
    # the workbench dashboard uses. A plain ``ORDER BY severity DESC`` compares the
    # labels themselves, which for text values puts 'error' after 'warning'/'info'.
    return jsonable_encoder(fetch_all('''
        SELECT * FROM validation_issues WHERE status = %s
        ORDER BY CASE severity WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, created_at DESC
        LIMIT %s OFFSET %s
    ''', (status, limit, offset)))


@app.get('/api/raw-fields')
def raw_fields(source_label: str | None = None, sheet_or_table: str | None = None, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
    """Page through raw import rows, optionally filtered by source label and/or sheet/table name."""
    limit, offset = _limit_offset(limit, offset)
    # Only the filters that were actually supplied; column names are fixed literals.
    active = [
        (column, value)
        for column, value in (('source_label', source_label), ('sheet_or_table', sheet_or_table))
        if value
    ]
    sql = 'SELECT source_label, source_file_path, sheet_or_table, row_number, row_payload FROM raw_import_rows'
    if active:
        sql += ' WHERE ' + ' AND '.join(f'{column} = %s' for column, _ in active)
    sql += ' ORDER BY source_label, sheet_or_table, row_number LIMIT %s OFFSET %s'
    params: list[Any] = [value for _, value in active]
    params += [limit, offset]
    return jsonable_encoder(fetch_all(sql, params))


@app.get('/api/dashboard/workbench')
def dashboard_workbench(project_id: str | None = None) -> dict[str, Any]:
    """Workbench: answers only "what needs handling today" instead of piling up database statistics.

    Supports filtering by project/customer via ``project_id``; a missing or blank
    value means "no filter".

    Returns a dict with the keys ``shortage``, ``shortage_top``, ``surplus_top``,
    ``issues``, ``stock``, ``shipments``, ``amounts`` and ``order_tracking``.
    """
    # A blank ?project_id= would otherwise reach Postgres as ''::uuid and fail the cast.
    project_id = (project_id or '').strip() or None
    # Each filtered query uses the parameter twice: (%s IS NULL OR project_id = %s).
    pid = (project_id, project_id)
    shortage = fetch_one('''
        SELECT count(*) FILTER (WHERE shortage_qty > 0)::int AS shortage_product_count,
               coalesce(sum(shortage_qty),0) AS shortage_total,
               coalesce(sum(self_stock_candidate_qty),0) AS self_stock_total
        FROM v_supplier_vs_upstream_reconciliation_live
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
    ''', pid) or {}
    shortage_top = fetch_all('''
        SELECT product_name, shortage_qty, supplier_purchase_quantity_total, upstream_demand_quantity_total
        FROM v_supplier_vs_upstream_reconciliation_live
        WHERE shortage_qty > 0 AND (%s::uuid IS NULL OR project_id = %s::uuid)
        ORDER BY shortage_qty DESC
        LIMIT 8
    ''', pid)
    surplus_top = fetch_all('''
        SELECT product_name, difference_supplier_minus_demand, self_stock_candidate_qty,
               supplier_purchase_quantity_total, upstream_demand_quantity_total
        FROM v_supplier_vs_upstream_reconciliation_live
        WHERE difference_supplier_minus_demand > 0 AND (%s::uuid IS NULL OR project_id = %s::uuid)
        ORDER BY difference_supplier_minus_demand DESC
        LIMIT 8
    ''', pid)
    # NOTE(review): the two validation_issues queries carry no project predicate,
    # so they report all open issues even when project_id is set — confirm intended.
    issues = fetch_one("SELECT count(*)::int AS open_count FROM validation_issues WHERE status='open'") or {}
    issue_top = fetch_all('''
        SELECT id, severity, issue_type, related_table, message, created_at
        FROM validation_issues WHERE status='open'
        ORDER BY CASE severity WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, created_at DESC
        LIMIT 8
    ''')
    stock = fetch_one('''
        SELECT coalesce(sum(current_quantity),0) AS total_quantity,
               coalesce(sum(current_cost_estimate),0) AS total_cost,
               count(*) FILTER (WHERE current_quantity <= 0)::int AS empty_or_negative_count
        FROM v_current_stock
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
    ''', pid) or {}
    # Named shipment_stats so the local does not shadow the shipments() endpoint.
    shipment_stats = fetch_one('''
        SELECT count(*) FILTER (WHERE coalesce(status,'') NOT IN ('signed'))::int AS unsigned_count
        FROM shipment_signoffs
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
    ''', pid) or {}
    amounts = fetch_all('''
        SELECT po.order_side, coalesce(sum(ol.amount),0) AS amount_total, coalesce(sum(ol.quantity),0) AS quantity_total
        FROM purchase_orders po JOIN order_lines ol ON ol.order_id = po.id
        WHERE (%s::uuid IS NULL OR po.project_id = %s::uuid)
        GROUP BY po.order_side
    ''', pid)
    # Pending = not cancelled, not fulfilled, and no signed sign-off exists yet.
    order_tracking = fetch_one('''
        SELECT count(*)::int AS pending_count,
               count(*) FILTER (WHERE po.expected_arrival_end IS NOT NULL
                                  AND po.expected_arrival_end < CURRENT_DATE)::int AS overdue_count
        FROM purchase_orders po
        WHERE coalesce(po.status,'') <> 'cancelled'
          AND coalesce(po.fulfillment_status,'') NOT IN ('fulfilled', 'completed', 'done', '已完成')
          AND NOT EXISTS (SELECT 1 FROM shipment_signoffs ss
                          WHERE ss.purchase_order_id = po.id AND ss.status = 'signed')
          AND (%s::uuid IS NULL OR po.project_id = %s::uuid)
    ''', pid) or {}
    return jsonable_encoder({
        'shortage': shortage, 'shortage_top': shortage_top, 'surplus_top': surplus_top,
        'issues': {'open_count': issues.get('open_count', 0), 'top': issue_top},
        'stock': stock, 'shipments': shipment_stats, 'amounts': amounts,
        'order_tracking': order_tracking,
    })


@app.get('/api/dashboard/overview')
def dashboard_overview() -> dict[str, Any]:
    """Overview dashboard: summary metrics, a few extra counters, and the ten latest files and issues."""
    counts: dict[Any, Any] = {}
    for metric_row in fetch_all('SELECT metric, value FROM v_api_summary ORDER BY metric'):
        counts[metric_row['metric']] = metric_row['value']
    extra_counts = fetch_one('''
        SELECT
            (SELECT count(*) FROM validation_issues WHERE status = 'open')::int AS open_issue_count,
            (SELECT count(*) FROM stock_movements WHERE direction = 'in')::int AS stock_in_count,
            (SELECT count(*) FROM stock_movements WHERE direction = 'out')::int AS stock_out_count,
            (SELECT count(*) FROM reconciliation_runs)::int AS reconciliation_run_count,
            (SELECT count(*) FROM api_write_requests)::int AS api_write_request_count
    ''')
    if not extra_counts:
        extra_counts = {}
    latest_files = fetch_all('''
        SELECT id, original_filename, material_type, received_at, extraction_status
        FROM source_files ORDER BY created_at DESC LIMIT 10
    ''')
    latest_issues = fetch_all('''
        SELECT id, issue_type, severity, related_table, message, status, created_at
        FROM validation_issues ORDER BY created_at DESC LIMIT 10
    ''')
    overview = {
        'counts': counts,
        'extras': extra_counts,
        'recent_files': latest_files,
        'recent_issues': latest_issues,
    }
    return jsonable_encoder(overview)


@app.get('/api/dashboard/inventory')
def dashboard_inventory(limit: int = 50, project_id: str | None = None) -> dict[str, Any]:
    """Inventory dashboard: stock totals, movement breakdown, current stock rows and recent movements.

    Args:
        limit: Maximum rows for the ``current_stock`` and ``recent_movements`` lists
            (clamped by ``_limit_offset``).
        project_id: Optional project UUID filter; missing or blank means "no filter".
    """
    limit, _ = _limit_offset(limit, 0)
    # A blank ?project_id= would otherwise reach Postgres as ''::uuid and fail the cast.
    project_id = (project_id or '').strip() or None
    pid = (project_id, project_id)
    # Named stock_summary so the local does not shadow the summary() endpoint.
    stock_summary = fetch_one('''
        SELECT
            count(*)::int AS item_count,
            coalesce(sum(current_quantity),0) AS total_quantity,
            coalesce(sum(current_cost_estimate),0) AS total_cost_estimate,
            count(*) FILTER (WHERE current_quantity < 0)::int AS negative_stock_count
        FROM v_current_stock
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
    ''', pid) or {}
    by_direction = fetch_all('''
        SELECT sm.direction, sm.movement_type, count(*)::int AS movement_count, coalesce(sum(sm.quantity),0) AS quantity_total
        FROM stock_movements sm
        JOIN stock_items si ON si.id = sm.stock_item_id
        JOIN inventory_accounts ia ON ia.id = si.inventory_account_id
        WHERE (%s::uuid IS NULL OR ia.project_id = %s::uuid)
        GROUP BY sm.direction, sm.movement_type
        ORDER BY sm.direction, sm.movement_type
    ''', pid)
    rows = fetch_all('''
        SELECT * FROM v_current_stock
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
        ORDER BY display_name NULLS LAST LIMIT %s
    ''', (*pid, limit))
    recent = fetch_all('''
        SELECT sm.id, sm.movement_code, sm.movement_date, sm.direction, sm.movement_type, sm.quantity, sm.unit,
               sm.logistics_company, sm.tracking_no, si.display_name AS stock_item_name
        FROM stock_movements sm
        LEFT JOIN stock_items si ON si.id = sm.stock_item_id
        LEFT JOIN inventory_accounts ia ON ia.id = si.inventory_account_id
        WHERE (%s::uuid IS NULL OR ia.project_id = %s::uuid)
        ORDER BY sm.movement_date DESC, sm.created_at DESC LIMIT %s
    ''', (*pid, limit))
    return jsonable_encoder({'summary': stock_summary, 'by_direction': by_direction, 'current_stock': rows, 'recent_movements': recent})


@app.get('/api/dashboard/orders')
def dashboard_orders(limit: int = 50, project_id: str | None = None) -> dict[str, Any]:
    """Orders dashboard: counts by side/type/status, line totals, allocation mismatches,
    recent orders, and orders still waiting for a signed sign-off.

    Args:
        limit: Maximum rows for each list section (clamped by ``_limit_offset``).
        project_id: Optional project UUID filter; missing or blank means "no filter".
    """
    limit, _ = _limit_offset(limit, 0)
    # A blank ?project_id= would otherwise reach Postgres as ''::uuid and fail the cast.
    project_id = (project_id or '').strip() or None
    pid = (project_id, project_id)
    by_side = fetch_all('''
        SELECT order_side, order_type, coalesce(status,'unknown') AS status,
               count(*)::int AS order_count
        FROM purchase_orders
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
        GROUP BY order_side, order_type, coalesce(status,'unknown')
        ORDER BY order_side, order_type, status
    ''', pid)
    line_totals = fetch_one('''
        SELECT count(*)::int AS line_count,
               coalesce(sum(ol.quantity),0) AS quantity_total,
               coalesce(sum(ol.amount),0) AS amount_total
        FROM order_lines ol JOIN purchase_orders po ON po.id = ol.order_id
        WHERE (%s::uuid IS NULL OR po.project_id = %s::uuid)
    ''', pid) or {}
    # IS DISTINCT FROM 0 also keeps rows whose allocation_diff is NULL.
    allocation_mismatches = fetch_all('''
        SELECT v.* FROM v_order_line_channel_totals v
        JOIN purchase_orders po ON po.id = v.order_id
        WHERE v.allocation_diff IS DISTINCT FROM 0
          AND (%s::uuid IS NULL OR po.project_id = %s::uuid)
        ORDER BY abs(v.allocation_diff) DESC NULLS LAST
        LIMIT %s
    ''', (*pid, limit))
    recent = fetch_all('''
        SELECT po.id, po.order_code, po.order_side, po.order_type, po.order_no, po.order_date, po.status,
               po.external_contract_no, po.expected_arrival_text,
               bp.display_name AS buyer_name, sp.display_name AS seller_name
        FROM purchase_orders po
        LEFT JOIN parties bp ON bp.id = po.buyer_party_id
        LEFT JOIN parties sp ON sp.id = po.seller_party_id
        WHERE (%s::uuid IS NULL OR po.project_id = %s::uuid)
        ORDER BY po.created_at DESC LIMIT %s
    ''', (*pid, limit))
    # Orders that are neither cancelled nor fulfilled and have no sign-off in
    # status 'signed'; days_overdue is 0 unless expected_arrival_end has passed.
    pending_signoff = fetch_all('''
        SELECT po.id, coalesce(po.order_no, po.order_code) AS order_no, po.order_side, po.order_date,
               po.expected_arrival_start, po.expected_arrival_end, po.expected_arrival_text,
               po.status, po.fulfillment_status,
               bp.display_name AS buyer_name, sp.display_name AS seller_name,
               count(ss.id)::int AS signoff_count,
               CASE WHEN po.expected_arrival_end IS NOT NULL AND po.expected_arrival_end < CURRENT_DATE
                    THEN (CURRENT_DATE - po.expected_arrival_end)::int ELSE 0 END AS days_overdue
        FROM purchase_orders po
        LEFT JOIN parties bp ON bp.id = po.buyer_party_id
        LEFT JOIN parties sp ON sp.id = po.seller_party_id
        LEFT JOIN shipment_signoffs ss ON ss.purchase_order_id = po.id
        WHERE coalesce(po.status,'') <> 'cancelled'
          AND coalesce(po.fulfillment_status,'') NOT IN ('fulfilled', 'completed', 'done', '已完成')
          AND (%s::uuid IS NULL OR po.project_id = %s::uuid)
        GROUP BY po.id, bp.display_name, sp.display_name
        HAVING count(ss.id) FILTER (WHERE ss.status = 'signed') = 0
        ORDER BY days_overdue DESC, po.expected_arrival_end NULLS LAST, po.order_date DESC
        LIMIT %s
    ''', (*pid, limit))
    return jsonable_encoder({'by_side': by_side, 'line_totals': line_totals, 'allocation_mismatches': allocation_mismatches, 'recent_orders': recent, 'pending_signoff': pending_signoff})


@app.get('/api/dashboard/channels')
def dashboard_channels(limit: int = 50, project_id: str | None = None) -> dict[str, Any]:
    """Channels dashboard: quantity totals per channel, allocation mismatches and recent allocations.

    Args:
        limit: Maximum rows for ``mismatches`` and ``recent_allocations``
            (clamped by ``_limit_offset``).
        project_id: Optional project UUID filter; missing or blank means "no filter".
    """
    limit, _ = _limit_offset(limit, 0)
    # A blank ?project_id= would otherwise reach Postgres as ''::uuid and fail the cast.
    project_id = (project_id or '').strip() or None
    pid = (project_id, project_id)
    # Channel label falls back from standard_channel to raw_channel to 'unknown'.
    by_channel = fetch_all('''
        SELECT coalesce(ca.standard_channel, ca.raw_channel, 'unknown') AS channel,
               count(*)::int AS allocation_count,
               coalesce(sum(ca.original_quantity),0) AS original_quantity_total,
               coalesce(sum(ca.planned_quantity),0) AS planned_quantity_total,
               coalesce(sum(ca.confirmed_quantity),0) AS confirmed_quantity_total,
               coalesce(sum(ca.shipped_quantity),0) AS shipped_quantity_total,
               coalesce(sum(ca.signed_quantity),0) AS signed_quantity_total
        FROM channel_allocations ca
        JOIN order_lines ol ON ol.id = ca.order_line_id
        JOIN purchase_orders po ON po.id = ol.order_id
        WHERE (%s::uuid IS NULL OR po.project_id = %s::uuid)
        GROUP BY coalesce(ca.standard_channel, ca.raw_channel, 'unknown')
        ORDER BY confirmed_quantity_total DESC NULLS LAST, channel
    ''', pid)
    mismatches = fetch_all('''
        SELECT v.* FROM v_order_line_channel_totals v
        JOIN purchase_orders po ON po.id = v.order_id
        WHERE v.allocation_diff IS DISTINCT FROM 0
          AND (%s::uuid IS NULL OR po.project_id = %s::uuid)
        ORDER BY abs(v.allocation_diff) DESC NULLS LAST
        LIMIT %s
    ''', (*pid, limit))
    recent = fetch_all('''
        SELECT ca.*, ol.product_name_raw, ol.quantity AS order_quantity
        FROM channel_allocations ca
        JOIN order_lines ol ON ol.id = ca.order_line_id
        JOIN purchase_orders po ON po.id = ol.order_id
        WHERE (%s::uuid IS NULL OR po.project_id = %s::uuid)
        ORDER BY ca.created_at DESC LIMIT %s
    ''', (*pid, limit))
    return jsonable_encoder({'by_channel': by_channel, 'mismatches': mismatches, 'recent_allocations': recent})


@app.get('/api/dashboard/shipments')
def dashboard_shipments(limit: int = 50, project_id: str | None = None) -> dict[str, Any]:
    """Shipments dashboard: sign-off counts by status, line quantity totals, exception lines
    and recent sign-offs.

    Args:
        limit: Maximum rows for ``exceptions`` and ``recent_shipments``
            (clamped by ``_limit_offset``).
        project_id: Optional project UUID filter; missing or blank means "no filter".
    """
    limit, _ = _limit_offset(limit, 0)
    # A blank ?project_id= would otherwise reach Postgres as ''::uuid and fail the cast.
    project_id = (project_id or '').strip() or None
    pid = (project_id, project_id)
    by_status = fetch_all('''
        SELECT coalesce(status,'unknown') AS status, count(*)::int AS signoff_count
        FROM shipment_signoffs
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
        GROUP BY coalesce(status,'unknown') ORDER BY status
    ''', pid)
    line_summary = fetch_one('''
        SELECT count(*)::int AS line_count,
               coalesce(sum(sl.shipped_quantity),0) AS shipped_quantity_total,
               coalesce(sum(sl.signed_quantity),0) AS signed_quantity_total,
               coalesce(sum(sl.rejected_quantity),0) AS rejected_quantity_total
        FROM shipment_signoff_lines sl
        JOIN shipment_signoffs ss ON ss.id = sl.signoff_id
        WHERE (%s::uuid IS NULL OR ss.project_id = %s::uuid)
    ''', pid) or {}
    # Exception = something was rejected, or the signed quantity differs from the
    # shipped quantity (falling back to the line quantity when shipped is NULL).
    exceptions = fetch_all('''
        SELECT sl.*, ss.signoff_code, ss.logistics_company, ss.tracking_no, ss.receiver_name
        FROM shipment_signoff_lines sl
        JOIN shipment_signoffs ss ON ss.id = sl.signoff_id
        WHERE (coalesce(sl.rejected_quantity,0) > 0 OR coalesce(sl.signed_quantity,0) <> coalesce(sl.shipped_quantity, sl.quantity))
          AND (%s::uuid IS NULL OR ss.project_id = %s::uuid)
        ORDER BY sl.created_at DESC LIMIT %s
    ''', (*pid, limit))
    recent = fetch_all('''
        SELECT ss.*, sp.display_name AS shipper_name, rp.display_name AS receiver_party_name
        FROM shipment_signoffs ss
        LEFT JOIN parties sp ON sp.id = ss.shipper_party_id
        LEFT JOIN parties rp ON rp.id = ss.receiver_party_id
        WHERE (%s::uuid IS NULL OR ss.project_id = %s::uuid)
        ORDER BY ss.created_at DESC LIMIT %s
    ''', (*pid, limit))
    return jsonable_encoder({'by_status': by_status, 'line_summary': line_summary, 'exceptions': exceptions, 'recent_shipments': recent})


@app.get('/api/dashboard/reconciliation')
def dashboard_reconciliation(limit: int = 50, project_id: str | None = None) -> dict[str, Any]:
    """Reconciliation dashboard: live supplier-vs-upstream summary and rows, latest runs, open issues.

    Args:
        limit: Maximum rows for each list section (clamped by ``_limit_offset``).
        project_id: Optional project UUID filter; missing or blank means "no filter".
    """
    limit, _ = _limit_offset(limit, 0)
    # A blank ?project_id= would otherwise reach Postgres as ''::uuid and fail the cast.
    project_id = (project_id or '').strip() or None
    pid = (project_id, project_id)
    live_summary = fetch_one('''
        SELECT count(*)::int AS product_count,
               count(*) FILTER (WHERE shortage_qty > 0)::int AS shortage_count,
               coalesce(sum(shortage_qty),0) AS shortage_quantity_total,
               coalesce(sum(self_stock_candidate_qty),0) AS self_stock_candidate_total
        FROM v_supplier_vs_upstream_reconciliation_live
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
    ''', pid) or {}
    live_rows = fetch_all('''
        SELECT * FROM v_supplier_vs_upstream_reconciliation_live
        WHERE (%s::uuid IS NULL OR project_id = %s::uuid)
        ORDER BY shortage_qty DESC NULLS LAST, product_name
        LIMIT %s
    ''', (*pid, limit))
    latest_runs = fetch_all('''
        SELECT rr.id, rr.run_code, rr.reconciliation_type, rr.policy_version, rr.run_status, rr.run_at, rr.notes,
               count(rl.id)::int AS line_count,
               coalesce(sum(rl.shortage_qty),0) AS shortage_quantity_total
        FROM reconciliation_runs rr
        LEFT JOIN reconciliation_lines rl ON rl.run_id = rr.id
        WHERE (%s::uuid IS NULL OR rr.project_id = %s::uuid)
        GROUP BY rr.id
        ORDER BY rr.run_at DESC, rr.created_at DESC
        LIMIT %s
    ''', (*pid, limit))
    # Rank severity explicitly (error, warning, everything else), matching the
    # workbench dashboard. A plain ``ORDER BY severity DESC`` compares the labels
    # themselves, which for text values puts 'error' after 'warning'/'info'.
    # NOTE(review): this query has no project predicate — confirm intended.
    open_issues = fetch_all('''
        SELECT id, issue_type, severity, related_table, related_id, field_name, message, status, created_at
        FROM validation_issues WHERE status='open'
        ORDER BY CASE severity WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END, created_at DESC LIMIT %s
    ''', (limit,))
    return jsonable_encoder({'live_summary': live_summary, 'live_rows': live_rows, 'latest_runs': latest_runs, 'open_issues': open_issues})


@app.get('/api/search')
def search(q: str = Query(..., min_length=1), scope: str = 'all', limit: int = 10) -> dict[str, Any]:
    """Case-insensitive substring search across several entity groups.

    Args:
        q: Search text (at least one character).
        scope: Comma-separated group names (``products``, ``orders``, ``files``,
            ``stock``, ``shipments``, ``issues``) or ``all``; an empty value means ``all``.
        limit: Maximum rows per group (clamped by ``_limit_offset``).

    Returns a dict echoing ``query`` and ``scope`` plus the per-group result lists.
    """
    limit, _ = _limit_offset(limit, 0)
    pattern = f'%{q}%'
    wanted = {part.strip() for part in scope.split(',') if part.strip()}
    if not wanted:
        wanted = {'all'}
    everything = 'all' in wanted
    groups: dict[str, Any] = {}
    if everything or 'products' in wanted:
        groups['products'] = fetch_all('''
            SELECT id, sku_code, standard_name, raw_name_default, category, spec, barcode_69, guobo_code, status
            FROM products
            WHERE coalesce(standard_name,'') ILIKE %s OR coalesce(raw_name_default,'') ILIKE %s
               OR coalesce(sku_code,'') ILIKE %s OR coalesce(barcode_69,'') ILIKE %s OR coalesce(guobo_code,'') ILIKE %s
               OR coalesce(spec,'') ILIKE %s
            ORDER BY standard_name LIMIT %s
        ''', (pattern,) * 6 + (limit,))
    if everything or 'orders' in wanted:
        groups['orders'] = fetch_all('''
            SELECT po.id, po.order_code, po.order_side, po.order_type, po.order_no, po.external_contract_no,
                   po.supplier_contract_no_on_order, po.customer_contract_no_on_order, po.tracking_no,
                   bp.display_name AS buyer_name, sp.display_name AS seller_name
            FROM purchase_orders po
            LEFT JOIN parties bp ON bp.id = po.buyer_party_id
            LEFT JOIN parties sp ON sp.id = po.seller_party_id
            WHERE coalesce(po.order_code,'') ILIKE %s OR coalesce(po.order_no,'') ILIKE %s
               OR coalesce(po.external_contract_no,'') ILIKE %s OR coalesce(po.supplier_contract_no_on_order,'') ILIKE %s
               OR coalesce(po.customer_contract_no_on_order,'') ILIKE %s OR coalesce(po.tracking_no,'') ILIKE %s
               OR coalesce(bp.display_name,'') ILIKE %s OR coalesce(sp.display_name,'') ILIKE %s
            ORDER BY po.created_at DESC LIMIT %s
        ''', (pattern,) * 8 + (limit,))
    if everything or 'files' in wanted:
        groups['files'] = fetch_all('''
            SELECT id, original_filename, stored_path, material_type, source_platform, uploader, extraction_status, received_at
            FROM source_files
            WHERE coalesce(original_filename,'') ILIKE %s OR coalesce(stored_path,'') ILIKE %s
               OR coalesce(material_type,'') ILIKE %s OR coalesce(source_platform,'') ILIKE %s OR coalesce(uploader,'') ILIKE %s
            ORDER BY created_at DESC LIMIT %s
        ''', (pattern,) * 5 + (limit,))
    if everything or 'stock' in wanted:
        # Scope name is 'stock' but the result group key is 'stock_movements'.
        groups['stock_movements'] = fetch_all('''
            SELECT sm.id, sm.movement_code, sm.movement_date, sm.direction, sm.movement_type, sm.quantity, sm.unit,
                   sm.tracking_no, sm.receiver_name, si.display_name AS stock_item_name
            FROM stock_movements sm LEFT JOIN stock_items si ON si.id = sm.stock_item_id
            WHERE coalesce(sm.movement_code,'') ILIKE %s OR coalesce(sm.tracking_no,'') ILIKE %s
               OR coalesce(sm.receiver_name,'') ILIKE %s OR coalesce(si.display_name,'') ILIKE %s
               OR coalesce(si.stock_item_code,'') ILIKE %s
            ORDER BY sm.movement_date DESC, sm.created_at DESC LIMIT %s
        ''', (pattern,) * 5 + (limit,))
    if everything or 'shipments' in wanted:
        groups['shipments'] = fetch_all('''
            SELECT id, signoff_code, ship_date, status, logistics_company, tracking_no, receiver_name, signer_name, signed_at
            FROM shipment_signoffs
            WHERE coalesce(signoff_code,'') ILIKE %s OR coalesce(logistics_company,'') ILIKE %s
               OR coalesce(tracking_no,'') ILIKE %s OR coalesce(receiver_name,'') ILIKE %s OR coalesce(signer_name,'') ILIKE %s
            ORDER BY created_at DESC LIMIT %s
        ''', (pattern,) * 5 + (limit,))
    if everything or 'issues' in wanted:
        groups['issues'] = fetch_all('''
            SELECT id, issue_type, severity, related_table, field_name, message, status, created_at
            FROM validation_issues
            WHERE coalesce(issue_type,'') ILIKE %s OR coalesce(related_table,'') ILIKE %s
               OR coalesce(field_name,'') ILIKE %s OR coalesce(message,'') ILIKE %s
            ORDER BY created_at DESC LIMIT %s
        ''', (pattern,) * 4 + (limit,))
    return jsonable_encoder({'query': q, 'scope': scope, 'groups': groups})


@app.post('/api/reconciliation/run', dependencies=[Depends(_require_write_key)])
def write_reconciliation_run(
    payload: ReconciliationRunIn = ReconciliationRunIn(),
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Trigger a reconciliation run through the write service (write API key required).

    The request body is optional; a default ``ReconciliationRunIn`` is used when omitted.
    """
    outcome = run_reconciliation(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/parties', dependencies=[Depends(_require_write_key)])
def write_party(
    payload: PartyIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Create a party through the write service (write API key required)."""
    outcome = create_party(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/products', dependencies=[Depends(_require_write_key)])
def write_product(
    payload: ProductIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Create a product through the write service (write API key required)."""
    outcome = create_product(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/source-files', dependencies=[Depends(_require_write_key)])
def write_source_file(
    payload: SourceFileIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Register a source file through the write service (write API key required)."""
    outcome = register_source_file(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/purchase-orders', dependencies=[Depends(_require_write_key)])
def write_purchase_order(
    payload: PurchaseOrderIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Create a purchase order through the write service (write API key required)."""
    outcome = create_purchase_order(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/stock-movements', dependencies=[Depends(_require_write_key)])
def write_stock_movement(
    payload: StockMovementIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Record a stock movement through the write service (write API key required)."""
    outcome = add_stock_movement(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/shipment-signoffs', dependencies=[Depends(_require_write_key)])
def write_shipment_signoff(
    payload: ShipmentSignoffIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Create a shipment sign-off through the write service (write API key required)."""
    outcome = create_shipment_signoff(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


@app.post('/api/write/validation-issues', dependencies=[Depends(_require_write_key)])
def write_validation_issue(
    payload: ValidationIssueIn,
    actor: str = Depends(_actor),
    idempotency_key: str | None = Header(default=None, alias='Idempotency-Key'),
):
    """Create a validation issue through the write service (write API key required)."""
    outcome = create_validation_issue(payload, actor_profile=actor, idempotency_key=idempotency_key)
    return outcome


# ---------------------------------------------------------------------------
# Ledger administration: controlled generic CRUD (whitelisted entities + whitelisted columns + full audit trail)
# ---------------------------------------------------------------------------

@app.get('/api/admin/meta')
def admin_meta() -> dict[str, Any]:
    """Return whatever ``admin_service.meta()`` reports, unmodified."""
    metadata = admin_service.meta()
    return metadata


@app.get('/api/admin/refs/{entity}')
def admin_refs(entity: str, q: str | None = None) -> list[dict[str, Any]]:
    """Return reference rows for ``entity`` from ``admin_service.refs``, optionally narrowed by ``q``.

    Raises:
        HTTPException: 404 when the service raises ``KeyError``.
    """
    try:
        return jsonable_encoder(admin_service.refs(entity, q))
    except KeyError as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=404, detail=str(exc)) from exc


@app.get('/api/admin/{entity}')
def admin_list(entity: str, q: str | None = None, limit: int = 50, offset: int = 0,
               ref_col: str | None = None, ref_id: str | None = None) -> dict[str, Any]:
    """List rows of an admin entity via ``admin_service.list_rows``.

    All arguments are forwarded to the service unchanged (``limit``/``offset``
    are not clamped here).

    Raises:
        HTTPException: 404 when the service raises ``KeyError``,
            400 when it raises ``ValueError``.
    """
    try:
        return jsonable_encoder(admin_service.list_rows(entity, q, limit, offset, ref_col, ref_id))
    except KeyError as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc


def _admin_errors(fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` and translate service errors to HTTP errors.

    Returns:
        The JSON-encoded (``jsonable_encoder``) return value of ``fn``.

    Raises:
        HTTPException: 404 for ``LookupError`` (which includes ``KeyError``),
            409 for ``ValueError``. Any other exception propagates unchanged.
    """
    try:
        return jsonable_encoder(fn(*args, **kwargs))
    except LookupError as exc:
        # KeyError subclasses LookupError, so one clause covers both.
        # str(KeyError('msg')) is the repr of the key ("'msg'"); unwrap a
        # single argument so the response detail carries no stray quotes.
        if isinstance(exc, KeyError) and len(exc.args) == 1:
            detail = str(exc.args[0])
        else:
            detail = str(exc)
        raise HTTPException(status_code=404, detail=detail) from exc
    except ValueError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc


@app.post('/api/admin/{entity}', dependencies=[Depends(_require_write_key)])
def admin_create(
    entity: str,
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Create a row of ``entity`` via ``admin_service.create_row``.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.create_row,
        entity,
        payload,
        actor=actor,
    )


@app.patch('/api/admin/{entity}/{row_id}', dependencies=[Depends(_require_write_key)])
def admin_update(
    entity: str,
    row_id: str,
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Update row ``row_id`` of ``entity`` via ``admin_service.update_row``.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.update_row,
        entity,
        row_id,
        payload,
        actor=actor,
    )


@app.delete('/api/admin/{entity}/{row_id}', dependencies=[Depends(_require_write_key)])
def admin_delete(
    entity: str,
    row_id: str,
    actor: str = Depends(_actor),
):
    """Delete row ``row_id`` of ``entity`` via ``admin_service.delete_row``.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.delete_row,
        entity,
        row_id,
        actor=actor,
    )


# ---- 业务操作：用算法承接修改意图，不裸改原始行 ----

@app.get('/api/admin/ops/allocation-overview')
def ops_allocation_overview(q: str | None = None, limit: int = 50) -> dict[str, Any]:
    """Return the JSON-encoded result of ``admin_service.allocation_overview``.

    ``q`` and ``limit`` are forwarded to the service unchanged. Exceptions
    raised by the service are not translated by this handler.
    """
    overview = admin_service.allocation_overview(q, limit)
    return jsonable_encoder(overview)


@app.post('/api/admin/ops/channel-transfer', dependencies=[Depends(_require_write_key)])
def ops_channel_transfer(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.channel_transfer`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.channel_transfer,
        payload,
        actor=actor,
    )


@app.post('/api/admin/ops/channel-allocate', dependencies=[Depends(_require_write_key)])
def ops_channel_allocate(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.channel_allocate`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.channel_allocate,
        payload,
        actor=actor,
    )


@app.get('/api/admin/ops/stock-current/{stock_item_id}')
def ops_stock_current(stock_item_id: str) -> dict[str, Any]:
    """Return the JSON-encoded result of ``admin_service.stock_current_one``.

    ``stock_item_id`` comes from the URL path and is forwarded unchanged.
    """
    # NOTE(review): unlike the write endpoints, this handler does not route
    # through ``_admin_errors``; confirm how the service signals an unknown id.
    current = admin_service.stock_current_one(stock_item_id)
    return jsonable_encoder(current)


@app.post('/api/admin/ops/stock-reverse', dependencies=[Depends(_require_write_key)])
def ops_stock_reverse(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.stock_reverse`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.stock_reverse,
        payload,
        actor=actor,
    )


@app.post('/api/admin/ops/signoff-confirm', dependencies=[Depends(_require_write_key)])
def ops_signoff_confirm(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.signoff_confirm`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.signoff_confirm,
        payload,
        actor=actor,
    )


@app.get('/api/admin/ops/settle-preview')
def ops_settle_preview(project_id: str | None = None) -> dict[str, Any]:
    """Return the JSON-encoded result of ``admin_service.downstream_settle_preview``.

    The optional ``project_id`` query parameter is forwarded unchanged.
    Exceptions raised by the service are not translated by this handler.
    """
    preview = admin_service.downstream_settle_preview(project_id)
    return jsonable_encoder(preview)


@app.post('/api/admin/ops/statement-writeoff', dependencies=[Depends(_require_write_key)])
def ops_statement_writeoff(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.statement_writeoff`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.statement_writeoff,
        payload,
        actor=actor,
    )


@app.post('/api/admin/ops/settle-pricing')
def ops_settle_pricing(payload: dict[str, Any]):
    """Run ``admin_service.downstream_settle_pricing`` with the request body.

    The call goes through ``_admin_errors``, which JSON-encodes the result
    and converts lookup failures and ``ValueError`` into HTTP error responses.
    """
    # NOTE(review): this POST carries no ``_require_write_key`` dependency and
    # passes no actor; presumably a read-only pricing calculation. Confirm
    # against admin_service before relying on that.
    pricing_fn = admin_service.downstream_settle_pricing
    return _admin_errors(pricing_fn, payload)


@app.post('/api/admin/ops/settle-create', dependencies=[Depends(_require_write_key)])
def ops_settle_create(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.downstream_settle_create`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.downstream_settle_create,
        payload,
        actor=actor,
    )


@app.post('/api/admin/ops/settle-complete', dependencies=[Depends(_require_write_key)])
def ops_settle_complete(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.downstream_settle_complete`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.downstream_settle_complete,
        payload,
        actor=actor,
    )


@app.post('/api/admin/ops/settle-cancel', dependencies=[Depends(_require_write_key)])
def ops_settle_cancel(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.downstream_settle_cancel`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.downstream_settle_cancel,
        payload,
        actor=actor,
    )


@app.get('/api/admin/ops/group-overview')
def ops_group_overview() -> dict[str, Any]:
    """Overview counters for each business group of ledger administration.

    Meant for the group landing pages, so only actionable numbers are
    included. Six single-row queries are executed in sequence; a query that
    yields no row contributes an empty dict. The settlement counters are
    merged into the ``issues`` group rather than returned under their own key.
    """

    def _single_row(sql: str) -> dict[str, Any]:
        # fetch_one may return nothing; fall back to an empty mapping.
        return fetch_one(sql) or {}

    # Master data: products (and those with neither code), parties, projects, addresses.
    basics_row = _single_row('''
        SELECT (SELECT count(*) FROM products)::int AS product_count,
               (SELECT count(*) FROM products WHERE coalesce(guobo_code,'')='' AND coalesce(barcode_69,'')='')::int AS product_no_code_count,
               (SELECT count(*) FROM parties)::int AS party_count,
               (SELECT count(*) FROM projects)::int AS project_count,
               (SELECT count(*) FROM delivery_addresses)::int AS address_count
    ''')
    # Orders per side, order lines, and lines whose allocation_diff is not 0.
    purchase_row = _single_row('''
        SELECT (SELECT count(*) FROM purchase_orders WHERE order_side='downstream_supplier_purchase')::int AS downstream_orders,
               (SELECT count(*) FROM purchase_orders WHERE order_side='upstream_customer_order')::int AS upstream_orders,
               (SELECT count(*) FROM order_lines)::int AS line_count,
               (SELECT count(*) FROM v_order_line_channel_totals WHERE allocation_diff IS DISTINCT FROM 0)::int AS mismatch_count
    ''')
    # Inventory: accounts, stock items, summed quantity, negative balances, movements.
    stock_row = _single_row('''
        SELECT (SELECT count(*) FROM inventory_accounts)::int AS account_count,
               (SELECT count(*) FROM stock_items)::int AS sku_count,
               (SELECT coalesce(sum(current_quantity),0) FROM v_current_stock) AS total_quantity,
               (SELECT count(*) FROM v_current_stock WHERE current_quantity < 0)::int AS negative_count,
               (SELECT count(*) FROM stock_movements)::int AS movement_count
    ''')
    # Sign-offs: total, those not in status 'signed', and lines with a
    # rejection or a signed quantity that differs from the shipped quantity.
    signoff_row = _single_row('''
        SELECT (SELECT count(*) FROM shipment_signoffs)::int AS signoff_count,
               (SELECT count(*) FROM shipment_signoffs WHERE coalesce(status,'') <> 'signed')::int AS unsigned_count,
               (SELECT count(*) FROM shipment_signoff_lines sl
                 WHERE coalesce(sl.rejected_quantity,0) > 0
                    OR coalesce(sl.signed_quantity,0) <> coalesce(sl.shipped_quantity, sl.quantity))::int AS exception_count
    ''')
    # Open validation issues, split by severity.
    issue_row = _single_row('''
        SELECT count(*)::int AS open_count,
               count(*) FILTER (WHERE severity='error')::int AS error_count,
               count(*) FILTER (WHERE severity='warning')::int AS warning_count
        FROM validation_issues WHERE status='open'
    ''')
    # Upstream statements by status, plus draft downstream statements.
    settle_row = _single_row('''
        SELECT
            (SELECT count(*) FROM upstream_statements WHERE status='pending')::int AS us_pending,
            (SELECT count(*) FROM upstream_statements WHERE status='settled' AND downstream_statement_id IS NULL AND settlement_batch_code IS NULL)::int AS us_ready,
            (SELECT count(*) FROM upstream_statements WHERE status='frozen')::int AS us_frozen,
            (SELECT count(*) FROM upstream_statements WHERE status='cleared')::int AS us_cleared,
            (SELECT count(*) FROM downstream_statements WHERE status='draft')::int AS ds_draft
    ''')

    # Settlement counters share the 'issues' group; on a key clash they win.
    issues_group: dict[str, Any] = dict(issue_row)
    issues_group.update(settle_row)

    overview = {
        'basics': basics_row,
        'purchase': purchase_row,
        'stock': stock_row,
        'signoff': signoff_row,
        'issues': issues_group,
    }
    return jsonable_encoder(overview)


@app.post('/api/admin/ops/dispatch-from-order', dependencies=[Depends(_require_write_key)])
def ops_dispatch_from_order(
    payload: dict[str, Any],
    actor: str = Depends(_actor),
):
    """Run ``admin_service.dispatch_from_order`` with the request body.

    Requires a valid write API key. The call goes through ``_admin_errors``,
    which JSON-encodes the result and converts lookup failures and
    ``ValueError`` into HTTP error responses.
    """
    return _admin_errors(
        admin_service.dispatch_from_order,
        payload,
        actor=actor,
    )


@app.get('/api/admin/ops/dispatch-check')
def ops_dispatch_check(project_id: str | None = None) -> dict[str, Any]:
    """Return the JSON-encoded result of ``admin_service.dispatch_signoff_check``.

    The optional ``project_id`` query parameter is forwarded unchanged.
    Exceptions raised by the service are not translated by this handler.
    """
    check_result = admin_service.dispatch_signoff_check(project_id)
    return jsonable_encoder(check_result)


@app.get('/api/admin/ops/dispatch-export/{note_id}')
def ops_dispatch_export(note_id: str):
    """Serve the xlsx produced by ``admin_service.dispatch_export_xlsx`` as a download.

    Args:
        note_id: Identifier taken from the URL path, forwarded unchanged.

    Returns:
        A ``Response`` carrying the workbook content with the spreadsheet
        media type and a ``Content-Disposition: attachment`` header.

    Raises:
        HTTPException: 404 when the service raises ``LookupError``.
    """
    from urllib.parse import quote
    try:
        content, fname = admin_service.dispatch_export_xlsx(note_id)
    except LookupError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    # quote() leaves '/' unescaped by default, but '/' is not a legal
    # character in an RFC 5987 ``filename*`` value; percent-encode it too.
    encoded_name = quote(fname, safe='')
    return Response(
        content=content,
        media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={'Content-Disposition': f"attachment; filename*=UTF-8''{encoded_name}"},
    )


@app.get('/api/admin/ops/channels')
def ops_channels() -> list[str]:
    """Channel options for selection lists.

    The list is the union of the channels found in ``channel_allocations``
    (``standard_channel``, falling back to ``raw_channel``) and the
    ``display_name`` of parties whose ``party_type`` is ``channel`` or
    ``platform``, de-duplicated and sorted by the database.
    """
    rows = fetch_all('''
        SELECT DISTINCT coalesce(standard_channel, raw_channel) AS ch
        FROM channel_allocations
        WHERE coalesce(standard_channel, raw_channel) IS NOT NULL
        UNION
        SELECT display_name FROM parties WHERE party_type IN ('channel', 'platform')
        ORDER BY 1
    ''')
    # Only the first UNION branch excludes NULLs; drop a NULL display_name
    # here so the declared list[str] response never contains None.
    return [r['ch'] for r in rows if r['ch'] is not None]


# Location of the built dashboard. An unset OR empty LEDGER_DASHBOARD_DIST
# falls back to <parents[2] of this file>/frontend/dist; an empty value must
# not be used as-is because Path('') is the current working directory.
DASHBOARD_DIST = Path(
    os.environ.get('LEDGER_DASHBOARD_DIST')
    or Path(__file__).resolve().parents[2] / 'frontend' / 'dist'
)
# Mount only a real directory: StaticFiles rejects a path that is not one,
# which would otherwise abort the import of this module.
if DASHBOARD_DIST.is_dir():
    app.mount('/dashboard', StaticFiles(directory=str(DASHBOARD_DIST), html=True), name='dashboard')
