from datetime import datetime from typing import TYPE_CHECKING from pydantic import BaseModel, Field from core_domain import WorkflowVersionContract from core_shared import JSONValue from app.application.designer import ( DiagnosticSeverity, WorkflowDebugPlan, WorkflowDiagnostic, WorkflowEdgeInspection, WorkflowInspection, WorkflowNodeInspection, ) if TYPE_CHECKING: from app.db.models import WorkflowDefinitionModel, WorkflowVersion class WorkflowCreateRequest(BaseModel): tenant_id: str app_id: str code: str name: str workflow_type: str = "main" class WorkflowDefinitionResponse(BaseModel): id: str tenant_id: str app_id: str code: str name: str workflow_type: str latest_version_no: int created_time: datetime @classmethod def from_entity(cls, entity: "WorkflowDefinitionModel") -> "WorkflowDefinitionResponse": return cls.model_validate(entity, from_attributes=True) class WorkflowVersionCreateRequest(BaseModel): tenant_id: str workflow_id: str dsl_json: dict[str, JSONValue] | None = None compiled_plan_json: dict[str, JSONValue] | None = None schema_version: str | None = None checksum: str | None = None status: str = "draft" class WorkflowVersionResponse(WorkflowVersionContract): @classmethod def from_entity(cls, entity: "WorkflowVersion") -> "WorkflowVersionResponse": return cls.model_validate(entity, from_attributes=True) class WorkflowDesignerValidateRequest(BaseModel): dsl_json: dict[str, JSONValue] class WorkflowDesignerDiagnosticResponse(BaseModel): severity: DiagnosticSeverity code: str message: str node_id: str | None = None edge_index: int | None = None @classmethod def from_inspection(cls, item: WorkflowDiagnostic) -> "WorkflowDesignerDiagnosticResponse": return cls( severity=item.severity, code=item.code, message=item.message, node_id=item.node_id, edge_index=item.edge_index, ) class WorkflowDesignerNodeResponse(BaseModel): id: str type: str name: str | None = None incoming_count: int outgoing_count: int reachable: bool @classmethod def from_inspection(cls, item: WorkflowNodeInspection) -> "WorkflowDesignerNodeResponse": return cls( id=item.id, type=item.type, name=item.name, incoming_count=item.incoming_count, outgoing_count=item.outgoing_count, reachable=item.reachable, ) class WorkflowDesignerEdgeResponse(BaseModel): source: str target: str condition: str | None = None valid_source: bool valid_target: bool @classmethod def from_inspection(cls, item: WorkflowEdgeInspection) -> "WorkflowDesignerEdgeResponse": return cls( source=item.source, target=item.target, condition=item.condition, valid_source=item.valid_source, valid_target=item.valid_target, ) class WorkflowDesignerValidateResponse(BaseModel): valid: bool diagnostics: list[WorkflowDesignerDiagnosticResponse] node_count: int edge_count: int nodes: list[WorkflowDesignerNodeResponse] edges: list[WorkflowDesignerEdgeResponse] entry_node_ids: list[str] terminal_node_ids: list[str] isolated_node_ids: list[str] unreachable_node_ids: list[str] cycle_detected: bool normalized_dsl_json: dict[str, JSONValue] | None = None @classmethod def from_inspection(cls, inspection: WorkflowInspection) -> "WorkflowDesignerValidateResponse": normalized_dsl_json: dict[str, JSONValue] | None = None if inspection.workflow is not None: normalized_dsl_json = inspection.workflow.model_dump(mode="json") return cls( valid=inspection.valid, diagnostics=[ WorkflowDesignerDiagnosticResponse.from_inspection(item) for item in inspection.diagnostics ], node_count=len(inspection.nodes), edge_count=len(inspection.edges), nodes=[WorkflowDesignerNodeResponse.from_inspection(item) for item in inspection.nodes], edges=[WorkflowDesignerEdgeResponse.from_inspection(item) for item in inspection.edges], entry_node_ids=inspection.entry_node_ids, terminal_node_ids=inspection.terminal_node_ids, isolated_node_ids=inspection.isolated_node_ids, unreachable_node_ids=inspection.unreachable_node_ids, cycle_detected=inspection.cycle_detected, normalized_dsl_json=normalized_dsl_json, ) class WorkflowDebuggerPlanRequest(BaseModel): dsl_json: dict[str, JSONValue] max_preview_steps: int = Field(default=50, ge=1, le=500) class WorkflowDebuggerStepResponse(BaseModel): step_index: int node_id: str node_type: str name: str | None = None next_node_ids: list[str] class WorkflowDebuggerPlanResponse(WorkflowDesignerValidateResponse): execution_preview: list[WorkflowDebuggerStepResponse] max_preview_steps: int truncated: bool @classmethod def from_plan(cls, plan: WorkflowDebugPlan) -> "WorkflowDebuggerPlanResponse": base = WorkflowDesignerValidateResponse.from_inspection(plan.inspection) return cls( **base.model_dump(), execution_preview=[ WorkflowDebuggerStepResponse( step_index=item.step_index, node_id=item.node_id, node_type=item.node_type, name=item.name, next_node_ids=item.next_node_ids, ) for item in plan.execution_preview ], max_preview_steps=plan.max_preview_steps, truncated=plan.truncated, )