import json from datetime import datetime, timedelta from typing import cast from sqlalchemy.orm import Session from core_events import EventPublishContract, EventServiceClient, EventServiceClientError from core_domain import ( AgentSkillRefContract, AgentToolRefContract, ChatCompletionRequestContract, ChatCompletionResponseContract, ChatMessageContract, MemoryCreateContract, MemoryScopeType, MemorySearchRequestContract, MemorySearchResultContract, ) from core_shared import JSONValue, try_build_redis_client from core_shared.task_queue import TaskQueuePublisher from app.bootstrap.settings import AgentServiceSettings from app.db.models import AgentDefinition, AgentRun, AgentToolInvocation, AgentVersion from app.domain.repositories import ( AgentDefinitionRepository, AgentRunRepository, AgentToolInvocationRepository, AgentVersionRepository, ) from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError from app.infrastructure.memory_client import MemoryClient, MemoryClientError from app.infrastructure.skill_client import SkillServiceClient, SkillServiceClientError from app.infrastructure.tool_client import ToolServiceClient, ToolServiceClientError from app.schemas.agent import ( AgentCreateRequest, AgentRunCreateRequest, AgentRunExecuteRequest, AgentRunStatusUpdateRequest, AgentStatusUpdateRequest, AgentVersionCreateRequest, ) class AgentApplicationService: def __init__( self, *, agent_repository: AgentDefinitionRepository, agent_version_repository: AgentVersionRepository, agent_run_repository: AgentRunRepository, agent_tool_invocation_repository: AgentToolInvocationRepository, model_gateway_client: ModelGatewayClient | None = None, memory_client: MemoryClient | None = None, tool_client: ToolServiceClient | None = None, skill_client: SkillServiceClient | None = None, event_client: EventServiceClient | None = None, task_queue_publisher: TaskQueuePublisher | None = None, react_max_steps: int = 5, react_max_tool_calls: int = 10, react_tool_retry_count: int = 1, ) -> None: self.agent_repository = agent_repository self.agent_version_repository = agent_version_repository self.agent_run_repository = agent_run_repository self.agent_tool_invocation_repository = agent_tool_invocation_repository self.model_gateway_client = model_gateway_client self.memory_client = memory_client self.tool_client = tool_client self.skill_client = skill_client self.event_client = event_client self.task_queue_publisher = task_queue_publisher self.react_max_steps = react_max_steps self.react_max_tool_calls = react_max_tool_calls self.react_tool_retry_count = react_tool_retry_count def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition: return self.agent_repository.create( tenant_id=payload.tenant_id, code=payload.code, name=payload.name, description=payload.description, agent_type=payload.agent_type, owner_user_id=payload.owner_user_id, metadata_json=payload.metadata_json, ) def list_agents(self, *, tenant_id: str) -> list[AgentDefinition]: return self.agent_repository.list_by_tenant(tenant_id=tenant_id) def update_agent_status( self, *, agent_id: str, payload: AgentStatusUpdateRequest, ) -> AgentDefinition | None: return self.agent_repository.update_status( tenant_id=payload.tenant_id, agent_id=agent_id, status=payload.status, ) def create_agent_version(self, payload: AgentVersionCreateRequest) -> AgentVersion: agent = self.agent_repository.get_by_id( tenant_id=payload.tenant_id, agent_id=payload.agent_id, ) if agent is None: raise ValueError(f"agent not found: {payload.agent_id}") return self.agent_version_repository.create( tenant_id=payload.tenant_id, agent_id=payload.agent_id, status=payload.status, role=payload.role, goal=payload.goal, system_prompt=payload.system_prompt, model_config_json=payload.model_config_data.model_dump(mode="json"), memory_policy_json=payload.memory_policy.model_dump(mode="json"), tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs], skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs], ) def list_agent_versions(self, *, tenant_id: str, agent_id: str) -> list[AgentVersion]: return self.agent_version_repository.list_by_agent(tenant_id=tenant_id, agent_id=agent_id) def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun: agent_version = self._resolve_agent_version( tenant_id=payload.tenant_id, agent_id=payload.agent_id, agent_version_id=payload.agent_version_id, ) if agent_version is None: raise ValueError("published agent version not found") agent_run = self.agent_run_repository.create( tenant_id=payload.tenant_id, agent_id=payload.agent_id, agent_version_id=agent_version.id, session_id=payload.session_id, input_text=payload.input_text, input_json=payload.input_json, ) self._publish_event( event_type="agent.run.created", agent_run=agent_run, payload_json={"agent_run_id": agent_run.id, "status": agent_run.status}, ) if self.task_queue_publisher is not None: self.task_queue_publisher.publish_agent_run( tenant_id=agent_run.tenant_id, agent_run_id=agent_run.id, ) return agent_run def list_agent_runs( self, *, tenant_id: str, agent_id: str | None = None, session_id: str | None = None, ) -> list[AgentRun]: return self.agent_run_repository.list_by_scope( tenant_id=tenant_id, agent_id=agent_id, session_id=session_id, ) def list_agent_tool_invocations( self, *, tenant_id: str, agent_run_id: str, ) -> list[AgentToolInvocation]: return self.agent_tool_invocation_repository.list_by_run( tenant_id=tenant_id, agent_run_id=agent_run_id, ) def update_agent_run_status( self, *, agent_run_id: str, payload: AgentRunStatusUpdateRequest, ) -> AgentRun | None: entity = self.agent_run_repository.get_by_id( tenant_id=payload.tenant_id, agent_run_id=agent_run_id, ) if entity is None: return None return self.agent_run_repository.update_status( agent_run_id=agent_run_id, status=payload.status, worker_key=payload.worker_key, output_text=payload.output_text, output_json=payload.output_json, error_code=payload.error_code, error_message=payload.error_message, ) def execute_agent_run( self, *, agent_run_id: str, payload: AgentRunExecuteRequest, ) -> AgentRun | None: agent_run = self.agent_run_repository.get_by_id( tenant_id=payload.tenant_id, agent_run_id=agent_run_id, ) if agent_run is None: return None agent_version = self.agent_version_repository.get_by_id( tenant_id=payload.tenant_id, agent_version_id=agent_run.agent_version_id, ) if agent_version is None: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="agent_version_missing", error_message=f"agent version not found: {agent_run.agent_version_id}", ) self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="running", worker_key=payload.worker_key, ) memory_results, memory_metadata = self._read_relevant_memories( agent_run=agent_run, agent_version=agent_version, ) selected_tools = self._select_tool_refs(agent_run=agent_run, agent_version=agent_version) selected_skills = self._select_skill_refs(agent_run=agent_run, agent_version=agent_version) if payload.dry_run: messages = self._build_chat_messages( agent_run=agent_run, agent_version=agent_version, memory_results=memory_results, capability_context=self._format_capability_plan( selected_tools=selected_tools, selected_skills=selected_skills, ), ) completed_run = self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="completed", worker_key=payload.worker_key, output_text=self._build_dry_run_output( agent_run=agent_run, agent_version=agent_version, ), output_json={ "dry_run": True, "agent_version_id": agent_version.id, "message_count": len(messages), "messages": [message.model_dump(mode="json") for message in messages], "selected_tool_refs": [ tool_ref.model_dump(mode="json") for tool_ref in selected_tools ], "selected_skill_refs": [ skill_ref.model_dump(mode="json") for skill_ref in selected_skills ], **memory_metadata, }, ) if completed_run is not None: self._publish_event( event_type="agent.run.completed", agent_run=completed_run, payload_json={ "agent_run_id": completed_run.id, "dry_run": True, "status": completed_run.status, }, ) return completed_run if self._read_bool(agent_version.model_config_json, "react_enabled", default=False): return self._execute_react_agent_run( agent_run=agent_run, agent_version=agent_version, payload=payload, memory_results=memory_results, memory_metadata=memory_metadata, selected_tools=selected_tools, selected_skills=selected_skills, ) tool_invocations = self._invoke_selected_tools( agent_run=agent_run, agent_version=agent_version, selected_tools=selected_tools, ) skill_invocations = self._invoke_selected_skills( agent_run=agent_run, selected_skills=selected_skills, worker_key=payload.worker_key, ) messages = self._build_chat_messages( agent_run=agent_run, agent_version=agent_version, memory_results=memory_results, capability_context=self._format_capability_results( tool_invocations=tool_invocations, skill_invocations=skill_invocations, ), ) if self.model_gateway_client is None: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="model_gateway_missing", error_message="model gateway client is not configured", output_json={ "tool_invocations": tool_invocations, "skill_invocations": skill_invocations, **memory_metadata, }, ) try: response = self.model_gateway_client.create_chat_completion( ChatCompletionRequestContract( model=self._read_optional_string(agent_version.model_config_json, "model"), temperature=self._read_optional_float( agent_version.model_config_json, "temperature", ), max_tokens=self._read_optional_int( agent_version.model_config_json, "max_tokens", ), messages=messages, metadata_json={ "tenant_id": agent_run.tenant_id, "agent_id": agent_run.agent_id, "agent_version_id": agent_version.id, "agent_run_id": agent_run.id, }, ) ) except ModelGatewayClientError as exc: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="model_gateway_error", error_message=str(exc), ) memory_write_metadata = self._write_interaction_memory( agent_run=agent_run, agent_version=agent_version, output_text=response.content, ) completed_run = self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="completed", worker_key=payload.worker_key, output_text=response.content, output_json={ "dry_run": False, "agent_version_id": agent_version.id, "model": response.model, "finish_reason": response.finish_reason, "usage_json": response.usage_json, "raw_response_json": response.raw_response_json, "tool_invocations": tool_invocations, "skill_invocations": skill_invocations, **memory_metadata, **memory_write_metadata, }, ) if completed_run is not None: self._publish_event( event_type="agent.run.completed", agent_run=completed_run, payload_json={ "agent_run_id": completed_run.id, "dry_run": False, "status": completed_run.status, }, ) return completed_run def _publish_event( self, *, event_type: str, agent_run: AgentRun, payload_json: dict[str, JSONValue], ) -> None: if self.event_client is None: return try: self.event_client.publish_event( EventPublishContract( tenant_id=agent_run.tenant_id, event_type=event_type, source_service="agent-service", aggregate_type="agent_run", aggregate_id=agent_run.id, correlation_id=agent_run.session_id, payload_json={ **payload_json, "agent_id": agent_run.agent_id, "agent_version_id": agent_run.agent_version_id, }, ) ) except EventServiceClientError: return def _execute_react_agent_run( self, *, agent_run: AgentRun, agent_version: AgentVersion, payload: AgentRunExecuteRequest, memory_results: list[MemorySearchResultContract], memory_metadata: dict[str, JSONValue], selected_tools: list[AgentToolRefContract], selected_skills: list[AgentSkillRefContract], ) -> AgentRun | None: if self.model_gateway_client is None: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="model_gateway_missing", error_message="model gateway client is not configured", ) skill_invocations = self._invoke_selected_skills( agent_run=agent_run, selected_skills=selected_skills, worker_key=payload.worker_key, ) messages = self._build_chat_messages( agent_run=agent_run, agent_version=agent_version, memory_results=memory_results, capability_context=self._format_react_instruction( agent_run=agent_run, selected_tools=selected_tools, skill_invocations=skill_invocations, ), ) react_steps: list[dict[str, JSONValue]] = [] tool_invocations: list[dict[str, JSONValue]] = [] final_answer: str | None = None tool_call_count = 0 max_steps = self._read_int( agent_version.model_config_json, "react_max_steps", default=self.react_max_steps, ) for step_index in range(max(max_steps, 1)): try: response = self.model_gateway_client.create_chat_completion( self._build_chat_completion_request( agent_run=agent_run, agent_version=agent_version, messages=messages, selected_tools=selected_tools, ) ) except ModelGatewayClientError as exc: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="model_gateway_error", error_message=str(exc), output_json={ "react_steps": react_steps, "tool_invocations": tool_invocations, "skill_invocations": skill_invocations, **memory_metadata, }, ) action = self._parse_react_action_from_response(response) react_step: dict[str, JSONValue] = { "step_index": step_index, "model_content": response.content, "action": action, } react_steps.append(react_step) if action.get("action") == "finish": answer_value = action.get("answer") final_answer = answer_value if isinstance(answer_value, str) else response.content break if action.get("action") != "tool": final_answer = response.content break max_tool_calls = self._read_int( agent_version.model_config_json, "react_max_tool_calls", default=self.react_max_tool_calls, ) if tool_call_count >= max(max_tool_calls, 0): final_answer = "Tool call budget exhausted." react_step["observation"] = final_answer break tool_code = action.get("tool_code") matching_tools = [ item for item in selected_tools if item.tool_code == tool_code ] if not matching_tools: observation = f"tool not available: {tool_code}" react_step["observation"] = observation messages.append(ChatMessageContract(role="assistant", content=response.content)) messages.append(ChatMessageContract(role="user", content=observation)) continue tool_input = action.get("input_json") original_input_json = agent_run.input_json if isinstance(tool_input, dict): agent_run.input_json = { str(item_key): item_value for item_key, item_value in tool_input.items() } current_invocations = self._invoke_react_tool_with_retry( agent_run=agent_run, agent_version=agent_version, tool_ref=matching_tools[0], ) tool_call_count += len(current_invocations) agent_run.input_json = original_input_json tool_invocations.extend(current_invocations) observation = self._format_react_observation(current_invocations) react_step["observation"] = observation messages.append(ChatMessageContract(role="assistant", content=response.content)) messages.append(ChatMessageContract(role="user", content=observation)) if final_answer is None: final_answer = "ReAct loop reached max steps without a final answer." memory_write_metadata = self._write_interaction_memory( agent_run=agent_run, agent_version=agent_version, output_text=final_answer, ) completed_run = self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="completed", worker_key=payload.worker_key, output_text=final_answer, output_json={ "dry_run": False, "agent_version_id": agent_version.id, "react_enabled": True, "react_steps": react_steps, "react_tool_call_count": tool_call_count, "tool_invocations": tool_invocations, "skill_invocations": skill_invocations, **memory_metadata, **memory_write_metadata, }, ) if completed_run is not None: self._publish_event( event_type="agent.run.completed", agent_run=completed_run, payload_json={ "agent_run_id": completed_run.id, "react_enabled": True, "status": completed_run.status, }, ) return completed_run def execute_next_claimed_agent_run( self, *, worker_key: str, lease_seconds: int, dry_run: bool, redis_client: object | None = None, ) -> tuple[AgentRun, int] | None: released_lease_count = self.agent_run_repository.release_expired_leases( now_time=datetime.utcnow(), ) claimed_agent_run = self.agent_run_repository.claim_next_queued( worker_key=worker_key, lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds), ) if claimed_agent_run is None: return None if redis_client is not None: from core_shared.redis_primitives import DistributedLock, IdempotencyStore lock = DistributedLock( client=redis_client, name=f"agent-run:{claimed_agent_run.id}:lock", ttl_seconds=lease_seconds, ) if not lock.acquire(): return None idempotency_store = IdempotencyStore( client=redis_client, prefix="agent-run-idempotency", ) if not idempotency_store.begin(key=claimed_agent_run.id): lock.release() return None else: lock = None idempotency_store = None try: result = self.execute_agent_run( agent_run_id=claimed_agent_run.id, payload=AgentRunExecuteRequest( tenant_id=claimed_agent_run.tenant_id, worker_key=worker_key, dry_run=dry_run, ), ) if idempotency_store is not None and result is not None: idempotency_store.complete( key=claimed_agent_run.id, result={"status": result.status, "agent_run_id": result.id}, ) finally: if lock is not None: lock.release() if result is None: return None return result, released_lease_count def _resolve_agent_version( self, *, tenant_id: str, agent_id: str, agent_version_id: str | None, ) -> AgentVersion | None: if agent_version_id is not None: return self.agent_version_repository.get_by_id( tenant_id=tenant_id, agent_version_id=agent_version_id, ) return self.agent_version_repository.get_latest_published( tenant_id=tenant_id, agent_id=agent_id, ) def _build_chat_messages( self, *, agent_run: AgentRun, agent_version: AgentVersion, memory_results: list[MemorySearchResultContract] | None = None, capability_context: str | None = None, ) -> list[ChatMessageContract]: messages = [ ChatMessageContract(role="system", content=agent_version.system_prompt), ] if agent_version.goal: messages.append( ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}") ) if memory_results: messages.append( ChatMessageContract( role="system", content=self._format_memory_context(memory_results), ) ) if capability_context: messages.append(ChatMessageContract(role="system", content=capability_context)) if agent_run.input_text: messages.append(ChatMessageContract(role="user", content=agent_run.input_text)) if agent_run.input_json: messages.append( ChatMessageContract( role="user", content=f"Structured input: {agent_run.input_json}", ) ) return messages def _select_tool_refs( self, *, agent_run: AgentRun, agent_version: AgentVersion, ) -> list[AgentToolRefContract]: input_preview = self._build_input_preview(agent_run) selected: list[AgentToolRefContract] = [] for item in agent_version.tool_refs_json: ref = AgentToolRefContract.model_validate(item) if ( ref.required or self._read_bool(ref.config_json, "auto_invoke", default=False) or self._matches_selection_keywords(ref.config_json, input_preview) ): selected.append(ref) return selected def _select_skill_refs( self, *, agent_run: AgentRun, agent_version: AgentVersion, ) -> list[AgentSkillRefContract]: input_preview = self._build_input_preview(agent_run) selected: list[AgentSkillRefContract] = [] for item in agent_version.skill_refs_json: ref = AgentSkillRefContract.model_validate(item) auto_invoke = self._read_bool(ref.config_json, "auto_invoke", default=True) if auto_invoke or self._matches_selection_keywords(ref.config_json, input_preview): selected.append(ref) return selected def _invoke_selected_tools( self, *, agent_run: AgentRun, agent_version: AgentVersion, selected_tools: list[AgentToolRefContract], ) -> list[dict[str, JSONValue]]: invocations: list[dict[str, JSONValue]] = [] for ref in selected_tools: invocation = self.agent_tool_invocation_repository.create( tenant_id=agent_run.tenant_id, agent_run_id=agent_run.id, agent_id=agent_run.agent_id, agent_version_id=agent_version.id, tool_code=ref.tool_code, tool_binding_id=ref.tool_binding_id, status="selected", input_json=agent_run.input_json or {}, ) if ref.tool_binding_id is None: self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="skipped", reason="tool_binding_id_missing", ) invocations.append( { "status": "skipped", "reason": "tool_binding_id_missing", "tool_code": ref.tool_code, } ) continue if self.tool_client is None: self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="failed", reason="tool_client_missing", error_message="tool client is not configured", ) invocations.append( { "status": "failed", "reason": "tool_client_missing", "tool_binding_id": ref.tool_binding_id, } ) continue try: self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="running", ) detail = self.tool_client.get_tool_binding_detail( tenant_id=agent_run.tenant_id, binding_id=ref.tool_binding_id, ) if not detail.binding.enabled: self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="failed", reason="tool_binding_disabled", error_message="tool binding is disabled", ) invocations.append( { "status": "failed", "reason": "tool_binding_disabled", "tool_binding_id": ref.tool_binding_id, } ) continue if detail.tool_definition.tool_type != "http": self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="skipped", reason="unsupported_tool_type", error_message=detail.tool_definition.tool_type, ) invocations.append( { "status": "skipped", "reason": "unsupported_tool_type", "tool_type": detail.tool_definition.tool_type, "tool_binding_id": ref.tool_binding_id, } ) continue output_text, output_json = self.tool_client.invoke_http_tool( detail=detail, input_json=agent_run.input_json or {}, config_json=ref.config_json, ) except ToolServiceClientError as exc: self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="failed", reason="tool_service_error", error_message=str(exc), ) invocations.append( { "status": "failed", "reason": str(exc), "tool_binding_id": ref.tool_binding_id, } ) continue self.agent_tool_invocation_repository.update_status( invocation_id=invocation.id, status="completed", output_text=output_text, output_json=output_json, ) invocations.append( { "status": "completed", "tool_binding_id": ref.tool_binding_id, "tool_code": detail.tool_definition.code, "output_text": output_text, "output_json": output_json, } ) return invocations def _invoke_selected_skills( self, *, agent_run: AgentRun, selected_skills: list[AgentSkillRefContract], worker_key: str | None, ) -> list[dict[str, JSONValue]]: invocations: list[dict[str, JSONValue]] = [] for ref in selected_skills: if self.skill_client is None: invocations.append( { "status": "failed", "reason": "skill_client_missing", "skill_id": ref.skill_id, "skill_code": ref.skill_code, } ) continue skill_id = ref.skill_id or self._resolve_skill_id_by_code( tenant_id=agent_run.tenant_id, skill_code=ref.skill_code, ) if skill_id is None: invocations.append( { "status": "failed", "reason": "skill_id_missing", "skill_code": ref.skill_code, } ) continue try: created_run = self.skill_client.create_skill_run( tenant_id=agent_run.tenant_id, skill_id=skill_id, skill_version_id=self._read_optional_string( ref.config_json, "skill_version_id", ), installation_id=self._read_optional_string( ref.config_json, "installation_id", ), input_json=self._build_skill_input_json(agent_run=agent_run, ref=ref), ) executed_run = self.skill_client.execute_skill_run( tenant_id=agent_run.tenant_id, skill_run_id=created_run.id, worker_key=worker_key, ) except SkillServiceClientError as exc: invocations.append( { "status": "failed", "reason": str(exc), "skill_id": skill_id, "skill_code": ref.skill_code, } ) continue invocations.append( { "status": executed_run.status, "skill_id": skill_id, "skill_code": ref.skill_code, "skill_run_id": executed_run.id, "output_text": executed_run.output_text, "output_json": executed_run.output_json, "error_code": executed_run.error_code, "error_message": executed_run.error_message, } ) return invocations def _format_capability_plan( self, *, selected_tools: list[AgentToolRefContract], selected_skills: list[AgentSkillRefContract], ) -> str: return ( "Selected capability plan before model call:\n" f"Tools: {[item.model_dump(mode='json') for item in selected_tools]}\n" f"Skills: {[item.model_dump(mode='json') for item in selected_skills]}" ) def _format_capability_results( self, *, tool_invocations: list[dict[str, JSONValue]], skill_invocations: list[dict[str, JSONValue]], ) -> str | None: if not tool_invocations and not skill_invocations: return None return ( "Capability invocation results before model call:\n" f"Tools: {tool_invocations}\n" f"Skills: {skill_invocations}" ) def _format_react_instruction( self, *, agent_run: AgentRun, selected_tools: list[AgentToolRefContract], skill_invocations: list[dict[str, JSONValue]], ) -> str: tool_schemas = self._build_react_tool_schemas( agent_run=agent_run, selected_tools=selected_tools, ) return ( "Use ReAct JSON only. Respond with one JSON object per turn.\n" "To call a tool: " '{"action":"tool","tool_code":"code","input_json":{...}}\n' "To finish: " '{"action":"finish","answer":"final answer"}\n' f"Available tools: {tool_schemas}\n" f"Pre-run skill results: {skill_invocations}" ) def _build_react_tool_schemas( self, *, agent_run: AgentRun, selected_tools: list[AgentToolRefContract], ) -> list[dict[str, JSONValue]]: schemas: list[dict[str, JSONValue]] = [] for ref in selected_tools: schema: dict[str, JSONValue] = { "tool_code": ref.tool_code, "tool_binding_id": ref.tool_binding_id, "required": ref.required, "config_json": ref.config_json, } if ref.tool_binding_id is not None and self.tool_client is not None: try: detail = self.tool_client.get_tool_binding_detail( tenant_id=agent_run.tenant_id, binding_id=ref.tool_binding_id, ) schema.update( { "name": detail.tool_definition.name, "description": detail.tool_definition.description, "tool_type": detail.tool_definition.tool_type, "input_schema_json": detail.tool_version.input_schema_json or {}, "output_schema_json": detail.tool_version.output_schema_json or {}, "timeout_ms": detail.tool_version.timeout_ms, } ) except ToolServiceClientError as exc: schema["schema_error"] = str(exc) schemas.append(schema) return schemas def _invoke_react_tool_with_retry( self, *, agent_run: AgentRun, agent_version: AgentVersion, tool_ref: AgentToolRefContract, ) -> list[dict[str, JSONValue]]: retry_count = self._read_int( agent_version.model_config_json, "react_tool_retry_count", default=self.react_tool_retry_count, ) attempts: list[dict[str, JSONValue]] = [] for attempt_index in range(max(retry_count, 0) + 1): current = self._invoke_selected_tools( agent_run=agent_run, agent_version=agent_version, selected_tools=[tool_ref], ) for item in current: item["attempt_index"] = attempt_index attempts.extend(current) if current and current[-1].get("status") == "completed": break return attempts def _format_react_observation( self, tool_invocations: list[dict[str, JSONValue]], ) -> str: return f"Observation: {tool_invocations}" def _parse_react_action(self, content: str) -> dict[str, JSONValue]: try: value = json.loads(content) except json.JSONDecodeError: start_index = content.find("{") end_index = content.rfind("}") if start_index < 0 or end_index <= start_index: return {"action": "finish", "answer": content} try: value = json.loads(content[start_index : end_index + 1]) except json.JSONDecodeError: return {"action": "finish", "answer": content} if not isinstance(value, dict): return {"action": "finish", "answer": content} return {str(item_key): item_value for item_key, item_value in value.items()} def _parse_react_action_from_response( self, response: ChatCompletionResponseContract, ) -> dict[str, JSONValue]: if response.tool_calls_json: action = self._parse_openai_tool_call(response.tool_calls_json[0]) if action is not None: return action return self._parse_react_action(response.content) def _parse_openai_tool_call( self, tool_call: dict[str, JSONValue], ) -> dict[str, JSONValue] | None: function_value = tool_call.get("function") if not isinstance(function_value, dict): return None tool_code = function_value.get("name") if not isinstance(tool_code, str) or not tool_code: return None raw_arguments = function_value.get("arguments") input_json: dict[str, JSONValue] = {} if isinstance(raw_arguments, str) and raw_arguments: try: decoded = json.loads(raw_arguments) except json.JSONDecodeError: decoded = {"raw_arguments": raw_arguments} if isinstance(decoded, dict): input_json = {str(item_key): item_value for item_key, item_value in decoded.items()} elif isinstance(raw_arguments, dict): input_json = {str(item_key): item_value for item_key, item_value in raw_arguments.items()} tool_call_id = tool_call.get("id") return { "action": "tool", "tool_code": tool_code, "input_json": input_json, "tool_call_id": tool_call_id if isinstance(tool_call_id, str) else None, "tool_call_protocol": "openai", } def _build_chat_completion_request( self, *, agent_run: AgentRun, agent_version: AgentVersion, messages: list[ChatMessageContract], selected_tools: list[AgentToolRefContract] | None = None, ) -> ChatCompletionRequestContract: function_calling_enabled = self._read_bool( agent_version.model_config_json, "function_calling_enabled", default=False, ) or self._read_bool( agent_version.model_config_json, "tool_calling_enabled", default=False, ) return ChatCompletionRequestContract( model=self._read_optional_string(agent_version.model_config_json, "model"), temperature=self._read_optional_float( agent_version.model_config_json, "temperature", ), max_tokens=self._read_optional_int( agent_version.model_config_json, "max_tokens", ), messages=messages, tools_json=( self._build_openai_tool_schemas( agent_run=agent_run, selected_tools=selected_tools or [], ) if function_calling_enabled else [] ), tool_choice="auto" if function_calling_enabled and selected_tools else None, metadata_json={ "tenant_id": agent_run.tenant_id, "agent_id": agent_run.agent_id, "agent_version_id": agent_version.id, "agent_run_id": agent_run.id, }, ) def _build_openai_tool_schemas( self, *, agent_run: AgentRun, selected_tools: list[AgentToolRefContract], ) -> list[dict[str, JSONValue]]: tool_schemas: list[dict[str, JSONValue]] = [] for schema in self._build_react_tool_schemas( agent_run=agent_run, selected_tools=selected_tools, ): tool_code = schema.get("tool_code") if not isinstance(tool_code, str) or not tool_code: continue description = schema.get("description") input_schema = schema.get("input_schema_json") tool_schemas.append( { "type": "function", "function": { "name": tool_code, "description": description if isinstance(description, str) else "", "parameters": input_schema if isinstance(input_schema, dict) else {}, }, } ) return tool_schemas def _build_skill_input_json( self, *, agent_run: AgentRun, ref: AgentSkillRefContract, ) -> dict[str, JSONValue]: input_json: dict[str, JSONValue] = dict(agent_run.input_json or {}) if agent_run.input_text: input_json.setdefault("input_text", agent_run.input_text) configured_input = ref.config_json.get("input_json") if isinstance(configured_input, dict): input_json.update( {str(item_key): item_value for item_key, item_value in configured_input.items()} ) return input_json def _resolve_skill_id_by_code(self, *, tenant_id: str, skill_code: str | None) -> str | None: if skill_code is None or self.skill_client is None: return None try: skills = self.skill_client.list_skills(tenant_id=tenant_id) except SkillServiceClientError: return None for skill in skills: if skill.code == skill_code: return skill.id return None def _build_input_preview(self, agent_run: AgentRun) -> str: return f"{agent_run.input_text or ''} {agent_run.input_json or {}}".lower() def _matches_selection_keywords( self, config_json: dict[str, JSONValue], input_preview: str, ) -> bool: keywords = config_json.get("selection_keywords") if not isinstance(keywords, list): return False return any( isinstance(keyword, str) and keyword.lower() in input_preview for keyword in keywords ) def _build_dry_run_output(self, *, agent_run: AgentRun, agent_version: AgentVersion) -> str: input_preview = agent_run.input_text or str(agent_run.input_json or {}) return ( f"[dry-run] Agent role={agent_version.role} " f"version={agent_version.version_no} received: {input_preview}" ) def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None: value = payload.get(key) if isinstance(value, str) and value: return value return None def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None: value = payload.get(key) if isinstance(value, (int, float)) and not isinstance(value, bool): return float(value) return None def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None: value = payload.get(key) if isinstance(value, int) and not isinstance(value, bool): return value return None def _read_relevant_memories( self, *, agent_run: AgentRun, agent_version: AgentVersion, ) -> tuple[list[MemorySearchResultContract], dict[str, JSONValue]]: if self.memory_client is None: return [], {"memory_read_enabled": False, "memory_read_reason": "client_missing"} if not self._read_bool(agent_version.memory_policy_json, "enabled", default=True): return [], {"memory_read_enabled": False, "memory_read_reason": "policy_disabled"} query = agent_run.input_text or str(agent_run.input_json or "") if not query: return [], {"memory_read_enabled": True, "memory_read_count": 0} scope = self._resolve_memory_scope(agent_run=agent_run, agent_version=agent_version) if scope is None: return [], { "memory_read_enabled": True, "memory_read_count": 0, "memory_read_reason": "scope_unavailable", } scope_type, scope_id = scope try: results = self.memory_client.search_memories( MemorySearchRequestContract( tenant_id=agent_run.tenant_id, query=query, scope_type=scope_type, scope_id=scope_id, owner_agent_id=agent_run.agent_id, session_id=agent_run.session_id, limit=self._read_int( agent_version.memory_policy_json, "read_top_k", default=8, ), ) ) except MemoryClientError as exc: return [], { "memory_read_enabled": True, "memory_read_count": 0, "memory_read_error": str(exc), } return results, { "memory_read_enabled": True, "memory_read_count": len(results), "memory_scope_type": scope_type, "memory_scope_id": scope_id, } def _write_interaction_memory( self, *, agent_run: AgentRun, agent_version: AgentVersion, output_text: str, ) -> dict[str, JSONValue]: if self.memory_client is None: return {"memory_write_enabled": False, "memory_write_reason": "client_missing"} if not self._read_bool(agent_version.memory_policy_json, "write_enabled", default=True): return {"memory_write_enabled": False, "memory_write_reason": "policy_disabled"} scope = self._resolve_memory_scope(agent_run=agent_run, agent_version=agent_version) if scope is None: return {"memory_write_enabled": True, "memory_write_reason": "scope_unavailable"} scope_type, scope_id = scope try: memory = self.memory_client.create_memory( MemoryCreateContract( tenant_id=agent_run.tenant_id, scope_type=scope_type, scope_id=scope_id, memory_type="conversation", content_text=self._format_interaction_memory( agent_run=agent_run, output_text=output_text, ), content_json={ "agent_run_id": agent_run.id, "agent_version_id": agent_version.id, "input_text": agent_run.input_text, "output_text": output_text, }, metadata_json={ "source": "agent-service", "role": agent_version.role, "version_no": agent_version.version_no, }, owner_agent_id=agent_run.agent_id, session_id=agent_run.session_id, source_ref=f"agent_run:{agent_run.id}", importance_score=self._read_nested_int( agent_version.memory_policy_json, "config_json", "write_importance_score", default=50, ), ) ) except MemoryClientError as exc: return { "memory_write_enabled": True, "memory_write_error": str(exc), } return { "memory_write_enabled": True, "memory_written_id": memory.id, "memory_scope_type": scope_type, "memory_scope_id": scope_id, } def _resolve_memory_scope( self, *, agent_run: AgentRun, agent_version: AgentVersion, ) -> tuple[MemoryScopeType, str] | None: scope_value = self._read_optional_string( agent_version.memory_policy_json, "memory_scope", ) or "session" if scope_value == "tenant": return "tenant", agent_run.tenant_id if scope_value == "agent": return "agent", agent_run.agent_id if scope_value == "session" and agent_run.session_id: return "session", agent_run.session_id if scope_value == "user": user_id = self._read_input_json_string(agent_run=agent_run, key="user_id") if user_id is not None: return "user", user_id if scope_value == "team": team_id = self._read_input_json_string(agent_run=agent_run, key="team_id") if team_id is not None: return "team", team_id return None def _format_memory_context(self, memory_results: list[MemorySearchResultContract]) -> str: lines = ["Relevant memories:"] for index, result in enumerate(memory_results, start=1): lines.append(f"{index}. {result.item.content_text}") return "\n".join(lines) def _format_interaction_memory(self, *, agent_run: AgentRun, output_text: str) -> str: input_text = agent_run.input_text or str(agent_run.input_json or {}) return f"User input: {input_text}\nAgent output: {output_text}" def _read_bool(self, payload: dict[str, JSONValue], key: str, *, default: bool) -> bool: value = payload.get(key) if isinstance(value, bool): return value return default def _read_int(self, payload: dict[str, JSONValue], key: str, *, default: int) -> int: value = payload.get(key) if isinstance(value, int) and not isinstance(value, bool): return value return default def _read_nested_int( self, payload: dict[str, JSONValue], parent_key: str, child_key: str, *, default: int, ) -> int: parent_value = payload.get(parent_key) if not isinstance(parent_value, dict): return default return self._read_int( cast(dict[str, JSONValue], parent_value), child_key, default=default, ) def _read_input_json_string(self, *, agent_run: AgentRun, key: str) -> str | None: if agent_run.input_json is None: return None value = agent_run.input_json.get(key) if isinstance(value, str) and value: return value return None def build_agent_application_service( *, db: Session, settings: AgentServiceSettings, ) -> AgentApplicationService: redis_client = try_build_redis_client(settings.redis_url) return AgentApplicationService( agent_repository=AgentDefinitionRepository(db), agent_version_repository=AgentVersionRepository(db), agent_run_repository=AgentRunRepository(db), agent_tool_invocation_repository=AgentToolInvocationRepository(db), model_gateway_client=ModelGatewayClient( base_url=settings.model_gateway_service_url, timeout_seconds=settings.model_gateway_timeout_seconds, ), memory_client=MemoryClient( base_url=settings.memory_service_url, timeout_seconds=settings.memory_service_timeout_seconds, ), tool_client=ToolServiceClient( base_url=settings.tool_service_url, timeout_seconds=settings.tool_service_timeout_seconds, ), skill_client=SkillServiceClient( base_url=settings.skill_service_url, timeout_seconds=settings.skill_service_timeout_seconds, ), event_client=EventServiceClient( base_url=settings.event_service_url, timeout_seconds=settings.event_service_timeout_seconds, ), task_queue_publisher=( TaskQueuePublisher(client=redis_client) if redis_client is not None else None ), react_max_steps=settings.react_max_steps, )