💻 Microservice lib designed to ease service building using Python and asyncio, with ready to use support for HTTP + WS, AWS SNS+SQS, RabbitMQ / AMQP, middlewares, envelopes, logging, lifecycles. Extend to GraphQL, protobuf, etc.
Send messages directly to SQS queues (SQS.SendMessage)
Added the function tomodachi.sqs_send_message
(tomodachi.aws_sns_sqs_send_message
) to send a message directly to an SQS queue. It mostly works the same way as the tomodachi.sns_publish
(tomodachi.aws_sns_sqs_publish
) function, but sends a message directly to the specified SQS queue.
This is useful when you know the recipient queue of a message, for example when manually sending messages to a dead-letter queue or to do command-like messaging instead of event-based messaging.
The message body of tomodachi.sqs_send_message
calls is by default built using the tomodachi.transport.aws_sns_sqs.MessageBodyFormatter
implementation, which will encode the message body as JSON and embed it within the "Message"
property also using the service specified message envelope functionality.
The message includes some additional properties, mimicing how SNS notifications are represented when they are sent to subscribed SQS queues (with the exception that these messages are of type "Message"
to differentiate from SNS notifications that are by AWS typed "Notification"
.
For custom or more advanced setups requiring more flexibility, the implementation can be overridden by providing a callable to the message_body_formatter
keyword argument of sqs_send_message
calls. The callable receives a message context object and returns the raw message (as a string) that will be sent to SQS as the message body.
Setting message_body_formatter
to None
will disable default message body formatting (and incidentally also the message envelope builder), causing the message body to be as is specified within the data
argument to the sqs_send_message
call.
Trying to send messages to an SQS queue that does not exist will result in a tomodachi.transport.aws_sns_sqs.QueueDoesNotExistError
exception being raised, which can be caught also by tomodachi.transport.aws_sns_sqs.AWSSNSSQSException
, botocore.exceptions.ClientError
or <botocore_sqs_client>.exceptions.QueueDoesNotExist
.
Message attributes' improvements and fixes
Receives and propagates message attributes for all messages received on a queue, which means message attribute propagation will work for messages published to SNS, using either raw message delivery or not, and/or messages that were directly sent to SQS via SQS.SendMessage calls.
Currently, messages on SQS queues consumed and parsed by tomodachi services still need to be JSON formatted and be embedded within the "Message"
property (which is how SNS notification messages are represented by default, unless a queue is setup for raw message delivery).
Fixes a bug that was causing message attributes in binary form to be invalidly processed.
OTEL updates
tomodachi.sqs_send_message
functionality.messaging.destination_publish.name
.messaging.aws_sqs.duration
, which was previously named messaging.amazonsqs.duration
.Type hints and input validation
types-aiobotocore-*
packages to provide improved type hints internally.Handler arguments
message_type
(str | None
) to the list of keyword argument provided transport values that can be used in function signatures for AWS SNS+SQS handlers. Holds the value "Notification"
for messages received as part of an SNS notification, unless the queue uses raw message delivery. In other cases the value is set to the value from a message body's JSON property "Type"
if it exists.raw_message_body
(str
) has also been added to the list of keyword argument provided transport values for AWS SNS+SQS handlers. The raw_message_body
value is set to the full content (non-decoded, as a string) from a received message' "Body"
, which can be used to implement custom listener with greater access to the raw message data.topic
argument to message handlers in functions was always populated with the value of the topic
argument to the @tomodachi.aws_sns_sqs
decorator, even if the message was sent to a different topic. The topic
argument to message handlers in functions will now be populated with the actual topic name destination to where the message was published, if published via SNS. If the message was not published to a topic, the value is set to empty string.Other
TracerProvider
could end up with different resource attributes, iff a service class' name
attribute's value were to be used as the service.name
OTEL resource attribute (instead set to "unknown_service"
for tracers that were created before the service initialized).tomodachi
services started within the same Python process, the name from the first instrumented service class will now be used as the service.name
OTEL resource attribute for all tracers derived from the same tracer provider. The recommended solution if this is an issue, is to split the services into separate processes instead.tomodachi.aws_sns_sqs.get_queue_url
and tomodachi.aws_sns_sqs.get_queue_url_from_arn
with better support for additional types of input and/or potentially prefixing of queues. The function also now caches the queue URL to avoid unnecessary calls to the AWS API.aiobotocore
2.10.x releases and 2.11.x releases.SIGINT
or SIGTERM
signal. Added details describing how handlers are awaited and for how long as part of the graceful termination sequence.ConsoleRenderer
extending structlog.dev.ConsoleRenderer
(which caused an exception to be raised upon service start, when using structlog
23.3.0 or 24.1.0).ConsoleRenderer
class from structlog
when a modern version of structlog
is used (23.3.0 or newer).opentelemetry-exporter-prometheus
dependency to use the correct (non-yanked) version range. Now >=0.40b0,<1.0.0
from July 2023 and forward. Previously constrained to use the older, now yanked, 1.12.0rc1
version from May 2022. The 0.x releases of opentelemetry-exporter-prometheus
are more recent than the deprecated 1.x releases.aiobotocore
2.8.x releases and 2.9.x releases.aiohttp
3.9.x versions.opentelemetry-sdk
1.21.0).tomodachi.logging
on OTEL instrumented services will now include TraceContext (span_id
, trace_id
and parent_span_id
) on non-recording (but valid) spans.message_deduplication_id
and message_group_id
) to tomodachi
handler functions as keyword argument provided transport values. Details in #1810. (github: @filipsnastins)tomodachi
services.🎉 This release is mainly focused on improving observability when operating servies. It introduces OpenTelemetry instrumentation and a completely refactored interface for logging.
tomodachi.get_logger()
built on structlog
. Contextual logger available for all handlers, etc.tomodachi
to a custom logger the user provides.opentelemetry
(OTEL / OpenTelemetry) that can be enabled if tomodachi
is installed using the opentelemetry
extras. The OpenTelemetry instrumentation usage is explained in detail in https://github.com/kalaspuff/tomodachi/pull/1790.tomodachi run
argument --opentelemetry-instrument
(equivalent to setting env: TOMODACHI_OPENTELEMETRY_INSTRUMENT=1
) or using the opentelemetry-instrument
CLI.opentelemetry-exporter-prometheus
extras and using the OTEL_PYTHON_METER_PROVIDER=tomodachi_prometheus
environment value in combination with OTEL instrumentation.--production
. The banner now includes the operating system, architecture, which Python runtime from which virtualenv is used, etc. in order to aid debugging during development for issues caused by environment misconfiguration.--help
.tomodachi.__build_time__
which includes the timestamp when the build for the installed release was done. The time that has passed since build time will be included in the start banner.asyncio
tasks instead of simply awaiting the coroutines so that the context from contextvars will be propagated correctly and not risk being corrupted by handlers.tomodachi run
is now accompanied with an environment variable to do the same. For example --log-level warning
can be achieved by setting TOMODACHI_LOG_LEVEL=warning
.logging.root
logger on service start.tomodachi.logging
which may cause duplicate log output depending on how the third party logger is configured.log_setup()
function that previously was added to the service object on class initialization and that was used to setup log output to disk.tomodachi
frames, which usually will be uninteresting for debugging._start_service
, _started_service
, etc.) which previously could stall a service raising an exception while starting, instead of exiting with a non-zero exit code.--log-level
CLI argument value is actually applied to loggers._start_service
).log()
function added to the service object is deprecated. Use the structlog
logger from tomodachi.get_logger()
instead.RequestHandler.get_request_ip
is deprecated. Instead use the tomodachi.get_forwarded_remote_ip()
function.-c
(--config
) which could be used to set object attributes from a JSON file. A better pattern is to read optional config data from an environment variable.Fix for an issue where a wrapped function is used as a handler function, which would then cause the keyword argument provided transport values to rely on the keyword arguments from the wrapped function's signature to be used instead of the keyword arguments from the wrapper function's signature. Described with examples in https://github.com/kalaspuff/tomodachi/pull/1781.
The bug was found to be present in the 0.25.0 release, which included major refactoring of the keyword argument provided transport values functionality.
The middleware execution logic has been improved to handle different argument types and edge cases more smoothly. Enhanced the way arguments are passed to middlewares and handlers, allowing better flexibility.
Function handlers, middlewares and envelopes can all now specify additional keyword arguments in their signatures and receive transport centric values.
Previously a few of these keyword values could be used for function handlers or envelopes, but not within middlewares. With this update these keywords can be used across all kind of handler functions to allow for more flexibility in how to structure apps, logging, tracing, authentication, etc.
Resolved an edge case where a service could end up calling SNS.CreateTopic
numerous times due to thousands of messages simultanously being published to a topic that were previously unknown to the service. Described in the detail in https://github.com/kalaspuff/tomodachi/pull/1768.
The aws_sns_sqs_publish
function will now return the SNS message identifier as a str
value if it is called with wait=True
(default), or instead return an asyncio.Task
object if called with wait=False
.
Updated documentation on middlewares and function signature keyword arguments:
This is an example of a middleware function for AWS SNS SQS messaging that adds trace spans on receiving messages, with a context extracted from a message attribute holding with a "traceparent" value. The topic name and SNS message identifier is also added as attributes to the trace span.
async def trace_middleware(
func: Callable[... Awaitable],
*,
topic: str,
message_attributes: dict,
sns_message_id: str
) -> None:
ctx: Context | None = None
if carrier_traceparent := message_attributes.get("telemetry.carrier.traceparent"):
carrier: dict[str, list[str] | str] = {"traceparent": carrier_traceparent}
ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
with tracer.start_as_current_span(f"SNSSQS handler '{func.__name__}'", context=ctx) as span:
span.set_attribute("messaging.system", "AmazonSQS")
span.set_attribute("messaging.operation", "process")
span.set_attribute("messaging.source.name", topic)
span.set_attribute("messaging.message.id", sns_message_id)
try:
# Calls the handler function (or next middleware in the chain)
await func()
except BaseException as exc:
logging.getLogger("exception").exception(exc)
span.record_exception(exc, escaped=True)
span.set_status(StatusCode.ERROR, f"{exc.__class__.__name__}: {exc}")
raise exc
Fixes an issue in the internal retry logic when using aws_sns_sqs_publish
if calls to the AWS API SNS.Publish
would intermittently respond with 408 response without any body, which previously would've resulted in a AWSSNSSQSException("Missing MessageId in response")
immediately without retries.
This was previously attempted to be fixed in https://github.com/kalaspuff/tomodachi/pull/1664 (released in 0.23.0), but due to an error it fell through to become an exception with the "Missing MessageId in response"
message instead.
The publish function will now catch exceptions from botocore
of type ResponseParserError
to which botocore
has added that "Further retries may succeed"
. tomodachi
will retry such SNS.Publish
calls up to 3 times and if still failing after all retries the library will reraise the original botocore
exception.
It seems that botocore
does not automatically retry such errors itself.
Similar to the above, the same kind of retries will now also be done during AWS API calls for SQS.DeleteMessage
, where the botocore.parser.QueryParser
would raise an ResponseParserError
exception on 408 responses without body.
aiobotocore
2.5.x releases.