services.py 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491
  1. import json
  2. from collections.abc import Iterator
  3. from datetime import datetime, timedelta
  4. from typing import cast
  5. from sqlalchemy.orm import Session
  6. from core_events import EventPublishContract, EventServiceClient, EventServiceClientError
  7. from uuid import uuid4
  8. from core_domain import (
  9. AgentSkillRefContract,
  10. AgentToolRefContract,
  11. ChatCompletionRequestContract,
  12. ChatCompletionResponseContract,
  13. ChatMessageContract,
  14. MemoryCreateContract,
  15. MemoryScopeType,
  16. MemorySearchRequestContract,
  17. MemorySearchResultContract)
  18. from core_shared import JSONValue, try_build_redis_client
  19. from core_shared.task_queue import TaskQueuePublisher
  20. from app.bootstrap.settings import AgentServiceSettings
  21. from app.db.models import AgentDefinition, AgentRun, AgentToolInvocation, AgentConfig
  22. from app.domain.repositories import (
  23. AgentDefinitionRepository,
  24. AgentRunRepository,
  25. AgentToolInvocationRepository,
  26. AgentConfigRepository)
  27. from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
  28. from app.infrastructure.memory_client import MemoryClient, MemoryClientError
  29. from app.infrastructure.skill_client import SkillServiceClient, SkillServiceClientError
  30. from app.infrastructure.tool_client import ToolServiceClient, ToolServiceClientError
  31. from app.schemas.agent import (
  32. AgentCreateRequest,
  33. AgentConfigCreateRequest,
  34. AgentConfigListRequest,
  35. AgentRunCreateRequest,
  36. AgentRunDetailRequest,
  37. AgentRunExecuteRequest,
  38. AgentRunStatusUpdateRequest,
  39. AgentStatusUpdateRequest,
  40. AgentUpdateRequest)
  def generate_agent_code() -> str:
      """Return a new agent code: ``agent_`` followed by 16 hex characters of a UUID4."""
      random_hex = uuid4().hex
      return "agent_" + random_hex[:16]
  43. class AgentApplicationService:
  44. def __init__(
  45. self,
  46. *,
  47. agent_repository: AgentDefinitionRepository,
  48. agent_config_repository: AgentConfigRepository,
  49. agent_run_repository: AgentRunRepository,
  50. agent_tool_invocation_repository: AgentToolInvocationRepository,
  51. model_gateway_client: ModelGatewayClient | None = None,
  52. memory_client: MemoryClient | None = None,
  53. tool_client: ToolServiceClient | None = None,
  54. skill_client: SkillServiceClient | None = None,
  55. event_client: EventServiceClient | None = None,
  56. task_queue_publisher: TaskQueuePublisher | None = None,
  57. react_max_steps: int = 5,
  58. react_max_tool_calls: int = 10,
  59. react_tool_retry_count: int = 1) -> None:
  60. self.agent_repository = agent_repository
  61. self.agent_config_repository = agent_config_repository
  62. self.agent_run_repository = agent_run_repository
  63. self.agent_tool_invocation_repository = agent_tool_invocation_repository
  64. self.model_gateway_client = model_gateway_client
  65. self.memory_client = memory_client
  66. self.tool_client = tool_client
  67. self.skill_client = skill_client
  68. self.event_client = event_client
  69. self.task_queue_publisher = task_queue_publisher
  70. self.react_max_steps = react_max_steps
  71. self.react_max_tool_calls = react_max_tool_calls
  72. self.react_tool_retry_count = react_tool_retry_count
  73. def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
  74. return self.agent_repository.create(
  75. code=payload.code or generate_agent_code(),
  76. name=payload.name,
  77. description=payload.description,
  78. agent_type=payload.agent_type,
  79. owner_user_id=payload.owner_user_id,
  80. metadata_json=payload.metadata_json)
  81. def list_agents(self) -> list[AgentDefinition]:
  82. return self.agent_repository.list_all()
  83. def get_agent(self, *, agent_id: str) -> AgentDefinition | None:
  84. return self.agent_repository.get_by_id(agent_id=agent_id)
  85. def update_agent(self, payload: AgentUpdateRequest) -> AgentDefinition | None:
  86. return self.agent_repository.update(
  87. agent_id=payload.agent_id,
  88. name=payload.name,
  89. description=payload.description,
  90. metadata_json=payload.metadata_json)
  91. def delete_agent(self, *, agent_id: str) -> bool:
  92. agent = self.agent_repository.get_by_id(agent_id=agent_id)
  93. if agent is None:
  94. return False
  95. runs = self.agent_run_repository.list_by_scope(agent_id=agent_id)
  96. for run in runs:
  97. self.agent_tool_invocation_repository.delete_by_run(agent_run_id=run.id)
  98. self.agent_run_repository.delete_by_agent(agent_id=agent_id)
  99. self.agent_config_repository.delete_by_agent(agent_id=agent_id)
  100. return self.agent_repository.delete(agent_id=agent_id) is not None
  101. def update_agent_status(
  102. self,
  103. *,
  104. agent_id: str,
  105. payload: AgentStatusUpdateRequest) -> AgentDefinition | None:
  106. return self.agent_repository.update_status(
  107. agent_id=agent_id,
  108. status=payload.status)
  109. def create_agent_config(self, payload: AgentConfigCreateRequest) -> AgentConfig:
  110. agent = self.agent_repository.get_by_id(
  111. agent_id=payload.agent_id)
  112. if agent is None:
  113. raise ValueError(f"agent not found: {payload.agent_id}")
  114. return self.agent_config_repository.create(
  115. agent_id=payload.agent_id,
  116. role=payload.role,
  117. goal=payload.goal,
  118. system_prompt=payload.system_prompt,
  119. model_config_json=payload.model_config_data.model_dump(mode="json"),
  120. memory_policy_json=payload.memory_policy.model_dump(mode="json"),
  121. tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs],
  122. skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs])
  123. def list_agent_configs(self, *, agent_id: str) -> list[AgentConfig]:
  124. return self.agent_config_repository.list_by_agent(agent_id=agent_id)
  125. def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun:
  126. agent_config = self._resolve_agent_config(
  127. agent_id=payload.agent_id,
  128. agent_config_id=payload.agent_config_id)
  129. if agent_config is None:
  130. raise ValueError("agent config not found")
  131. agent_run = self.agent_run_repository.create(
  132. agent_id=payload.agent_id,
  133. agent_config_id=agent_config.id,
  134. session_id=payload.session_id,
  135. input_text=payload.input_text,
  136. input_json=payload.input_json)
  137. self._publish_event(
  138. event_type="agent.run.created",
  139. agent_run=agent_run,
  140. payload_json={"agent_run_id": agent_run.id, "status": agent_run.status})
  141. if self.task_queue_publisher is not None:
  142. self.task_queue_publisher.publish_agent_run(
  143. agent_run_id=agent_run.id)
  144. return agent_run
  145. def list_agent_runs(
  146. self,
  147. *,
  148. agent_id: str | None = None,
  149. session_id: str | None = None) -> list[AgentRun]:
  150. return self.agent_run_repository.list_by_scope(
  151. agent_id=agent_id,
  152. session_id=session_id)
  153. def get_agent_run(self, payload: AgentRunDetailRequest) -> AgentRun | None:
  154. return self.agent_run_repository.get_by_id(
  155. agent_run_id=payload.agent_run_id)
  156. def list_agent_tool_invocations(
  157. self,
  158. *,
  159. agent_run_id: str) -> list[AgentToolInvocation]:
  160. return self.agent_tool_invocation_repository.list_by_run(
  161. agent_run_id=agent_run_id)
  162. def update_agent_run_status(
  163. self,
  164. *,
  165. agent_run_id: str,
  166. payload: AgentRunStatusUpdateRequest) -> AgentRun | None:
  167. entity = self.agent_run_repository.get_by_id(
  168. agent_run_id=agent_run_id)
  169. if entity is None:
  170. return None
  171. return self.agent_run_repository.update_status(
  172. agent_run_id=agent_run_id,
  173. status=payload.status,
  174. worker_key=payload.worker_key,
  175. output_text=payload.output_text,
  176. output_json=payload.output_json,
  177. error_code=payload.error_code,
  178. error_message=payload.error_message)
  def execute_agent_run(
      self,
      *,
      agent_run_id: str,
      payload: AgentRunExecuteRequest,
  ) -> AgentRun | None:
      """Execute a stored agent run synchronously and persist its outcome.

      Flow:
        1. Load the run and its config. A missing config marks the run
           ``failed`` with ``agent_config_missing``.
        2. Mark the run ``running``, read memories and select tool/skill refs.
        3. ``payload.dry_run``: build the messages only, mark the run
           ``completed`` and publish ``agent.run.completed``.
        4. ``react_enabled`` in the model config: delegate to
           ``_execute_react_agent_run``.
        5. Otherwise invoke the selected tools and skills, call the model
           gateway once, write interaction memory, mark the run
           ``completed`` and publish ``agent.run.completed``.

      Both model-gateway failure branches (client not configured, client
      error) store the tool/skill invocations and memory metadata gathered
      so far in ``output_json``, so the failed run keeps a record of work
      that was already performed.

      Returns:
          ``None`` when the run does not exist; otherwise the result of the
          final ``update_status`` call (or of the ReAct delegate).
      """
      agent_run = self.agent_run_repository.get_by_id(agent_run_id=agent_run_id)
      if agent_run is None:
          return None

      agent_config = self.agent_config_repository.get_by_id(
          agent_config_id=agent_run.agent_config_id,
      )
      if agent_config is None:
          return self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="agent_config_missing",
              error_message=f"agent config not found: {agent_run.agent_config_id}",
          )

      self.agent_run_repository.update_status(
          agent_run_id=agent_run.id,
          status="running",
          worker_key=payload.worker_key,
      )
      memory_results, memory_metadata = self._read_relevant_memories(
          agent_run=agent_run,
          agent_config=agent_config,
      )
      selected_tools = self._select_tool_refs(agent_run=agent_run, agent_config=agent_config)
      selected_skills = self._select_skill_refs(agent_run=agent_run, agent_config=agent_config)

      if payload.dry_run:
          # Dry run: describe the plan without invoking tools, skills or the model.
          messages = self._build_chat_messages(
              agent_run=agent_run,
              agent_config=agent_config,
              memory_results=memory_results,
              capability_context=self._format_capability_plan(
                  selected_tools=selected_tools,
                  selected_skills=selected_skills,
              ),
          )
          completed_run = self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="completed",
              worker_key=payload.worker_key,
              output_text=self._build_dry_run_output(
                  agent_run=agent_run,
                  agent_config=agent_config,
              ),
              output_json={
                  "dry_run": True,
                  "agent_config_id": agent_config.id,
                  "message_count": len(messages),
                  "messages": [message.model_dump(mode="json") for message in messages],
                  "selected_tool_refs": [
                      tool_ref.model_dump(mode="json") for tool_ref in selected_tools
                  ],
                  "selected_skill_refs": [
                      skill_ref.model_dump(mode="json") for skill_ref in selected_skills
                  ],
                  **memory_metadata,
              },
          )
          if completed_run is not None:
              self._publish_event(
                  event_type="agent.run.completed",
                  agent_run=completed_run,
                  payload_json={
                      "agent_run_id": completed_run.id,
                      "dry_run": True,
                      "status": completed_run.status,
                  },
              )
          return completed_run

      if self._read_bool(agent_config.model_config_json, "react_enabled", default=False):
          return self._execute_react_agent_run(
              agent_run=agent_run,
              agent_config=agent_config,
              payload=payload,
              memory_results=memory_results,
              memory_metadata=memory_metadata,
              selected_tools=selected_tools,
              selected_skills=selected_skills,
          )

      tool_invocations = self._invoke_selected_tools(
          agent_run=agent_run,
          agent_config=agent_config,
          selected_tools=selected_tools,
      )
      skill_invocations = self._invoke_selected_skills(
          agent_run=agent_run,
          selected_skills=selected_skills,
          worker_key=payload.worker_key,
      )
      messages = self._build_chat_messages(
          agent_run=agent_run,
          agent_config=agent_config,
          memory_results=memory_results,
          capability_context=self._format_capability_results(
              tool_invocations=tool_invocations,
              skill_invocations=skill_invocations,
          ),
      )

      # Work already done at this point; recorded on every failure branch below.
      partial_output_json: dict[str, JSONValue] = {
          "tool_invocations": tool_invocations,
          "skill_invocations": skill_invocations,
          **memory_metadata,
      }

      if self.model_gateway_client is None:
          return self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="model_gateway_missing",
              error_message="model gateway client is not configured",
              output_json=partial_output_json,
          )

      try:
          response = self.model_gateway_client.create_chat_completion(
              ChatCompletionRequestContract(
                  model=self._read_optional_string(agent_config.model_config_json, "model"),
                  temperature=self._read_optional_float(
                      agent_config.model_config_json,
                      "temperature",
                  ),
                  max_tokens=self._read_optional_int(
                      agent_config.model_config_json,
                      "max_tokens",
                  ),
                  messages=messages,
                  metadata_json={
                      "agent_id": agent_run.agent_id,
                      "agent_config_id": agent_config.id,
                      "agent_run_id": agent_run.id,
                  },
              )
          )
      except ModelGatewayClientError as exc:
          # Keep the invocation record on the failed run, matching the
          # gateway-missing branch above and the ReAct execution path.
          return self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="model_gateway_error",
              error_message=str(exc),
              output_json=partial_output_json,
          )

      memory_write_metadata = self._write_interaction_memory(
          agent_run=agent_run,
          agent_config=agent_config,
          output_text=response.content,
      )
      completed_run = self.agent_run_repository.update_status(
          agent_run_id=agent_run.id,
          status="completed",
          worker_key=payload.worker_key,
          output_text=response.content,
          output_json={
              "dry_run": False,
              "agent_config_id": agent_config.id,
              "model": response.model,
              "finish_reason": response.finish_reason,
              "usage_json": response.usage_json,
              "raw_response_json": response.raw_response_json,
              "tool_invocations": tool_invocations,
              "skill_invocations": skill_invocations,
              **memory_metadata,
              **memory_write_metadata,
          },
      )
      if completed_run is not None:
          self._publish_event(
              event_type="agent.run.completed",
              agent_run=completed_run,
              payload_json={
                  "agent_run_id": completed_run.id,
                  "dry_run": False,
                  "status": completed_run.status,
              },
          )
      return completed_run
  def execute_agent_run_stream(
      self,
      *,
      agent_run_id: str,
      payload: AgentRunExecuteRequest,
  ) -> Iterator[dict[str, JSONValue]]:
      """Execute an agent run and yield progress events as dictionaries.

      Events yielded (key ``event``):
        * ``agent.run.started`` once the run is marked ``running``;
        * ``agent.run.delta`` for every chunk streamed by the model gateway;
        * ``agent.run.completed`` or ``agent.run.failed`` as the last event.

      Nothing is yielded when the run does not exist. Dry runs and
      ReAct-enabled configs are not streamed: they are delegated to
      ``execute_agent_run`` and only the terminal event is yielded. That
      terminal event is ``agent.run.failed`` when the delegated run comes
      back with status ``failed``.

      Both model-gateway failure branches store the tool/skill invocations
      and memory metadata gathered so far in the failed run's
      ``output_json``.
      """
      agent_run = self.agent_run_repository.get_by_id(agent_run_id=agent_run_id)
      if agent_run is None:
          return

      agent_config = self.agent_config_repository.get_by_id(
          agent_config_id=agent_run.agent_config_id,
      )
      if agent_config is None:
          failed_run = self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="agent_config_missing",
              error_message=f"agent config not found: {agent_run.agent_config_id}",
          )
          yield {"event": "agent.run.failed", "run": self._agent_run_to_json(failed_run)}
          return

      running_run = self.agent_run_repository.update_status(
          agent_run_id=agent_run.id,
          status="running",
          worker_key=payload.worker_key,
      )
      yield {"event": "agent.run.started", "run": self._agent_run_to_json(running_run)}

      if payload.dry_run or self._read_bool(
          agent_config.model_config_json, "react_enabled", default=False
      ):
          finished_run = self.execute_agent_run(agent_run_id=agent_run.id, payload=payload)
          # The delegate can return a failed run (e.g. a model gateway error
          # in the ReAct path); report that as a failure, not a completion.
          delegated_failed = finished_run is not None and finished_run.status == "failed"
          yield {
              "event": "agent.run.failed" if delegated_failed else "agent.run.completed",
              "run": self._agent_run_to_json(finished_run),
          }
          return

      memory_results, memory_metadata = self._read_relevant_memories(
          agent_run=agent_run,
          agent_config=agent_config,
      )
      selected_tools = self._select_tool_refs(agent_run=agent_run, agent_config=agent_config)
      selected_skills = self._select_skill_refs(agent_run=agent_run, agent_config=agent_config)
      tool_invocations = self._invoke_selected_tools(
          agent_run=agent_run,
          agent_config=agent_config,
          selected_tools=selected_tools,
      )
      skill_invocations = self._invoke_selected_skills(
          agent_run=agent_run,
          selected_skills=selected_skills,
          worker_key=payload.worker_key,
      )
      messages = self._build_chat_messages(
          agent_run=agent_run,
          agent_config=agent_config,
          memory_results=memory_results,
          capability_context=self._format_capability_results(
              tool_invocations=tool_invocations,
              skill_invocations=skill_invocations,
          ),
      )

      # Work already done at this point; recorded on every failure branch below.
      partial_output_json: dict[str, JSONValue] = {
          "tool_invocations": tool_invocations,
          "skill_invocations": skill_invocations,
          **memory_metadata,
      }

      if self.model_gateway_client is None:
          failed_run = self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="model_gateway_missing",
              error_message="model gateway client is not configured",
              output_json=partial_output_json,
          )
          yield {"event": "agent.run.failed", "run": self._agent_run_to_json(failed_run)}
          return

      output_parts: list[str] = []
      try:
          for delta in self.model_gateway_client.stream_chat_completion(
              ChatCompletionRequestContract(
                  model=self._read_optional_string(agent_config.model_config_json, "model"),
                  temperature=self._read_optional_float(
                      agent_config.model_config_json,
                      "temperature",
                  ),
                  max_tokens=self._read_optional_int(
                      agent_config.model_config_json,
                      "max_tokens",
                  ),
                  messages=messages,
                  metadata_json={
                      "agent_id": agent_run.agent_id,
                      "agent_config_id": agent_config.id,
                      "agent_run_id": agent_run.id,
                  },
              )
          ):
              output_parts.append(delta)
              yield {"event": "agent.run.delta", "agent_run_id": agent_run.id, "delta": delta}
      except ModelGatewayClientError as exc:
          # Keep the invocation record on the failed run, matching the
          # gateway-missing branch above.
          failed_run = self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="model_gateway_error",
              error_message=str(exc),
              output_json=partial_output_json,
          )
          yield {"event": "agent.run.failed", "run": self._agent_run_to_json(failed_run)}
          return

      output_text = "".join(output_parts)
      memory_write_metadata = self._write_interaction_memory(
          agent_run=agent_run,
          agent_config=agent_config,
          output_text=output_text,
      )
      completed_run = self.agent_run_repository.update_status(
          agent_run_id=agent_run.id,
          status="completed",
          worker_key=payload.worker_key,
          output_text=output_text,
          output_json={
              "dry_run": False,
              "agent_config_id": agent_config.id,
              "streamed": True,
              "tool_invocations": tool_invocations,
              "skill_invocations": skill_invocations,
              **memory_metadata,
              **memory_write_metadata,
          },
      )
      if completed_run is not None:
          self._publish_event(
              event_type="agent.run.completed",
              agent_run=completed_run,
              payload_json={
                  "agent_run_id": completed_run.id,
                  "dry_run": False,
                  "status": completed_run.status,
                  "streamed": True,
              },
          )
      yield {"event": "agent.run.completed", "run": self._agent_run_to_json(completed_run)}
  456. def _publish_event(
  457. self,
  458. *,
  459. event_type: str,
  460. agent_run: AgentRun,
  461. payload_json: dict[str, JSONValue]) -> None:
  462. if self.event_client is None:
  463. return
  464. try:
  465. self.event_client.publish_event(
  466. EventPublishContract(
  467. event_type=event_type,
  468. source_service="agent-service",
  469. aggregate_type="agent_run",
  470. aggregate_id=agent_run.id,
  471. correlation_id=agent_run.session_id,
  472. payload_json={
  473. **payload_json,
  474. "agent_id": agent_run.agent_id,
  475. "agent_config_id": agent_run.agent_config_id,
  476. })
  477. )
  478. except EventServiceClientError:
  479. return
  480. def _agent_run_to_json(self, agent_run: AgentRun | None) -> dict[str, JSONValue]:
  481. if agent_run is None:
  482. return {}
  483. return {
  484. "id": agent_run.id,
  485. "agent_id": agent_run.agent_id,
  486. "agent_config_id": agent_run.agent_config_id,
  487. "session_id": agent_run.session_id,
  488. "input_text": agent_run.input_text,
  489. "input_json": agent_run.input_json,
  490. "output_text": agent_run.output_text,
  491. "output_json": agent_run.output_json,
  492. "status": agent_run.status,
  493. "worker_key": agent_run.worker_key,
  494. "queued_time": agent_run.queued_time,
  495. "lease_expire_time": agent_run.lease_expire_time,
  496. "started_time": agent_run.started_time,
  497. "finished_time": agent_run.finished_time,
  498. "error_code": agent_run.error_code,
  499. "error_message": agent_run.error_message,
  500. "created_time": agent_run.created_time,
  501. }
  def _execute_react_agent_run(
      self,
      *,
      agent_run: AgentRun,
      agent_config: AgentConfig,
      payload: AgentRunExecuteRequest,
      memory_results: list[MemorySearchResultContract],
      memory_metadata: dict[str, JSONValue],
      selected_tools: list[AgentToolRefContract],
      selected_skills: list[AgentSkillRefContract],
  ) -> AgentRun | None:
      """Run the ReAct loop for an agent run and persist the outcome.

      Each step asks the model gateway for the next action:
        * ``finish``: take the action's ``answer`` (or the raw content when
          it is not a string) and stop;
        * ``tool``: invoke the matching selected tool and feed the
          observation back as a user message;
        * anything else: treat the raw model content as the final answer.

      The loop runs at most ``react_max_steps`` iterations (minimum one) and
      stops once ``react_max_tool_calls`` invocations have been recorded.
      Both limits come from the model config, falling back to the service
      defaults.

      A model gateway error marks the run ``failed`` and stores the steps
      and invocations collected so far. Otherwise the run is marked
      ``completed``, interaction memory is written and
      ``agent.run.completed`` is published.

      While a tool runs, ``agent_run.input_json`` is temporarily replaced
      by the tool input chosen by the model; the original value is restored
      even if the invocation raises.
      """
      if self.model_gateway_client is None:
          return self.agent_run_repository.update_status(
              agent_run_id=agent_run.id,
              status="failed",
              worker_key=payload.worker_key,
              error_code="model_gateway_missing",
              error_message="model gateway client is not configured",
          )

      skill_invocations = self._invoke_selected_skills(
          agent_run=agent_run,
          selected_skills=selected_skills,
          worker_key=payload.worker_key,
      )
      messages = self._build_chat_messages(
          agent_run=agent_run,
          agent_config=agent_config,
          memory_results=memory_results,
          capability_context=self._format_react_instruction(
              agent_run=agent_run,
              selected_tools=selected_tools,
              skill_invocations=skill_invocations,
          ),
      )

      react_steps: list[dict[str, JSONValue]] = []
      tool_invocations: list[dict[str, JSONValue]] = []
      final_answer: str | None = None
      tool_call_count = 0

      # Both limits are constant for the whole run, so read them once.
      max_steps = self._read_int(
          agent_config.model_config_json,
          "react_max_steps",
          default=self.react_max_steps,
      )
      tool_call_budget = max(
          self._read_int(
              agent_config.model_config_json,
              "react_max_tool_calls",
              default=self.react_max_tool_calls,
          ),
          0,
      )

      for step_index in range(max(max_steps, 1)):
          try:
              response = self.model_gateway_client.create_chat_completion(
                  self._build_chat_completion_request(
                      agent_run=agent_run,
                      agent_config=agent_config,
                      messages=messages,
                      selected_tools=selected_tools,
                  )
              )
          except ModelGatewayClientError as exc:
              return self.agent_run_repository.update_status(
                  agent_run_id=agent_run.id,
                  status="failed",
                  worker_key=payload.worker_key,
                  error_code="model_gateway_error",
                  error_message=str(exc),
                  output_json={
                      "react_steps": react_steps,
                      "tool_invocations": tool_invocations,
                      "skill_invocations": skill_invocations,
                      **memory_metadata,
                  },
              )

          action = self._parse_react_action_from_response(response)
          # The step dict is appended first and enriched with an observation later.
          react_step: dict[str, JSONValue] = {
              "step_index": step_index,
              "model_content": response.content,
              "action": action,
          }
          react_steps.append(react_step)

          action_kind = action.get("action")
          if action_kind == "finish":
              answer_value = action.get("answer")
              final_answer = answer_value if isinstance(answer_value, str) else response.content
              break
          if action_kind != "tool":
              final_answer = response.content
              break

          if tool_call_count >= tool_call_budget:
              final_answer = "Tool call budget exhausted."
              react_step["observation"] = final_answer
              break

          tool_code = action.get("tool_code")
          matching_tools = [item for item in selected_tools if item.tool_code == tool_code]
          if not matching_tools:
              observation = f"tool not available: {tool_code}"
              react_step["observation"] = observation
              messages.append(ChatMessageContract(role="assistant", content=response.content))
              messages.append(ChatMessageContract(role="user", content=observation))
              continue

          tool_input = action.get("input_json")
          original_input_json = agent_run.input_json
          if isinstance(tool_input, dict):
              agent_run.input_json = {
                  str(item_key): item_value for item_key, item_value in tool_input.items()
              }
          try:
              current_invocations = self._invoke_react_tool_with_retry(
                  agent_run=agent_run,
                  agent_config=agent_config,
                  tool_ref=matching_tools[0],
              )
          finally:
              # Never leave the model-chosen tool input on the run object.
              agent_run.input_json = original_input_json

          tool_call_count += len(current_invocations)
          tool_invocations.extend(current_invocations)
          observation = self._format_react_observation(current_invocations)
          react_step["observation"] = observation
          messages.append(ChatMessageContract(role="assistant", content=response.content))
          messages.append(ChatMessageContract(role="user", content=observation))

      if final_answer is None:
          final_answer = "ReAct loop reached max steps without a final answer."

      memory_write_metadata = self._write_interaction_memory(
          agent_run=agent_run,
          agent_config=agent_config,
          output_text=final_answer,
      )
      completed_run = self.agent_run_repository.update_status(
          agent_run_id=agent_run.id,
          status="completed",
          worker_key=payload.worker_key,
          output_text=final_answer,
          output_json={
              "dry_run": False,
              "agent_config_id": agent_config.id,
              "react_enabled": True,
              "react_steps": react_steps,
              "react_tool_call_count": tool_call_count,
              "tool_invocations": tool_invocations,
              "skill_invocations": skill_invocations,
              **memory_metadata,
              **memory_write_metadata,
          },
      )
      if completed_run is not None:
          self._publish_event(
              event_type="agent.run.completed",
              agent_run=completed_run,
              payload_json={
                  "agent_run_id": completed_run.id,
                  "react_enabled": True,
                  "status": completed_run.status,
              },
          )
      return completed_run
  def execute_next_claimed_agent_run(
          self,
          *,
          worker_key: str,
          lease_seconds: int,
          dry_run: bool,
          redis_client: object | None = None) -> tuple[AgentRun, int] | None:
      """Claim the next queued agent run and execute it for ``worker_key``.

      Expired leases are released first, then one queued run is claimed with
      a lease of ``lease_seconds``. When ``redis_client`` is supplied the run
      is additionally guarded by a distributed lock and an idempotency record
      keyed on the run id.

      Args:
          worker_key: Identifier of the worker claiming and executing the run.
          lease_seconds: Lease duration; also used as the lock TTL.
          dry_run: Forwarded to ``execute_agent_run`` in the request payload.
          redis_client: Optional client enabling the lock/idempotency guards.

      Returns:
          ``(agent_run, released_lease_count)`` when a run was executed, or
          ``None`` when nothing was claimed, the lock or idempotency guard
          refused the run, or ``execute_agent_run`` returned ``None``.
      """
      released_lease_count = self.agent_run_repository.release_expired_leases(
          now_time=datetime.utcnow())
      claimed_agent_run = self.agent_run_repository.claim_next_queued(
          worker_key=worker_key,
          lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds))
      if claimed_agent_run is None:
          return None

      lock = None
      idempotency_store = None
      if redis_client is not None:
          # Imported lazily so the Redis primitives are only needed when a
          # client is actually passed in.
          from core_shared.redis_primitives import DistributedLock, IdempotencyStore
          lock = DistributedLock(
              client=redis_client,
              name=f"agent-run:{claimed_agent_run.id}:lock",
              ttl_seconds=lease_seconds)
          if not lock.acquire():
              return None

      # Everything after a successful acquire runs under the ``finally`` so
      # the lock is released even if the idempotency guard itself raises.
      try:
          if redis_client is not None:
              idempotency_store = IdempotencyStore(
                  client=redis_client,
                  prefix="agent-run-idempotency")
              if not idempotency_store.begin(key=claimed_agent_run.id):
                  return None
          result = self.execute_agent_run(
              agent_run_id=claimed_agent_run.id,
              payload=AgentRunExecuteRequest(
                  worker_key=worker_key,
                  dry_run=dry_run))
          if idempotency_store is not None and result is not None:
              idempotency_store.complete(
                  key=claimed_agent_run.id,
                  result={"status": result.status, "agent_run_id": result.id})
      finally:
          if lock is not None:
              lock.release()
      if result is None:
          return None
      return result, released_lease_count
  def _resolve_agent_config(
          self,
          *,
          agent_id: str,
          agent_config_id: str | None) -> AgentConfig | None:
      """Look up the agent config to use for a run.

      An explicit ``agent_config_id`` is fetched by id; otherwise the latest
      config of ``agent_id`` is returned. The result is whatever the
      repository returns for that lookup.
      """
      repository = self.agent_config_repository
      if agent_config_id is None:
          return repository.get_latest_by_agent(agent_id=agent_id)
      return repository.get_by_id(agent_config_id=agent_config_id)
  def _build_chat_messages(
          self,
          *,
          agent_run: AgentRun,
          agent_config: AgentConfig,
          memory_results: list[MemorySearchResultContract] | None = None,
          capability_context: str | None = None) -> list[ChatMessageContract]:
      """Assemble the chat messages sent to the model for one agent run.

      System messages come first (system prompt, then optional goal, memory
      context and capability context), followed by user messages built from
      the run's text input and structured input. Empty or missing parts are
      left out.
      """
      system_parts = [agent_config.system_prompt]
      if agent_config.goal:
          system_parts.append(f"Goal: {agent_config.goal}")
      if memory_results:
          system_parts.append(self._format_memory_context(memory_results))
      if capability_context:
          system_parts.append(capability_context)

      user_parts = []
      if agent_run.input_text:
          user_parts.append(agent_run.input_text)
      if agent_run.input_json:
          user_parts.append(f"Structured input: {agent_run.input_json}")

      return [
          *(ChatMessageContract(role="system", content=text) for text in system_parts),
          *(ChatMessageContract(role="user", content=text) for text in user_parts),
      ]
  def _select_tool_refs(
          self,
          *,
          agent_run: AgentRun,
          agent_config: AgentConfig) -> list[AgentToolRefContract]:
      """Pick the configured tool references that apply to this run.

      A reference is kept when it is marked required, when its config sets
      ``auto_invoke`` to true (default false), or when one of its selection
      keywords occurs in the run's input preview.
      """
      preview = self._build_input_preview(agent_run)

      def is_wanted(ref: AgentToolRefContract) -> bool:
          if ref.required:
              return True
          if self._read_bool(ref.config_json, "auto_invoke", default=False):
              return True
          return self._matches_selection_keywords(ref.config_json, preview)

      validated = (
          AgentToolRefContract.model_validate(raw)
          for raw in agent_config.tool_refs_json
      )
      return [ref for ref in validated if is_wanted(ref)]
  def _select_skill_refs(
          self,
          *,
          agent_run: AgentRun,
          agent_config: AgentConfig) -> list[AgentSkillRefContract]:
      """Pick the configured skill references that apply to this run.

      A reference is kept when its config leaves ``auto_invoke`` on (the
      default is true) or when one of its selection keywords occurs in the
      run's input preview.
      """
      preview = self._build_input_preview(agent_run)

      def is_wanted(ref: AgentSkillRefContract) -> bool:
          if self._read_bool(ref.config_json, "auto_invoke", default=True):
              return True
          return self._matches_selection_keywords(ref.config_json, preview)

      validated = (
          AgentSkillRefContract.model_validate(raw)
          for raw in agent_config.skill_refs_json
      )
      return [ref for ref in validated if is_wanted(ref)]
  def _invoke_selected_tools(
          self,
          *,
          agent_run: AgentRun,
          agent_config: AgentConfig,
          selected_tools: list[AgentToolRefContract]) -> list[dict[str, JSONValue]]:
      """Invoke each selected tool once and return one summary dict per tool.

      Every tool first gets an invocation record with status ``selected``.
      The record then moves to ``skipped`` (no binding id, or a tool type
      other than ``"http"``), ``failed`` (no tool client, disabled binding,
      or a ``ToolServiceClientError``), or ``running`` followed by
      ``completed``. The returned summaries mirror those outcomes in the
      order of ``selected_tools``.
      """
      repository = self.agent_tool_invocation_repository
      summaries: list[dict[str, JSONValue]] = []

      def settle(invocation_id, summary, **status_fields):
          # Persist the outcome on the invocation record, then report it.
          repository.update_status(invocation_id=invocation_id, **status_fields)
          summaries.append(summary)

      for ref in selected_tools:
          binding_id = ref.tool_binding_id
          invocation = repository.create(
              agent_run_id=agent_run.id,
              agent_id=agent_run.agent_id,
              agent_config_id=agent_config.id,
              tool_code=ref.tool_code,
              tool_binding_id=binding_id,
              status="selected",
              input_json=agent_run.input_json or {})

          if binding_id is None:
              settle(
                  invocation.id,
                  {
                      "status": "skipped",
                      "reason": "tool_binding_id_missing",
                      "tool_code": ref.tool_code,
                  },
                  status="skipped",
                  reason="tool_binding_id_missing")
              continue

          if self.tool_client is None:
              settle(
                  invocation.id,
                  {
                      "status": "failed",
                      "reason": "tool_client_missing",
                      "tool_binding_id": binding_id,
                  },
                  status="failed",
                  reason="tool_client_missing",
                  error_message="tool client is not configured")
              continue

          try:
              repository.update_status(
                  invocation_id=invocation.id,
                  status="running")
              detail = self.tool_client.get_tool_binding_detail(
                  binding_id=binding_id)
              if not detail.binding.enabled:
                  settle(
                      invocation.id,
                      {
                          "status": "failed",
                          "reason": "tool_binding_disabled",
                          "tool_binding_id": binding_id,
                      },
                      status="failed",
                      reason="tool_binding_disabled",
                      error_message="tool binding is disabled")
                  continue
              tool_type = detail.tool_definition.tool_type
              if tool_type != "http":
                  settle(
                      invocation.id,
                      {
                          "status": "skipped",
                          "reason": "unsupported_tool_type",
                          "tool_type": tool_type,
                          "tool_binding_id": binding_id,
                      },
                      status="skipped",
                      reason="unsupported_tool_type",
                      error_message=tool_type)
                  continue
              output_text, output_json = self.tool_client.invoke_http_tool(
                  detail=detail,
                  input_json=agent_run.input_json or {},
                  config_json=ref.config_json)
          except ToolServiceClientError as exc:
              failure = str(exc)
              # The record carries a fixed reason code; the summary carries
              # the error text itself as its reason.
              settle(
                  invocation.id,
                  {
                      "status": "failed",
                      "reason": failure,
                      "tool_binding_id": binding_id,
                  },
                  status="failed",
                  reason="tool_service_error",
                  error_message=failure)
              continue

          repository.update_status(
              invocation_id=invocation.id,
              status="completed",
              output_text=output_text,
              output_json=output_json)
          summaries.append(
              {
                  "status": "completed",
                  "tool_binding_id": binding_id,
                  "tool_code": detail.tool_definition.code,
                  "output_text": output_text,
                  "output_json": output_json,
              }
          )
      return summaries
  def _invoke_selected_skills(
          self,
          *,
          agent_run: AgentRun,
          selected_skills: list[AgentSkillRefContract],
          worker_key: str | None) -> list[dict[str, JSONValue]]:
      """Create and execute a skill run for each selected skill.

      Returns one summary dict per skill, in input order. A skill is reported
      as ``failed`` when no skill client is configured, when no skill id can
      be determined from the reference, or when the skill service raises
      ``SkillServiceClientError``; otherwise the summary carries the status
      and outputs of the executed run.
      """
      summaries: list[dict[str, JSONValue]] = []
      for ref in selected_skills:
          if self.skill_client is None:
              summaries.append(
                  {
                      "status": "failed",
                      "reason": "skill_client_missing",
                      "skill_id": ref.skill_id,
                      "skill_code": ref.skill_code,
                  }
              )
              continue

          # Prefer the explicit id; fall back to a lookup by skill code.
          skill_id = ref.skill_id
          if not skill_id:
              skill_id = self._resolve_skill_id_by_code(skill_code=ref.skill_code)
          if skill_id is None:
              summaries.append(
                  {
                      "status": "failed",
                      "reason": "skill_id_missing",
                      "skill_code": ref.skill_code,
                  }
              )
              continue

          installation_id = self._read_optional_string(
              ref.config_json,
              "installation_id")
          skill_input = self._build_skill_input_json(agent_run=agent_run, ref=ref)
          try:
              created_run = self.skill_client.create_skill_run(
                  skill_id=skill_id,
                  installation_id=installation_id,
                  input_json=skill_input)
              executed_run = self.skill_client.execute_skill_run(
                  skill_run_id=created_run.id,
                  worker_key=worker_key)
          except SkillServiceClientError as exc:
              summaries.append(
                  {
                      "status": "failed",
                      "reason": str(exc),
                      "skill_id": skill_id,
                      "skill_code": ref.skill_code,
                  }
              )
              continue

          summaries.append(
              {
                  "status": executed_run.status,
                  "skill_id": skill_id,
                  "skill_code": ref.skill_code,
                  "skill_run_id": executed_run.id,
                  "output_text": executed_run.output_text,
                  "output_json": executed_run.output_json,
                  "error_code": executed_run.error_code,
                  "error_message": executed_run.error_message,
              }
          )
      return summaries
  932. def _format_capability_plan(
  933. self,
  934. *,
  935. selected_tools: list[AgentToolRefContract],
  936. selected_skills: list[AgentSkillRefContract]) -> str:
  937. return (
  938. "Selected capability plan before model call:\n"
  939. f"Tools: {[item.model_dump(mode='json') for item in selected_tools]}\n"
  940. f"Skills: {[item.model_dump(mode='json') for item in selected_skills]}"
  941. )
  942. def _format_capability_results(
  943. self,
  944. *,
  945. tool_invocations: list[dict[str, JSONValue]],
  946. skill_invocations: list[dict[str, JSONValue]]) -> str | None:
  947. if not tool_invocations and not skill_invocations:
  948. return None
  949. return (
  950. "Capability invocation results before model call:\n"
  951. f"Tools: {tool_invocations}\n"
  952. f"Skills: {skill_invocations}"
  953. )
  954. def _format_react_instruction(
  955. self,
  956. *,
  957. agent_run: AgentRun,
  958. selected_tools: list[AgentToolRefContract],
  959. skill_invocations: list[dict[str, JSONValue]]) -> str:
  960. tool_schemas = self._build_react_tool_schemas(
  961. agent_run=agent_run,
  962. selected_tools=selected_tools)
  963. return (
  964. "Use ReAct JSON only. Respond with one JSON object per turn.\n"
  965. "To call a tool: "
  966. '{"action":"tool","tool_code":"code","input_json":{...}}\n'
  967. "To finish: "
  968. '{"action":"finish","answer":"final answer"}\n'
  969. f"Available tools: {tool_schemas}\n"
  970. f"Pre-run skill results: {skill_invocations}"
  971. )
  972. def _build_react_tool_schemas(
  973. self,
  974. *,
  975. agent_run: AgentRun,
  976. selected_tools: list[AgentToolRefContract]) -> list[dict[str, JSONValue]]:
  977. schemas: list[dict[str, JSONValue]] = []
  978. for ref in selected_tools:
  979. schema: dict[str, JSONValue] = {
  980. "tool_code": ref.tool_code,
  981. "tool_binding_id": ref.tool_binding_id,
  982. "required": ref.required,
  983. "config_json": ref.config_json,
  984. }
  985. if ref.tool_binding_id is not None and self.tool_client is not None:
  986. try:
  987. detail = self.tool_client.get_tool_binding_detail(
  988. binding_id=ref.tool_binding_id)
  989. schema.update(
  990. {
  991. "name": detail.tool_definition.name,
  992. "description": detail.tool_definition.description,
  993. "tool_type": detail.tool_definition.tool_type,
  994. "input_schema_json": detail.connection.input_schema_json or {},
  995. "output_schema_json": detail.connection.output_schema_json or {},
  996. "timeout_ms": detail.connection.timeout_ms,
  997. }
  998. )
  999. except ToolServiceClientError as exc:
  1000. schema["schema_error"] = str(exc)
  1001. schemas.append(schema)
  1002. return schemas
  def _invoke_react_tool_with_retry(
          self,
          *,
          agent_run: AgentRun,
          agent_config: AgentConfig,
          tool_ref: AgentToolRefContract) -> list[dict[str, JSONValue]]:
      """Invoke one tool, retrying until it completes or retries run out.

      The retry count comes from ``react_tool_retry_count`` in the model
      config, falling back to ``self.react_tool_retry_count``; negative
      values count as zero. Every attempt's summaries are tagged with
      ``attempt_index`` and all attempts are returned in order.
      """
      configured_retries = self._read_int(
          agent_config.model_config_json,
          "react_tool_retry_count",
          default=self.react_tool_retry_count)
      total_attempts = max(configured_retries, 0) + 1

      history: list[dict[str, JSONValue]] = []
      attempt_index = 0
      while attempt_index < total_attempts:
          batch = self._invoke_selected_tools(
              agent_run=agent_run,
              agent_config=agent_config,
              selected_tools=[tool_ref])
          for record in batch:
              record["attempt_index"] = attempt_index
          history.extend(batch)
          # Stop as soon as the latest summary reports success.
          if batch and batch[-1].get("status") == "completed":
              break
          attempt_index += 1
      return history
  1025. def _format_react_observation(
  1026. self,
  1027. tool_invocations: list[dict[str, JSONValue]]) -> str:
  1028. return f"Observation: {tool_invocations}"
  1029. def _parse_react_action(self, content: str) -> dict[str, JSONValue]:
  1030. try:
  1031. value = json.loads(content)
  1032. except json.JSONDecodeError:
  1033. start_index = content.find("{")
  1034. end_index = content.rfind("}")
  1035. if start_index < 0 or end_index <= start_index:
  1036. return {"action": "finish", "answer": content}
  1037. try:
  1038. value = json.loads(content[start_index : end_index + 1])
  1039. except json.JSONDecodeError:
  1040. return {"action": "finish", "answer": content}
  1041. if not isinstance(value, dict):
  1042. return {"action": "finish", "answer": content}
  1043. return {str(item_key): item_value for item_key, item_value in value.items()}
  1044. def _parse_react_action_from_response(
  1045. self,
  1046. response: ChatCompletionResponseContract) -> dict[str, JSONValue]:
  1047. if response.tool_calls_json:
  1048. action = self._parse_openai_tool_call(response.tool_calls_json[0])
  1049. if action is not None:
  1050. return action
  1051. return self._parse_react_action(response.content)
  1052. def _parse_openai_tool_call(
  1053. self,
  1054. tool_call: dict[str, JSONValue]) -> dict[str, JSONValue] | None:
  1055. function_value = tool_call.get("function")
  1056. if not isinstance(function_value, dict):
  1057. return None
  1058. tool_code = function_value.get("name")
  1059. if not isinstance(tool_code, str) or not tool_code:
  1060. return None
  1061. raw_arguments = function_value.get("arguments")
  1062. input_json: dict[str, JSONValue] = {}
  1063. if isinstance(raw_arguments, str) and raw_arguments:
  1064. try:
  1065. decoded = json.loads(raw_arguments)
  1066. except json.JSONDecodeError:
  1067. decoded = {"raw_arguments": raw_arguments}
  1068. if isinstance(decoded, dict):
  1069. input_json = {str(item_key): item_value for item_key, item_value in decoded.items()}
  1070. elif isinstance(raw_arguments, dict):
  1071. input_json = {str(item_key): item_value for item_key, item_value in raw_arguments.items()}
  1072. tool_call_id = tool_call.get("id")
  1073. return {
  1074. "action": "tool",
  1075. "tool_code": tool_code,
  1076. "input_json": input_json,
  1077. "tool_call_id": tool_call_id if isinstance(tool_call_id, str) else None,
  1078. "tool_call_protocol": "openai",
  1079. }
  def _build_chat_completion_request(
          self,
          *,
          agent_run: AgentRun,
          agent_config: AgentConfig,
          messages: list[ChatMessageContract],
          selected_tools: list[AgentToolRefContract] | None = None) -> ChatCompletionRequestContract:
      """Build the chat completion request for one model call.

      Model name, temperature and max tokens are read from the agent's model
      config. Tool schemas are attached only when ``function_calling_enabled``
      or ``tool_calling_enabled`` is true in that config.

      ``tool_choice`` is set to ``"auto"`` only when at least one tool schema
      is actually sent. Schemas without a usable tool code are dropped by
      ``_build_openai_tool_schemas``, so deriving it from ``selected_tools``
      could produce a request with a tool choice but no tools.
      """
      model_config = agent_config.model_config_json
      function_calling_enabled = (
          self._read_bool(model_config, "function_calling_enabled", default=False)
          or self._read_bool(model_config, "tool_calling_enabled", default=False)
      )
      tools_json: list[dict[str, JSONValue]] = []
      if function_calling_enabled:
          tools_json = self._build_openai_tool_schemas(
              agent_run=agent_run,
              selected_tools=selected_tools or [])
      return ChatCompletionRequestContract(
          model=self._read_optional_string(model_config, "model"),
          temperature=self._read_optional_float(model_config, "temperature"),
          max_tokens=self._read_optional_int(model_config, "max_tokens"),
          messages=messages,
          tools_json=tools_json,
          tool_choice="auto" if tools_json else None,
          metadata_json={
              "agent_id": agent_run.agent_id,
              "agent_config_id": agent_config.id,
              "agent_run_id": agent_run.id,
          })
  1116. def _build_openai_tool_schemas(
  1117. self,
  1118. *,
  1119. agent_run: AgentRun,
  1120. selected_tools: list[AgentToolRefContract]) -> list[dict[str, JSONValue]]:
  1121. tool_schemas: list[dict[str, JSONValue]] = []
  1122. for schema in self._build_react_tool_schemas(
  1123. agent_run=agent_run,
  1124. selected_tools=selected_tools):
  1125. tool_code = schema.get("tool_code")
  1126. if not isinstance(tool_code, str) or not tool_code:
  1127. continue
  1128. description = schema.get("description")
  1129. input_schema = schema.get("input_schema_json")
  1130. tool_schemas.append(
  1131. {
  1132. "type": "function",
  1133. "function": {
  1134. "name": tool_code,
  1135. "description": description if isinstance(description, str) else "",
  1136. "parameters": input_schema if isinstance(input_schema, dict) else {},
  1137. },
  1138. }
  1139. )
  1140. return tool_schemas
  1141. def _build_skill_input_json(
  1142. self,
  1143. *,
  1144. agent_run: AgentRun,
  1145. ref: AgentSkillRefContract) -> dict[str, JSONValue]:
  1146. input_json: dict[str, JSONValue] = dict(agent_run.input_json or {})
  1147. if agent_run.input_text:
  1148. input_json.setdefault("input_text", agent_run.input_text)
  1149. configured_input = ref.config_json.get("input_json")
  1150. if isinstance(configured_input, dict):
  1151. input_json.update(
  1152. {str(item_key): item_value for item_key, item_value in configured_input.items()}
  1153. )
  1154. return input_json
  1155. def _resolve_skill_id_by_code(self, *, skill_code: str | None) -> str | None:
  1156. if skill_code is None or self.skill_client is None:
  1157. return None
  1158. try:
  1159. skills = self.skill_client.list_skills()
  1160. except SkillServiceClientError:
  1161. return None
  1162. for skill in skills:
  1163. if skill.code == skill_code:
  1164. return skill.id
  1165. return None
  1166. def _build_input_preview(self, agent_run: AgentRun) -> str:
  1167. return f"{agent_run.input_text or ''} {agent_run.input_json or {}}".lower()
  1168. def _matches_selection_keywords(
  1169. self,
  1170. config_json: dict[str, JSONValue],
  1171. input_preview: str) -> bool:
  1172. keywords = config_json.get("selection_keywords")
  1173. if not isinstance(keywords, list):
  1174. return False
  1175. return any(
  1176. isinstance(keyword, str) and keyword.lower() in input_preview
  1177. for keyword in keywords
  1178. )
  1179. def _build_dry_run_output(self, *, agent_run: AgentRun, agent_config: AgentConfig) -> str:
  1180. input_preview = agent_run.input_text or str(agent_run.input_json or {})
  1181. return (
  1182. f"[dry-run] Agent role={agent_config.role} "
  1183. f"received: {input_preview}"
  1184. )
  1185. def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
  1186. value = payload.get(key)
  1187. if isinstance(value, str) and value:
  1188. return value
  1189. return None
  1190. def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None:
  1191. value = payload.get(key)
  1192. if isinstance(value, (int, float)) and not isinstance(value, bool):
  1193. return float(value)
  1194. return None
  1195. def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None:
  1196. value = payload.get(key)
  1197. if isinstance(value, int) and not isinstance(value, bool):
  1198. return value
  1199. return None
  1200. def _read_relevant_memories(
  1201. self,
  1202. *,
  1203. agent_run: AgentRun,
  1204. agent_config: AgentConfig) -> tuple[list[MemorySearchResultContract], dict[str, JSONValue]]:
  1205. if self.memory_client is None:
  1206. return [], {"memory_read_enabled": False, "memory_read_reason": "client_missing"}
  1207. if not self._read_bool(agent_config.memory_policy_json, "enabled", default=True):
  1208. return [], {"memory_read_enabled": False, "memory_read_reason": "policy_disabled"}
  1209. query = agent_run.input_text or str(agent_run.input_json or "")
  1210. if not query:
  1211. return [], {"memory_read_enabled": True, "memory_read_count": 0}
  1212. scope = self._resolve_memory_scope(agent_run=agent_run, agent_config=agent_config)
  1213. if scope is None:
  1214. return [], {
  1215. "memory_read_enabled": True,
  1216. "memory_read_count": 0,
  1217. "memory_read_reason": "scope_unavailable",
  1218. }
  1219. scope_type, scope_id = scope
  1220. try:
  1221. results = self.memory_client.search_memories(
  1222. MemorySearchRequestContract(
  1223. query=query,
  1224. scope_type=scope_type,
  1225. scope_id=scope_id,
  1226. owner_agent_id=agent_run.agent_id,
  1227. session_id=agent_run.session_id,
  1228. limit=self._read_int(
  1229. agent_config.memory_policy_json,
  1230. "read_top_k",
  1231. default=8))
  1232. )
  1233. except MemoryClientError as exc:
  1234. return [], {
  1235. "memory_read_enabled": True,
  1236. "memory_read_count": 0,
  1237. "memory_read_error": str(exc),
  1238. }
  1239. return results, {
  1240. "memory_read_enabled": True,
  1241. "memory_read_count": len(results),
  1242. "memory_scope_type": scope_type,
  1243. "memory_scope_id": scope_id,
  1244. }
  1245. def _write_interaction_memory(
  1246. self,
  1247. *,
  1248. agent_run: AgentRun,
  1249. agent_config: AgentConfig,
  1250. output_text: str) -> dict[str, JSONValue]:
  1251. if self.memory_client is None:
  1252. return {"memory_write_enabled": False, "memory_write_reason": "client_missing"}
  1253. if not self._read_bool(agent_config.memory_policy_json, "write_enabled", default=True):
  1254. return {"memory_write_enabled": False, "memory_write_reason": "policy_disabled"}
  1255. scope = self._resolve_memory_scope(agent_run=agent_run, agent_config=agent_config)
  1256. if scope is None:
  1257. return {"memory_write_enabled": True, "memory_write_reason": "scope_unavailable"}
  1258. scope_type, scope_id = scope
  1259. try:
  1260. memory = self.memory_client.create_memory(
  1261. MemoryCreateContract(
  1262. scope_type=scope_type,
  1263. scope_id=scope_id,
  1264. memory_type="conversation",
  1265. content_text=self._format_interaction_memory(
  1266. agent_run=agent_run,
  1267. output_text=output_text),
  1268. content_json={
  1269. "agent_run_id": agent_run.id,
  1270. "agent_config_id": agent_config.id,
  1271. "input_text": agent_run.input_text,
  1272. "output_text": output_text,
  1273. },
  1274. metadata_json={
  1275. "source": "agent-service",
  1276. "role": agent_config.role,
  1277. },
  1278. owner_agent_id=agent_run.agent_id,
  1279. session_id=agent_run.session_id,
  1280. source_ref=f"agent_run:{agent_run.id}",
  1281. importance_score=self._read_nested_int(
  1282. agent_config.memory_policy_json,
  1283. "config_json",
  1284. "write_importance_score",
  1285. default=50))
  1286. )
  1287. except MemoryClientError as exc:
  1288. return {
  1289. "memory_write_enabled": True,
  1290. "memory_write_error": str(exc),
  1291. }
  1292. return {
  1293. "memory_write_enabled": True,
  1294. "memory_written_id": memory.id,
  1295. "memory_scope_type": scope_type,
  1296. "memory_scope_id": scope_id,
  1297. }
  1298. def _resolve_memory_scope(
  1299. self,
  1300. *,
  1301. agent_run: AgentRun,
  1302. agent_config: AgentConfig) -> tuple[MemoryScopeType, str] | None:
  1303. scope_value = self._read_optional_string(
  1304. agent_config.memory_policy_json,
  1305. "memory_scope") or "session"
  1306. if scope_value == "global":
  1307. return "global", "global"
  1308. if scope_value == "agent":
  1309. return "agent", agent_run.agent_id
  1310. if scope_value == "session" and agent_run.session_id:
  1311. return "session", agent_run.session_id
  1312. if scope_value == "user":
  1313. user_id = self._read_input_json_string(agent_run=agent_run, key="user_id")
  1314. if user_id is not None:
  1315. return "user", user_id
  1316. if scope_value == "team":
  1317. team_id = self._read_input_json_string(agent_run=agent_run, key="team_id")
  1318. if team_id is not None:
  1319. return "team", team_id
  1320. return None
  1321. def _format_memory_context(self, memory_results: list[MemorySearchResultContract]) -> str:
  1322. lines = ["Relevant memories:"]
  1323. for index, result in enumerate(memory_results, start=1):
  1324. lines.append(f"{index}. {result.item.content_text}")
  1325. return "\n".join(lines)
  1326. def _format_interaction_memory(self, *, agent_run: AgentRun, output_text: str) -> str:
  1327. input_text = agent_run.input_text or str(agent_run.input_json or {})
  1328. return f"User input: {input_text}\nAgent output: {output_text}"
  1329. def _read_bool(self, payload: dict[str, JSONValue], key: str, *, default: bool) -> bool:
  1330. value = payload.get(key)
  1331. if isinstance(value, bool):
  1332. return value
  1333. return default
  1334. def _read_int(self, payload: dict[str, JSONValue], key: str, *, default: int) -> int:
  1335. value = payload.get(key)
  1336. if isinstance(value, int) and not isinstance(value, bool):
  1337. return value
  1338. return default
  1339. def _read_nested_int(
  1340. self,
  1341. payload: dict[str, JSONValue],
  1342. parent_key: str,
  1343. child_key: str,
  1344. *,
  1345. default: int) -> int:
  1346. parent_value = payload.get(parent_key)
  1347. if not isinstance(parent_value, dict):
  1348. return default
  1349. return self._read_int(
  1350. cast(dict[str, JSONValue], parent_value),
  1351. child_key,
  1352. default=default)
  1353. def _read_input_json_string(self, *, agent_run: AgentRun, key: str) -> str | None:
  1354. if agent_run.input_json is None:
  1355. return None
  1356. value = agent_run.input_json.get(key)
  1357. if isinstance(value, str) and value:
  1358. return value
  1359. return None
  def build_agent_application_service(
          *,
          db: Session,
          settings: AgentServiceSettings) -> AgentApplicationService:
      """Wire up an ``AgentApplicationService`` from a DB session and settings.

      Repositories share the given ``db`` session. Each downstream service
      client is built from its ``*_url`` and ``*_timeout_seconds`` settings.
      A task queue publisher is attached only when a Redis client could be
      built from ``settings.redis_url``.
      """
      redis_client = try_build_redis_client(settings.redis_url)

      agent_repository = AgentDefinitionRepository(db)
      agent_config_repository = AgentConfigRepository(db)
      agent_run_repository = AgentRunRepository(db)
      agent_tool_invocation_repository = AgentToolInvocationRepository(db)

      model_gateway_client = ModelGatewayClient(
          base_url=settings.model_gateway_service_url,
          timeout_seconds=settings.model_gateway_timeout_seconds)
      memory_client = MemoryClient(
          base_url=settings.memory_service_url,
          timeout_seconds=settings.memory_service_timeout_seconds)
      tool_client = ToolServiceClient(
          base_url=settings.tool_service_url,
          timeout_seconds=settings.tool_service_timeout_seconds)
      skill_client = SkillServiceClient(
          base_url=settings.skill_service_url,
          timeout_seconds=settings.skill_service_timeout_seconds)
      event_client = EventServiceClient(
          base_url=settings.event_service_url,
          timeout_seconds=settings.event_service_timeout_seconds)

      # Without Redis there is nothing to publish to, so the publisher is
      # left unset.
      if redis_client is None:
          task_queue_publisher = None
      else:
          task_queue_publisher = TaskQueuePublisher(client=redis_client)

      return AgentApplicationService(
          agent_repository=agent_repository,
          agent_config_repository=agent_config_repository,
          agent_run_repository=agent_run_repository,
          agent_tool_invocation_repository=agent_tool_invocation_repository,
          model_gateway_client=model_gateway_client,
          memory_client=memory_client,
          tool_client=tool_client,
          skill_client=skill_client,
          event_client=event_client,
          task_queue_publisher=task_queue_publisher,
          react_max_steps=settings.react_max_steps)