| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580 |
- from datetime import datetime, timedelta
- from typing import cast
- from sqlalchemy.orm import Session
- from core_domain import (
- ChatCompletionRequestContract,
- ChatMessageContract,
- MemoryCreateContract,
- MemoryScopeType,
- MemorySearchRequestContract,
- MemorySearchResultContract,
- )
- from core_shared import JSONValue
- from app.bootstrap.settings import AgentServiceSettings
- from app.db.models import AgentDefinition, AgentRun, AgentVersion
- from app.domain.repositories import (
- AgentDefinitionRepository,
- AgentRunRepository,
- AgentVersionRepository,
- )
- from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
- from app.infrastructure.memory_client import MemoryClient, MemoryClientError
- from app.schemas.agent import (
- AgentCreateRequest,
- AgentRunCreateRequest,
- AgentRunExecuteRequest,
- AgentRunStatusUpdateRequest,
- AgentStatusUpdateRequest,
- AgentVersionCreateRequest,
- )
- class AgentApplicationService:
- def __init__(
- self,
- *,
- agent_repository: AgentDefinitionRepository,
- agent_version_repository: AgentVersionRepository,
- agent_run_repository: AgentRunRepository,
- model_gateway_client: ModelGatewayClient | None = None,
- memory_client: MemoryClient | None = None,
- ) -> None:
- self.agent_repository = agent_repository
- self.agent_version_repository = agent_version_repository
- self.agent_run_repository = agent_run_repository
- self.model_gateway_client = model_gateway_client
- self.memory_client = memory_client
- def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
- return self.agent_repository.create(
- tenant_id=payload.tenant_id,
- code=payload.code,
- name=payload.name,
- description=payload.description,
- agent_type=payload.agent_type,
- owner_user_id=payload.owner_user_id,
- metadata_json=payload.metadata_json,
- )
- def list_agents(self, *, tenant_id: str) -> list[AgentDefinition]:
- return self.agent_repository.list_by_tenant(tenant_id=tenant_id)
- def update_agent_status(
- self,
- *,
- agent_id: str,
- payload: AgentStatusUpdateRequest,
- ) -> AgentDefinition | None:
- return self.agent_repository.update_status(
- tenant_id=payload.tenant_id,
- agent_id=agent_id,
- status=payload.status,
- )
- def create_agent_version(self, payload: AgentVersionCreateRequest) -> AgentVersion:
- agent = self.agent_repository.get_by_id(
- tenant_id=payload.tenant_id,
- agent_id=payload.agent_id,
- )
- if agent is None:
- raise ValueError(f"agent not found: {payload.agent_id}")
- return self.agent_version_repository.create(
- tenant_id=payload.tenant_id,
- agent_id=payload.agent_id,
- status=payload.status,
- role=payload.role,
- goal=payload.goal,
- system_prompt=payload.system_prompt,
- model_config_json=payload.model_config_data.model_dump(mode="json"),
- memory_policy_json=payload.memory_policy.model_dump(mode="json"),
- tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs],
- skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs],
- )
- def list_agent_versions(self, *, tenant_id: str, agent_id: str) -> list[AgentVersion]:
- return self.agent_version_repository.list_by_agent(tenant_id=tenant_id, agent_id=agent_id)
- def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun:
- agent_version = self._resolve_agent_version(
- tenant_id=payload.tenant_id,
- agent_id=payload.agent_id,
- agent_version_id=payload.agent_version_id,
- )
- if agent_version is None:
- raise ValueError("published agent version not found")
- return self.agent_run_repository.create(
- tenant_id=payload.tenant_id,
- agent_id=payload.agent_id,
- agent_version_id=agent_version.id,
- session_id=payload.session_id,
- input_text=payload.input_text,
- input_json=payload.input_json,
- )
- def list_agent_runs(
- self,
- *,
- tenant_id: str,
- agent_id: str | None = None,
- session_id: str | None = None,
- ) -> list[AgentRun]:
- return self.agent_run_repository.list_by_scope(
- tenant_id=tenant_id,
- agent_id=agent_id,
- session_id=session_id,
- )
- def update_agent_run_status(
- self,
- *,
- agent_run_id: str,
- payload: AgentRunStatusUpdateRequest,
- ) -> AgentRun | None:
- entity = self.agent_run_repository.get_by_id(
- tenant_id=payload.tenant_id,
- agent_run_id=agent_run_id,
- )
- if entity is None:
- return None
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run_id,
- status=payload.status,
- worker_key=payload.worker_key,
- output_text=payload.output_text,
- output_json=payload.output_json,
- error_code=payload.error_code,
- error_message=payload.error_message,
- )
- def execute_agent_run(
- self,
- *,
- agent_run_id: str,
- payload: AgentRunExecuteRequest,
- ) -> AgentRun | None:
- agent_run = self.agent_run_repository.get_by_id(
- tenant_id=payload.tenant_id,
- agent_run_id=agent_run_id,
- )
- if agent_run is None:
- return None
- agent_version = self.agent_version_repository.get_by_id(
- tenant_id=payload.tenant_id,
- agent_version_id=agent_run.agent_version_id,
- )
- if agent_version is None:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="agent_version_missing",
- error_message=f"agent version not found: {agent_run.agent_version_id}",
- )
- self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="running",
- worker_key=payload.worker_key,
- )
- memory_results, memory_metadata = self._read_relevant_memories(
- agent_run=agent_run,
- agent_version=agent_version,
- )
- messages = self._build_chat_messages(
- agent_run=agent_run,
- agent_version=agent_version,
- memory_results=memory_results,
- )
- if payload.dry_run:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=self._build_dry_run_output(
- agent_run=agent_run,
- agent_version=agent_version,
- ),
- output_json={
- "dry_run": True,
- "agent_version_id": agent_version.id,
- "message_count": len(messages),
- "messages": [message.model_dump(mode="json") for message in messages],
- **memory_metadata,
- },
- )
- if self.model_gateway_client is None:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_missing",
- error_message="model gateway client is not configured",
- )
- try:
- response = self.model_gateway_client.create_chat_completion(
- ChatCompletionRequestContract(
- model=self._read_optional_string(agent_version.model_config_json, "model"),
- temperature=self._read_optional_float(
- agent_version.model_config_json,
- "temperature",
- ),
- max_tokens=self._read_optional_int(agent_version.model_config_json, "max_tokens"),
- messages=messages,
- metadata_json={
- "tenant_id": agent_run.tenant_id,
- "agent_id": agent_run.agent_id,
- "agent_version_id": agent_version.id,
- "agent_run_id": agent_run.id,
- },
- )
- )
- except ModelGatewayClientError as exc:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_error",
- error_message=str(exc),
- )
- memory_write_metadata = self._write_interaction_memory(
- agent_run=agent_run,
- agent_version=agent_version,
- output_text=response.content,
- )
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=response.content,
- output_json={
- "dry_run": False,
- "agent_version_id": agent_version.id,
- "model": response.model,
- "finish_reason": response.finish_reason,
- "usage_json": response.usage_json,
- "raw_response_json": response.raw_response_json,
- **memory_metadata,
- **memory_write_metadata,
- },
- )
- def execute_next_claimed_agent_run(
- self,
- *,
- worker_key: str,
- lease_seconds: int,
- dry_run: bool,
- ) -> tuple[AgentRun, int] | None:
- released_lease_count = self.agent_run_repository.release_expired_leases(
- now_time=datetime.utcnow(),
- )
- claimed_agent_run = self.agent_run_repository.claim_next_queued(
- worker_key=worker_key,
- lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
- )
- if claimed_agent_run is None:
- return None
- result = self.execute_agent_run(
- agent_run_id=claimed_agent_run.id,
- payload=AgentRunExecuteRequest(
- tenant_id=claimed_agent_run.tenant_id,
- worker_key=worker_key,
- dry_run=dry_run,
- ),
- )
- if result is None:
- return None
- return result, released_lease_count
- def _resolve_agent_version(
- self,
- *,
- tenant_id: str,
- agent_id: str,
- agent_version_id: str | None,
- ) -> AgentVersion | None:
- if agent_version_id is not None:
- return self.agent_version_repository.get_by_id(
- tenant_id=tenant_id,
- agent_version_id=agent_version_id,
- )
- return self.agent_version_repository.get_latest_published(
- tenant_id=tenant_id,
- agent_id=agent_id,
- )
- def _build_chat_messages(
- self,
- *,
- agent_run: AgentRun,
- agent_version: AgentVersion,
- memory_results: list[MemorySearchResultContract] | None = None,
- ) -> list[ChatMessageContract]:
- messages = [
- ChatMessageContract(role="system", content=agent_version.system_prompt),
- ]
- if agent_version.goal:
- messages.append(ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}"))
- if memory_results:
- messages.append(
- ChatMessageContract(
- role="system",
- content=self._format_memory_context(memory_results),
- )
- )
- if agent_run.input_text:
- messages.append(ChatMessageContract(role="user", content=agent_run.input_text))
- if agent_run.input_json:
- messages.append(
- ChatMessageContract(
- role="user",
- content=f"Structured input: {agent_run.input_json}",
- )
- )
- return messages
- def _build_dry_run_output(self, *, agent_run: AgentRun, agent_version: AgentVersion) -> str:
- input_preview = agent_run.input_text or str(agent_run.input_json or {})
- return (
- f"[dry-run] Agent role={agent_version.role} "
- f"version={agent_version.version_no} received: {input_preview}"
- )
- def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
- value = payload.get(key)
- if isinstance(value, str) and value:
- return value
- return None
- def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None:
- value = payload.get(key)
- if isinstance(value, (int, float)) and not isinstance(value, bool):
- return float(value)
- return None
- def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return None
- def _read_relevant_memories(
- self,
- *,
- agent_run: AgentRun,
- agent_version: AgentVersion,
- ) -> tuple[list[MemorySearchResultContract], dict[str, JSONValue]]:
- if self.memory_client is None:
- return [], {"memory_read_enabled": False, "memory_read_reason": "client_missing"}
- if not self._read_bool(agent_version.memory_policy_json, "enabled", default=True):
- return [], {"memory_read_enabled": False, "memory_read_reason": "policy_disabled"}
- query = agent_run.input_text or str(agent_run.input_json or "")
- if not query:
- return [], {"memory_read_enabled": True, "memory_read_count": 0}
- scope = self._resolve_memory_scope(agent_run=agent_run, agent_version=agent_version)
- if scope is None:
- return [], {
- "memory_read_enabled": True,
- "memory_read_count": 0,
- "memory_read_reason": "scope_unavailable",
- }
- scope_type, scope_id = scope
- try:
- results = self.memory_client.search_memories(
- MemorySearchRequestContract(
- tenant_id=agent_run.tenant_id,
- query=query,
- scope_type=scope_type,
- scope_id=scope_id,
- owner_agent_id=agent_run.agent_id,
- session_id=agent_run.session_id,
- limit=self._read_int(
- agent_version.memory_policy_json,
- "read_top_k",
- default=8,
- ),
- )
- )
- except MemoryClientError as exc:
- return [], {
- "memory_read_enabled": True,
- "memory_read_count": 0,
- "memory_read_error": str(exc),
- }
- return results, {
- "memory_read_enabled": True,
- "memory_read_count": len(results),
- "memory_scope_type": scope_type,
- "memory_scope_id": scope_id,
- }
- def _write_interaction_memory(
- self,
- *,
- agent_run: AgentRun,
- agent_version: AgentVersion,
- output_text: str,
- ) -> dict[str, JSONValue]:
- if self.memory_client is None:
- return {"memory_write_enabled": False, "memory_write_reason": "client_missing"}
- if not self._read_bool(agent_version.memory_policy_json, "write_enabled", default=True):
- return {"memory_write_enabled": False, "memory_write_reason": "policy_disabled"}
- scope = self._resolve_memory_scope(agent_run=agent_run, agent_version=agent_version)
- if scope is None:
- return {"memory_write_enabled": True, "memory_write_reason": "scope_unavailable"}
- scope_type, scope_id = scope
- try:
- memory = self.memory_client.create_memory(
- MemoryCreateContract(
- tenant_id=agent_run.tenant_id,
- scope_type=scope_type,
- scope_id=scope_id,
- memory_type="conversation",
- content_text=self._format_interaction_memory(
- agent_run=agent_run,
- output_text=output_text,
- ),
- content_json={
- "agent_run_id": agent_run.id,
- "agent_version_id": agent_version.id,
- "input_text": agent_run.input_text,
- "output_text": output_text,
- },
- metadata_json={
- "source": "agent-service",
- "role": agent_version.role,
- "version_no": agent_version.version_no,
- },
- owner_agent_id=agent_run.agent_id,
- session_id=agent_run.session_id,
- source_ref=f"agent_run:{agent_run.id}",
- importance_score=self._read_nested_int(
- agent_version.memory_policy_json,
- "config_json",
- "write_importance_score",
- default=50,
- ),
- )
- )
- except MemoryClientError as exc:
- return {
- "memory_write_enabled": True,
- "memory_write_error": str(exc),
- }
- return {
- "memory_write_enabled": True,
- "memory_written_id": memory.id,
- "memory_scope_type": scope_type,
- "memory_scope_id": scope_id,
- }
- def _resolve_memory_scope(
- self,
- *,
- agent_run: AgentRun,
- agent_version: AgentVersion,
- ) -> tuple[MemoryScopeType, str] | None:
- scope_value = self._read_optional_string(
- agent_version.memory_policy_json,
- "memory_scope",
- ) or "session"
- if scope_value == "tenant":
- return "tenant", agent_run.tenant_id
- if scope_value == "agent":
- return "agent", agent_run.agent_id
- if scope_value == "session" and agent_run.session_id:
- return "session", agent_run.session_id
- if scope_value == "user":
- user_id = self._read_input_json_string(agent_run=agent_run, key="user_id")
- if user_id is not None:
- return "user", user_id
- if scope_value == "team":
- team_id = self._read_input_json_string(agent_run=agent_run, key="team_id")
- if team_id is not None:
- return "team", team_id
- return None
- def _format_memory_context(self, memory_results: list[MemorySearchResultContract]) -> str:
- lines = ["Relevant memories:"]
- for index, result in enumerate(memory_results, start=1):
- lines.append(f"{index}. {result.item.content_text}")
- return "\n".join(lines)
- def _format_interaction_memory(self, *, agent_run: AgentRun, output_text: str) -> str:
- input_text = agent_run.input_text or str(agent_run.input_json or {})
- return f"User input: {input_text}\nAgent output: {output_text}"
- def _read_bool(self, payload: dict[str, JSONValue], key: str, *, default: bool) -> bool:
- value = payload.get(key)
- if isinstance(value, bool):
- return value
- return default
- def _read_int(self, payload: dict[str, JSONValue], key: str, *, default: int) -> int:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return default
- def _read_nested_int(
- self,
- payload: dict[str, JSONValue],
- parent_key: str,
- child_key: str,
- *,
- default: int,
- ) -> int:
- parent_value = payload.get(parent_key)
- if not isinstance(parent_value, dict):
- return default
- return self._read_int(
- cast(dict[str, JSONValue], parent_value),
- child_key,
- default=default,
- )
- def _read_input_json_string(self, *, agent_run: AgentRun, key: str) -> str | None:
- if agent_run.input_json is None:
- return None
- value = agent_run.input_json.get(key)
- if isinstance(value, str) and value:
- return value
- return None
def build_agent_application_service(
    *,
    db: Session,
    settings: AgentServiceSettings,
) -> AgentApplicationService:
    """Wire an ``AgentApplicationService`` to a database session and service settings.

    All three repositories are constructed on the same ``db`` session. The
    model-gateway and memory clients are configured from the URLs and timeouts
    carried by ``settings``.
    """
    # Components are built in the same order as the keyword arguments below.
    definitions = AgentDefinitionRepository(db)
    versions = AgentVersionRepository(db)
    runs = AgentRunRepository(db)
    gateway = ModelGatewayClient(
        base_url=settings.model_gateway_service_url,
        timeout_seconds=settings.model_gateway_timeout_seconds,
    )
    memory = MemoryClient(
        base_url=settings.memory_service_url,
        timeout_seconds=settings.memory_service_timeout_seconds,
    )
    return AgentApplicationService(
        agent_repository=definitions,
        agent_version_repository=versions,
        agent_run_repository=runs,
        model_gateway_client=gateway,
        memory_client=memory,
    )
|