import logging
from datetime import datetime, timedelta, timezone
from typing import cast

from sqlalchemy.orm import Session

from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
from core_domain import (
    AgentSkillRefContract,
    AgentToolRefContract,
    ChatCompletionRequestContract,
    ChatMessageContract,
    MemoryCreateContract,
    MemoryScopeType,
    MemorySearchRequestContract,
    MemorySearchResultContract,
)
from core_shared import JSONValue

from app.bootstrap.settings import AgentServiceSettings
from app.db.models import AgentDefinition, AgentRun, AgentVersion
from app.domain.repositories import (
    AgentDefinitionRepository,
    AgentRunRepository,
    AgentVersionRepository,
)
from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
from app.infrastructure.memory_client import MemoryClient, MemoryClientError
from app.infrastructure.skill_client import SkillServiceClient, SkillServiceClientError
from app.infrastructure.tool_client import ToolServiceClient, ToolServiceClientError
from app.schemas.agent import (
    AgentCreateRequest,
    AgentRunCreateRequest,
    AgentRunExecuteRequest,
    AgentRunStatusUpdateRequest,
    AgentStatusUpdateRequest,
    AgentVersionCreateRequest,
)

logger = logging.getLogger(__name__)


class AgentApplicationService:
    """Application-level orchestration for agents, agent versions and agent runs.

    The service wraps three repositories and a set of optional remote clients
    (model gateway, memory, tool, skill, event). Every client may be ``None``;
    the methods below degrade explicitly (recording a reason) instead of
    raising when a client is missing.
    """

    def __init__(
        self,
        *,
        agent_repository: AgentDefinitionRepository,
        agent_version_repository: AgentVersionRepository,
        agent_run_repository: AgentRunRepository,
        model_gateway_client: ModelGatewayClient | None = None,
        memory_client: MemoryClient | None = None,
        tool_client: ToolServiceClient | None = None,
        skill_client: SkillServiceClient | None = None,
        event_client: EventServiceClient | None = None,
    ) -> None:
        """Store the repositories and the optional remote clients."""
        self.agent_repository = agent_repository
        self.agent_version_repository = agent_version_repository
        self.agent_run_repository = agent_run_repository
        self.model_gateway_client = model_gateway_client
        self.memory_client = memory_client
        self.tool_client = tool_client
        self.skill_client = skill_client
        self.event_client = event_client

    def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
        """Create an agent definition from the request payload."""
        return self.agent_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            agent_type=payload.agent_type,
            owner_user_id=payload.owner_user_id,
            metadata_json=payload.metadata_json,
        )

    def list_agents(self, *, tenant_id: str) -> list[AgentDefinition]:
        """Return the agent definitions of one tenant."""
        return self.agent_repository.list_by_tenant(tenant_id=tenant_id)

    def update_agent_status(
        self,
        *,
        agent_id: str,
        payload: AgentStatusUpdateRequest,
    ) -> AgentDefinition | None:
        """Update an agent's status; returns whatever the repository returns."""
        return self.agent_repository.update_status(
            tenant_id=payload.tenant_id,
            agent_id=agent_id,
            status=payload.status,
        )

    def create_agent_version(self, payload: AgentVersionCreateRequest) -> AgentVersion:
        """Create a new version for an existing agent.

        Raises:
            ValueError: if the agent does not exist for the payload's tenant.
        """
        agent = self.agent_repository.get_by_id(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
        )
        if agent is None:
            raise ValueError(f"agent not found: {payload.agent_id}")
        return self.agent_version_repository.create(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
            status=payload.status,
            role=payload.role,
            goal=payload.goal,
            system_prompt=payload.system_prompt,
            model_config_json=payload.model_config_data.model_dump(mode="json"),
            memory_policy_json=payload.memory_policy.model_dump(mode="json"),
            tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs],
            skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs],
        )

    def list_agent_versions(self, *, tenant_id: str, agent_id: str) -> list[AgentVersion]:
        """Return the versions of one agent."""
        return self.agent_version_repository.list_by_agent(tenant_id=tenant_id, agent_id=agent_id)

    def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun:
        """Create an agent run and publish an ``agent.run.created`` event.

        Raises:
            ValueError: if no agent version could be resolved for the request.
        """
        agent_version = self._resolve_agent_version(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
            agent_version_id=payload.agent_version_id,
        )
        if agent_version is None:
            raise ValueError("published agent version not found")
        agent_run = self.agent_run_repository.create(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
            agent_version_id=agent_version.id,
            session_id=payload.session_id,
            input_text=payload.input_text,
            input_json=payload.input_json,
        )
        self._publish_event(
            event_type="agent.run.created",
            agent_run=agent_run,
            payload_json={"agent_run_id": agent_run.id, "status": agent_run.status},
        )
        return agent_run

    def list_agent_runs(
        self,
        *,
        tenant_id: str,
        agent_id: str | None = None,
        session_id: str | None = None,
    ) -> list[AgentRun]:
        """Return a tenant's runs, optionally narrowed by agent and/or session."""
        return self.agent_run_repository.list_by_scope(
            tenant_id=tenant_id,
            agent_id=agent_id,
            session_id=session_id,
        )

    def update_agent_run_status(
        self,
        *,
        agent_run_id: str,
        payload: AgentRunStatusUpdateRequest,
    ) -> AgentRun | None:
        """Update a run's status and result fields.

        Returns ``None`` when the run is not found for the payload's tenant.
        """
        # The tenant-scoped lookup guards the update below, which is keyed by
        # run id only.
        entity = self.agent_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            agent_run_id=agent_run_id,
        )
        if entity is None:
            return None
        return self.agent_run_repository.update_status(
            agent_run_id=agent_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            output_text=payload.output_text,
            output_json=payload.output_json,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def execute_agent_run(
        self,
        *,
        agent_run_id: str,
        payload: AgentRunExecuteRequest,
    ) -> AgentRun | None:
        """Execute one agent run end to end.

        Steps: load the run and its version, mark the run ``running``, read
        memories, select tools/skills, then either finish as a dry run (no
        tool, skill or model calls) or invoke the selected tools and skills,
        call the model gateway, write an interaction memory and mark the run
        ``completed``.

        Returns:
            The updated run as returned by the repository, or ``None`` when
            the run does not exist for the payload's tenant. Failures of the
            version lookup or the model gateway are recorded on the run with
            status ``failed`` rather than raised.
        """
        agent_run = self.agent_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            agent_run_id=agent_run_id,
        )
        if agent_run is None:
            return None

        agent_version = self.agent_version_repository.get_by_id(
            tenant_id=payload.tenant_id,
            agent_version_id=agent_run.agent_version_id,
        )
        if agent_version is None:
            return self.agent_run_repository.update_status(
                agent_run_id=agent_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="agent_version_missing",
                error_message=f"agent version not found: {agent_run.agent_version_id}",
            )

        self.agent_run_repository.update_status(
            agent_run_id=agent_run.id,
            status="running",
            worker_key=payload.worker_key,
        )

        memory_results, memory_metadata = self._read_relevant_memories(
            agent_run=agent_run,
            agent_version=agent_version,
        )
        selected_tools = self._select_tool_refs(agent_run=agent_run, agent_version=agent_version)
        selected_skills = self._select_skill_refs(agent_run=agent_run, agent_version=agent_version)

        if payload.dry_run:
            # Dry run: describe the plan only; nothing is invoked.
            messages = self._build_chat_messages(
                agent_run=agent_run,
                agent_version=agent_version,
                memory_results=memory_results,
                capability_context=self._format_capability_plan(
                    selected_tools=selected_tools,
                    selected_skills=selected_skills,
                ),
            )
            completed_run = self.agent_run_repository.update_status(
                agent_run_id=agent_run.id,
                status="completed",
                worker_key=payload.worker_key,
                output_text=self._build_dry_run_output(
                    agent_run=agent_run,
                    agent_version=agent_version,
                ),
                output_json={
                    "dry_run": True,
                    "agent_version_id": agent_version.id,
                    "message_count": len(messages),
                    "messages": [message.model_dump(mode="json") for message in messages],
                    "selected_tool_refs": [
                        tool_ref.model_dump(mode="json") for tool_ref in selected_tools
                    ],
                    "selected_skill_refs": [
                        skill_ref.model_dump(mode="json") for skill_ref in selected_skills
                    ],
                    **memory_metadata,
                },
            )
            if completed_run is not None:
                self._publish_event(
                    event_type="agent.run.completed",
                    agent_run=completed_run,
                    payload_json={
                        "agent_run_id": completed_run.id,
                        "dry_run": True,
                        "status": completed_run.status,
                    },
                )
            return completed_run

        tool_invocations = self._invoke_selected_tools(
            agent_run=agent_run,
            selected_tools=selected_tools,
        )
        skill_invocations = self._invoke_selected_skills(
            agent_run=agent_run,
            selected_skills=selected_skills,
            worker_key=payload.worker_key,
        )
        messages = self._build_chat_messages(
            agent_run=agent_run,
            agent_version=agent_version,
            memory_results=memory_results,
            capability_context=self._format_capability_results(
                tool_invocations=tool_invocations,
                skill_invocations=skill_invocations,
            ),
        )

        if self.model_gateway_client is None:
            return self.agent_run_repository.update_status(
                agent_run_id=agent_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="model_gateway_missing",
                error_message="model gateway client is not configured",
                output_json={
                    "tool_invocations": tool_invocations,
                    "skill_invocations": skill_invocations,
                    **memory_metadata,
                },
            )

        try:
            response = self.model_gateway_client.create_chat_completion(
                ChatCompletionRequestContract(
                    model=self._read_optional_string(agent_version.model_config_json, "model"),
                    temperature=self._read_optional_float(
                        agent_version.model_config_json,
                        "temperature",
                    ),
                    max_tokens=self._read_optional_int(
                        agent_version.model_config_json,
                        "max_tokens",
                    ),
                    messages=messages,
                    metadata_json={
                        "tenant_id": agent_run.tenant_id,
                        "agent_id": agent_run.agent_id,
                        "agent_version_id": agent_version.id,
                        "agent_run_id": agent_run.id,
                    },
                )
            )
        except ModelGatewayClientError as exc:
            # Tools and skills have already run at this point, so keep their
            # results on the failed run, exactly like the
            # "model_gateway_missing" path above does.
            return self.agent_run_repository.update_status(
                agent_run_id=agent_run.id,
                status="failed",
                worker_key=payload.worker_key,
                error_code="model_gateway_error",
                error_message=str(exc),
                output_json={
                    "tool_invocations": tool_invocations,
                    "skill_invocations": skill_invocations,
                    **memory_metadata,
                },
            )

        memory_write_metadata = self._write_interaction_memory(
            agent_run=agent_run,
            agent_version=agent_version,
            output_text=response.content,
        )
        completed_run = self.agent_run_repository.update_status(
            agent_run_id=agent_run.id,
            status="completed",
            worker_key=payload.worker_key,
            output_text=response.content,
            output_json={
                "dry_run": False,
                "agent_version_id": agent_version.id,
                "model": response.model,
                "finish_reason": response.finish_reason,
                "usage_json": response.usage_json,
                "raw_response_json": response.raw_response_json,
                "tool_invocations": tool_invocations,
                "skill_invocations": skill_invocations,
                **memory_metadata,
                **memory_write_metadata,
            },
        )
        if completed_run is not None:
            self._publish_event(
                event_type="agent.run.completed",
                agent_run=completed_run,
                payload_json={
                    "agent_run_id": completed_run.id,
                    "dry_run": False,
                    "status": completed_run.status,
                },
            )
        return completed_run

    def _publish_event(
        self,
        *,
        event_type: str,
        agent_run: AgentRun,
        payload_json: dict[str, JSONValue],
    ) -> None:
        """Publish a run event on a best-effort basis.

        Does nothing when no event client is configured. A failing event
        service never fails the caller; the failure is logged and dropped.
        """
        if self.event_client is None:
            return
        try:
            self.event_client.publish_event(
                EventPublishContract(
                    tenant_id=agent_run.tenant_id,
                    event_type=event_type,
                    source_service="agent-service",
                    aggregate_type="agent_run",
                    aggregate_id=agent_run.id,
                    correlation_id=agent_run.session_id,
                    payload_json={
                        **payload_json,
                        "agent_id": agent_run.agent_id,
                        "agent_version_id": agent_run.agent_version_id,
                    },
                )
            )
        except EventServiceClientError:
            # Still best-effort, but leave a trace instead of silently
            # discarding the failure.
            logger.warning(
                "failed to publish event %s for agent run %s",
                event_type,
                agent_run.id,
                exc_info=True,
            )

    def execute_next_claimed_agent_run(
        self,
        *,
        worker_key: str,
        lease_seconds: int,
        dry_run: bool,
    ) -> tuple[AgentRun, int] | None:
        """Release expired leases, claim the next queued run and execute it.

        Returns:
            ``(executed_run, released_lease_count)``, or ``None`` when nothing
            could be claimed or the execution returned ``None``.
        """
        # Naive UTC, same value shape as the deprecated datetime.utcnow().
        # A single timestamp keeps lease release and lease expiry consistent.
        now_time = datetime.now(timezone.utc).replace(tzinfo=None)
        released_lease_count = self.agent_run_repository.release_expired_leases(
            now_time=now_time,
        )
        claimed_agent_run = self.agent_run_repository.claim_next_queued(
            worker_key=worker_key,
            lease_expire_time=now_time + timedelta(seconds=lease_seconds),
        )
        if claimed_agent_run is None:
            return None
        result = self.execute_agent_run(
            agent_run_id=claimed_agent_run.id,
            payload=AgentRunExecuteRequest(
                tenant_id=claimed_agent_run.tenant_id,
                worker_key=worker_key,
                dry_run=dry_run,
            ),
        )
        if result is None:
            return None
        return result, released_lease_count

    def _resolve_agent_version(
        self,
        *,
        tenant_id: str,
        agent_id: str,
        agent_version_id: str | None,
    ) -> AgentVersion | None:
        """Return the explicitly requested version, else the latest published one."""
        if agent_version_id is not None:
            # NOTE(review): this lookup is scoped by tenant only; it does not
            # check that the version belongs to ``agent_id`` or is published.
            # Confirm whether the repository or the caller enforces that.
            return self.agent_version_repository.get_by_id(
                tenant_id=tenant_id,
                agent_version_id=agent_version_id,
            )
        return self.agent_version_repository.get_latest_published(
            tenant_id=tenant_id,
            agent_id=agent_id,
        )

    def _build_chat_messages(
        self,
        *,
        agent_run: AgentRun,
        agent_version: AgentVersion,
        memory_results: list[MemorySearchResultContract] | None = None,
        capability_context: str | None = None,
    ) -> list[ChatMessageContract]:
        """Assemble the chat messages sent to the model.

        Order: system prompt, optional goal, optional memory context, optional
        capability context (all ``system``), then the text input and the
        structured input (both ``user``) when present.
        """
        messages = [
            ChatMessageContract(role="system", content=agent_version.system_prompt),
        ]
        if agent_version.goal:
            messages.append(
                ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}")
            )
        if memory_results:
            messages.append(
                ChatMessageContract(
                    role="system",
                    content=self._format_memory_context(memory_results),
                )
            )
        if capability_context:
            messages.append(ChatMessageContract(role="system", content=capability_context))
        if agent_run.input_text:
            messages.append(ChatMessageContract(role="user", content=agent_run.input_text))
        if agent_run.input_json:
            messages.append(
                ChatMessageContract(
                    role="user",
                    content=f"Structured input: {agent_run.input_json}",
                )
            )
        return messages

    def _select_tool_refs(
        self,
        *,
        agent_run: AgentRun,
        agent_version: AgentVersion,
    ) -> list[AgentToolRefContract]:
        """Pick the tool refs to use for this run.

        A tool is selected when it is required, has ``auto_invoke`` set in its
        config (default ``False``), or one of its ``selection_keywords``
        occurs in the lower-cased input preview.
        """
        input_preview = self._build_input_preview(agent_run)
        selected: list[AgentToolRefContract] = []
        for item in agent_version.tool_refs_json:
            ref = AgentToolRefContract.model_validate(item)
            if (
                ref.required
                or self._read_bool(ref.config_json, "auto_invoke", default=False)
                or self._matches_selection_keywords(ref.config_json, input_preview)
            ):
                selected.append(ref)
        return selected

    def _select_skill_refs(
        self,
        *,
        agent_run: AgentRun,
        agent_version: AgentVersion,
    ) -> list[AgentSkillRefContract]:
        """Pick the skill refs to use for this run.

        Unlike tools, skills default to ``auto_invoke=True``; a skill with
        ``auto_invoke`` disabled is still selected on a keyword match.
        """
        input_preview = self._build_input_preview(agent_run)
        selected: list[AgentSkillRefContract] = []
        for item in agent_version.skill_refs_json:
            ref = AgentSkillRefContract.model_validate(item)
            auto_invoke = self._read_bool(ref.config_json, "auto_invoke", default=True)
            if auto_invoke or self._matches_selection_keywords(ref.config_json, input_preview):
                selected.append(ref)
        return selected

    def _invoke_selected_tools(
        self,
        *,
        agent_run: AgentRun,
        selected_tools: list[AgentToolRefContract],
    ) -> list[dict[str, JSONValue]]:
        """Invoke each selected tool and return one result record per tool.

        Records carry a ``status`` of ``skipped``, ``failed`` or ``completed``.
        Only tools whose definition has ``tool_type == "http"`` are invoked.
        Tool service errors are recorded on the record, not raised.
        """
        invocations: list[dict[str, JSONValue]] = []
        for ref in selected_tools:
            if ref.tool_binding_id is None:
                invocations.append(
                    {
                        "status": "skipped",
                        "reason": "tool_binding_id_missing",
                        "tool_code": ref.tool_code,
                    }
                )
                continue
            if self.tool_client is None:
                invocations.append(
                    {
                        "status": "failed",
                        "reason": "tool_client_missing",
                        "tool_binding_id": ref.tool_binding_id,
                    }
                )
                continue
            try:
                detail = self.tool_client.get_tool_binding_detail(
                    tenant_id=agent_run.tenant_id,
                    binding_id=ref.tool_binding_id,
                )
                if not detail.binding.enabled:
                    invocations.append(
                        {
                            "status": "failed",
                            "reason": "tool_binding_disabled",
                            "tool_binding_id": ref.tool_binding_id,
                        }
                    )
                    continue
                if detail.tool_definition.tool_type != "http":
                    invocations.append(
                        {
                            "status": "skipped",
                            "reason": "unsupported_tool_type",
                            "tool_type": detail.tool_definition.tool_type,
                            "tool_binding_id": ref.tool_binding_id,
                        }
                    )
                    continue
                output_text, output_json = self.tool_client.invoke_http_tool(
                    detail=detail,
                    input_json=agent_run.input_json or {},
                    config_json=ref.config_json,
                )
            except ToolServiceClientError as exc:
                invocations.append(
                    {
                        "status": "failed",
                        "reason": str(exc),
                        "tool_binding_id": ref.tool_binding_id,
                    }
                )
                continue
            invocations.append(
                {
                    "status": "completed",
                    "tool_binding_id": ref.tool_binding_id,
                    "tool_code": detail.tool_definition.code,
                    "output_text": output_text,
                    "output_json": output_json,
                }
            )
        return invocations

    def _invoke_selected_skills(
        self,
        *,
        agent_run: AgentRun,
        selected_skills: list[AgentSkillRefContract],
        worker_key: str | None,
    ) -> list[dict[str, JSONValue]]:
        """Create and execute a skill run per selected skill.

        Returns one record per skill. Skill service errors and unresolvable
        skill ids are recorded with status ``failed``; otherwise the record
        carries the executed skill run's own status and outputs.
        """
        invocations: list[dict[str, JSONValue]] = []
        for ref in selected_skills:
            if self.skill_client is None:
                invocations.append(
                    {
                        "status": "failed",
                        "reason": "skill_client_missing",
                        "skill_id": ref.skill_id,
                        "skill_code": ref.skill_code,
                    }
                )
                continue
            # Fall back to a lookup by code when the ref has no explicit id.
            skill_id = ref.skill_id or self._resolve_skill_id_by_code(
                tenant_id=agent_run.tenant_id,
                skill_code=ref.skill_code,
            )
            if skill_id is None:
                invocations.append(
                    {
                        "status": "failed",
                        "reason": "skill_id_missing",
                        "skill_code": ref.skill_code,
                    }
                )
                continue
            try:
                created_run = self.skill_client.create_skill_run(
                    tenant_id=agent_run.tenant_id,
                    skill_id=skill_id,
                    skill_version_id=self._read_optional_string(
                        ref.config_json,
                        "skill_version_id",
                    ),
                    installation_id=self._read_optional_string(
                        ref.config_json,
                        "installation_id",
                    ),
                    input_json=self._build_skill_input_json(agent_run=agent_run, ref=ref),
                )
                executed_run = self.skill_client.execute_skill_run(
                    tenant_id=agent_run.tenant_id,
                    skill_run_id=created_run.id,
                    worker_key=worker_key,
                )
            except SkillServiceClientError as exc:
                invocations.append(
                    {
                        "status": "failed",
                        "reason": str(exc),
                        "skill_id": skill_id,
                        "skill_code": ref.skill_code,
                    }
                )
                continue
            invocations.append(
                {
                    "status": executed_run.status,
                    "skill_id": skill_id,
                    "skill_code": ref.skill_code,
                    "skill_run_id": executed_run.id,
                    "output_text": executed_run.output_text,
                    "output_json": executed_run.output_json,
                    "error_code": executed_run.error_code,
                    "error_message": executed_run.error_message,
                }
            )
        return invocations

    def _format_capability_plan(
        self,
        *,
        selected_tools: list[AgentToolRefContract],
        selected_skills: list[AgentSkillRefContract],
    ) -> str:
        """Render the selected tools/skills as text for the dry-run prompt."""
        return (
            "Selected capability plan before model call:\n"
            f"Tools: {[item.model_dump(mode='json') for item in selected_tools]}\n"
            f"Skills: {[item.model_dump(mode='json') for item in selected_skills]}"
        )

    def _format_capability_results(
        self,
        *,
        tool_invocations: list[dict[str, JSONValue]],
        skill_invocations: list[dict[str, JSONValue]],
    ) -> str | None:
        """Render invocation results as prompt text, or ``None`` if there are none."""
        if not tool_invocations and not skill_invocations:
            return None
        return (
            "Capability invocation results before model call:\n"
            f"Tools: {tool_invocations}\n"
            f"Skills: {skill_invocations}"
        )

    def _build_skill_input_json(
        self,
        *,
        agent_run: AgentRun,
        ref: AgentSkillRefContract,
    ) -> dict[str, JSONValue]:
        """Build the input for a skill run.

        Starts from a copy of the run's ``input_json``, adds ``input_text``
        unless that key is already present, then lets the ref's configured
        ``input_json`` override any key.
        """
        input_json: dict[str, JSONValue] = dict(agent_run.input_json or {})
        if agent_run.input_text:
            input_json.setdefault("input_text", agent_run.input_text)
        configured_input = ref.config_json.get("input_json")
        if isinstance(configured_input, dict):
            input_json.update(
                {str(item_key): item_value for item_key, item_value in configured_input.items()}
            )
        return input_json

    def _resolve_skill_id_by_code(self, *, tenant_id: str, skill_code: str | None) -> str | None:
        """Look up a skill id by its code; ``None`` when it cannot be resolved."""
        if skill_code is None or self.skill_client is None:
            return None
        try:
            skills = self.skill_client.list_skills(tenant_id=tenant_id)
        except SkillServiceClientError:
            # The caller only reports "skill_id_missing", so log the real
            # cause here instead of dropping it silently.
            logger.warning(
                "failed to list skills while resolving skill code %s",
                skill_code,
                exc_info=True,
            )
            return None
        for skill in skills:
            if skill.code == skill_code:
                return skill.id
        return None

    def _build_input_preview(self, agent_run: AgentRun) -> str:
        """Return the lower-cased text + structured input used for keyword matching."""
        return f"{agent_run.input_text or ''} {agent_run.input_json or {}}".lower()

    def _matches_selection_keywords(
        self,
        config_json: dict[str, JSONValue],
        input_preview: str,
    ) -> bool:
        """Return True if any string in ``selection_keywords`` occurs in the preview.

        Matching is a case-insensitive substring test; non-string keywords
        and a non-list ``selection_keywords`` value never match.
        """
        keywords = config_json.get("selection_keywords")
        if not isinstance(keywords, list):
            return False
        return any(
            isinstance(keyword, str) and keyword.lower() in input_preview
            for keyword in keywords
        )

    def _build_dry_run_output(self, *, agent_run: AgentRun, agent_version: AgentVersion) -> str:
        """Return the human-readable output text recorded for a dry run."""
        input_preview = agent_run.input_text or str(agent_run.input_json or {})
        return (
            f"[dry-run] Agent role={agent_version.role} "
            f"version={agent_version.version_no} received: {input_preview}"
        )

    def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
        """Return ``payload[key]`` if it is a non-empty string, else ``None``."""
        value = payload.get(key)
        if isinstance(value, str) and value:
            return value
        return None

    def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None:
        """Return ``payload[key]`` as a float if it is an int or float, else ``None``."""
        value = payload.get(key)
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return float(value)
        return None

    def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None:
        """Return ``payload[key]`` if it is an int (not a bool), else ``None``."""
        value = payload.get(key)
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        return None

    def _read_relevant_memories(
        self,
        *,
        agent_run: AgentRun,
        agent_version: AgentVersion,
    ) -> tuple[list[MemorySearchResultContract], dict[str, JSONValue]]:
        """Search memories relevant to the run's input.

        Returns:
            ``(results, metadata)``. ``results`` is empty whenever the search
            is skipped or fails; ``metadata`` explains why (client missing,
            policy disabled, scope unavailable, client error) or reports the
            result count and the scope that was searched.
        """
        if self.memory_client is None:
            return [], {"memory_read_enabled": False, "memory_read_reason": "client_missing"}
        if not self._read_bool(agent_version.memory_policy_json, "enabled", default=True):
            return [], {"memory_read_enabled": False, "memory_read_reason": "policy_disabled"}
        query = agent_run.input_text or str(agent_run.input_json or "")
        if not query:
            return [], {"memory_read_enabled": True, "memory_read_count": 0}
        scope = self._resolve_memory_scope(agent_run=agent_run, agent_version=agent_version)
        if scope is None:
            return [], {
                "memory_read_enabled": True,
                "memory_read_count": 0,
                "memory_read_reason": "scope_unavailable",
            }
        scope_type, scope_id = scope
        try:
            results = self.memory_client.search_memories(
                MemorySearchRequestContract(
                    tenant_id=agent_run.tenant_id,
                    query=query,
                    scope_type=scope_type,
                    scope_id=scope_id,
                    owner_agent_id=agent_run.agent_id,
                    session_id=agent_run.session_id,
                    limit=self._read_int(
                        agent_version.memory_policy_json,
                        "read_top_k",
                        default=8,
                    ),
                )
            )
        except MemoryClientError as exc:
            return [], {
                "memory_read_enabled": True,
                "memory_read_count": 0,
                "memory_read_error": str(exc),
            }
        return results, {
            "memory_read_enabled": True,
            "memory_read_count": len(results),
            "memory_scope_type": scope_type,
            "memory_scope_id": scope_id,
        }

    def _write_interaction_memory(
        self,
        *,
        agent_run: AgentRun,
        agent_version: AgentVersion,
        output_text: str,
    ) -> dict[str, JSONValue]:
        """Persist the input/output exchange as a ``conversation`` memory.

        Returns metadata describing the outcome: the written memory id and
        scope on success, otherwise the reason or error for not writing.
        Memory client errors are recorded in the metadata, not raised.
        """
        if self.memory_client is None:
            return {"memory_write_enabled": False, "memory_write_reason": "client_missing"}
        if not self._read_bool(agent_version.memory_policy_json, "write_enabled", default=True):
            return {"memory_write_enabled": False, "memory_write_reason": "policy_disabled"}
        scope = self._resolve_memory_scope(agent_run=agent_run, agent_version=agent_version)
        if scope is None:
            return {"memory_write_enabled": True, "memory_write_reason": "scope_unavailable"}
        scope_type, scope_id = scope
        try:
            memory = self.memory_client.create_memory(
                MemoryCreateContract(
                    tenant_id=agent_run.tenant_id,
                    scope_type=scope_type,
                    scope_id=scope_id,
                    memory_type="conversation",
                    content_text=self._format_interaction_memory(
                        agent_run=agent_run,
                        output_text=output_text,
                    ),
                    content_json={
                        "agent_run_id": agent_run.id,
                        "agent_version_id": agent_version.id,
                        "input_text": agent_run.input_text,
                        "output_text": output_text,
                    },
                    metadata_json={
                        "source": "agent-service",
                        "role": agent_version.role,
                        "version_no": agent_version.version_no,
                    },
                    owner_agent_id=agent_run.agent_id,
                    session_id=agent_run.session_id,
                    source_ref=f"agent_run:{agent_run.id}",
                    importance_score=self._read_nested_int(
                        agent_version.memory_policy_json,
                        "config_json",
                        "write_importance_score",
                        default=50,
                    ),
                )
            )
        except MemoryClientError as exc:
            return {
                "memory_write_enabled": True,
                "memory_write_error": str(exc),
            }
        return {
            "memory_write_enabled": True,
            "memory_written_id": memory.id,
            "memory_scope_type": scope_type,
            "memory_scope_id": scope_id,
        }

    def _resolve_memory_scope(
        self,
        *,
        agent_run: AgentRun,
        agent_version: AgentVersion,
    ) -> tuple[MemoryScopeType, str] | None:
        """Map the policy's ``memory_scope`` to a ``(scope_type, scope_id)`` pair.

        The scope defaults to ``"session"``. Returns ``None`` when the scope
        value is unknown or the id it needs is unavailable (no session id, or
        no ``user_id`` / ``team_id`` string in the run's ``input_json``).
        """
        scope_value = self._read_optional_string(
            agent_version.memory_policy_json,
            "memory_scope",
        ) or "session"
        if scope_value == "tenant":
            return "tenant", agent_run.tenant_id
        if scope_value == "agent":
            return "agent", agent_run.agent_id
        if scope_value == "session" and agent_run.session_id:
            return "session", agent_run.session_id
        if scope_value == "user":
            user_id = self._read_input_json_string(agent_run=agent_run, key="user_id")
            if user_id is not None:
                return "user", user_id
        if scope_value == "team":
            team_id = self._read_input_json_string(agent_run=agent_run, key="team_id")
            if team_id is not None:
                return "team", team_id
        return None

    def _format_memory_context(self, memory_results: list[MemorySearchResultContract]) -> str:
        """Render memory search results as a numbered list for the prompt."""
        lines = ["Relevant memories:"]
        for index, result in enumerate(memory_results, start=1):
            lines.append(f"{index}. {result.item.content_text}")
        return "\n".join(lines)

    def _format_interaction_memory(self, *, agent_run: AgentRun, output_text: str) -> str:
        """Render the input/output exchange as the text stored in memory."""
        input_text = agent_run.input_text or str(agent_run.input_json or {})
        return f"User input: {input_text}\nAgent output: {output_text}"

    def _read_bool(self, payload: dict[str, JSONValue], key: str, *, default: bool) -> bool:
        """Return ``payload[key]`` if it is a bool, else ``default``."""
        value = payload.get(key)
        if isinstance(value, bool):
            return value
        return default

    def _read_int(self, payload: dict[str, JSONValue], key: str, *, default: int) -> int:
        """Return ``payload[key]`` if it is an int (not a bool), else ``default``."""
        value = payload.get(key)
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        return default

    def _read_nested_int(
        self,
        payload: dict[str, JSONValue],
        parent_key: str,
        child_key: str,
        *,
        default: int,
    ) -> int:
        """Return ``payload[parent_key][child_key]`` as an int, else ``default``.

        ``default`` is returned when the parent is not a dict or the child is
        not an int.
        """
        parent_value = payload.get(parent_key)
        if not isinstance(parent_value, dict):
            return default
        return self._read_int(
            cast(dict[str, JSONValue], parent_value),
            child_key,
            default=default,
        )

    def _read_input_json_string(self, *, agent_run: AgentRun, key: str) -> str | None:
        """Return a non-empty string at ``key`` in the run's ``input_json``, else ``None``."""
        if agent_run.input_json is None:
            return None
        value = agent_run.input_json.get(key)
        if isinstance(value, str) and value:
            return value
        return None


def build_agent_application_service(
    *,
    db: Session,
    settings: AgentServiceSettings,
) -> AgentApplicationService:
    """Build a fully wired service: repositories on ``db``, clients from ``settings``."""
    return AgentApplicationService(
        agent_repository=AgentDefinitionRepository(db),
        agent_version_repository=AgentVersionRepository(db),
        agent_run_repository=AgentRunRepository(db),
        model_gateway_client=ModelGatewayClient(
            base_url=settings.model_gateway_service_url,
            timeout_seconds=settings.model_gateway_timeout_seconds,
        ),
        memory_client=MemoryClient(
            base_url=settings.memory_service_url,
            timeout_seconds=settings.memory_service_timeout_seconds,
        ),
        tool_client=ToolServiceClient(
            base_url=settings.tool_service_url,
            timeout_seconds=settings.tool_service_timeout_seconds,
        ),
        skill_client=SkillServiceClient(
            base_url=settings.skill_service_url,
            timeout_seconds=settings.skill_service_timeout_seconds,
        ),
        event_client=EventServiceClient(
            base_url=settings.event_service_url,
            timeout_seconds=settings.event_service_timeout_seconds,
        ),
    )