document_parsers.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from __future__ import annotations
  2. import base64
  3. import csv
  4. import io
  5. import json
  6. import re
  7. from dataclasses import dataclass, field
  8. from html.parser import HTMLParser
  9. from pathlib import Path
  10. from core_shared import JSONValue
  11. @dataclass(frozen=True, slots=True)
  12. class ParsedDocument:
  13. content_text: str
  14. source_type: str
  15. metadata_json: dict[str, JSONValue] = field(default_factory=dict)
  16. class DocumentParseError(ValueError):
  17. pass
  18. class _HTMLTextExtractor(HTMLParser):
  19. def __init__(self) -> None:
  20. super().__init__()
  21. self._parts: list[str] = []
  22. self._skip_depth = 0
  23. def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
  24. if tag.lower() in {"script", "style", "noscript"}:
  25. self._skip_depth += 1
  26. if tag.lower() in {"p", "br", "div", "section", "article", "li", "tr", "h1", "h2", "h3"}:
  27. self._parts.append("\n")
  28. def handle_endtag(self, tag: str) -> None:
  29. if tag.lower() in {"script", "style", "noscript"} and self._skip_depth > 0:
  30. self._skip_depth -= 1
  31. if tag.lower() in {"p", "div", "section", "article", "li", "tr"}:
  32. self._parts.append("\n")
  33. def handle_data(self, data: str) -> None:
  34. if self._skip_depth == 0:
  35. self._parts.append(data)
  36. def text(self) -> str:
  37. return normalize_text(" ".join(self._parts))
  38. def parse_document_content(
  39. *,
  40. source_type: str,
  41. content_text: str | None = None,
  42. content_base64: str | None = None,
  43. source_uri: str | None = None) -> ParsedDocument:
  44. normalized_source_type = normalize_source_type(source_type=source_type, source_uri=source_uri)
  45. text = content_text if content_text is not None else _decode_content_base64(content_base64)
  46. if not text.strip():
  47. raise DocumentParseError("document content is empty")
  48. if normalized_source_type in {"text", "txt"}:
  49. parsed_text = normalize_text(text)
  50. elif normalized_source_type in {"markdown", "md"}:
  51. parsed_text = parse_markdown(text)
  52. elif normalized_source_type in {"html", "htm"}:
  53. parsed_text = parse_html(text)
  54. elif normalized_source_type == "json":
  55. parsed_text = parse_json(text)
  56. elif normalized_source_type == "csv":
  57. parsed_text = parse_csv(text)
  58. elif normalized_source_type == "pdf":
  59. parsed_text = parse_pdf(content_text=text, content_base64=content_base64)
  60. elif normalized_source_type in {"docx", "word"}:
  61. parsed_text = parse_docx(content_text=text, content_base64=content_base64)
  62. else:
  63. parsed_text = normalize_text(text)
  64. if not parsed_text:
  65. raise DocumentParseError("parsed document content is empty")
  66. return ParsedDocument(
  67. content_text=parsed_text,
  68. source_type=normalized_source_type,
  69. metadata_json={
  70. "parser": "knowledge-document-parser-v1",
  71. "original_source_type": source_type,
  72. "normalized_source_type": normalized_source_type,
  73. "content_length": len(parsed_text),
  74. })
  75. def read_document_content_bytes(
  76. *,
  77. content_text: str | None = None,
  78. content_base64: str | None = None) -> bytes:
  79. if content_base64 is not None:
  80. return _decode_content_bytes(content_base64)
  81. if content_text is None:
  82. raise DocumentParseError("content_text or content_base64 is required")
  83. return content_text.encode("utf-8")
  84. def normalize_source_type(*, source_type: str, source_uri: str | None = None) -> str:
  85. value = source_type.strip().lower() if source_type else ""
  86. if value and value != "auto":
  87. return value.removeprefix(".")
  88. if source_uri:
  89. suffix = Path(source_uri).suffix.lower().removeprefix(".")
  90. if suffix:
  91. return suffix
  92. return "text"
  93. def parse_markdown(content: str) -> str:
  94. text = re.sub(r"`([^`]+)`", r"\1", content)
  95. text = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", text)
  96. text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
  97. return normalize_text(text)
  98. def parse_html(content: str) -> str:
  99. parser = _HTMLTextExtractor()
  100. parser.feed(content)
  101. parser.close()
  102. return parser.text()
  103. def parse_json(content: str) -> str:
  104. try:
  105. payload = json.loads(content)
  106. except json.JSONDecodeError as exc:
  107. raise DocumentParseError(f"invalid json document: {exc}") from exc
  108. lines: list[str] = []
  109. _flatten_json(value=payload, path="", lines=lines)
  110. return normalize_text("\n".join(lines))
  111. def parse_csv(content: str) -> str:
  112. reader = csv.DictReader(io.StringIO(content))
  113. if reader.fieldnames:
  114. rows = []
  115. for index, row in enumerate(reader, start=1):
  116. values = [
  117. f"{field}: {row.get(field, '')}"
  118. for field in reader.fieldnames
  119. if field is not None
  120. ]
  121. rows.append(f"row {index}: " + "; ".join(values))
  122. return normalize_text("\n".join(rows))
  123. fallback_reader = csv.reader(io.StringIO(content))
  124. return normalize_text("\n".join(" | ".join(row) for row in fallback_reader))
  125. def parse_pdf(*, content_text: str, content_base64: str | None) -> str:
  126. try:
  127. import pypdf
  128. except Exception:
  129. return normalize_text(content_text)
  130. raw_bytes = _decode_content_bytes(content_base64)
  131. reader = pypdf.PdfReader(io.BytesIO(raw_bytes))
  132. return normalize_text("\n".join(page.extract_text() or "" for page in reader.pages))
  133. def parse_docx(*, content_text: str, content_base64: str | None) -> str:
  134. try:
  135. import docx
  136. except Exception:
  137. return normalize_text(content_text)
  138. raw_bytes = _decode_content_bytes(content_base64)
  139. document = docx.Document(io.BytesIO(raw_bytes))
  140. return normalize_text("\n".join(paragraph.text for paragraph in document.paragraphs))
  141. def normalize_text(content: str) -> str:
  142. lines = [re.sub(r"\s+", " ", line).strip() for line in content.splitlines()]
  143. return "\n".join(line for line in lines if line).strip()
  144. def _decode_content_base64(content_base64: str | None) -> str:
  145. raw_bytes = _decode_content_bytes(content_base64)
  146. return raw_bytes.decode("utf-8", errors="replace")
  147. def _decode_content_bytes(content_base64: str | None) -> bytes:
  148. if not content_base64:
  149. raise DocumentParseError("content_text or content_base64 is required")
  150. try:
  151. return base64.b64decode(content_base64, validate=True)
  152. except Exception as exc:
  153. raise DocumentParseError("invalid base64 document content") from exc
  154. def _flatten_json(*, value: JSONValue, path: str, lines: list[str]) -> None:
  155. if isinstance(value, dict):
  156. for key, item in value.items():
  157. next_path = f"{path}.{key}" if path else str(key)
  158. _flatten_json(value=item, path=next_path, lines=lines)
  159. return
  160. if isinstance(value, list):
  161. for index, item in enumerate(value):
  162. next_path = f"{path}[{index}]" if path else f"[{index}]"
  163. _flatten_json(value=item, path=next_path, lines=lines)
  164. return
  165. lines.append(f"{path}: {value}")