assistant-claw/mcp-tools/email-extractor/atlas_extractor/fetch.py
Atlas refactor bd0be97630 Refactor: drop Vega framing, promote Atlas to repo root
This repo IS Atlas (总助 Claw / 老板视角项目执行雷达). The earlier
two-profile framing (Atlas + Vega placeholder) was a misread — Vega is
the agent persona answering Multica issues, not the product. Vega has
no relationship to assistant-claw the product.

Changes:
- Move atlas/* to top-level (git mv preserves history)
- Remove empty Vega placeholders prompts/.gitkeep, tools/.gitkeep
- Delete atlas/ wrapper directory (now empty)
- Update path references in INTEGRATION-hermes.md, scripts/mirror-...sh,
  docs/decisions/0001-mirror-nuwa-skill.md
- Rewrite README.md as Atlas-only, remove dual-profile language

After this commit:
- Top-level OpenClaw 8 files (IDENTITY/SOUL/USER/AGENTS/TOOLS/MEMORY/
  BOOTSTRAP/HEARTBEAT + CLAUDE symlink + zh-CN mirrors)
- skills/{6 sub-skills + DESCRIPTION + README}
- mcp-tools/{spec + Python implementation}
- state-schemas/{project, person, customer + README}
- autopilots/{5 atlas-*.yaml}
- client-deck/, docs/decisions/, scripts/

The ~/.hermes/skills/atlas/ destination convention preserved (atlas as
a skill namespace on the operator's machine, distinct from source path).
2026-05-09 17:54:18 +08:00

120 lines
3.6 KiB
Python

"""Stage 1: Fetch.
IMAP-based incremental fetcher. Persists `last_uid` per (account, folder)
in a JSON sidecar so re-runs only pull new messages.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
from imap_tools import MailBox, AND
@dataclass
class FetchedRaw:
    """One raw message pulled from a mailbox, before any parsing."""

    # Account identifier the message was fetched for.
    account: str
    # Mailbox folder the message was read from.
    folder: str
    # Message UID within the folder, kept as a string.
    uid: str
    # Timestamp associated with the message by the fetcher.
    internal_date: datetime
    # Serialized MIME content of the message.
    raw_mime: bytes
def _sync_state_path(state_dir: Path, account: str, folder: str) -> Path:
safe = f"{account}__{folder}".replace("/", "_").replace("@", "_at_")
return state_dir / f".sync__{safe}.json"
def _load_last_uid(state_dir: Path, account: str, folder: str) -> int | None:
    """Return the persisted high-water UID for (account, folder), if any.

    Returns ``None`` when there is no state file or when its content cannot
    be interpreted as ``{"last_uid": <int-like>}``; callers treat ``None``
    as a cold start. Other I/O errors (e.g. permission denied) propagate.
    """
    p = _sync_state_path(state_dir, account, folder)
    try:
        payload = json.loads(p.read_text(encoding="utf-8"))
    except (FileNotFoundError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    try:
        return int(payload["last_uid"])
    except (KeyError, TypeError, ValueError):
        # TypeError: the JSON is valid but not a mapping (e.g. [] or null),
        # or "last_uid" holds null / a container. Treat as unusable state
        # rather than crashing the whole fetch.
        return None
def _save_last_uid(state_dir: Path, account: str, folder: str, last_uid: int) -> None:
    """Persist the high-water UID for (account, folder) to its JSON sidecar.

    The record also stores the account, the folder and a UTC ``updated_at``
    timestamp. The state directory is created if missing.
    """
    p = _sync_state_path(state_dir, account, folder)
    p.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(
        {
            "account": account,
            "folder": folder,
            "last_uid": last_uid,
            "updated_at": datetime.now(timezone.utc).isoformat(),
        },
        ensure_ascii=False,
        indent=2,
    )
    # Write to a sibling temp file and swap it in, so an interruption
    # mid-write cannot leave a truncated state file behind (which would be
    # read back as "no state" and trigger a full cold-start re-fetch).
    tmp = p.with_name(p.name + ".tmp")
    tmp.write_text(payload, encoding="utf-8")
    tmp.replace(p)
def fetch_imap(
    *,
    host: str,
    port: int,
    username: str,
    password: str,
    folders: list[str],
    state_dir: Path,
    since: datetime | None = None,
    max_per_run: int = 5000,
) -> Iterator[FetchedRaw]:
    """Yield raw MIME messages incrementally per folder.

    Sync model: per (username, folder) we remember the highest UID seen.
    On re-run we fetch UIDs strictly greater. First run may use `since`
    to bound the cold-start window.

    Args:
        host: IMAP server host name.
        port: IMAP server port.
        username: Login name; also used as the account key for sync state.
        password: Login password.
        folders: Folder names to scan, in order.
        state_dir: Directory holding the per-(account, folder) state files.
        since: On a cold start only, restrict the search to messages dated
            on or after this day. Ignored once a last UID is recorded.
        max_per_run: Upper bound on messages fetched per folder per call.

    Yields:
        One `FetchedRaw` per message, with the message left unseen on the
        server (`mark_seen=False`).

    The high-water mark for a folder is written when that folder's loop
    ends, including when the consumer stops iterating early or an error
    interrupts the fetch. Only messages the consumer has moved past are
    counted, so an interrupted message is fetched again on the next run.
    """
    with MailBox(host, port).login(username, password) as mailbox:
        for folder in folders:
            mailbox.folder.set(folder)
            last_uid = _load_last_uid(state_dir, username, folder)
            if last_uid is None:
                # Cold start: everything, optionally bounded by `since`.
                criteria = AND(date_gte=since.date()) if since else "ALL"
            else:
                # Incremental: UID range as a raw IMAP search string.
                criteria = f"UID {last_uid + 1}:*"
            msgs = mailbox.fetch(
                criteria=criteria,
                bulk=True,
                headers_only=False,
                limit=max_per_run,
                mark_seen=False,
            )
            highest_seen = last_uid or 0
            try:
                for m in msgs:
                    try:
                        uid_num = int(m.uid)
                    except (TypeError, ValueError):
                        # Non-numeric / missing UID: still delivered, but it
                        # cannot advance the high-water mark.
                        uid_num = None
                    # In IMAP, `*` stands for the highest UID in the mailbox,
                    # so the range "N:*" still matches the newest message
                    # when N is above it (RFC 3501). Without this guard every
                    # run with no new mail would re-yield the last message.
                    if (
                        uid_num is not None
                        and last_uid is not None
                        and uid_num <= last_uid
                    ):
                        continue
                    yield FetchedRaw(
                        account=username,
                        folder=folder,
                        uid=str(m.uid),
                        internal_date=m.date or datetime.now(timezone.utc),
                        raw_mime=m.obj.as_bytes(),  # full MIME bytes
                    )
                    # Updated only after the consumer resumes us, i.e. after
                    # it has finished with the message just yielded.
                    if uid_num is not None and uid_num > highest_seen:
                        highest_seen = uid_num
            finally:
                # Runs on normal completion, on generator close and on error,
                # so partial progress within a folder is not thrown away.
                if highest_seen and (last_uid is None or highest_seen > last_uid):
                    _save_last_uid(state_dir, username, folder, highest_seen)