# TaskTiger

Python task queue using Redis
- Make the `TaskTiger` instance available for the task via global state (#170)
- `unique_key` specifies the list of kwargs to use to construct the unique key (#180); see the sketch below
- `TaskTiger.purge_errored_tasks` has been added
- Internal code was moved out of `__init__.py`. This should be transparent unless calling code uses internal variables/constants. If this affects you, it should be obvious as only imports should break. The fix is just to change the import path.
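As an illustration of `unique_key`, here is a minimal sketch; the `sync_customer` task and its arguments are hypothetical and not part of the release:

```python
from redis import Redis
from tasktiger import TaskTiger

tiger = TaskTiger(Redis(decode_responses=True))

@tiger.task(unique=True, unique_key=("customer_id",))
def sync_customer(customer_id, force=False):
    """Hypothetical task used only to illustrate unique_key."""

# Only customer_id feeds the uniqueness hash, so both calls map to the
# same task id; the second .delay() won't enqueue a duplicate while the
# first is still pending, even though force differs.
sync_customer.delay(customer_id=42, force=False)
sync_customer.delay(customer_id=42, force=True)
```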
This release also contains a breaking change. It only affects periodic tasks and scheduled unique tasks; see https://github.com/closeio/tasktiger/issues/146 for details.

Unique tasks rely on their id to enforce uniqueness. The id is a hash of `function_name`, `args`, and `kwargs`. There were some cases where creating unique scheduled tasks manually using `Task` objects or manually `.delay()`-ing a periodic task would inconsistently use `None` for `args`/`kwargs` instead of `[]` and `{}`. With this release, `args` and `kwargs` will always be normalized to `[]`/`{}` no matter how the `Task` was created. Existing scheduled unique tasks will have to be migrated to use a consistent id format.
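To make the problem concrete, here is a minimal sketch of the idea behind the id (this is not TaskTiger's exact `gen_unique_id` implementation): because the id is a hash of the serialized function and its arguments, `None` and `[]`/`{}` hash to different ids for the same logical task unless they are normalized first.

```python
import hashlib
import json

def unique_task_id(serialized_func, args, kwargs):
    # Normalize None to []/{} before hashing; without this, the same
    # logical task can produce two different "unique" ids.
    payload = json.dumps(
        [serialized_func, args or [], kwargs or {}], sort_keys=True
    )
    return hashlib.md5(payload.encode("utf-8")).hexdigest()

assert unique_task_id("tasks.sync", None, None) == unique_task_id(
    "tasks.sync", [], {}
)
```

The migration script below finds scheduled unique tasks whose ids were generated with the old, inconsistent format and reschedules them under the correct id: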
```python
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Re-schedule tasks that have malformed ids.

Be sure to ``pip install click tasktiger limitlion tqdm`` as well.

Recommended steps:

* stop tasktiger workers
* upgrade tasktiger version
* run this script (without --apply) and check the logs to make sure it's
  doing what you'd expect
* run this script (with --apply), which corrects all unique scheduled task
  ids
* start tasktiger workers
"""
from __future__ import absolute_import, print_function, unicode_literals

import datetime
import json

import click
import limitlion
import tqdm
from redis import Redis

from tasktiger import Task, TaskTiger
from tasktiger._internal import SCHEDULED, gen_unique_id, queue_matches

# Connect to Redis (defaults to localhost)
redis_connection = Redis(decode_responses=True)

# Initialize tasktiger
tiger = TaskTiger(redis_connection)

# Initialize limitlion (optional, see throttling comment below)
limitlion.throttle_configure(redis_connection)


class JSONLineLog(object):
    """Safe and convenient json logger.

    Usage::

        with JSONLineLog("my_file.json") as logger:
            logger.write({'key': 'this is serialized as json'})
    """

    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        self.file = open(self.filename, 'a')
        self.file.__enter__()
        return self

    def write(self, log_entry):
        # default=str so datetime values serialize instead of raising
        print(json.dumps(log_entry, default=str), file=self.file)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.__exit__(exc_type, exc_val, exc_tb)


@click.command()
@click.option(
    "--apply",
    is_flag=True,
    help="Actually make these changes. This is not a drill!",
    default=False,
)
@click.option("--limit", help="Limit to processing N tasks", default=None)
@click.option(
    "--only-queues",
    help="Only process these queues (comma delimited)",
    default=None,
)
@click.option(
    "--exclude-queues",
    help="Exclude these queues from processing (comma delimited)",
    default=None,
)
def main(apply=False, limit=None, only_queues=None, exclude_queues=None):
    # Warn if not applying changes
    if not apply:
        print('*** NO CHANGES WILL BE MADE')
        print('To apply this migration run with --apply.')
    else:
        print('*** CHANGES WILL BE APPLIED')
    print()

    # If we're actually running this on a production redis instance, we
    # probably don't want to iterate over all the keys as fast as we can.
    # limitlion is a simple token-bucket throttle that gets the job done.
    # This step is optional but recommended. If you don't want to use
    # limitlion, maybe do something simple like ``lambda: time.sleep(.1)``
    throttle = limitlion.throttle_wait('migrations', rps=10)

    # Actually do the migration
    with JSONLineLog("task_id_migration.json") as migration_log:
        print("Writing log to {}".format(migration_log.filename))

        if limit:
            limit = int(limit)
        if only_queues:
            only_queues = only_queues.split(",")
        if exclude_queues:
            exclude_queues = exclude_queues.split(",")

        def unique_scheduled_tasks():
            """Yield all unique scheduled tasks."""
            queues_with_scheduled_tasks = tiger.connection.smembers(
                tiger._key(SCHEDULED)
            )
            for queue in queues_with_scheduled_tasks:
                if not queue_matches(
                    queue,
                    only_queues=only_queues,
                    exclude_queues=exclude_queues,
                ):
                    continue

                # Page through each scheduled queue in chunks
                skip = 0
                total_tasks = None
                task_limit = 5000
                while total_tasks is None or skip < total_tasks:
                    throttle()
                    total_tasks, tasks = Task.tasks_from_queue(
                        tiger, queue, SCHEDULED, skip=skip, limit=task_limit
                    )
                    for task in tasks:
                        if task.unique:
                            yield task
                    skip += task_limit

        # Note that tqdm is completely optional here, but shows a nice
        # progress bar.
        total_with_wrong_id = 0
        total_processed = 0
        for idx, task in enumerate(tqdm.tqdm(unique_scheduled_tasks())):
            # Generate the new correct id
            correct_id = gen_unique_id(
                task.serialized_func, task.args, task.kwargs
            )
            if task.id != correct_id:
                total_with_wrong_id += 1
                # The zset score is the scheduled execution time as a unix
                # timestamp; use UTC to match the utcnow() comparison below.
                when = datetime.datetime.utcfromtimestamp(
                    tiger.connection.zscore(
                        tiger._key(SCHEDULED, task.queue), task.id
                    )
                )
                migration_log.write(
                    {
                        "incorrect_task_id": task.id,
                        "correct_task_id": correct_id,
                        "serialized_func": task.serialized_func,
                        "queue": task.queue,
                        "ts": datetime.datetime.utcnow(),
                        "apply": apply,
                        "scheduled_at": when,
                    }
                )
                # Reschedule the task with the correct id. There's a 10
                # second buffer here in case any tasktigers are still
                # running so we're not racing with them.
                if apply and (
                    when - datetime.datetime.utcnow()
                ) > datetime.timedelta(seconds=10):
                    new_task = task.clone()
                    new_task._data["id"] = correct_id
                    new_task.delay(when=when)
                    task.cancel()
                throttle()
            total_processed = idx + 1
            if limit and total_processed >= limit:
                break

        print(
            'Processed {} tasks, found {} with incorrect ids'.format(
                total_processed, total_with_wrong_id
            )
        )
    print("Done")


if __name__ == "__main__":
    main()
```
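A typical migration run (the filename `migrate_task_ids.py` is arbitrary): first a dry run such as `python migrate_task_ids.py --only-queues periodic --limit 1000`, then review `task_id_migration.json`, and finally `python migrate_task_ids.py --apply` to reschedule everything under the corrected ids.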
The codebase was reformatted with `black`.

Major changes:

- Added a `max_queue_size` task parameter; `QueueFullException` is raised when the limit is reached (see the first sketch below).
- Added a `CHILD_CONTEXT_MANAGERS` configuration setting that allows specifying context managers that will be invoked before/after the forked child process runs (see the second sketch below).
- Added a `--no-store-tracebacks` option to not include tracebacks in execution histories. This can be used to reduce Redis storage requirements.
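A minimal sketch of the queue size limit, assuming `max_queue_size` is passed as a task option like the others and that `QueueFullException` lives in `tasktiger.exceptions` (both assumptions; the `send_email` task is hypothetical):

```python
from redis import Redis
from tasktiger import TaskTiger
from tasktiger.exceptions import QueueFullException  # import path is an assumption

tiger = TaskTiger(Redis(decode_responses=True))

@tiger.task(queue="emails", max_queue_size=10000)
def send_email(recipient):
    """Hypothetical task used only for illustration."""

try:
    send_email.delay("user@example.com")
except QueueFullException:
    # The "emails" queue already holds 10000 tasks: back off, drop, or alert.
    pass
```

And a sketch of `CHILD_CONTEXT_MANAGERS`, assuming the setting takes a list of context manager instances that are entered before and exited after the task runs in the forked child (the database handling shown is a placeholder):

```python
from redis import Redis
from tasktiger import TaskTiger

class FreshDatabaseConnection(object):
    """Placeholder context manager wrapped around each forked child run."""

    def __enter__(self):
        print("child: open a fresh DB connection here")

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("child: close the connection here")

tiger = TaskTiger(
    Redis(decode_responses=True),
    config={"CHILD_CONTEXT_MANAGERS": [FreshDatabaseConnection()]},
)
```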