瀏覽代碼

feat: enhance knowledge document parsing

Jax Docker 1 月之前
父節點
當前提交
231d8b15b2

+ 19 - 0
services/knowledge-service/app/api/routes.py

@@ -4,6 +4,7 @@ from sqlalchemy.orm import Session
 
 from core_domain import ServiceHealth
 
+from app.application.document_parsers import DocumentParseError
 from app.application.services import KnowledgeApplicationService
 from app.bootstrap.settings import KnowledgeServiceSettings
 from app.db.session import get_db
@@ -19,6 +20,8 @@ from app.schemas.knowledge import (
     KnowledgeChunkResponse,
     KnowledgeDocumentCreateRequest,
     KnowledgeDocumentIngestResponse,
+    KnowledgeDocumentParseRequest,
+    KnowledgeDocumentParseResponse,
     KnowledgeDocumentResponse,
     KnowledgeSearchRequest,
     KnowledgeSearchResultResponse,
@@ -101,6 +104,22 @@ def create_document(
     )
 
 
+@router.post("/documents/parse", response_model=KnowledgeDocumentParseResponse)
+def parse_document(
+    payload: KnowledgeDocumentParseRequest,
+    service: KnowledgeApplicationService = Depends(get_knowledge_application_service),
+) -> KnowledgeDocumentParseResponse:
+    try:
+        parsed = service.parse_document(payload)
+    except DocumentParseError as exc:
+        raise HTTPException(status_code=422, detail=str(exc)) from exc
+    return KnowledgeDocumentParseResponse(
+        content_text=parsed.content_text,
+        source_type=parsed.source_type,
+        metadata_json=parsed.metadata_json,
+    )
+
+
 @router.get("/documents", response_model=list[KnowledgeDocumentResponse])
 def list_documents(
     tenant_id: str = Query(...),

+ 200 - 0
services/knowledge-service/app/application/document_parsers.py

@@ -0,0 +1,200 @@
+from __future__ import annotations
+
+import base64
+import csv
+import io
+import json
+import re
+from dataclasses import dataclass, field
+from html.parser import HTMLParser
+from pathlib import Path
+
+from core_shared import JSONValue
+
+
+@dataclass(frozen=True, slots=True)
+class ParsedDocument:
+    content_text: str
+    source_type: str
+    metadata_json: dict[str, JSONValue] = field(default_factory=dict)
+
+
+class DocumentParseError(ValueError):
+    pass
+
+
+class _HTMLTextExtractor(HTMLParser):
+    def __init__(self) -> None:
+        super().__init__()
+        self._parts: list[str] = []
+        self._skip_depth = 0
+
+    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
+        if tag.lower() in {"script", "style", "noscript"}:
+            self._skip_depth += 1
+        if tag.lower() in {"p", "br", "div", "section", "article", "li", "tr", "h1", "h2", "h3"}:
+            self._parts.append("\n")
+
+    def handle_endtag(self, tag: str) -> None:
+        if tag.lower() in {"script", "style", "noscript"} and self._skip_depth > 0:
+            self._skip_depth -= 1
+        if tag.lower() in {"p", "div", "section", "article", "li", "tr"}:
+            self._parts.append("\n")
+
+    def handle_data(self, data: str) -> None:
+        if self._skip_depth == 0:
+            self._parts.append(data)
+
+    def text(self) -> str:
+        return normalize_text(" ".join(self._parts))
+
+
+def parse_document_content(
+    *,
+    source_type: str,
+    content_text: str | None = None,
+    content_base64: str | None = None,
+    source_uri: str | None = None,
+) -> ParsedDocument:
+    normalized_source_type = normalize_source_type(source_type=source_type, source_uri=source_uri)
+    text = content_text if content_text is not None else _decode_content_base64(content_base64)
+    if not text.strip():
+        raise DocumentParseError("document content is empty")
+
+    if normalized_source_type in {"text", "txt"}:
+        parsed_text = normalize_text(text)
+    elif normalized_source_type in {"markdown", "md"}:
+        parsed_text = parse_markdown(text)
+    elif normalized_source_type in {"html", "htm"}:
+        parsed_text = parse_html(text)
+    elif normalized_source_type == "json":
+        parsed_text = parse_json(text)
+    elif normalized_source_type == "csv":
+        parsed_text = parse_csv(text)
+    elif normalized_source_type == "pdf":
+        parsed_text = parse_pdf(content_text=text, content_base64=content_base64)
+    elif normalized_source_type in {"docx", "word"}:
+        parsed_text = parse_docx(content_text=text, content_base64=content_base64)
+    else:
+        parsed_text = normalize_text(text)
+
+    if not parsed_text:
+        raise DocumentParseError("parsed document content is empty")
+    return ParsedDocument(
+        content_text=parsed_text,
+        source_type=normalized_source_type,
+        metadata_json={
+            "parser": "knowledge-document-parser-v1",
+            "original_source_type": source_type,
+            "normalized_source_type": normalized_source_type,
+            "content_length": len(parsed_text),
+        },
+    )
+
+
+def normalize_source_type(*, source_type: str, source_uri: str | None = None) -> str:
+    value = source_type.strip().lower() if source_type else ""
+    if value and value != "auto":
+        return value.removeprefix(".")
+    if source_uri:
+        suffix = Path(source_uri).suffix.lower().removeprefix(".")
+        if suffix:
+            return suffix
+    return "text"
+
+
+def parse_markdown(content: str) -> str:
+    text = re.sub(r"```[\s\S]*?```", " ", content)
+    text = re.sub(r"`([^`]+)`", r"\1", text)
+    text = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", text)
+    text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
+    text = re.sub(r"^\s{0,3}#{1,6}\s*", "", text, flags=re.MULTILINE)
+    text = re.sub(r"^\s{0,3}>\s?", "", text, flags=re.MULTILINE)
+    text = re.sub(r"^\s*[-*+]\s+", "", text, flags=re.MULTILINE)
+    return normalize_text(text)
+
+
+def parse_html(content: str) -> str:
+    parser = _HTMLTextExtractor()
+    parser.feed(content)
+    parser.close()
+    return parser.text()
+
+
+def parse_json(content: str) -> str:
+    try:
+        payload = json.loads(content)
+    except json.JSONDecodeError as exc:
+        raise DocumentParseError(f"invalid json document: {exc}") from exc
+    lines: list[str] = []
+    _flatten_json(value=payload, path="", lines=lines)
+    return normalize_text("\n".join(lines))
+
+
+def parse_csv(content: str) -> str:
+    reader = csv.DictReader(io.StringIO(content))
+    if reader.fieldnames:
+        rows = []
+        for index, row in enumerate(reader, start=1):
+            values = [
+                f"{field}: {row.get(field, '')}"
+                for field in reader.fieldnames
+                if field is not None
+            ]
+            rows.append(f"row {index}: " + "; ".join(values))
+        return normalize_text("\n".join(rows))
+    fallback_reader = csv.reader(io.StringIO(content))
+    return normalize_text("\n".join(" | ".join(row) for row in fallback_reader))
+
+
+def parse_pdf(*, content_text: str, content_base64: str | None) -> str:
+    try:
+        import pypdf
+    except Exception:
+        return normalize_text(content_text)
+    raw_bytes = _decode_content_bytes(content_base64)
+    reader = pypdf.PdfReader(io.BytesIO(raw_bytes))
+    return normalize_text("\n".join(page.extract_text() or "" for page in reader.pages))
+
+
+def parse_docx(*, content_text: str, content_base64: str | None) -> str:
+    try:
+        import docx
+    except Exception:
+        return normalize_text(content_text)
+    raw_bytes = _decode_content_bytes(content_base64)
+    document = docx.Document(io.BytesIO(raw_bytes))
+    return normalize_text("\n".join(paragraph.text for paragraph in document.paragraphs))
+
+
+def normalize_text(content: str) -> str:
+    lines = [re.sub(r"\s+", " ", line).strip() for line in content.splitlines()]
+    return "\n".join(line for line in lines if line).strip()
+
+
+def _decode_content_base64(content_base64: str | None) -> str:
+    raw_bytes = _decode_content_bytes(content_base64)
+    return raw_bytes.decode("utf-8", errors="replace")
+
+
+def _decode_content_bytes(content_base64: str | None) -> bytes:
+    if not content_base64:
+        raise DocumentParseError("content_text or content_base64 is required")
+    try:
+        return base64.b64decode(content_base64, validate=True)
+    except Exception as exc:
+        raise DocumentParseError("invalid base64 document content") from exc
+
+
+def _flatten_json(*, value: JSONValue, path: str, lines: list[str]) -> None:
+    if isinstance(value, dict):
+        for key, item in value.items():
+            next_path = f"{path}.{key}" if path else str(key)
+            _flatten_json(value=item, path=next_path, lines=lines)
+        return
+    if isinstance(value, list):
+        for index, item in enumerate(value):
+            next_path = f"{path}[{index}]" if path else f"[{index}]"
+            _flatten_json(value=item, path=next_path, lines=lines)
+        return
+    lines.append(f"{path}: {value}")

+ 45 - 9
services/knowledge-service/app/application/services.py

@@ -1,5 +1,10 @@
 from core_shared import JSONValue
 
+from app.application.document_parsers import (
+    DocumentParseError,
+    ParsedDocument,
+    parse_document_content,
+)
 from app.application.embeddings import EmbeddingService
 from app.application.retrieval import (
     build_chunk_payloads,
@@ -18,6 +23,7 @@ from app.schemas.knowledge import (
     KnowledgeBaseCreateRequest,
     KnowledgeBaseStatusUpdateRequest,
     KnowledgeDocumentCreateRequest,
+    KnowledgeDocumentParseRequest,
     KnowledgeSearchRequest,
 )
 
@@ -72,23 +78,51 @@ class KnowledgeApplicationService:
         if knowledge_base is None:
             raise ValueError(f"knowledge base not found: {payload.knowledge_base_id}")
 
+        parsed = self.parse_document(
+            KnowledgeDocumentParseRequest(
+                source_type=payload.source_type,
+                source_uri=payload.source_uri,
+                content_text=payload.content_text,
+                content_base64=payload.content_base64,
+            )
+        )
+        metadata_json = {
+            **payload.metadata_json,
+            "parser_metadata": parsed.metadata_json,
+        }
         document = self.document_repository.create(
             tenant_id=payload.tenant_id,
             knowledge_base_id=payload.knowledge_base_id,
             title=payload.title,
-            source_type=payload.source_type,
+            source_type=parsed.source_type,
             source_uri=payload.source_uri,
-            content_text=payload.content_text,
-            content_hash=stable_content_hash(payload.content_text),
-            metadata_json=payload.metadata_json,
+            content_text=parsed.content_text,
+            content_hash=stable_content_hash(parsed.content_text),
+            metadata_json=metadata_json,
+        )
+        chunks = self._index_document(
+            document=document,
+            content_text=parsed.content_text,
+            chunk_size=payload.chunk_size,
+            chunk_overlap=payload.chunk_overlap,
         )
-        chunks = self._index_document(document=document, payload=payload)
         indexed_document = self.document_repository.update_status(
             document_id=document.id,
             status="indexed",
         )
         return indexed_document or document, chunks
 
+    def parse_document(self, payload: KnowledgeDocumentParseRequest) -> ParsedDocument:
+        try:
+            return parse_document_content(
+                source_type=payload.source_type,
+                content_text=payload.content_text,
+                content_base64=payload.content_base64,
+                source_uri=payload.source_uri,
+            )
+        except DocumentParseError:
+            raise
+
     def list_documents(
         self,
         *,
@@ -164,12 +198,14 @@ class KnowledgeApplicationService:
         self,
         *,
         document: KnowledgeDocument,
-        payload: KnowledgeDocumentCreateRequest,
+        content_text: str,
+        chunk_size: int | None,
+        chunk_overlap: int | None,
     ) -> list[KnowledgeChunk]:
         chunk_payloads = build_chunk_payloads(
-            content_text=payload.content_text,
-            chunk_size=payload.chunk_size or self.settings.default_chunk_size,
-            chunk_overlap=payload.chunk_overlap or self.settings.default_chunk_overlap,
+            content_text=content_text,
+            chunk_size=chunk_size or self.settings.default_chunk_size,
+            chunk_overlap=chunk_overlap or self.settings.default_chunk_overlap,
         )
         for chunk_payload in chunk_payloads:
             content_text = self._read_chunk_content(chunk_payload)

+ 15 - 1
services/knowledge-service/app/schemas/knowledge.py

@@ -39,7 +39,8 @@ class KnowledgeDocumentCreateRequest(BaseModel):
     tenant_id: str
     knowledge_base_id: str
     title: str
-    content_text: str
+    content_text: str | None = None
+    content_base64: str | None = None
     source_type: str = "text"
     source_uri: str | None = None
     metadata_json: dict[str, JSONValue] = Field(default_factory=dict)
@@ -64,6 +65,19 @@ class KnowledgeDocumentIngestResponse(BaseModel):
     chunks: list[KnowledgeChunkResponse]
 
 
+class KnowledgeDocumentParseRequest(BaseModel):
+    source_type: str = "auto"
+    source_uri: str | None = None
+    content_text: str | None = None
+    content_base64: str | None = None
+
+
+class KnowledgeDocumentParseResponse(BaseModel):
+    content_text: str
+    source_type: str
+    metadata_json: dict[str, JSONValue] = Field(default_factory=dict)
+
+
 class KnowledgeSearchRequest(KnowledgeSearchRequestContract):
     pass
 

+ 43 - 0
tests/test_knowledge_document_parsers.py

@@ -0,0 +1,43 @@
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+for module_name in list(sys.modules):
+    if module_name == "app" or module_name.startswith("app."):
+        del sys.modules[module_name]
+for path in [
+    REPO_ROOT / "libs" / "core-shared" / "src",
+    REPO_ROOT / "services" / "knowledge-service",
+]:
+    sys.path.insert(0, str(path))
+
+from app.application.document_parsers import parse_document_content
+
+
+def test_parse_markdown_html_json_csv_documents() -> None:
+    markdown = parse_document_content(
+        source_type="markdown",
+        content_text="# Title\n\nUse [docs](https://example.com) and `code`.",
+    )
+    html = parse_document_content(
+        source_type="html",
+        content_text="<h1>Title</h1><script>hidden()</script><p>Hello <b>world</b></p>",
+    )
+    json_doc = parse_document_content(
+        source_type="json",
+        content_text='{"order":{"id":"A1","status":"paid"}}',
+    )
+    csv_doc = parse_document_content(
+        source_type="csv",
+        content_text="id,status\nA1,paid\nA2,refunded\n",
+    )
+
+    assert "Title" in markdown.content_text
+    assert "docs" in markdown.content_text
+    assert "hidden" not in html.content_text
+    assert "Hello world" in html.content_text
+    assert "order.id: A1" in json_doc.content_text
+    assert "row 2: id: A2; status: refunded" in csv_doc.content_text

+ 45 - 0
tests/test_knowledge_pgvector_fallback.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import sys
+import base64
 from pathlib import Path
 
 
@@ -75,3 +76,47 @@ def test_knowledge_search_falls_back_without_pgvector(tmp_path: Path) -> None:
         assert results
         assert results[0][3]["retrieval_mode"] == "hybrid"
     session_factory.kw["bind"].dispose()
+
+
+def test_create_document_parses_base64_markdown_before_indexing(tmp_path: Path) -> None:
+    settings = KnowledgeServiceSettings(
+        database_url=f"sqlite:///{tmp_path / 'knowledge_service.db'}",
+        embedding_provider="local",
+    )
+    session_factory = build_session_factory(settings)
+    Base.metadata.create_all(bind=session_factory.kw["bind"])
+
+    with session_factory() as db:
+        service = KnowledgeApplicationService(
+            settings=settings,
+            base_repository=KnowledgeBaseRepository(db),
+            document_repository=KnowledgeDocumentRepository(db),
+            chunk_repository=KnowledgeChunkRepository(db),
+        )
+        base = service.create_base(
+            KnowledgeBaseCreateRequest(tenant_id="t1", code="kb", name="KB")
+        )
+        encoded = base64.b64encode(
+            "# Refund Policy\nRefunds are available within seven days.".encode("utf-8")
+        ).decode("ascii")
+
+        document, chunks = service.create_document(
+            KnowledgeDocumentCreateRequest(
+                tenant_id="t1",
+                knowledge_base_id=base.id,
+                title="Refund Policy",
+                source_type="markdown",
+                content_base64=encoded,
+                chunk_size=80,
+                chunk_overlap=0,
+            )
+        )
+
+        assert document.source_type == "markdown"
+        assert document.content_text.startswith("Refund Policy")
+        assert document.metadata_json is not None
+        assert document.metadata_json["parser_metadata"]["parser"] == "knowledge-document-parser-v1"
+        assert chunks
+        assert chunks[0].content_text.startswith("Refund Policy")
+    session_factory.kw["bind"].dispose()
+    Base.metadata.clear()