from datetime import datetime from typing import TYPE_CHECKING from core_domain import WorkflowConfigContract from core_shared import JSONValue from pydantic import BaseModel, Field from app.application.designer import ( DiagnosticSeverity, WorkflowDebugPlan, WorkflowDiagnostic, WorkflowEdgeInspection, WorkflowInspection, WorkflowNodeInspection, ) if TYPE_CHECKING: from app.db.models import WorkflowDefinitionModel, WorkflowConfig class WorkflowCreateRequest(BaseModel): app_id: str code: str name: str workflow_type: str = "main" class WorkflowDefinitionResponse(BaseModel): id: str app_id: str code: str name: str workflow_type: str created_time: datetime @classmethod def from_entity(cls, entity: "WorkflowDefinitionModel") -> "WorkflowDefinitionResponse": return cls.model_validate(entity, from_attributes=True) class WorkflowListRequest(BaseModel): app_id: str | None = None class WorkflowConfigCreateRequest(BaseModel): workflow_id: str dsl_json: dict[str, JSONValue] | None = None compiled_plan_json: dict[str, JSONValue] | None = None checksum: str | None = None class WorkflowConfigResponse(WorkflowConfigContract): @classmethod def from_entity(cls, entity: "WorkflowConfig") -> "WorkflowConfigResponse": return cls.model_validate(entity, from_attributes=True) class WorkflowConfigListRequest(BaseModel): workflow_id: str class WorkflowConfigDetailRequest(BaseModel): workflow_config_id: str class WorkflowDesignerValidateRequest(BaseModel): dsl_json: dict[str, JSONValue] class WorkflowDesignerDiagnosticResponse(BaseModel): severity: DiagnosticSeverity code: str message: str node_id: str | None = None edge_index: int | None = None @classmethod def from_inspection(cls, item: WorkflowDiagnostic) -> "WorkflowDesignerDiagnosticResponse": return cls( severity=item.severity, code=item.code, message=item.message, node_id=item.node_id, edge_index=item.edge_index) class WorkflowDesignerNodeResponse(BaseModel): id: str type: str name: str | None = None incoming_count: int outgoing_count: int reachable: bool @classmethod def from_inspection(cls, item: WorkflowNodeInspection) -> "WorkflowDesignerNodeResponse": return cls( id=item.id, type=item.type, name=item.name, incoming_count=item.incoming_count, outgoing_count=item.outgoing_count, reachable=item.reachable) class WorkflowDesignerEdgeResponse(BaseModel): source: str target: str condition: str | None = None valid_source: bool valid_target: bool @classmethod def from_inspection(cls, item: WorkflowEdgeInspection) -> "WorkflowDesignerEdgeResponse": return cls( source=item.source, target=item.target, condition=item.condition, valid_source=item.valid_source, valid_target=item.valid_target) class WorkflowDesignerValidateResponse(BaseModel): valid: bool diagnostics: list[WorkflowDesignerDiagnosticResponse] node_count: int edge_count: int nodes: list[WorkflowDesignerNodeResponse] edges: list[WorkflowDesignerEdgeResponse] entry_node_ids: list[str] terminal_node_ids: list[str] isolated_node_ids: list[str] unreachable_node_ids: list[str] cycle_detected: bool normalized_dsl_json: dict[str, JSONValue] | None = None @classmethod def from_inspection(cls, inspection: WorkflowInspection) -> "WorkflowDesignerValidateResponse": normalized_dsl_json: dict[str, JSONValue] | None = None if inspection.workflow is not None: normalized_dsl_json = inspection.workflow.model_dump(mode="json") return cls( valid=inspection.valid, diagnostics=[ WorkflowDesignerDiagnosticResponse.from_inspection(item) for item in inspection.diagnostics ], node_count=len(inspection.nodes), edge_count=len(inspection.edges), nodes=[WorkflowDesignerNodeResponse.from_inspection(item) for item in inspection.nodes], edges=[WorkflowDesignerEdgeResponse.from_inspection(item) for item in inspection.edges], entry_node_ids=inspection.entry_node_ids, terminal_node_ids=inspection.terminal_node_ids, isolated_node_ids=inspection.isolated_node_ids, unreachable_node_ids=inspection.unreachable_node_ids, cycle_detected=inspection.cycle_detected, normalized_dsl_json=normalized_dsl_json) class WorkflowDebuggerPlanRequest(BaseModel): dsl_json: dict[str, JSONValue] max_preview_steps: int = Field(default=50, ge=1, le=500) class WorkflowConfigDebuggerPlanRequest(BaseModel): workflow_config_id: str max_preview_steps: int = Field(default=50, ge=1, le=500) class WorkflowDebuggerStepResponse(BaseModel): step_index: int node_id: str node_type: str name: str | None = None next_node_ids: list[str] class WorkflowDebuggerPlanResponse(WorkflowDesignerValidateResponse): execution_preview: list[WorkflowDebuggerStepResponse] max_preview_steps: int truncated: bool @classmethod def from_plan(cls, plan: WorkflowDebugPlan) -> "WorkflowDebuggerPlanResponse": base = WorkflowDesignerValidateResponse.from_inspection(plan.inspection) return cls( **base.model_dump(), execution_preview=[ WorkflowDebuggerStepResponse( step_index=item.step_index, node_id=item.node_id, node_type=item.node_type, name=item.name, next_node_ids=item.next_node_ids) for item in plan.execution_preview ], max_preview_steps=plan.max_preview_steps, truncated=plan.truncated)