| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- from datetime import datetime
- from typing import TYPE_CHECKING
- from core_domain import WorkflowConfigContract
- from core_shared import JSONValue
- from pydantic import BaseModel, Field
- from app.application.designer import (
- DiagnosticSeverity,
- WorkflowDebugPlan,
- WorkflowDiagnostic,
- WorkflowEdgeInspection,
- WorkflowInspection,
- WorkflowNodeInspection,
- )
- if TYPE_CHECKING:
- from app.db.models import WorkflowDefinitionModel, WorkflowConfig
- class WorkflowCreateRequest(BaseModel):
- app_id: str
- code: str
- name: str
- workflow_type: str = "main"
- class WorkflowDefinitionResponse(BaseModel):
- id: str
- app_id: str
- code: str
- name: str
- workflow_type: str
- created_time: datetime
- @classmethod
- def from_entity(cls, entity: "WorkflowDefinitionModel") -> "WorkflowDefinitionResponse":
- return cls.model_validate(entity, from_attributes=True)
- class WorkflowListRequest(BaseModel):
- app_id: str | None = None
- class WorkflowConfigCreateRequest(BaseModel):
- workflow_id: str
- dsl_json: dict[str, JSONValue] | None = None
- compiled_plan_json: dict[str, JSONValue] | None = None
- checksum: str | None = None
- class WorkflowConfigResponse(WorkflowConfigContract):
- @classmethod
- def from_entity(cls, entity: "WorkflowConfig") -> "WorkflowConfigResponse":
- return cls.model_validate(entity, from_attributes=True)
- class WorkflowConfigListRequest(BaseModel):
- workflow_id: str
- class WorkflowConfigDetailRequest(BaseModel):
- workflow_config_id: str
- class WorkflowDesignerValidateRequest(BaseModel):
- dsl_json: dict[str, JSONValue]
- class WorkflowDesignerDiagnosticResponse(BaseModel):
- severity: DiagnosticSeverity
- code: str
- message: str
- node_id: str | None = None
- edge_index: int | None = None
- @classmethod
- def from_inspection(cls, item: WorkflowDiagnostic) -> "WorkflowDesignerDiagnosticResponse":
- return cls(
- severity=item.severity,
- code=item.code,
- message=item.message,
- node_id=item.node_id,
- edge_index=item.edge_index)
- class WorkflowDesignerNodeResponse(BaseModel):
- id: str
- type: str
- name: str | None = None
- incoming_count: int
- outgoing_count: int
- reachable: bool
- @classmethod
- def from_inspection(cls, item: WorkflowNodeInspection) -> "WorkflowDesignerNodeResponse":
- return cls(
- id=item.id,
- type=item.type,
- name=item.name,
- incoming_count=item.incoming_count,
- outgoing_count=item.outgoing_count,
- reachable=item.reachable)
- class WorkflowDesignerEdgeResponse(BaseModel):
- source: str
- target: str
- condition: str | None = None
- valid_source: bool
- valid_target: bool
- @classmethod
- def from_inspection(cls, item: WorkflowEdgeInspection) -> "WorkflowDesignerEdgeResponse":
- return cls(
- source=item.source,
- target=item.target,
- condition=item.condition,
- valid_source=item.valid_source,
- valid_target=item.valid_target)
- class WorkflowDesignerValidateResponse(BaseModel):
- valid: bool
- diagnostics: list[WorkflowDesignerDiagnosticResponse]
- node_count: int
- edge_count: int
- nodes: list[WorkflowDesignerNodeResponse]
- edges: list[WorkflowDesignerEdgeResponse]
- entry_node_ids: list[str]
- terminal_node_ids: list[str]
- isolated_node_ids: list[str]
- unreachable_node_ids: list[str]
- cycle_detected: bool
- normalized_dsl_json: dict[str, JSONValue] | None = None
- @classmethod
- def from_inspection(cls, inspection: WorkflowInspection) -> "WorkflowDesignerValidateResponse":
- normalized_dsl_json: dict[str, JSONValue] | None = None
- if inspection.workflow is not None:
- normalized_dsl_json = inspection.workflow.model_dump(mode="json")
- return cls(
- valid=inspection.valid,
- diagnostics=[
- WorkflowDesignerDiagnosticResponse.from_inspection(item)
- for item in inspection.diagnostics
- ],
- node_count=len(inspection.nodes),
- edge_count=len(inspection.edges),
- nodes=[WorkflowDesignerNodeResponse.from_inspection(item) for item in inspection.nodes],
- edges=[WorkflowDesignerEdgeResponse.from_inspection(item) for item in inspection.edges],
- entry_node_ids=inspection.entry_node_ids,
- terminal_node_ids=inspection.terminal_node_ids,
- isolated_node_ids=inspection.isolated_node_ids,
- unreachable_node_ids=inspection.unreachable_node_ids,
- cycle_detected=inspection.cycle_detected,
- normalized_dsl_json=normalized_dsl_json)
- class WorkflowDebuggerPlanRequest(BaseModel):
- dsl_json: dict[str, JSONValue]
- max_preview_steps: int = Field(default=50, ge=1, le=500)
- class WorkflowConfigDebuggerPlanRequest(BaseModel):
- workflow_config_id: str
- max_preview_steps: int = Field(default=50, ge=1, le=500)
- class WorkflowDebuggerStepResponse(BaseModel):
- step_index: int
- node_id: str
- node_type: str
- name: str | None = None
- next_node_ids: list[str]
- class WorkflowDebuggerPlanResponse(WorkflowDesignerValidateResponse):
- execution_preview: list[WorkflowDebuggerStepResponse]
- max_preview_steps: int
- truncated: bool
- @classmethod
- def from_plan(cls, plan: WorkflowDebugPlan) -> "WorkflowDebuggerPlanResponse":
- base = WorkflowDesignerValidateResponse.from_inspection(plan.inspection)
- return cls(
- **base.model_dump(),
- execution_preview=[
- WorkflowDebuggerStepResponse(
- step_index=item.step_index,
- node_id=item.node_id,
- node_type=item.node_type,
- name=item.name,
- next_node_ids=item.next_node_ids)
- for item in plan.execution_preview
- ],
- max_preview_steps=plan.max_preview_steps,
- truncated=plan.truncated)
|