浏览代码

feat: connect runtime knowledge retrieval

Jax Docker 1 月之前
父节点
当前提交
2fb67d5b77

+ 17 - 0
README.md

@@ -729,6 +729,23 @@ Retriever node config example:
 }
 ```
 
+Retriever nodes can call `knowledge-service` directly:
+
+```json
+{
+  "id": "retrieve-kb",
+  "type": "knowledge-retrieval",
+  "config": {
+    "knowledge_base_id": "kb-id",
+    "query_template": "{{state.query}}",
+    "top_k": 3,
+    "filters_json": {
+      "source_type": "text"
+    }
+  }
+}
+```
+
 Retriever output is persisted to `node_run.output_json.retrieved_documents`. Template nodes can consume it:
 
 ```json

+ 3 - 0
deployments/docker/docker-compose.yml

@@ -327,6 +327,7 @@ services:
       AGENT_PLATFORM_TOOL_SERVICE_URL: http://tool-service:8004
       AGENT_PLATFORM_MODEL_GATEWAY_SERVICE_URL: http://model-gateway-service:8005
       AGENT_PLATFORM_CODE_RUNNER_SERVICE_URL: http://code-runner-service:8006
+      AGENT_PLATFORM_KNOWLEDGE_SERVICE_URL: http://knowledge-service:8012
       AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS: ${AGENT_PLATFORM_WORKER_POLL_INTERVAL_SECONDS:-1}
       AGENT_PLATFORM_WORKER_LEASE_SECONDS: ${AGENT_PLATFORM_WORKER_LEASE_SECONDS:-300}
     volumes:
@@ -340,6 +341,8 @@ services:
         condition: service_started
       code-runner-service:
         condition: service_started
+      knowledge-service:
+        condition: service_started
 
   api-gateway:
     build:

+ 5 - 0
services/runtime-service/app/application/services.py

@@ -24,6 +24,7 @@ from app.infrastructure.executors import (
     NodeExecutionDispatcher,
     build_node_execution_dispatcher_with_clients,
 )
+from app.infrastructure.knowledge_client import KnowledgeServiceClient
 from app.infrastructure.code_runner_client import CodeRunnerClient
 from app.infrastructure.model_gateway_client import ModelGatewayClient
 from app.infrastructure.planner import (
@@ -1017,6 +1018,10 @@ def build_runtime_application_service(
             code_runner_client=CodeRunnerClient(base_url=settings.code_runner_service_url),
             model_gateway_client=ModelGatewayClient(base_url=settings.model_gateway_service_url),
             tool_client=ToolServiceClient(base_url=settings.tool_service_url),
+            knowledge_client=KnowledgeServiceClient(
+                base_url=settings.knowledge_service_url,
+                timeout_seconds=settings.knowledge_service_timeout_seconds,
+            ),
         ),
         workflow_client=WorkflowServiceClient(base_url=settings.workflow_service_url),
     )

+ 2 - 0
services/runtime-service/app/bootstrap/settings.py

@@ -9,6 +9,8 @@ class RuntimeServiceSettings(ServiceSettings):
     tool_service_url: str = "http://127.0.0.1:8004"
     model_gateway_service_url: str = "http://127.0.0.1:8005"
     code_runner_service_url: str = "http://127.0.0.1:8006"
+    knowledge_service_url: str = "http://127.0.0.1:8012"
+    knowledge_service_timeout_seconds: float = 10.0
     worker_poll_interval_seconds: float = 1.0
     worker_lease_seconds: int = 300
     worker_max_idle_cycles: int | None = None

+ 104 - 10
services/runtime-service/app/infrastructure/executors.py

@@ -7,6 +7,7 @@ from core_domain import (
     ChatCompletionRequestContract,
     ChatMessageContract,
     CodeExecutionRequestContract,
+    KnowledgeSearchRequestContract,
     NodeExecutionContextContract,
     NodeExecutionRequestContract,
     NodeExecutionResultContract,
@@ -23,6 +24,7 @@ from .context import (
     render_template_string,
     resolve_expression,
 )
+from .knowledge_client import KnowledgeServiceClient, KnowledgeServiceClientError
 from .model_gateway_client import ModelGatewayClient, ModelGatewayClientError
 from .tool_client import ToolServiceClient, ToolServiceClientError
 
@@ -215,7 +217,10 @@ class ToolNodeExecutor(CompletedNodeExecutor):
         request_headers = _merge_json_dicts(
             _render_json_dict(_read_dict_value(invoke_config_json, "headers"), render_context),
             _render_json_dict(_read_dict_value(binding_config_json, "headers"), render_context),
-            _render_json_dict(_read_dict_value(context.node_config_json, "headers"), render_context),
+            _render_json_dict(
+                _read_dict_value(context.node_config_json, "headers"),
+                render_context,
+            ),
         )
         request_query = _merge_json_dicts(
             _render_json_dict(_read_dict_value(invoke_config_json, "query"), render_context),
@@ -427,7 +432,11 @@ class ConditionNodeExecutor(CompletedNodeExecutor):
             condition_result = evaluate_condition_expression(expression, render_context)
             evaluated_expression = expression
         elif path is not None:
-            condition_result = _evaluate_path_condition(context.node_config_json, path, render_context)
+            condition_result = _evaluate_path_condition(
+                context.node_config_json,
+                path,
+                render_context,
+            )
             evaluated_expression = path
         else:
             return NodeExecutionResultContract(
@@ -492,6 +501,13 @@ class AssignerNodeExecutor(CompletedNodeExecutor):
 
 
 class RetrieverNodeExecutor(CompletedNodeExecutor):
+    def __init__(self, knowledge_client: KnowledgeServiceClient | None = None) -> None:
+        super().__init__(
+            executor_name="retriever-executor",
+            supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
+        )
+        self.knowledge_client = knowledge_client
+
     def execute(
         self,
         context: NodeExecutionContextContract,
@@ -502,6 +518,7 @@ class RetrieverNodeExecutor(CompletedNodeExecutor):
         query = _resolve_retriever_query(context.node_config_json, render_context)
         documents = _read_retriever_documents(context.node_config_json, render_context)
         source_url = _read_string_value(context.node_config_json, "source_url")
+        knowledge_base_id = _read_string_value(context.node_config_json, "knowledge_base_id")
         top_k = _read_int_value(context.node_config_json, "top_k") or 3
 
         if query is None:
@@ -535,12 +552,52 @@ class RetrieverNodeExecutor(CompletedNodeExecutor):
                     error_message=str(exc),
                 )
 
+        knowledge_results: list[dict[str, JSONValue]] = []
+        if knowledge_base_id is not None:
+            if self.knowledge_client is None:
+                return NodeExecutionResultContract(
+                    status="failed",
+                    worker_key=worker_key,
+                    error_code="knowledge_client_missing",
+                    error_message="knowledge-service client is not configured",
+                )
+            try:
+                knowledge_results = [
+                    item.model_dump(mode="json")
+                    for item in self.knowledge_client.search(
+                        KnowledgeSearchRequestContract(
+                            tenant_id=context.tenant_id,
+                            knowledge_base_id=render_template_string(
+                                knowledge_base_id,
+                                render_context,
+                            ),
+                            query=query,
+                            top_k=top_k,
+                            filters_json=_render_json_dict(
+                                _read_dict_value(context.node_config_json, "filters_json"),
+                                render_context,
+                            ),
+                        )
+                    )
+                ]
+            except KnowledgeServiceClientError as exc:
+                return NodeExecutionResultContract(
+                    status="failed",
+                    worker_key=worker_key,
+                    error_code="knowledge_search_failed",
+                    error_message=str(exc),
+                )
+            documents.extend(_knowledge_results_to_retriever_documents(knowledge_results))
+
         if not documents:
             return NodeExecutionResultContract(
                 status="failed",
                 worker_key=worker_key,
                 error_code="retriever_documents_missing",
-                error_message="retriever node config requires non-empty documents",
+                error_message=(
+                    "retriever node config requires documents, source_url, "
+                    "or knowledge_base_id"
+                ),
             )
 
         ranked_documents = rank_documents(query=query, documents=documents, top_k=top_k)
@@ -555,15 +612,11 @@ class RetrieverNodeExecutor(CompletedNodeExecutor):
                 "query": query,
                 "top_k": top_k,
                 "retrieved_documents": output_documents,
+                "knowledge_base_id": knowledge_base_id,
+                "knowledge_results": knowledge_results,
             },
         )
 
-    def __init__(self) -> None:
-        super().__init__(
-            executor_name="retriever-executor",
-            supported_node_types=frozenset({"knowledge-retrieval", "retriever"}),
-        )
-
 
 class TemplateNodeExecutor(CompletedNodeExecutor):
     def execute(
@@ -656,6 +709,7 @@ def build_node_execution_dispatcher_with_clients(
     code_runner_client: CodeRunnerClient | None = None,
     model_gateway_client: ModelGatewayClient | None = None,
     tool_client: ToolServiceClient | None = None,
+    knowledge_client: KnowledgeServiceClient | None = None,
 ) -> NodeExecutionDispatcher:
     executors: list[NodeExecutor] = [
         LLMNodeExecutor(model_gateway_client=model_gateway_client),
@@ -664,7 +718,7 @@ def build_node_execution_dispatcher_with_clients(
         AnswerNodeExecutor(),
         ConditionNodeExecutor(),
         AssignerNodeExecutor(),
-        RetrieverNodeExecutor(),
+        RetrieverNodeExecutor(knowledge_client=knowledge_client),
         TemplateNodeExecutor(),
     ]
     return NodeExecutionDispatcher(
@@ -1001,6 +1055,46 @@ def _parse_retriever_document(
     )
 
 
+def _knowledge_results_to_retriever_documents(
+    results: list[dict[str, JSONValue]],
+) -> list[RetrieverDocument]:
+    documents: list[RetrieverDocument] = []
+    for index, result in enumerate(results):
+        chunk = result.get("chunk")
+        document = result.get("document")
+        score = result.get("score")
+        score_json = result.get("score_json")
+        if not isinstance(chunk, dict) or not isinstance(document, dict):
+            continue
+        content_text = chunk.get("content_text")
+        if not isinstance(content_text, str) or not content_text.strip():
+            continue
+        document_id = document.get("id")
+        title = document.get("title")
+        metadata: dict[str, JSONValue] = {
+            "source": "knowledge-service",
+            "chunk_id": str(chunk.get("id")) if chunk.get("id") is not None else None,
+            "chunk_index": (
+                chunk.get("chunk_index")
+                if isinstance(chunk.get("chunk_index"), int)
+                else index
+            ),
+            "score": score if isinstance(score, (int, float)) else None,
+            "score_json": score_json if isinstance(score_json, dict) else {},
+        }
+        documents.append(
+            RetrieverDocument(
+                document_id=str(document_id)
+                if document_id is not None
+                else f"knowledge-{index + 1}",
+                title=title if isinstance(title, str) else None,
+                text=content_text.strip(),
+                metadata=metadata,
+            )
+        )
+    return documents
+
+
 def rank_documents(
     *,
     query: str,

+ 33 - 0
services/runtime-service/app/infrastructure/knowledge_client.py

@@ -0,0 +1,33 @@
+import httpx
+
+from core_domain import KnowledgeSearchRequestContract, KnowledgeSearchResultContract
+
+
+class KnowledgeServiceClientError(Exception):
+    pass
+
+
+class KnowledgeServiceClient:
+    def __init__(self, base_url: str, timeout_seconds: float = 10.0) -> None:
+        self.base_url = base_url.rstrip("/")
+        self.timeout_seconds = timeout_seconds
+
+    def search(
+        self,
+        payload: KnowledgeSearchRequestContract,
+    ) -> list[KnowledgeSearchResultContract]:
+        try:
+            with httpx.Client(timeout=self.timeout_seconds) as client:
+                response = client.post(
+                    f"{self.base_url}/knowledge/search",
+                    json=payload.model_dump(mode="json"),
+                )
+                response.raise_for_status()
+                return [
+                    KnowledgeSearchResultContract.model_validate(item)
+                    for item in response.json()
+                ]
+        except httpx.HTTPError as exc:
+            raise KnowledgeServiceClientError(
+                f"knowledge-service search failed: {exc}"
+            ) from exc