TaskTiger Versions

Python task queue using Redis

v0.12

Breaking changes

  • Drop support for redis-py 2 (#183)

Other changes

  • Make the TaskTiger instance available to the task via global state (#170)
  • Support custom task runners (#175)
  • Add the ability to configure whether task runners discover new tasks by polling or by push (#176)
  • Add unique_key, which specifies the list of kwargs used to construct a task's unique key (#180); see the sketch after this list
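
As an example of unique_key, here is a minimal sketch (the queue, task, and kwarg names are illustrative, not from this release): only the listed kwargs feed into the unique key, so delays that differ in other kwargs still deduplicate.

import tasktiger
from redis import Redis

tiger = tasktiger.TaskTiger(Redis(decode_responses=True))

# Hypothetical task: only user_id participates in the unique key, so the
# two delays below collapse into a single queued task despite differing
# `reason` values.
@tiger.task(queue='emails', unique=True, unique_key=('user_id',))
def notify(user_id, reason):
    ...

notify.delay(user_id=42, reason='signup')
notify.delay(user_id=42, reason='reminder')  # deduplicated against the first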

Bugfixes

  • Ensure task exists in the given queue when retrieving it (#184)
  • Clear retried executions from successful periodic tasks (#188)

v0.10.1

Major Changes:

  • Breaking Change: Unique task IDs have changed slightly. See more details below.
  • TaskTiger.purge_errored_tasks has been added
  • Breaking Change: Internal refactoring moved a lot of code out of __init__.py. This should be transparent unless calling code uses internal variables/constants; if this affects you, it should be obvious, since only imports will break, and the fix is just to update the import path (see the example after this list).
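
For instance (illustrative; which names moved depends on what your code imported), internal helpers such as gen_unique_id now live under tasktiger._internal, which is also where the migration script below imports them from:

# Hypothetical pre-refactor import of an internal helper:
# from tasktiger import gen_unique_id
# After the refactor, internal helpers come from tasktiger._internal:
from tasktiger._internal import gen_unique_id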

Breaking Change: Unique Task Ids Changed

This breaking change only affects periodic tasks and scheduled unique tasks. See https://github.com/closeio/tasktiger/issues/146 for details.

Unique tasks rely on their ID to enforce uniqueness. The ID is a hash of the function name, args, and kwargs. In some cases, creating unique scheduled tasks manually using Task objects, or manually .delay()-ing a periodic task, would inconsistently use None for args/kwargs instead of [] and {}. With this release, args and kwargs are always normalized to []/{} no matter how the Task was created. Existing scheduled unique tasks have to be migrated to the consistent ID format.
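
To see why this matters, here is a sketch of the inconsistency (the function path is hypothetical; gen_unique_id is the same internal helper the migration script below imports):

from tasktiger._internal import gen_unique_id

# The same logical task hashes to two different unique IDs depending on
# whether args/kwargs were left as None or normalized to []/{}:
id_with_none = gen_unique_id('mymodule.mytask', None, None)
id_normalized = gen_unique_id('mymodule.mytask', [], {})
assert id_with_none != id_normalized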

Here's a script that migrates task ids:

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""Re-schedule tasks that have malformed ids

Be sure to ``pip install click tasktiger limitlion tqdm`` as well.

Recommended steps:
    * stop tasktiger workers
    * upgrade tasktiger version
    * run this script (without --apply) and check the logs to make sure it's
      doing what you'd expect
    * run this script (with --apply), which corrects all unique scheduled task
      ids
    * start tasktiger workers
"""

from __future__ import absolute_import, print_function, unicode_literals

import datetime
import json

import click
import limitlion
import tqdm
from redis import Redis
from tasktiger import Task, TaskTiger
from tasktiger._internal import SCHEDULED, gen_unique_id, queue_matches

# Connect to Redis (defaults to localhost)
redis_connection = Redis(decode_responses=True)

# Initialize tasktiger
tiger = TaskTiger(redis_connection)

# Initialize limitlion (optional, see throttling comment below)
limitlion.throttle_configure(redis_connection)


class JSONLineLog(object):
    """Safe and convenient json logger

    Usage::

        with JSONLineLog("my_file.json") as logger:
            logger.write({'key': 'this is serialized as json'})
    """

    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        # open() already returns a ready file object; no extra __enter__
        # call is needed.
        self.file = open(self.filename, 'a')
        return self

    def write(self, log_entry):
        # default=str lets json.dumps serialize the datetime values logged
        # below instead of raising a TypeError.
        print(json.dumps(log_entry, default=str), file=self.file)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.__exit__(exc_type, exc_val, exc_tb)


@click.command()
@click.option(
    "--apply",
    is_flag=True,
    help="Actually make these changes. This is not a drill!",
    default=False,
)
@click.option("--limit", help="Limit to processing N tasks", default=None)
@click.option(
    "--only-queues",
    help="Only process these queues (comma delimited)",
    default=None,
)
@click.option(
    "--exclude-queues",
    help="Exclude these queues from processing (comma delimited)",
    default=None,
)
def main(apply=False, limit=None, only_queues=None, exclude_queues=None):
    # warn if not applying change
    if not apply:
        print('*** NO CHANGES WILL BE MADE')
        print('To apply this migration run with --apply.')
    else:
        print('*** CHANGES WILL BE APPLIED')
    print()

    # If we're actually running this on a production redis instance, we
    # probably don't want to iterate over all the keys as fast as we can.
    # limitlion is a simple token-bucket throttle that gets the job done.
    # This step is optional but recommended. If you don't want to use
    # limitlion, maybe do something simple like ``lambda: time.sleep(.1)``
    throttle = limitlion.throttle_wait('migrations', rps=10)

    # actually do the migration
    with JSONLineLog("task_id_migration.json") as migration_log:
        print("Writing log to {}".format(migration_log.filename))

        if limit:
            limit = int(limit)
        if only_queues:
            only_queues = only_queues.split(",")
        if exclude_queues:
            exclude_queues = exclude_queues.split(",")

        def unique_scheduled_tasks():
            """Yields all unique scheduled tasks"""
            queues_with_scheduled_tasks = tiger.connection.smembers(
                tiger._key(SCHEDULED)
            )
            for queue in queues_with_scheduled_tasks:
                if not queue_matches(
                    queue,
                    only_queues=only_queues,
                    exclude_queues=exclude_queues,
                ):
                    continue

                skip = 0
                total_tasks = None
                task_limit = 5000
                while total_tasks is None or skip < total_tasks:
                    throttle()
                    total_tasks, tasks = Task.tasks_from_queue(
                        tiger, queue, SCHEDULED, skip=skip, limit=task_limit
                    )
                    for task in tasks:
                        if task.unique:
                            yield task
                    skip += task_limit

        # note that tqdm is completely optional here, but shows a nice progress
        # bar.
        total_with_wrong_id = 0
        total_processed = 0
        for idx, task in enumerate(tqdm.tqdm(unique_scheduled_tasks())):
            # generate the new correct id
            correct_id = gen_unique_id(
                task.serialized_func, task.args, task.kwargs
            )
            if task.id != correct_id:
                total_with_wrong_id += 1
                # The zscore is a UTC epoch timestamp; use utcfromtimestamp
                # so the comparison against utcnow() further below stays
                # correct on hosts whose local timezone is not UTC.
                when = datetime.datetime.utcfromtimestamp(
                    tiger.connection.zscore(
                        tiger._key(SCHEDULED, task.queue), task.id
                    )
                )
                migration_log.write(
                    {
                        "incorrect_task_id": task.id,
                        "correct_task_id": correct_id,
                        "serialized_func": task.serialized_func,
                        "queue": task.queue,
                        "ts": datetime.datetime.utcnow(),
                        "apply": apply,
                        "scheduled_at": when,
                    }
                )
                # Reschedule the task with the correct id. There's a 10 second
                # buffer here in case any tasktigers are still running so we're
                # not racing with them.
                if apply and (
                    when - datetime.datetime.utcnow()
                ) > datetime.timedelta(seconds=10):
                    new_task = task.clone()
                    new_task._data["id"] = correct_id
                    new_task.delay(when=when)
                    task.cancel()
                    throttle()

            total_processed = idx + 1
            if limit and total_processed >= limit:
                break

        print(
            'Processed {} tasks, found {} with incorrect ids'.format(
                total_processed, total_with_wrong_id
            )
        )

    print("Done")


if __name__ == "__main__":
    main()

Minor Changes

  • The codebase is now formatted with black
  • Some additional testing infrastructure has been added to make local development/testing easier

v0.9.5

Major changes:

  • Support setting a maximum queue size with the max_queue_size parameter; QueueFullException is raised when the limit is reached (see the sketch after this list).
  • Refactor single-worker queues to use a new Semaphore lock that allows setting the maximum number of workers that may process a particular queue.
  • Added the CHILD_CONTEXT_MANAGERS configuration setting, which specifies context managers to invoke before/after the forked child process runs.
  • Added the --no-store-tracebacks option to omit tracebacks from execution histories, which can reduce Redis storage requirements.
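
As an illustration of the queue-size limit, here is a minimal sketch (the queue name and task are hypothetical, and it assumes max_queue_size is accepted by the task decorator and that QueueFullException is importable from tasktiger.exceptions):

import tasktiger
from redis import Redis
from tasktiger.exceptions import QueueFullException

tiger = tasktiger.TaskTiger(Redis(decode_responses=True))

# Hypothetical task whose queue is capped at 1000 queued instances.
@tiger.task(queue='imports', max_queue_size=1000)
def import_row(row_id):
    ...

try:
    import_row.delay(row_id=1)
except QueueFullException:
    # The queue already holds max_queue_size tasks; back off or shed load.
    pass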