QuTee

PHP Background Jobs (Tasks) Manager

QuTee is a simple queue manager and task processor for PHP that uses Beanstalkd, Redis or MySQL as its backend. An event interface is provided for your logging or StatsD needs.

Example

<?php
use Qutee\Queue;
use Qutee\Task;

/*
 * Bootstrap / DIC
 */
$beanstalkdParams    = array(
    'host'  => '127.0.0.1',
    'port'  => 11300
);
$queuePersistor = new Qutee\Persistor\Beanstalk();
$queuePersistor->setOptions($beanstalkdParams);

// or...

$redisParams    = array(
    'host'  => '127.0.0.1',
    'port'  => 6379
);
$queuePersistor = new Qutee\Persistor\Redis();
$queuePersistor->setOptions($redisParams);

// or...

$pdoParams    = array(
    'dsn'       => 'mysql:host=127.0.0.1;dbname=test;charset=utf8',
    'username'  => 'root',
    'password'  => '',
    'table_name'=> 'queue'
);
$queuePersistor = new Qutee\Persistor\Pdo();
$queuePersistor->setOptions($pdoParams);

$queue          = new Queue();
$queue->setPersistor($queuePersistor);

/*
 * App
 */

// Create Task
$task = new Task;
$task
    ->setName('Acme/DeleteFolder')
    ->setData(array(
        'path'      => '/usr',
    ))
    ->setPriority(Task::PRIORITY_HIGH);

// Queue it
$queue->addTask($task);

// Or do this in one go
Task::create('Acme/DeleteFolder', array('path' => '/usr'), Task::PRIORITY_HIGH);

<?php
use Qutee\Task;
use Qutee\Worker;

// Worker - process all queues
$worker = new Worker;
while (true) {
    try {
        $worker->run();
    } catch (Exception $e) {
        echo $e->getMessage();
    }
}

// Or, with more configuration
$worker = new Worker;
$worker
    ->setInterval(30)                           // Run every 30 seconds
    ->setPriority(Task::PRIORITY_HIGH)          // Will only do tasks of this priority
    ;

while (true) {
    try {
        if (null !== ($task = $worker->run())) {
            echo 'Ran task: '. $task->getName() . PHP_EOL;
        }
    } catch (Exception $e) {
        echo 'Error: '. $e->getMessage() . PHP_EOL;
    }
}

Logging example

// Initialize queue with persistor
$queue          = new Qutee\Queue();
$queue->setPersistor($queuePersistor);

// Setup the dispatcher, and register your subscriber
$dispatcher = new Symfony\Component\EventDispatcher\EventDispatcher;
$dispatcher->addSubscriber(new QuteeEventSubscriber());
$queue->setEventDispatcher($dispatcher);

// The subscriber:
class QuteeEventSubscriber implements \Symfony\Component\EventDispatcher\EventSubscriberInterface
{
    public static function getSubscribedEvents()
    {
        return array(
            \Qutee\Queue::EVENT_ADD_TASK => array(
                'addTask',
                0
            ),
            \Qutee\Worker::EVENT_START_PROCESSING_TASK => array(
                'processTask',
                0
            ),
            \Qutee\Worker::EVENT_END_PROCESSING_TASK => array(
                'processTaskEnd',
                0
            ),
        );
    }
    
    public function addTask(Qutee\Event $event)
    {
        $this->log('Added task: '. $event->getTask()->getName());
    }
    
    public function processTask(Qutee\Event $event)
    {
        $this->log('Processing task '. $event->getTask()->getName() .' started');
    }
    
    public function processTaskEnd(Qutee\Event $event)
    {
        $this->log('Processing task '. $event->getTask()->getName() .' finished, lasted '. ($event->getArgument('elapsedTime') / 1000) .' seconds');
    }
    
    protected function log($message)
    {
        file_put_contents(__DIR__ .'/events.log', $message . PHP_EOL, FILE_APPEND);
    }

}
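
StatsD example (sketch)

The same subscriber hooks can feed StatsD instead of a log file. The following is a minimal sketch, not part of QuTee: the helper names statsdTimingLine and sendToStatsd, the "qutee.task." bucket prefix, and the host and port defaults are all assumptions made for illustration. It also assumes that the elapsedTime event argument is in milliseconds, which the division by 1000 in the logging example suggests.

```php
<?php
// Sketch only: helper names, bucket prefix, host and port are assumptions,
// not part of QuTee.

/**
 * Build a StatsD timing line ("bucket:value|ms") for a task name.
 */
function statsdTimingLine(string $taskName, float $elapsedMs): string
{
    // Replace every run of non-alphanumeric characters (e.g. "/") with "_".
    $safeName = strtolower(preg_replace('/[^A-Za-z0-9]+/', '_', $taskName));

    return sprintf('qutee.task.%s:%d|ms', $safeName, (int) round($elapsedMs));
}

/**
 * Send one line to a StatsD daemon over UDP. Failures are silently ignored
 * so that a metrics outage never interrupts the worker.
 */
function sendToStatsd(string $line, string $host = '127.0.0.1', int $port = 8125): void
{
    $socket = @fsockopen('udp://' . $host, $port, $errno, $errstr, 1.0);
    if ($socket === false) {
        return;
    }
    @fwrite($socket, $line);
    fclose($socket);
}
```

Inside processTaskEnd() this could be called as sendToStatsd(statsdTimingLine($event->getTask()->getName(), $event->getArgument('elapsedTime'))).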

Notes

  • Use supervisord or similar for process monitoring / babysitting
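
As a rough illustration of the note above, a supervisord program section for the worker loop might look like the following. This is a sketch under assumptions: the program name, script path, log paths and process count are hypothetical and should be adapted to your setup.

```ini
; Hypothetical supervisord program section for a QuTee worker script.
[program:qutee-worker]
; Path to the script containing the while (true) worker loop (assumed).
command=php /path/to/worker.php
; Start with supervisord and restart the worker whenever it exits.
autostart=true
autorestart=true
; Run two worker processes; process_name must vary when numprocs > 1.
numprocs=2
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/qutee-worker.out.log
stderr_logfile=/var/log/qutee-worker.err.log
```

With autorestart enabled, a worker that dies on an uncaught error is started again without manual intervention.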

Open Source Agenda is not affiliated with "QuTee" Project. README Source: anorgan/QuTee