"""Stage 3: Dequote. Strip quoted-reply chains, signature blocks, and disclaimer footers. This is the unglamorous-but-critical step — without it, every email looks like every other email and downstream clustering is destroyed. Strategy stack (apply in order, keep all matches conservative): 1. Marker patterns (English + Chinese reply/forward markers) 2. Outlook-style block headers 3. RFC quoted lines (`> ...`) 4. Signature separator (`-- \n`) 5. Trailing-block heuristic (phone/title patterns) 6. Disclaimer footer regex Result: only the new content the sender wrote in this message. """ from __future__ import annotations import re from dataclasses import dataclass # --- Marker patterns --------------------------------------------------------- _MARKERS = [ # English: "On Mon, Apr 22, 2024 at 9:14 AM Wang wrote:" re.compile(r"^On\s.+?wrote:\s*$", re.MULTILINE), # English forwards re.compile(r"^[-]+\s*Forwarded message\s*[-]+\s*$", re.MULTILINE | re.IGNORECASE), re.compile(r"^[-]+\s*Original Message\s*[-]+\s*$", re.MULTILINE | re.IGNORECASE), # Chinese: "王 于 2026年4月22日 下午2:30 写道:" / variants re.compile(r"^.*?于\s*\d{4}年.+?写道[::]\s*$", re.MULTILINE), re.compile(r"^.+?写道[::]\s*$", re.MULTILINE), # Chinese forward markers re.compile(r"^[-]+\s*转发(的)?邮件\s*[-]+\s*$", re.MULTILINE), re.compile(r"^[-]+\s*原始邮件\s*[-]+\s*$", re.MULTILINE), # Outlook block (From: / Sent: / To: / Subject: stack) re.compile( r"^From:.+?\n(Sent|发送时间):.+?\n(To|收件人):.+?\n(Subject|主题):.+?$", re.MULTILINE | re.DOTALL, ), re.compile( r"^发件人[::].+?\n发送时间[::].+?\n收件人[::].+?\n主题[::].+?$", re.MULTILINE | re.DOTALL, ), ] _QUOTE_LINE = re.compile(r"^\s*>+\s?", re.MULTILINE) _SIGNATURE_SEP = re.compile(r"^--\s*$", re.MULTILINE) _DISCLAIMER_PATTERNS = [ re.compile(r"本邮件(及其附件)?(包含|含有)?(保密|机密).*", re.IGNORECASE | re.DOTALL), re.compile(r"This\s+e?-?mail.*confidential.*", re.IGNORECASE | re.DOTALL), re.compile(r"DISCLAIMER:.*", re.IGNORECASE | re.DOTALL), ] @dataclass class DequoteResult: text_clean: str strategies_used: list[str] chars_stripped: int def _strip_at_first_marker(text: str, used: list[str]) -> str: earliest_idx: int | None = None matched_pattern: str | None = None for pat in _MARKERS: m = pat.search(text) if m and (earliest_idx is None or m.start() < earliest_idx): earliest_idx = m.start() matched_pattern = pat.pattern[:40] if earliest_idx is not None: used.append(f"marker:{matched_pattern}") return text[:earliest_idx].rstrip() return text def _strip_quoted_lines(text: str, used: list[str]) -> str: """Cut all leading-`>` lines AND any trailing blocks of them.""" if not _QUOTE_LINE.search(text): return text used.append("rfc_quoted_lines") lines = text.splitlines() # Find first line that is NOT a quoted line, working from the bottom up while lines and ( _QUOTE_LINE.match(lines[-1]) or lines[-1].strip() == "" ): lines.pop() cleaned = [ln for ln in lines if not _QUOTE_LINE.match(ln)] return "\n".join(cleaned).strip() def _strip_signature(text: str, used: list[str]) -> str: m = _SIGNATURE_SEP.search(text) if m: used.append("signature_sep_dashdash") return text[: m.start()].rstrip() return text def _strip_trailing_block_heuristic(text: str, used: list[str]) -> str: """If the last 3-8 lines look like a contact block, drop them. Heuristic: trailing block of short lines that contains a phone number pattern, an email, or a generic title word like 'CEO/总监/经理/董事长'. """ lines = text.splitlines() if len(lines) < 6: return text tail = [ln for ln in lines[-8:] if ln.strip()] if len(tail) < 2 or len(tail) > 8: return text joined = "\n".join(tail) has_signal = ( re.search(r"\+?\d[\d\s\-()]{6,}", joined) or re.search(r"[\w.+-]+@[\w-]+\.[\w.-]+", joined) or re.search(r"(CEO|CTO|CFO|总监|经理|董事长|总裁|主管|VP|Director)", joined) ) if not has_signal: return text avg_len = sum(len(t) for t in tail) / len(tail) if avg_len > 60: # too long, probably real content return text used.append("trailing_block_heuristic") cut_idx = len(lines) for i in range(len(lines) - 1, -1, -1): if lines[i].strip() == "" and i < len(lines) - 1: continue if lines[i] in tail: cut_idx = i else: break return "\n".join(lines[:cut_idx]).rstrip() def _strip_disclaimer(text: str, used: list[str]) -> str: for pat in _DISCLAIMER_PATTERNS: m = pat.search(text) if m: used.append(f"disclaimer:{pat.pattern[:30]}") text = text[: m.start()].rstrip() return text def dequote(text: str) -> DequoteResult: """Run the full dequote stack and return the cleaned text.""" if not text: return DequoteResult(text_clean="", strategies_used=[], chars_stripped=0) original_len = len(text) used: list[str] = [] text = _strip_at_first_marker(text, used) text = _strip_quoted_lines(text, used) text = _strip_signature(text, used) text = _strip_disclaimer(text, used) text = _strip_trailing_block_heuristic(text, used) return DequoteResult( text_clean=text.strip(), strategies_used=used, chars_stripped=original_len - len(text.strip()), )