#!/usr/bin/env python3
"""Kestra Bridge v1 for NAS KB profile-memory governance.

This bridge exposes only fixed task keys for the existing meow-chan NAS shared
knowledge approval workflow. It is not a general command runner and does not
accept arbitrary shell/prompt/path input.
"""
from __future__ import annotations

import datetime as dt
import hashlib
import hmac
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import json
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
import time
from typing import Any

# Listen address and port; both can be overridden through the environment.
# The default host is the loopback interface.
HOST = os.environ.get('KESTRA_NAS_KB_BRIDGE_HOST', '127.0.0.1')
PORT = int(os.environ.get('KESTRA_NAS_KB_BRIDGE_PORT', '19092'))
# Shared secret expected as ``Authorization: Bearer <TOKEN>`` on POST requests.
# When it is empty, the POST handler answers every request with 401.
TOKEN = os.environ.get('KESTRA_AGENT_BRIDGE_TOKEN', '')

# Fixed locations of the POC project and of the meow-chan Hermes profile.
PROJECT_ROOT = Path('/Users/bot1/Volumes/root_for_ai/AI工作区/通用_多Agent编排_Kestra评估POC_20260602_2208')
PROFILE_HOME = Path('/Users/bot1/.hermes/profiles/meow-chan')
SCRIPT_DIR = PROFILE_HOME / 'scripts'
# Keep runtime state/logs in local Hermes ops so macOS LaunchAgent can run the
# bridge reliably even when the AI工作区 workspace is mounted with stricter
# background access.
OPS_ROOT = Path(os.environ.get('KESTRA_NAS_KB_BRIDGE_OPS_ROOT', '/Users/bot1/.hermes/ops/kestra-nas-kb-bridge'))
STATE_DIR = OPS_ROOT / 'state'
LOG_DIR = OPS_ROOT / 'logs'
# Prompt templates ("briefs") are read from the project tree, not from OPS_ROOT.
BRIEF_DIR = PROJECT_ROOT / 'runtime' / 'briefs'

# Profile scripts that the task functions run through ``python3``.
COLLECT_SCRIPT = SCRIPT_DIR / 'nas_kb_profile_collect.py'
STATUS_SCRIPT = SCRIPT_DIR / 'nas_kb_approval_queue_status.py'
CLEANUP_SCRIPT = SCRIPT_DIR / 'profile_memory_cleanup_after_nas.py'
# Resolved once at import time; falls back to the bare command name when
# ``hermes`` is not found on PATH.
HERMES_BIN = shutil.which('hermes') or 'hermes'

# Per-task time budgets in seconds, keyed by task key.
# NOTE(review): nothing in this file reads this table; the task functions pass
# their own literal timeouts to the subprocess helpers. Confirm whether an
# external consumer relies on it.
TASK_TIMEOUTS = {
    'preflight': 30,
    'daily_collect_and_create_approval': 1800,
    'approval_monitor_and_merge': 1800,
    'memory_cleanup_auto_apply': 300,
    'status_snapshot': 120,
}

# Matches ``<keyword><separator><value>`` for credential-like keywords.
# Callers keep groups 1 and 2 and replace group 3 (the value).
#   * The separator may carry a double quote on either side, so quoted forms
#     such as ``"app_secret": "x"`` and ``password="x"`` are matched and the
#     surrounding quotes survive the substitution.
#   * The value may start with an auth scheme (``Bearer``/``Basic``) followed
#     by whitespace, so ``Authorization: Bearer <token>`` covers the token
#     itself and not just the scheme word.
SECRET_RE = re.compile(
    r'(?i)(api[_-]?key|token|secret|password|authorization|app_secret|verification_token|encrypt_key|cookie|bearer|basic)'
    r'("?[=:\s]+"?)'
    r'((?:(?:bearer|basic)\s+)?[^\s\]",}]+)'
)


def now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset.

    The value is truncated to whole seconds.
    """
    utc_now = dt.datetime.now(dt.timezone.utc)
    local_now = utc_now.astimezone()
    return local_now.isoformat(timespec='seconds')


def redact(text: str) -> str:
    """Mask credential-like values in *text*.

    Every ``SECRET_RE`` match keeps its keyword and separator (groups 1 and 2)
    and has the value that follows replaced by ``[REDACTED]``. A falsy *text*
    is treated as the empty string.
    """
    source = text or ''
    return SECRET_RE.sub(r'\1\2[REDACTED]', source)


def tail(text: str, limit: int = 6000) -> str:
    """Redact *text* and shorten it to roughly *limit* characters.

    Despite the name, both ends are kept: a head and a tail joined by a marker
    that states how many characters were dropped. The marker itself is not
    counted against *limit*.

    Args:
        text: Raw text; a falsy value is treated as the empty string.
        limit: Number of original characters to keep. Values above 1000 keep a
            1000-character head and ``limit - 1000`` trailing characters.
            Smaller values split the budget evenly, and negative values are
            treated as 0.

    Returns:
        The redacted text, unchanged when it already fits within *limit*.
    """
    text = redact(text or '')
    limit = max(limit, 0)
    if len(text) <= limit:
        return text
    # Keep the 1000-char head whenever the budget allows it; for smaller
    # budgets split evenly so head + tail never exceeds the limit.
    head_len = 1000 if limit > 1000 else limit // 2
    tail_len = limit - head_len
    # text[-0:] would yield the whole string, so an empty tail is explicit.
    tail_part = text[-tail_len:] if tail_len else ''
    return text[:head_len] + f'\n...[truncated {len(text)-limit} chars]...\n' + tail_part


def safe_request_id(value: str | None) -> str:
    """Turn a caller-supplied request id into a safe single path component.

    Runs of characters outside ``[A-Za-z0-9_.:-]`` collapse into one ``-`` and
    the result is capped at 120 characters.

    Args:
        value: Requested id. Falsy values select a generated id; non-string
            values are converted with ``str()`` instead of raising.

    Returns:
        The sanitized id, or ``manual-<unix time>`` when nothing usable
        remains. Ids made only of dots (``.``, ``..``) are rejected because
        the id is joined onto the state directory and would otherwise resolve
        to that directory or its parent.
    """
    fallback = f'manual-{int(time.time())}'
    if not value:
        return fallback
    cleaned = re.sub(r'[^A-Za-z0-9_.:-]+', '-', str(value))[:120]
    if not cleaned.strip('.'):
        return fallback
    return cleaned


def write_json(path: Path, data: Any) -> None:
    """Serialize *data* as indented UTF-8 JSON and move it into place at *path*.

    Missing parent directories are created. The payload is written to a
    sibling ``<name>.tmp`` file first and then renamed over *path*.
    """
    payload = json.dumps(data, ensure_ascii=False, indent=2) + '\n'
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_suffix(path.suffix + '.tmp')
    staging.write_text(payload, encoding='utf-8')
    staging.replace(path)


def read_template(name: str) -> str:
    """Return the UTF-8 text of the brief template *name* under ``BRIEF_DIR``."""
    return (BRIEF_DIR / name).read_text(encoding='utf-8')


def base_env() -> dict[str, str]:
    """Build the environment handed to every child process.

    Starts from a copy of the current process environment and pins the home
    directories plus the read-only collection and unbuffered-output flags.
    """
    pinned = {
        'HOME': '/Users/bot1',
        'HERMES_HOME': '/Users/bot1/.hermes',
        'HERMES_PROFILE_COLLECTION_READONLY': '1',
        'PYTHONUNBUFFERED': '1',
    }
    return {**os.environ, **pinned}


def run_cmd(name: str, cmd: list[str], timeout: int, cwd: Path | None = None) -> dict[str, Any]:
    """Run *cmd* to completion and summarize the outcome as a dict.

    The child runs with ``base_env()`` in *cwd* (``PROJECT_ROOT`` when not
    given). This function does not raise; failures are reported in the result.

    Args:
        name: Label copied into the result.
        cmd: Argument vector passed to ``subprocess.run`` (no shell).
        timeout: Seconds before the child is abandoned.
        cwd: Working directory for the child.

    Returns:
        A dict with ``ok``, ``name``, ``exit_code``, ``elapsed_sec``,
        ``stdout_tail`` and ``stderr_tail``. A timeout reports exit code 124
        with ``error_code='TIMEOUT'``. Any other exception reports exit code
        125 with the exception class name in ``error_code`` and its text in
        ``message``.
    """
    t0 = time.time()

    def elapsed() -> float:
        return round(time.time() - t0, 2)

    def as_text(stream: Any) -> str:
        # Captured output on a timeout may arrive as bytes, str or None.
        if isinstance(stream, bytes):
            return stream.decode('utf-8', 'replace')
        return stream or ''

    try:
        completed = subprocess.run(
            cmd,
            cwd=str(cwd or PROJECT_ROOT),
            env=base_env(),
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
        report: dict[str, Any] = {
            'ok': completed.returncode == 0,
            'name': name,
            'exit_code': completed.returncode,
        }
        report['elapsed_sec'] = elapsed()
        report['stdout_tail'] = tail(completed.stdout)
        report['stderr_tail'] = tail(completed.stderr, 3000)
        return report
    except subprocess.TimeoutExpired as expired:
        partial_out = as_text(expired.stdout)
        partial_err = as_text(expired.stderr)
        report = {'ok': False, 'name': name, 'exit_code': 124}
        report['elapsed_sec'] = elapsed()
        report['stdout_tail'] = tail(partial_out, 3000)
        report['stderr_tail'] = tail(partial_err, 3000)
        report['error_code'] = 'TIMEOUT'
        return report
    except Exception as failure:
        report = {'ok': False, 'name': name, 'exit_code': 125}
        report['elapsed_sec'] = elapsed()
        report['stdout_tail'] = ''
        report['stderr_tail'] = ''
        report['error_code'] = type(failure).__name__
        report['message'] = str(failure)
        return report


def hermes_one_shot(name: str, prompt: str, timeout: int) -> dict[str, Any]:
    """Run one ``hermes chat`` invocation with *prompt* under the meow-chan profile.

    A copy of the prompt is saved under ``STATE_DIR/prompts`` in a file named
    after *name* and the first 10 hex digits of the prompt's SHA-256. The
    prompt itself is passed to Hermes on the command line; the saved file is
    only reported back to the caller.

    Returns:
        The ``run_cmd`` result with an added ``prompt_file`` path.
    """
    digest = hashlib.sha256(prompt.encode()).hexdigest()[:10]
    saved_prompt = STATE_DIR / 'prompts' / f'{name}-{digest}.md'
    saved_prompt.parent.mkdir(parents=True, exist_ok=True)
    saved_prompt.write_text(prompt, encoding='utf-8')

    argv = [HERMES_BIN, '--profile', 'meow-chan']
    argv += ['--skills', 'studio-nas-shared-knowledge,profile-delegation,feishu-openapi-automation']
    argv += ['chat', '-q', prompt, '-Q']

    outcome = run_cmd(name, argv, timeout=timeout, cwd=PROJECT_ROOT)
    outcome['prompt_file'] = str(saved_prompt)
    return outcome


def preflight(request_id: str) -> dict[str, Any]:
    """Report whether the paths and settings the other tasks rely on are present.

    ``ok`` is true when the project root, the profile home, all three profile
    scripts and both brief templates exist. The Feishu/Lark variables and the
    token flag are reported for information only and do not affect ``ok``.
    Only the presence of variable names is reported, never their values.
    """
    dotenv_path = PROFILE_HOME / '.env'
    dotenv_text = ''
    if dotenv_path.exists():
        dotenv_text = dotenv_path.read_text(encoding='utf-8', errors='ignore')

    # A variable counts as present when it is set in this process or assigned
    # at the start of a line in the profile's .env file.
    feishu_present: dict[str, bool] = {}
    for key in ('FEISHU_APP_ID', 'FEISHU_APP_SECRET', 'LARK_APP_ID', 'LARK_APP_SECRET'):
        assigned = re.search(rf'^{re.escape(key)}\s*=', dotenv_text, flags=re.M)
        feishu_present[key] = key in os.environ or assigned is not None

    script_state = {str(p): p.exists() for p in (COLLECT_SCRIPT, STATUS_SCRIPT, CLEANUP_SCRIPT)}
    brief_names = (
        'nas_kb_daily_collect_and_create_approval.md',
        'nas_kb_approval_monitor_and_merge.md',
    )
    brief_state = {brief: (BRIEF_DIR / brief).exists() for brief in brief_names}

    root_ok = PROJECT_ROOT.exists()
    profile_ok = PROFILE_HOME.exists()
    checks = {
        'project_root': str(PROJECT_ROOT),
        'project_root_exists': root_ok,
        'profile_home_exists': profile_ok,
        'scripts': script_state,
        'briefs': brief_state,
        'hermes_bin': HERMES_BIN,
        'state_dir': str(STATE_DIR),
        'feishu_env_present': feishu_present,
        'token_configured_for_post': bool(TOKEN),
    }
    healthy = all((root_ok, profile_ok, all(script_state.values()), all(brief_state.values())))
    return {'ok': healthy, 'request_id': request_id, 'task_key': 'preflight', 'checks': checks, 'time': now_iso()}


def daily_collect_and_create_approval(request_id: str, dry_run: bool = False) -> dict[str, Any]:
    """Run the profile collect script, then hand its output to a Hermes one-shot.

    Stages:
        1. Run ``COLLECT_SCRIPT`` (600 s budget) and save its reported stdout
           to ``STATE_DIR/<request_id>/profile_collect.json``.
        2. Stop with ``stage='collect'`` if the script failed, or with
           ``dry_run=True`` if a dry run was requested.
        3. Fill the daily-collect brief template and run Hermes (1200 s budget).

    NOTE(review): the saved file holds the redacted, length-capped
    ``stdout_tail`` from ``run_cmd`` rather than raw stdout, so long output
    will not be intact JSON. Confirm this is acceptable for the brief.
    """
    ident = {'request_id': request_id, 'task_key': 'daily_collect_and_create_approval'}

    workdir = STATE_DIR / request_id
    workdir.mkdir(parents=True, exist_ok=True)
    collect_file = workdir / 'profile_collect.json'

    collect = run_cmd('nas_kb_profile_collect', ['python3', str(COLLECT_SCRIPT)], timeout=600, cwd=PROJECT_ROOT)
    collect_file.write_text(collect.get('stdout_tail') or '', encoding='utf-8')

    if not collect.get('ok'):
        return {'ok': False, **ident, 'stage': 'collect', 'collect': collect, 'collect_json_path': str(collect_file)}
    if dry_run:
        return {'ok': True, **ident, 'dry_run': True, 'collect_json_path': str(collect_file), 'collect': collect}

    prompt = read_template('nas_kb_daily_collect_and_create_approval.md')
    substitutions = (
        ('{request_id}', request_id),
        ('{collect_json_path}', str(collect_file)),
        ('{state_dir}', str(workdir)),
    )
    for placeholder, replacement in substitutions:
        prompt = prompt.replace(placeholder, replacement)

    hermes_result = hermes_one_shot('nas_kb_daily_collect_and_create_approval', prompt, timeout=1200)
    summary = {'ok': collect.get('ok'), 'elapsed_sec': collect.get('elapsed_sec')}
    return {
        'ok': bool(hermes_result.get('ok')),
        **ident,
        'collect_json_path': str(collect_file),
        'collect_summary': summary,
        'hermes': hermes_result,
    }


def approval_monitor_and_merge(request_id: str, dry_run: bool = False) -> dict[str, Any]:
    """Check the approval queue and, if it reports anything, run the merge brief.

    Stages:
        1. Run ``STATUS_SCRIPT`` (180 s budget) and save its reported stdout
           to ``STATE_DIR/<request_id>/approval_status.json``.
        2. Stop with ``stage='status'`` on failure, with ``noop=True`` when the
           output is blank, or with ``dry_run=True`` for a dry run.
        3. Fill the monitor-and-merge brief template and run Hermes (1200 s
           budget).

    NOTE(review): the saved file holds the redacted, length-capped
    ``stdout_tail`` from ``run_cmd`` rather than raw stdout. Confirm this is
    acceptable for the brief.
    """
    ident = {'request_id': request_id, 'task_key': 'approval_monitor_and_merge'}

    workdir = STATE_DIR / request_id
    workdir.mkdir(parents=True, exist_ok=True)
    status_file = workdir / 'approval_status.json'

    status = run_cmd('nas_kb_approval_queue_status', ['python3', str(STATUS_SCRIPT)], timeout=180, cwd=PROJECT_ROOT)
    reported = status.get('stdout_tail') or ''
    status_file.write_text(reported, encoding='utf-8')

    if not status.get('ok'):
        return {'ok': False, **ident, 'stage': 'status', 'status': status, 'status_json_path': str(status_file)}
    if reported.strip() == '':
        return {'ok': True, **ident, 'noop': True, 'reason': 'no pending packages reported'}
    if dry_run:
        return {'ok': True, **ident, 'dry_run': True, 'status_json_path': str(status_file), 'status': status}

    prompt = read_template('nas_kb_approval_monitor_and_merge.md')
    substitutions = (
        ('{request_id}', request_id),
        ('{status_json_path}', str(status_file)),
        ('{state_dir}', str(workdir)),
    )
    for placeholder, replacement in substitutions:
        prompt = prompt.replace(placeholder, replacement)

    hermes_result = hermes_one_shot('nas_kb_approval_monitor_and_merge', prompt, timeout=1200)
    return {'ok': bool(hermes_result.get('ok')), **ident, 'status_json_path': str(status_file), 'hermes': hermes_result}


def memory_cleanup_auto_apply(request_id: str, dry_run: bool = False) -> dict[str, Any]:
    """Run the profile memory cleanup script (240 s budget).

    Args:
        request_id: Id echoed back in the result.
        dry_run: When true the script runs with ``--verbose`` only. When false
            it runs with ``--auto-apply-approved``.

    Returns:
        A dict with ``ok``, ``request_id``, ``task_key``, ``dry_run`` and the
        ``run_cmd`` result under ``result``.
    """
    cmd = ['python3', str(CLEANUP_SCRIPT)]
    # A dry run must not apply anything, so the apply flag is reserved for real
    # runs. Assumes the script only reports when --auto-apply-approved is
    # absent; TODO confirm against the script's CLI.
    cmd.append('--verbose' if dry_run else '--auto-apply-approved')
    result = run_cmd('profile_memory_cleanup_after_nas', cmd, timeout=240, cwd=PROJECT_ROOT)
    return {'ok': bool(result.get('ok')), 'request_id': request_id, 'task_key': 'memory_cleanup_auto_apply', 'dry_run': dry_run, 'result': result}


def status_snapshot(request_id: str) -> dict[str, Any]:
    """Run the approval-queue status script once (180 s budget) and return its result."""
    argv = ['python3', str(STATUS_SCRIPT)]
    snapshot = run_cmd('nas_kb_approval_queue_status', argv, timeout=180, cwd=PROJECT_ROOT)
    succeeded = bool(snapshot.get('ok'))
    return {'ok': succeeded, 'request_id': request_id, 'task_key': 'status_snapshot', 'status': snapshot}


# Closed set of task keys the POST endpoint will run, mapped to their
# implementations. Any other ``task_key`` is refused. Every function takes the
# request id first; ``preflight`` and ``status_snapshot`` take no ``dry_run``
# argument.
TASKS = {
    'preflight': preflight,
    'daily_collect_and_create_approval': daily_collect_and_create_approval,
    'approval_monitor_and_merge': approval_monitor_and_merge,
    'memory_cleanup_auto_apply': memory_cleanup_auto_apply,
    'status_snapshot': status_snapshot,
}


def log_event(obj: dict[str, Any]) -> None:
    """Append *obj* as one redacted JSON line to the bridge's JSONL log.

    The log directory is created on demand.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    record = json.dumps(obj, ensure_ascii=False)
    log_path = LOG_DIR / 'kestra_nas_kb_bridge_v1.jsonl'
    with log_path.open('a', encoding='utf-8') as sink:
        sink.write(redact(record) + '\n')


class Handler(BaseHTTPRequestHandler):
    """HTTP front end of the bridge.

    Routes:
        ``GET /health``: unauthenticated liveness probe.
        ``POST /v1/nas-kb/run``: runs one task from ``TASKS``; requires
        ``Authorization: Bearer <TOKEN>``.

    Every response is a JSON object with an ``ok`` flag.
    """

    # Largest request body, in bytes, that ``do_POST`` will read.
    MAX_BODY_BYTES = 65536

    def _send(self, code: int, obj: dict[str, Any]) -> None:
        """Send *obj* as an indented UTF-8 JSON response with status *code*."""
        body = json.dumps(obj, ensure_ascii=False, indent=2).encode('utf-8')
        self.send_response(code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt: str, *args: Any) -> None:
        """Write the access-log line to stderr, prefixed with a local timestamp."""
        sys.stderr.write(f"{now_iso()} {self.client_address[0]} {fmt % args}\n")

    def do_GET(self) -> None:
        """Answer ``/health`` with service metadata; every other path gets 404."""
        path = self.path.split('?', 1)[0]
        if path == '/health':
            return self._send(200, {'ok': True, 'service': 'kestra-nas-kb-bridge-v1', 'host': HOST, 'port': PORT, 'time': now_iso(), 'post_token_required': True})
        return self._send(404, {'ok': False, 'error_code': 'NOT_FOUND'})

    def do_POST(self) -> None:
        """Validate the request, run the selected task and return its result.

        The body must be a JSON object using only the keys ``request_id``,
        ``task_key`` and ``dry_run``.

        Status codes:
            200: the task reported ``ok``.
            400: malformed body, unknown fields, bad ``request_id`` or a
                ``task_key`` outside ``TASKS``.
            401: missing or wrong bearer token, or no token configured.
            404: any path other than ``/v1/nas-kb/run``.
            413: body larger than ``MAX_BODY_BYTES``.
            500: the task reported failure or raised.
        """
        path = self.path.split('?', 1)[0]
        if path != '/v1/nas-kb/run':
            return self._send(404, {'ok': False, 'error_code': 'NOT_FOUND'})

        # Constant-time comparison on bytes: compare_digest rejects non-ASCII
        # str arguments, and a header value may contain such characters.
        supplied = self.headers.get('Authorization', '').encode('utf-8', 'surrogateescape')
        expected = f'Bearer {TOKEN}'.encode('utf-8', 'surrogateescape')
        if not TOKEN or not hmac.compare_digest(supplied, expected):
            return self._send(401, {'ok': False, 'error_code': 'UNAUTHORIZED'})

        try:
            length = int(self.headers.get('Content-Length') or '0')
        except ValueError as e:
            return self._send(400, {'ok': False, 'error_code': 'BAD_JSON', 'message': str(e)})
        if length < 0:
            # rfile.read() with a negative size would block until the peer
            # closes the connection.
            return self._send(400, {'ok': False, 'error_code': 'BAD_JSON', 'message': 'negative Content-Length'})
        if length > self.MAX_BODY_BYTES:
            # Refuse outright instead of parsing a silently truncated body.
            return self._send(413, {'ok': False, 'error_code': 'BODY_TOO_LARGE', 'max_bytes': self.MAX_BODY_BYTES})
        try:
            body = self.rfile.read(length).decode('utf-8')
            req = json.loads(body or '{}')
        except (ValueError, OSError, RecursionError) as e:
            return self._send(400, {'ok': False, 'error_code': 'BAD_JSON', 'message': str(e)})
        if not isinstance(req, dict):
            return self._send(400, {'ok': False, 'error_code': 'BAD_JSON', 'message': 'request body must be a JSON object'})

        allowed = {'request_id', 'task_key', 'dry_run'}
        unknown = sorted(set(req) - allowed)
        if unknown:
            return self._send(400, {'ok': False, 'error_code': 'UNKNOWN_FIELDS', 'fields': unknown})

        task_key = req.get('task_key')
        # The isinstance check keeps unhashable JSON values (lists, objects)
        # from raising inside the membership test.
        if not isinstance(task_key, str) or task_key not in TASKS:
            return self._send(400, {'ok': False, 'error_code': 'TASK_NOT_ALLOWED', 'allowed_task_keys': sorted(TASKS)})

        raw_request_id = req.get('request_id')
        if raw_request_id is not None and not isinstance(raw_request_id, str):
            return self._send(400, {'ok': False, 'error_code': 'BAD_REQUEST_ID', 'message': 'request_id must be a string'})
        request_id = safe_request_id(raw_request_id)
        dry_run = bool(req.get('dry_run', False))

        started = time.time()
        try:
            if task_key in {'daily_collect_and_create_approval', 'approval_monitor_and_merge', 'memory_cleanup_auto_apply'}:
                result = TASKS[task_key](request_id, dry_run=dry_run)  # type: ignore[misc]
            else:
                result = TASKS[task_key](request_id)  # type: ignore[misc]
            result.setdefault('elapsed_sec', round(time.time() - started, 2))
            code = 200 if result.get('ok') else 500
        except Exception as e:
            # Boundary handler: a task failure becomes a 500 response instead
            # of a dropped connection.
            result = {'ok': False, 'request_id': request_id, 'task_key': task_key, 'error_code': type(e).__name__, 'message': str(e), 'elapsed_sec': round(time.time() - started, 2)}
            code = 500

        try:
            log_event({'time': now_iso(), 'request_id': request_id, 'task_key': task_key, 'ok': result.get('ok'), 'elapsed_sec': result.get('elapsed_sec'), 'dry_run': dry_run})
        except OSError as e:
            # The task has already run; a log write failure must not cost the
            # caller its response.
            sys.stderr.write(f'{now_iso()} log_event failed: {e}\n')
        return self._send(code, result)


def main() -> int:
    """Create the state/log directories and serve HTTP until interrupted.

    Returns:
        ``0`` after the server stops on ``KeyboardInterrupt``.
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    if not TOKEN:
        # The POST handler rejects every request when no token is configured,
        # so say so at startup instead of leaving operators with bare 401s.
        print('warning: KESTRA_AGENT_BRIDGE_TOKEN is not set; every POST will be rejected with 401', file=sys.stderr, flush=True)
    # The context manager closes the listening socket on any exit path.
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f'kestra-nas-kb-bridge-v1 listening on http://{HOST}:{PORT}', flush=True)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop a foreground run; exit cleanly.
            pass
    return 0


# Script entry point: start the server and use main()'s return value as the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
