Lightweight workflow engine for .NET Standard
Support for PostgreSQL is delayed because of this issue with upstream libraries
The JSON and YAML definition features have been moved into their own package.
Migration required for existing projects:
Install the WorkflowCore.DSL package from NuGet and call AddWorkflowDSL() on your service collection.
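A minimal registration sketch:
services.AddWorkflow();
services.AddWorkflowDSL();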
An activity is defined as an item on an external queue of work that a workflow can wait for.
In this example the workflow will wait for activity-1 before proceeding. It also passes the value of data.Value1 to the activity, then maps the result of the activity to data.Value2.
Then we create a worker to process the queue of activity items. It uses the GetPendingActivity method to get an activity and the data that a workflow is waiting for.
public class ActivityWorkflow : IWorkflow<MyData>
{
    // Id and Version are required by IWorkflow<TData>; the values here are illustrative
    public string Id => "activity-workflow";
    public int Version => 1;

    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<HelloWorld>()
            .Activity("activity-1", (data) => data.Value1)
                .Output(data => data.Value2, step => step.Result)
            .Then<PrintMessage>()
                .Input(step => step.Message, data => data.Value2);
    }
}
...
var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
if (activity != null)
{
    Console.WriteLine(activity.Parameters);
    host.SubmitActivitySuccess(activity.Token, "Some response data");
}
The JSON representation of this step would look like this
{
    "Id": "activity-step",
    "StepType": "WorkflowCore.Primitives.Activity, WorkflowCore",
    "Inputs":
    {
        "ActivityName": "\"activity-1\"",
        "Parameters": "data.Value1"
    },
    "Outputs": { "Value2": "step.Result" }
}
New IWorkflowPurger service that can be injected from the IoC container.
Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan)
Implementations are currently only for SQL Server, Postgres and MongoDB
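For example, one might purge completed workflows older than 30 days (a sketch; serviceProvider is assumed to be your built IoC container):
var purger = serviceProvider.GetService<IWorkflowPurger>();
await purger.PurgeWorkflows(WorkflowStatus.Complete, DateTime.UtcNow.AddDays(-30));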
New SyncWorkflowRunner service that enables workflows to be executed synchronously. You can also avoid persisting the state to the persistence store entirely. Usage:
var runner = serviceProvider.GetService<ISyncWorkflowRunner>();
...
var workflow = await runner.RunWorkflowSync("my-workflow", 1, data, TimeSpan.FromSeconds(10));
Targets .NET Standard 2.0
The core library now targets .NET Standard 2.0, in order to leverage newer features.
Support for YAML definitions
Added support for YAML workflow definitions, which can be loaded as follows
using WorkflowCore.Services.DefinitionStorage;
...
DefinitionLoader.LoadDefinition(yaml, Deserializers.Yaml);
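For reference, a minimal YAML definition might look like the following sketch (the step types MyApp.HelloWorld and MyApp.GoodbyeWorld are hypothetical):
Id: HelloWorld
Version: 1
Steps:
- Id: Hello
  StepType: MyApp.HelloWorld, MyApp
  NextStepId: Bye
- Id: Bye
  StepType: MyApp.GoodbyeWorld, MyApp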
Existing JSON definitions will be loaded as follows
using WorkflowCore.Services.DefinitionStorage;
...
DefinitionLoader.LoadDefinition(json, Deserializers.Json);
Object graphs and inline expressions on input properties
You can now pass object graphs to step inputs as opposed to just scalar values
"inputs":
{
"Body": {
"Value1": 1,
"Value2": 2
},
"Headers": {
"Content-Type": "application/json"
}
},
If you want to evaluate an expression for a given property of your object, simply prepend an @ and pass an expression string
"inputs":
{
"Body": {
"@Value1": "data.MyValue * 2",
"Value2": 5
},
"Headers": {
"Content-Type": "application/json"
}
},
Support for enum values on input properties
If your step has an enum property, you can now just pass the string representation of the enum value and it will be automatically converted.
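For example, following the same quoted-expression convention as the ActivityName input shown earlier, a definition might set a hypothetical enum property named Level like this (a sketch):
"inputs":
{
    "Level": "\"Error\""
}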
Environment variables available in input expressions
You can now access environment variables from within input expressions. Usage:
environment["VARIABLE_NAME"]
Added UseMaxConcurrentWorkflows to WorkflowOptions to allow overriding the maximum number of concurrent workflows for a given node.
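A minimal sketch of how this might be configured (assuming the option takes the desired maximum as an integer):
services.AddWorkflow(options => options.UseMaxConcurrentWorkflows(4));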
Changes the default retry behavior for steps within a saga to bubble up to the saga container. This means you do not have to explicitly set each step within the saga to Compensate.
Added Attach and Id to the fluent API. This will enable one to attach the flow from a step to any other step with an Id.
Control structure scope will be preserved
.StartWith<Step1>()
.Id("step1")
.Then<Step2>()
.Attach("step1")
Thank you to @MarioAndron
This release adds a feature where a DI scope is created around the construction of steps that are registered with your IoC container. This enables steps to consume services registered as scoped.
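For example, a step might take a scoped dependency through its constructor (a minimal sketch; SaveRecord and MyDbContext are hypothetical names):
using WorkflowCore.Interfaces;
using WorkflowCore.Models;

public class SaveRecord : StepBody
{
    // MyDbContext is a hypothetical service registered as scoped;
    // it is resolved from the DI scope created around this step's construction
    private readonly MyDbContext _db;

    public SaveRecord(MyDbContext db)
    {
        _db = db;
    }

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        // ... use _db ...
        return ExecutionResult.Next();
    }
}
...
services.AddScoped<MyDbContext>();
services.AddTransient<SaveRecord>();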
A search index plugin for Workflow Core backed by Elasticsearch, enabling you to index your workflows and search against the data and state of them.
Use the .UseElasticsearch extension method on IServiceCollection when building your service provider
using Nest;
...
services.AddWorkflow(cfg =>
{
    ...
    cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://localhost:9200")), "index_name");
});
Inject the ISearchIndex service into your code and use the Search method.
Search(string terms, int skip, int take, params SearchFilter[] filters)
A whitespace separated string of search terms; an empty string will match everything. This will do a full text search on the standard fields.
In addition you can search data within your own custom data object if it implements ISearchable
using WorkflowCore.Interfaces;
...
public class MyData : ISearchable
{
    public string StrValue1 { get; set; }
    public string StrValue2 { get; set; }

    public IEnumerable<string> GetSearchTokens()
    {
        return new List<string>()
        {
            StrValue1,
            StrValue2
        };
    }
}
Search all fields for "puppies"
searchIndex.Search("puppies", 0, 10);
Use skip and take to page your search results, where skip is the result number to start from and take is the page size.
You can also supply a list of filters to apply to the search; these can be applied to both the standard fields as well as any field within your custom data objects.
There is no need to implement ISearchable on your data object in order to use filters against it.
The following filter types are available: ScalarFilter, DateRangeFilter, NumericRangeFilter and StatusFilter. These exist in the WorkflowCore.Models.Search namespace.
Filtering by reference
using WorkflowCore.Models.Search;
...
searchIndex.Search("", 0, 10, ScalarFilter.Equals(x => x.Reference, "My Reference"));
Filtering by workflows started after a date
searchIndex.Search("", 0, 10, DateRangeFilter.After(x => x.CreateTime, startDate));
Filtering by workflows completed within a period
searchIndex.Search("", 0, 10, DateRangeFilter.Between(x => x.CompleteTime, startDate, endDate));
Filtering by workflows in a state
searchIndex.Search("", 0, 10, StatusFilter.Equals(WorkflowStatus.Complete));
Filtering against your own custom data class
class MyData
{
    public string Value1 { get; set; }
    public int Value2 { get; set; }
}
searchIndex.Search("", 0, 10, ScalarFilter.Equals<MyData>(x => x.Value1, "blue moon"));
searchIndex.Search("", 0, 10, NumericRangeFilter.LessThan<MyData>(x => x.Value2, 5));
Added the action Input & Output overloads on the fluent step builder.
Input(Action<TStepBody, TData> action);
Output(Action<TStepBody, TData> action);
These allow one to manipulate properties on the step before it executes and properties on the data object after it executes, for example
Input((step, data) => step.Value1 = data.Value1)
.Output((step, data) => data["Value3"] = step.Output)
.Output((step, data) => data.MyCollection.Add(step.Output))
The existing ability to assign values to entries in dictionaries or dynamic objects on .Output was problematic, since it broke the ability to pass collections on the Output mappings.
.Output(data => data["Value3"], step => step.Output)
This feature has been removed, and it is advised to use the action Output API instead, for example
.Output((step, data) => data["Value3"] = step.Output)
This functionality remains intact for JSON defined workflows.
Various performance optimizations. Any users of the Entity Framework persistence providers will have to update their persistence libraries to the latest version as well.
Added CancelCondition to the fluent builder API.
.CancelCondition(data => <<expression>>, <<Continue after cancellation>>)
This allows you to specify a condition under which any active step can be prematurely cancelled. For example, suppose you create a future scheduled task, but you want to cancel the future execution of this task if some condition becomes true.
builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith<DoSomething>()
        .Then<DoSomethingFurther>()
    )
    .CancelCondition(data => !data.ScheduledTaskRequired)
    .Then(context => Console.WriteLine("Doing normal tasks"));
You could also use this to implement a parallel flow where, once a single path completes, all the other paths are cancelled.
.Parallel()
    .Do(then => then
        .StartWith<DoSomething>()
        .WaitFor("Approval", (data, context) => context.Workflow.Id)
    )
    .Do(then => then
        .StartWith<DoSomething>()
        .Delay(data => TimeSpan.FromDays(3))
        .Then<EscalateIssue>()
    )
.Join()
    .CancelCondition(data => data.IsApproved, true)
.Then<MoveAlong>();
Deprecated WorkflowCore.LockProviders.RedLock in favour of WorkflowCore.Providers.Redis.
Created a new WorkflowCore.Providers.Redis library that includes providers for distributed locking, queues and event hubs.
This makes it possible to have a cluster of nodes processing your workflows.
Install the NuGet package "WorkflowCore.Providers.Redis"
Using Nuget package console
PM> Install-Package WorkflowCore.Providers.Redis
Using .NET CLI
dotnet add package WorkflowCore.Providers.Redis
Use the IServiceCollection extension methods when building your service provider
services.AddWorkflow(cfg =>
{
    cfg.UseRedisLocking("localhost:6379");
    cfg.UseRedisQueues("localhost:6379", "my-app");
    cfg.UseRedisEventHub("localhost:6379", "my-channel");
});