Java Dynamic Sqs Listener

Java SQS Listener library built to be customisable and dynamic during runtime

v6.0.0

9 months ago

Version upgrade release! This bumps the library's dependencies so that it works with Spring Boot 3. Upgrading to 6.0.0 involves making sure the corresponding dependencies in your application are at the newer versions listed below:

  • Java 17 (Spring Boot 3 requires it so this library has just been bumped to 17)
  • Spring Boot 2.5.14 to 3.1.2
  • Jackson 2.12.7 to 2.15.2
  • slf4j api 1.7.36 to 2.0.7
  • Spring Cloud 2.2.3.RELEASE to 2.2.8.RELEASE
  • ktor 2.2.4 to 2.3.2

v5.0.0

2 years ago

This is a small major release that upgrades some of the dependencies, such as Spring Boot, as well as removing the default message visibility for the Spring Queue Listener annotations.

Enhancements

Upgrade dependencies [GH-367]

See #368 for the list of dependency changes.

Spring SQS Listeners should not default to 30s visibility timeout [GH-365]

Changes the Spring Queue Listener annotations to use the Visibility Timeout set on the AWS SQS queue instead of defaulting to 30 seconds. This should prevent unexpected re-processing of messages when it wasn't realised that there was a default. This was also applied to the Kotlin DSL.

Upgrade Steps

If you wish to maintain the existing functionality you will need to change your queue annotations from:

 @QueueListener("${insert.queue.url.here}")

to be

 @QueueListener(value = "${insert.queue.url.here}", messageVisibilityTimeoutInSeconds = 30)

v4.4.0

3 years ago

Enhancements

Move Prefetching/Batching listener logic into core library [GH-345]

Moved the construction of Spring's @PrefetchingQueueListener and @QueueListener into the core library as the PrefetchingMessageListenerContainer and BatchingMessageListenerContainer respectively. This should simplify the usage of the core library, as well as making it easier to provide custom construction of these containers in the wrapping libraries, like the Spring or Ktor frameworks.

Attach MessageProcessingDecorators to individual Spring Message listeners [GH-343]

Previously it was only possible to attach a MessageProcessingDecorator to a core message listener by including it as a Spring bean. This has the disadvantage that the decorator is applied to all message listeners, when it can be desirable to apply it to only a subset of them. This can now be achieved by implementing a MessageProcessingDecoratorFactory, which has the opportunity to wrap a message listener, e.g. by looking for an annotation.

Example

  1. Implement the factory, for example one that looks for the MyAnnotation annotation
     public class MyDecoratorFactory implements MessageProcessingDecoratorFactory<MyDecorator> {

         @Override
         public Optional<MyDecorator> buildDecorator(
             SqsAsyncClient sqsAsyncClient,
             QueueProperties queueProperties,
             String identifier,
             Object bean,
             Method method
         ) {
             return AnnotationUtils
                 .findMethodAnnotation(method, MyAnnotation.class)
                 .map(annotation -> new MyDecoratorProperties(annotation.value()))
                 .map(properties -> new MyDecorator(properties));
         }
     }

  2. Add it as a bean in the Spring Context
     @Configuration
     class MyConfiguration {

         @Bean
         MyDecoratorFactory myDecoratorFactory() {
             return new MyDecoratorFactory();
         }
     }
    
  3. Apply the annotation to your message listener
     @QueueListener("${insert.queue.url.here}")
     @MyAnnotation("someValue")
     public void processMessage(@Payload final String payload) {
         // process the message payload here
     }
    

Support dynamic configuration of Spring Message Listener containers during runtime [GH-339]

Adds the ability to more easily override the values for a message listener annotation by providing a custom annotation parser bean. A use case for this is to provide custom logic for calculating the concurrency level, allowing the message listener to dynamically change the concurrency rate.

Available annotation parsers

Example

  1. Provide a custom annotation parser bean, for example the PrefetchingQueueListenerParser which is used to parse the @PrefetchingQueueListener annotation.

     public class CustomPrefetchingQueueListenerParser extends PrefetchingQueueListenerParser {
         private static final Random random = new Random();
         private final LoadingCache<Boolean, Integer> cachedConcurrencyLevel = CacheBuilder
                             .newBuilder()
                             .expireAfterWrite(10, TimeUnit.SECONDS)
                             .build(CacheLoader.from(() -> random.nextInt(10)));
    
         public CustomPrefetchingQueueListenerParser(Environment environment) {
             super(environment);
         }
    
         @Override
         protected Supplier<Integer> concurrencySupplier(PrefetchingQueueListener annotation) {
             return () -> cachedConcurrencyLevel.getUnchecked(true);
         }
     }
    

    In the example above, a Guava cache is used so that the concurrency level is held for a fixed period (10 seconds) before a new value is calculated.

  2. Include this parser as a bean. This replaces the existing implementation and is used for all @PrefetchingQueueListener message listeners.

     @Configuration
     class MyConfiguration {

         @Bean
         public PrefetchingQueueListenerParser customParser(final Environment environment) {
             return new CustomPrefetchingQueueListenerParser(environment);
         }
     }
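
For applications that would rather not pull in Guava, the same time-based caching can be sketched with the JDK alone. ExpiringSupplier below is a hypothetical helper and is not part of this library; it is only an illustration of a value that is recomputed after a time-to-live, under the assumption that it would be returned from concurrencySupplier in place of the Guava cache.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: caches a computed value and recomputes it once the time-to-live has elapsed. */
final class ExpiringSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final long ttlNanos;
    private final LongSupplier nanoClock;
    private T value;
    private long loadedAt;
    private boolean loaded;

    ExpiringSupplier(Supplier<T> delegate, Duration ttl) {
        this(delegate, ttl, System::nanoTime);
    }

    // The clock is injectable so that expiry can be exercised without sleeping.
    ExpiringSupplier(Supplier<T> delegate, Duration ttl, LongSupplier nanoClock) {
        this.delegate = delegate;
        this.ttlNanos = ttl.toNanos();
        this.nanoClock = nanoClock;
    }

    @Override
    public synchronized T get() {
        long now = nanoClock.getAsLong();
        // Load on first use, then again whenever the cached value is older than the TTL.
        if (!loaded || now - loadedAt >= ttlNanos) {
            value = delegate.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}
```

With this helper, the overridden method could return something like new ExpiringSupplier<>(() -> random.nextInt(10), Duration.ofSeconds(10)), created once and stored in a field so that the cached value survives between calls.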

Upgrade steps

This will not break consumers of this library but if you are building your own aspects of this library manually you may need to modify some of the constructors. For example the PrefetchingMessageListenerContainerFactory now takes a PrefetchingQueueListenerParser instead of the Spring Environment.

Support for dynamically increasing visibility timeout for long-running message processing [GH-320]

Allows for the processing of long-running messages, with the visibility timeout of the message being automatically extended when it is about to reach its limit. If the message takes longer than a configured maximum duration without completing, processing is forcibly stopped by interrupting the thread.

Note: this only works with synchronous processing of messages, i.e. listeners that do not return a CompletableFuture. Asynchronous message listeners will be supported at a later point.

Usage

Core
  1. Add the AutoVisibilityExtenderMessageProcessingDecorator to a DecoratingMessageProcessor wrapping your main MessageProcessor like the LambdaMessageProcessor.
        final MessageProcessingDecorator autoVisibilityExtender = new AutoVisibilityExtenderMessageProcessingDecorator(
            sqsAsyncClient,
            queueProperties,
            new AutoVisibilityExtenderMessageProcessingDecoratorProperties() {
    
                @Override
                public Duration visibilityTimeout() {
                    return Duration.ofMinutes(1);
                }
    
                @Override
                public Duration maxDuration() {
                    return Duration.ofMinutes(5);
                }
    
                @Override
                public Duration bufferDuration() {
                    return Duration.ofSeconds(30);
                }
            }
        );
    
        MessageProcessor processor = new DecoratingMessageProcessor(
            "listener-identifier",
            queueProperties,
            Collections.singletonList(autoVisibilityExtender),
            new LambdaMessageProcessor(
                sqsAsyncClient,
                queueProperties,
                message -> {
                    try {
                        someLongFileIOMethod();
                    } catch (InterruptedException e) {
                        // the message took too long and it was interrupted
                    }
                }
            )
        );
    
    See Core - How to extend message visibility during processing for more details.
Spring
  1. Add the @AutoVisibilityExtender annotation to your message listener.
    @QueueListener("${insert.queue.url.here}")
    @AutoVisibilityExtender(visibilityTimeoutInSeconds = 60, maximumDurationInSeconds = 300, bufferTimeInSeconds = 10)
    public void processMessage(@Payload final String payload) {
        // process the message payload here
    }
    
    See Spring - How to extend message visibility during processing for more details.
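
To make the mechanism concrete, here is a minimal sketch of the idea using only the JDK. It is not the library's implementation: VisibilityExtenderSketch and its parameters are hypothetical, and the extendVisibility callback stands in for whatever call would actually change the message visibility on the queue. The callback fires every (visibility timeout minus buffer), and the processing thread is interrupted once the maximum duration has passed.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of extending visibility while a synchronous task runs. */
final class VisibilityExtenderSketch {

    /**
     * Runs the task on the calling thread. While it runs, extendVisibility is invoked
     * periodically, and the calling thread is interrupted if maxDuration is exceeded.
     */
    static void run(Runnable task, Runnable extendVisibility,
                    Duration visibilityTimeout, Duration buffer, Duration maxDuration) {
        long periodMs = visibilityTimeout.minus(buffer).toMillis();
        if (periodMs <= 0) {
            throw new IllegalArgumentException("buffer must be shorter than the visibility timeout");
        }
        Thread worker = Thread.currentThread();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Extend shortly before each visibility window would expire.
        ScheduledFuture<?> extender =
            scheduler.scheduleAtFixedRate(extendVisibility, periodMs, periodMs, TimeUnit.MILLISECONDS);
        // Give up on the task once it has run for too long.
        ScheduledFuture<?> timeout =
            scheduler.schedule(worker::interrupt, maxDuration.toMillis(), TimeUnit.MILLISECONDS);
        try {
            task.run();
        } finally {
            // Simplification: an interrupt that races with task completion is not handled here.
            extender.cancel(false);
            timeout.cancel(false);
            scheduler.shutdownNow();
        }
    }
}
```

With the annotation values shown above (60 second timeout, 10 second buffer, 300 second maximum), a sketch like this would extend roughly every 50 seconds and interrupt after 5 minutes.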

Version Bumps

Core

  • software.amazon.awssdk:sqs 2.14.22 -> 2.15.7
  • com.fasterxml.jackson.core:jackson-databind 2.11.2 -> 2.11.3

Kotlin/Ktor

io.ktor:ktor-server-core 1.4.0 -> 1.4.1

v4.3.0

3 years ago

Enhancements

FIFO SQS support using a FifoMessageListenerContainer [GH-313]

Adds the ability to consume FIFO SQS queues, guaranteeing that messages in the same message group are processed in order and never at the same time. For more details about configuring this message listener, take a look at the FifoMessageListenerContainerProperties.

Usage

Java
public class Main {

    public static void main(String[] args) throws InterruptedException {
        final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); // or your own custom client
        final QueueProperties queueProperties = QueueProperties.builder().queueUrl("${insert.queue.url.here}").build();
        final MessageListenerContainer container = new FifoMessageListenerContainer(
            queueProperties,
            sqsAsyncClient,
            () ->
                new LambdaMessageProcessor(
                    sqsAsyncClient,
                    queueProperties,
                    message -> {
                        // process the message here
                    }
                ),
            ImmutableFifoMessageListenerContainerProperties.builder().identifier("listener-identifier").concurrencyLevel(10).build()
        );
        container.start();
        Runtime.getRuntime().addShutdownHook(new Thread(container::stop));
        Thread.currentThread().join();
    }
}

Spring Boot
@Component
class MessageListeners {

    @FifoQueueListener(value = "${insert.queue.url.here}", concurrencyLevel = 10)
    public void fifoListener(@Payload final String body) {
        // process message here
    }
}

Kotlin DSL/Ktor
fifoMessageListener("identifier", sqsAsyncClient, "${insert.queue.url.here}") {
    concurrencyLevel = { 10 }

    processor = lambdaProcessor {
        method { message ->
            // process the message payload here
        }
    }
}
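
The ordering guarantee can be illustrated independently of the library. The following is a sketch only, assuming a hypothetical GroupSerialExecutor class: tasks for the same message group are chained one after another, while different groups share a thread pool and may run in parallel. It is not how FifoMessageListenerContainer is implemented, and it simplifies failure handling (a failed task does not stop later tasks in its group, which a real FIFO consumer would likely want).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical sketch: runs tasks of one group strictly in order, groups in parallel. */
final class GroupSerialExecutor {
    // Last submitted task per group; the next task for that group is chained onto it.
    // Simplification: entries are never removed, so the map grows with the number of groups.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Executor executor;

    GroupSerialExecutor(Executor executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> submit(String groupId, Runnable task) {
        return tails.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous =
                tail == null ? CompletableFuture.completedFuture(null) : tail;
            // handle() swallows a failure of the previous task so the chain keeps going.
            return previous.handle((result, error) -> null).thenRunAsync(task, executor);
        });
    }
}
```

Because each task only starts once the previous future for its group has completed, two messages from the same group can never be in flight together, regardless of the pool size.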

Version Bumps

Core

  • org.immutables:value-annotations +2.8.8
  • com.fasterxml.jackson.core:jackson-databind 2.11.1 -> 2.11.2
  • software.amazon.awssdk:sqs 2.13.66 -> 2.14.22

Spring

org.springframework.boot:spring-boot-dependencies 2.3.2.RELEASE -> 2.3.4.RELEASE

AWS Xray Extension

com.amazonaws:aws-xray-recorder-sdk-core 2.6.1 -> 2.7.1

Brave Extension

io.zipkin.brave:brave 5.12.4 -> 5.12.6

Spring Cloud Schema Registry Extension

org.springframework.cloud:spring-cloud-schema-registry-client 1.0.7.RELEASE -> 1.0.8.RELEASE

Ktor

io.ktor:ktor-server-core 1.3.2 -> 1.4.0

v4.2.0

3 years ago

Enhancements

Set value of isAutoStartup for DefaultMessageListenerContainerCoordinator [GH-273]

Improves the DefaultMessageListenerContainerCoordinator by allowing the auto startup of the containers to be configured. For more information see the Spring - How to prevent containers starting on startup guide.

Adds a getContainers method to the MessageListenerContainerCoordinator to allow for smarter configuration of the containers during runtime.

Remove the dependency management for the modules in favour of explicit versioning [GH-270]

Updates how the dependencies of the library are managed, removing the <dependencyManagement>/constraints for each module in favour of explicitly setting the versions. The reason for this was that a module like the api had a dependency management section setting values for libraries like Jackson or Avro, which this module does not care about.

v4.1.0

3 years ago

Enhancements

Add ability to create a message processor using lambdas [GH-255]

The CoreMessageProcessor required the use of the Java reflection API to indicate the message listener method. This works well for Spring Boot applications, where you want to run a method on a bean in the application, but it doesn't work well for simpler applications like a basic Java app or simpler web application frameworks like Ktor. This adds the ability to use a lambda (the Java functional API) for defining the message listener method instead.

Examples

Synchronous Lambda consuming the message

new LambdaMessageProcessor(sqsAsyncClient, queueProperties, (message) -> log.info("processing message"));

Asynchronous Lambda consuming the message

new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message) -> {
        log.info("processing message");
        return CompletableFuture.runAsync(() -> {
               // do asynchronous processing here
        });
});

Asynchronous Lambda using an acknowledge field

new AsyncLambdaMessageProcessor(sqsAsyncClient, queueProperties, (message, acknowledge) -> {
        log.info("processing message");
        return CompletableFuture.runAsync(() -> {
               // do asynchronous processing here
               acknowledge.acknowledgeSuccessful();
        });
});

Core Kotlin Dsl Example

val container = coreMessageListener("identifier", sqsAsyncClient, queueUrl) {
    processor = lambdaProcessor {
        method { message ->
            log.info("Message: {}", message.body())
        }
    }
    // ...other configuration
}

Add Batching and Prefetching Kotlin DSL container builders [GH-259]

The Spring Starter for the library provided some core annotations for listening to messages to simplify the configuration of a message listener by providing defaults for most of the configuration; see the @QueueListener and @PrefetchingQueueListener annotations. This adds these two types of message listeners into the Core Kotlin DSL so that consumers can more easily configure new message listeners.

Examples

val container = batchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
    concurrencyLevel = { 10 }
    batchSize = { 5 }
    batchingPeriod =  { Duration.ofSeconds(5) }

    processor = lambdaProcessor {
        method { message ->
            log.info("Message: {}", message.body())
        }
    }
}

or

val container = prefetchingMessageListener("identifier", sqsAsyncClient, queueUrl) {
    concurrencyLevel = { 2 }
    desiredPrefetchedMessages = 5
    maxPrefetchedMessages = 10

    processor = lambdaProcessor {
        method { message ->
            log.info("Message: {}", message.body())
        }
    }
}

Add Ktor Integration [GH-254]

Added a Ktor integration so that you can easily set up message listeners in a Ktor application. This utilises the Core Kotlin DSL to configure the message listeners, and it takes the containers and hooks them into the lifecycle of the Ktor server.

Example

val server = embeddedServer(Netty, 8080) {
    prefetchingMessageListener("prefetching-listener", sqsAsyncClient, queueUrl) {
        concurrencyLevel = { 2 }
        desiredPrefetchedMessages = 5
        maxPrefetchedMessages = 10

        processor = lambdaProcessor {
            method { message -> log.info("Message: {}", message.body())  }
        }
   }
}

See the Ktor Example for an example application that can be used.

v4.0.0

3 years ago

Enhancements

New MessageProcessingDecorators functionality

Added support to wrap the message processing logic via a MessageProcessingDecorator. This allows the addition of functionality like tracing, metrics or any other functionality.
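
As a rough illustration of the decorator idea, the sketch below wraps a message handler with hooks that run before processing and after success or failure. The ProcessingHookSketch interface and DecoratingProcessorSketch class are hypothetical and exist only for this example; the library's MessageProcessingDecorator interface has its own methods and signatures, which are not reproduced here.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical hook interface used only for this sketch. */
interface ProcessingHookSketch {
    default void before(String message) {}

    default void onSuccess(String message) {}

    default void onFailure(String message, Throwable error) {}
}

/** Hypothetical wrapper that calls each hook around the delegate handler. */
final class DecoratingProcessorSketch implements Consumer<String> {
    private final List<ProcessingHookSketch> hooks;
    private final Consumer<String> delegate;

    DecoratingProcessorSketch(List<ProcessingHookSketch> hooks, Consumer<String> delegate) {
        this.hooks = hooks;
        this.delegate = delegate;
    }

    @Override
    public void accept(String message) {
        hooks.forEach(hook -> hook.before(message));
        try {
            delegate.accept(message);
        } catch (RuntimeException e) {
            // Let hooks record the failure (e.g. close a trace span), then rethrow.
            hooks.forEach(hook -> hook.onFailure(message, e));
            throw e;
        }
        hooks.forEach(hook -> hook.onSuccess(message));
    }
}
```

A tracing or metrics hook would start a span or timer in before and finish it in onSuccess or onFailure, leaving the message handling code itself untouched.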

Xray Support [GH-150]

Added Support for Xray Tracing allowing you to trace messages being processed by your message listener. See Core - How to add Xray Tracing or Spring - How to add Xray Tracing for guides on how to integrate this. You can use the AWS Xray Spring Example to see an example of a Spring service with this configured.

Brave Support [GH-159]

Added support for Brave Tracing allowing you to trace messages being processed by your message listener. See Core - How to add Brave Tracing or Spring - How to Add Brave Tracing for guides on how to integrate this. You can use the Spring Sleuth Example to see an example of a Spring service with this configured.

Added a Core Kotlin DSL [GH-216]

You can now use the core module using a Kotlin DSL reducing the verbosity in configuring a message listener. See Core - Kotlin DSL for guides on how to integrate this. You can use the Core Kotlin Example to see an example of a Java program using the DSL.

Example

val container = coreMessageListener("core-example-container", sqsAsyncClient, queueUrl) {
        broker = concurrentBroker {
            concurrencyLevel = cached(Duration.ofSeconds(10)) {
                Random.nextInt(concurrencyLimit)
            }
            concurrencyPollingRate = { concurrencyLevelPeriod }
            errorBackoffTime = { Duration.ofMillis(500) }
        }
        retriever = prefetchingMessageRetriever {
            desiredPrefetchedMessages = 10
            maxPrefetchedMessages = 20
        }
        processor = coreProcessor {
            argumentResolverService = coreArgumentResolverService(objectMapper)
            bean = MessageListener()
            method = MessageListener::class.java.getMethod("listen", Request::class.java, String::class.java)
            decorators {
                add(BraveMessageProcessingDecorator(tracing))
            }
        }
        resolver = batchingResolver {
            bufferingSizeLimit = { 1 }
            bufferingTime = { Duration.ofSeconds(5) }
        }
    }

Remove usage of Guava [GH-166]

Guava was being used for some helper functions and it didn't seem like it was worth the extra dependency for the little gain that was provided for it. Therefore, Guava has been completely removed from the library and any necessary functions were reproduced in this library.

Improve component properties and make them consistent [GH-231]

The component properties now use a Duration type instead of integers/longs. For example, the long property backoffTimeInMs is now backoffTime, typed as a Duration.
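
As a small illustration of the migration, a millisecond value held by existing code can be converted with the JDK's Duration factory methods. The property names backoffTimeInMs and backoffTime come from the note above; the surrounding BackoffMigrationSketch class is hypothetical.

```java
import java.time.Duration;

/** Hypothetical holder showing the same backoff expressed both ways. */
final class BackoffMigrationSketch {
    // Value previously supplied as a long for backoffTimeInMs.
    static final long LEGACY_BACKOFF_TIME_IN_MS = 500L;

    // Equivalent value expressed as a Duration for backoffTime.
    static Duration backoffTime() {
        return Duration.ofMillis(LEGACY_BACKOFF_TIME_IN_MS);
    }
}
```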

Major version bumps [GH-162]

See build.gradle.kts for the latest versions.

Bug Fixes

  • [GH-223]: Starting and then immediately stopping the CoreMessageListenerContainer can result in hung threads.
  • [GH-217]: Fixed a problem with Spring Auto Configuration where this library would replace the default ObjectMapper provided by Spring Boot Web.

Developer Changes

  • [GH-144]: convert the project to gradle
  • [GH-155]: removed the JUnit Rules/Extensions for doing integration tests as they were not providing a lot of value
  • [GH-180] [GH-230] [GH-177]: update all of the documentation to be a little simpler
  • [GH-163]: added markdown linting checks to make sure markdown is consistent and links don't break
  • [GH-170]: simplify the directory naming, e.g. java-dynamic-sqs-listener-core to be core
  • [GH-153]: change import structure to not prioritise Google Imports
  • [GH-154]: fix exceptions in Spring Integration tests when shutting down SQS queues

v3.1.1

3 years ago

This patch release fixes bugs and improves logging.

Bug fixes

v3.1.0

4 years ago

This release adds the ability to deserialise messages that have been serialised by a schema tool like Avro. It uses the Spring Cloud Schema Registry Client to obtain the schema for the published message from the Spring Cloud Schema Registry so that it can be properly deserialised.

Core Changes

Extensions

Bug fixes

v3.0.0-M3

4 years ago

This milestone contained big changes to the framework's API to follow a non-blocking paradigm. This reduces the number of threads that are blocked while there are no messages to process. For example, if we have 2 listeners with a concurrency of 30 threads each and there are no messages to process, there would previously be 60 threads blocked. With the new approach, only the threads that are being used to process messages will be around.

This also changed how the framework is shut down: any messages that have been downloaded and stored locally now have a chance to be processed before the application ends.

API Redesign

  • The MessageBroker has been redesigned so that it no longer needs a background thread running to process messages. Instead it exposes methods to process messages, where the container can provide how messages are retrieved and resolved.
  • The MessageResolver was merged with the Async version because graceful shutdowns, where all messages are resolved before finishing, don't work when there is no way to wait for the resolver to finish. In other words, all reliable MessageResolvers needed to be Async.
  • The MessageRetriever has been made non-blocking by returning a CompletableFuture for the message instead of blocking and returning the message when it is obtained. It has also been merged with the Async implementation, as a non-async implementation doesn't work well with graceful shutdowns. When the retriever is shut down it will also return any messages that have been stored internally so that they can be processed on shutdown if desired.
  • The MessageProcessor now returns a CompletableFuture for when the message has finished processing. It now also takes in a Runnable that should be called if the message has been successfully processed and should be resolved, instead of requiring a dependency on the MessageResolver.
  • The MessageListenerContainer previously had the responsibility to start and stop the background threads, which it could do all at once by stopping the ExecutorService. Now it has more responsibility for determining the order of startup and shutdown to allow for more graceful shutdowns. For example, it now has the responsibility of allowing the processing of any messages downloaded but not processed yet.
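
The retriever change in the list above can be sketched as follows. This is a simplified illustration under stated assumptions, not the library's MessageRetriever: NonBlockingRetrieverSketch, its method names, and the use of String for a message are all hypothetical. Callers receive a CompletableFuture rather than blocking a thread, and stopping the retriever hands back any messages that were downloaded but never consumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified retriever: callers receive a future instead of blocking a thread. */
final class NonBlockingRetrieverSketch {
    private final Queue<String> buffered = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> waiting = new ArrayDeque<>();

    /** Returns immediately; the future completes once a message is available. */
    synchronized CompletableFuture<String> retrieveMessage() {
        String message = buffered.poll();
        if (message != null) {
            return CompletableFuture.completedFuture(message);
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiting.add(future);
        return future;
    }

    /** Called when a message has been downloaded, e.g. by a background receive call. */
    void onMessageDownloaded(String message) {
        CompletableFuture<String> waiter;
        synchronized (this) {
            waiter = waiting.poll();
            if (waiter == null) {
                // Nobody is waiting, so keep the message until it is requested.
                buffered.add(message);
                return;
            }
        }
        // Complete outside the lock so that callbacks do not run while it is held.
        waiter.complete(message);
    }

    /** Stops the retriever and hands back anything downloaded but not yet consumed. */
    synchronized List<String> stop() {
        List<String> leftover = new ArrayList<>(buffered);
        buffered.clear();
        return leftover;
    }
}
```

A container built on this shape could pass the list returned by stop() to the processor during shutdown, which is the behaviour the last bullet describes.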

Core Changes

  • SimpleMessageListenerContainer was changed to CoreMessageListenerContainer as it isn't very simple anymore. It now handles gracefully shutting down the container so that any extra messages, e.g. messages prefetched, can be processed before shutting down.
  • RetryableMessageProcessor was removed. I don't want to maintain this and it is simple for consumers to implement this themselves if they want.
  • BlockingMessageRetriever was removed. This was provided to allow for more graceful shutdowns as well as for testing. This isn't really needed anymore.
  • IndividualMessageRetriever was removed. This was originally used to show that you could just request a message when needed but I feel it wouldn't be used in production and therefore is a waste for me to maintain it. You can achieve the same by using a BatchingMessageRetriever with a batch size of 1.

Spring Changes

  • Core annotations updated to allow for the consumer to configure whether any messages downloaded but not processed should be processed on shutdown.
  • Core annotations updated to allow for the consumer to configure whether any messages currently being processed during a shutdown should be interrupted.