from datetime import datetime, timedelta from sqlalchemy.orm import Session from core_domain import ChatCompletionRequestContract, ChatMessageContract from core_shared import JSONValue from app.bootstrap.settings import AgentServiceSettings from app.db.models import AgentDefinition, AgentRun, AgentVersion from app.domain.repositories import ( AgentDefinitionRepository, AgentRunRepository, AgentVersionRepository, ) from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError from app.schemas.agent import ( AgentCreateRequest, AgentRunCreateRequest, AgentRunExecuteRequest, AgentRunStatusUpdateRequest, AgentStatusUpdateRequest, AgentVersionCreateRequest, ) class AgentApplicationService: def __init__( self, *, agent_repository: AgentDefinitionRepository, agent_version_repository: AgentVersionRepository, agent_run_repository: AgentRunRepository, model_gateway_client: ModelGatewayClient | None = None, ) -> None: self.agent_repository = agent_repository self.agent_version_repository = agent_version_repository self.agent_run_repository = agent_run_repository self.model_gateway_client = model_gateway_client def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition: return self.agent_repository.create( tenant_id=payload.tenant_id, code=payload.code, name=payload.name, description=payload.description, agent_type=payload.agent_type, owner_user_id=payload.owner_user_id, metadata_json=payload.metadata_json, ) def list_agents(self, *, tenant_id: str) -> list[AgentDefinition]: return self.agent_repository.list_by_tenant(tenant_id=tenant_id) def update_agent_status( self, *, agent_id: str, payload: AgentStatusUpdateRequest, ) -> AgentDefinition | None: return self.agent_repository.update_status( tenant_id=payload.tenant_id, agent_id=agent_id, status=payload.status, ) def create_agent_version(self, payload: AgentVersionCreateRequest) -> AgentVersion: agent = self.agent_repository.get_by_id( tenant_id=payload.tenant_id, agent_id=payload.agent_id, ) if agent is None: raise ValueError(f"agent not found: {payload.agent_id}") return self.agent_version_repository.create( tenant_id=payload.tenant_id, agent_id=payload.agent_id, status=payload.status, role=payload.role, goal=payload.goal, system_prompt=payload.system_prompt, model_config_json=payload.model_config_data.model_dump(mode="json"), memory_policy_json=payload.memory_policy.model_dump(mode="json"), tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs], skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs], ) def list_agent_versions(self, *, tenant_id: str, agent_id: str) -> list[AgentVersion]: return self.agent_version_repository.list_by_agent(tenant_id=tenant_id, agent_id=agent_id) def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun: agent_version = self._resolve_agent_version( tenant_id=payload.tenant_id, agent_id=payload.agent_id, agent_version_id=payload.agent_version_id, ) if agent_version is None: raise ValueError("published agent version not found") return self.agent_run_repository.create( tenant_id=payload.tenant_id, agent_id=payload.agent_id, agent_version_id=agent_version.id, session_id=payload.session_id, input_text=payload.input_text, input_json=payload.input_json, ) def list_agent_runs( self, *, tenant_id: str, agent_id: str | None = None, session_id: str | None = None, ) -> list[AgentRun]: return self.agent_run_repository.list_by_scope( tenant_id=tenant_id, agent_id=agent_id, session_id=session_id, ) def update_agent_run_status( self, *, agent_run_id: str, payload: AgentRunStatusUpdateRequest, ) -> AgentRun | None: entity = self.agent_run_repository.get_by_id( tenant_id=payload.tenant_id, agent_run_id=agent_run_id, ) if entity is None: return None return self.agent_run_repository.update_status( agent_run_id=agent_run_id, status=payload.status, worker_key=payload.worker_key, output_text=payload.output_text, output_json=payload.output_json, error_code=payload.error_code, error_message=payload.error_message, ) def execute_agent_run( self, *, agent_run_id: str, payload: AgentRunExecuteRequest, ) -> AgentRun | None: agent_run = self.agent_run_repository.get_by_id( tenant_id=payload.tenant_id, agent_run_id=agent_run_id, ) if agent_run is None: return None agent_version = self.agent_version_repository.get_by_id( tenant_id=payload.tenant_id, agent_version_id=agent_run.agent_version_id, ) if agent_version is None: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="agent_version_missing", error_message=f"agent version not found: {agent_run.agent_version_id}", ) self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="running", worker_key=payload.worker_key, ) messages = self._build_chat_messages(agent_run=agent_run, agent_version=agent_version) if payload.dry_run: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="completed", worker_key=payload.worker_key, output_text=self._build_dry_run_output( agent_run=agent_run, agent_version=agent_version, ), output_json={ "dry_run": True, "agent_version_id": agent_version.id, "message_count": len(messages), "messages": [message.model_dump(mode="json") for message in messages], }, ) if self.model_gateway_client is None: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="model_gateway_missing", error_message="model gateway client is not configured", ) try: response = self.model_gateway_client.create_chat_completion( ChatCompletionRequestContract( model=self._read_optional_string(agent_version.model_config_json, "model"), temperature=self._read_optional_float( agent_version.model_config_json, "temperature", ), max_tokens=self._read_optional_int(agent_version.model_config_json, "max_tokens"), messages=messages, metadata_json={ "tenant_id": agent_run.tenant_id, "agent_id": agent_run.agent_id, "agent_version_id": agent_version.id, "agent_run_id": agent_run.id, }, ) ) except ModelGatewayClientError as exc: return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="failed", worker_key=payload.worker_key, error_code="model_gateway_error", error_message=str(exc), ) return self.agent_run_repository.update_status( agent_run_id=agent_run.id, status="completed", worker_key=payload.worker_key, output_text=response.content, output_json={ "dry_run": False, "agent_version_id": agent_version.id, "model": response.model, "finish_reason": response.finish_reason, "usage_json": response.usage_json, "raw_response_json": response.raw_response_json, }, ) def execute_next_claimed_agent_run( self, *, worker_key: str, lease_seconds: int, dry_run: bool, ) -> tuple[AgentRun, int] | None: released_lease_count = self.agent_run_repository.release_expired_leases( now_time=datetime.utcnow(), ) claimed_agent_run = self.agent_run_repository.claim_next_queued( worker_key=worker_key, lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds), ) if claimed_agent_run is None: return None result = self.execute_agent_run( agent_run_id=claimed_agent_run.id, payload=AgentRunExecuteRequest( tenant_id=claimed_agent_run.tenant_id, worker_key=worker_key, dry_run=dry_run, ), ) if result is None: return None return result, released_lease_count def _resolve_agent_version( self, *, tenant_id: str, agent_id: str, agent_version_id: str | None, ) -> AgentVersion | None: if agent_version_id is not None: return self.agent_version_repository.get_by_id( tenant_id=tenant_id, agent_version_id=agent_version_id, ) return self.agent_version_repository.get_latest_published( tenant_id=tenant_id, agent_id=agent_id, ) def _build_chat_messages( self, *, agent_run: AgentRun, agent_version: AgentVersion, ) -> list[ChatMessageContract]: messages = [ ChatMessageContract(role="system", content=agent_version.system_prompt), ] if agent_version.goal: messages.append(ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}")) if agent_run.input_text: messages.append(ChatMessageContract(role="user", content=agent_run.input_text)) if agent_run.input_json: messages.append( ChatMessageContract( role="user", content=f"Structured input: {agent_run.input_json}", ) ) return messages def _build_dry_run_output(self, *, agent_run: AgentRun, agent_version: AgentVersion) -> str: input_preview = agent_run.input_text or str(agent_run.input_json or {}) return ( f"[dry-run] Agent role={agent_version.role} " f"version={agent_version.version_no} received: {input_preview}" ) def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None: value = payload.get(key) if isinstance(value, str) and value: return value return None def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None: value = payload.get(key) if isinstance(value, (int, float)) and not isinstance(value, bool): return float(value) return None def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None: value = payload.get(key) if isinstance(value, int) and not isinstance(value, bool): return value return None def build_agent_application_service( *, db: Session, settings: AgentServiceSettings, ) -> AgentApplicationService: return AgentApplicationService( agent_repository=AgentDefinitionRepository(db), agent_version_repository=AgentVersionRepository(db), agent_run_repository=AgentRunRepository(db), model_gateway_client=ModelGatewayClient( base_url=settings.model_gateway_service_url, timeout_seconds=settings.model_gateway_timeout_seconds, ), )