"""受控通用 CRUD 层：白名单实体 + 白名单列 + 全程审计。

原则：
- 不开放裸 SQL；只有注册表里声明的实体/列可读写。
- update 审计记录 old→new 逐列 diff；delete 审计记录整行原值。
- 沿用 write_service 的 _audit（api_write_requests + agent_write_audit 双表）。
"""
from __future__ import annotations

from typing import Any

from psycopg import errors as pg_errors
from psycopg import sql

from .db import connect
from .write_service import _audit, _now_code

# ---------------------------------------------------------------------------
# 实体注册表
# col 类型: text / textarea / number / date / bool / select / ref
# ref 列需带 ref=实体名；列表查询自动 LEFT JOIN 出 <col>_label
# ---------------------------------------------------------------------------

def C(name: str, zh: str, type: str = 'text', **kw: Any) -> dict[str, Any]:
    return {'name': name, 'zh': zh, 'type': type, **kw}


# 结算方式统一字典：采购单/对账单共用同一组下拉值。
# 计算规则集中在 settle_line_amount()：当前所有方式 = 单价×数量；
# 后续补充税率/提点等方式时，在这里扩展，不动表结构。
SETTLEMENT_METHODS = [
    {'value': '代销', 'zh': '代销'},
    {'value': '经销', 'zh': '经销'},
]


def settle_line_amount(settlement_method: str | None, quantity, unit_price):
    """按结算方式计算行金额。当前统一：单价×数量；税率/提点等后续在此扩展。"""
    if quantity is None or unit_price is None:
        return None
    from decimal import Decimal as _D
    return _D(str(quantity)) * _D(str(unit_price))


STATUS_COMMON = [
    {'value': 'active', 'zh': '启用'}, {'value': 'inactive', 'zh': '停用'}, {'value': 'draft', 'zh': '草稿'},
]

ENTITIES: dict[str, dict[str, Any]] = {
    'products': {
        'table': 'products', 'zh': '产品', 'code_col': 'sku_code', 'code_prefix': 'SKU',
        'label_col': 'standard_name',
        'search': ['standard_name', 'raw_name_default', 'sku_code', 'spec', 'barcode_69', 'guobo_code'],
        'columns': [
            C('sku_code', 'SKU编码', auto=True), C('standard_name', '标准商品名称', required=True),
            C('raw_name_default', '原始名称'), C('category', '品类'), C('spec', '规格/款式'),
            C('default_unit', '单位'), C('barcode_69', '69码'), C('guobo_code', '国博编码'),
            C('brand', '品牌'), C('series', '系列'), C('color_style', '颜色/款式'),
            C('package_spec', '包装规格'),
            C('status', '状态', 'select', options=STATUS_COMMON),
            C('notes', '备注', 'textarea'),
        ],
    },
    'parties': {
        'table': 'parties', 'zh': '组织/对象', 'code_col': 'party_code', 'code_prefix': 'PT',
        'label_col': 'display_name',
        'search': ['display_name', 'legal_name', 'short_name', 'party_code'],
        'columns': [
            C('party_code', '对象编码', auto=True), C('display_name', '名称', required=True),
            C('party_type', '对象类型', 'select', required=True, options=[
                {'value': 'own_company', 'zh': '我方公司'}, {'value': 'supplier', 'zh': '供应商'},
                {'value': 'customer', 'zh': '客户'}, {'value': 'channel', 'zh': '渠道'},
                {'value': 'warehouse', 'zh': '仓库'}, {'value': 'person', 'zh': '个人'},
                {'value': 'platform', 'zh': '平台'},
            ]),
            C('legal_name', '法定全称'), C('short_name', '简称'),
            C('contact_masked', '联系方式(脱敏)'), C('tax_no', '税号'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'projects': {
        'table': 'projects', 'zh': '项目/IP', 'code_col': 'project_code', 'code_prefix': 'PJ',
        'label_col': 'name',
        'search': ['name', 'ip_name', 'project_code'],
        'columns': [
            C('project_code', '项目编码', auto=True), C('name', '项目名称', required=True),
            C('ip_name', 'IP/业务线'), C('owner_party_id', '管理主体', 'ref', ref='parties'),
            C('start_date', '开始日期', 'date'),
            C('status', '状态', 'select', options=[
                {'value': 'active', 'zh': '进行中'}, {'value': 'archived', 'zh': '已归档'}, {'value': 'paused', 'zh': '暂停'},
            ]),
            C('notes', '备注', 'textarea'),
        ],
    },
    'delivery_addresses': {
        'table': 'delivery_addresses', 'zh': '收货地址簿', 'code_col': 'address_code', 'code_prefix': 'AD',
        'label_col': 'label',
        'search': ['label', 'receiver_name', 'delivery_address'],
        'columns': [
            C('address_code', '地址编码', auto=True),
            C('label', '名称/标签', required=True),
            C('receiver_name', '收货人'),
            C('receiver_phone_masked', '联系电话(脱敏)'),
            C('delivery_address', '收货地址', 'textarea', required=True),
            C('logistics_company', '常用物流'),
            C('party_id', '关联对象', 'ref', ref='parties'),
            C('is_default', '默认地址', 'bool'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'purchase_orders': {
        'table': 'purchase_orders', 'zh': '采购/需求单', 'code_col': 'order_code', 'code_prefix': 'PO',
        'label_col': "coalesce(order_no, order_code)",
        'label_is_expr': True,
        'search': ['order_code', 'order_no', 'external_contract_no', 'settlement_method'],
        'columns': [
            C('order_code', '内部订单编码', auto=True),
            C('order_side', '订单方向', 'select', required=True, options=[
                {'value': 'downstream_supplier_purchase', 'zh': '下游供应商采购'},
                {'value': 'upstream_customer_order', 'zh': '上游客户订单'},
                {'value': 'internal_transfer', 'zh': '内部调拨'},
                {'value': 'supplemental_demand', 'zh': '补充需求'},
            ]),
            C('order_type', '订单类型', 'select', options=[
                {'value': 'formal_order', 'zh': '正式订单'}, {'value': 'contract_order', 'zh': '合同订单'},
                {'value': 'no_document_supplement', 'zh': '无单补充'}, {'value': 'replenishment', 'zh': '补货'},
            ]),
            C('project_id', '项目', 'ref', ref='projects'),
            C('buyer_party_id', '买方', 'ref', ref='parties'),
            C('seller_party_id', '卖方/供货方', 'ref', ref='parties'),
            C('order_no', '外部订单号'), C('order_date', '订单日期', 'date'),
            C('external_contract_no', '外部合同号'),
            C('settlement_method', '结算方式', 'select', options=SETTLEMENT_METHODS),
            C('expected_arrival_start', '预计到货(起)', 'date'),
            C('expected_arrival_end', '预计到货(止)', 'date'),
            C('expected_arrival_text', '到货时间(文本备注)'),
            C('delivery_address', '收货地址'), C('receiver_name', '收货人'),
            C('logistics_company', '物流公司'), C('tracking_no', '物流单号'),
            C('payment_status', '付款状态'), C('fulfillment_status', '履约状态'),
            C('status', '状态', 'select', options=[
                {'value': 'draft', 'zh': '草稿'}, {'value': 'confirmed', 'zh': '已确认'}, {'value': 'cancelled', 'zh': '已取消'},
            ]),
            C('notes', '备注', 'textarea'),
        ],
    },
    'order_lines': {
        'table': 'order_lines', 'zh': '订单明细', 'label_col': 'product_name_raw',
        'search': ['product_name_raw', 'product_name_standard_draft', 'material'],
        'columns': [
            C('order_id', '所属订单', 'ref', ref='purchase_orders', required=True),
            C('product_id', '标准产品', 'ref', ref='products'),
            C('product_name_raw', '原始商品名', required=True),
            C('product_name_standard_draft', '标准名草案'),
            C('quantity', '数量', 'number', required=True), C('unit', '单位'),
            C('unit_price', '单价', 'number'), C('amount', '金额', 'number'),
            C('material', '材质/工艺'), C('expected_arrival_text', '到货时间(行级)'),
            C('verification_status', '核对状态', 'select', options=[
                {'value': 'pending', 'zh': '待确认'}, {'value': 'confirmed', 'zh': '已确认'}, {'value': 'conflict', 'zh': '有冲突'},
            ]),
            C('settled_quantity', '已结算数量', 'number', auto=True),
            C('settlement_status', '结算消耗状态', 'select', auto=True, options=[
                {'value': 'unsettled', 'zh': '未结清'}, {'value': 'exhausted', 'zh': '已结清'},
            ]),
            C('evidence_note', '证据说明'), C('notes', '备注', 'textarea'),
        ],
    },
    'channel_allocations': {
        'table': 'channel_allocations', 'zh': '渠道分配', 'mode': 'alloc_ops', 'label_col': "coalesce(standard_channel, raw_channel)",
        'label_is_expr': True,
        'search': ['raw_channel', 'standard_channel', 'adjustment_reason'],
        'columns': [
            C('order_line_id', '订单明细', 'ref', ref='order_lines', required=True),
            C('raw_channel', '原始渠道名'), C('standard_channel', '标准渠道名'),
            C('original_quantity', '原单数量', 'number'),
            C('adjustment_quantity', '调整数量', 'number'),
            C('adjusted_quantity', '调整后数量', 'number'),
            C('planned_quantity', '计划数量', 'number'),
            C('confirmed_quantity', '确认数量', 'number'),
            C('shipped_quantity', '发货数量', 'number'),
            C('signed_quantity', '签收数量', 'number'),
            C('allocation_type', '分配类型'),
            C('adjustment_reason', '调整原因'), C('evidence_note', '证据说明'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'stock_items': {
        'table': 'stock_items', 'zh': '库存SKU', 'code_col': 'stock_item_code', 'code_prefix': 'ST',
        'label_col': 'display_name',
        'search': ['display_name', 'stock_item_code', 'spec'],
        'columns': [
            C('stock_item_code', '样品/库存编码', auto=True),
            C('inventory_account_id', '库存账户', 'ref', ref='inventory_accounts', required=True),
            C('product_id', '标准产品', 'ref', ref='products'),
            C('display_name', '库存显示名', required=True),
            C('spec', '规格/款式'), C('unit', '单位'),
            C('unit_cost_tax_included', '含税单位成本', 'number'),
            C('batch_no', '批次号'), C('storage_location', '存放位置'),
            C('safety_stock_qty', '安全库存', 'number'),
            C('source_policy', '来源口径'),
            C('status', '状态', 'select', options=STATUS_COMMON),
            C('notes', '备注', 'textarea'),
        ],
    },
    'stock_movements': {
        'table': 'stock_movements', 'zh': '库存流水', 'mode': 'stock_ops', 'code_col': 'movement_code', 'code_prefix': 'MV',
        'label_col': 'movement_code',
        'search': ['movement_code', 'tracking_no', 'receiver_name', 'purpose'],
        'columns': [
            C('movement_code', '流水编码', auto=True),
            C('stock_item_id', '库存SKU', 'ref', ref='stock_items', required=True, lock_on_edit=True),
            C('movement_date', '日期', 'date', required=True),
            C('direction', '方向', 'select', required=True, lock_on_edit=True, options=[
                {'value': 'in', 'zh': '入库'}, {'value': 'out', 'zh': '出库'},
            ]),
            C('movement_type', '流水类型', 'select', required=True, lock_on_edit=True, options=[
                {'value': 'purchase_in', 'zh': '采购入库'}, {'value': 'self_stock_in', 'zh': '自留入库'},
                {'value': 'shipment_out', 'zh': '发货出库'}, {'value': 'claim_out', 'zh': '申领出库'},
                {'value': 'return_in', 'zh': '归还入库'}, {'value': 'gift_out', 'zh': '礼赠出库'},
                {'value': 'transfer_out', 'zh': '调拨出'}, {'value': 'transfer_in', 'zh': '调拨入'},
                {'value': 'adjustment', 'zh': '调整'},
            ]),
            C('quantity', '数量', 'number', required=True, lock_on_edit=True), C('unit', '单位'),
            C('handler_party_id', '经办人', 'ref', ref='parties'),
            C('counterparty_id', '去处/对象', 'ref', ref='parties'),
            C('purpose', '用途/场景'), C('returnable', '是否可退回', 'bool'),
            C('unit_cost_tax_included', '流水成本单价', 'number'),
            C('logistics_company', '物流公司'), C('tracking_no', '物流单号'),
            C('receiver_name', '收货人'), C('signoff_status', '签收状态'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'dispatch_notes': {
        'table': 'dispatch_notes', 'zh': '发货单', 'code_col': 'dispatch_code', 'code_prefix': 'DN',
        'label_col': 'dispatch_code',
        'search': ['dispatch_code', 'tracking_no', 'receiver_name', 'purpose'],
        'columns': [
            C('dispatch_code', '发货单编码', auto=True),
            C('purchase_order_id', '对应采购单', 'ref', ref='purchase_orders', required=True),
            C('project_id', '项目', 'ref', ref='projects'),
            C('shipper_party_id', '供货商/发货方', 'ref', ref='parties'),
            C('receiver_party_id', '收货方', 'ref', ref='parties'),
            C('ship_date', '发货日期', 'date'),
            C('settlement_method', '结算方式', 'select', options=SETTLEMENT_METHODS),
            C('logistics_company', '物流公司'), C('tracking_no', '物流单号'),
            C('delivery_address', '收货地址'), C('receiver_name', '收货人'),
            C('contact_name', '联系人姓名'), C('contact_phone_masked', '联系人电话'),
            C('purchaser_name', '对方采购负责人'),
            C('product_manager_name', '产品经理(预留)'),
            C('header_title', '单据抬头'),
            C('purpose', '用途/补货说明'),
            C('status', '状态', 'select', options=[
                {'value': 'draft', 'zh': '草稿'}, {'value': 'shipped', 'zh': '已发货'},
                {'value': 'cancelled', 'zh': '已取消'},
            ]),
            C('notes', '备注', 'textarea'),
        ],
    },
    'dispatch_note_lines': {
        'table': 'dispatch_note_lines', 'zh': '发货明细', 'label_col': 'product_name',
        'search': ['product_name'],
        'columns': [
            C('dispatch_note_id', '所属发货单', 'ref', ref='dispatch_notes', required=True),
            C('product_id', '标准产品', 'ref', ref='products'),
            C('product_name', '产品名称', required=True),
            C('quantity', '数量', 'number', required=True), C('unit', '单位'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'shipment_signoffs': {
        'table': 'shipment_signoffs', 'zh': '签收单', 'code_col': 'signoff_code', 'code_prefix': 'SO',
        'label_col': 'signoff_code',
        'search': ['signoff_code', 'logistics_company', 'tracking_no', 'receiver_name', 'signer_name'],
        'columns': [
            C('signoff_code', '签收单编码', auto=True),
            C('purchase_order_id', '对应采购单', 'ref', ref='purchase_orders', required=True),
            C('inventory_account_id', '入库账户(所在位置)', 'ref', ref='inventory_accounts'),
            C('project_id', '项目', 'ref', ref='projects'),
            C('shipper_party_id', '发货方', 'ref', ref='parties'),
            C('receiver_party_id', '收货方', 'ref', ref='parties'),
            C('dispatch_note_id', '关联发货单', 'ref', ref='dispatch_notes'),
            C('ship_date', '发货日期', 'date'),
            C('purpose', '用途'),
            C('status', '状态', 'select', options=[
                {'value': 'draft', 'zh': '草稿'}, {'value': 'sent', 'zh': '已发出'},
                {'value': 'signed', 'zh': '已签收'}, {'value': 'partially_signed', 'zh': '部分签收'},
                {'value': 'rejected', 'zh': '拒收'},
            ]),
            C('logistics_company', '物流公司'), C('tracking_no', '物流单号'),
            C('delivery_address', '收货地址'), C('receiver_name', '收货人'),
            C('signer_name', '签收人'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'shipment_signoff_lines': {
        'table': 'shipment_signoff_lines', 'zh': '签收明细', 'label_col': 'product_name',
        'search': ['product_name'],
        'columns': [
            C('signoff_id', '所属签收单', 'ref', ref='shipment_signoffs', required=True),
            C('product_id', '标准产品', 'ref', ref='products'),
            C('product_name', '产品名称', required=True),
            C('quantity', '数量', 'number', required=True), C('unit', '单位'),
            C('shipped_quantity', '发货数量', 'number'),
            C('signed_quantity', '签收数量', 'number'),
            C('rejected_quantity', '拒收数量', 'number'),
            C('rejection_reason', '拒收原因'),
            C('sign_status', '签收状态', 'select', options=[
                {'value': 'pending', 'zh': '待签收'}, {'value': 'signed', 'zh': '已签收'}, {'value': 'rejected', 'zh': '拒收'},
            ]),
            C('notes', '备注', 'textarea'),
        ],
    },
    'upstream_statements': {
        'table': 'upstream_statements', 'zh': '上游对账单', 'code_col': 'statement_code', 'code_prefix': 'US',
        'label_col': 'statement_code',
        'search': ['statement_code', 'notes'],
        'columns': [
            C('statement_code', '对账单编码', auto=True),
            C('customer_party_id', '上游公司', 'ref', ref='parties', required=True),
            C('project_id', '项目/客户', 'ref', ref='projects'),
            C('statement_date', '对账日期', 'date'),
            C('settlement_method', '结算方式', 'select', options=SETTLEMENT_METHODS),
            C('settle_date', '上游结算日期', 'date'),
            C('total_quantity', '总数量', 'number'),
            C('total_amount', '总金额', 'number'),
            C('status', '状态', 'select', options=[
                {'value': 'pending', 'zh': '待上游结算'}, {'value': 'settled', 'zh': '上游已结算'},
                {'value': 'frozen', 'zh': '已锁入下游结算'}, {'value': 'cleared', 'zh': '已结清'},
            ]),
            C('settlement_batch_code', '结算批次', auto=True),
            C('notes', '备注', 'textarea'),
        ],
    },
    'upstream_statement_lines': {
        'table': 'upstream_statement_lines', 'zh': '上游对账明细', 'label_col': 'product_name',
        'search': ['product_name', 'channel'],
        'columns': [
            C('statement_id', '所属对账单', 'ref', ref='upstream_statements', required=True),
            C('product_id', '标准产品', 'ref', ref='products'),
            C('product_name', '产品名称', required=True),
            C('channel', '销售渠道'),
            C('quantity', '数量', 'number', required=True), C('unit', '单位'),
            C('unit_price', '结算单价', 'number'), C('amount', '结算金额', 'number'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'downstream_statements': {
        'table': 'downstream_statements', 'zh': '下游对账单', 'code_col': 'statement_code', 'code_prefix': 'DS',
        'label_col': 'statement_code',
        'search': ['statement_code', 'notes'],
        'columns': [
            C('statement_code', '对账单编码', auto=True),
            C('settlement_batch_code', '结算批次', auto=True),
            C('supplier_party_id', '下游供应商', 'ref', ref='parties'),
            C('project_id', '项目/客户', 'ref', ref='projects'),
            C('statement_date', '发起日期', 'date'),
            C('settlement_method', '结算方式', 'select', options=SETTLEMENT_METHODS),
            C('settle_date', '结算完成日期', 'date'),
            C('total_quantity', '总数量', 'number'),
            C('total_amount', '总金额', 'number'),
            C('status', '状态', 'select', options=[
                {'value': 'draft', 'zh': '待结算'}, {'value': 'settled', 'zh': '已结算'},
            ]),
            C('notes', '备注', 'textarea'),
        ],
    },
    'downstream_statement_lines': {
        'table': 'downstream_statement_lines', 'zh': '下游结算明细', 'label_col': 'product_name',
        'search': ['product_name', 'source_order_no'],
        'columns': [
            C('statement_id', '所属结算单', 'ref', ref='downstream_statements', required=True),
            C('product_id', '标准产品', 'ref', ref='products'),
            C('product_name', '产品名称', required=True),
            C('quantity', '数量', 'number', required=True), C('unit', '单位'),
            C('unit_price', '合同单价', 'number'), C('amount', '结算金额', 'number'),
            C('source_order_no', '来源采购单'),
            C('notes', '备注', 'textarea'),
        ],
    },
    'validation_issues': {
        'table': 'validation_issues', 'zh': '校验问题', 'label_col': 'message',
        'has_updated_at': False,
        'search': ['message', 'issue_type', 'related_table', 'field_name'],
        'columns': [
            C('issue_type', '问题类型', required=True),
            C('severity', '严重度', 'select', required=True, options=[
                {'value': 'info', 'zh': '提示'}, {'value': 'warning', 'zh': '警告'}, {'value': 'error', 'zh': '错误'},
            ]),
            C('related_table', '关联表'), C('field_name', '字段名'),
            C('message', '问题说明', 'textarea', required=True),
            C('status', '状态', 'select', options=[
                {'value': 'open', 'zh': '待处理'}, {'value': 'resolved', 'zh': '已解决'}, {'value': 'ignored', 'zh': '已忽略'},
            ]),
        ],
    },
    'inventory_accounts': {
        'table': 'inventory_accounts', 'zh': '库存账户', 'code_col': 'account_code', 'code_prefix': 'IA',
        'label_col': 'account_code',
        'search': ['account_code', 'account_type'],
        'columns': [
            C('account_code', '账户编码', auto=True),
            C('owner_party_id', '公司主体', 'ref', ref='parties', required=True),
            C('project_id', '项目', 'ref', ref='projects'),
            C('account_type', '账户类型', 'select', options=[
                {'value': 'sample_stock', 'zh': '样品库'}, {'value': 'sellable_stock', 'zh': '可售库存'},
                {'value': 'gift_stock', 'zh': '礼赠库'}, {'value': 'returnable_claim', 'zh': '可退申领'},
            ]),
            C('warehouse_party_id', '仓库/保管方', 'ref', ref='parties'),
            C('location_text', '位置说明'),
            C('status', '状态', 'select', options=[
                {'value': 'active', 'zh': '启用'}, {'value': 'closed', 'zh': '已关闭'},
            ]),
            C('notes', '备注', 'textarea'),
        ],
    },
}


# 行号由系统自动维护（同父单 max+1），不让用户填
AUTO_LINE_NO: dict[str, str] = {'order_lines': 'order_id', 'shipment_signoff_lines': 'signoff_id', 'dispatch_note_lines': 'dispatch_note_id', 'upstream_statement_lines': 'statement_id', 'downstream_statement_lines': 'statement_id'}

# 明细变动后自动同步父单总计（数量合计、金额合计统一落库，不靠手填）
SYNC_PARENT_TOTALS = {'upstream_statement_lines': ('upstream_statements', 'statement_id')}


def _sync_statement_totals(cur, parent_table: str, parent_id) -> None:
    cur.execute(sql.SQL('''
        UPDATE {} SET
            total_quantity = (SELECT coalesce(sum(quantity),0) FROM upstream_statement_lines WHERE statement_id = %s),
            total_amount = (SELECT coalesce(sum(amount),0) FROM upstream_statement_lines WHERE statement_id = %s),
            updated_at = now()
        WHERE id = %s
    ''').format(sql.Identifier(parent_table)), (parent_id, parent_id, parent_id))


def _cfg(entity: str) -> dict[str, Any]:
    cfg = ENTITIES.get(entity)
    if not cfg:
        raise KeyError(f'unknown entity: {entity}')
    return cfg


def _editable_cols(cfg: dict[str, Any]) -> dict[str, dict[str, Any]]:
    return {c['name']: c for c in cfg['columns']}


def meta() -> dict[str, Any]:
    out = {}
    for key, cfg in ENTITIES.items():
        out[key] = {
            'entity': key, 'zh': cfg['zh'],
            'mode': cfg.get('mode', 'generic'),
            'columns': cfg['columns'],
            'search_columns': cfg['search'],
        }
    return {'entities': out}


def _label_sql(cfg: dict[str, Any], alias: str) -> sql.Composable:
    expr = cfg.get('label_col', 'id')
    if cfg.get('label_is_expr'):
        # 受控表达式：仅注册表内写死的 coalesce(...)，无外部输入
        return sql.SQL(expr.replace('order_no', f'{alias}.order_no').replace('order_code', f'{alias}.order_code')
                       .replace('standard_channel', f'{alias}.standard_channel').replace('raw_channel', f'{alias}.raw_channel'))
    return sql.SQL('{}.{}').format(sql.Identifier(alias), sql.Identifier(expr))


def list_rows(entity: str, q: str | None = None, limit: int = 50, offset: int = 0,
              ref_col: str | None = None, ref_id: str | None = None) -> dict[str, Any]:
    cfg = _cfg(entity)
    table = cfg['table']
    cols = cfg['columns']
    select_parts: list[sql.Composable] = [sql.SQL('t.*')]
    joins: list[sql.Composable] = []
    for i, c in enumerate(col for col in cols if col['type'] == 'ref'):
        ref_cfg = _cfg(c['ref'])
        alias = f'r{i}'
        select_parts.append(sql.SQL('{} AS {}').format(_label_sql(ref_cfg, alias), sql.Identifier(f"{c['name']}_label")))
        joins.append(sql.SQL(' LEFT JOIN {} {} ON {}.id = t.{}').format(
            sql.Identifier(ref_cfg['table']), sql.SQL(alias), sql.SQL(alias), sql.Identifier(c['name'])))
    query = sql.SQL('SELECT {}, count(*) OVER() AS _total FROM {} t').format(
        sql.SQL(', ').join(select_parts), sql.Identifier(table))
    for j in joins:
        query += j
    params: list[Any] = []
    wheres: list[sql.Composable] = []
    if ref_col and ref_id:
        if ref_col not in {c['name'] for c in cols if c['type'] == 'ref'}:
            raise ValueError('非法的父子过滤字段')
        wheres.append(sql.SQL('t.{} = {}').format(sql.Identifier(ref_col), sql.Placeholder()))
        params.append(ref_id)
    if q:
        conds = [sql.SQL('coalesce(t.{}::text, {}) ILIKE {}').format(sql.Identifier(s), sql.Literal(''), sql.Placeholder())
                 for s in cfg['search']]
        wheres.append(sql.SQL('(') + sql.SQL(' OR ').join(conds) + sql.SQL(')'))
        params.extend([f'%{q}%'] * len(cfg['search']))
    if wheres:
        query += sql.SQL(' WHERE ') + sql.SQL(' AND ').join(wheres)
    # 父子展开时按行号排序更自然
    if ref_col and entity in AUTO_LINE_NO:
        query += sql.SQL(' ORDER BY t.line_no NULLS LAST LIMIT {} OFFSET {}').format(sql.Placeholder(), sql.Placeholder())
    else:
        query += sql.SQL(' ORDER BY t.created_at DESC NULLS LAST LIMIT {} OFFSET {}').format(sql.Placeholder(), sql.Placeholder())
    params.extend([min(max(limit, 1), 200), max(offset, 0)])
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute(query, params)
            rows = [dict(r) for r in cur.fetchall()]
    total = rows[0]['_total'] if rows else 0
    for r in rows:
        r.pop('_total', None)
    return {'rows': rows, 'total': total}


def refs(entity: str, q: str | None = None, limit: int = 500) -> list[dict[str, Any]]:
    cfg = _cfg(entity)
    label = _label_sql(cfg, 't')
    query = sql.SQL('SELECT t.id, {} AS label FROM {} t').format(label, sql.Identifier(cfg['table']))
    params: list[Any] = []
    if q:
        query += sql.SQL(' WHERE {}::text ILIKE {}').format(label, sql.Placeholder())
        params.append(f'%{q}%')
    query += sql.SQL(' ORDER BY label LIMIT {}').format(sql.Placeholder())
    params.append(min(limit, 1000))
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute(query, params)
            return [dict(r) for r in cur.fetchall()]


def _clean_payload(cfg: dict[str, Any], data: dict[str, Any]) -> dict[str, Any]:
    allowed = _editable_cols(cfg)
    clean: dict[str, Any] = {}
    for k, v in data.items():
        if k not in allowed:
            continue
        if isinstance(v, str):
            v = v.strip()
            if v == '':
                v = None
        clean[k] = v
    return clean


def create_row(entity: str, data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    cfg = _cfg(entity)
    if cfg.get('mode') == 'alloc_ops':
        raise ValueError('渠道分配请使用「分配 / 调拨」操作，不直接新增原始行')
    if cfg.get('mode') == 'stock_ops':
        raise ValueError('库存流水请使用「登记入库 / 出库」操作，不直接新增原始行')
    clean = _clean_payload(cfg, data)
    for c in cfg['columns']:
        if c.get('required') and clean.get(c['name']) is None:
            raise ValueError(f"缺少必填字段: {c['zh']}({c['name']})")
    # 编码列一律自动生成，忽略用户传入
    if cfg.get('code_col'):
        clean[cfg['code_col']] = _now_code(cfg['code_prefix'])
    insert_data = {k: v for k, v in clean.items() if v is not None}
    if not insert_data:
        raise ValueError('没有可写入的字段')
    with connect() as conn:
        with conn.cursor() as cur:
            # 对账明细：金额未填时按结算方式自动计算（当前=单价×数量）
            if entity == 'upstream_statement_lines' and insert_data.get('amount') is None:
                auto_amount = settle_line_amount(None, insert_data.get('quantity'), insert_data.get('unit_price'))
                if auto_amount is not None:
                    insert_data['amount'] = auto_amount
            parent_col = AUTO_LINE_NO.get(entity)
            if parent_col and insert_data.get(parent_col) and 'line_no' not in insert_data:
                cur.execute(
                    sql.SQL('SELECT coalesce(max(line_no), 0) + 1 AS n FROM {} WHERE {} = %s').format(
                        sql.Identifier(cfg['table']), sql.Identifier(parent_col)),
                    (insert_data[parent_col],))
                insert_data['line_no'] = cur.fetchone()['n']
            cols = list(insert_data)
            query = sql.SQL('INSERT INTO {} ({}) VALUES ({}) RETURNING id').format(
                sql.Identifier(cfg['table']),
                sql.SQL(', ').join(sql.Identifier(c) for c in cols),
                sql.SQL(', ').join(sql.Placeholder() for _ in cols))
            try:
                cur.execute(query, list(insert_data.values()))
            except pg_errors.NotNullViolation as exc:
                conn.rollback()
                col = getattr(exc.diag, 'column_name', '') or ''
                zh = next((c['zh'] for c in cfg['columns'] if c['name'] == col), col)
                raise ValueError(f'缺少必填字段：{zh}（数据库约束）') from exc
            except pg_errors.CheckViolation as exc:
                conn.rollback()
                raise ValueError('字段取值不符合数据库约束，请检查枚举/数值范围') from exc
            target_id = str(cur.fetchone()['id'])
            if entity in SYNC_PARENT_TOTALS:
                pt, pc = SYNC_PARENT_TOTALS[entity]
                _sync_statement_totals(cur, pt, insert_data.get(pc))
            _audit(cur, actor_profile=actor, tool_name=f'admin_create_{entity}', operation='create',
                   status='applied', target_table=cfg['table'], target_id=target_id, payload=insert_data)
            conn.commit()
    return {'id': target_id}


def update_row(entity: str, row_id: str, data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    cfg = _cfg(entity)
    if cfg.get('mode') == 'alloc_ops':
        raise ValueError('渠道分配数量请使用「调拨」操作修改，避免合计失衡')
    clean = _clean_payload(cfg, data)
    # 编码列与登记后锁定列不可修改
    allowed = _editable_cols(cfg)
    clean = {k: v for k, v in clean.items()
             if not allowed[k].get('auto') and not allowed[k].get('lock_on_edit')}
    if not clean:
        raise ValueError('没有可更新的字段')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute(sql.SQL('SELECT * FROM {} WHERE id = %s').format(sql.Identifier(cfg['table'])), (row_id,))
            old = cur.fetchone()
            if not old:
                raise LookupError('记录不存在')
            old = dict(old)
            changes = {}
            for k, v in clean.items():
                old_v = old.get(k)
                if str(old_v) != str(v) and not (old_v is None and v is None):
                    changes[k] = {'old': old_v, 'new': v}
            if not changes:
                return {'id': row_id, 'unchanged': True}
            # 对账明细：量/价变动且未显式给金额时，按结算方式重算
            if entity == 'upstream_statement_lines' and 'amount' not in clean and (
                    'quantity' in changes or 'unit_price' in changes):
                merged_qty = clean.get('quantity', old.get('quantity'))
                merged_price = clean.get('unit_price', old.get('unit_price'))
                auto_amount = settle_line_amount(None, merged_qty, merged_price)
                if auto_amount is not None:
                    clean['amount'] = auto_amount
                    changes['amount'] = {'old': old.get('amount'), 'new': auto_amount}
            set_cols = list(changes)
            set_sql = sql.SQL(', ').join(sql.SQL('{} = {}').format(sql.Identifier(c), sql.Placeholder()) for c in set_cols)
            if cfg.get('has_updated_at', True):
                set_sql += sql.SQL(', updated_at = now()')
            query = sql.SQL('UPDATE {} SET {} WHERE id = %s').format(sql.Identifier(cfg['table']), set_sql)
            cur.execute(query, [clean[c] for c in set_cols] + [row_id])
            if entity in SYNC_PARENT_TOTALS:
                pt, pc = SYNC_PARENT_TOTALS[entity]
                _sync_statement_totals(cur, pt, old.get(pc))
            _audit(cur, actor_profile=actor, tool_name=f'admin_update_{entity}', operation='update',
                   status='applied', target_table=cfg['table'], target_id=row_id,
                   payload={'changes': changes})
            conn.commit()
    result: dict[str, Any] = {'id': row_id, 'changed_fields': list(changes)}
    # 上游对账单置「上游已结算」→ 自动核销库存（幂等；失败不回滚状态，转为提示）
    if entity == 'upstream_statements' and changes.get('status', {}).get('new') == 'settled':
        try:
            result['writeoff'] = statement_writeoff({'statement_id': row_id}, actor=actor)
        except (ValueError, LookupError) as exc:
            result['writeoff_error'] = str(exc)
    return result


# 注册表之外的已知外键引用（这些表/列不在管理界面注册表里，但删除时必须检查）
EXTRA_REFS: list[tuple[str, str, str, str]] = [
    # (引用表, 引用列, 被引用实体, 中文名)
    ('shipment_signoff_lines', 'stock_movement_id', 'stock_movements', '签收明细(入库关联)'),
    ('stock_movements', 'related_movement_id', 'stock_movements', '库存流水(冲正关联)'),
    ('product_aliases', 'product_id', 'products', '产品别名'),
    ('reconciliation_lines', 'product_id', 'products', '对账明细'),
    ('channel_allocations', 'channel_party_id', 'parties', '渠道分配(渠道对象)'),
    ('dispatch_note_lines', 'source_order_line_id', 'order_lines', '发货明细(来源行)'),
    ('upstream_statement_lines', 'stock_movement_id', 'stock_movements', '上游对账明细(核销关联)'),
    ('upstream_statements', 'downstream_statement_id', 'downstream_statements', '上游对账单(旧关联)'),
    ('downstream_statement_lines', 'source_order_line_id', 'order_lines', '下游结算明细(来源行)'),
]


def _find_references(cur, entity: str, row_id: str) -> list[str]:
    """删除前反查：哪些数据引用了这条记录。数据库 FK 是级联删除，必须在应用层挡住。"""
    found: list[str] = []
    for src_key, src_cfg in ENTITIES.items():
        if src_key == entity:
            continue
        for col in src_cfg['columns']:
            if col.get('type') == 'ref' and col.get('ref') == entity:
                cur.execute(
                    sql.SQL('SELECT count(*) AS n FROM {} WHERE {} = %s').format(
                        sql.Identifier(src_cfg['table']), sql.Identifier(col['name'])),
                    (row_id,))
                n = cur.fetchone()['n']
                if n:
                    found.append(f"{src_cfg['zh']} {n} 条")
    for table, col, target, zh in EXTRA_REFS:
        if target != entity:
            continue
        cur.execute(
            sql.SQL('SELECT count(*) AS n FROM {} WHERE {} = %s').format(
                sql.Identifier(table), sql.Identifier(col)),
            (row_id,))
        n = cur.fetchone()['n']
        if n:
            found.append(f'{zh} {n} 条')
    return found


def delete_row(entity: str, row_id: str, *, actor: str) -> dict[str, Any]:
    cfg = _cfg(entity)
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute(sql.SQL('SELECT * FROM {} WHERE id = %s').format(sql.Identifier(cfg['table'])), (row_id,))
            old = cur.fetchone()
            if not old:
                raise LookupError('记录不存在')
            refs = _find_references(cur, entity, row_id)
            if refs:
                raise ValueError(
                    f"无法删除：该{cfg['zh']}正被以下数据引用——" + '、'.join(refs)
                    + '。请先删除或改挂这些关联记录（数据库为级联删除，直接删会连带删掉它们）')
            try:
                cur.execute(sql.SQL('DELETE FROM {} WHERE id = %s').format(sql.Identifier(cfg['table'])), (row_id,))
            except pg_errors.ForeignKeyViolation as exc:
                conn.rollback()
                raise ValueError('该记录被其他数据引用，请先删除关联记录') from exc
            if entity in SYNC_PARENT_TOTALS:
                pt, pc = SYNC_PARENT_TOTALS[entity]
                _sync_statement_totals(cur, pt, dict(old).get(pc))
            _audit(cur, actor_profile=actor, tool_name=f'admin_delete_{entity}', operation='delete_request',
                   status='applied', target_table=cfg['table'], target_id=row_id,
                   payload={'deleted_row': dict(old)})
            conn.commit()
    return {'id': row_id, 'deleted': True}


# ---------------------------------------------------------------------------
# 业务操作层：渠道调拨/分配、库存登记/冲正
# 用算法承接修改意图，不让用户直接改原始行，减少错误操作风险。
# ---------------------------------------------------------------------------

from datetime import date as _date  # noqa: E402
from decimal import Decimal, InvalidOperation  # noqa: E402


def _dec(value: Any, field: str) -> Decimal:
    try:
        d = Decimal(str(value))
    except (InvalidOperation, TypeError, ValueError):
        raise ValueError(f'{field} 不是有效数字')
    if d <= 0:
        raise ValueError(f'{field} 必须大于 0')
    return d


def allocation_overview(q: str | None = None, limit: int = 50) -> dict[str, Any]:
    """按订单明细分组的渠道分配总览：明细数量、各渠道现量、未分配余量。"""
    params: list[Any] = []
    where = ''
    if q:
        where = "WHERE ol.product_name_raw ILIKE %s OR coalesce(po.order_no, po.order_code) ILIKE %s"
        params = [f'%{q}%', f'%{q}%']
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute(f'''
                SELECT ol.id, ol.product_name_raw, ol.quantity, ol.unit,
                       coalesce(po.order_no, po.order_code) AS order_no, po.order_side
                FROM order_lines ol JOIN purchase_orders po ON po.id = ol.order_id
                {where}
                ORDER BY po.order_date DESC NULLS LAST, ol.line_no
                LIMIT %s
            ''', params + [min(limit, 200)])
            lines = [dict(r) for r in cur.fetchall()]
            allocs: dict[str, list[dict[str, Any]]] = {}
            if lines:
                cur.execute(
                    'SELECT * FROM channel_allocations WHERE order_line_id = ANY(%s) ORDER BY created_at',
                    ([l['id'] for l in lines],))
                for a in cur.fetchall():
                    allocs.setdefault(str(a['order_line_id']), []).append(dict(a))
    rows = []
    for line in lines:
        al = allocs.get(str(line['id']), [])
        allocated = sum(Decimal(str(a['adjusted_quantity'] or a['original_quantity'] or 0)) for a in al)
        line_qty = Decimal(str(line['quantity'] or 0))
        rows.append({
            'order_line_id': line['id'],
            'product_name': line['product_name_raw'],
            'order_no': line['order_no'],
            'order_side': line['order_side'],
            'line_quantity': line_qty,
            'unit': line['unit'],
            'allocated_total': allocated,
            'unallocated': line_qty - allocated,
            'allocations': [{
                'id': a['id'],
                'channel': a.get('standard_channel') or a.get('raw_channel') or '未命名渠道',
                'quantity': a.get('adjusted_quantity') if a.get('adjusted_quantity') is not None else a.get('original_quantity'),
            } for a in al],
        })
    # 有分配/有余量的排前面
    rows.sort(key=lambda r: (len(r['allocations']) == 0, r['unallocated'] == 0))
    return {'rows': rows}


def channel_transfer(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """渠道调拨：从渠道 A 调 N 件到渠道 B（同一订单明细内，总量不变）。"""
    from_id = data.get('from_allocation_id')
    to_channel = (data.get('to_channel') or '').strip()
    reason = (data.get('reason') or '').strip()
    qty = _dec(data.get('quantity'), '调拨数量')
    if not from_id:
        raise ValueError('缺少调出渠道')
    if not to_channel:
        raise ValueError('缺少调入渠道')
    if not reason:
        raise ValueError('请填写调拨原因')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM channel_allocations WHERE id = %s FOR UPDATE', (from_id,))
            src = cur.fetchone()
            if not src:
                raise LookupError('调出渠道分配不存在')
            src = dict(src)
            src_channel = src.get('standard_channel') or src.get('raw_channel') or '未命名渠道'
            if src_channel == to_channel:
                raise ValueError('调出与调入渠道相同')
            src_qty = Decimal(str(src['adjusted_quantity'] if src['adjusted_quantity'] is not None else src['original_quantity'] or 0))
            if qty > src_qty:
                raise ValueError(f'「{src_channel}」当前数量 {src_qty}，不足以调出 {qty}')
            line_id = src['order_line_id']
            new_src_qty = src_qty - qty
            new_src_adj = Decimal(str(src['adjustment_quantity'] or 0)) - qty
            cur.execute('''
                UPDATE channel_allocations
                SET adjustment_quantity=%s, confirmed_quantity=%s,
                    adjustment_reason=%s, updated_at=now()
                WHERE id=%s
            ''', (new_src_adj, new_src_qty, reason, from_id))
            cur.execute('''
                SELECT * FROM channel_allocations
                WHERE order_line_id=%s AND coalesce(standard_channel, raw_channel)=%s AND id<>%s
                FOR UPDATE
            ''', (line_id, to_channel, from_id))
            dst = cur.fetchone()
            if dst:
                dst = dict(dst)
                dst_qty = Decimal(str(dst['adjusted_quantity'] if dst['adjusted_quantity'] is not None else dst['original_quantity'] or 0)) + qty
                dst_adj = Decimal(str(dst['adjustment_quantity'] or 0)) + qty
                cur.execute('''
                    UPDATE channel_allocations
                    SET adjustment_quantity=%s, confirmed_quantity=%s,
                        adjustment_reason=%s, updated_at=now()
                    WHERE id=%s
                ''', (dst_adj, dst_qty, reason, dst['id']))
                dst_id = str(dst['id'])
            else:
                cur.execute('''
                    INSERT INTO channel_allocations(
                        order_line_id, raw_channel, standard_channel,
                        original_quantity, adjustment_quantity,
                        planned_quantity, confirmed_quantity, allocation_type, adjustment_reason)
                    VALUES (%s, %s, %s, 0, %s, %s, %s, 'manual_transfer', %s)
                    RETURNING id
                ''', (line_id, to_channel, to_channel, qty, qty, qty, reason))
                dst_id = str(cur.fetchone()['id'])
            _audit(cur, actor_profile=actor, tool_name='ops_channel_transfer', operation='update',
                   status='applied', target_table='channel_allocations', target_id=str(from_id),
                   payload={'order_line_id': str(line_id), 'from': src_channel, 'to': to_channel,
                            'quantity': str(qty), 'reason': reason,
                            'from_after': str(new_src_qty), 'to_allocation_id': dst_id})
            conn.commit()
    return {'ok': True, 'from_remaining': new_src_qty, 'to_allocation_id': dst_id}


def channel_allocate(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """渠道分配：把订单明细的未分配余量分给某个渠道。"""
    line_id = data.get('order_line_id')
    channel = (data.get('channel') or '').strip()
    reason = (data.get('reason') or '').strip()
    qty = _dec(data.get('quantity'), '分配数量')
    if not line_id:
        raise ValueError('缺少订单明细')
    if not channel:
        raise ValueError('缺少渠道名称')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT id, quantity FROM order_lines WHERE id=%s FOR UPDATE', (line_id,))
            line = cur.fetchone()
            if not line:
                raise LookupError('订单明细不存在')
            cur.execute('''
                SELECT coalesce(sum(coalesce(adjusted_quantity, original_quantity, 0)),0) AS allocated
                FROM channel_allocations WHERE order_line_id=%s
            ''', (line_id,))
            allocated = Decimal(str(cur.fetchone()['allocated']))
            remaining = Decimal(str(line['quantity'] or 0)) - allocated
            if qty > remaining:
                raise ValueError(f'未分配余量只有 {remaining}，不足以分配 {qty}')
            cur.execute('''
                SELECT * FROM channel_allocations
                WHERE order_line_id=%s AND coalesce(standard_channel, raw_channel)=%s
                FOR UPDATE
            ''', (line_id, channel))
            dst = cur.fetchone()
            if dst:
                dst = dict(dst)
                dst_qty = Decimal(str(dst['adjusted_quantity'] if dst['adjusted_quantity'] is not None else dst['original_quantity'] or 0)) + qty
                dst_adj = Decimal(str(dst['adjustment_quantity'] or 0)) + qty
                cur.execute('''
                    UPDATE channel_allocations
                    SET adjustment_quantity=%s, confirmed_quantity=%s,
                        adjustment_reason=%s, updated_at=now()
                    WHERE id=%s
                ''', (dst_adj, dst_qty, reason or '补分配', dst['id']))
                alloc_id = str(dst['id'])
            else:
                cur.execute('''
                    INSERT INTO channel_allocations(
                        order_line_id, raw_channel, standard_channel,
                        original_quantity, adjustment_quantity,
                        planned_quantity, confirmed_quantity, allocation_type, adjustment_reason)
                    VALUES (%s, %s, %s, 0, %s, %s, %s, 'manual_allocate', %s)
                    RETURNING id
                ''', (line_id, channel, channel, qty, qty, qty, reason or '补分配'))
                alloc_id = str(cur.fetchone()['id'])
            _audit(cur, actor_profile=actor, tool_name='ops_channel_allocate', operation='update',
                   status='applied', target_table='channel_allocations', target_id=alloc_id,
                   payload={'order_line_id': str(line_id), 'channel': channel,
                            'quantity': str(qty), 'reason': reason})
            conn.commit()
    return {'ok': True, 'allocation_id': alloc_id, 'remaining_after': remaining - qty}


def stock_current_one(stock_item_id: str) -> dict[str, Any]:
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM v_current_stock WHERE stock_item_id=%s', (stock_item_id,))
            row = cur.fetchone()
    return dict(row) if row else {'stock_item_id': stock_item_id, 'current_quantity': 0}


def stock_reverse(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """冲正：对一笔流水生成等量反向流水，原流水保留，可追溯。"""
    movement_id = data.get('movement_id')
    reason = (data.get('reason') or '').strip()
    if not movement_id:
        raise ValueError('缺少流水')
    if not reason:
        raise ValueError('请填写冲正原因')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM stock_movements WHERE id=%s', (movement_id,))
            mv = cur.fetchone()
            if not mv:
                raise LookupError('流水不存在')
            mv = dict(mv)
            cur.execute('SELECT id FROM stock_movements WHERE related_movement_id=%s', (movement_id,))
            if cur.fetchone():
                raise ValueError('该流水已被冲正过，不能重复冲正')
            rev_dir = 'out' if mv['direction'] == 'in' else 'in'
            qty = Decimal(str(mv['quantity']))
            if rev_dir == 'out':
                cur.execute('SELECT current_quantity FROM v_current_stock WHERE stock_item_id=%s', (mv['stock_item_id'],))
                row = cur.fetchone()
                current = Decimal(str(row['current_quantity'])) if row and row['current_quantity'] is not None else Decimal(0)
                if current < qty:
                    raise ValueError(f'当前库存 {current}，不足以冲正出库 {qty}（该入库可能已被消耗）')
            code = _now_code('MV')
            cur.execute('''
                INSERT INTO stock_movements(
                    movement_code, movement_no, stock_item_id, movement_date, direction, movement_type,
                    quantity, unit, unit_cost_tax_included, related_movement_id, notes)
                VALUES (%s, %s, %s, %s, %s, 'adjustment', %s, %s, %s, %s, %s)
                RETURNING id
            ''', (code, code, mv['stock_item_id'], _date.today(), rev_dir,
                  qty, mv['unit'], mv['unit_cost_tax_included'], movement_id,
                  f"冲正 {mv['movement_code']}：{reason}"))
            rev_id = str(cur.fetchone()['id'])
            _audit(cur, actor_profile=actor, tool_name='ops_stock_reverse', operation='create',
                   status='applied', target_table='stock_movements', target_id=rev_id,
                   payload={'reversed_movement': mv['movement_code'], 'direction': rev_dir,
                            'quantity': str(qty), 'reason': reason})
            conn.commit()
    return {'ok': True, 'reverse_movement_id': rev_id}


def signoff_confirm(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """签收确认：下游采购到货签收 → 按明细自动入库到签收单指定的库存账户。

    幂等：明细行已有 stock_movement_id 的跳过；库存SKU不存在则按产品自动创建。
    """
    signoff_id = data.get('signoff_id')
    signer = (data.get('signer_name') or '').strip()
    signed_date = (data.get('signed_date') or '').strip() or None
    if not signoff_id:
        raise ValueError('缺少签收单')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM shipment_signoffs WHERE id=%s FOR UPDATE', (signoff_id,))
            so = cur.fetchone()
            if not so:
                raise LookupError('签收单不存在')
            so = dict(so)
            if not so.get('purchase_order_id'):
                raise ValueError('签收单未关联采购单，请先补填「对应采购单」')
            cur.execute('SELECT order_side, coalesce(order_no, order_code) AS order_no FROM purchase_orders WHERE id=%s',
                        (so['purchase_order_id'],))
            po = cur.fetchone()
            if not po:
                raise LookupError('关联的采购单不存在')
            if po['order_side'] != 'downstream_supplier_purchase':
                raise ValueError('自动入库目前仅支持「下游供应商采购」到货签收；向客户发货请走出库登记')
            account_id = so.get('inventory_account_id')
            if not account_id:
                raise ValueError('签收单未指定「入库账户(所在位置)」，请先补填')
            cur.execute('SELECT * FROM shipment_signoff_lines WHERE signoff_id=%s ORDER BY line_no', (signoff_id,))
            lines = [dict(r) for r in cur.fetchall()]
            if not lines:
                raise ValueError('签收单没有明细行，请先添加签收明细')
            created, skipped, problems = [], [], []
            for line in lines:
                if line.get('stock_movement_id'):
                    skipped.append(line['line_no'])
                    continue
                qty = line.get('signed_quantity')
                if qty is None:
                    qty = line.get('quantity')
                qty = Decimal(str(qty or 0))
                if qty <= 0:
                    skipped.append(line['line_no'])
                    continue
                product_id = line.get('product_id')
                if not product_id:
                    problems.append(f"行{line['line_no']}（{line.get('product_name')}）未绑定标准产品")
                    continue
                # 找/建该账户下的库存SKU
                cur.execute('SELECT id FROM stock_items WHERE inventory_account_id=%s AND product_id=%s LIMIT 1',
                            (account_id, product_id))
                si = cur.fetchone()
                if si:
                    stock_item_id = si['id']
                else:
                    cur.execute('SELECT standard_name, default_unit FROM products WHERE id=%s', (product_id,))
                    prod = cur.fetchone()
                    cur.execute("""
                        INSERT INTO stock_items(stock_item_code, inventory_account_id, product_id, display_name, unit, status)
                        VALUES (%s, %s, %s, %s, %s, 'active') RETURNING id
                    """, (_now_code('ST'), account_id, product_id,
                          (prod['standard_name'] if prod else line.get('product_name')) or '未命名',
                          line.get('unit') or (prod['default_unit'] if prod else None)))
                    stock_item_id = cur.fetchone()['id']
                code = _now_code('MV')
                cur.execute("""
                    INSERT INTO stock_movements(
                        movement_code, movement_no, stock_item_id, movement_date, direction, movement_type,
                        quantity, unit, signoff_status, notes)
                    VALUES (%s, %s, %s, %s, 'in', 'purchase_in', %s, %s, 'signed', %s)
                    RETURNING id
                """, (code, code, stock_item_id, signed_date or _date.today(), qty, line.get('unit'),
                      f"签收入库 {so['signoff_code']}（采购单 {po['order_no']}）"))
                mv_id = cur.fetchone()['id']
                cur.execute('UPDATE shipment_signoff_lines SET stock_movement_id=%s, sign_status=%s, '
                            'signed_quantity=coalesce(signed_quantity, quantity), updated_at=now() WHERE id=%s',
                            (mv_id, 'signed', line['id']))
                created.append({'line_no': line['line_no'], 'movement_id': str(mv_id), 'quantity': str(qty)})
            if problems:
                conn.rollback()
                raise ValueError('；'.join(problems) + '。请在签收明细里绑定标准产品后重试')
            if not created and not skipped:
                conn.rollback()
                raise ValueError('没有可入库的明细行')
            cur.execute("""
                UPDATE shipment_signoffs
                SET status='signed', signed_at=coalesce(signed_at, now()),
                    signer_name=coalesce(nullif(%s,''), signer_name), updated_at=now()
                WHERE id=%s
            """, (signer, signoff_id))
            _audit(cur, actor_profile=actor, tool_name='ops_signoff_confirm', operation='update',
                   status='applied', target_table='shipment_signoffs', target_id=str(signoff_id),
                   payload={'signoff_code': so['signoff_code'], 'order_no': po['order_no'],
                            'inventory_account_id': str(account_id),
                            'movements_created': created, 'lines_skipped': skipped})
            conn.commit()
    return {'ok': True, 'movements_created': len(created), 'lines_skipped': len(skipped), 'detail': created}


# ---------------------------------------------------------------------------
# 发货单：从采购单快捷生成、发货vs签收交叉核对、按真实版式导出 Excel
# ---------------------------------------------------------------------------

def dispatch_from_order(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """从采购单一键生成发货单：单头带出双方/地址/结算方式，明细复制订单明细。"""
    po_id = data.get('purchase_order_id')
    if not po_id:
        raise ValueError('缺少采购单')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM purchase_orders WHERE id=%s', (po_id,))
            po = cur.fetchone()
            if not po:
                raise LookupError('采购单不存在')
            po = dict(po)
            cur.execute('SELECT * FROM order_lines WHERE order_id=%s ORDER BY line_no', (po_id,))
            lines = [dict(r) for r in cur.fetchall()]
            if not lines:
                raise ValueError('该采购单没有明细行，无法生成发货单')
            code = _now_code('DN')
            cur.execute('''
                INSERT INTO dispatch_notes(
                    dispatch_code, purchase_order_id, project_id,
                    shipper_party_id, receiver_party_id,
                    settlement_method, logistics_company, tracking_no,
                    delivery_address, receiver_name, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'draft')
                RETURNING id
            ''', (code, po_id, po.get('project_id'),
                  po.get('seller_party_id'), po.get('buyer_party_id'),
                  po.get('settlement_method'), po.get('logistics_company'), po.get('tracking_no'),
                  po.get('delivery_address'), po.get('receiver_name')))
            note_id = cur.fetchone()['id']
            for idx, line in enumerate(lines, start=1):
                cur.execute('''
                    INSERT INTO dispatch_note_lines(
                        dispatch_note_id, line_no, product_id, source_order_line_id,
                        product_name, quantity, unit)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                ''', (note_id, idx, line.get('product_id'), line['id'],
                      line.get('product_name_standard_draft') or line.get('product_name_raw'),
                      line.get('quantity'), line.get('unit')))
            _audit(cur, actor_profile=actor, tool_name='ops_dispatch_from_order', operation='create',
                   status='applied', target_table='dispatch_notes', target_id=str(note_id),
                   payload={'purchase_order_id': str(po_id), 'dispatch_code': code, 'line_count': len(lines)})
            conn.commit()
    return {'id': str(note_id), 'dispatch_code': code, 'line_count': len(lines)}


def dispatch_signoff_check(project_id: str | None = None) -> dict[str, Any]:
    """发货 vs 签收 交叉核对：按采购单对比发货量与签收量。"""
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('''
                SELECT po.id AS purchase_order_id,
                       coalesce(po.order_no, po.order_code) AS order_no,
                       po.order_side,
                       (SELECT count(distinct dn.id) FROM dispatch_notes dn
                         WHERE dn.purchase_order_id = po.id AND coalesce(dn.status,'') <> 'cancelled') AS dispatch_count,
                       (SELECT coalesce(sum(dl.quantity),0) FROM dispatch_notes dn
                          JOIN dispatch_note_lines dl ON dl.dispatch_note_id = dn.id
                         WHERE dn.purchase_order_id = po.id AND coalesce(dn.status,'') <> 'cancelled') AS dispatched_qty,
                       (SELECT count(distinct ss.id) FROM shipment_signoffs ss
                         WHERE ss.purchase_order_id = po.id) AS signoff_count,
                       (SELECT coalesce(sum(coalesce(sl.signed_quantity, sl.quantity)),0)
                          FROM shipment_signoffs ss
                          JOIN shipment_signoff_lines sl ON sl.signoff_id = ss.id
                         WHERE ss.purchase_order_id = po.id AND ss.status = 'signed') AS signed_qty
                FROM purchase_orders po
                WHERE (%s::uuid IS NULL OR po.project_id = %s::uuid)
            ''', (project_id, project_id))
            rows = []
            for r in cur.fetchall():
                r = dict(r)
                if not r['dispatch_count'] and not r['signoff_count']:
                    continue
                r['diff'] = (r['dispatched_qty'] or 0) - (r['signed_qty'] or 0)
                rows.append(r)
    rows.sort(key=lambda x: (-abs(x['diff']), str(x['order_no'])))
    return {'rows': rows}


def dispatch_export_xlsx(note_id: str) -> tuple[bytes, str]:
    """按真实《补货发货单》版式导出 Excel：编码从产品主数据带出，含合计与签章区。"""
    import io
    from openpyxl import Workbook
    from openpyxl.styles import Alignment, Border, Font, Side

    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('''
                SELECT dn.*, sp.display_name AS shipper_name, rp.display_name AS receiver_party_name,
                       coalesce(po.order_no, po.order_code) AS order_no
                FROM dispatch_notes dn
                LEFT JOIN parties sp ON sp.id = dn.shipper_party_id
                LEFT JOIN parties rp ON rp.id = dn.receiver_party_id
                LEFT JOIN purchase_orders po ON po.id = dn.purchase_order_id
                WHERE dn.id = %s
            ''', (note_id,))
            dn = cur.fetchone()
            if not dn:
                raise LookupError('发货单不存在')
            dn = dict(dn)
            cur.execute('''
                SELECT dl.*, p.guobo_code, p.barcode_69
                FROM dispatch_note_lines dl
                LEFT JOIN products p ON p.id = dl.product_id
                WHERE dl.dispatch_note_id = %s ORDER BY dl.line_no
            ''', (note_id,))
            lines = [dict(r) for r in cur.fetchall()]

    wb = Workbook()
    ws = wb.active
    ws.title = '发货单'
    thin = Side(style='thin')
    box = Border(left=thin, right=thin, top=thin, bottom=thin)
    center = Alignment(horizontal='center', vertical='center')
    for col, w in zip('ABCDEF', (6, 30, 18, 20, 10, 16)):
        ws.column_dimensions[col].width = w

    def merged(row, text, font=None):
        ws.merge_cells(f'A{row}:F{row}')
        c = ws[f'A{row}']
        c.value = text
        c.alignment = center
        if font:
            c.font = font

    merged(1, dn.get('header_title') or '发货单', Font(size=14, bold=True))
    merged(2, dn.get('receiver_party_name') or '', Font(size=12, bold=True))
    ws['A3'] = '物流单号：'; ws['C3'] = dn.get('tracking_no') or ''
    ws['A4'] = '供货商名称：'; ws['C4'] = dn.get('shipper_name') or ''
    ws['A5'] = '联系人姓名：'; ws['C5'] = dn.get('contact_name') or ''
    ws['A6'] = '联系人电话：'; ws['C6'] = dn.get('contact_phone_masked') or ''
    ws['A7'] = '采购负责人(必填)：'; ws['C7'] = dn.get('purchaser_name') or ''
    ws['A8'] = '结算方式：(代销、经销)'; ws['C8'] = dn.get('settlement_method') or ''
    ws['E8'] = f"发货日期：{dn.get('ship_date') or '待确认'}"
    headers = ['序号', '产品名称', '国博编码（必填）', '图样/商品69码（必填）', '数量', '备注']
    hr = 9
    for i, h in enumerate(headers):
        c = ws.cell(row=hr, column=i + 1, value=h)
        c.font = Font(bold=True)
        c.alignment = center
        c.border = box
    total = 0
    r = hr
    for line in lines:
        r += 1
        qty = line.get('quantity') or 0
        total += float(qty)
        vals = [line.get('line_no'), line.get('product_name'), line.get('guobo_code') or '',
                line.get('barcode_69') or '', float(qty), line.get('notes') or '']
        for i, v in enumerate(vals):
            c = ws.cell(row=r, column=i + 1, value=v)
            c.border = box
            if i in (0, 2, 3, 4):
                c.alignment = center
    r += 1
    for i in range(6):
        ws.cell(row=r, column=i + 1).border = box
    ws.cell(row=r, column=2, value='合计：').font = Font(bold=True)
    ws.cell(row=r, column=5, value=total).font = Font(bold=True)
    ws.cell(row=r, column=5).alignment = center
    r += 2
    ws.cell(row=r, column=1, value='发货人签章(必须签字或盖章)：')
    r += 2
    ws.cell(row=r, column=1, value='收货人签章：')
    r += 2
    ws.cell(row=r, column=1, value='产品经理：')

    buf = io.BytesIO()
    wb.save(buf)
    fname = f"发货单_{dn.get('dispatch_code') or note_id}.xlsx"
    return buf.getvalue(), fname


# ---------------------------------------------------------------------------
# 对账结算状态机：上游对账单 →(筛选汇总确认)→ 下游总对账单 →(完成)→ 同步结清
# ---------------------------------------------------------------------------

def downstream_settle_preview(project_id: str | None = None) -> dict[str, Any]:
    """筛出「上游已结算 且 未锁入下游结算」的对账单，连同汇总，供弹窗确认。"""
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('''
                SELECT us.id, us.statement_code, us.statement_date, us.settle_date,
                       us.total_quantity, us.total_amount,
                       cp.display_name AS customer_name
                FROM upstream_statements us
                LEFT JOIN parties cp ON cp.id = us.customer_party_id
                WHERE us.status = 'settled' AND us.downstream_statement_id IS NULL AND us.settlement_batch_code IS NULL
                  AND (%s::uuid IS NULL OR us.project_id = %s::uuid)
                ORDER BY us.settle_date NULLS LAST, us.statement_date
            ''', (project_id, project_id))
            rows = [dict(r) for r in cur.fetchall()]
            ids = [r['id'] for r in rows]
            line_totals = {'quantity': 0, 'amount': 0}
            if ids:
                cur.execute('''
                    SELECT coalesce(sum(quantity),0) AS q, coalesce(sum(amount),0) AS a
                    FROM upstream_statement_lines WHERE statement_id = ANY(%s)
                ''', (ids,))
                t = cur.fetchone()
                line_totals = {'quantity': t['q'], 'amount': t['a']}
    return {'rows': rows, 'summary': {
        'statement_count': len(rows),
        'total_quantity': line_totals['quantity'],
        'total_amount': line_totals['amount'],
    }}


def _all_contract_prices(cur) -> list[dict[str, Any]]:
    """全部下游采购合同行，含供应商与剩余可结算量。
    FIFO：按订单日期**升序**——先签的采购单先消耗，价格变化按先后顺序各算各价。"""
    cur.execute('''
        SELECT ol.id AS order_line_id, ol.product_id, ol.product_name_raw, ol.product_name_standard_draft,
               ol.unit_price, ol.unit, ol.quantity AS contract_quantity,
               coalesce(ol.settled_quantity, 0) AS settled_quantity,
               coalesce(po.order_no, po.order_code) AS order_no, po.order_date,
               po.seller_party_id AS supplier_id, sp.display_name AS supplier_name
        FROM order_lines ol
        JOIN purchase_orders po ON po.id = ol.order_id
        LEFT JOIN parties sp ON sp.id = po.seller_party_id
        WHERE po.order_side = 'downstream_supplier_purchase'
          AND coalesce(po.status,'') <> 'cancelled'
          AND ol.unit_price IS NOT NULL
        ORDER BY po.order_date ASC NULLS FIRST, ol.created_at ASC  -- 无日期视为最早（FIFO）
    ''')
    return [dict(r) for r in cur.fetchall()]


def _price_statement_lines(cur, ids: list) -> dict[str, Any]:
    """跨供应商定价：产品聚合数量 → 匹配覆盖它的供应商合同（最新合同价优先）→ 按供应商分组。"""
    cur.execute('''
        SELECT product_id, product_name, coalesce(sum(quantity),0) AS qty, max(unit) AS unit
        FROM upstream_statement_lines WHERE statement_id = ANY(%s)
        GROUP BY product_id, product_name
        ORDER BY product_name
    ''', (ids,))
    goods = [dict(r) for r in cur.fetchall()]
    contracts = _all_contract_prices(cur)

    def lines_for(g):
        """该产品的合同行（FIFO顺序）。"""
        name = (g['product_name'] or '').strip()
        out = []
        for c in contracts:
            if g['product_id'] and c['product_id'] and str(g['product_id']) == str(c['product_id']):
                out.append(c)
            elif name and name in ((c['product_name_standard_draft'] or '').strip(), (c['product_name_raw'] or '').strip()):
                out.append(c)
        return out

    groups: dict[str, dict[str, Any]] = {}
    uncovered: list[str] = []
    shortages: list[dict[str, Any]] = []
    total_qty = total_amount = 0
    for g in goods:
        need = Decimal(str(g['qty'] or 0))
        total_qty += need
        cls = lines_for(g)
        if not cls:
            uncovered.append(g['product_name'])
            continue
        remaining_total = sum(Decimal(str(c['contract_quantity'] or 0)) - Decimal(str(c['settled_quantity'] or 0)) for c in cls)
        if need > remaining_total:
            shortages.append({'product_name': g['product_name'], 'need': str(need),
                              'remaining': str(remaining_total),
                              'exhausted': str(sum(Decimal(str(c['settled_quantity'] or 0)) for c in cls))})
            continue
        # FIFO 消耗：先签的采购单先用，价格按各自合同行
        left = need
        for c in cls:
            if left <= 0:
                break
            avail = Decimal(str(c['contract_quantity'] or 0)) - Decimal(str(c['settled_quantity'] or 0))
            if avail <= 0:
                continue
            take = min(left, avail)
            left -= take
            amount = settle_line_amount(None, take, c['unit_price'])
            total_amount += amount or 0
            sid = str(c['supplier_id'])
            grp = groups.setdefault(sid, {'supplier_id': sid, 'supplier_name': c['supplier_name'],
                                          'lines': [], 'total_quantity': 0, 'total_amount': 0})
            grp['lines'].append({'product_id': g['product_id'], 'product_name': g['product_name'],
                                 'qty': take, 'unit': g['unit'] or c['unit'], 'covered': True,
                                 'unit_price': c['unit_price'], 'amount': amount,
                                 'source_order_no': c['order_no'],
                                 'source_order_line_id': str(c['order_line_id'])})
            grp['total_quantity'] += take
            grp['total_amount'] += amount or 0
    return {'groups': list(groups.values()), 'uncovered': uncovered, 'shortages': shortages,
            'total_quantity': total_qty, 'total_amount': total_amount}


def downstream_settle_pricing(data: dict[str, Any]) -> dict[str, Any]:
    """预览：跨供应商自动分组的合同定价（含未覆盖货品提醒），供弹窗确认。"""
    ids = data.get('upstream_statement_ids') or []
    if not ids:
        raise ValueError('未选择任何上游对账单')
    with connect() as conn:
        with conn.cursor() as cur:
            return _price_statement_lines(cur, ids)


def downstream_settle_create(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """发起下游结算（批次）：按供应商合同自动分组，一个批次生成 N 张下游对账单；
    单价/金额按各供应商采购合同价；无任何合同覆盖的货品拒绝并列清单；冻结上游单挂批次号。"""
    ids = data.get('upstream_statement_ids') or []
    if not ids:
        raise ValueError('未选择任何上游对账单')
    notes = (data.get('notes') or '').strip() or None
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM upstream_statements WHERE id = ANY(%s) FOR UPDATE', (ids,))
            stmts = [dict(r) for r in cur.fetchall()]
            if len(stmts) != len(set(ids)):
                raise LookupError('部分上游对账单不存在')
            bad = [s['statement_code'] for s in stmts
                   if s['status'] != 'settled' or s.get('settlement_batch_code') or s.get('downstream_statement_id')]
            if bad:
                raise ValueError('以下对账单不满足「上游已结算且未锁定」：' + '、'.join(bad))
            pricing = _price_statement_lines(cur, ids)
            if pricing['uncovered']:
                raise ValueError('没有任何下游采购合同覆盖以下货品：'
                                 + '、'.join(pricing['uncovered'])
                                 + '。请先补录对应供应商的采购单合同')
            if pricing['shortages']:
                raise ValueError('以下货品结算数量超出下游采购单剩余可结算量，请先补录新的采购单：'
                                 + '；'.join(f"{s['product_name']}（需 {s['need']}，剩余 {s['remaining']}，此前已结 {s['exhausted']}）"
                                             for s in pricing['shortages']))
            if not pricing['groups']:
                raise ValueError('没有可结算的货品')
            batch = _now_code('SB')
            projects = {str(s['project_id']) for s in stmts if s['project_id']}
            created = []
            for grp in pricing['groups']:
                code = _now_code('DS')
                cur.execute('''
                    INSERT INTO downstream_statements(
                        statement_code, settlement_batch_code, supplier_party_id, project_id, statement_date,
                        settlement_method, total_quantity, total_amount, status, notes)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 'draft', %s)
                    RETURNING id
                ''', (code, batch, grp['supplier_id'],
                      list(projects)[0] if len(projects) == 1 else None,
                      _date.today(), (data.get('settlement_method') or None),
                      grp['total_quantity'], grp['total_amount'], notes))
                ds_id = cur.fetchone()['id']
                for idx, line in enumerate(grp['lines'], start=1):
                    cur.execute('''
                        INSERT INTO downstream_statement_lines(
                            statement_id, line_no, product_id, product_name, quantity, unit,
                            unit_price, amount, source_order_no, source_order_line_id)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ''', (ds_id, idx, line.get('product_id'), line['product_name'], line['qty'],
                          line.get('unit'), line['unit_price'], line['amount'],
                          line['source_order_no'], line['source_order_line_id']))
                created.append({'id': str(ds_id), 'statement_code': code,
                                'supplier_name': grp['supplier_name'],
                                'total_quantity': str(grp['total_quantity']),
                                'total_amount': str(grp['total_amount'])})
                # FIFO 消耗落库：累加已结算数量，消耗满置「已结清」
                for line in grp['lines']:
                    cur.execute('''
                        UPDATE order_lines
                        SET settled_quantity = coalesce(settled_quantity,0) + %s,
                            settlement_status = CASE WHEN coalesce(settled_quantity,0) + %s >= coalesce(quantity,0)
                                                     THEN 'exhausted' ELSE 'unsettled' END,
                            updated_at = now()
                        WHERE id = %s
                    ''', (line['qty'], line['qty'], line['source_order_line_id']))
            cur.execute('''
                UPDATE upstream_statements
                SET status='frozen', settlement_batch_code=%s, updated_at=now()
                WHERE id = ANY(%s)
            ''', (batch, ids))
            _audit(cur, actor_profile=actor, tool_name='ops_downstream_settle_create', operation='create',
                   status='applied', target_table='downstream_statements', target_id=created[0]['id'],
                   payload={'batch_code': batch, 'statements': created,
                            'upstream_codes': [s['statement_code'] for s in stmts],
                            'pricing_basis': 'downstream_purchase_contract'})
            conn.commit()
    return {'batch_code': batch, 'statements': created, 'frozen_count': len(stmts),
            'total_quantity': pricing['total_quantity'], 'total_amount': pricing['total_amount']}


def downstream_settle_complete(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """下游结算完成：下游单置已结算，同步涉及的上游单置已结清（这批货结清）。"""
    ds_id = data.get('downstream_statement_id')
    settle_date = (data.get('settle_date') or '').strip() or None
    if not ds_id:
        raise ValueError('缺少下游对账单')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM downstream_statements WHERE id=%s FOR UPDATE', (ds_id,))
            ds = cur.fetchone()
            if not ds:
                raise LookupError('下游对账单不存在')
            ds = dict(ds)
            if ds['status'] == 'settled':
                raise ValueError('该下游对账单已结算过')
            cur.execute('''
                UPDATE downstream_statements SET status='settled', settle_date=%s, updated_at=now()
                WHERE id=%s
            ''', (settle_date or _date.today(), ds_id))
            cleared = []
            remaining = 0
            batch = ds.get('settlement_batch_code')
            if batch:
                cur.execute("SELECT count(*)::int AS n FROM downstream_statements WHERE settlement_batch_code=%s AND status <> 'settled'", (batch,))
                remaining = cur.fetchone()['n']
                if remaining == 0:
                    cur.execute('''
                        UPDATE upstream_statements SET status='cleared', updated_at=now()
                        WHERE settlement_batch_code=%s
                        RETURNING statement_code
                    ''', (batch,))
                    cleared = [r['statement_code'] for r in cur.fetchall()]
            else:
                cur.execute('''
                    UPDATE upstream_statements SET status='cleared', updated_at=now()
                    WHERE downstream_statement_id=%s
                    RETURNING statement_code
                ''', (ds_id,))
                cleared = [r['statement_code'] for r in cur.fetchall()]
            _audit(cur, actor_profile=actor, tool_name='ops_downstream_settle_complete', operation='update',
                   status='applied', target_table='downstream_statements', target_id=str(ds_id),
                   payload={'statement_code': ds['statement_code'], 'batch_code': batch,
                            'cleared_upstream': cleared, 'batch_remaining': remaining,
                            'settle_date': str(settle_date or _date.today())})
            conn.commit()
    return {'ok': True, 'cleared_upstream_count': len(cleared), 'batch_remaining': remaining}


def downstream_settle_cancel(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    """取消未结算的下游对账单：解冻上游单（回到上游已结算），删除下游单。"""
    ds_id = data.get('downstream_statement_id')
    if not ds_id:
        raise ValueError('缺少下游对账单')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM downstream_statements WHERE id=%s FOR UPDATE', (ds_id,))
            ds = cur.fetchone()
            if not ds:
                raise LookupError('下游对账单不存在')
            ds = dict(ds)
            if ds['status'] == 'settled':
                raise ValueError('已结算的下游对账单不能取消')
            batch = ds.get('settlement_batch_code')
            if batch:
                cur.execute("SELECT count(*)::int AS n FROM downstream_statements WHERE settlement_batch_code=%s AND status='settled'", (batch,))
                if cur.fetchone()['n']:
                    raise ValueError('该结算批次中已有完成结算的下游单，整批不能取消')
                cur.execute("SELECT id, statement_code FROM downstream_statements WHERE settlement_batch_code=%s", (batch,))
                batch_ds = [dict(r) for r in cur.fetchall()]
                cur.execute('''
                    UPDATE upstream_statements SET status='settled', settlement_batch_code=NULL, updated_at=now()
                    WHERE settlement_batch_code=%s RETURNING statement_code
                ''', (batch,))
                unfrozen = [r['statement_code'] for r in cur.fetchall()]
                for d in batch_ds:
                    # 回退合同行消耗
                    cur.execute('''
                        UPDATE order_lines ol
                        SET settled_quantity = greatest(coalesce(ol.settled_quantity,0) - dsl.quantity, 0),
                            settlement_status = CASE WHEN coalesce(ol.settled_quantity,0) - dsl.quantity >= coalesce(ol.quantity,0)
                                                     THEN 'exhausted' ELSE 'unsettled' END,
                            updated_at = now()
                        FROM downstream_statement_lines dsl
                        WHERE dsl.statement_id = %s AND dsl.source_order_line_id = ol.id
                    ''', (d['id'],))
                    cur.execute('DELETE FROM downstream_statement_lines WHERE statement_id=%s', (d['id'],))
                    cur.execute('DELETE FROM downstream_statements WHERE id=%s', (d['id'],))
                cancelled = [d['statement_code'] for d in batch_ds]
            else:
                cur.execute('''
                    UPDATE upstream_statements SET status='settled', downstream_statement_id=NULL, updated_at=now()
                    WHERE downstream_statement_id=%s RETURNING statement_code
                ''', (ds_id,))
                unfrozen = [r['statement_code'] for r in cur.fetchall()]
                cur.execute('''
                    UPDATE order_lines ol
                    SET settled_quantity = greatest(coalesce(ol.settled_quantity,0) - dsl.quantity, 0),
                        settlement_status = CASE WHEN coalesce(ol.settled_quantity,0) - dsl.quantity >= coalesce(ol.quantity,0)
                                                 THEN 'exhausted' ELSE 'unsettled' END,
                        updated_at = now()
                    FROM downstream_statement_lines dsl
                    WHERE dsl.statement_id = %s AND dsl.source_order_line_id = ol.id
                ''', (ds_id,))
                cur.execute('DELETE FROM downstream_statement_lines WHERE statement_id=%s', (ds_id,))
                cur.execute('DELETE FROM downstream_statements WHERE id=%s', (ds_id,))
                cancelled = [ds['statement_code']]
            _audit(cur, actor_profile=actor, tool_name='ops_downstream_settle_cancel', operation='delete_request',
                   status='applied', target_table='downstream_statements', target_id=str(ds_id),
                   payload={'batch_code': batch, 'cancelled': cancelled, 'unfrozen_upstream': unfrozen})
            conn.commit()
    return {'ok': True, 'unfrozen_count': len(unfrozen), 'cancelled_statements': cancelled}


# ---------------------------------------------------------------------------
# 上游对账单库存核销：销售事实照扣；负库存照记 + error 校验问题（不静默不阻断）
# ---------------------------------------------------------------------------

def statement_writeoff(data: dict[str, Any], *, actor: str) -> dict[str, Any]:
    stmt_id = data.get('statement_id')
    if not stmt_id:
        raise ValueError('缺少对账单')
    with connect() as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM upstream_statements WHERE id=%s FOR UPDATE', (stmt_id,))
            stmt = cur.fetchone()
            if not stmt:
                raise LookupError('对账单不存在')
            stmt = dict(stmt)
            cur.execute('SELECT * FROM upstream_statement_lines WHERE statement_id=%s ORDER BY line_no', (stmt_id,))
            lines = [dict(r) for r in cur.fetchall()]
            if not lines:
                raise ValueError('对账单没有明细行')
            # 渠道名 → 渠道对象
            cur.execute("SELECT id, display_name FROM parties WHERE party_type IN ('channel','platform')")
            channel_map = {r['display_name']: r['id'] for r in cur.fetchall()}
            done, skipped, negatives, problems = [], 0, [], []
            for line in lines:
                if line.get('stock_movement_id'):
                    skipped += 1
                    continue
                qty = Decimal(str(line.get('quantity') or 0))
                if qty <= 0:
                    skipped += 1
                    continue
                if not line.get('product_id'):
                    problems.append(f"行{line['line_no']}（{line.get('product_name')}）未绑定标准产品")
                    continue
                # 找库存SKU：优先对账单项目下的账户；找不到则在唯一可定位的账户里自动建
                cur.execute('''
                    SELECT si.id, si.inventory_account_id FROM stock_items si
                    JOIN inventory_accounts ia ON ia.id = si.inventory_account_id
                    WHERE si.product_id = %s AND (%s::uuid IS NULL OR ia.project_id = %s::uuid)
                ''', (line['product_id'], stmt.get('project_id'), stmt.get('project_id')))
                sis = cur.fetchall()
                if len(sis) > 1:
                    problems.append(f"{line.get('product_name')} 在多个库存账户存在，请在对账单上指定项目后重试")
                    continue
                if sis:
                    stock_item_id = sis[0]['id']
                else:
                    cur.execute('''
                        SELECT ia.id FROM inventory_accounts ia
                        WHERE (%s::uuid IS NULL OR ia.project_id = %s::uuid) AND ia.status='active'
                    ''', (stmt.get('project_id'), stmt.get('project_id')))
                    accs = cur.fetchall()
                    if len(accs) != 1:
                        problems.append(f"{line.get('product_name')} 无库存SKU且无法唯一定位库存账户（请在对账单指定项目）")
                        continue
                    cur.execute('SELECT standard_name, default_unit FROM products WHERE id=%s', (line['product_id'],))
                    prod = cur.fetchone()
                    cur.execute('''
                        INSERT INTO stock_items(stock_item_code, inventory_account_id, product_id, display_name, unit, status, notes)
                        VALUES (%s, %s, %s, %s, %s, 'active', %s) RETURNING id
                    ''', (_now_code('ST'), accs[0]['id'], line['product_id'],
                          (prod['standard_name'] if prod else line.get('product_name')),
                          line.get('unit') or (prod['default_unit'] if prod else None),
                          f"对账核销自动创建（{stmt['statement_code']}）"))
                    stock_item_id = cur.fetchone()['id']
                channel = (line.get('channel') or '').strip()
                counterparty = channel_map.get(channel) or next(
                    (v for k, v in channel_map.items() if channel and channel in k), None)
                code = _now_code('MV')
                cur.execute('''
                    INSERT INTO stock_movements(
                        movement_code, movement_no, stock_item_id, movement_date, direction, movement_type,
                        quantity, unit, counterparty_id, purpose, notes)
                    VALUES (%s, %s, %s, %s, 'out', 'shipment_out', %s, %s, %s, %s, %s)
                    RETURNING id
                ''', (code, code, stock_item_id, stmt.get('statement_date') or _date.today(),
                      qty, line.get('unit'), counterparty,
                      f"对账核销 渠道:{channel or '未注明'}",
                      f"上游对账单 {stmt['statement_code']} 行{line['line_no']} 销售核销"))
                mv_id = cur.fetchone()['id']
                cur.execute('UPDATE upstream_statement_lines SET stock_movement_id=%s, updated_at=now() WHERE id=%s',
                            (mv_id, line['id']))
                # 负库存检查：照扣 + error 问题，不回滚
                cur.execute('SELECT current_quantity FROM v_current_stock WHERE stock_item_id=%s', (stock_item_id,))
                row = cur.fetchone()
                current = row['current_quantity'] if row else None
                if current is not None and Decimal(str(current)) < 0:
                    negatives.append({'product': line.get('product_name'), 'current': str(current)})
                    cur.execute('''
                        INSERT INTO validation_issues(issue_type, severity, related_table, related_id, field_name, message, status)
                        VALUES ('negative_stock', 'error', 'stock_items', %s, 'current_quantity', %s, 'open')
                    ''', (stock_item_id,
                          f"对账核销后库存为负：{line.get('product_name')} 当前 {current}"
                          f"（{stmt['statement_code']} 渠道{channel or '未注明'}扣减 {qty}）。"
                          f"请核查此前入库是否漏录，或对账数量是否有误（误录可冲正流水 {code}）"))
                done.append({'line_no': line['line_no'], 'movement_code': code, 'quantity': str(qty)})
            if problems:
                conn.rollback()
                raise ValueError('；'.join(problems))
            _audit(cur, actor_profile=actor, tool_name='ops_statement_writeoff', operation='update',
                   status='applied', target_table='upstream_statements', target_id=str(stmt_id),
                   payload={'statement_code': stmt['statement_code'], 'movements': done,
                            'skipped': skipped, 'negatives': negatives})
            conn.commit()
    return {'ok': True, 'written_off': len(done), 'skipped': skipped, 'negatives': negatives}
