Вход Регистрация
Файл: concrete5.7.5.6/concrete/vendor/zendframework/zend-queue/tests/ZendQueue/Custom/DbForUpdate.php
Строк: 91
<?php
/**
 * Zend Framework (http://framework.zend.com/)
 *
 * @link      http://github.com/zendframework/zf2 for the canonical source repository
 * @copyright Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
 * @license   http://framework.zend.com/license/new-bsd New BSD License
 * @package   Zend_Queue
 */

namespace ZendQueueTestCustom;


/**
 * Class for using connecting to a Zend_Db-based queuing system
 *
 * $config['options'][Zend_Db_Select::FOR_UPDATE] is a new feature that was
 * written after this code was written.  However, this will still serve as a
 * good example adapter
 *
 * @category   Zend
 * @package    Zend_Queue
 * @subpackage UnitTests
 */
class DbForUpdate extends ZendQueueAdapterDB
{
    
/**
     * Return the first element in the queue
     *
     * @param  integer           $maxMessages
     * @param  integer           $timeout
     * @param  ZendQueueQueue $queue
     * @return ZendQueueMessageMessageIterator
     */
    
public function receive($maxMessages=null$timeout=nullZendQueueQueue $queue=null)
    {
        if (
$maxMessages === null) {
            
$maxMessages 1;
        }
        if (
$timeout === null) {
            
$timeout self::RECEIVE_TIMEOUT_DEFAULT;
        }
        if (
$queue === null) {
            
$queue $this->_queue;
        }

        
$msgs = array();

        
$info $this->_msg_table->info();

        
$microtime microtime(true); // cache microtime

        
$db $this->_msg_table->getAdapter();

        try {
            
// transaction must start before the select query.
            
$db->beginTransaction();

            
// changes: added forUpdate
            
$query $db->select()->forUpdate();
            
$query->from($info['name'], array('*'));
            
$query->where('queue_id=?'$this->getQueueId($queue->getName()));
            
$query->where('handle IS NULL OR timeout+' . (int)$timeout ' < ' . (int)$microtime);
            
$query->limit($maxMessages);

            foreach (
$db->fetchAll($query) as $data) {
                
// setup our changes to the message
                
$data['handle'] = md5(uniqid(rand(), true));

                
$update = array(
                    
'handle'  => $data['handle'],
                    
'timeout' => $microtime
                
);

                
// update the database
                
$where = array();
                
$where[] = $db->quoteInto('message_id=?'$data['message_id']);

                
$count $db->update($info['name'], $update$where);

                
// we check count to make sure no other thread has gotten
                // the rows after our select, but before our update.
                
if ($count 0) {
                    
$msgs[] = $data;
                    
$this->getLogger()->debug('Received message:' $data['message_id'] . ' byte size=' strlen($data['body']));
                }
            }
            
$db->commit();
        } catch (
Exception $e) {
            
$db->rollBack();
            
$this->getLogger()->err($e->getMessage() . ' code ' $e->getCode());

            throw new 
ZendQueueException($e->getMessage(), $e->getCode());
        }

        
$config = array(
            
'queue'    => $queue,
            
'data'     => $msgs,
            
'messageClass' => $queue->getMessageClass()
        );

        
$classname $queue->getMessageSetClass();

        return new 
$classname($config);
    }
}
Онлайн: 0
Реклама