# services.py (2.2 KB)
  1. from core_domain import HumanTaskStatus
  2. from app.db.models import HumanTask
  3. from app.domain.repositories import HumanTaskRepository
  4. from app.schemas.human import (
  5. HumanTaskClaimRequest,
  6. HumanTaskCompleteRequest,
  7. HumanTaskCreateRequest,
  8. )
  9. class HumanApplicationService:
  10. def __init__(self, *, human_task_repository: HumanTaskRepository) -> None:
  11. self.human_task_repository = human_task_repository
  12. def create_task(self, payload: HumanTaskCreateRequest) -> HumanTask:
  13. return self.human_task_repository.create(
  14. tenant_id=payload.tenant_id,
  15. task_type=payload.task_type,
  16. title=payload.title,
  17. description=payload.description,
  18. source_type=payload.source_type,
  19. source_id=payload.source_id,
  20. run_id=payload.run_id,
  21. node_run_id=payload.node_run_id,
  22. requested_by=payload.requested_by,
  23. assigned_to=payload.assigned_to,
  24. request_payload_json=payload.request_payload_json,
  25. due_time=payload.due_time,
  26. )
  27. def list_tasks(
  28. self,
  29. *,
  30. tenant_id: str,
  31. status: HumanTaskStatus | None = None,
  32. assigned_to: str | None = None,
  33. run_id: str | None = None,
  34. limit: int = 100,
  35. ) -> list[HumanTask]:
  36. return self.human_task_repository.list_by_scope(
  37. tenant_id=tenant_id,
  38. status=status,
  39. assigned_to=assigned_to,
  40. run_id=run_id,
  41. limit=limit,
  42. )
  43. def claim_task(
  44. self,
  45. *,
  46. human_task_id: str,
  47. payload: HumanTaskClaimRequest,
  48. ) -> HumanTask | None:
  49. return self.human_task_repository.claim(
  50. tenant_id=payload.tenant_id,
  51. human_task_id=human_task_id,
  52. claimed_by=payload.claimed_by,
  53. )
  54. def complete_task(
  55. self,
  56. *,
  57. human_task_id: str,
  58. payload: HumanTaskCompleteRequest,
  59. ) -> HumanTask | None:
  60. return self.human_task_repository.complete(
  61. tenant_id=payload.tenant_id,
  62. human_task_id=human_task_id,
  63. status=payload.status,
  64. response_payload_json=payload.response_payload_json,
  65. )