"""Stage 2: Decode.

MIME parsing → plain text. Handles charset detection, multipart, HTML→text.
Output is the raw cleanable text — Stage 3 (dequote) strips the conversation
history afterwards.
"""

from __future__ import annotations

import email
import email.errors
import email.header
import email.utils
import re
from dataclasses import dataclass, field
from email.message import Message
from typing import Iterable

import chardet
import html2text
from readability import Document

# Configuration template for HTML→text conversion. Conversions do not call
# ``handle()`` on this shared instance; ``_new_html2text`` copies its options
# onto a fresh converter for every document instead.
_HTML2TEXT = html2text.HTML2Text()
_HTML2TEXT.ignore_images = True
_HTML2TEXT.ignore_emphasis = True
_HTML2TEXT.ignore_links = False
_HTML2TEXT.body_width = 0  # don't re-wrap

# Option names copied from the ``_HTML2TEXT`` template onto fresh converters.
_HTML2TEXT_OPTIONS = ("ignore_images", "ignore_emphasis", "ignore_links", "body_width")

# Characters removed from both ends of a message-id token. Tabs and newlines
# are included because folded headers keep their folding whitespace.
_MSGID_STRIP_CHARS = "<> \t\r\n"


@dataclass
class DecodedMessage:
    """Decoded view of one MIME message, as produced by ``decode_mime``."""

    msg_id: str  # Message-ID without surrounding angle brackets/whitespace
    subject: str  # Subject with RFC 2047 encoded-words decoded
    from_addr: tuple[str, str]  # (name, email)
    to_addrs: list[tuple[str, str]]
    cc_addrs: list[tuple[str, str]]
    in_reply_to: str | None  # None when the header is absent or empty
    references: list[str]  # message-ids from References, in header order
    body_text: str  # full text (may include quoted history)
    body_html: str | None  # joined text/html parts, None when there are none
    attachments_meta: list[dict] = field(default_factory=list)
    decode_warnings: list[str] = field(default_factory=list)


def _new_html2text() -> html2text.HTML2Text:
    """Return a fresh converter configured like the ``_HTML2TEXT`` template.

    A new instance is used per document because an HTML2Text object is a
    stateful parser; with malformed markup (e.g. unclosed tags) state from one
    document could presumably carry over into the next conversion.
    """
    converter = html2text.HTML2Text()
    for option in _HTML2TEXT_OPTIONS:
        setattr(converter, option, getattr(_HTML2TEXT, option))
    return converter


def _html_to_text(html: str) -> str:
    """Convert an HTML string to stripped plain text with a fresh converter."""
    return _new_html2text().handle(html).strip()


def _charset_candidates(data: bytes, declared_charset: str | None) -> Iterable[str]:
    """Yield charsets to try, in priority order, without duplicates.

    Order: declared charset, UTF-8, chardet's guess, GB18030, Latin-1. This is
    a generator so that ``chardet.detect`` only runs when the consumer has
    already failed with the declared charset and UTF-8.
    """
    seen: set[str] = set()

    def _unseen(name: str | None) -> bool:
        key = (name or "").strip().lower()
        if not key or key in seen:
            return False
        seen.add(key)
        return True

    if _unseen(declared_charset):
        yield declared_charset
    if _unseen("utf-8"):
        yield "utf-8"
    sniffed = chardet.detect(data).get("encoding")
    if _unseen(sniffed):
        yield sniffed
    if _unseen("gb18030"):  # common Chinese fallback
        yield "gb18030"
    if _unseen("latin-1"):  # never fails
        yield "latin-1"


def _decode_bytes(data: bytes, declared_charset: str | None) -> str:
    """Best-effort charset decode.

    Args:
        data: Raw (transfer-decoded) payload bytes.
        declared_charset: Charset named by the MIME part, if any.

    Returns:
        The decoded text. Never raises for undecodable input.
    """
    for enc in _charset_candidates(data, declared_charset):
        try:
            return data.decode(enc)
        except (LookupError, ValueError):
            # LookupError: unknown or non-text codec name.
            # ValueError: includes UnicodeDecodeError and malformed codec
            # names (e.g. one containing a NUL character).
            continue
    # Defensive only: latin-1 above accepts any byte sequence.
    return data.decode("utf-8", errors="replace")


def _decode_header_value(raw: object) -> str:
    """Decode RFC 2047 encoded-words in a header value without raising.

    Args:
        raw: A header value as returned by ``Message.get`` (``str``, an
            ``email.header.Header`` object, or ``None``).

    Returns:
        The decoded text; the undecoded ``str(raw)`` when the encoded-words
        are malformed or name an unknown/mismatched charset; ``""`` for
        ``None``.
    """
    if raw is None:
        return ""
    try:
        chunks = email.header.decode_header(raw)
        return str(email.header.make_header(chunks))
    except (LookupError, ValueError, email.errors.MessageError):
        # One bad header must not abort decoding of the whole message.
        return str(raw)


def _clean_msgid(raw: object) -> str:
    """Strip angle brackets and surrounding whitespace from a message-id."""
    if raw is None:
        return ""
    return str(raw).strip(_MSGID_STRIP_CHARS)


def _addr_pair(addr_str: str) -> tuple[str, str]:
    """Parse a single-address header into ``(display name, lowercased email)``.

    The header is parsed first and the display name is RFC 2047-decoded
    afterwards, so decoded punctuation cannot confuse the address parser.
    """
    # str() guards against Header objects, which Message.get can return when
    # the raw header contains non-ASCII bytes.
    name, email_addr = email.utils.parseaddr(str(addr_str or ""))
    return (_decode_header_value(name).strip(), email_addr.strip().lower())


def _addr_list(addr_str: str) -> list[tuple[str, str]]:
    """Parse a multi-address header into ``(name, email)`` pairs.

    Entries without an email address are dropped; emails are lowercased and
    display names are RFC 2047-decoded.
    """
    if not addr_str:
        return []
    pairs = email.utils.getaddresses([str(addr_str)])
    return [
        (_decode_header_value(name).strip(), addr.strip().lower())
        for (name, addr) in pairs
        if addr
    ]


def _walk_parts(msg: Message) -> Iterable[Message]:
    """Yield every non-multipart (leaf) part of ``msg``.

    ``Message.walk`` yields the message itself first, so a non-multipart
    message simply yields itself.
    """
    for part in msg.walk():
        if not part.is_multipart():
            yield part


def _extract_bodies(msg: Message) -> tuple[str, str | None, list[dict], list[str]]:
    """Return (text, html, attachments_meta, warnings).

    Text comes from the joined ``text/plain`` parts. When there is no plain
    text but there is HTML, the HTML is converted: readability extraction
    followed by html2text, falling back to html2text on the full HTML when
    readability fails or yields nothing.
    """
    text_parts: list[str] = []
    html_parts: list[str] = []
    attachments: list[dict] = []
    warnings: list[str] = []

    for part in _walk_parts(msg):
        ctype = part.get_content_type()
        payload = part.get_payload(decode=True)
        if payload is None:
            continue

        # Compare the parsed disposition value rather than substring-matching
        # the raw header: an inline part named e.g. "attachment.txt" is not an
        # attachment, and the raw header may be a Header object, not a str.
        if part.get_content_disposition() == "attachment":
            filename = part.get_filename()
            attachments.append(
                {
                    "filename": _decode_header_value(filename) if filename else filename,
                    "content_type": ctype,
                    "size_bytes": len(payload),
                }
            )
            continue

        if ctype == "text/plain":
            text_parts.append(_decode_bytes(payload, part.get_content_charset()))
        elif ctype == "text/html":
            html_parts.append(_decode_bytes(payload, part.get_content_charset()))
        elif ctype == "text/calendar":
            warnings.append("text/calendar part skipped")

    text_body = "\n\n".join(text_parts).strip()
    html_body = "\n\n".join(html_parts).strip() if html_parts else None

    if not text_body and html_body:
        # readability for the main article extraction, then html2text
        try:
            summary_html = Document(html_body).summary()
            text_body = _html_to_text(summary_html)
        except Exception as exc:  # deliberate best-effort: any failure falls back
            warnings.append(f"readability failed: {exc}; falling back to html2text")
            text_body = _html_to_text(html_body)
        else:
            if not text_body:
                # Extraction succeeded but kept nothing; convert the full
                # HTML rather than returning an empty body.
                warnings.append("readability produced no text; falling back to html2text")
                text_body = _html_to_text(html_body)

    return text_body, html_body, attachments, warnings


def decode_mime(raw_mime: bytes) -> DecodedMessage:
    """Parse raw MIME bytes into a ``DecodedMessage``.

    Args:
        raw_mime: The complete message (headers and body) as bytes.

    Returns:
        A ``DecodedMessage`` with decoded headers, body text/HTML, attachment
        metadata and any warnings collected while decoding.
    """
    msg = email.message_from_bytes(raw_mime)
    text, html, attachments, warnings = _extract_bodies(msg)

    # Header lookup is case-insensitive, so one get() covers all spellings.
    msg_id = _clean_msgid(msg.get("Message-ID"))
    subject = _decode_header_value(msg.get("Subject", ""))

    ref_tokens = str(msg.get("References", "") or "").split()
    cleaned_refs = (_clean_msgid(token) for token in ref_tokens)
    references = [ref for ref in cleaned_refs if ref]
    in_reply_to = _clean_msgid(msg.get("In-Reply-To")) or None

    return DecodedMessage(
        msg_id=msg_id,
        subject=subject,
        from_addr=_addr_pair(msg.get("From", "")),
        to_addrs=_addr_list(msg.get("To", "")),
        cc_addrs=_addr_list(msg.get("Cc", "")),
        in_reply_to=in_reply_to,
        references=references,
        body_text=text,
        body_html=html,
        attachments_meta=attachments,
        decode_warnings=warnings,
    )