| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491 |
- import json
- from collections.abc import Iterator
- from datetime import datetime, timedelta
- from typing import cast
- from sqlalchemy.orm import Session
- from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
- from uuid import uuid4
- from core_domain import (
- AgentSkillRefContract,
- AgentToolRefContract,
- ChatCompletionRequestContract,
- ChatCompletionResponseContract,
- ChatMessageContract,
- MemoryCreateContract,
- MemoryScopeType,
- MemorySearchRequestContract,
- MemorySearchResultContract)
- from core_shared import JSONValue, try_build_redis_client
- from core_shared.task_queue import TaskQueuePublisher
- from app.bootstrap.settings import AgentServiceSettings
- from app.db.models import AgentDefinition, AgentRun, AgentToolInvocation, AgentConfig
- from app.domain.repositories import (
- AgentDefinitionRepository,
- AgentRunRepository,
- AgentToolInvocationRepository,
- AgentConfigRepository)
- from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
- from app.infrastructure.memory_client import MemoryClient, MemoryClientError
- from app.infrastructure.skill_client import SkillServiceClient, SkillServiceClientError
- from app.infrastructure.tool_client import ToolServiceClient, ToolServiceClientError
- from app.schemas.agent import (
- AgentCreateRequest,
- AgentConfigCreateRequest,
- AgentConfigListRequest,
- AgentRunCreateRequest,
- AgentRunDetailRequest,
- AgentRunExecuteRequest,
- AgentRunStatusUpdateRequest,
- AgentStatusUpdateRequest,
- AgentUpdateRequest)
- def generate_agent_code() -> str:
- return f"agent_{uuid4().hex[:16]}"
- class AgentApplicationService:
- def __init__(
- self,
- *,
- agent_repository: AgentDefinitionRepository,
- agent_config_repository: AgentConfigRepository,
- agent_run_repository: AgentRunRepository,
- agent_tool_invocation_repository: AgentToolInvocationRepository,
- model_gateway_client: ModelGatewayClient | None = None,
- memory_client: MemoryClient | None = None,
- tool_client: ToolServiceClient | None = None,
- skill_client: SkillServiceClient | None = None,
- event_client: EventServiceClient | None = None,
- task_queue_publisher: TaskQueuePublisher | None = None,
- react_max_steps: int = 5,
- react_max_tool_calls: int = 10,
- react_tool_retry_count: int = 1) -> None:
- self.agent_repository = agent_repository
- self.agent_config_repository = agent_config_repository
- self.agent_run_repository = agent_run_repository
- self.agent_tool_invocation_repository = agent_tool_invocation_repository
- self.model_gateway_client = model_gateway_client
- self.memory_client = memory_client
- self.tool_client = tool_client
- self.skill_client = skill_client
- self.event_client = event_client
- self.task_queue_publisher = task_queue_publisher
- self.react_max_steps = react_max_steps
- self.react_max_tool_calls = react_max_tool_calls
- self.react_tool_retry_count = react_tool_retry_count
- def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
- return self.agent_repository.create(
- code=payload.code or generate_agent_code(),
- name=payload.name,
- description=payload.description,
- agent_type=payload.agent_type,
- owner_user_id=payload.owner_user_id,
- metadata_json=payload.metadata_json)
- def list_agents(self) -> list[AgentDefinition]:
- return self.agent_repository.list_all()
- def get_agent(self, *, agent_id: str) -> AgentDefinition | None:
- return self.agent_repository.get_by_id(agent_id=agent_id)
- def update_agent(self, payload: AgentUpdateRequest) -> AgentDefinition | None:
- return self.agent_repository.update(
- agent_id=payload.agent_id,
- name=payload.name,
- description=payload.description,
- metadata_json=payload.metadata_json)
- def delete_agent(self, *, agent_id: str) -> bool:
- agent = self.agent_repository.get_by_id(agent_id=agent_id)
- if agent is None:
- return False
- runs = self.agent_run_repository.list_by_scope(agent_id=agent_id)
- for run in runs:
- self.agent_tool_invocation_repository.delete_by_run(agent_run_id=run.id)
- self.agent_run_repository.delete_by_agent(agent_id=agent_id)
- self.agent_config_repository.delete_by_agent(agent_id=agent_id)
- return self.agent_repository.delete(agent_id=agent_id) is not None
- def update_agent_status(
- self,
- *,
- agent_id: str,
- payload: AgentStatusUpdateRequest) -> AgentDefinition | None:
- return self.agent_repository.update_status(
- agent_id=agent_id,
- status=payload.status)
- def create_agent_config(self, payload: AgentConfigCreateRequest) -> AgentConfig:
- agent = self.agent_repository.get_by_id(
- agent_id=payload.agent_id)
- if agent is None:
- raise ValueError(f"agent not found: {payload.agent_id}")
- return self.agent_config_repository.create(
- agent_id=payload.agent_id,
- role=payload.role,
- goal=payload.goal,
- system_prompt=payload.system_prompt,
- model_config_json=payload.model_config_data.model_dump(mode="json"),
- memory_policy_json=payload.memory_policy.model_dump(mode="json"),
- tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs],
- skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs])
- def list_agent_configs(self, *, agent_id: str) -> list[AgentConfig]:
- return self.agent_config_repository.list_by_agent(agent_id=agent_id)
- def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun:
- agent_config = self._resolve_agent_config(
- agent_id=payload.agent_id,
- agent_config_id=payload.agent_config_id)
- if agent_config is None:
- raise ValueError("agent config not found")
- agent_run = self.agent_run_repository.create(
- agent_id=payload.agent_id,
- agent_config_id=agent_config.id,
- session_id=payload.session_id,
- input_text=payload.input_text,
- input_json=payload.input_json)
- self._publish_event(
- event_type="agent.run.created",
- agent_run=agent_run,
- payload_json={"agent_run_id": agent_run.id, "status": agent_run.status})
- if self.task_queue_publisher is not None:
- self.task_queue_publisher.publish_agent_run(
- agent_run_id=agent_run.id)
- return agent_run
- def list_agent_runs(
- self,
- *,
- agent_id: str | None = None,
- session_id: str | None = None) -> list[AgentRun]:
- return self.agent_run_repository.list_by_scope(
- agent_id=agent_id,
- session_id=session_id)
- def get_agent_run(self, payload: AgentRunDetailRequest) -> AgentRun | None:
- return self.agent_run_repository.get_by_id(
- agent_run_id=payload.agent_run_id)
- def list_agent_tool_invocations(
- self,
- *,
- agent_run_id: str) -> list[AgentToolInvocation]:
- return self.agent_tool_invocation_repository.list_by_run(
- agent_run_id=agent_run_id)
- def update_agent_run_status(
- self,
- *,
- agent_run_id: str,
- payload: AgentRunStatusUpdateRequest) -> AgentRun | None:
- entity = self.agent_run_repository.get_by_id(
- agent_run_id=agent_run_id)
- if entity is None:
- return None
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run_id,
- status=payload.status,
- worker_key=payload.worker_key,
- output_text=payload.output_text,
- output_json=payload.output_json,
- error_code=payload.error_code,
- error_message=payload.error_message)
- def execute_agent_run(
- self,
- *,
- agent_run_id: str,
- payload: AgentRunExecuteRequest) -> AgentRun | None:
- agent_run = self.agent_run_repository.get_by_id(
- agent_run_id=agent_run_id)
- if agent_run is None:
- return None
- agent_config = self.agent_config_repository.get_by_id(
- agent_config_id=agent_run.agent_config_id)
- if agent_config is None:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="agent_config_missing",
- error_message=f"agent config not found: {agent_run.agent_config_id}")
- self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="running",
- worker_key=payload.worker_key)
- memory_results, memory_metadata = self._read_relevant_memories(
- agent_run=agent_run,
- agent_config=agent_config)
- selected_tools = self._select_tool_refs(agent_run=agent_run, agent_config=agent_config)
- selected_skills = self._select_skill_refs(agent_run=agent_run, agent_config=agent_config)
- if payload.dry_run:
- messages = self._build_chat_messages(
- agent_run=agent_run,
- agent_config=agent_config,
- memory_results=memory_results,
- capability_context=self._format_capability_plan(
- selected_tools=selected_tools,
- selected_skills=selected_skills))
- completed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=self._build_dry_run_output(
- agent_run=agent_run,
- agent_config=agent_config),
- output_json={
- "dry_run": True,
- "agent_config_id": agent_config.id,
- "message_count": len(messages),
- "messages": [message.model_dump(mode="json") for message in messages],
- "selected_tool_refs": [
- tool_ref.model_dump(mode="json") for tool_ref in selected_tools
- ],
- "selected_skill_refs": [
- skill_ref.model_dump(mode="json") for skill_ref in selected_skills
- ],
- **memory_metadata,
- })
- if completed_run is not None:
- self._publish_event(
- event_type="agent.run.completed",
- agent_run=completed_run,
- payload_json={
- "agent_run_id": completed_run.id,
- "dry_run": True,
- "status": completed_run.status,
- })
- return completed_run
- if self._read_bool(agent_config.model_config_json, "react_enabled", default=False):
- return self._execute_react_agent_run(
- agent_run=agent_run,
- agent_config=agent_config,
- payload=payload,
- memory_results=memory_results,
- memory_metadata=memory_metadata,
- selected_tools=selected_tools,
- selected_skills=selected_skills)
- tool_invocations = self._invoke_selected_tools(
- agent_run=agent_run,
- agent_config=agent_config,
- selected_tools=selected_tools)
- skill_invocations = self._invoke_selected_skills(
- agent_run=agent_run,
- selected_skills=selected_skills,
- worker_key=payload.worker_key)
- messages = self._build_chat_messages(
- agent_run=agent_run,
- agent_config=agent_config,
- memory_results=memory_results,
- capability_context=self._format_capability_results(
- tool_invocations=tool_invocations,
- skill_invocations=skill_invocations))
- if self.model_gateway_client is None:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_missing",
- error_message="model gateway client is not configured",
- output_json={
- "tool_invocations": tool_invocations,
- "skill_invocations": skill_invocations,
- **memory_metadata,
- })
- try:
- response = self.model_gateway_client.create_chat_completion(
- ChatCompletionRequestContract(
- model=self._read_optional_string(agent_config.model_config_json, "model"),
- temperature=self._read_optional_float(
- agent_config.model_config_json,
- "temperature"),
- max_tokens=self._read_optional_int(
- agent_config.model_config_json,
- "max_tokens"),
- messages=messages,
- metadata_json={
- "agent_id": agent_run.agent_id,
- "agent_config_id": agent_config.id,
- "agent_run_id": agent_run.id,
- })
- )
- except ModelGatewayClientError as exc:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_error",
- error_message=str(exc))
- memory_write_metadata = self._write_interaction_memory(
- agent_run=agent_run,
- agent_config=agent_config,
- output_text=response.content)
- completed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=response.content,
- output_json={
- "dry_run": False,
- "agent_config_id": agent_config.id,
- "model": response.model,
- "finish_reason": response.finish_reason,
- "usage_json": response.usage_json,
- "raw_response_json": response.raw_response_json,
- "tool_invocations": tool_invocations,
- "skill_invocations": skill_invocations,
- **memory_metadata,
- **memory_write_metadata,
- })
- if completed_run is not None:
- self._publish_event(
- event_type="agent.run.completed",
- agent_run=completed_run,
- payload_json={
- "agent_run_id": completed_run.id,
- "dry_run": False,
- "status": completed_run.status,
- })
- return completed_run
- def execute_agent_run_stream(
- self,
- *,
- agent_run_id: str,
- payload: AgentRunExecuteRequest) -> Iterator[dict[str, JSONValue]]:
- agent_run = self.agent_run_repository.get_by_id(
- agent_run_id=agent_run_id)
- if agent_run is None:
- return
- agent_config = self.agent_config_repository.get_by_id(
- agent_config_id=agent_run.agent_config_id)
- if agent_config is None:
- failed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="agent_config_missing",
- error_message=f"agent config not found: {agent_run.agent_config_id}")
- yield {"event": "agent.run.failed", "run": self._agent_run_to_json(failed_run)}
- return
- running_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="running",
- worker_key=payload.worker_key)
- yield {"event": "agent.run.started", "run": self._agent_run_to_json(running_run)}
- if payload.dry_run or self._read_bool(agent_config.model_config_json, "react_enabled", default=False):
- completed_run = self.execute_agent_run(agent_run_id=agent_run.id, payload=payload)
- yield {"event": "agent.run.completed", "run": self._agent_run_to_json(completed_run)}
- return
- memory_results, memory_metadata = self._read_relevant_memories(
- agent_run=agent_run,
- agent_config=agent_config)
- selected_tools = self._select_tool_refs(agent_run=agent_run, agent_config=agent_config)
- selected_skills = self._select_skill_refs(agent_run=agent_run, agent_config=agent_config)
- tool_invocations = self._invoke_selected_tools(
- agent_run=agent_run,
- agent_config=agent_config,
- selected_tools=selected_tools)
- skill_invocations = self._invoke_selected_skills(
- agent_run=agent_run,
- selected_skills=selected_skills,
- worker_key=payload.worker_key)
- messages = self._build_chat_messages(
- agent_run=agent_run,
- agent_config=agent_config,
- memory_results=memory_results,
- capability_context=self._format_capability_results(
- tool_invocations=tool_invocations,
- skill_invocations=skill_invocations))
- if self.model_gateway_client is None:
- failed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_missing",
- error_message="model gateway client is not configured",
- output_json={
- "tool_invocations": tool_invocations,
- "skill_invocations": skill_invocations,
- **memory_metadata,
- })
- yield {"event": "agent.run.failed", "run": self._agent_run_to_json(failed_run)}
- return
- output_parts: list[str] = []
- try:
- for delta in self.model_gateway_client.stream_chat_completion(
- ChatCompletionRequestContract(
- model=self._read_optional_string(agent_config.model_config_json, "model"),
- temperature=self._read_optional_float(
- agent_config.model_config_json,
- "temperature"),
- max_tokens=self._read_optional_int(
- agent_config.model_config_json,
- "max_tokens"),
- messages=messages,
- metadata_json={
- "agent_id": agent_run.agent_id,
- "agent_config_id": agent_config.id,
- "agent_run_id": agent_run.id,
- })):
- output_parts.append(delta)
- yield {"event": "agent.run.delta", "agent_run_id": agent_run.id, "delta": delta}
- except ModelGatewayClientError as exc:
- failed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_error",
- error_message=str(exc))
- yield {"event": "agent.run.failed", "run": self._agent_run_to_json(failed_run)}
- return
- output_text = "".join(output_parts)
- memory_write_metadata = self._write_interaction_memory(
- agent_run=agent_run,
- agent_config=agent_config,
- output_text=output_text)
- completed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=output_text,
- output_json={
- "dry_run": False,
- "agent_config_id": agent_config.id,
- "streamed": True,
- "tool_invocations": tool_invocations,
- "skill_invocations": skill_invocations,
- **memory_metadata,
- **memory_write_metadata,
- })
- if completed_run is not None:
- self._publish_event(
- event_type="agent.run.completed",
- agent_run=completed_run,
- payload_json={
- "agent_run_id": completed_run.id,
- "dry_run": False,
- "status": completed_run.status,
- "streamed": True,
- })
- yield {"event": "agent.run.completed", "run": self._agent_run_to_json(completed_run)}
- def _publish_event(
- self,
- *,
- event_type: str,
- agent_run: AgentRun,
- payload_json: dict[str, JSONValue]) -> None:
- if self.event_client is None:
- return
- try:
- self.event_client.publish_event(
- EventPublishContract(
- event_type=event_type,
- source_service="agent-service",
- aggregate_type="agent_run",
- aggregate_id=agent_run.id,
- correlation_id=agent_run.session_id,
- payload_json={
- **payload_json,
- "agent_id": agent_run.agent_id,
- "agent_config_id": agent_run.agent_config_id,
- })
- )
- except EventServiceClientError:
- return
- def _agent_run_to_json(self, agent_run: AgentRun | None) -> dict[str, JSONValue]:
- if agent_run is None:
- return {}
- return {
- "id": agent_run.id,
- "agent_id": agent_run.agent_id,
- "agent_config_id": agent_run.agent_config_id,
- "session_id": agent_run.session_id,
- "input_text": agent_run.input_text,
- "input_json": agent_run.input_json,
- "output_text": agent_run.output_text,
- "output_json": agent_run.output_json,
- "status": agent_run.status,
- "worker_key": agent_run.worker_key,
- "queued_time": agent_run.queued_time,
- "lease_expire_time": agent_run.lease_expire_time,
- "started_time": agent_run.started_time,
- "finished_time": agent_run.finished_time,
- "error_code": agent_run.error_code,
- "error_message": agent_run.error_message,
- "created_time": agent_run.created_time,
- }
- def _execute_react_agent_run(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig,
- payload: AgentRunExecuteRequest,
- memory_results: list[MemorySearchResultContract],
- memory_metadata: dict[str, JSONValue],
- selected_tools: list[AgentToolRefContract],
- selected_skills: list[AgentSkillRefContract]) -> AgentRun | None:
- if self.model_gateway_client is None:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_missing",
- error_message="model gateway client is not configured")
- skill_invocations = self._invoke_selected_skills(
- agent_run=agent_run,
- selected_skills=selected_skills,
- worker_key=payload.worker_key)
- messages = self._build_chat_messages(
- agent_run=agent_run,
- agent_config=agent_config,
- memory_results=memory_results,
- capability_context=self._format_react_instruction(
- agent_run=agent_run,
- selected_tools=selected_tools,
- skill_invocations=skill_invocations))
- react_steps: list[dict[str, JSONValue]] = []
- tool_invocations: list[dict[str, JSONValue]] = []
- final_answer: str | None = None
- tool_call_count = 0
- max_steps = self._read_int(
- agent_config.model_config_json,
- "react_max_steps",
- default=self.react_max_steps)
- for step_index in range(max(max_steps, 1)):
- try:
- response = self.model_gateway_client.create_chat_completion(
- self._build_chat_completion_request(
- agent_run=agent_run,
- agent_config=agent_config,
- messages=messages,
- selected_tools=selected_tools)
- )
- except ModelGatewayClientError as exc:
- return self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="failed",
- worker_key=payload.worker_key,
- error_code="model_gateway_error",
- error_message=str(exc),
- output_json={
- "react_steps": react_steps,
- "tool_invocations": tool_invocations,
- "skill_invocations": skill_invocations,
- **memory_metadata,
- })
- action = self._parse_react_action_from_response(response)
- react_step: dict[str, JSONValue] = {
- "step_index": step_index,
- "model_content": response.content,
- "action": action,
- }
- react_steps.append(react_step)
- if action.get("action") == "finish":
- answer_value = action.get("answer")
- final_answer = answer_value if isinstance(answer_value, str) else response.content
- break
- if action.get("action") != "tool":
- final_answer = response.content
- break
- max_tool_calls = self._read_int(
- agent_config.model_config_json,
- "react_max_tool_calls",
- default=self.react_max_tool_calls)
- if tool_call_count >= max(max_tool_calls, 0):
- final_answer = "Tool call budget exhausted."
- react_step["observation"] = final_answer
- break
- tool_code = action.get("tool_code")
- matching_tools = [
- item for item in selected_tools if item.tool_code == tool_code
- ]
- if not matching_tools:
- observation = f"tool not available: {tool_code}"
- react_step["observation"] = observation
- messages.append(ChatMessageContract(role="assistant", content=response.content))
- messages.append(ChatMessageContract(role="user", content=observation))
- continue
- tool_input = action.get("input_json")
- original_input_json = agent_run.input_json
- if isinstance(tool_input, dict):
- agent_run.input_json = {
- str(item_key): item_value for item_key, item_value in tool_input.items()
- }
- current_invocations = self._invoke_react_tool_with_retry(
- agent_run=agent_run,
- agent_config=agent_config,
- tool_ref=matching_tools[0])
- tool_call_count += len(current_invocations)
- agent_run.input_json = original_input_json
- tool_invocations.extend(current_invocations)
- observation = self._format_react_observation(current_invocations)
- react_step["observation"] = observation
- messages.append(ChatMessageContract(role="assistant", content=response.content))
- messages.append(ChatMessageContract(role="user", content=observation))
- if final_answer is None:
- final_answer = "ReAct loop reached max steps without a final answer."
- memory_write_metadata = self._write_interaction_memory(
- agent_run=agent_run,
- agent_config=agent_config,
- output_text=final_answer)
- completed_run = self.agent_run_repository.update_status(
- agent_run_id=agent_run.id,
- status="completed",
- worker_key=payload.worker_key,
- output_text=final_answer,
- output_json={
- "dry_run": False,
- "agent_config_id": agent_config.id,
- "react_enabled": True,
- "react_steps": react_steps,
- "react_tool_call_count": tool_call_count,
- "tool_invocations": tool_invocations,
- "skill_invocations": skill_invocations,
- **memory_metadata,
- **memory_write_metadata,
- })
- if completed_run is not None:
- self._publish_event(
- event_type="agent.run.completed",
- agent_run=completed_run,
- payload_json={
- "agent_run_id": completed_run.id,
- "react_enabled": True,
- "status": completed_run.status,
- })
- return completed_run
- def execute_next_claimed_agent_run(
- self,
- *,
- worker_key: str,
- lease_seconds: int,
- dry_run: bool,
- redis_client: object | None = None) -> tuple[AgentRun, int] | None:
- released_lease_count = self.agent_run_repository.release_expired_leases(
- now_time=datetime.utcnow())
- claimed_agent_run = self.agent_run_repository.claim_next_queued(
- worker_key=worker_key,
- lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
- if claimed_agent_run is None:
- return None
- if redis_client is not None:
- from core_shared.redis_primitives import DistributedLock, IdempotencyStore
- lock = DistributedLock(
- client=redis_client,
- name=f"agent-run:{claimed_agent_run.id}:lock",
- ttl_seconds=lease_seconds)
- if not lock.acquire():
- return None
- idempotency_store = IdempotencyStore(
- client=redis_client,
- prefix="agent-run-idempotency")
- if not idempotency_store.begin(key=claimed_agent_run.id):
- lock.release()
- return None
- else:
- lock = None
- idempotency_store = None
- try:
- result = self.execute_agent_run(
- agent_run_id=claimed_agent_run.id,
- payload=AgentRunExecuteRequest(
- worker_key=worker_key,
- dry_run=dry_run))
- if idempotency_store is not None and result is not None:
- idempotency_store.complete(
- key=claimed_agent_run.id,
- result={"status": result.status, "agent_run_id": result.id})
- finally:
- if lock is not None:
- lock.release()
- if result is None:
- return None
- return result, released_lease_count
- def _resolve_agent_config(
- self,
- *,
- agent_id: str,
- agent_config_id: str | None) -> AgentConfig | None:
- if agent_config_id is not None:
- return self.agent_config_repository.get_by_id(
- agent_config_id=agent_config_id)
- return self.agent_config_repository.get_latest_by_agent(
- agent_id=agent_id)
- def _build_chat_messages(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig,
- memory_results: list[MemorySearchResultContract] | None = None,
- capability_context: str | None = None) -> list[ChatMessageContract]:
- messages = [
- ChatMessageContract(role="system", content=agent_config.system_prompt),
- ]
- if agent_config.goal:
- messages.append(
- ChatMessageContract(role="system", content=f"Goal: {agent_config.goal}")
- )
- if memory_results:
- messages.append(
- ChatMessageContract(
- role="system",
- content=self._format_memory_context(memory_results))
- )
- if capability_context:
- messages.append(ChatMessageContract(role="system", content=capability_context))
- if agent_run.input_text:
- messages.append(ChatMessageContract(role="user", content=agent_run.input_text))
- if agent_run.input_json:
- messages.append(
- ChatMessageContract(
- role="user",
- content=f"Structured input: {agent_run.input_json}")
- )
- return messages
- def _select_tool_refs(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig) -> list[AgentToolRefContract]:
- input_preview = self._build_input_preview(agent_run)
- selected: list[AgentToolRefContract] = []
- for item in agent_config.tool_refs_json:
- ref = AgentToolRefContract.model_validate(item)
- if (
- ref.required
- or self._read_bool(ref.config_json, "auto_invoke", default=False)
- or self._matches_selection_keywords(ref.config_json, input_preview)
- ):
- selected.append(ref)
- return selected
- def _select_skill_refs(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig) -> list[AgentSkillRefContract]:
- input_preview = self._build_input_preview(agent_run)
- selected: list[AgentSkillRefContract] = []
- for item in agent_config.skill_refs_json:
- ref = AgentSkillRefContract.model_validate(item)
- auto_invoke = self._read_bool(ref.config_json, "auto_invoke", default=True)
- if auto_invoke or self._matches_selection_keywords(ref.config_json, input_preview):
- selected.append(ref)
- return selected
- def _invoke_selected_tools(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig,
- selected_tools: list[AgentToolRefContract]) -> list[dict[str, JSONValue]]:
- invocations: list[dict[str, JSONValue]] = []
- for ref in selected_tools:
- invocation = self.agent_tool_invocation_repository.create(
- agent_run_id=agent_run.id,
- agent_id=agent_run.agent_id,
- agent_config_id=agent_config.id,
- tool_code=ref.tool_code,
- tool_binding_id=ref.tool_binding_id,
- status="selected",
- input_json=agent_run.input_json or {})
- if ref.tool_binding_id is None:
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="skipped",
- reason="tool_binding_id_missing")
- invocations.append(
- {
- "status": "skipped",
- "reason": "tool_binding_id_missing",
- "tool_code": ref.tool_code,
- }
- )
- continue
- if self.tool_client is None:
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="failed",
- reason="tool_client_missing",
- error_message="tool client is not configured")
- invocations.append(
- {
- "status": "failed",
- "reason": "tool_client_missing",
- "tool_binding_id": ref.tool_binding_id,
- }
- )
- continue
- try:
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="running")
- detail = self.tool_client.get_tool_binding_detail(
- binding_id=ref.tool_binding_id)
- if not detail.binding.enabled:
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="failed",
- reason="tool_binding_disabled",
- error_message="tool binding is disabled")
- invocations.append(
- {
- "status": "failed",
- "reason": "tool_binding_disabled",
- "tool_binding_id": ref.tool_binding_id,
- }
- )
- continue
- if detail.tool_definition.tool_type != "http":
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="skipped",
- reason="unsupported_tool_type",
- error_message=detail.tool_definition.tool_type)
- invocations.append(
- {
- "status": "skipped",
- "reason": "unsupported_tool_type",
- "tool_type": detail.tool_definition.tool_type,
- "tool_binding_id": ref.tool_binding_id,
- }
- )
- continue
- output_text, output_json = self.tool_client.invoke_http_tool(
- detail=detail,
- input_json=agent_run.input_json or {},
- config_json=ref.config_json)
- except ToolServiceClientError as exc:
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="failed",
- reason="tool_service_error",
- error_message=str(exc))
- invocations.append(
- {
- "status": "failed",
- "reason": str(exc),
- "tool_binding_id": ref.tool_binding_id,
- }
- )
- continue
- self.agent_tool_invocation_repository.update_status(
- invocation_id=invocation.id,
- status="completed",
- output_text=output_text,
- output_json=output_json)
- invocations.append(
- {
- "status": "completed",
- "tool_binding_id": ref.tool_binding_id,
- "tool_code": detail.tool_definition.code,
- "output_text": output_text,
- "output_json": output_json,
- }
- )
- return invocations
- def _invoke_selected_skills(
- self,
- *,
- agent_run: AgentRun,
- selected_skills: list[AgentSkillRefContract],
- worker_key: str | None) -> list[dict[str, JSONValue]]:
- invocations: list[dict[str, JSONValue]] = []
- for ref in selected_skills:
- if self.skill_client is None:
- invocations.append(
- {
- "status": "failed",
- "reason": "skill_client_missing",
- "skill_id": ref.skill_id,
- "skill_code": ref.skill_code,
- }
- )
- continue
- skill_id = ref.skill_id or self._resolve_skill_id_by_code(
- skill_code=ref.skill_code)
- if skill_id is None:
- invocations.append(
- {
- "status": "failed",
- "reason": "skill_id_missing",
- "skill_code": ref.skill_code,
- }
- )
- continue
- try:
- created_run = self.skill_client.create_skill_run(
- skill_id=skill_id,
- installation_id=self._read_optional_string(
- ref.config_json,
- "installation_id"),
- input_json=self._build_skill_input_json(agent_run=agent_run, ref=ref))
- executed_run = self.skill_client.execute_skill_run(
- skill_run_id=created_run.id,
- worker_key=worker_key)
- except SkillServiceClientError as exc:
- invocations.append(
- {
- "status": "failed",
- "reason": str(exc),
- "skill_id": skill_id,
- "skill_code": ref.skill_code,
- }
- )
- continue
- invocations.append(
- {
- "status": executed_run.status,
- "skill_id": skill_id,
- "skill_code": ref.skill_code,
- "skill_run_id": executed_run.id,
- "output_text": executed_run.output_text,
- "output_json": executed_run.output_json,
- "error_code": executed_run.error_code,
- "error_message": executed_run.error_message,
- }
- )
- return invocations
- def _format_capability_plan(
- self,
- *,
- selected_tools: list[AgentToolRefContract],
- selected_skills: list[AgentSkillRefContract]) -> str:
- return (
- "Selected capability plan before model call:\n"
- f"Tools: {[item.model_dump(mode='json') for item in selected_tools]}\n"
- f"Skills: {[item.model_dump(mode='json') for item in selected_skills]}"
- )
- def _format_capability_results(
- self,
- *,
- tool_invocations: list[dict[str, JSONValue]],
- skill_invocations: list[dict[str, JSONValue]]) -> str | None:
- if not tool_invocations and not skill_invocations:
- return None
- return (
- "Capability invocation results before model call:\n"
- f"Tools: {tool_invocations}\n"
- f"Skills: {skill_invocations}"
- )
- def _format_react_instruction(
- self,
- *,
- agent_run: AgentRun,
- selected_tools: list[AgentToolRefContract],
- skill_invocations: list[dict[str, JSONValue]]) -> str:
- tool_schemas = self._build_react_tool_schemas(
- agent_run=agent_run,
- selected_tools=selected_tools)
- return (
- "Use ReAct JSON only. Respond with one JSON object per turn.\n"
- "To call a tool: "
- '{"action":"tool","tool_code":"code","input_json":{...}}\n'
- "To finish: "
- '{"action":"finish","answer":"final answer"}\n'
- f"Available tools: {tool_schemas}\n"
- f"Pre-run skill results: {skill_invocations}"
- )
- def _build_react_tool_schemas(
- self,
- *,
- agent_run: AgentRun,
- selected_tools: list[AgentToolRefContract]) -> list[dict[str, JSONValue]]:
- schemas: list[dict[str, JSONValue]] = []
- for ref in selected_tools:
- schema: dict[str, JSONValue] = {
- "tool_code": ref.tool_code,
- "tool_binding_id": ref.tool_binding_id,
- "required": ref.required,
- "config_json": ref.config_json,
- }
- if ref.tool_binding_id is not None and self.tool_client is not None:
- try:
- detail = self.tool_client.get_tool_binding_detail(
- binding_id=ref.tool_binding_id)
- schema.update(
- {
- "name": detail.tool_definition.name,
- "description": detail.tool_definition.description,
- "tool_type": detail.tool_definition.tool_type,
- "input_schema_json": detail.connection.input_schema_json or {},
- "output_schema_json": detail.connection.output_schema_json or {},
- "timeout_ms": detail.connection.timeout_ms,
- }
- )
- except ToolServiceClientError as exc:
- schema["schema_error"] = str(exc)
- schemas.append(schema)
- return schemas
- def _invoke_react_tool_with_retry(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig,
- tool_ref: AgentToolRefContract) -> list[dict[str, JSONValue]]:
- retry_count = self._read_int(
- agent_config.model_config_json,
- "react_tool_retry_count",
- default=self.react_tool_retry_count)
- attempts: list[dict[str, JSONValue]] = []
- for attempt_index in range(max(retry_count, 0) + 1):
- current = self._invoke_selected_tools(
- agent_run=agent_run,
- agent_config=agent_config,
- selected_tools=[tool_ref])
- for item in current:
- item["attempt_index"] = attempt_index
- attempts.extend(current)
- if current and current[-1].get("status") == "completed":
- break
- return attempts
- def _format_react_observation(
- self,
- tool_invocations: list[dict[str, JSONValue]]) -> str:
- return f"Observation: {tool_invocations}"
- def _parse_react_action(self, content: str) -> dict[str, JSONValue]:
- try:
- value = json.loads(content)
- except json.JSONDecodeError:
- start_index = content.find("{")
- end_index = content.rfind("}")
- if start_index < 0 or end_index <= start_index:
- return {"action": "finish", "answer": content}
- try:
- value = json.loads(content[start_index : end_index + 1])
- except json.JSONDecodeError:
- return {"action": "finish", "answer": content}
- if not isinstance(value, dict):
- return {"action": "finish", "answer": content}
- return {str(item_key): item_value for item_key, item_value in value.items()}
- def _parse_react_action_from_response(
- self,
- response: ChatCompletionResponseContract) -> dict[str, JSONValue]:
- if response.tool_calls_json:
- action = self._parse_openai_tool_call(response.tool_calls_json[0])
- if action is not None:
- return action
- return self._parse_react_action(response.content)
- def _parse_openai_tool_call(
- self,
- tool_call: dict[str, JSONValue]) -> dict[str, JSONValue] | None:
- function_value = tool_call.get("function")
- if not isinstance(function_value, dict):
- return None
- tool_code = function_value.get("name")
- if not isinstance(tool_code, str) or not tool_code:
- return None
- raw_arguments = function_value.get("arguments")
- input_json: dict[str, JSONValue] = {}
- if isinstance(raw_arguments, str) and raw_arguments:
- try:
- decoded = json.loads(raw_arguments)
- except json.JSONDecodeError:
- decoded = {"raw_arguments": raw_arguments}
- if isinstance(decoded, dict):
- input_json = {str(item_key): item_value for item_key, item_value in decoded.items()}
- elif isinstance(raw_arguments, dict):
- input_json = {str(item_key): item_value for item_key, item_value in raw_arguments.items()}
- tool_call_id = tool_call.get("id")
- return {
- "action": "tool",
- "tool_code": tool_code,
- "input_json": input_json,
- "tool_call_id": tool_call_id if isinstance(tool_call_id, str) else None,
- "tool_call_protocol": "openai",
- }
- def _build_chat_completion_request(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig,
- messages: list[ChatMessageContract],
- selected_tools: list[AgentToolRefContract] | None = None) -> ChatCompletionRequestContract:
- function_calling_enabled = self._read_bool(
- agent_config.model_config_json,
- "function_calling_enabled",
- default=False) or self._read_bool(
- agent_config.model_config_json,
- "tool_calling_enabled",
- default=False)
- return ChatCompletionRequestContract(
- model=self._read_optional_string(agent_config.model_config_json, "model"),
- temperature=self._read_optional_float(
- agent_config.model_config_json,
- "temperature"),
- max_tokens=self._read_optional_int(
- agent_config.model_config_json,
- "max_tokens"),
- messages=messages,
- tools_json=(
- self._build_openai_tool_schemas(
- agent_run=agent_run,
- selected_tools=selected_tools or [])
- if function_calling_enabled
- else []
- ),
- tool_choice="auto" if function_calling_enabled and selected_tools else None,
- metadata_json={
- "agent_id": agent_run.agent_id,
- "agent_config_id": agent_config.id,
- "agent_run_id": agent_run.id,
- })
- def _build_openai_tool_schemas(
- self,
- *,
- agent_run: AgentRun,
- selected_tools: list[AgentToolRefContract]) -> list[dict[str, JSONValue]]:
- tool_schemas: list[dict[str, JSONValue]] = []
- for schema in self._build_react_tool_schemas(
- agent_run=agent_run,
- selected_tools=selected_tools):
- tool_code = schema.get("tool_code")
- if not isinstance(tool_code, str) or not tool_code:
- continue
- description = schema.get("description")
- input_schema = schema.get("input_schema_json")
- tool_schemas.append(
- {
- "type": "function",
- "function": {
- "name": tool_code,
- "description": description if isinstance(description, str) else "",
- "parameters": input_schema if isinstance(input_schema, dict) else {},
- },
- }
- )
- return tool_schemas
- def _build_skill_input_json(
- self,
- *,
- agent_run: AgentRun,
- ref: AgentSkillRefContract) -> dict[str, JSONValue]:
- input_json: dict[str, JSONValue] = dict(agent_run.input_json or {})
- if agent_run.input_text:
- input_json.setdefault("input_text", agent_run.input_text)
- configured_input = ref.config_json.get("input_json")
- if isinstance(configured_input, dict):
- input_json.update(
- {str(item_key): item_value for item_key, item_value in configured_input.items()}
- )
- return input_json
- def _resolve_skill_id_by_code(self, *, skill_code: str | None) -> str | None:
- if skill_code is None or self.skill_client is None:
- return None
- try:
- skills = self.skill_client.list_skills()
- except SkillServiceClientError:
- return None
- for skill in skills:
- if skill.code == skill_code:
- return skill.id
- return None
- def _build_input_preview(self, agent_run: AgentRun) -> str:
- return f"{agent_run.input_text or ''} {agent_run.input_json or {}}".lower()
- def _matches_selection_keywords(
- self,
- config_json: dict[str, JSONValue],
- input_preview: str) -> bool:
- keywords = config_json.get("selection_keywords")
- if not isinstance(keywords, list):
- return False
- return any(
- isinstance(keyword, str) and keyword.lower() in input_preview
- for keyword in keywords
- )
- def _build_dry_run_output(self, *, agent_run: AgentRun, agent_config: AgentConfig) -> str:
- input_preview = agent_run.input_text or str(agent_run.input_json or {})
- return (
- f"[dry-run] Agent role={agent_config.role} "
- f"received: {input_preview}"
- )
- def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
- value = payload.get(key)
- if isinstance(value, str) and value:
- return value
- return None
- def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None:
- value = payload.get(key)
- if isinstance(value, (int, float)) and not isinstance(value, bool):
- return float(value)
- return None
- def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return None
- def _read_relevant_memories(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig) -> tuple[list[MemorySearchResultContract], dict[str, JSONValue]]:
- if self.memory_client is None:
- return [], {"memory_read_enabled": False, "memory_read_reason": "client_missing"}
- if not self._read_bool(agent_config.memory_policy_json, "enabled", default=True):
- return [], {"memory_read_enabled": False, "memory_read_reason": "policy_disabled"}
- query = agent_run.input_text or str(agent_run.input_json or "")
- if not query:
- return [], {"memory_read_enabled": True, "memory_read_count": 0}
- scope = self._resolve_memory_scope(agent_run=agent_run, agent_config=agent_config)
- if scope is None:
- return [], {
- "memory_read_enabled": True,
- "memory_read_count": 0,
- "memory_read_reason": "scope_unavailable",
- }
- scope_type, scope_id = scope
- try:
- results = self.memory_client.search_memories(
- MemorySearchRequestContract(
- query=query,
- scope_type=scope_type,
- scope_id=scope_id,
- owner_agent_id=agent_run.agent_id,
- session_id=agent_run.session_id,
- limit=self._read_int(
- agent_config.memory_policy_json,
- "read_top_k",
- default=8))
- )
- except MemoryClientError as exc:
- return [], {
- "memory_read_enabled": True,
- "memory_read_count": 0,
- "memory_read_error": str(exc),
- }
- return results, {
- "memory_read_enabled": True,
- "memory_read_count": len(results),
- "memory_scope_type": scope_type,
- "memory_scope_id": scope_id,
- }
- def _write_interaction_memory(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig,
- output_text: str) -> dict[str, JSONValue]:
- if self.memory_client is None:
- return {"memory_write_enabled": False, "memory_write_reason": "client_missing"}
- if not self._read_bool(agent_config.memory_policy_json, "write_enabled", default=True):
- return {"memory_write_enabled": False, "memory_write_reason": "policy_disabled"}
- scope = self._resolve_memory_scope(agent_run=agent_run, agent_config=agent_config)
- if scope is None:
- return {"memory_write_enabled": True, "memory_write_reason": "scope_unavailable"}
- scope_type, scope_id = scope
- try:
- memory = self.memory_client.create_memory(
- MemoryCreateContract(
- scope_type=scope_type,
- scope_id=scope_id,
- memory_type="conversation",
- content_text=self._format_interaction_memory(
- agent_run=agent_run,
- output_text=output_text),
- content_json={
- "agent_run_id": agent_run.id,
- "agent_config_id": agent_config.id,
- "input_text": agent_run.input_text,
- "output_text": output_text,
- },
- metadata_json={
- "source": "agent-service",
- "role": agent_config.role,
- },
- owner_agent_id=agent_run.agent_id,
- session_id=agent_run.session_id,
- source_ref=f"agent_run:{agent_run.id}",
- importance_score=self._read_nested_int(
- agent_config.memory_policy_json,
- "config_json",
- "write_importance_score",
- default=50))
- )
- except MemoryClientError as exc:
- return {
- "memory_write_enabled": True,
- "memory_write_error": str(exc),
- }
- return {
- "memory_write_enabled": True,
- "memory_written_id": memory.id,
- "memory_scope_type": scope_type,
- "memory_scope_id": scope_id,
- }
- def _resolve_memory_scope(
- self,
- *,
- agent_run: AgentRun,
- agent_config: AgentConfig) -> tuple[MemoryScopeType, str] | None:
- scope_value = self._read_optional_string(
- agent_config.memory_policy_json,
- "memory_scope") or "session"
- if scope_value == "global":
- return "global", "global"
- if scope_value == "agent":
- return "agent", agent_run.agent_id
- if scope_value == "session" and agent_run.session_id:
- return "session", agent_run.session_id
- if scope_value == "user":
- user_id = self._read_input_json_string(agent_run=agent_run, key="user_id")
- if user_id is not None:
- return "user", user_id
- if scope_value == "team":
- team_id = self._read_input_json_string(agent_run=agent_run, key="team_id")
- if team_id is not None:
- return "team", team_id
- return None
- def _format_memory_context(self, memory_results: list[MemorySearchResultContract]) -> str:
- lines = ["Relevant memories:"]
- for index, result in enumerate(memory_results, start=1):
- lines.append(f"{index}. {result.item.content_text}")
- return "\n".join(lines)
- def _format_interaction_memory(self, *, agent_run: AgentRun, output_text: str) -> str:
- input_text = agent_run.input_text or str(agent_run.input_json or {})
- return f"User input: {input_text}\nAgent output: {output_text}"
- def _read_bool(self, payload: dict[str, JSONValue], key: str, *, default: bool) -> bool:
- value = payload.get(key)
- if isinstance(value, bool):
- return value
- return default
- def _read_int(self, payload: dict[str, JSONValue], key: str, *, default: int) -> int:
- value = payload.get(key)
- if isinstance(value, int) and not isinstance(value, bool):
- return value
- return default
- def _read_nested_int(
- self,
- payload: dict[str, JSONValue],
- parent_key: str,
- child_key: str,
- *,
- default: int) -> int:
- parent_value = payload.get(parent_key)
- if not isinstance(parent_value, dict):
- return default
- return self._read_int(
- cast(dict[str, JSONValue], parent_value),
- child_key,
- default=default)
- def _read_input_json_string(self, *, agent_run: AgentRun, key: str) -> str | None:
- if agent_run.input_json is None:
- return None
- value = agent_run.input_json.get(key)
- if isinstance(value, str) and value:
- return value
- return None
- def build_agent_application_service(
- *,
- db: Session,
- settings: AgentServiceSettings) -> AgentApplicationService:
- redis_client = try_build_redis_client(settings.redis_url)
- return AgentApplicationService(
- agent_repository=AgentDefinitionRepository(db),
- agent_config_repository=AgentConfigRepository(db),
- agent_run_repository=AgentRunRepository(db),
- agent_tool_invocation_repository=AgentToolInvocationRepository(db),
- model_gateway_client=ModelGatewayClient(
- base_url=settings.model_gateway_service_url,
- timeout_seconds=settings.model_gateway_timeout_seconds),
- memory_client=MemoryClient(
- base_url=settings.memory_service_url,
- timeout_seconds=settings.memory_service_timeout_seconds),
- tool_client=ToolServiceClient(
- base_url=settings.tool_service_url,
- timeout_seconds=settings.tool_service_timeout_seconds),
- skill_client=SkillServiceClient(
- base_url=settings.skill_service_url,
- timeout_seconds=settings.skill_service_timeout_seconds),
- event_client=EventServiceClient(
- base_url=settings.event_service_url,
- timeout_seconds=settings.event_service_timeout_seconds),
- task_queue_publisher=(
- TaskQueuePublisher(client=redis_client) if redis_client is not None else None
- ),
- react_max_steps=settings.react_max_steps)
|