"""Stages 1-3 orchestration. Reads from IMAP (or a local .eml directory for testing), runs decode + dequote, writes intermediate outputs as JSON under `state_dir/extracted/`. Stages 4-7 (threading, entities, intent, canonical normalization) consume these outputs in subsequent passes. """ from __future__ import annotations import json from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path from typing import Iterator from .decode import DecodedMessage, decode_mime from .dequote import DequoteResult, dequote from .fetch import FetchedRaw @dataclass class StagedOutput: msg_id: str account: str folder: str uid: str internal_date: str decoded: DecodedMessage dequoted: DequoteResult def stage123(raw: FetchedRaw) -> StagedOutput: decoded = decode_mime(raw.raw_mime) dequoted = dequote(decoded.body_text) return StagedOutput( msg_id=decoded.msg_id or f"no-msgid-{raw.account}-{raw.uid}", account=raw.account, folder=raw.folder, uid=raw.uid, internal_date=raw.internal_date.astimezone(timezone.utc).isoformat(), decoded=decoded, dequoted=dequoted, ) def _output_path(state_dir: Path, out: StagedOutput) -> Path: yyyymm = out.internal_date[:7] # "2026-05" safe_id = out.msg_id.replace("/", "_").replace("\\", "_")[:200] return state_dir / "extracted" / yyyymm / f"{safe_id}.json" def _serializable(out: StagedOutput) -> dict: return { "msg_id": out.msg_id, "account": out.account, "folder": out.folder, "uid": out.uid, "internal_date": out.internal_date, "subject": out.decoded.subject, "from": {"name": out.decoded.from_addr[0], "email": out.decoded.from_addr[1]}, "to": [{"name": n, "email": e} for n, e in out.decoded.to_addrs], "cc": [{"name": n, "email": e} for n, e in out.decoded.cc_addrs], "in_reply_to": out.decoded.in_reply_to, "references": out.decoded.references, "body_text_clean": out.dequoted.text_clean, "body_text_full_chars": len(out.decoded.body_text), "body_text_clean_chars": len(out.dequoted.text_clean), "attachments_meta": out.decoded.attachments_meta, "decode_warnings": out.decoded.decode_warnings, "dequote": { "strategies_used": out.dequoted.strategies_used, "chars_stripped": out.dequoted.chars_stripped, }, "_extraction": { "stages_complete": [1, 2, 3], "extractor_version": "0.1.0", "extracted_at": datetime.now(timezone.utc).isoformat(), }, } def write_staged(out: StagedOutput, state_dir: Path) -> Path: p = _output_path(state_dir, out) p.parent.mkdir(parents=True, exist_ok=True) p.write_text( json.dumps(_serializable(out), ensure_ascii=False, indent=2), encoding="utf-8", ) return p def run_on_raws(raws: Iterator[FetchedRaw], state_dir: Path) -> dict: """Run stages 1-3 over an iterator of FetchedRaw, write JSON, return summary.""" counts = {"fetched": 0, "ok": 0, "failed": 0, "low_signal_clean": 0} failed_dir = state_dir / "extracted" / "_failed" for raw in raws: counts["fetched"] += 1 try: staged = stage123(raw) write_staged(staged, state_dir) counts["ok"] += 1 if len(staged.dequoted.text_clean) < 8: counts["low_signal_clean"] += 1 except Exception as exc: # don't let one bad message kill the run counts["failed"] += 1 failed_dir.mkdir(parents=True, exist_ok=True) (failed_dir / f"{raw.account}__{raw.uid}.error").write_text( f"{type(exc).__name__}: {exc}\n", encoding="utf-8" ) return counts