| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import json
- import subprocess
- import tempfile
- from pathlib import Path
- from core_domain import CodeExecutionRequestContract, CodeExecutionResponseContract
- from core_shared import JSONValue
- from app.bootstrap.settings import CodeRunnerServiceSettings
- class CodeRunnerError(Exception):
- pass
- class PythonCodeRunner:
- def __init__(self, *, settings: CodeRunnerServiceSettings) -> None:
- self.settings = settings
- def execute(self, payload: CodeExecutionRequestContract) -> CodeExecutionResponseContract:
- script_content = _build_python_runner_script(payload.code)
- with tempfile.TemporaryDirectory(prefix="agent-platform-code-") as temp_dir:
- temp_path = Path(temp_dir)
- script_file = temp_path / "runner.py"
- input_file = temp_path / "input.json"
- script_file.write_text(script_content, encoding="utf-8")
- input_file.write_text(
- json.dumps(payload.input_json, ensure_ascii=False),
- encoding="utf-8")
- try:
- completed = subprocess.run(
- [self.settings.python_bin, str(script_file), str(input_file)],
- capture_output=True,
- text=True,
- encoding="utf-8",
- timeout=payload.timeout_seconds,
- check=False)
- except subprocess.TimeoutExpired as exc:
- return CodeExecutionResponseContract(
- success=False,
- stderr=exc.stderr or "",
- error_message=f"code execution timed out after {payload.timeout_seconds} seconds")
- except OSError as exc:
- raise CodeRunnerError(f"failed to start python runner: {exc}") from exc
- stdout = completed.stdout
- stderr = completed.stderr
- output_json = _extract_result_json(stdout)
- success = completed.returncode == 0
- error_message = None if success else f"python exited with code {completed.returncode}"
- return CodeExecutionResponseContract(
- success=success,
- stdout=stdout,
- stderr=stderr,
- output_json=output_json,
- error_message=error_message)
- def _build_python_runner_script(user_code: str) -> str:
- escaped_code = json.dumps(user_code)
- return (
- "import json\n"
- "import pathlib\n"
- "import sys\n"
- "\n"
- "input_path = pathlib.Path(sys.argv[1])\n"
- "payload = json.loads(input_path.read_text(encoding='utf-8'))\n"
- "namespace = {\n"
- " 'payload': payload,\n"
- " 'result': None,\n"
- "}\n"
- f"exec({escaped_code}, namespace, namespace)\n"
- "print('\\n__RESULT_JSON__=' + json.dumps(namespace.get('result'), ensure_ascii=False))\n"
- )
- def _extract_result_json(stdout: str) -> dict[str, JSONValue]:
- marker = "__RESULT_JSON__="
- lines = stdout.splitlines()
- for index in range(len(lines) - 1, -1, -1):
- line = lines[index]
- if not line.startswith(marker):
- continue
- raw_payload = line[len(marker) :]
- try:
- payload = json.loads(raw_payload)
- except json.JSONDecodeError:
- return {}
- if isinstance(payload, dict):
- return {str(key): value for key, value in payload.items()}
- return {"result": payload}
- return {}
|