from __future__ import annotations import sys from pathlib import Path REPO_ROOT = Path(__file__).resolve().parents[1] for module_name in list(sys.modules): if module_name == "app" or module_name.startswith("app."): del sys.modules[module_name] for path in [ REPO_ROOT / "libs" / "core-domain" / "src", REPO_ROOT / "libs" / "core-shared" / "src", REPO_ROOT / "libs" / "core-db" / "src", REPO_ROOT / "libs" / "core-events" / "src", REPO_ROOT / "services" / "agent-service", ]: sys.path.insert(0, str(path)) from core_db import Base from core_domain import ChatCompletionResponseContract from app.application.services import AgentApplicationService from app.bootstrap.settings import AgentServiceSettings from app.db.session import build_session_factory from app.domain.repositories import ( AgentDefinitionRepository, AgentRunRepository, AgentToolInvocationRepository, AgentVersionRepository, ) from app.schemas.agent import ( AgentCreateRequest, AgentRunCreateRequest, AgentRunExecuteRequest, AgentVersionCreateRequest, ) class FakeModelClient: def __init__(self) -> None: self.calls = 0 def create_chat_completion(self, payload: object) -> ChatCompletionResponseContract: self.calls += 1 if self.calls == 1: return ChatCompletionResponseContract( model="fake", content=( '{"action":"tool","tool_code":"lookup_order",' '"input_json":{"order_id":"123"}}' ), ) return ChatCompletionResponseContract( model="fake", content='{"action":"finish","answer":"Order lookup attempted."}', ) class FakeFunctionCallingModelClient: def __init__(self) -> None: self.calls = 0 self.request_tools: list[object] = [] def create_chat_completion(self, payload: object) -> ChatCompletionResponseContract: self.calls += 1 self.request_tools.append(getattr(payload, "tools_json", [])) if self.calls == 1: return ChatCompletionResponseContract( model="fake", content="", finish_reason="tool_calls", tool_calls_json=[ { "id": "call_1", "type": "function", "function": { "name": "lookup_order", "arguments": '{"order_id":"123"}', }, } ], ) return ChatCompletionResponseContract( model="fake", content='{"action":"finish","answer":"Function call handled."}', ) def test_react_loop_records_steps_and_tool_invocation(tmp_path: Path) -> None: session_factory = build_session_factory( settings=AgentServiceSettings( database_url=f"sqlite:///{tmp_path / 'agent_service.db'}", ), ) engine = session_factory.kw["bind"] Base.metadata.create_all(bind=engine) with session_factory() as db: service = AgentApplicationService( agent_repository=AgentDefinitionRepository(db), agent_version_repository=AgentVersionRepository(db), agent_run_repository=AgentRunRepository(db), agent_tool_invocation_repository=AgentToolInvocationRepository(db), model_gateway_client=FakeModelClient(), memory_client=None, tool_client=None, skill_client=None, event_client=None, react_max_steps=3, ) agent = service.create_agent( AgentCreateRequest(tenant_id="t1", code="react", name="React") ) service.create_agent_version( AgentVersionCreateRequest( tenant_id="t1", agent_id=agent.id, status="published", system_prompt="Use ReAct.", model_config={"react_enabled": True, "react_max_steps": 3}, memory_policy={"enabled": False, "write_enabled": False}, tool_refs=[ {"tool_code": "lookup_order", "required": True, "config_json": {}} ], ) ) run = service.create_agent_run( AgentRunCreateRequest( tenant_id="t1", agent_id=agent.id, input_text="check order", ) ) result = service.execute_agent_run( agent_run_id=run.id, payload=AgentRunExecuteRequest(tenant_id="t1", worker_key="test"), ) assert result is not None assert result.status == "completed" assert result.output_text == "Order lookup attempted." assert result.output_json is not None assert result.output_json["react_enabled"] is True assert len(result.output_json["react_steps"]) == 2 invocations = service.list_agent_tool_invocations( tenant_id="t1", agent_run_id=run.id, ) assert len(invocations) == 2 assert all(item.status == "skipped" for item in invocations) engine.dispose() def test_react_loop_accepts_openai_tool_calls(tmp_path: Path) -> None: session_factory = build_session_factory( settings=AgentServiceSettings( database_url=f"sqlite:///{tmp_path / 'agent_service.db'}", ), ) engine = session_factory.kw["bind"] Base.metadata.create_all(bind=engine) with session_factory() as db: model_client = FakeFunctionCallingModelClient() service = AgentApplicationService( agent_repository=AgentDefinitionRepository(db), agent_version_repository=AgentVersionRepository(db), agent_run_repository=AgentRunRepository(db), agent_tool_invocation_repository=AgentToolInvocationRepository(db), model_gateway_client=model_client, memory_client=None, tool_client=None, skill_client=None, event_client=None, react_max_steps=3, ) agent = service.create_agent( AgentCreateRequest(tenant_id="t1", code="react-fn", name="React Function") ) service.create_agent_version( AgentVersionCreateRequest( tenant_id="t1", agent_id=agent.id, status="published", system_prompt="Use ReAct.", model_config={ "react_enabled": True, "react_max_steps": 3, "function_calling_enabled": True, }, memory_policy={"enabled": False, "write_enabled": False}, tool_refs=[ {"tool_code": "lookup_order", "required": True, "config_json": {}} ], ) ) run = service.create_agent_run( AgentRunCreateRequest( tenant_id="t1", agent_id=agent.id, input_text="check order", ) ) result = service.execute_agent_run( agent_run_id=run.id, payload=AgentRunExecuteRequest(tenant_id="t1", worker_key="test"), ) assert result is not None assert result.status == "completed" assert result.output_text == "Function call handled." assert result.output_json is not None assert result.output_json["react_steps"][0]["action"]["tool_call_protocol"] == "openai" assert result.output_json["react_steps"][0]["action"]["input_json"] == {"order_id": "123"} assert model_client.request_tools[0] engine.dispose()