from app.db.models import AgentDefinition, AgentRun, AgentVersion
from app.domain.repositories import (
    AgentDefinitionRepository,
    AgentRunRepository,
    AgentVersionRepository,
)
from app.schemas.agent import (
    AgentCreateRequest,
    AgentRunCreateRequest,
    AgentRunStatusUpdateRequest,
    AgentStatusUpdateRequest,
    AgentVersionCreateRequest,
)


class AgentApplicationService:
    """Application-level operations on agents, agent versions and agent runs.

    Every method delegates persistence to one of the three injected
    repositories; this class only translates request payloads into
    repository calls and enforces a few existence checks.
    """

    def __init__(
        self,
        *,
        agent_repository: AgentDefinitionRepository,
        agent_version_repository: AgentVersionRepository,
        agent_run_repository: AgentRunRepository,
    ) -> None:
        """Store the repositories used by the service methods."""
        self.agent_repository = agent_repository
        self.agent_version_repository = agent_version_repository
        self.agent_run_repository = agent_run_repository

    def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
        """Create an agent definition from the fields of *payload*."""
        return self.agent_repository.create(
            tenant_id=payload.tenant_id,
            code=payload.code,
            name=payload.name,
            description=payload.description,
            agent_type=payload.agent_type,
            owner_user_id=payload.owner_user_id,
            metadata_json=payload.metadata_json,
        )

    def list_agents(self, *, tenant_id: str) -> list[AgentDefinition]:
        """Return the agent definitions the repository lists for *tenant_id*."""
        return self.agent_repository.list_by_tenant(tenant_id=tenant_id)

    def update_agent_status(
        self,
        *,
        agent_id: str,
        payload: AgentStatusUpdateRequest,
    ) -> AgentDefinition | None:
        """Set the status of an agent, scoped to the payload's tenant.

        Returns whatever the repository returns; ``None`` presumably means
        the agent was not found (confirm against the repository).
        """
        return self.agent_repository.update_status(
            tenant_id=payload.tenant_id,
            agent_id=agent_id,
            status=payload.status,
        )

    def create_agent_version(self, payload: AgentVersionCreateRequest) -> AgentVersion:
        """Create a new version for an existing agent.

        Raises:
            ValueError: if no agent with ``payload.agent_id`` exists for
                ``payload.tenant_id``.
        """
        agent = self.agent_repository.get_by_id(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
        )
        if agent is None:
            raise ValueError(f"agent not found: {payload.agent_id}")

        # Nested request models are dumped in JSON mode before being handed
        # to the repository's *_json parameters.
        return self.agent_version_repository.create(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
            status=payload.status,
            role=payload.role,
            goal=payload.goal,
            system_prompt=payload.system_prompt,
            model_config_json=payload.model_config_data.model_dump(mode="json"),
            memory_policy_json=payload.memory_policy.model_dump(mode="json"),
            tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs],
            skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs],
        )

    def list_agent_versions(self, *, tenant_id: str, agent_id: str) -> list[AgentVersion]:
        """Return the versions the repository lists for the given agent."""
        return self.agent_version_repository.list_by_agent(
            tenant_id=tenant_id,
            agent_id=agent_id,
        )

    def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun:
        """Create a run against an explicit or the latest published version.

        Raises:
            ValueError: if no usable agent version can be resolved for the
                payload's tenant and agent.
        """
        agent_version = self._resolve_agent_version(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
            agent_version_id=payload.agent_version_id,
        )
        if agent_version is None:
            raise ValueError("published agent version not found")

        return self.agent_run_repository.create(
            tenant_id=payload.tenant_id,
            agent_id=payload.agent_id,
            agent_version_id=agent_version.id,
            session_id=payload.session_id,
            input_text=payload.input_text,
            input_json=payload.input_json,
        )

    def list_agent_runs(
        self,
        *,
        tenant_id: str,
        agent_id: str | None = None,
        session_id: str | None = None,
    ) -> list[AgentRun]:
        """Return runs for a tenant, passing the optional agent/session scope through."""
        return self.agent_run_repository.list_by_scope(
            tenant_id=tenant_id,
            agent_id=agent_id,
            session_id=session_id,
        )

    def update_agent_run_status(
        self,
        *,
        agent_run_id: str,
        payload: AgentRunStatusUpdateRequest,
    ) -> AgentRun | None:
        """Update a run's status and result fields.

        Returns ``None`` without updating when the run cannot be found under
        the payload's tenant.
        """
        entity = self.agent_run_repository.get_by_id(
            tenant_id=payload.tenant_id,
            agent_run_id=agent_run_id,
        )
        if entity is None:
            return None

        # The update call itself carries no tenant_id; the lookup above is
        # the only tenant check performed here.
        return self.agent_run_repository.update_status(
            agent_run_id=agent_run_id,
            status=payload.status,
            worker_key=payload.worker_key,
            output_text=payload.output_text,
            output_json=payload.output_json,
            error_code=payload.error_code,
            error_message=payload.error_message,
        )

    def _resolve_agent_version(
        self,
        *,
        tenant_id: str,
        agent_id: str,
        agent_version_id: str | None,
    ) -> AgentVersion | None:
        """Pick the agent version a run should execute against.

        With no explicit ``agent_version_id`` the repository's latest
        published version for the agent is used. With an explicit id the
        version is looked up by tenant and id, and rejected (``None``) when
        it belongs to a different agent than ``agent_id``.
        """
        if agent_version_id is None:
            return self.agent_version_repository.get_latest_published(
                tenant_id=tenant_id,
                agent_id=agent_id,
            )

        agent_version = self.agent_version_repository.get_by_id(
            tenant_id=tenant_id,
            agent_version_id=agent_version_id,
        )
        if agent_version is None:
            return None

        # The lookup is keyed by tenant and version id only, so guard against
        # a version of another agent being attached to this agent's run.
        # NOTE(review): assumes AgentVersion exposes `agent_id`, matching the
        # keyword passed to agent_version_repository.create — confirm.
        if agent_version.agent_id != agent_id:
            return None
        return agent_version