Вход Регистрация
Файл: vendor/laravel/framework/src/Illuminate/Queue/DatabaseQueue.php
Строк: 334
<?php

namespace Illuminate\Queue;

use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Support\Carbon;
use Illuminate\Support\Str;
use PDO;

class DatabaseQueue extends Queue implements QueueContract, ClearableQueue
{
    /**
     * The database connection instance.
     *
     * @var \Illuminate\Database\Connection
     */
    protected $database;

    /**
     * The database table that holds the jobs.
     *
     * @var string
     */
    protected $table;

    /**
     * The name of the default queue.
     *
     * @var string
     */
    protected $default;

    /**
     * The expiration time of a job, in seconds.
     *
     * A reserved job whose "reserved_at" is at least this many seconds in
     * the past is treated as available again (see isReservedButExpired()).
     *
     * @var int|null
     */
    protected $retryAfter = 60;

    /**
     * Create a new database queue instance.
     *
     * @param  \Illuminate\Database\Connection  $database
     * @param  string  $table
     * @param  string  $default
     * @param  int  $retryAfter
     * @param  bool  $dispatchAfterCommit
     * @return void
     */
    public function __construct(
        Connection $database,
        $table,
        $default = 'default',
        $retryAfter = 60,
        $dispatchAfterCommit = false
    ) {
        $this->table = $table;
        $this->default = $default;
        $this->database = $database;
        $this->retryAfter = $retryAfter;
        $this->dispatchAfterCommit = $dispatchAfterCommit;
    }

    /**
     * Get the size of the queue.
     *
     * Counts every row of the jobs table that belongs to the resolved queue.
     *
     * @param  string|null  $queue
     * @return int
     */
    public function size($queue = null)
    {
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->count();
    }

    /**
     * Push a new job onto the queue.
     *
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            null,
            function ($payload, $queue) {
                return $this->pushToDatabase($queue, $payload);
            }
        );
    }

    /**
     * Push a raw payload onto the queue.
     *
     * The $options argument is accepted for interface compatibility but is
     * not used by this driver.
     *
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        return $this->pushToDatabase($queue, $payload);
    }

    /**
     * Push a new job onto the queue after (n) seconds.
     *
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            $delay,
            function ($payload, $queue, $delay) {
                return $this->pushToDatabase($queue, $payload, $delay);
            }
        );
    }

    /**
     * Push an array of jobs onto the queue.
     *
     * All records are written with a single insert. A job exposing a "delay"
     * property gets its own availability time; every other job shares the
     * timestamp computed once before the records are built.
     *
     * @param  array  $jobs
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);

        $now = $this->availableAt();

        return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
            function ($job) use ($queue, $data, $now) {
                return $this->buildDatabaseRecord(
                    $queue,
                    $this->createPayload($job, $this->getQueue($queue), $data),
                    isset($job->delay) ? $this->availableAt($job->delay) : $now,
                );
            }
        )->all());
    }

    /**
     * Release a reserved job back onto the queue after (n) seconds.
     *
     * A new row is inserted that carries over the payload and attempt count
     * of the given record.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @param  int  $delay
     * @return mixed
     */
    public function release($queue, $job, $delay)
    {
        return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
    }

    /**
     * Push a raw payload to the database with a given delay of (n) seconds.
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  int  $attempts
     * @return mixed
     */
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
            $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
        ));
    }

    /**
     * Create an array to insert for the given job.
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  int  $availableAt
     * @param  int  $attempts
     * @return array
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        return [
            'queue' => $queue,
            'attempts' => $attempts,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $this->currentTime(),
            'payload' => $payload,
        ];
    }

    /**
     * Pop the next job off of the queue.
     *
     * The lookup and the reservation run inside one database transaction.
     * Nothing is returned (null) when no job is available.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     *
     * @throws \Throwable
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        return $this->database->transaction(function () use ($queue) {
            if ($job = $this->getNextAvailableJob($queue)) {
                return $this->marshalJob($queue, $job);
            }
        });
    }

    /**
     * Get the next available job for the queue.
     *
     * Selects the lowest-id row of the queue that is either unreserved and
     * due, or reserved but expired, using the lock from getLockForPopping().
     *
     * @param  string|null  $queue
     * @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
     */
    protected function getNextAvailableJob($queue)
    {
        $job = $this->database->table($this->table)
            ->lock($this->getLockForPopping())
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) {
                // Grouped so the OR added by isReservedButExpired() stays
                // scoped under the queue constraint above.
                $this->isAvailable($query);
                $this->isReservedButExpired($query);
            })
            ->orderBy('id', 'asc')
            ->first();

        return $job ? new DatabaseJobRecord((object) $job) : null;
    }

    /**
     * Get the lock required for popping the next job.
     *
     * Returns "FOR UPDATE SKIP LOCKED" for mysql >= 8.0.1, mariadb >= 10.6.0,
     * pgsql >= 9.5 and vitess >= 19.0, a table-hint string for sqlsrv, and
     * true for every other engine or version.
     *
     * @return string|bool
     */
    protected function getLockForPopping()
    {
        $databaseEngine = $this->database->getPdo()->getAttribute(PDO::ATTR_DRIVER_NAME);

        // A configured "version" takes precedence over the server-reported one.
        $databaseVersion = $this->database->getConfig('version')
            ?? $this->database->getPdo()->getAttribute(PDO::ATTR_SERVER_VERSION);

        if (Str::of($databaseVersion)->contains('MariaDB')) {
            $databaseEngine = 'mariadb';

            // Keep only the text after a "5.5.5-" marker (when present) and
            // before the next "-", leaving a bare version number to compare.
            $databaseVersion = Str::before(Str::after($databaseVersion, '5.5.5-'), '-');
        } elseif (Str::of($databaseVersion)->contains(['vitess', 'PlanetScale'])) {
            $databaseEngine = 'vitess';
            $databaseVersion = Str::before($databaseVersion, '-');
        }

        if (($databaseEngine === 'mysql' && version_compare($databaseVersion, '8.0.1', '>=')) ||
            ($databaseEngine === 'mariadb' && version_compare($databaseVersion, '10.6.0', '>=')) ||
            ($databaseEngine === 'pgsql' && version_compare($databaseVersion, '9.5', '>=')) ||
            ($databaseEngine === 'vitess' && version_compare($databaseVersion, '19.0', '>='))) {
            return 'FOR UPDATE SKIP LOCKED';
        }

        if ($databaseEngine === 'sqlsrv') {
            return 'with(rowlock,updlock,readpast)';
        }

        return true;
    }

    /**
     * Modify the query to check for available jobs.
     *
     * A job is available when it is not reserved and its "available_at"
     * timestamp is not later than the current time.
     *
     * @param  \Illuminate\Database\Query\Builder  $query
     * @return void
     */
    protected function isAvailable($query)
    {
        $query->where(function ($query) {
            $query->whereNull('reserved_at')
                ->where('available_at', '<=', $this->currentTime());
        });
    }

    /**
     * Modify the query to check for jobs that are reserved but have expired.
     *
     * Adds an OR branch matching rows reserved at least "retryAfter" seconds
     * before now.
     *
     * @param  \Illuminate\Database\Query\Builder  $query
     * @return void
     */
    protected function isReservedButExpired($query)
    {
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

        $query->orWhere(function ($query) use ($expiration) {
            $query->where('reserved_at', '<=', $expiration);
        });
    }

    /**
     * Marshal the reserved job into a DatabaseJob instance.
     *
     * The record is marked as reserved in the database before it is wrapped.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @return \Illuminate\Queue\Jobs\DatabaseJob
     */
    protected function marshalJob($queue, $job)
    {
        $job = $this->markJobAsReserved($job);

        return new DatabaseJob(
            $this->container, $this, $job, $this->connectionName, $queue
        );
    }

    /**
     * Mark the given job ID as reserved.
     *
     * Writes the values returned by the record's touch() and increment()
     * methods to the "reserved_at" and "attempts" columns of its row.
     *
     * @param  \Illuminate\Queue\Jobs\DatabaseJobRecord  $job
     * @return \Illuminate\Queue\Jobs\DatabaseJobRecord
     */
    protected function markJobAsReserved($job)
    {
        $this->database->table($this->table)->where('id', $job->id)->update([
            'reserved_at' => $job->touch(),
            'attempts' => $job->increment(),
        ]);

        return $job;
    }

    /**
     * Delete a reserved job from the queue.
     *
     * The row is looked up with an update lock first and only deleted when
     * it still exists, all within one transaction. The $queue argument is
     * not used by this implementation.
     *
     * @param  string  $queue
     * @param  string  $id
     * @return void
     *
     * @throws \Throwable
     */
    public function deleteReserved($queue, $id)
    {
        $this->database->transaction(function () use ($id) {
            if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
                $this->database->table($this->table)->where('id', $id)->delete();
            }
        });
    }

    /**
     * Delete a reserved job from the reserved queue and release it.
     *
     * Deletion and re-insertion share one transaction. The job is released
     * even when its original row was no longer found.
     *
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJob  $job
     * @param  int  $delay
     * @return void
     */
    public function deleteAndRelease($queue, $job, $delay)
    {
        $this->database->transaction(function () use ($queue, $job, $delay) {
            if ($this->database->table($this->table)->lockForUpdate()->find($job->getJobId())) {
                $this->database->table($this->table)->where('id', $job->getJobId())->delete();
            }

            $this->release($queue, $job->getJobRecord(), $delay);
        });
    }

    /**
     * Delete all of the jobs from the queue.
     *
     * @param  string  $queue
     * @return int
     */
    public function clear($queue)
    {
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->delete();
    }

    /**
     * Get the queue or return the default.
     *
     * Any falsy value (null, empty string, ...) selects the default queue.
     *
     * @param  string|null  $queue
     * @return string
     */
    public function getQueue($queue)
    {
        return $queue ?: $this->default;
    }

    /**
     * Get the underlying database instance.
     *
     * @return \Illuminate\Database\Connection
     */
    public function getDatabase()
    {
        return $this->database;
    }
}
Онлайн: 0
Реклама