Bladeren bron

feat: add runtime idempotency and pgvector search

Jax Docker 1 maand geleden
bovenliggende
commit
f592e4a4a5

+ 6 - 1
README.md

@@ -495,6 +495,11 @@ scoring, so it works without external API keys. For production, set
 `/embeddings` endpoint; if the provider fails and fallback is enabled, indexing
 and search fall back to local hash embeddings.
 
+When running on PostgreSQL with pgvector, `knowledge_chunk.embedding_vector`
+is populated and search first selects candidate chunks by pgvector cosine
+similarity (up to `top_k * 5`), then re-ranks them together with keyword
+scoring. SQLite and other databases, and PostgreSQL searches that yield no
+vector candidates, automatically fall back to the JSON embedding hybrid
+search path.
+
 Create a knowledge base:
 
 ```powershell
@@ -1276,7 +1281,7 @@ Important notes:
 - Services still fall back to SQLite files under `/data` if `AGENT_PLATFORM_DATABASE_URL` is not set.
 - For scaled workers, use PostgreSQL plus Redis rather than SQLite.
 - `core-shared.redis_primitives` provides `DistributedLock`, `IdempotencyStore`, and `RedisQueue` for services that need cross-process coordination.
-- `agent-worker` and `scheduler-worker` use Redis locks/idempotency when Redis is available, and fall back to DB leases when Redis is not available.
+- `agent-worker`, `runtime-worker`, and `scheduler-worker` use Redis locks/idempotency when Redis is available, and fall back to DB leases when Redis is not available.
 - `agent-service` stores agent definitions, prompt/config versions, and agent run records under `/data`
 - `memory-service` stores scoped memories under `/data`; move it to PostgreSQL before enabling high-volume memory writes
 - `team-service` stores multi-agent team definitions, team versions, and team run records under `/data`

+ 40 - 0
services/knowledge-service/alembic/versions/20260427_0002_add_pgvector_embeddings.py

@@ -0,0 +1,40 @@
+"""add pgvector embeddings
+
+Revision ID: 20260427_0002
+Revises: 20260425_0001
+Create Date: 2026-04-27 10:30:00
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# Revision identifiers, used by Alembic to place this migration in the chain.
+revision: str = "20260427_0002"
+down_revision: str | None = "20260425_0001"
+branch_labels: Sequence[str] | None = None
+depends_on: Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    """Add the ``embedding_vector`` column to ``knowledge_chunk``.
+
+    On PostgreSQL the pgvector extension is enabled and the column is created
+    with the dimensionless ``vector`` type. On every other dialect the column
+    is added as nullable text.
+    """
+    bind = op.get_bind()
+    if bind.dialect.name == "postgresql":
+        op.execute("CREATE EXTENSION IF NOT EXISTS vector")
+        op.execute("ALTER TABLE knowledge_chunk ADD COLUMN embedding_vector vector")
+        # No HNSW index is created here: pgvector refuses to build an index on
+        # a ``vector`` column that has no declared dimensions ("column does
+        # not have dimensions"), and that error would abort the whole
+        # migration. Similarity queries still work through an exact scan.
+        # To enable approximate search, add a follow-up migration with an
+        # expression index that casts to a fixed size, for example
+        # ``USING hnsw ((embedding_vector::vector(N)) vector_cosine_ops)``,
+        # and query through the same cast.
+    else:
+        op.add_column(
+            "knowledge_chunk",
+            sa.Column("embedding_vector", sa.Text(), nullable=True),
+        )
+
+
+def downgrade() -> None:
+    """Remove ``embedding_vector`` from ``knowledge_chunk``.
+
+    On PostgreSQL the HNSW index is dropped first (if it exists) before the
+    column itself is removed.
+    """
+    dialect_name = op.get_bind().dialect.name
+    if dialect_name == "postgresql":
+        op.execute("DROP INDEX IF EXISTS ix_knowledge_chunk_embedding_vector_hnsw")
+    op.drop_column("knowledge_chunk", "embedding_vector")

+ 22 - 5
services/knowledge-service/app/application/services.py

@@ -104,12 +104,27 @@ class KnowledgeApplicationService:
         self,
         payload: KnowledgeSearchRequest,
     ) -> list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]]:
-        chunks = self.chunk_repository.list_by_base(
+        document_cache: dict[str, KnowledgeDocument] = {}
+        query_embedding_result = self.embedding_service.embed_text(payload.query)
+        vector_candidates = self.chunk_repository.search_by_vector(
             tenant_id=payload.tenant_id,
             knowledge_base_id=payload.knowledge_base_id,
+            embedding=query_embedding_result.embedding,
+            limit=max(payload.top_k * 5, payload.top_k),
         )
-        document_cache: dict[str, KnowledgeDocument] = {}
-        query_embedding_result = self.embedding_service.embed_text(payload.query)
+        if vector_candidates:
+            chunks = [chunk for chunk, _ in vector_candidates]
+            vector_scores_by_chunk_id = {
+                chunk.id: score for chunk, score in vector_candidates
+            }
+            retrieval_mode = "pgvector-hybrid"
+        else:
+            chunks = self.chunk_repository.list_by_base(
+                tenant_id=payload.tenant_id,
+                knowledge_base_id=payload.knowledge_base_id,
+            )
+            vector_scores_by_chunk_id = {}
+            retrieval_mode = "hybrid"
         scored: list[tuple[KnowledgeChunk, KnowledgeDocument, float, dict[str, JSONValue]]] = []
         for chunk in chunks:
             document = document_cache.get(chunk.document_id)
@@ -124,7 +139,9 @@ class KnowledgeApplicationService:
             if not self._matches_filters(document=document, filters_json=payload.filters_json):
                 continue
             keyword = keyword_score(payload.query, chunk.content_text)
-            vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
+            vector = vector_scores_by_chunk_id.get(chunk.id)
+            if vector is None:
+                vector = cosine_similarity(query_embedding_result.embedding, chunk.embedding_json)
             score = round(keyword * 0.7 + vector * 0.3, 6)
             scored.append(
                 (
@@ -134,7 +151,7 @@ class KnowledgeApplicationService:
                     {
                         "keyword_score": round(keyword, 6),
                         "vector_score": round(vector, 6),
-                        "retrieval_mode": "hybrid",
+                        "retrieval_mode": retrieval_mode,
                         "embedding_provider": query_embedding_result.provider,
                         "embedding_model": query_embedding_result.model,
                     },

+ 1 - 0
services/knowledge-service/app/db/models/knowledge_chunk.py

@@ -16,4 +16,5 @@ class KnowledgeChunk(TenantMixin, AuditMixin, VersionMixin, Base):
     token_count: Mapped[int] = mapped_column(Integer, default=0)
     embedding_model: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True)
     embedding_json: Mapped[list[float] | None] = mapped_column(JSON, nullable=True)
+    embedding_vector: Mapped[str | None] = mapped_column(Text, nullable=True)
     metadata_json: Mapped[dict[str, JSONValue] | None] = mapped_column(JSON, nullable=True)

+ 60 - 1
services/knowledge-service/app/domain/repositories.py

@@ -1,6 +1,6 @@
 from datetime import datetime
 
-from sqlalchemy import delete, select
+from sqlalchemy import delete, select, text
 from sqlalchemy.orm import Session
 
 from core_domain import KnowledgeBaseStatus, KnowledgeDocumentStatus
@@ -164,6 +164,7 @@ class KnowledgeChunkRepository:
                 token_count=_read_int(chunk, "token_count"),
                 embedding_model=_read_optional_string(chunk, "embedding_model"),
                 embedding_json=_read_float_list(chunk, "embedding_json"),
+                embedding_vector=_format_vector(_read_float_list(chunk, "embedding_json")),
                 metadata_json=_read_optional_dict(chunk, "metadata_json"),
             )
             self.db.add(entity)
@@ -187,6 +188,58 @@ class KnowledgeChunkRepository:
         )
         return list(self.db.scalars(stmt))
 
+    def search_by_vector(
+        self,
+        *,
+        tenant_id: str,
+        knowledge_base_id: str,
+        embedding: list[float],
+        limit: int,
+    ) -> list[tuple[KnowledgeChunk, float]]:
+        """Rank a knowledge base's chunks by pgvector cosine similarity.
+
+        Args:
+            tenant_id: Tenant whose chunks are searched.
+            knowledge_base_id: Knowledge base whose chunks are searched.
+            embedding: Query embedding compared against ``embedding_vector``.
+            limit: Maximum number of chunks to return.
+
+        Returns:
+            ``(chunk, score)`` pairs ordered from most to least similar, where
+            ``score`` is ``1 - cosine distance``. The list is empty when the
+            session is not bound to PostgreSQL, when ``limit`` is not
+            positive, when ``embedding`` cannot be formatted as a vector, or
+            when no stored vector qualifies.
+        """
+        import math
+
+        # A negative LIMIT is an error in PostgreSQL and zero can never match.
+        if limit <= 0 or not self._supports_pgvector_search():
+            return []
+        vector = _format_vector(embedding)
+        if vector is None:
+            return []
+        # The column has no fixed dimensions, so rows embedded with another
+        # model may differ in length; comparing them with ``<=>`` raises
+        # "different vector dimensions". Restrict the scan to matching rows.
+        stmt = text(
+            """
+            SELECT id, 1 - (embedding_vector <=> CAST(:embedding AS vector)) AS score
+            FROM knowledge_chunk
+            WHERE tenant_id = :tenant_id
+              AND knowledge_base_id = :knowledge_base_id
+              AND embedding_vector IS NOT NULL
+              AND vector_dims(embedding_vector) = :dimensions
+            ORDER BY embedding_vector <=> CAST(:embedding AS vector)
+            LIMIT :limit
+            """
+        )
+        rows = self.db.execute(
+            stmt,
+            {
+                "tenant_id": tenant_id,
+                "knowledge_base_id": knowledge_base_id,
+                "embedding": vector,
+                "dimensions": len(embedding),
+                "limit": limit,
+            },
+        ).all()
+        if not rows:
+            return []
+        chunk_ids = [str(row[0]) for row in rows]
+        chunks_by_id = {
+            chunk.id: chunk
+            for chunk in self.db.scalars(
+                select(KnowledgeChunk).where(KnowledgeChunk.id.in_(chunk_ids))
+            )
+        }
+        scored: list[tuple[KnowledgeChunk, float]] = []
+        # Walk the raw rows so the similarity ordering from SQL is preserved.
+        for row in rows:
+            chunk = chunks_by_id.get(str(row[0]))
+            if chunk is None:
+                continue
+            score = float(row[1] or 0.0)
+            # pgvector can yield NaN (e.g. for an all-zero vector); a NaN
+            # score would poison the blended ranking, so treat it as 0.0.
+            scored.append((chunk, score if math.isfinite(score) else 0.0))
+        return scored
+
+    def _supports_pgvector_search(self) -> bool:
+        """Report whether the session is bound to a PostgreSQL engine.
+
+        Only the dialect name is inspected; an unbound session counts as
+        unsupported.
+        """
+        if self.db.bind is None:
+            return False
+        dialect_name = self.db.bind.dialect.name
+        return dialect_name == "postgresql"
+
 
 def _read_string(payload: dict[str, JSONValue], key: str) -> str:
     value = payload.get(key)
@@ -210,6 +263,12 @@ def _read_float_list(payload: dict[str, JSONValue], key: str) -> list[float] | N
     return [float(item) for item in value if isinstance(item, (int, float))]
 
 
+def _format_vector(value: list[float] | None) -> str | None:
+    """Serialise an embedding into pgvector's ``[x1,x2,...]`` text literal.
+
+    Args:
+        value: Embedding components, or ``None``.
+
+    Returns:
+        The bracketed, comma-separated literal, or ``None`` when ``value`` is
+        ``None``, empty, or contains a NaN or infinite component.
+    """
+    import math
+
+    if not value:
+        return None
+    components = [float(item) for item in value]
+    # pgvector rejects NaN and infinite components; emitting them would make
+    # the insert or the query cast fail, so report "no vector" instead.
+    if not all(math.isfinite(component) for component in components):
+        return None
+    return "[" + ",".join(str(component) for component in components) + "]"
+
+
 def _read_optional_dict(
     payload: dict[str, JSONValue],
     key: str,

+ 40 - 4
services/runtime-service/app/application/services.py

@@ -565,6 +565,7 @@ class RuntimeApplicationService:
         *,
         worker_key: str,
         lease_seconds: int,
+        redis_client: object | None = None,
     ) -> tuple[WorkflowRun, NodeRun, str, int] | None:
         released_lease_count = self.node_run_repository.release_expired_leases(
             now_time=datetime.utcnow(),
@@ -576,10 +577,45 @@ class RuntimeApplicationService:
         if claimed_node_run is None:
             return None
 
-        result = self.execute_node_run(
-            node_run_id=claimed_node_run.id,
-            payload=NodeRunExecuteRequest(worker_key=worker_key),
-        )
+        if redis_client is not None:
+            from core_shared.redis_primitives import DistributedLock, IdempotencyStore
+
+            lock = DistributedLock(
+                client=redis_client,
+                name=f"node-run:{claimed_node_run.id}:lock",
+                ttl_seconds=lease_seconds,
+            )
+            if not lock.acquire():
+                return None
+            idempotency_store = IdempotencyStore(
+                client=redis_client,
+                prefix="node-run-idempotency",
+            )
+            if not idempotency_store.begin(key=claimed_node_run.id):
+                lock.release()
+                return None
+        else:
+            lock = None
+            idempotency_store = None
+
+        try:
+            result = self.execute_node_run(
+                node_run_id=claimed_node_run.id,
+                payload=NodeRunExecuteRequest(worker_key=worker_key),
+            )
+            if idempotency_store is not None and result is not None:
+                _, node_run, executor_name = result
+                idempotency_store.complete(
+                    key=claimed_node_run.id,
+                    result={
+                        "status": node_run.status,
+                        "node_run_id": node_run.id,
+                        "executor_name": executor_name,
+                    },
+                )
+        finally:
+            if lock is not None:
+                lock.release()
         if result is None:
             return None
 

+ 4 - 0
services/runtime-service/app/worker.py

@@ -9,6 +9,8 @@ from uuid import uuid4
 
 from sqlalchemy.orm import sessionmaker, Session
 
+from core_shared import try_build_redis_client
+
 from app.application.services import build_runtime_application_service
 from app.bootstrap.settings import RuntimeServiceSettings
 from app.db.session import build_session_factory
@@ -33,6 +35,7 @@ class RuntimeWorker:
         self.settings = settings
         self.session_factory = session_factory
         self.worker_key = worker_key
+        self.redis_client = try_build_redis_client(settings.redis_url)
 
     def run_forever(self) -> RuntimeWorkerStats:
         executed_count = 0
@@ -70,6 +73,7 @@ class RuntimeWorker:
             result = service.execute_next_claimed_node_run(
                 worker_key=self.worker_key,
                 lease_seconds=self.settings.worker_lease_seconds,
+                redis_client=self.redis_client,
             )
             return result is not None
         finally: