gRPC Kotlin Coroutines, Protobuf DSL, Scripting for Protoc
ℹ️ Docs are being expanded and moved to Readme.io
Run the following command to get started with a preconfigured template project. (kotlin-coroutines-gRPC-template)
git clone https://github.com/marcoferrer/kotlin-coroutines-gRPC-template && \
cd kotlin-coroutines-gRPC-template && \
./gradlew run
This generator creates lambda-based builders for message types.
val starPlatinum = Stand {
name = "Star Platinum"
powerLevel = 500
speed = 550
attack {
name = "ORA ORA ORA"
damage = 100
range = Attack.Range.CLOSE
}
}
val attack = Attack {
name = "ORA ORA ORA"
damage = 100
range = Attack.Range.CLOSE
}
// Copy extensions
val newAttack = attack.copy { damage = 200 }
// orDefault() returns the message's default instance when the receiver is null
val nullableAttack: Attack? = null
nullableAttack.orDefault()
// Plus operator extensions
val mergedAttack = attack + Attack { name = "Sunlight Yellow Overdrive" }
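To make the shape of these extensions concrete, here is a minimal sketch of how they could be written by hand. The `Attack` class and its `Builder` below are hypothetical stand-ins for the protoc-generated Java classes, and the merge rule in `plus` is an assumption that only approximates protobuf merge semantics for scalar fields; the code Kroto+ actually generates may differ.

```kotlin
// Hypothetical stand-in for a protoc-generated message class and its builder.
class Attack private constructor(val name: String, val damage: Int) {

    class Builder(var name: String = "", var damage: Int = 0) {
        fun build(): Attack = Attack(name, damage)
    }

    fun toBuilder(): Builder = Builder(name, damage)

    companion object {
        val defaultInstance: Attack = Builder().build()
    }
}

// Lambda-based builder: a top-level function named after the message type.
fun Attack(block: Attack.Builder.() -> Unit): Attack =
    Attack.Builder().apply(block).build()

// Copy extension: start from the existing values, then apply the overrides.
fun Attack.copy(block: Attack.Builder.() -> Unit): Attack =
    toBuilder().apply(block).build()

// Null-safe access to the default instance.
fun Attack?.orDefault(): Attack = this ?: Attack.defaultInstance

// Plus operator (assumed rule): non-default fields of `other` win.
operator fun Attack.plus(other: Attack): Attack = copy {
    if (other.name.isNotEmpty()) name = other.name
    if (other.damage != 0) damage = other.damage
}

fun main() {
    val attack = Attack { name = "ORA ORA ORA"; damage = 100 }
    val merged = attack + Attack { name = "Sunlight Yellow Overdrive" }
    println("${merged.name} / ${merged.damage}")
}
```

Declaring the factory as a function with the same name as the type is what lets call sites read like a constructor call with a trailing lambda.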
This option requires the artifact kroto-plus-coroutines as a dependency.
Client / Server Examples
Method Signature Option Support
CoroutineScope will propagate to the server.
RequestType.defaultInstance
// Creates new stub with a default coroutine context of `EmptyCoroutineContext`
val stub = GreeterCoroutineGrpc.newStub(channel)
// Suspends and creates new stub using the current coroutine context as the default.
val stub = GreeterCoroutineGrpc.newStubWithContext(channel)
// An existing stub can replace its current coroutine context using either
stub.withCoroutineContext()
stub.withCoroutineContext(coroutineContext)
// Stubs can accept message builder lambdas as an argument
stub.sayHello { name = "John" }
// For more idiomatic usage in coroutines, stubs can be created
// with an explicit coroutine scope using the `newGrpcStub` scope extension function.
launch {
// Using `newGrpcStub` makes it clear that the resulting stub will use the receiving
// coroutine scope to launch any concurrent work. (usually for manual flow control in streaming apis)
val stub = newGrpcStub(GreeterCoroutineGrpc.GreeterCoroutineStub, channel)
val (requestChannel, responseChannel) = stub.sayHelloStreaming()
...
}
CoroutineName set to MethodDescriptor.fullMethodName
GrpcContextElement set to io.grpc.Context.current()
Base services implement ServiceScope and allow overriding the initial coroutineContext used for each rpc method invocation.
initialContext defaults to EmptyCoroutineContext.
Overriding initialContext is for setting up an application-specific ThreadContextElement or CoroutineDispatcher, such as MDCContext() or newFixedThreadPoolContext(...).
See ClientCall.cancel() in io.grpc.ClientCall.java for more details.
StatusRuntimeException and returned to the client.
StatusRuntimeException with a status of Status.CANCELLED, and returned to the client.
Client: Unary calls will suspend until a response is received from the corresponding server. If the call is cancelled or the server responds with an error, the call will throw the appropriate StatusRuntimeException.
val response = stub.sayHello { name = "John" }
Server: Unary rpc methods can respond to client requests by either returning the expected response type, or throwing an exception.
override suspend fun sayHello(request: HelloRequest): HelloReply {
if (isValid(request.name))
return HelloReply { message = "Hello there, ${request.name}!" }
else
throw Status.INVALID_ARGUMENT.asRuntimeException()
}
Client: requestChannel.send() will suspend until the corresponding server signals it is ready by requesting a message. If the call is cancelled or the server responds with an error, both requestChannel.send() and response.await() will throw the appropriate StatusRuntimeException.
val (requestChannel, response) = stub.sayHelloClientStreaming()
launchProducerJob(requestChannel){
repeat(5){
send { name = "name #$it" }
}
}
println("Client Streaming Response: ${response.await()}")
Server: Client streaming rpc methods can respond to client requests by either returning the expected response type or throwing an exception. Calls to requestChannel.receive() will suspend and notify the corresponding client that the server is ready to accept a message.
override suspend fun sayHelloClientStreaming(
requestChannel: ReceiveChannel<HelloRequest>
): HelloReply = HelloReply {
message = requestChannel.toList().joinToString()
}
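The suspend-until-requested behaviour described for requestChannel.send() and requestChannel.receive() can be illustrated without gRPC at all. The sketch below uses a plain rendezvous Channel from kotlinx.coroutines as a stand-in for the request stream; `runBackPressureDemo` is a hypothetical name, and in the real stubs the suspension is assumed to be driven by gRPC flow control rather than by a local channel.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the order in which sends and receives completed.
fun runBackPressureDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // Rendezvous channel (capacity 0): send() suspends until a receiver is ready.
    val requestChannel = Channel<String>()

    val producer = launch {
        repeat(3) { index ->
            requestChannel.send("name #$index")
            events += "sent #$index"
        }
        requestChannel.close() // lets the consuming loop below finish
    }

    // Each iteration calls receive(), which is what releases the next send().
    for (message in requestChannel) {
        events += "received #${message.substringAfter('#')}"
    }
    producer.join()
    events
}

fun main() {
    runBackPressureDemo().forEach(::println)
}
```

Because the channel has no buffer, a send can never complete before the consumer has finished with the previous message and asked for the next one, which is the back-pressure property the streaming stubs rely on.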
Client: responseChannel.receive() will suspend and notify the corresponding server that the client is ready to accept a message.
val responseChannel = stub.sayHelloServerStreaming { name = "John" }
responseChannel.consumeEach {
println("Server Streaming Response: $it")
}
Server: Server streaming rpc methods can respond to client requests by submitting messages of the expected response type to the response channel. Completion of service method implementations will automatically close response channels in order to prevent abandoned rpcs.
Calls to responseChannel.send() will suspend until the corresponding client signals it is ready by requesting a message. Error responses can be returned to clients by either throwing an exception or invoking close on responseChannel with the desired exception.
For an example of how to implement long-lived response streams, see MultipleClientSubscriptionsExample.kt.
override suspend fun sayHelloServerStreaming(
request: HelloRequest,
responseChannel: SendChannel<HelloReply>
) {
for(char in request.name) {
responseChannel.send {
message = "Hello $char!"
}
}
}
Client: requestChannel.send() will suspend until the corresponding server signals it is ready by requesting a message. If the call is cancelled or the server responds with an error, both requestChannel.send() and responseChannel.receive() will throw the appropriate StatusRuntimeException.
val (requestChannel, responseChannel) = stub.sayHelloStreaming()
launchProducerJob(requestChannel){
repeat(5){
send { name = "person #$it" }
}
}
responseChannel.consumeEach {
println("Bidi Response: $it")
}
Server: Bidi streaming rpc methods can respond to client requests by submitting messages of the expected response type to the response channel. Completion of service method implementations will automatically close response channels in order to prevent abandoned rpcs.
Calls to responseChannel.send() will suspend until the corresponding client signals it is ready by requesting a message. Error responses can be returned to clients by either throwing an exception or invoking close on responseChannel with the desired exception.
For an example of how to implement long-lived response streams, see MultipleClientSubscriptionsExample.kt.
override suspend fun sayHelloStreaming(
requestChannel: ReceiveChannel<HelloRequest>,
responseChannel: SendChannel<HelloReply>
) {
requestChannel.mapTo(responseChannel){
HelloReply {
message = "Hello there, ${it.name}!"
}
}
}
This module generates convenience extensions that overload the request message argument for rpc methods with a builder lambda block and a default value. It also supports generating overloads based on (google.api.method_signature) method options. More information is available in the Method Signature Option Support section.
//Kroto+ Generated Extension
val response = serviceStub.myRpcMethod {
id = 100
name = "some name"
}
//Original Java Fluent builders
val response = serviceStub.myRpcMethod(ExampleServiceGrpc.MyRpcMethodRequest
.newBuilder()
.setId(100)
.setName("some name")
.build())
For unary rpc methods, the generator will create the following extensions
//Future Stub with default argument
fun ServiceFutureStub.myRpcMethod(request: Request = Request.defaultInstance): ListenableFuture<Response>
//Future Stub with builder lambda
inline fun ServiceFutureStub.myRpcMethod(block: Request.Builder.() -> Unit): ListenableFuture<Response>
//Blocking Stub with default argument
fun ServiceBlockingStub.myRpcMethod(request: Request = Request.defaultInstance): Response
//Blocking Stub with builder lambda
inline fun ServiceBlockingStub.myRpcMethod(block: Request.Builder.() -> Unit): Response
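As a rough illustration of how the two blocking-stub overloads can coexist with the original method, the following sketch declares them against hypothetical stand-ins. `Request`, `Response`, and `ServiceBlockingStub` here are simplified placeholders, not the classes produced by protoc or grpc-java, and the bodies are an assumption about how such extensions would delegate.

```kotlin
// Hypothetical stand-ins for a generated request, response, and blocking stub.
class Request private constructor(val id: Int, val name: String) {
    class Builder {
        var id: Int = 0
        var name: String = ""
        fun build(): Request = Request(id, name)
    }

    companion object {
        val defaultInstance: Request = Builder().build()
        fun newBuilder(): Builder = Builder()
    }
}

data class Response(val summary: String)

class ServiceBlockingStub {
    // The original rpc method, which always needs a fully built request.
    fun myRpcMethod(request: Request): Response =
        Response("id=${request.id}, name=${request.name}")
}

// Default-argument overload: only selected when no argument is passed,
// because the member above wins whenever a Request is supplied.
fun ServiceBlockingStub.myRpcMethod(
    request: Request = Request.defaultInstance
): Response = myRpcMethod(request)

// Builder-lambda overload: builds the request, then calls the member.
inline fun ServiceBlockingStub.myRpcMethod(
    block: Request.Builder.() -> Unit
): Response = myRpcMethod(Request.newBuilder().apply(block).build())

fun main() {
    val stub = ServiceBlockingStub()
    println(stub.myRpcMethod { id = 100; name = "some name" })
    println(stub.myRpcMethod())
}
```

Inside both extensions the call `myRpcMethod(...)` with a `Request` argument resolves to the member function, so neither overload recurses into itself.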
In addition to request message arguments as builder lambda rpc overloads, coroutine overloads for rpc calls can also be generated. This provides the same functionality as the generated coroutine stubs. Usage is identical to the client examples outlined in Coroutine Client Examples.
This option requires the artifact kroto-plus-coroutines as a dependency.
If your code relies on io.grpc.Context, then you need to be sure to add a GrpcContextElement to your CoroutineContext when launching a coroutine. Child coroutines will inherit this ThreadContextElement and the dispatcher will ensure that your grpc context is present on the executing thread.
Context.current().withValue(MY_KEY, myValue).attach()
val myGrpcContext = Context.current()
val job = launch( GrpcContextElement() ) { //Alternate usage: myGrpcContext.asContextElement()
launch {
assertEquals(myGrpcContext, Context.current())
}
GlobalScope.launch{
assertNotEquals(myGrpcContext, Context.current())
}
}
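The same inheritance behaviour can be reproduced with a plain ThreadLocal and the `asContextElement` helper from kotlinx.coroutines, which also produces a ThreadContextElement. This is only an analogy under stated assumptions: `currentRequestId` and `observeRequestIds` are hypothetical names, the thread-local stands in for the thread-bound io.grpc.Context, and a fresh CoroutineScope plays the role of GlobalScope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a value bound to the current thread.
val currentRequestId = ThreadLocal<String?>()

// Returns the value seen by a child coroutine and by an unrelated coroutine.
fun observeRequestIds(): Pair<String?, String?> = runBlocking {
    currentRequestId.set("request-42")
    try {
        // Capture the calling thread's value into the coroutine context.
        val element = currentRequestId.asContextElement()
        val inherited = withContext(Dispatchers.Default + element) {
            // The child inherits the element, so the value follows it
            // onto whichever worker thread runs it.
            async { currentRequestId.get() }.await()
        }
        // A coroutine started in an unrelated scope does not carry the element.
        val detached = CoroutineScope(Dispatchers.Default)
            .async { currentRequestId.get() }
            .await()
        inherited to detached
    } finally {
        currentRequestId.remove()
    }
}

fun main() {
    val (inherited, detached) = observeRequestIds()
    println("child saw $inherited, unrelated coroutine saw $detached")
}
```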
This generator creates mock implementations of proto service definitions. This is useful for orchestrating a set of expected responses, aiding in unit testing methods that rely on rpc calls.
Full example for mocking services in unit tests. The code generated relies on the kroto-plus-test artifact as a dependency. It is a small library that provides utility methods used by the mock services.
@Test fun `Test Unary Response Queue`(){
MockStandService.getStandByNameResponseQueue.apply {
//Queue up a valid response message
addMessage {
name = "Star Platinum"
powerLevel = 500
speed = 550
addAttacks {
name = "ORA ORA ORA"
damage = 100
range = StandProto.Attack.Range.CLOSE
}
}
//Queue up an error
addError(Status.INVALID_ARGUMENT)
}
val standStub = StandServiceGrpc.newBlockingStub(grpcServerRule.channel)
standStub.getStandByName { name = "Star Platinum" }.let{ response ->
assertEquals("Star Platinum",response.name)
assertEquals(500,response.powerLevel)
assertEquals(550,response.speed)
response.attacksList.first().let{ attack ->
assertEquals("ORA ORA ORA",attack.name)
assertEquals(100,attack.damage)
assertEquals(StandProto.Attack.Range.CLOSE,attack.range)
}
}
try{
standStub.getStandByName { name = "The World" }
fail("Exception was expected with status code: ${Status.INVALID_ARGUMENT.code}")
}catch (e: StatusRuntimeException){
assertEquals(Status.INVALID_ARGUMENT.code, e.status.code)
}
}
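The idea of a response queue can be sketched in a few lines of plain Kotlin. Everything below is hypothetical and is not the kroto-plus-test API: `ResponseQueue`, `QueuedResponse`, and `StatusException` are illustrative names, status codes are plain strings, and falling back to a default value when the queue is empty is an assumption made for this sketch.

```kotlin
// Hypothetical error type standing in for a gRPC status exception.
class StatusException(val code: String) : RuntimeException(code)

// A queued entry is either a message to return or an error to throw.
sealed class QueuedResponse<out T> {
    data class Message<T>(val value: T) : QueuedResponse<T>()
    data class Error(val code: String) : QueuedResponse<Nothing>()
}

class ResponseQueue<T>(private val default: T) {
    private val queue = ArrayDeque<QueuedResponse<T>>()

    fun addMessage(value: T) {
        queue.addLast(QueuedResponse.Message(value))
    }

    fun addError(code: String) {
        queue.addLast(QueuedResponse.Error(code))
    }

    // Called once per rpc invocation: consumes entries in insertion order.
    fun next(): T {
        val entry: QueuedResponse<T>? = queue.removeFirstOrNull()
        return when (entry) {
            is QueuedResponse.Message -> entry.value
            is QueuedResponse.Error -> throw StatusException(entry.code)
            null -> default // assumed fallback for an empty queue
        }
    }
}

fun main() {
    val responses = ResponseQueue(default = "")
    responses.addMessage("Star Platinum")
    responses.addError("INVALID_ARGUMENT")

    println(responses.next())
    println(runCatching { responses.next() }.exceptionOrNull()?.message)
}
```

Consuming entries in order is what lets a test script a sequence of successes and failures, as the unit test above does with one message followed by one error.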
Generated code relies on the kroto-plus-message artifact. This generator adds tagging interfaces to the Java classes produced by protoc.
It also adds pseudo companion objects to provide a way to access proto message APIs in a non-static manner.
The following is a small example of how to write generic methods and extensions that resolve both message and builder types.
inline fun <reified M, B> M.copy( block: B.() -> Unit ): M
where M : KpMessage<M, B>, B : KpBuilder<M> {
return this.toBuilder().apply(block).build()
}
// Usage
myMessage.copy { ... }
inline fun <reified M, B> build( block: B.() -> Unit ): M
where M : KpMessage<M, B>, B : KpBuilder<M> {
return KpCompanion.Registry[M::class.java].build(block)
}
// Usage
build<MyMessage> { ... }
inline fun <M, B> KpCompanion<M, B>.build( block: B.() -> Unit ): M
where B : KpBuilder<M>,M : KpMessage<M,B> {
return newBuilder().apply(block).build()
}
// Usage
MyMessage.Companion.build { ... }
Users can define Kotlin scripts that they would like to run during code generation. For type completion, scripts can be coupled with a small Gradle build script, although this is completely optional. Samples are available in the kp-script directory of the example project.
There are two categories of scripts available.
The ExtendableMessages generator can be implemented using an insertion script; an example can be found in the example script extendableMessages.kts.
Generator interface used by all internal kroto+ code generators.
GeneratorContext, which is available via the property context.
The context is used for iterating over files, messages, and services submitted by protoc.
generators package of the protoc-gen-kroto-plus artifact.
Community contributions for scripts are welcome, and more information regarding guidelines will be published soon.
The (google.api.method_signature) method option is now supported. It lets users customize the method parameters output in generated clients as well as stub extensions. To configure your rpc methods, first add the Google common protos dependency to your build:
dependencies{
compileOnly "com.google.api.grpc:proto-google-common-protos:1.16.0"
}
Then add the following import to your proto definition.
import "google/api/client.proto";
Now the method option should be available for usage in your method definition
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply){
option (google.api.method_signature) = "name";
};
This will result in the following method signature being output by the gRPC and stub extension code generators.
fun GreeterStub.sayHello(name: String): HelloReply{
val request = HelloRequest.newBuilder()
.setName(name)
.build()
return sayHello(request)
}
jcenter() or mavenCentral()
repositories {
maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' }
}
// Useful when synchronization to jcenter or maven central is taking longer than expected
repositories {
maven { url 'https://dl.bintray.com/marcoferrer/kroto-plus/' }
}
plugins{
id 'com.google.protobuf' version '0.8.6'
}
protobuf {
protoc { artifact = "com.google.protobuf:protoc:$protobufVersion"}
plugins {
kroto {
artifact = "com.github.marcoferrer.krotoplus:protoc-gen-kroto-plus:$krotoPlusVersion"
}
}
generateProtoTasks {
def krotoConfig = file("krotoPlusConfig.asciipb") // Or .json
all().each{ task ->
// Adding the config file to the task inputs lets UP-TO-DATE checks
// include changes to configuration
task.inputs.files krotoConfig
task.plugins {
kroto {
outputSubDir = "java"
option "ConfigPath=$krotoConfig"
}
}
}
}
}
jcenter or mavenCentral
<repository>
<id>oss-snapshot</id>
<name>OSS Snapshot Repository</name>
<url>https://oss.jfrog.org/artifactory/oss-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<!-- Useful when synchronization to jcenter or maven central is taking longer than expected -->
<repository>
<id>kroto-plus-bintray</id>
<name>Kroto Plus Bintray Repository</name>
<url>https://dl.bintray.com/marcoferrer/kroto-plus/</url>
</repository>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals><goal>compile</goal></goals>
</execution>
<execution>
<id>grpc-java</id>
<goals><goal>compile-custom</goal></goals>
<configuration>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.17.1:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
</execution>
<execution>
<id>kroto-plus</id>
<goals>
<goal>compile-custom</goal>
</goals>
<configuration>
<pluginId>kroto-plus</pluginId>
<pluginArtifact>com.github.marcoferrer.krotoplus:protoc-gen-kroto-plus:${krotoPlusVersion}:exe:${os.detected.classifier}</pluginArtifact>
<pluginParameter>ConfigPath=${project.basedir}/krotoPlusConfig.asciipb</pluginParameter>
</configuration>
</execution>
</executions>
</plugin>
Add generated sources to Kotlin plugin
<plugin>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/target/generated-sources/protobuf/kroto-plus</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
</plugin>
This project relies on Kotlin Poet for building Kotlin sources. A big thanks to all of its contributors.