document_parsers.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. from __future__ import annotations
  2. import base64
  3. import csv
  4. import io
  5. import json
  6. import re
  7. from dataclasses import dataclass, field
  8. from html.parser import HTMLParser
  9. from pathlib import Path
  10. from core_shared import JSONValue
  11. @dataclass(frozen=True, slots=True)
  12. class ParsedDocument:
  13. content_text: str
  14. source_type: str
  15. metadata_json: dict[str, JSONValue] = field(default_factory=dict)
  16. class DocumentParseError(ValueError):
  17. pass
  18. class _HTMLTextExtractor(HTMLParser):
  19. def __init__(self) -> None:
  20. super().__init__()
  21. self._parts: list[str] = []
  22. self._skip_depth = 0
  23. def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
  24. if tag.lower() in {"script", "style", "noscript"}:
  25. self._skip_depth += 1
  26. if tag.lower() in {"p", "br", "div", "section", "article", "li", "tr", "h1", "h2", "h3"}:
  27. self._parts.append("\n")
  28. def handle_endtag(self, tag: str) -> None:
  29. if tag.lower() in {"script", "style", "noscript"} and self._skip_depth > 0:
  30. self._skip_depth -= 1
  31. if tag.lower() in {"p", "div", "section", "article", "li", "tr"}:
  32. self._parts.append("\n")
  33. def handle_data(self, data: str) -> None:
  34. if self._skip_depth == 0:
  35. self._parts.append(data)
  36. def text(self) -> str:
  37. return normalize_text(" ".join(self._parts))
  38. def parse_document_content(
  39. *,
  40. source_type: str,
  41. content_text: str | None = None,
  42. content_base64: str | None = None,
  43. source_uri: str | None = None,
  44. ) -> ParsedDocument:
  45. normalized_source_type = normalize_source_type(source_type=source_type, source_uri=source_uri)
  46. text = content_text if content_text is not None else _decode_content_base64(content_base64)
  47. if not text.strip():
  48. raise DocumentParseError("document content is empty")
  49. if normalized_source_type in {"text", "txt"}:
  50. parsed_text = normalize_text(text)
  51. elif normalized_source_type in {"markdown", "md"}:
  52. parsed_text = parse_markdown(text)
  53. elif normalized_source_type in {"html", "htm"}:
  54. parsed_text = parse_html(text)
  55. elif normalized_source_type == "json":
  56. parsed_text = parse_json(text)
  57. elif normalized_source_type == "csv":
  58. parsed_text = parse_csv(text)
  59. elif normalized_source_type == "pdf":
  60. parsed_text = parse_pdf(content_text=text, content_base64=content_base64)
  61. elif normalized_source_type in {"docx", "word"}:
  62. parsed_text = parse_docx(content_text=text, content_base64=content_base64)
  63. else:
  64. parsed_text = normalize_text(text)
  65. if not parsed_text:
  66. raise DocumentParseError("parsed document content is empty")
  67. return ParsedDocument(
  68. content_text=parsed_text,
  69. source_type=normalized_source_type,
  70. metadata_json={
  71. "parser": "knowledge-document-parser-v1",
  72. "original_source_type": source_type,
  73. "normalized_source_type": normalized_source_type,
  74. "content_length": len(parsed_text),
  75. },
  76. )
  77. def normalize_source_type(*, source_type: str, source_uri: str | None = None) -> str:
  78. value = source_type.strip().lower() if source_type else ""
  79. if value and value != "auto":
  80. return value.removeprefix(".")
  81. if source_uri:
  82. suffix = Path(source_uri).suffix.lower().removeprefix(".")
  83. if suffix:
  84. return suffix
  85. return "text"
  86. def parse_markdown(content: str) -> str:
  87. text = re.sub(r"```[\s\S]*?```", " ", content)
  88. text = re.sub(r"`([^`]+)`", r"\1", text)
  89. text = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", text)
  90. text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
  91. text = re.sub(r"^\s{0,3}#{1,6}\s*", "", text, flags=re.MULTILINE)
  92. text = re.sub(r"^\s{0,3}>\s?", "", text, flags=re.MULTILINE)
  93. text = re.sub(r"^\s*[-*+]\s+", "", text, flags=re.MULTILINE)
  94. return normalize_text(text)
  95. def parse_html(content: str) -> str:
  96. parser = _HTMLTextExtractor()
  97. parser.feed(content)
  98. parser.close()
  99. return parser.text()
  100. def parse_json(content: str) -> str:
  101. try:
  102. payload = json.loads(content)
  103. except json.JSONDecodeError as exc:
  104. raise DocumentParseError(f"invalid json document: {exc}") from exc
  105. lines: list[str] = []
  106. _flatten_json(value=payload, path="", lines=lines)
  107. return normalize_text("\n".join(lines))
  108. def parse_csv(content: str) -> str:
  109. reader = csv.DictReader(io.StringIO(content))
  110. if reader.fieldnames:
  111. rows = []
  112. for index, row in enumerate(reader, start=1):
  113. values = [
  114. f"{field}: {row.get(field, '')}"
  115. for field in reader.fieldnames
  116. if field is not None
  117. ]
  118. rows.append(f"row {index}: " + "; ".join(values))
  119. return normalize_text("\n".join(rows))
  120. fallback_reader = csv.reader(io.StringIO(content))
  121. return normalize_text("\n".join(" | ".join(row) for row in fallback_reader))
  122. def parse_pdf(*, content_text: str, content_base64: str | None) -> str:
  123. try:
  124. import pypdf
  125. except Exception:
  126. return normalize_text(content_text)
  127. raw_bytes = _decode_content_bytes(content_base64)
  128. reader = pypdf.PdfReader(io.BytesIO(raw_bytes))
  129. return normalize_text("\n".join(page.extract_text() or "" for page in reader.pages))
  130. def parse_docx(*, content_text: str, content_base64: str | None) -> str:
  131. try:
  132. import docx
  133. except Exception:
  134. return normalize_text(content_text)
  135. raw_bytes = _decode_content_bytes(content_base64)
  136. document = docx.Document(io.BytesIO(raw_bytes))
  137. return normalize_text("\n".join(paragraph.text for paragraph in document.paragraphs))
  138. def normalize_text(content: str) -> str:
  139. lines = [re.sub(r"\s+", " ", line).strip() for line in content.splitlines()]
  140. return "\n".join(line for line in lines if line).strip()
  141. def _decode_content_base64(content_base64: str | None) -> str:
  142. raw_bytes = _decode_content_bytes(content_base64)
  143. return raw_bytes.decode("utf-8", errors="replace")
  144. def _decode_content_bytes(content_base64: str | None) -> bytes:
  145. if not content_base64:
  146. raise DocumentParseError("content_text or content_base64 is required")
  147. try:
  148. return base64.b64decode(content_base64, validate=True)
  149. except Exception as exc:
  150. raise DocumentParseError("invalid base64 document content") from exc
  151. def _flatten_json(*, value: JSONValue, path: str, lines: list[str]) -> None:
  152. if isinstance(value, dict):
  153. for key, item in value.items():
  154. next_path = f"{path}.{key}" if path else str(key)
  155. _flatten_json(value=item, path=next_path, lines=lines)
  156. return
  157. if isinstance(value, list):
  158. for index, item in enumerate(value):
  159. next_path = f"{path}[{index}]" if path else f"[{index}]"
  160. _flatten_json(value=item, path=next_path, lines=lines)
  161. return
  162. lines.append(f"{path}: {value}")