Kotlin coroutine based library for RabbitMQ
The RabbitMQ Kotlin Coroutine Library is designed to provide Kotlin developers with an efficient, coroutine-based approach to interact with RabbitMQ.
This library simplifies message queue operations by integrating seamlessly with Kotlin's coroutines, offering a modern and reactive way to handle asynchronous messaging in Kotlin applications.
It supports a variety of advanced features including queue and exchange manipulations, message publishing with confirmation, message consuming with acknowledgment, transactional operations, and the Remote Procedure Call (RPC) pattern.
You need to have Java 8 installed.
repositories {
mavenCentral()
maven("https://s01.oss.sonatype.org/content/repositories/snapshots")
}
dependencies {
implementation("io.github.viartemev:rabbitmq-kotlin:0.7.0-SNAPSHOT")
}
Full list of examples could be found here
val connectionFactory = ConnectionFactory().apply { useNio() }
connectionFactory.newConnection().use { connection ->
connection.confirmChannel {
declareQueue(QueueSpecification(PUBLISHER_QUEUE_NAME)).queue
publish {
(1..TIMES).map { createMessage("") }.map { async(Dispatchers.IO) { publishWithConfirm(it) } }.awaitAll()
.forEach { println(it) }
}
}
}
Consume only n-messages:
val connectionFactory = ConnectionFactory().apply { useNio() }
connectionFactory.newConnection().use { connction ->
connction.channel {
consume(CONSUMER_QUEUE_NAME, 1) {
(1..CONSUME_TIMES).map { async(Dispatchers.IO) { consumeMessageWithConfirm(handler) } }.awaitAll()
}
}
}
RabbitMQ and AMQP itself offer rather scarce support for transaction. When considering using transactions you should be aware that:
com.rabbitmq.client.Channel
is not thread-safe;The library provides a convenient way to perform transactional publishing and receiving based on transaction
extension function. This function commits a transaction upon normal execution of the block and rolls it back if a RuntimeException
occurs. Exceptions are always propagated further. Coroutines are not used for publishing though, since there are no any asynchronous operations involved.
connection.txChannel {
transaction {
val message = createMessage(queue = oneTimeQueue, body = "Hello from tx")
publish(message)
}
}
ConnectionFactory().apply { useNio() }.newConnection().use { conn ->
conn.channel {
logger.info { "Asking for greeting request..." }
val response = withTimeoutOrNull(1000) {
async(Dispatchers.IO) {
rpc {
val result = call(message)
logger.info { "Got a message: ${String(result.body)}" }
result
}
}.await()
}
if (response == null) {
logger.info { "Timeout is exeeded" }
} else {
logger.info { "Result: ${String(response.body)}" }
}
}
}