import json import subprocess import tempfile from pathlib import Path from core_domain import CodeExecutionRequestContract, CodeExecutionResponseContract from core_shared import JSONValue from app.bootstrap.settings import CodeRunnerServiceSettings class CodeRunnerError(Exception): pass class PythonCodeRunner: def __init__(self, *, settings: CodeRunnerServiceSettings) -> None: self.settings = settings def execute(self, payload: CodeExecutionRequestContract) -> CodeExecutionResponseContract: script_content = _build_python_runner_script(payload.code) with tempfile.TemporaryDirectory(prefix="agent-platform-code-") as temp_dir: temp_path = Path(temp_dir) script_file = temp_path / "runner.py" input_file = temp_path / "input.json" script_file.write_text(script_content, encoding="utf-8") input_file.write_text( json.dumps(payload.input_json, ensure_ascii=False), encoding="utf-8") try: completed = subprocess.run( [self.settings.python_bin, str(script_file), str(input_file)], capture_output=True, text=True, encoding="utf-8", timeout=payload.timeout_seconds, check=False) except subprocess.TimeoutExpired as exc: return CodeExecutionResponseContract( success=False, stderr=exc.stderr or "", error_message=f"code execution timed out after {payload.timeout_seconds} seconds") except OSError as exc: raise CodeRunnerError(f"failed to start python runner: {exc}") from exc stdout = completed.stdout stderr = completed.stderr output_json = _extract_result_json(stdout) success = completed.returncode == 0 error_message = None if success else f"python exited with code {completed.returncode}" return CodeExecutionResponseContract( success=success, stdout=stdout, stderr=stderr, output_json=output_json, error_message=error_message) def _build_python_runner_script(user_code: str) -> str: escaped_code = json.dumps(user_code) return ( "import json\n" "import pathlib\n" "import sys\n" "\n" "input_path = pathlib.Path(sys.argv[1])\n" "payload = json.loads(input_path.read_text(encoding='utf-8'))\n" "namespace = {\n" " 'payload': payload,\n" " 'result': None,\n" "}\n" f"exec({escaped_code}, namespace, namespace)\n" "print('\\n__RESULT_JSON__=' + json.dumps(namespace.get('result'), ensure_ascii=False))\n" ) def _extract_result_json(stdout: str) -> dict[str, JSONValue]: marker = "__RESULT_JSON__=" lines = stdout.splitlines() for index in range(len(lines) - 1, -1, -1): line = lines[index] if not line.startswith(marker): continue raw_payload = line[len(marker) :] try: payload = json.loads(raw_payload) except json.JSONDecodeError: return {} if isinstance(payload, dict): return {str(key): value for key, value in payload.items()} return {"result": payload} return {}