An observable microservice bus library for .NET, that wraps the underlying message transports in simple Rx based interfaces.
System.Reactive 5.0
, supports netstandard2.0
, net472
and net5.0
. Mono-repoSystem.Reactive 4.1
, supports netstandard2.0
and net472
System.Reactive 3.1.1
, supports netstandard1.6
and net452
Define a root message type to identify messages as belonging to your service:
public interface IMyServiceMessage : IMessage { }
Create command/event/request/response message types:
public class MyCommand : IMyServiceMessage, ICommand { }
public class MyEvent : IMyServiceMessage, IEvent { }
public class MyRequest: IMyServiceMessage, IRequest { }
public class MyResponse : IMyServiceMessage, IResponse { }
Create your service bus:
IServiceBus serviceBus = ServiceBus.Configure()
.WithActiveMQEndpoints<IMyServiceMessage>()
.Named("MyService")
.UsingQueueFor<ICommand>()
.ConnectToBroker("tcp://localhost:61616")
.SerializedAsJson()
.AsClientAndServer()
.Create();
Send commands:
serviceBus.Commands.Subscribe(c => Console.WriteLine("Received a command!"));
await serviceBus.SendAsync(new MyCommand());
Publish events:
serviceBus.Events.Subscribe(e => Console.WriteLine("Received an event!"));
await serviceBus.PublishAsync(new MyEvent());
Request/response:
serviceBus.Requests
.OfType<MyRequest>()
.Subscribe(request => serviceBus.ReplyAsync(request, new MyResponse()));
serviceBus.GetResponses(new MyRequest())
.OfType<MyResponse>()
.Take(1)
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(r => Console.WriteLine("Received a response!"), err => Console.WriteLine("Oh no!"));
Define custom endpoints that can wrap API calls or integrations with other systems:
public class MyCustomEndpoint : IServiceEndpointClient
{
Type _serviceType = typeof(IMyCustomServiceMessage);
public IObservable<IEvent> Events
{
get
{
// subscribe to external MQ broker
}
}
public Task SendAsync(ICommand command)
{
// call external API
}
public IObservable<IResponse> GetResponses(IRequest request)
{
// call external API and wrap response in observable
}
public bool CanHandle(IMessage message)
{
return _serviceType.IsInstanceOfType(message);
}
}
...
IServiceBus serviceBus = ServiceBus.Configure()
.WithActiveMQEndpoints<IMyServiceMessage>()
.Named("MyService")
.UsingQueueFor<ICommand>()
.ConnectToBroker("tcp://localhost:61616")
.SerializedAsJson()
.AsClientAndServer()
.WithEndpoints(new MyCustomEndpoint())
.Create();
cd examples
docker-compose up
cd client
dotnet run -f netcoreapp3.1