Alternative rabbitmq client implementation
Experimenting with TPL and buffer pools to see if a better rabbitmq client comes out of it.
The goal is to drastically reduce contention, memory allocation and therefore GC pauses.
The way this is accomplished is two fold:
Api invocations return future objects (in .net the idiomatic/de-facto way is to return a Task which combined with async/await makes for a decent experience) so nothing is blocked on IO.
writes and reads related to the socket are consumer/producer of ringbuffers thus one upfront big allocation and that's it. The read/write loop happen in two dedicated threads.
Not ready for production use.
* master branch is stable, but still being tested/stressed heavily
It's in production for a while now.
Handshake [Done]
Create channel [Done]
Exchange declare [Done]
Queue declare [Done]
Queue bind [Done]
Basic publish [Done]
Basic Ack [Done]
Basic NAck [Done]
connection close (started by server or client) [Done]
Channel close (started by server or client) [Done]
Queue Consume / Basic Deliver [Done]
BasicReturn [Done]
Publish confirm [Done] - will wait the Task on BasicPublish until Ack from server, so it's slow and should be judiciously used
Exchange bind [Done]
Exchange unbind [Done]
Queue unbind [Done]
Queue delete [Done]
Queue purge [Done]
Exchange delete [Done]
RabbitConsole from Castle.RabbitMq. Very useful to set up integration tests
ChannelFlow [Done]
ConnectionBlock [Done]
Heartbeats
Connection/channels recovery / Programming model friendly [Done] ** Upon disconnection, try to reconnect and when successfull restore the channels and consumers
Not planning to support:
Special support:
var conn = await ConnectionFactory.Connect(...);
var channel = await conn.CreateChannel();
await channel.BasicQos(0, 250, false); // setting prefetch to 250
// Task ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments, bool waitConfirmation)
await channel1.ExchangeDeclare("test_direct_conf", "direct", true, false, null, waitConfirmation: true);
// QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments, bool waitConfirmation)
var queueInfo = await channel.QueueDeclare("queue1", false, true, false, false, null, waitConfirmation: true);
// Task QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments, bool waitConfirmation)
await channel1.QueueBind("queue_direct_conf", "test_direct_conf", "routing", null, waitConfirmation: true);
Use the BasicPublishFast overload if you dont want to 'await' on the result. Note: the server does not confirm the receipt of a message unless your channel is on confirmation mode, so the task is set as complete once we write the frame to the buffer (which is not the underlying socket).
TaskSlim BasicPublish(string exchange, string routingKey, bool mandatory, bool immediate,
BasicProperties properties, ArraySegment<byte> buffer)
Can be done using a delegate or a QueueConsumer subclass. There are two modes:
Task<string> BasicConsume(ConsumeMode mode, IQueueConsumer consumer,
string queue, string consumerTag, bool withoutAcks, bool exclusive,
IDictionary<string, object> arguments, bool waitConfirmation)
Task<string> BasicConsume(ConsumeMode mode, Func<MessageDelivery, Task> consumer,
string queue, string consumerTag, bool withoutAcks, bool exclusive,
IDictionary<string, object> arguments, bool waitConfirmation)
Note: MessageDelivery implements IDisposable, and is disposed automatically. If you're saving it for later for any reason you need to either call SafeClone or set TakenOver=true. If the mode = SingleThreaded, TakenOver is ignored and the stream pointer will move to the next frame, so SafeClone is your friend.
Two helpers are offered:
Task<RpcHelper> CreateRpcHelper(ConsumeMode mode, int? timeoutInMs, int maxConcurrentCalls = 500);
Task<RpcAggregateHelper> CreateRpcAggregateHelper(ConsumeMode mode, int? timeoutInMs, int maxConcurrentCalls = 500);
These helpers implement the boilerplate code of setting up a temporary queue, sending messages with correct CorrelationId and ReplyTo sets, and processing replies, while also respecting timeouts. It also support recovery scenarios.
It does all that with the minimum amount of gen0 allocations and is (with a few exceptions) lock free.
var helper = await channel.CreateRpcHelper(ConsumeMode.SingleThread, timeoutInMs: 100);
using(var reply = await helper.Call(exchangeName, routing, properties, body))
{
...
}
Mostly focused on decreasing allocs.