Вход Регистрация
Файл: system/core/queue.php
Строк: 94
<?php

class cmsQueue {

    protected static 
$max_attempts 4;
    protected static 
$max_run_jobs 20;
    protected static 
$table 'jobs';

    
/**
     * Добавляет задачу в очередь
     *
     * @param string $queue Название очереди
     * @param array $data
     * @param integer $priority Приоритет, по умолчанию 1
     * @return integer
     */
    
public static function pushOn($queue$data$priority 1) {

        return static::
pushToDatabase(array(
            
'payload'  => $data,
            
'queue'    => $queue,
            
'priority' => $priority
        
));

    }

    
/**
     * Добавляет задачу в очередь с отсрочкой в секундах или по дате
     * @param integer|date $date
     * @param string $queue
     * @param array $data
     * @param integer $priority
     * @return integer
     */
    
public static function pushOnLater($date$queue$data$priority 1) {

        return static::
pushToDatabase(array(
            
'payload'      => $data,
            
'queue'        => $queue,
            
'date_started' => date('Y-m-d H:i:s', (is_numeric($date) ? (time() + $date) : strtotime($date))),
            
'priority'     => $priority
        
));

    }

    public static function 
getMaxAttempts() {
        return static::
$max_attempts;
    }

    public static function 
getTableName() {
        return static::
$table;
    }

    public static function 
setTableName($name) {
        static::
$table $name;
    }

    protected static function 
pushToDatabase($data) {

        
$model = new cmsModel();

        
$data['payload'] = json_encode($data['payload']);

        return 
$model->insert(static::$table$data);

    }

    public static function 
runJobs($queue null) {

        
$model = new cmsModel();

        
$model->orderByList(array(
            array(
'by' => 'date_started''to' => 'asc'),
            array(
'by' => 'priority''to' => 'desc'),
            array(
'by' => 'date_created''to' => 'asc')
        ));

        
$model->limit(static::$max_run_jobs);

        if(
$queue){
            
$model->filterEqual('queue'$queue);
        }

        
$jobs $model->
                
filterIsNull('is_locked')->
                
filterLtEqual('attempts', static::$max_attempts)->
                
filterStart()->
                    
filterIsNull('date_started')->
                    
filterOr()->
                    
filterLtEqual('date_started'date('Y-m-d H:i:s'))->
                
filterEnd()->get(static::$table, function ($item$model){

                    
$item['payload'] = json_decode($item['payload'], true);

                    if(!isset(
$item['payload']['params'])){
                        
$item['payload']['params'] = array();
                    }

                    
array_unshift($item['payload']['params'], ($item['attempts'] + 1));

                    return 
$item;

                });

        if(!
$jobs){
            return 
false;
        }

        
// помечаем полученные задания как запущенные
        
$model->filterIn('id'array_keys($jobs))->updateFiltered(static::$table, array(
            
'is_locked'    => 1,
            
'date_started' => '',
            
'attempts' => function ($db){
                return 
'`attempts` + 1';
            }
        ), 
true);

        foreach (
$jobs as $job) {

            
$result = static::runJob($job);

            
// если задание выполнено успешно, удаляем его
            
if($result === true){
                static::
deleteJob($job);
            } else
            
// в случае если передали false, то нам нужен повторный запуск
            
if($result === false) {
                static::
unlockJob($job);
            }
            
// иначе пишем ошибку, неразблокируя задачу
            
else {
                static::
setJobError($job$result);
            }

        }

        return 
true;

    }

    public static function 
runJob($job) {

        
$controller cmsCore::getControllerInstance($job['payload']['controller']);

        try {

            
$result true;

            if(isset(
$job['payload']['hook'])){
                
$result $controller->runHook($job['payload']['hook'], $job['payload']['params']);
            }

            if(isset(
$job['payload']['action'])){
                
$result $controller->runAction($job['payload']['action'], $job['payload']['params']);
            }

        } catch (
Exception $e) {

            
$result $e->getMessage();

        }

        return 
$result;

    }

    public static function 
deleteJob($job) {

        
$model = new cmsModel();

        return 
$model->delete(static::$table$job['id']);

    }

    public static function 
setJobError($job$error_text) {

        
$model = new cmsModel();

        return 
$model->update(static::$table$job['id'], array('last_error' => $error_text), true);

    }

    public static function 
unlockJob($job) {

        
$model = new cmsModel();

        return 
$model->update(static::$table$job['id'], array('is_locked' => null), true);

    }

    public static function 
restartJob($job) {

        
$model = new cmsModel();

        return 
$model->update(static::$table$job['id'], array('is_locked' => null'last_error' => null'attempts' => 0), true);

    }

}
Онлайн: 0
Реклама