Вход Регистрация
Файл: vendor/laravel/framework/src/Illuminate/Queue/Queue.php
Строк: 429
<?php

namespace IlluminateQueue;

use 
Closure;
use 
DateTimeInterface;
use 
IlluminateContainerContainer;
use 
IlluminateContractsEncryptionEncrypter;
use 
IlluminateContractsQueueShouldBeEncrypted;
use 
IlluminateContractsQueueShouldQueueAfterCommit;
use 
IlluminateQueueEventsJobQueued;
use 
IlluminateQueueEventsJobQueueing;
use 
IlluminateSupportArr;
use 
IlluminateSupportInteractsWithTime;
use 
IlluminateSupportStr;

abstract class 
Queue
{
    use 
InteractsWithTime;

    
/**
     * The IoC container instance.
     *
     * @var IlluminateContainerContainer
     */
    
protected $container;

    
/**
     * The connection name for the queue.
     *
     * @var string
     */
    
protected $connectionName;

    
/**
     * Indicates that jobs should be dispatched after all database transactions have committed.
     *
     * @var bool
     */
    
protected $dispatchAfterCommit;

    
/**
     * The create payload callbacks.
     *
     * @var callable[]
     */
    
protected static $createPayloadCallbacks = [];

    
/**
     * Push a new job onto the queue.
     *
     * @param  string  $queue
     * @param  string  $job
     * @param  mixed  $data
     * @return mixed
     */
    
public function pushOn($queue$job$data '')
    {
        return 
$this->push($job$data$queue);
    }

    
/**
     * Push a new job onto a specific queue after (n) seconds.
     *
     * @param  string  $queue
     * @param  DateTimeInterface|DateInterval|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @return mixed
     */
    
public function laterOn($queue$delay$job$data '')
    {
        return 
$this->later($delay$job$data$queue);
    }

    
/**
     * Push an array of jobs onto the queue.
     *
     * @param  array  $jobs
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return void
     */
    
public function bulk($jobs$data ''$queue null)
    {
        foreach ((array) 
$jobs as $job) {
            
$this->push($job$data$queue);
        }
    }

    
/**
     * Create a payload string from the given job and data.
     *
     * @param  Closure|string|object  $job
     * @param  string  $queue
     * @param  mixed  $data
     * @return string
     *
     * @throws IlluminateQueueInvalidPayloadException
     */
    
protected function createPayload($job$queue$data '')
    {
        if (
$job instanceof Closure) {
            
$job CallQueuedClosure::create($job);
        }

        
$payload json_encode($value $this->createPayloadArray($job$queue$data), JSON_UNESCAPED_UNICODE);

        if (
json_last_error() !== JSON_ERROR_NONE) {
            throw new 
InvalidPayloadException(
                
'Unable to JSON encode payload. Error ('.json_last_error().'): '.json_last_error_msg(), $value
            
);
        }

        return 
$payload;
    }

    
/**
     * Create a payload array from the given job and data.
     *
     * @param  string|object  $job
     * @param  string  $queue
     * @param  mixed  $data
     * @return array
     */
    
protected function createPayloadArray($job$queue$data '')
    {
        return 
is_object($job)
                    ? 
$this->createObjectPayload($job$queue)
                    : 
$this->createStringPayload($job$queue$data);
    }

    
/**
     * Create a payload for an object-based queue handler.
     *
     * @param  object  $job
     * @param  string  $queue
     * @return array
     */
    
protected function createObjectPayload($job$queue)
    {
        
$payload $this->withCreatePayloadHooks($queue, [
            
'uuid' => (string) Str::uuid(),
            
'displayName' => $this->getDisplayName($job),
            
'job' => 'IlluminateQueueCallQueuedHandler@call',
            
'maxTries' => $this->getJobTries($job) ?? null,
            
'maxExceptions' => $job->maxExceptions ?? null,
            
'failOnTimeout' => $job->failOnTimeout ?? false,
            
'backoff' => $this->getJobBackoff($job),
            
'timeout' => $job->timeout ?? null,
            
'retryUntil' => $this->getJobExpiration($job),
            
'data' => [
                
'commandName' => $job,
                
'command' => $job,
            ],
        ]);

        
$command $this->jobShouldBeEncrypted($job) && $this->container->bound(Encrypter::class)
                    ? 
$this->container[Encrypter::class]->encrypt(serialize(clone $job))
                    : 
serialize(clone $job);

        return 
array_merge($payload, [
            
'data' => array_merge($payload['data'], [
                
'commandName' => get_class($job),
                
'command' => $command,
            ]),
        ]);
    }

    
/**
     * Get the display name for the given job.
     *
     * @param  object  $job
     * @return string
     */
    
protected function getDisplayName($job)
    {
        return 
method_exists($job'displayName')
                        ? 
$job->displayName() : get_class($job);
    }

    
/**
     * Get the maximum number of attempts for an object-based queue handler.
     *
     * @param  mixed  $job
     * @return mixed
     */
    
public function getJobTries($job)
    {
        if (! 
method_exists($job'tries') && ! isset($job->tries)) {
            return;
        }

        if (isset(
$job->tries)) {
            return 
$job->tries;
        }

        if (
method_exists($job'tries') && ! is_null($job->tries())) {
            return 
$job->tries();
        }
    }

    
/**
     * Get the backoff for an object-based queue handler.
     *
     * @param  mixed  $job
     * @return mixed
     */
    
public function getJobBackoff($job)
    {
        if (! 
method_exists($job'backoff') && ! isset($job->backoff)) {
            return;
        }

        if (
is_null($backoff $job->backoff ?? $job->backoff())) {
            return;
        }

        return 
collect(Arr::wrap($backoff))
            ->
map(function ($backoff) {
                return 
$backoff instanceof DateTimeInterface
                                
$this->secondsUntil($backoff) : $backoff;
            })->
implode(',');
    }

    
/**
     * Get the expiration timestamp for an object-based queue handler.
     *
     * @param  mixed  $job
     * @return mixed
     */
    
public function getJobExpiration($job)
    {
        if (! 
method_exists($job'retryUntil') && ! isset($job->retryUntil)) {
            return;
        }

        
$expiration $job->retryUntil ?? $job->retryUntil();

        return 
$expiration instanceof DateTimeInterface
                        
$expiration->getTimestamp() : $expiration;
    }

    
/**
     * Determine if the job should be encrypted.
     *
     * @param  object  $job
     * @return bool
     */
    
protected function jobShouldBeEncrypted($job)
    {
        if (
$job instanceof ShouldBeEncrypted) {
            return 
true;
        }

        return isset(
$job->shouldBeEncrypted) && $job->shouldBeEncrypted;
    }

    
/**
     * Create a typical, string based queue payload array.
     *
     * @param  string  $job
     * @param  string  $queue
     * @param  mixed  $data
     * @return array
     */
    
protected function createStringPayload($job$queue$data)
    {
        return 
$this->withCreatePayloadHooks($queue, [
            
'uuid' => (string) Str::uuid(),
            
'displayName' => is_string($job) ? explode('@'$job)[0] : null,
            
'job' => $job,
            
'maxTries' => null,
            
'maxExceptions' => null,
            
'failOnTimeout' => false,
            
'backoff' => null,
            
'timeout' => null,
            
'data' => $data,
        ]);
    }

    
/**
     * Register a callback to be executed when creating job payloads.
     *
     * @param  callable|null  $callback
     * @return void
     */
    
public static function createPayloadUsing($callback)
    {
        if (
is_null($callback)) {
            static::
$createPayloadCallbacks = [];
        } else {
            static::
$createPayloadCallbacks[] = $callback;
        }
    }

    
/**
     * Create the given payload using any registered payload hooks.
     *
     * @param  string  $queue
     * @param  array  $payload
     * @return array
     */
    
protected function withCreatePayloadHooks($queue, array $payload)
    {
        if (! empty(static::
$createPayloadCallbacks)) {
            foreach (static::
$createPayloadCallbacks as $callback) {
                
$payload array_merge($payload$callback($this->getConnectionName(), $queue$payload));
            }
        }

        return 
$payload;
    }

    
/**
     * Enqueue a job using the given callback.
     *
     * @param  Closure|string|object  $job
     * @param  string  $payload
     * @param  string  $queue
     * @param  DateTimeInterface|DateInterval|int|null  $delay
     * @param  callable  $callback
     * @return mixed
     */
    
protected function enqueueUsing($job$payload$queue$delay$callback)
    {
        if (
$this->shouldDispatchAfterCommit($job) &&
            
$this->container->bound('db.transactions')) {
            return 
$this->container->make('db.transactions')->addCallback(
                function () use (
$payload$queue$delay$callback$job) {
                    
$this->raiseJobQueueingEvent($job$payload);

                    return 
tap($callback($payload$queue$delay), function ($jobId) use ($job$payload) {
                        
$this->raiseJobQueuedEvent($jobId$job$payload);
                    });
                }
            );
        }

        
$this->raiseJobQueueingEvent($job$payload);

        return 
tap($callback($payload$queue$delay), function ($jobId) use ($job$payload) {
            
$this->raiseJobQueuedEvent($jobId$job$payload);
        });
    }

    
/**
     * Determine if the job should be dispatched after all database transactions have committed.
     *
     * @param  Closure|string|object  $job
     * @return bool
     */
    
protected function shouldDispatchAfterCommit($job)
    {
        if (
is_object($job) && $job instanceof ShouldQueueAfterCommit) {
            return 
true;
        }

        if (! 
$job instanceof Closure && is_object($job) && isset($job->afterCommit)) {
            return 
$job->afterCommit;
        }

        if (isset(
$this->dispatchAfterCommit)) {
            return 
$this->dispatchAfterCommit;
        }

        return 
false;
    }

    
/**
     * Raise the job queueing event.
     *
     * @param  Closure|string|object  $job
     * @param  string  $payload
     * @return void
     */
    
protected function raiseJobQueueingEvent($job$payload)
    {
        if (
$this->container->bound('events')) {
            
$this->container['events']->dispatch(new JobQueueing($this->connectionName$job$payload));
        }
    }

    
/**
     * Raise the job queued event.
     *
     * @param  string|int|null  $jobId
     * @param  Closure|string|object  $job
     * @param  string  $payload
     * @return void
     */
    
protected function raiseJobQueuedEvent($jobId$job$payload)
    {
        if (
$this->container->bound('events')) {
            
$this->container['events']->dispatch(new JobQueued($this->connectionName$jobId$job$payload));
        }
    }

    
/**
     * Get the connection name for the queue.
     *
     * @return string
     */
    
public function getConnectionName()
    {
        return 
$this->connectionName;
    }

    
/**
     * Set the connection name for the queue.
     *
     * @param  string  $name
     * @return $this
     */
    
public function setConnectionName($name)
    {
        
$this->connectionName $name;

        return 
$this;
    }

    
/**
     * Get the container instance being used by the connection.
     *
     * @return IlluminateContainerContainer
     */
    
public function getContainer()
    {
        return 
$this->container;
    }

    
/**
     * Set the IoC container instance.
     *
     * @param  IlluminateContainerContainer  $container
     * @return void
     */
    
public function setContainer(Container $container)
    {
        
$this->container $container;
    }
}
Онлайн: 1
Реклама