Event sourcing journal implementation using Kafka as main storage
Stream data from two sources where one is eventually consistent and the other one loses its tail
This library lets you use Kafka as storage for events.
Kafka is a perfect fit when you want streaming capabilities for your events.
However, the library also uses Cassandra to keep data access performance at an acceptable level and to work around Kafka's retention policy.
Cassandra is the default choice, but you may use any other storage that satisfies the following interfaces:
The performance of reading events depends on finding the closest offset to the marker, as well as on replication latency (the time between the moment an event is written to Kafka and the moment it reaches Cassandra).
The same Kafka consumer may be shared by many simultaneous recoveries.
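To illustrate why replication latency matters when reading, here is a minimal sketch of combining the two sources. It is not the library's implementation: `Rec`, `read`, `replicatedOffset` and the plain `List` inputs are hypothetical names chosen for this example, and it assumes the eventual store tracks the last Kafka offset it has replicated.

```scala
object TwoSourceRead {
  // Hypothetical, simplified event: journal sequence number plus the Kafka offset it was written at.
  final case class Rec(seqNr: Long, offset: Long)

  /** Returns events with seqNr >= from, taking replicated events from the
    * eventual store and the not yet replicated ones from Kafka.
    *
    * @param store            events already replicated to the eventual store, ordered by seqNr
    * @param replicatedOffset last Kafka offset the eventual store has caught up to, if any
    * @param kafka            events still retained in Kafka, ordered by offset
    */
  def read(
    from: Long,
    store: List[Rec],
    replicatedOffset: Option[Long],
    kafka: List[Rec]
  ): List[Rec] = {
    // Events the eventual store already has.
    val fromStore = store.filter(_.seqNr >= from)
    // Events written to Kafka after the replicated offset are not in the store yet.
    val fromKafka = kafka.filter { r =>
      replicatedOffset.forall(r.offset > _) && r.seqNr >= from
    }
    fromStore ++ fromKafka
  }
}
```

In this model, the further the eventual store lags behind, the more records have to be scanned from Kafka, which is one way to picture the latency dependency described above.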
read + write Kafka and read Cassandra
read Kafka and read + write Cassandra
Hence, we recommend configuring access rights accordingly.
trait Journals[F[_]] {

  def apply(key: Key): Journal[F]
}

trait Journal[F[_]] {

  /**
   * @param expireAfter Define expireAfter in order to expire the whole journal for the given entity
   */
  def append(
    events: Nel[Event],
    expireAfter: Option[ExpireAfter],
    metadata: Option[JsValue],
    headers: Headers
  ): F[PartitionOffset]

  def read(from: SeqNr): Stream[F, EventRecord]

  def pointer: F[Option[SeqNr]]

  /**
   * Deletes events up to the provided SeqNr; a subsequent pointer call will return the last seen value
   */
  def delete(to: DeleteTo): F[Option[PartitionOffset]]

  /**
   * Deletes all data related to the journal; a subsequent pointer call will return none
   */
  def purge: F[Option[PartitionOffset]]
}
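The difference between delete and purge, as described in the comments above, can be sketched with a small in-memory model. This is only an illustration, not the library's API: `InMemoryJournal` is a hypothetical stand-in that uses plain `Long` sequence numbers instead of `SeqNr`, `Event` and the effect type `F`, and it assumes that numbering restarts from 1 after a purge.

```scala
object JournalSemantics {
  // Hypothetical in-memory stand-in for a single journal.
  final class InMemoryJournal {
    private var events: Vector[Long] = Vector.empty // sequence numbers of stored events
    private var lastSeqNr: Option[Long] = None       // value reported by `pointer`

    // Appends `count` events with consecutive sequence numbers.
    def append(count: Int): Unit = {
      require(count > 0, "at least one event is required")
      val start = lastSeqNr.getOrElse(0L) + 1
      events = events ++ Vector.tabulate(count)(i => start + i)
      lastSeqNr = Some(start + count - 1)
    }

    def read(from: Long): Vector[Long] = events.filter(_ >= from)

    def pointer: Option[Long] = lastSeqNr

    // Removes events up to and including `to`; the pointer is kept.
    def delete(to: Long): Unit = {
      events = events.filter(_ > to)
    }

    // Removes everything, including the pointer.
    def purge(): Unit = {
      events = Vector.empty
      lastSeqNr = None
    }
  }
}
```

After `delete`, new events continue from the remembered sequence number, while after `purge` the journal in this model behaves as if it had never existed.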
The Kafka client tends to log some exceptions at error level; in reality, these are harmless as long as the operation is retried successfully.
Retriable exceptions usually extend RetriableException.
Here is the list of known error logs you may ignore:
To use kafka-journal as an Akka persistence plugin, add the following to your *.conf file:
akka.persistence.journal.plugin = "evolutiongaming.kafka-journal.persistence.journal"
Unfortunately, the Akka persistence snapshot plugin is not implemented yet.
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
libraryDependencies += "com.evolutiongaming" %% "kafka-journal" % "0.0.153"
libraryDependencies += "com.evolutiongaming" %% "kafka-journal-persistence" % "0.0.153"
libraryDependencies += "com.evolutiongaming" %% "kafka-journal-replicator" % "0.0.153"
libraryDependencies += "com.evolutiongaming" %% "kafka-journal-eventual-cassandra" % "0.0.153"
To run the unit tests, you need a running Docker environment (Docker Desktop, Rancher Desktop, etc.). Some tests expect /var/run/docker.sock to be available. With Rancher Desktop, amend the local setup with:
sudo ln -s $HOME/.rd/docker.sock /var/run/docker.sock