from __future__ import annotations import base64 import csv import io import json import re from dataclasses import dataclass, field from html.parser import HTMLParser from pathlib import Path from core_shared import JSONValue @dataclass(frozen=True, slots=True) class ParsedDocument: content_text: str source_type: str metadata_json: dict[str, JSONValue] = field(default_factory=dict) class DocumentParseError(ValueError): pass class _HTMLTextExtractor(HTMLParser): def __init__(self) -> None: super().__init__() self._parts: list[str] = [] self._skip_depth = 0 def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: if tag.lower() in {"script", "style", "noscript"}: self._skip_depth += 1 if tag.lower() in {"p", "br", "div", "section", "article", "li", "tr", "h1", "h2", "h3"}: self._parts.append("\n") def handle_endtag(self, tag: str) -> None: if tag.lower() in {"script", "style", "noscript"} and self._skip_depth > 0: self._skip_depth -= 1 if tag.lower() in {"p", "div", "section", "article", "li", "tr"}: self._parts.append("\n") def handle_data(self, data: str) -> None: if self._skip_depth == 0: self._parts.append(data) def text(self) -> str: return normalize_text(" ".join(self._parts)) def parse_document_content( *, source_type: str, content_text: str | None = None, content_base64: str | None = None, source_uri: str | None = None, ) -> ParsedDocument: normalized_source_type = normalize_source_type(source_type=source_type, source_uri=source_uri) text = content_text if content_text is not None else _decode_content_base64(content_base64) if not text.strip(): raise DocumentParseError("document content is empty") if normalized_source_type in {"text", "txt"}: parsed_text = normalize_text(text) elif normalized_source_type in {"markdown", "md"}: parsed_text = parse_markdown(text) elif normalized_source_type in {"html", "htm"}: parsed_text = parse_html(text) elif normalized_source_type == "json": parsed_text = parse_json(text) elif normalized_source_type == "csv": parsed_text = parse_csv(text) elif normalized_source_type == "pdf": parsed_text = parse_pdf(content_text=text, content_base64=content_base64) elif normalized_source_type in {"docx", "word"}: parsed_text = parse_docx(content_text=text, content_base64=content_base64) else: parsed_text = normalize_text(text) if not parsed_text: raise DocumentParseError("parsed document content is empty") return ParsedDocument( content_text=parsed_text, source_type=normalized_source_type, metadata_json={ "parser": "knowledge-document-parser-v1", "original_source_type": source_type, "normalized_source_type": normalized_source_type, "content_length": len(parsed_text), }, ) def normalize_source_type(*, source_type: str, source_uri: str | None = None) -> str: value = source_type.strip().lower() if source_type else "" if value and value != "auto": return value.removeprefix(".") if source_uri: suffix = Path(source_uri).suffix.lower().removeprefix(".") if suffix: return suffix return "text" def parse_markdown(content: str) -> str: text = re.sub(r"```[\s\S]*?```", " ", content) text = re.sub(r"`([^`]+)`", r"\1", text) text = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", text) text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text) text = re.sub(r"^\s{0,3}#{1,6}\s*", "", text, flags=re.MULTILINE) text = re.sub(r"^\s{0,3}>\s?", "", text, flags=re.MULTILINE) text = re.sub(r"^\s*[-*+]\s+", "", text, flags=re.MULTILINE) return normalize_text(text) def parse_html(content: str) -> str: parser = _HTMLTextExtractor() parser.feed(content) parser.close() return parser.text() def parse_json(content: str) -> str: try: payload = json.loads(content) except json.JSONDecodeError as exc: raise DocumentParseError(f"invalid json document: {exc}") from exc lines: list[str] = [] _flatten_json(value=payload, path="", lines=lines) return normalize_text("\n".join(lines)) def parse_csv(content: str) -> str: reader = csv.DictReader(io.StringIO(content)) if reader.fieldnames: rows = [] for index, row in enumerate(reader, start=1): values = [ f"{field}: {row.get(field, '')}" for field in reader.fieldnames if field is not None ] rows.append(f"row {index}: " + "; ".join(values)) return normalize_text("\n".join(rows)) fallback_reader = csv.reader(io.StringIO(content)) return normalize_text("\n".join(" | ".join(row) for row in fallback_reader)) def parse_pdf(*, content_text: str, content_base64: str | None) -> str: try: import pypdf except Exception: return normalize_text(content_text) raw_bytes = _decode_content_bytes(content_base64) reader = pypdf.PdfReader(io.BytesIO(raw_bytes)) return normalize_text("\n".join(page.extract_text() or "" for page in reader.pages)) def parse_docx(*, content_text: str, content_base64: str | None) -> str: try: import docx except Exception: return normalize_text(content_text) raw_bytes = _decode_content_bytes(content_base64) document = docx.Document(io.BytesIO(raw_bytes)) return normalize_text("\n".join(paragraph.text for paragraph in document.paragraphs)) def normalize_text(content: str) -> str: lines = [re.sub(r"\s+", " ", line).strip() for line in content.splitlines()] return "\n".join(line for line in lines if line).strip() def _decode_content_base64(content_base64: str | None) -> str: raw_bytes = _decode_content_bytes(content_base64) return raw_bytes.decode("utf-8", errors="replace") def _decode_content_bytes(content_base64: str | None) -> bytes: if not content_base64: raise DocumentParseError("content_text or content_base64 is required") try: return base64.b64decode(content_base64, validate=True) except Exception as exc: raise DocumentParseError("invalid base64 document content") from exc def _flatten_json(*, value: JSONValue, path: str, lines: list[str]) -> None: if isinstance(value, dict): for key, item in value.items(): next_path = f"{path}.{key}" if path else str(key) _flatten_json(value=item, path=next_path, lines=lines) return if isinstance(value, list): for index, item in enumerate(value): next_path = f"{path}[{index}]" if path else f"[{index}]" _flatten_json(value=item, path=next_path, lines=lines) return lines.append(f"{path}: {value}")