"""Application-layer service that forwards human-task operations to a repository."""

from core_domain import HumanTaskStatus

from app.db.models import HumanTask
from app.domain.repositories import HumanTaskRepository
from app.schemas.human import (
    HumanTaskClaimRequest,
    HumanTaskCompleteRequest,
    HumanTaskCreateRequest,
)


class HumanApplicationService:
    """Facade over a ``HumanTaskRepository``.

    Every method unpacks its inputs into keyword arguments, calls the
    matching repository method, and returns the repository's result
    unchanged. No validation or transformation happens in this class.
    """

    def __init__(self, *, human_task_repository: HumanTaskRepository) -> None:
        """Keep a reference to the repository used by every operation.

        Args:
            human_task_repository: Repository that all calls are delegated to.
        """
        self.human_task_repository = human_task_repository

    def create_task(self, payload: HumanTaskCreateRequest) -> HumanTask:
        """Create a human task from the fields of *payload*.

        Args:
            payload: Request object whose attributes are copied one-to-one
                into the repository's ``create`` keyword arguments.

        Returns:
            Whatever ``human_task_repository.create`` returns.
        """
        create = self.human_task_repository.create
        # Collected in the same order the repository keywords are passed.
        task_fields = {
            "tenant_id": payload.tenant_id,
            "task_type": payload.task_type,
            "title": payload.title,
            "description": payload.description,
            "source_type": payload.source_type,
            "source_id": payload.source_id,
            "run_id": payload.run_id,
            "node_run_id": payload.node_run_id,
            "requested_by": payload.requested_by,
            "assigned_to": payload.assigned_to,
            "request_payload_json": payload.request_payload_json,
            "due_time": payload.due_time,
        }
        return create(**task_fields)

    def list_tasks(
        self,
        *,
        tenant_id: str,
        status: HumanTaskStatus | None = None,
        assigned_to: str | None = None,
        run_id: str | None = None,
        limit: int = 100,
    ) -> list[HumanTask]:
        """List human tasks for a tenant.

        Args:
            tenant_id: Tenant identifier passed through to the repository.
            status: Optional status argument; defaults to ``None``.
            assigned_to: Optional assignee argument; defaults to ``None``.
            run_id: Optional run identifier argument; defaults to ``None``.
            limit: Passed through as ``limit``; defaults to 100.

        Returns:
            Whatever ``human_task_repository.list_by_scope`` returns.
            How ``None`` filters and ``limit`` are interpreted is decided by
            the repository, not here.
        """
        list_by_scope = self.human_task_repository.list_by_scope
        scope = {
            "tenant_id": tenant_id,
            "status": status,
            "assigned_to": assigned_to,
            "run_id": run_id,
            "limit": limit,
        }
        return list_by_scope(**scope)

    def claim_task(
        self,
        *,
        human_task_id: str,
        payload: HumanTaskClaimRequest,
    ) -> HumanTask | None:
        """Claim a human task on behalf of ``payload.claimed_by``.

        Args:
            human_task_id: Identifier of the task to claim.
            payload: Request supplying ``tenant_id`` and ``claimed_by``.

        Returns:
            Whatever ``human_task_repository.claim`` returns.
            NOTE(review): ``None`` presumably means the task was not found
            or could not be claimed -- confirm against the repository.
        """
        claim = self.human_task_repository.claim
        claimed_task = claim(
            tenant_id=payload.tenant_id,
            human_task_id=human_task_id,
            claimed_by=payload.claimed_by,
        )
        return claimed_task

    def complete_task(
        self,
        *,
        human_task_id: str,
        payload: HumanTaskCompleteRequest,
    ) -> HumanTask | None:
        """Complete a human task with the status and response in *payload*.

        Args:
            human_task_id: Identifier of the task to complete.
            payload: Request supplying ``tenant_id``, ``status`` and
                ``response_payload_json``.

        Returns:
            Whatever ``human_task_repository.complete`` returns.
            NOTE(review): ``None`` presumably means the task was not found
            -- confirm against the repository.
        """
        complete = self.human_task_repository.complete
        completed_task = complete(
            tenant_id=payload.tenant_id,
            human_task_id=human_task_id,
            status=payload.status,
            response_payload_json=payload.response_payload_json,
        )
        return completed_task