Add Workflow Schema Validation Support#516
Conversation
|
At first glance, based on your current implementation (particularly the endpoint that receives the {
"Schema": "https://flowsynx.io/schemas/v1/workflow-schema.json",
"Definition": {
"Name": "NightlyDatabaseBackup",
"Description": "Dump PostgreSQL database, compress, and archive to Azure Blob Storage",
"Configuration": {},
"Tasks": []
}
}Based on what you described in #454, is that correct, or am I misunderstanding something? |
|
the endpoint still accepts the historic flat JSON blob (option A below), and the new If callers send the nested form we honor the schema URL and validate the inner definition; if they send the legacy flat payload we default the schema URL to null and keep the behavior unchanged. That lets us ship schema validation incrementally without breaking FlowCtl or existing automation. |
ziagham
left a comment
There was a problem hiding this comment.
Please resolve and address the review comments.
| <PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.3.0" /> | ||
| <PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="9.0.4" /> | ||
| <PackageReference Include="Microsoft.Extensions.Http" Version="9.0.4" /> | ||
| <PackageReference Include="Newtonsoft.Json.Schema" Version="4.1.0" /> |
There was a problem hiding this comment.
Version 4.1.0 of Newtonsoft.Json.Schema does not exist. The correct version is 4.0.1, which is the latest available release. Change it to version 4.0.1.
src/FlowSynx/Endpoints/Workflows.cs
Outdated
|
|
||
| private sealed class WorkflowUpsertPayload | ||
| { | ||
| public string? Definition { get; init; } |
There was a problem hiding this comment.
Please rename Definition to Workflow for better clarity and professionalism. The updated structure should look like this:
{
"Schema": "https://flowsynx.io/schemas/v1/workflow-schema.json",
"Workflow": {
"Name": "NightlyDatabaseBackup",
"Description": "Dump PostgreSQL database, compress, and archive to Azure Blob Storage",
"Configuration": {},
"Tasks": []
}
}| var definition = await ParseAndValidateDefinitionAsync( | ||
| workflow.Definition, | ||
| workflow.SchemaUrl, | ||
| cancellationToken); |
There was a problem hiding this comment.
Workflow Orchestrator Process Overview
The workflow orchestrator operates through two primary methods defined in the IWorkflowOrchestrator interface:
- CreateWorkflowExecutionAsync
- ExecuteWorkflowAsync
CreateWorkflowExecutionAsync
The main responsibility of this method is to create a new instance of WorkflowExecutionEntity and persist it to the database.
For example, around line 98:
var execution = new WorkflowExecutionEntity
{
Id = Guid.NewGuid(),
WorkflowId = workflowId,
UserId = userId,
WorkflowDefinition = workflow.Definition,
ExecutionStart = _systemClock.UtcNow,
Status = WorkflowExecutionStatus.Pending,
TaskExecutions = new List<WorkflowTaskExecutionEntity>()
};In this step, the workflow definition (workflow.Definition) is copied into the WorkflowDefinition property of the WorkflowExecutionEntity.
This design ensures that when a previously executed workflow is re-executed, the orchestrator uses the exact version of the workflow definition that was active at the time of execution — rather than reloading a potentially modified version from the WorkflowEntity.
To fully preserve the workflow’s original configuration, we should also include the schema reference used for validation.
Therefore, add a WorkflowSchemaUrl property to WorkflowExecutionEntity, and assign it from workflow.SchemaUrl during creation.
| var definition = await ParseAndValidateDefinitionAsync( | ||
| workflow.Definition, | ||
| workflow.SchemaUrl, | ||
| cancellationToken); |
There was a problem hiding this comment.
In ExecuteWorkflowAsync method, the orchestrator retrieves the previously created WorkflowExecutionEntity and executes it.
For instance, around line 139:
var execution = await _workflowExecutionService.Get(userId, workflowId, executionId, cancellationToken);Currently, the workflow definition is re-parsed and validated using the original workflow object:
var definition = await ParseAndValidateDefinitionAsync(
workflow.Definition,
workflow.SchemaUrl,
cancellationToken);However, since the orchestrator should execute the stored version of the workflow (not the potentially updated one), this should be changed to:
var definition = await ParseAndValidateDefinitionAsync(
execution.WorkflowDefinition,
execution.WorkflowSchemaUrl,
cancellationToken);This modification ensures that workflow execution always relies on the exact definition and schema that were originally used when the execution record was created, maintaining version consistency and execution integrity.
|
|
||
| namespace FlowSynx.Infrastructure.Serialization; | ||
|
|
||
| internal static partial class JsonSanitizer |
|
Addressed the feedback:
Rebuilt the solution in the dotnet/sdk:9.0 container (see |
|
Follow-up for the FOSSA license failure: replaced the commercial-licensed Tried to rerun |
|
@Sakeeb91 Users can now reference the schema directly using a URL like: |
|
This really adds an extra layer of convenience! Great idea! |
Summary
Closes #454
Testing