services.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
import json
from datetime import datetime, timedelta, timezone

from sqlalchemy.orm import Session

from core_domain import ChatCompletionRequestContract, ChatMessageContract
from core_shared import JSONValue

from app.bootstrap.settings import AgentServiceSettings
from app.db.models import AgentDefinition, AgentRun, AgentVersion
from app.domain.repositories import (
    AgentDefinitionRepository,
    AgentRunRepository,
    AgentVersionRepository,
)
from app.infrastructure.model_gateway_client import ModelGatewayClient, ModelGatewayClientError
from app.schemas.agent import (
    AgentCreateRequest,
    AgentRunCreateRequest,
    AgentRunExecuteRequest,
    AgentRunStatusUpdateRequest,
    AgentStatusUpdateRequest,
    AgentVersionCreateRequest,
)
  21. class AgentApplicationService:
  22. def __init__(
  23. self,
  24. *,
  25. agent_repository: AgentDefinitionRepository,
  26. agent_version_repository: AgentVersionRepository,
  27. agent_run_repository: AgentRunRepository,
  28. model_gateway_client: ModelGatewayClient | None = None,
  29. ) -> None:
  30. self.agent_repository = agent_repository
  31. self.agent_version_repository = agent_version_repository
  32. self.agent_run_repository = agent_run_repository
  33. self.model_gateway_client = model_gateway_client
  34. def create_agent(self, payload: AgentCreateRequest) -> AgentDefinition:
  35. return self.agent_repository.create(
  36. tenant_id=payload.tenant_id,
  37. code=payload.code,
  38. name=payload.name,
  39. description=payload.description,
  40. agent_type=payload.agent_type,
  41. owner_user_id=payload.owner_user_id,
  42. metadata_json=payload.metadata_json,
  43. )
  44. def list_agents(self, *, tenant_id: str) -> list[AgentDefinition]:
  45. return self.agent_repository.list_by_tenant(tenant_id=tenant_id)
  46. def update_agent_status(
  47. self,
  48. *,
  49. agent_id: str,
  50. payload: AgentStatusUpdateRequest,
  51. ) -> AgentDefinition | None:
  52. return self.agent_repository.update_status(
  53. tenant_id=payload.tenant_id,
  54. agent_id=agent_id,
  55. status=payload.status,
  56. )
  57. def create_agent_version(self, payload: AgentVersionCreateRequest) -> AgentVersion:
  58. agent = self.agent_repository.get_by_id(
  59. tenant_id=payload.tenant_id,
  60. agent_id=payload.agent_id,
  61. )
  62. if agent is None:
  63. raise ValueError(f"agent not found: {payload.agent_id}")
  64. return self.agent_version_repository.create(
  65. tenant_id=payload.tenant_id,
  66. agent_id=payload.agent_id,
  67. status=payload.status,
  68. role=payload.role,
  69. goal=payload.goal,
  70. system_prompt=payload.system_prompt,
  71. model_config_json=payload.model_config_data.model_dump(mode="json"),
  72. memory_policy_json=payload.memory_policy.model_dump(mode="json"),
  73. tool_refs_json=[item.model_dump(mode="json") for item in payload.tool_refs],
  74. skill_refs_json=[item.model_dump(mode="json") for item in payload.skill_refs],
  75. )
  76. def list_agent_versions(self, *, tenant_id: str, agent_id: str) -> list[AgentVersion]:
  77. return self.agent_version_repository.list_by_agent(tenant_id=tenant_id, agent_id=agent_id)
  78. def create_agent_run(self, payload: AgentRunCreateRequest) -> AgentRun:
  79. agent_version = self._resolve_agent_version(
  80. tenant_id=payload.tenant_id,
  81. agent_id=payload.agent_id,
  82. agent_version_id=payload.agent_version_id,
  83. )
  84. if agent_version is None:
  85. raise ValueError("published agent version not found")
  86. return self.agent_run_repository.create(
  87. tenant_id=payload.tenant_id,
  88. agent_id=payload.agent_id,
  89. agent_version_id=agent_version.id,
  90. session_id=payload.session_id,
  91. input_text=payload.input_text,
  92. input_json=payload.input_json,
  93. )
  94. def list_agent_runs(
  95. self,
  96. *,
  97. tenant_id: str,
  98. agent_id: str | None = None,
  99. session_id: str | None = None,
  100. ) -> list[AgentRun]:
  101. return self.agent_run_repository.list_by_scope(
  102. tenant_id=tenant_id,
  103. agent_id=agent_id,
  104. session_id=session_id,
  105. )
  106. def update_agent_run_status(
  107. self,
  108. *,
  109. agent_run_id: str,
  110. payload: AgentRunStatusUpdateRequest,
  111. ) -> AgentRun | None:
  112. entity = self.agent_run_repository.get_by_id(
  113. tenant_id=payload.tenant_id,
  114. agent_run_id=agent_run_id,
  115. )
  116. if entity is None:
  117. return None
  118. return self.agent_run_repository.update_status(
  119. agent_run_id=agent_run_id,
  120. status=payload.status,
  121. worker_key=payload.worker_key,
  122. output_text=payload.output_text,
  123. output_json=payload.output_json,
  124. error_code=payload.error_code,
  125. error_message=payload.error_message,
  126. )
  127. def execute_agent_run(
  128. self,
  129. *,
  130. agent_run_id: str,
  131. payload: AgentRunExecuteRequest,
  132. ) -> AgentRun | None:
  133. agent_run = self.agent_run_repository.get_by_id(
  134. tenant_id=payload.tenant_id,
  135. agent_run_id=agent_run_id,
  136. )
  137. if agent_run is None:
  138. return None
  139. agent_version = self.agent_version_repository.get_by_id(
  140. tenant_id=payload.tenant_id,
  141. agent_version_id=agent_run.agent_version_id,
  142. )
  143. if agent_version is None:
  144. return self.agent_run_repository.update_status(
  145. agent_run_id=agent_run.id,
  146. status="failed",
  147. worker_key=payload.worker_key,
  148. error_code="agent_version_missing",
  149. error_message=f"agent version not found: {agent_run.agent_version_id}",
  150. )
  151. self.agent_run_repository.update_status(
  152. agent_run_id=agent_run.id,
  153. status="running",
  154. worker_key=payload.worker_key,
  155. )
  156. messages = self._build_chat_messages(agent_run=agent_run, agent_version=agent_version)
  157. if payload.dry_run:
  158. return self.agent_run_repository.update_status(
  159. agent_run_id=agent_run.id,
  160. status="completed",
  161. worker_key=payload.worker_key,
  162. output_text=self._build_dry_run_output(
  163. agent_run=agent_run,
  164. agent_version=agent_version,
  165. ),
  166. output_json={
  167. "dry_run": True,
  168. "agent_version_id": agent_version.id,
  169. "message_count": len(messages),
  170. "messages": [message.model_dump(mode="json") for message in messages],
  171. },
  172. )
  173. if self.model_gateway_client is None:
  174. return self.agent_run_repository.update_status(
  175. agent_run_id=agent_run.id,
  176. status="failed",
  177. worker_key=payload.worker_key,
  178. error_code="model_gateway_missing",
  179. error_message="model gateway client is not configured",
  180. )
  181. try:
  182. response = self.model_gateway_client.create_chat_completion(
  183. ChatCompletionRequestContract(
  184. model=self._read_optional_string(agent_version.model_config_json, "model"),
  185. temperature=self._read_optional_float(
  186. agent_version.model_config_json,
  187. "temperature",
  188. ),
  189. max_tokens=self._read_optional_int(agent_version.model_config_json, "max_tokens"),
  190. messages=messages,
  191. metadata_json={
  192. "tenant_id": agent_run.tenant_id,
  193. "agent_id": agent_run.agent_id,
  194. "agent_version_id": agent_version.id,
  195. "agent_run_id": agent_run.id,
  196. },
  197. )
  198. )
  199. except ModelGatewayClientError as exc:
  200. return self.agent_run_repository.update_status(
  201. agent_run_id=agent_run.id,
  202. status="failed",
  203. worker_key=payload.worker_key,
  204. error_code="model_gateway_error",
  205. error_message=str(exc),
  206. )
  207. return self.agent_run_repository.update_status(
  208. agent_run_id=agent_run.id,
  209. status="completed",
  210. worker_key=payload.worker_key,
  211. output_text=response.content,
  212. output_json={
  213. "dry_run": False,
  214. "agent_version_id": agent_version.id,
  215. "model": response.model,
  216. "finish_reason": response.finish_reason,
  217. "usage_json": response.usage_json,
  218. "raw_response_json": response.raw_response_json,
  219. },
  220. )
  221. def execute_next_claimed_agent_run(
  222. self,
  223. *,
  224. worker_key: str,
  225. lease_seconds: int,
  226. dry_run: bool,
  227. ) -> tuple[AgentRun, int] | None:
  228. released_lease_count = self.agent_run_repository.release_expired_leases(
  229. now_time=datetime.utcnow(),
  230. )
  231. claimed_agent_run = self.agent_run_repository.claim_next_queued(
  232. worker_key=worker_key,
  233. lease_expire_time=datetime.utcnow() + timedelta(seconds=lease_seconds),
  234. )
  235. if claimed_agent_run is None:
  236. return None
  237. result = self.execute_agent_run(
  238. agent_run_id=claimed_agent_run.id,
  239. payload=AgentRunExecuteRequest(
  240. tenant_id=claimed_agent_run.tenant_id,
  241. worker_key=worker_key,
  242. dry_run=dry_run,
  243. ),
  244. )
  245. if result is None:
  246. return None
  247. return result, released_lease_count
  248. def _resolve_agent_version(
  249. self,
  250. *,
  251. tenant_id: str,
  252. agent_id: str,
  253. agent_version_id: str | None,
  254. ) -> AgentVersion | None:
  255. if agent_version_id is not None:
  256. return self.agent_version_repository.get_by_id(
  257. tenant_id=tenant_id,
  258. agent_version_id=agent_version_id,
  259. )
  260. return self.agent_version_repository.get_latest_published(
  261. tenant_id=tenant_id,
  262. agent_id=agent_id,
  263. )
  264. def _build_chat_messages(
  265. self,
  266. *,
  267. agent_run: AgentRun,
  268. agent_version: AgentVersion,
  269. ) -> list[ChatMessageContract]:
  270. messages = [
  271. ChatMessageContract(role="system", content=agent_version.system_prompt),
  272. ]
  273. if agent_version.goal:
  274. messages.append(ChatMessageContract(role="system", content=f"Goal: {agent_version.goal}"))
  275. if agent_run.input_text:
  276. messages.append(ChatMessageContract(role="user", content=agent_run.input_text))
  277. if agent_run.input_json:
  278. messages.append(
  279. ChatMessageContract(
  280. role="user",
  281. content=f"Structured input: {agent_run.input_json}",
  282. )
  283. )
  284. return messages
  285. def _build_dry_run_output(self, *, agent_run: AgentRun, agent_version: AgentVersion) -> str:
  286. input_preview = agent_run.input_text or str(agent_run.input_json or {})
  287. return (
  288. f"[dry-run] Agent role={agent_version.role} "
  289. f"version={agent_version.version_no} received: {input_preview}"
  290. )
  291. def _read_optional_string(self, payload: dict[str, JSONValue], key: str) -> str | None:
  292. value = payload.get(key)
  293. if isinstance(value, str) and value:
  294. return value
  295. return None
  296. def _read_optional_float(self, payload: dict[str, JSONValue], key: str) -> float | None:
  297. value = payload.get(key)
  298. if isinstance(value, (int, float)) and not isinstance(value, bool):
  299. return float(value)
  300. return None
  301. def _read_optional_int(self, payload: dict[str, JSONValue], key: str) -> int | None:
  302. value = payload.get(key)
  303. if isinstance(value, int) and not isinstance(value, bool):
  304. return value
  305. return None
  306. def build_agent_application_service(
  307. *,
  308. db: Session,
  309. settings: AgentServiceSettings,
  310. ) -> AgentApplicationService:
  311. return AgentApplicationService(
  312. agent_repository=AgentDefinitionRepository(db),
  313. agent_version_repository=AgentVersionRepository(db),
  314. agent_run_repository=AgentRunRepository(db),
  315. model_gateway_client=ModelGatewayClient(
  316. base_url=settings.model_gateway_service_url,
  317. timeout_seconds=settings.model_gateway_timeout_seconds,
  318. ),
  319. )