Drupal 8.0.0 beta 12. More info: https://www.drupal.org/node/2514176

This commit is contained in:
Pantheon Automation 2015-08-17 17:00:26 -07:00 committed by Greg Anderson
commit 9921556621
13277 changed files with 1459781 additions and 0 deletions

View file

@ -0,0 +1,58 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\Batch.
*/
namespace Drupal\Core\Queue;
/**
* Defines a batch queue handler used by the Batch API.
*
* This implementation:
* - Ensures FIFO ordering.
* - Allows an item to be repeatedly claimed until it is actually deleted (no
* notion of lease time or 'expire' date), to allow multipass operations.
*
* Stale items from failed batches are cleaned from the {queue} table on cron
* using the 'created' date.
*
* @ingroup queue
*/
class Batch extends DatabaseQueue {
/**
* Overrides \Drupal\Core\Queue\DatabaseQueue::claimItem().
*
* Unlike \Drupal\Core\Queue\DatabaseQueue::claimItem(), this method provides
* a default lease time of 0 (no expiration) instead of 30. This allows the
* item to be claimed repeatedly until it is deleted.
*/
public function claimItem($lease_time = 0) {
$item = $this->connection->queryRange('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
if ($item) {
$item->data = unserialize($item->data);
return $item;
}
return FALSE;
}
/**
* Retrieves all remaining items in the queue.
*
* This is specific to Batch API and is not part of the
* \Drupal\Core\Queue\QueueInterface.
*
* @return array
* An array of queue items.
*/
public function getAllItems() {
$result = array();
$items = $this->connection->query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll();
foreach ($items as $item) {
$result[] = unserialize($item->data);
}
return $result;
}
}

View file

@ -0,0 +1,54 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\BatchMemory.
*/
namespace Drupal\Core\Queue;
/**
* Defines a batch queue handler used by the Batch API for non-progressive
* batches.
*
* This implementation:
* - Ensures FIFO ordering.
* - Allows an item to be repeatedly claimed until it is actually deleted (no
* notion of lease time or 'expire' date), to allow multipass operations.
*
* @ingroup queue
*/
class BatchMemory extends Memory {
/**
* Overrides \Drupal\Core\Queue\Memory::claimItem().
*
* Unlike \Drupal\Core\Queue\Memory::claimItem(), this method provides a
* default lease time of 0 (no expiration) instead of 30. This allows the item
* to be claimed repeatedly until it is deleted.
*/
public function claimItem($lease_time = 0) {
if (!empty($this->queue)) {
reset($this->queue);
return current($this->queue);
}
return FALSE;
}
/**
* Retrieves all remaining items in the queue.
*
* This is specific to Batch API and is not part of the
* \Drupal\Core\Queue\QueueInterface.
*
* @return array
* An array of queue items.
*/
public function getAllItems() {
$result = array();
foreach ($this->queue as $item) {
$result[] = $item->data;
}
return $result;
}
}

View file

@ -0,0 +1,146 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\DatabaseQueue.
*/
namespace Drupal\Core\Queue;
use Drupal\Core\Database\Connection;
use Drupal\Core\DependencyInjection\DependencySerializationTrait;
/**
* Default queue implementation.
*
* @ingroup queue
*/
class DatabaseQueue implements ReliableQueueInterface {
use DependencySerializationTrait;
/**
* The name of the queue this instance is working with.
*
* @var string
*/
protected $name;
/**
* The database connection.
*
* @var \Drupal\Core\Database\Connection $connection
*/
protected $connection;
/**
* Constructs a \Drupal\Core\Queue\DatabaseQueue object.
*
* @param string $name
* The name of the queue.
* @param \Drupal\Core\Database\Connection $connection
* The Connection object containing the key-value tables.
*/
function __construct($name, Connection $connection) {
$this->name = $name;
$this->connection = $connection;
}
/**
* {@inheritdoc}
*/
public function createItem($data) {
$query = $this->connection->insert('queue')
->fields(array(
'name' => $this->name,
'data' => serialize($data),
// We cannot rely on REQUEST_TIME because many items might be created
// by a single request which takes longer than 1 second.
'created' => time(),
));
// Return the new serial ID, or FALSE on failure.
return $query->execute();
}
/**
* {@inheritdoc}
*/
public function numberOfItems() {
return $this->connection->query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
}
/**
* {@inheritdoc}
*/
public function claimItem($lease_time = 30) {
// Claim an item by updating its expire fields. If claim is not successful
// another thread may have claimed the item in the meantime. Therefore loop
// until an item is successfully claimed or we are reasonably sure there
// are no unclaimed items left.
while (TRUE) {
$item = $this->connection->queryRange('SELECT data, created, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
if ($item) {
// Try to update the item. Only one thread can succeed in UPDATEing the
// same row. We cannot rely on REQUEST_TIME because items might be
// claimed by a single consumer which runs longer than 1 second. If we
// continue to use REQUEST_TIME instead of the current time(), we steal
// time from the lease, and will tend to reset items before the lease
// should really expire.
$update = $this->connection->update('queue')
->fields(array(
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id)
->condition('expire', 0);
// If there are affected rows, this update succeeded.
if ($update->execute()) {
$item->data = unserialize($item->data);
return $item;
}
}
else {
// No items currently available to claim.
return FALSE;
}
}
}
/**
* {@inheritdoc}
*/
public function releaseItem($item) {
$update = $this->connection->update('queue')
->fields(array(
'expire' => 0,
))
->condition('item_id', $item->item_id);
return $update->execute();
}
/**
* {@inheritdoc}
*/
public function deleteItem($item) {
$this->connection->delete('queue')
->condition('item_id', $item->item_id)
->execute();
}
/**
* {@inheritdoc}
*/
public function createQueue() {
// All tasks are stored in a single database table (which is created when
// Drupal is first installed) so there is nothing we need to do to create
// a new queue.
}
/**
* {@inheritdoc}
*/
public function deleteQueue() {
$this->connection->delete('queue')
->condition('name', $this->name)
->execute();
}
}

View file

@ -0,0 +1,111 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\Memory.
*/
namespace Drupal\Core\Queue;
/**
* Static queue implementation.
*
* This allows "undelayed" variants of processes relying on the Queue
* interface. The queue data resides in memory. It should only be used for
* items that will be queued and dequeued within a given page request.
*
* @ingroup queue
*/
class Memory implements QueueInterface {
/**
* The queue data.
*
* @var array
*/
protected $queue;
/**
* Counter for item ids.
*
* @var int
*/
protected $idSequence;
/**
* Constructs a Memory object.
*
* @param string $name
* An arbitrary string. The name of the queue to work with.
*/
public function __construct($name) {
$this->queue = array();
$this->idSequence = 0;
}
/**
* {@inheritdoc}
*/
public function createItem($data) {
$item = new \stdClass();
$item->item_id = $this->idSequence++;
$item->data = $data;
$item->created = time();
$item->expire = 0;
$this->queue[$item->item_id] = $item;
return $item->item_id;
}
/**
* {@inheritdoc}
*/
public function numberOfItems() {
return count($this->queue);
}
/**
* {@inheritdoc}
*/
public function claimItem($lease_time = 30) {
foreach ($this->queue as $key => $item) {
if ($item->expire == 0) {
$item->expire = time() + $lease_time;
$this->queue[$key] = $item;
return $item;
}
}
return FALSE;
}
/**
* {@inheritdoc}
*/
public function deleteItem($item) {
unset($this->queue[$item->item_id]);
}
/**
* {@inheritdoc}
*/
public function releaseItem($item) {
if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
$this->queue[$item->item_id]->expire = 0;
return TRUE;
}
return FALSE;
}
/**
* {@inheritdoc}
*/
public function createQueue() {
// Nothing needed here.
}
/**
* {@inheritdoc}
*/
public function deleteQueue() {
$this->queue = array();
$this->idSequence = 0;
}
}

View file

@ -0,0 +1,46 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueDatabaseFactory.
*/
namespace Drupal\Core\Queue;
use Drupal\Core\Database\Connection;
/**
* Defines the key/value store factory for the database backend.
*/
class QueueDatabaseFactory {
/**
* The database connection.
*
* @var \Drupal\Core\Database\Connection $connection
*/
protected $connection;
/**
* Constructs this factory object.
*
* @param \Drupal\Core\Database\Connection $connection
* The Connection object containing the key-value tables.
*/
function __construct(Connection $connection) {
$this->connection = $connection;
}
/**
* Constructs a new queue object for a given name.
*
* @param string $name
* The name of the collection holding key and value pairs.
*
* @return \Drupal\Core\Queue\DatabaseQueue
* A key/value store implementation for the given $collection.
*/
public function get($name) {
return new DatabaseQueue($name, $this->connection);
}
}

View file

@ -0,0 +1,72 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueFactory.
*/
namespace Drupal\Core\Queue;
use Drupal\Core\Site\Settings;
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
/**
* Defines the queue factory.
*/
class QueueFactory implements ContainerAwareInterface {
use ContainerAwareTrait;
/**
* Instantiated queues, keyed by name.
*
* @var array
*/
protected $queues = array();
/**
* The settings object.
*
* @var \Drupal\Core\Site\Settings
*/
protected $settings;
/**
* Constructs a queue factory.
*/
function __construct(Settings $settings) {
$this->settings = $settings;
}
/**
* Constructs a new queue.
*
* @param string $name
* The name of the queue to work with.
* @param bool $reliable
* (optional) TRUE if the ordering of items and guaranteeing every item executes at
* least once is important, FALSE if scalability is the main concern. Defaults
* to FALSE.
*
* @return \Drupal\Core\Queue\QueueInterface
* A queue implementation for the given name.
*/
public function get($name, $reliable = FALSE) {
if (!isset($this->queues[$name])) {
// If it is a reliable queue, check the specific settings first.
if ($reliable) {
$service_name = $this->settings->get('queue_reliable_service_' . $name);
}
// If no reliable queue was defined, check the service and global
// settings, fall back to queue.database.
if (empty($service_name)) {
$service_name = $this->settings->get('queue_service_' . $name, $this->settings->get('queue_default', 'queue.database'));
}
$this->queues[$name] = $this->container->get($service_name)->get($name);
}
return $this->queues[$name];
}
}

View file

@ -0,0 +1,114 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueInterface.
*/
namespace Drupal\Core\Queue;
/**
* Interface for a queue.
*
* Classes implementing this interface will do a best effort to preserve order
* in messages and to execute them at least once.
*
* @ingroup queue
*/
interface QueueInterface {
/**
* Adds a queue item and store it directly to the queue.
*
* @param $data
* Arbitrary data to be associated with the new task in the queue.
*
* @return
* A unique ID if the item was successfully created and was (best effort)
* added to the queue, otherwise FALSE. We don't guarantee the item was
* committed to disk etc, but as far as we know, the item is now in the
* queue.
*/
public function createItem($data);
/**
* Retrieves the number of items in the queue.
*
* This is intended to provide a "best guess" count of the number of items in
* the queue. Depending on the implementation and the setup, the accuracy of
* the results of this function may vary.
*
* e.g. On a busy system with a large number of consumers and items, the
* result might only be valid for a fraction of a second and not provide an
* accurate representation.
*
* @return
* An integer estimate of the number of items in the queue.
*/
public function numberOfItems();
/**
* Claims an item in the queue for processing.
*
* @param $lease_time
* How long the processing is expected to take in seconds, defaults to an
* hour. After this lease expires, the item will be reset and another
* consumer can claim the item. For idempotent tasks (which can be run
* multiple times without side effects), shorter lease times would result
* in lower latency in case a consumer fails. For tasks that should not be
* run more than once (non-idempotent), a larger lease time will make it
* more rare for a given task to run multiple times in cases of failure,
* at the cost of higher latency.
*
* @return
* On success we return an item object. If the queue is unable to claim an
* item it returns false. This implies a best effort to retrieve an item
* and either the queue is empty or there is some other non-recoverable
* problem.
*
* If returned, the object will have at least the following properties:
* - data: the same as what what passed into createItem().
* - item_id: the unique ID returned from createItem().
* - created: timestamp when the item was put into the queue.
*/
public function claimItem($lease_time = 3600);
/**
* Deletes a finished item from the queue.
*
* @param $item
* The item returned by \Drupal\Core\Queue\QueueInterface::claimItem().
*/
public function deleteItem($item);
/**
* Releases an item that the worker could not process.
*
* Another worker can come in and process it before the timeout expires.
*
* @param $item
* The item returned by \Drupal\Core\Queue\QueueInterface::claimItem().
*
* @return boolean
* TRUE if the item has been released, FALSE otherwise.
*/
public function releaseItem($item);
/**
* Creates a queue.
*
* Called during installation and should be used to perform any necessary
* initialization operations. This should not be confused with the
* constructor for these objects, which is called every time an object is
* instantiated to operate on a queue. This operation is only needed the
* first time a given queue is going to be initialized (for example, to make
* a new database table or directory to hold tasks for the queue -- it
* depends on the queue implementation if this is necessary at all).
*/
public function createQueue();
/**
* Deletes a queue and every item in the queue.
*/
public function deleteQueue();
}

View file

@ -0,0 +1,22 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerBase.
*/
namespace Drupal\Core\Queue;
use Drupal\Component\Plugin\PluginBase;
/**
* Provides a base implementation for a QueueWorker plugin.
*
* @see \Drupal\Core\Queue\QueueWorkerInterface
* @see \Drupal\Core\Queue\QueueWorkerManager
* @see \Drupal\Core\Annotation\QueueWorker
* @see plugin_api
*/
abstract class QueueWorkerBase extends PluginBase implements QueueWorkerInterface {
}

View file

@ -0,0 +1,46 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerInterface.
*/
namespace Drupal\Core\Queue;
use Drupal\Component\Plugin\PluginInspectionInterface;
/**
* Defines an interface for a QueueWorker plugin.
*
* @see \Drupal\Core\Queue\QueueWorkerBase
* @see \Drupal\Core\Queue\QueueWorkerManager
* @see \Drupal\Core\Annotation\QueueWorker
* @see plugin_api
*/
interface QueueWorkerInterface extends PluginInspectionInterface {
/**
* Works on a single queue item.
*
* @param mixed $data
* The data that was passed to
* \Drupal\Core\Queue\QueueInterface::createItem() when the item was queued.
*
* @throws \Exception
* A QueueWorker plugin may throw an exception to indicate there was a
* problem. The cron process will log the exception, and leave the item in
* the queue to be processed again later.
* @throws \Drupal\Core\Queue\SuspendQueueException
* More specifically, a SuspendQueueException should be thrown when a
* QueueWorker plugin is aware that the problem will affect all subsequent
* workers of its queue. For example, a callback that makes HTTP requests
* may find that the remote server is not responding. The cron process will
* behave as with a normal Exception, and in addition will not attempt to
* process further items from the current item's queue during the current
* cron run.
*
* @see \Drupal\Core\Cron::processQueues()
*/
public function processItem($data);
}

View file

@ -0,0 +1,65 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerManager.
*/
namespace Drupal\Core\Queue;
use Drupal\Core\Cache\CacheBackendInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Plugin\DefaultPluginManager;
/**
* Defines the queue worker manager.
*
* @see \Drupal\Core\Queue\QueueWorkerInterface
* @see \Drupal\Core\Queue\QueueWorkerBase
* @see \Drupal\Core\Annotation\QueueWorker
* @see plugin_api
*/
class QueueWorkerManager extends DefaultPluginManager implements QueueWorkerManagerInterface {
/**
* Constructs an QueueWorkerManager object.
*
* @param \Traversable $namespaces
* An object that implements \Traversable which contains the root paths
* keyed by the corresponding namespace to look for plugin implementations.
* @param \Drupal\Core\Cache\CacheBackendInterface $cache_backend
* Cache backend instance to use.
* @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
* The module handler.
*/
public function __construct(\Traversable $namespaces, CacheBackendInterface $cache_backend, ModuleHandlerInterface $module_handler) {
parent::__construct('Plugin/QueueWorker', $namespaces, $module_handler, 'Drupal\Core\Queue\QueueWorkerInterface', 'Drupal\Core\Annotation\QueueWorker');
$this->setCacheBackend($cache_backend, 'queue_plugins');
$this->alterInfo('queue_info');
}
/**
* {@inheritdoc}
*/
public function processDefinition(&$definition, $plugin_id) {
parent::processDefinition($definition, $plugin_id);
// Assign a default time if a cron is specified.
if (isset($definition['cron'])) {
$definition['cron'] += [
'time' => 15,
];
}
}
/**
* {@inheritdoc}
*
* @return \Drupal\Core\Queue\QueueWorkerInterface
*/
public function createInstance($plugin_id, array $configuration = []) {
return parent::createInstance($plugin_id, $configuration);
}
}

View file

@ -0,0 +1,17 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\QueueWorkerManagerInterface.
*/
namespace Drupal\Core\Queue;
use Drupal\Component\Plugin\PluginManagerInterface;
/**
* Provides an interface for a queue worker manager.
*/
interface QueueWorkerManagerInterface extends PluginManagerInterface {
}

View file

@ -0,0 +1,19 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\ReliableQueueInterface.
*/
namespace Drupal\Core\Queue;
/**
* Reliable queue interface.
*
* Classes implementing this interface preserve the order of messages and
* guarantee that every item will be executed at least once.
*
* @ingroup queue
*/
interface ReliableQueueInterface extends QueueInterface {
}

View file

@ -0,0 +1,20 @@
<?php
/**
* @file
* Contains \Drupal\Core\Queue\SuspendQueueException.
*/
namespace Drupal\Core\Queue;
/**
* Exception class to throw to indicate that a cron queue should be skipped.
*
* An implementation of \Drupal\Core\Queue\QueueWorkerInterface::processItem()
* throws this class of exception to indicate that processing of the whole queue
* should be skipped. This should be thrown rather than a normal Exception if
* the problem encountered by the queue worker is such that it can be deduced
* that workers of subsequent items would encounter it too. For example, if a
* remote site that the queue worker depends on appears to be inaccessible.
*/
class SuspendQueueException extends \RuntimeException {}