Вход Регистрация
Файл: vendor/laravel/framework/src/Illuminate/Bus/DynamoBatchRepository.php
Строк: 452
<?php

namespace Illuminate\Bus;

use Aws\DynamoDb\DynamoDbClient;
use Aws\DynamoDb\Marshaler;
use Carbon\CarbonImmutable;
use Closure;
use Illuminate\Support\Str;

class DynamoBatchRepository implements BatchRepository
{
    /**
     * The batch factory instance.
     *
     * @var \Illuminate\Bus\BatchFactory
     */
    protected $factory;

    /**
     * The DynamoDB client used for every table operation.
     *
     * @var \Aws\DynamoDb\DynamoDbClient
     */
    protected $dynamoDbClient;

    /**
     * The application name (stored as the "application" hash key).
     *
     * @var string
     */
    protected $applicationName;

    /**
     * The table to use to store batch information.
     *
     * @var string
     */
    protected $table;

    /**
     * The time-to-live value for batch records, in seconds (null disables TTL handling).
     *
     * @var int|null
     */
    protected $ttl;

    /**
     * The name of the time-to-live attribute for batch records.
     *
     * @var string|null
     */
    protected $ttlAttribute;

    /**
     * The DynamoDB marshaler instance.
     *
     * @var \Aws\DynamoDb\Marshaler
     */
    protected $marshaler;

    /**
     * Create a new batch repository instance.
     *
     * @param  \Illuminate\Bus\BatchFactory  $factory
     * @param  \Aws\DynamoDb\DynamoDbClient  $dynamoDbClient
     * @param  string  $applicationName
     * @param  string  $table
     * @param  int|null  $ttl
     * @param  string|null  $ttlAttribute
     */
    public function __construct(
        BatchFactory $factory,
        DynamoDbClient $dynamoDbClient,
        string $applicationName,
        string $table,
        ?int $ttl,
        ?string $ttlAttribute
    ) {
        $this->factory = $factory;
        $this->dynamoDbClient = $dynamoDbClient;
        $this->applicationName = $applicationName;
        $this->table = $table;
        $this->ttl = $ttl;
        $this->ttlAttribute = $ttlAttribute;
        $this->marshaler = new Marshaler;
    }

    /**
     * Retrieve a list of batches.
     *
     * Queries the application's partition in descending sort-key order and,
     * when $before is truthy, only returns batches whose ID is lower than it.
     *
     * @param  int  $limit
     * @param  mixed  $before
     * @return \Illuminate\Bus\Batch[]
     */
    public function get($limit = 50, $before = null)
    {
        $condition = 'application = :application';

        if ($before) {
            $condition = 'application = :application AND id < :id';
        }

        $result = $this->dynamoDbClient->query([
            'TableName' => $this->table,
            'KeyConditionExpression' => $condition,
            // The inner array_filter empties the ":id" entry when $before is
            // falsy, and the outer one then drops the empty entry entirely.
            'ExpressionAttributeValues' => array_filter([
                ':application' => ['S' => $this->applicationName],
                ':id' => array_filter(['S' => $before]),
            ]),
            'Limit' => $limit,
            'ScanIndexForward' => false,
        ]);

        return array_map(
            fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)),
            $result['Items']
        );
    }

    /**
     * Retrieve information about an existing batch.
     *
     * @param  string  $batchId
     * @return \Illuminate\Bus\Batch|null
     */
    public function find(string $batchId)
    {
        if ($batchId === '') {
            return null;
        }

        $key = $this->batchKey($batchId);

        $b = $this->dynamoDbClient->getItem([
            'TableName' => $this->table,
            'Key' => $key,
        ]);

        if (! isset($b['Item'])) {
            // If we didn't find it via a standard read, attempt consistent read...
            $b = $this->dynamoDbClient->getItem([
                'TableName' => $this->table,
                'Key' => $key,
                'ConsistentRead' => true,
            ]);

            if (! isset($b['Item'])) {
                return null;
            }
        }

        $batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true);

        return $batch ? $this->toBatch($batch) : null;
    }

    /**
     * Store a new pending batch.
     *
     * The record is written with zeroed job counters under a freshly generated
     * ordered UUID and is then read back through find().
     *
     * @param  \Illuminate\Bus\PendingBatch  $batch
     * @return \Illuminate\Bus\Batch
     */
    public function store(PendingBatch $batch)
    {
        $id = (string) Str::orderedUuid();

        $record = [
            'id' => $id,
            'name' => $batch->name,
            'total_jobs' => 0,
            'pending_jobs' => 0,
            'failed_jobs' => 0,
            'failed_job_ids' => [],
            'options' => $this->serialize($batch->options ?? []),
            'created_at' => time(),
            'cancelled_at' => null,
            'finished_at' => null,
        ];

        if (! is_null($this->ttl)) {
            $record[$this->ttlAttribute] = time() + $this->ttl;
        }

        $this->dynamoDbClient->putItem([
            'TableName' => $this->table,
            'Item' => $this->marshaler->marshalItem(
                array_merge(['application' => $this->applicationName], $record)
            ),
        ]);

        return $this->find($id);
    }

    /**
     * Increment the total number of jobs within the batch.
     *
     * Both "total_jobs" and "pending_jobs" are raised by $amount.
     *
     * @param  string  $batchId
     * @param  int  $amount
     * @return void
     */
    public function incrementTotalJobs(string $batchId, int $amount)
    {
        // The TTL clause is keyed on "ttl is not null", matching the value and
        // name placeholders below. A truthiness check would leave ":ttl" and
        // the "#attribute" name unused in the expression when the TTL is 0.
        $update = $this->withTtl('SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val');

        $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':val' => ['N' => "$amount"],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
            'ReturnValues' => 'ALL_NEW',
        ]));
    }

    /**
     * Decrement the total number of pending jobs for the batch.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function decrementPendingJobs(string $batchId, string $jobId)
    {
        $update = $this->withTtl('SET pending_jobs = pending_jobs - :inc');

        $batch = $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':inc' => ['N' => '1'],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
            'ReturnValues' => 'ALL_NEW',
        ]));

        // "ALL_NEW" returns the post-update item, so the counts are current.
        $values = $this->marshaler->unmarshalItem($batch['Attributes']);

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Increment the total number of failed jobs for the batch.
     *
     * The job ID is also appended to the batch's "failed_job_ids" list.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function incrementFailedJobs(string $batchId, string $jobId)
    {
        $update = $this->withTtl(
            'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)'
        );

        $batch = $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                // list_append needs a list operand, hence the one-element array.
                ':jobId' => $this->marshaler->marshalValue([$jobId]),
                ':inc' => ['N' => '1'],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
            'ReturnValues' => 'ALL_NEW',
        ]));

        $values = $this->marshaler->unmarshalItem($batch['Attributes']);

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Mark the batch that has the given ID as finished.
     *
     * @param  string  $batchId
     * @return void
     */
    public function markAsFinished(string $batchId)
    {
        $update = $this->withTtl('SET finished_at = :timestamp');

        $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':timestamp' => ['N' => (string) time()],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
        ]));
    }

    /**
     * Cancel the batch that has the given ID.
     *
     * Sets both "cancelled_at" and "finished_at" to the current time.
     *
     * @param  string  $batchId
     * @return void
     */
    public function cancel(string $batchId)
    {
        $update = $this->withTtl('SET cancelled_at = :timestamp, finished_at = :timestamp');

        $this->dynamoDbClient->updateItem(array_filter([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
            'UpdateExpression' => $update,
            'ExpressionAttributeValues' => array_filter([
                ':timestamp' => ['N' => (string) time()],
                ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
            ]),
            'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
        ]));
    }

    /**
     * Delete the batch that has the given ID.
     *
     * @param  string  $batchId
     * @return void
     */
    public function delete(string $batchId)
    {
        $this->dynamoDbClient->deleteItem([
            'TableName' => $this->table,
            'Key' => $this->batchKey($batchId),
        ]);
    }

    /**
     * Execute the given Closure within a storage specific transaction.
     *
     * This repository opens no transaction; the callback is simply invoked.
     *
     * @param  \Closure  $callback
     * @return mixed
     */
    public function transaction(Closure $callback)
    {
        return $callback();
    }

    /**
     * Rollback the last database transaction for the connection.
     *
     * @return void
     */
    public function rollBack()
    {
        // Intentionally empty: transaction() never opens a transaction.
    }

    /**
     * Convert the given raw batch to a Batch object.
     *
     * @param  object  $batch
     * @return \Illuminate\Bus\Batch
     */
    protected function toBatch($batch)
    {
        return $this->factory->make(
            $this,
            $batch->id,
            $batch->name,
            (int) $batch->total_jobs,
            (int) $batch->pending_jobs,
            (int) $batch->failed_jobs,
            $batch->failed_job_ids,
            $this->unserialize($batch->options) ?? [],
            CarbonImmutable::createFromTimestamp($batch->created_at),
            // Falsy timestamps (e.g. null) are passed through unchanged.
            $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at) : $batch->cancelled_at,
            $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at) : $batch->finished_at
        );
    }

    /**
     * Create the underlying DynamoDB table.
     *
     * The table uses "application" as its hash key and "id" as its range key.
     * Time-to-live is enabled afterwards when a TTL has been configured.
     *
     * @return void
     */
    public function createAwsDynamoTable(): void
    {
        $definition = [
            'TableName' => $this->table,
            'AttributeDefinitions' => [
                [
                    'AttributeName' => 'application',
                    'AttributeType' => 'S',
                ],
                [
                    'AttributeName' => 'id',
                    'AttributeType' => 'S',
                ],
            ],
            'KeySchema' => [
                [
                    'AttributeName' => 'application',
                    'KeyType' => 'HASH',
                ],
                [
                    'AttributeName' => 'id',
                    'KeyType' => 'RANGE',
                ],
            ],
            'BillingMode' => 'PAY_PER_REQUEST',
        ];

        $this->dynamoDbClient->createTable($definition);

        if (! is_null($this->ttl)) {
            $this->dynamoDbClient->updateTimeToLive([
                'TableName' => $this->table,
                'TimeToLiveSpecification' => [
                    'AttributeName' => $this->ttlAttribute,
                    'Enabled' => true,
                ],
            ]);
        }
    }

    /**
     * Delete the underlying DynamoDB table.
     *
     * @return void
     */
    public function deleteAwsDynamoTable(): void
    {
        $this->dynamoDbClient->deleteTable([
            'TableName' => $this->table,
        ]);
    }

    /**
     * Get the expiry time based on the configured time-to-live.
     *
     * @return string|null  The expiry as a Unix timestamp string, or null when no TTL is configured.
     */
    protected function getExpiryTime(): ?string
    {
        return is_null($this->ttl) ? null : (string) (time() + $this->ttl);
    }

    /**
     * Get the expression attribute name for the time-to-live attribute.
     *
     * @return array  Maps "#<attribute>" to the attribute name, or is empty when no TTL is configured.
     */
    protected function ttlExpressionAttributeName(): array
    {
        return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute];
    }

    /**
     * Build the composite primary key for the given batch ID.
     *
     * @param  string  $batchId
     * @return array
     */
    protected function batchKey(string $batchId): array
    {
        return [
            'application' => ['S' => $this->applicationName],
            'id' => ['S' => $batchId],
        ];
    }

    /**
     * Append the time-to-live refresh clause to an update expression.
     *
     * Uses the same null check as getExpiryTime() and
     * ttlExpressionAttributeName(), so the clause is present exactly when
     * its ":ttl" value and "#attribute" name are supplied.
     *
     * @param  string  $update
     * @return string
     */
    protected function withTtl(string $update): string
    {
        return is_null($this->ttl)
            ? $update
            : "{$update}, #{$this->ttlAttribute} = :ttl";
    }

    /**
     * Serialize the given value.
     *
     * @param  mixed  $value
     * @return string
     */
    protected function serialize($value)
    {
        return serialize($value);
    }

    /**
     * Unserialize the given value.
     *
     * @param  string  $serialized
     * @return mixed
     */
    protected function unserialize($serialized)
    {
        return unserialize($serialized);
    }

    /**
     * Get the underlying DynamoDB client instance.
     *
     * @return \Aws\DynamoDb\DynamoDbClient
     */
    public function getDynamoClient(): DynamoDbClient
    {
        return $this->dynamoDbClient;
    }

    /**
     * Get the name of the table that contains the batch records.
     *
     * @return string
     */
    public function getTable(): string
    {
        return $this->table;
    }
}
Онлайн: 2
Реклама