from core_domain import HumanTaskStatus from app.db.models import HumanTask from app.domain.repositories import HumanTaskRepository from app.schemas.human import ( HumanTaskClaimRequest, HumanTaskCompleteRequest, HumanTaskCreateRequest, ) class HumanApplicationService: def __init__(self, *, human_task_repository: HumanTaskRepository) -> None: self.human_task_repository = human_task_repository def create_task(self, payload: HumanTaskCreateRequest) -> HumanTask: return self.human_task_repository.create( task_type=payload.task_type, title=payload.title, description=payload.description, source_type=payload.source_type, source_id=payload.source_id, run_id=payload.run_id, node_run_id=payload.node_run_id, requested_by=payload.requested_by, assigned_to=payload.assigned_to, request_payload_json=payload.request_payload_json, due_time=payload.due_time) def list_tasks( self, *, status: HumanTaskStatus | None = None, assigned_to: str | None = None, run_id: str | None = None, limit: int = 100) -> list[HumanTask]: return self.human_task_repository.list_by_scope( status=status, assigned_to=assigned_to, run_id=run_id, limit=limit) def get_task(self, *, human_task_id: str) -> HumanTask | None: return self.human_task_repository.get_by_id( human_task_id=human_task_id) def claim_task( self, *, human_task_id: str, payload: HumanTaskClaimRequest) -> HumanTask | None: return self.human_task_repository.claim( human_task_id=human_task_id, claimed_by=payload.claimed_by) def complete_task( self, *, human_task_id: str, payload: HumanTaskCompleteRequest) -> HumanTask | None: return self.human_task_repository.complete( human_task_id=human_task_id, status=payload.status, response_payload_json=payload.response_payload_json)