| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- from datetime import datetime
- from sqlalchemy import select
- from sqlalchemy.orm import Session
- from core_domain import HumanTaskStatus, HumanTaskType
- from core_shared import JSONValue
- from app.db.models import HumanTask
- class HumanTaskRepository:
- def __init__(self, db: Session) -> None:
- self.db = db
- def create(
- self,
- *,
- tenant_id: str,
- task_type: HumanTaskType,
- title: str,
- description: str | None,
- source_type: str | None,
- source_id: str | None,
- run_id: str | None,
- node_run_id: str | None,
- requested_by: str | None,
- assigned_to: str | None,
- request_payload_json: dict[str, JSONValue],
- due_time: datetime | None,
- ) -> HumanTask:
- entity = HumanTask(
- tenant_id=tenant_id,
- task_type=task_type,
- title=title,
- description=description,
- source_type=source_type,
- source_id=source_id,
- run_id=run_id,
- node_run_id=node_run_id,
- requested_by=requested_by,
- assigned_to=assigned_to,
- request_payload_json=request_payload_json,
- due_time=due_time,
- )
- self.db.add(entity)
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def list_by_scope(
- self,
- *,
- tenant_id: str,
- status: HumanTaskStatus | None = None,
- assigned_to: str | None = None,
- run_id: str | None = None,
- limit: int = 100,
- ) -> list[HumanTask]:
- stmt = select(HumanTask).where(HumanTask.tenant_id == tenant_id)
- if status is not None:
- stmt = stmt.where(HumanTask.status == status)
- if assigned_to is not None:
- stmt = stmt.where(HumanTask.assigned_to == assigned_to)
- if run_id is not None:
- stmt = stmt.where(HumanTask.run_id == run_id)
- stmt = stmt.order_by(HumanTask.created_time.desc()).limit(limit)
- return list(self.db.scalars(stmt))
- def get_by_id(self, *, tenant_id: str, human_task_id: str) -> HumanTask | None:
- stmt = (
- select(HumanTask)
- .where(HumanTask.tenant_id == tenant_id)
- .where(HumanTask.id == human_task_id)
- )
- return self.db.scalar(stmt)
- def claim(
- self,
- *,
- tenant_id: str,
- human_task_id: str,
- claimed_by: str,
- ) -> HumanTask | None:
- entity = self.get_by_id(tenant_id=tenant_id, human_task_id=human_task_id)
- if entity is None:
- return None
- entity.status = "claimed"
- entity.claimed_by = claimed_by
- entity.claimed_time = datetime.utcnow()
- self.db.commit()
- self.db.refresh(entity)
- return entity
- def complete(
- self,
- *,
- tenant_id: str,
- human_task_id: str,
- status: HumanTaskStatus,
- response_payload_json: dict[str, JSONValue],
- ) -> HumanTask | None:
- entity = self.get_by_id(tenant_id=tenant_id, human_task_id=human_task_id)
- if entity is None:
- return None
- entity.status = status
- entity.response_payload_json = response_payload_json
- entity.completed_time = datetime.utcnow()
- self.db.commit()
- self.db.refresh(entity)
- return entity
|