An implementation of a PersistentStreamProvider for Microsoft Orleans and Kafka using the Confluent API
Kafka persistent stream provider for Microsoft Orleans that uses the Confluent SDK. This provider has the added benefit of allowing external messages (those not generated by Orleans) to be merged into the Orleans streaming system and consumed as if they had been generated by Orleans.
Orleans.Streams.Kafka
has the following dependencies:
To start working with the Orleans.Streams.Kafka
make sure you do the following steps:
Install the Orleans.Streams.Kafka
NuGet package from the NuGet repository.

Example KafkaStreamProvider configuration:
/// <summary>
/// Silo-side configuration sample: registers a memory grain storage named
/// "PubSubStore" and a Kafka stream provider named "KafkaStreamProvider".
/// </summary>
public class SiloBuilderConfigurator : ISiloBuilderConfigurator
{
    /// <summary>
    /// Applies the storage and Kafka stream provider registrations to the silo builder.
    /// </summary>
    /// <param name="hostBuilder">The silo builder being configured.</param>
    public void Configure(ISiloBuilder hostBuilder)
    {
        hostBuilder
            .AddMemoryGrainStorage("PubSubStore")
            .AddKafka("KafkaStreamProvider")
            .WithOptions(kafkaOptions =>
            {
                // Connection and consumer settings.
                kafkaOptions.BrokerList = new [] {"localhost:8080"};
                kafkaOptions.ConsumerGroupId = "E2EGroup";
                kafkaOptions.ConsumeMode = ConsumeMode.StreamEnd;

                // Topic registrations: a plain topic, a topic with an explicit
                // creation config, and an external topic carrying TestModel.
                kafkaOptions
                    .AddTopic(Consts.StreamNamespace)
                    .AddTopic(
                        Consts.StreamNamespace2,
                        new TopicCreationConfig
                        {
                            AutoCreate = true,
                            Partitions = 2,
                            ReplicationFactor = 1
                        })
                    .AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal);
            })
            .AddJson()
            .AddLoggingTracker()
            .Build();
    }
}
/// <summary>
/// Client-side configuration sample: registers a Kafka stream provider named
/// "KafkaStreamProvider" on the client builder.
/// </summary>
public class ClientBuilderConfigurator : IClientBuilderConfigurator
{
    /// <summary>
    /// Applies the Kafka stream provider registration to the client builder.
    /// </summary>
    /// <param name="configuration">Configuration passed in by the caller; not read by this sample.</param>
    /// <param name="clientBuilder">The client builder being configured.</param>
    public virtual void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
    {
        clientBuilder
            .AddKafka("KafkaStreamProvider")
            .WithOptions(kafkaOptions =>
            {
                // Connection and consumer settings.
                kafkaOptions.BrokerList = new [] {"localhost:8080"};
                kafkaOptions.ConsumerGroupId = "E2EGroup";

                // Topic registrations: a plain topic, a topic with an explicit
                // creation config, and an external topic carrying TestModel.
                kafkaOptions
                    .AddTopic(Consts.StreamNamespace)
                    .AddTopic(
                        Consts.StreamNamespace2,
                        new TopicCreationConfig
                        {
                            AutoCreate = true,
                            Partitions = 2,
                            ReplicationFactor = 1
                        })
                    .AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal);
            })
            .AddJson()
            .Build();
    }
}
// Sample: obtain a grain reference from the cluster client and call it.
var testGrain = clusterClient.GetGrain<ITestGrain>(grainId);
var result = await testGrain.GetThePhrase();
// Change the console background colour, then print the value returned by the grain.
Console.BackgroundColor = ConsoleColor.DarkMagenta;
Console.WriteLine(result);
// Sample: publish a message to a stream from the client.
// The provider name must match the name passed to AddKafka(...) in the
// configuration samples ("KafkaStreamProvider").
var streamProvider = clusterClient.GetStreamProvider("KafkaStreamProvider");
// Second argument is the stream namespace.
var stream = streamProvider.GetStream<TestModel>("streamId", "topic1");
await stream.OnNextAsync(new TestModel
{
    Greeting = "hello world"
});
// Sample: consume messages from a stream.
var kafkaProvider = GetStreamProvider("KafkaStreamProvider");
// Second argument is the stream namespace.
var testStream = kafkaProvider.GetStream<TestModel>("streamId", "topic1");

// To resume stream in case of stream deactivation
var subscriptionHandles = await testStream.GetAllSubscriptionHandles();
if (subscriptionHandles.Count > 0)
{
    // Existing subscriptions were found: re-attach the handler to each of them.
    foreach (var subscriptionHandle in subscriptionHandles)
    {
        await subscriptionHandle.ResumeAsync(OnNextTestMessage);
    }
}
else
{
    // No subscription exists yet, so create one. Subscribing unconditionally
    // after resuming would add one more subscription each time this code runs
    // and deliver every message to the handler multiple times.
    await testStream.SubscribeAsync(OnNextTestMessage);
}
Note: The Stream Namespace identifies the Kafka topic.
These are the configurable values that Orleans.Streams.Kafka supports:
orleans-kafka
100ms
KafkaAdapterReceiver
will continue to poll for messages (for the same batch). Default value is 500ms.
5s
ConsumeMode.LastCommittedMessage
5s