Файл: system/core/queue.php
Строк: 94
<?php
class cmsQueue {
protected static $max_attempts = 4;
protected static $max_run_jobs = 20;
protected static $table = 'jobs';
/**
* Добавляет задачу в очередь
*
* @param string $queue Название очереди
* @param array $data
* @param integer $priority Приоритет, по умолчанию 1
* @return integer
*/
public static function pushOn($queue, $data, $priority = 1) {
return static::pushToDatabase(array(
'payload' => $data,
'queue' => $queue,
'priority' => $priority
));
}
/**
* Добавляет задачу в очередь с отсрочкой в секундах или по дате
* @param integer|date $date
* @param string $queue
* @param array $data
* @param integer $priority
* @return integer
*/
public static function pushOnLater($date, $queue, $data, $priority = 1) {
return static::pushToDatabase(array(
'payload' => $data,
'queue' => $queue,
'date_started' => date('Y-m-d H:i:s', (is_numeric($date) ? (time() + $date) : strtotime($date))),
'priority' => $priority
));
}
public static function getMaxAttempts() {
return static::$max_attempts;
}
public static function getTableName() {
return static::$table;
}
public static function setTableName($name) {
static::$table = $name;
}
protected static function pushToDatabase($data) {
$model = new cmsModel();
$data['payload'] = json_encode($data['payload']);
return $model->insert(static::$table, $data);
}
public static function runJobs($queue = null) {
$model = new cmsModel();
$model->orderByList(array(
array('by' => 'date_started', 'to' => 'asc'),
array('by' => 'priority', 'to' => 'desc'),
array('by' => 'date_created', 'to' => 'asc')
));
$model->limit(static::$max_run_jobs);
if($queue){
$model->filterEqual('queue', $queue);
}
$jobs = $model->
filterIsNull('is_locked')->
filterLtEqual('attempts', static::$max_attempts)->
filterStart()->
filterIsNull('date_started')->
filterOr()->
filterLtEqual('date_started', date('Y-m-d H:i:s'))->
filterEnd()->get(static::$table, function ($item, $model){
$item['payload'] = json_decode($item['payload'], true);
if(!isset($item['payload']['params'])){
$item['payload']['params'] = array();
}
array_unshift($item['payload']['params'], ($item['attempts'] + 1));
return $item;
});
if(!$jobs){
return false;
}
// помечаем полученные задания как запущенные
$model->filterIn('id', array_keys($jobs))->updateFiltered(static::$table, array(
'is_locked' => 1,
'date_started' => '',
'attempts' => function ($db){
return '`attempts` + 1';
}
), true);
foreach ($jobs as $job) {
$result = static::runJob($job);
// если задание выполнено успешно, удаляем его
if($result === true){
static::deleteJob($job);
} else
// в случае если передали false, то нам нужен повторный запуск
if($result === false) {
static::unlockJob($job);
}
// иначе пишем ошибку, неразблокируя задачу
else {
static::setJobError($job, $result);
}
}
return true;
}
public static function runJob($job) {
$controller = cmsCore::getControllerInstance($job['payload']['controller']);
try {
$result = true;
if(isset($job['payload']['hook'])){
$result = $controller->runHook($job['payload']['hook'], $job['payload']['params']);
}
if(isset($job['payload']['action'])){
$result = $controller->runAction($job['payload']['action'], $job['payload']['params']);
}
} catch (Exception $e) {
$result = $e->getMessage();
}
return $result;
}
public static function deleteJob($job) {
$model = new cmsModel();
return $model->delete(static::$table, $job['id']);
}
public static function setJobError($job, $error_text) {
$model = new cmsModel();
return $model->update(static::$table, $job['id'], array('last_error' => $error_text), true);
}
public static function unlockJob($job) {
$model = new cmsModel();
return $model->update(static::$table, $job['id'], array('is_locked' => null), true);
}
public static function restartJob($job) {
$model = new cmsModel();
return $model->update(static::$table, $job['id'], array('is_locked' => null, 'last_error' => null, 'attempts' => 0), true);
}
}