from datetime import datetime from core_domain import HumanTaskStatus, HumanTaskType from core_shared import JSONValue from sqlalchemy import select from sqlalchemy.orm import Session from app.db.models import HumanTask class HumanTaskRepository: def __init__(self, db: Session) -> None: self.db = db def create( self, *, task_type: HumanTaskType, title: str, description: str | None, source_type: str | None, source_id: str | None, run_id: str | None, node_run_id: str | None, requested_by: str | None, assigned_to: str | None, request_payload_json: dict[str, JSONValue], due_time: datetime | None) -> HumanTask: entity = HumanTask( task_type=task_type, title=title, description=description, source_type=source_type, source_id=source_id, run_id=run_id, node_run_id=node_run_id, requested_by=requested_by, assigned_to=assigned_to, request_payload_json=request_payload_json, due_time=due_time) self.db.add(entity) self.db.commit() self.db.refresh(entity) return entity def list_by_scope( self, *, status: HumanTaskStatus | None = None, assigned_to: str | None = None, run_id: str | None = None, limit: int = 100) -> list[HumanTask]: stmt = select(HumanTask) if status is not None: stmt = stmt.where(HumanTask.status == status) if assigned_to is not None: stmt = stmt.where(HumanTask.assigned_to == assigned_to) if run_id is not None: stmt = stmt.where(HumanTask.run_id == run_id) stmt = stmt.order_by(HumanTask.created_time.desc()).limit(limit) return list(self.db.scalars(stmt)) def get_by_id(self, *, human_task_id: str) -> HumanTask | None: stmt = ( select(HumanTask) .where(HumanTask.id == human_task_id) ) return self.db.scalar(stmt) def claim( self, *, human_task_id: str, claimed_by: str) -> HumanTask | None: entity = self.get_by_id(human_task_id=human_task_id) if entity is None: return None entity.status = "claimed" entity.claimed_by = claimed_by entity.claimed_time = datetime.utcnow() self.db.commit() self.db.refresh(entity) return entity def complete( self, *, human_task_id: str, status: HumanTaskStatus, response_payload_json: dict[str, JSONValue]) -> HumanTask | None: entity = self.get_by_id(human_task_id=human_task_id) if entity is None: return None entity.status = status entity.response_payload_json = response_payload_json entity.completed_time = datetime.utcnow() self.db.commit() self.db.refresh(entity) return entity