- <?php
- * @file
- * Queue functionality.
- */
-
- * @defgroup queue Queue operations
- * @{
- * Queue items to allow later processing.
- *
- * The queue system allows placing items in a queue and processing them later.
- * The system tries to ensure that only one consumer can process an item.
- *
- * Before a queue can be used it needs to be created by
- * BackdropQueueInterface::createQueue().
- *
- * Items can be added to the queue by passing an arbitrary data object to
- * BackdropQueueInterface::createItem().
- *
- * To process an item, call BackdropQueueInterface::claimItem() and specify how
- * long you want to have a lease for working on that item. When finished
- * processing, the item needs to be deleted by calling
- * BackdropQueueInterface::deleteItem(). If the consumer dies, the item will be
- * made available again by the BackdropQueueInterface implementation once the
- * lease expires. Another consumer will then be able to receive it when calling
- * BackdropQueueInterface::claimItem(). Due to this, the processing code should
- * be aware that an item might be handed over for processing more than once.
- *
- * The $item object used by the BackdropQueueInterface can contain arbitrary
- * metadata depending on the implementation. Systems using the interface should
- * only rely on the data property which will contain the information passed to
- * BackdropQueueInterface::createItem(). The full queue item returned by
- * BackdropQueueInterface::claimItem() needs to be passed to
- * BackdropQueueInterface::deleteItem() once processing is completed.
- *
- * There are two kinds of queue backends available: reliable, which preserves
- * the order of messages and guarantees that every item will be executed at
- * least once. The non-reliable kind only does a best effort to preserve order
- * in messages and to execute them at least once but there is a small chance
- * that some items get lost. For example, some distributed back-ends like
- * Amazon SQS will be managing jobs for a large set of producers and consumers
- * where a strict FIFO ordering will likely not be preserved. Another example
- * would be an in-memory queue backend which might lose items if it crashes.
- * However, such a backend would be able to deal with significantly more writes
- * than a reliable queue and for many tasks this is more important. Another
- * example is doing Twitter statistics -- the small possibility of losing a few
- * items is insignificant next to power of the queue being able to keep up with
- * writes. As described in the processing section, regardless of the queue being
- * reliable or not, the processing code should be aware that an item might be
- * handed over for processing more than once (because the processing code might
- * time out before it finishes).
- */
-
- * Factory class for interacting with queues.
- */
- class BackdropQueue {
-
- * Returns the queue object for a given name.
- *
- * The following can be set in settings.php through the $settings variable:
- * - queue_class_$name: the class to be used for the queue $name.
- * - queue_default_class: the class to use when queue_class_$name is not
- * defined. Defaults to SystemQueue, a reliable backend using SQL.
- * - queue_default_reliable_class: the class to use when queue_class_$name is
- * not defined and the queue_default_class is not reliable. Defaults to
- * SystemQueue.
- *
- * @param $name
- * Arbitrary string. The name of the queue to work with.
- * @param $reliable
- * TRUE if the ordering of items and guaranteeing every item executes at
- * least once is important, FALSE if scalability is the main concern.
- *
- * @return BackdropQueueInterface
- * The queue object for a given name.
- */
- public static function get($name, $reliable = FALSE) {
- static $queues;
- if (!isset($queues[$name])) {
- $class = settings_get('queue_class_' . $name, NULL);
- if (!$class) {
- $class = settings_get('queue_default_class', 'SystemQueue');
- }
- $object = new $class($name);
- if ($reliable && !$object instanceof BackdropReliableQueueInterface) {
- $class = settings_get('queue_default_reliable_class', 'SystemQueue');
- $object = new $class($name);
- }
- $queues[$name] = $object;
- }
- return $queues[$name];
- }
- }
-
- interface BackdropQueueInterface {
-
-
- * Add a queue item and store it directly to the queue.
- *
- * @param $data
- * Arbitrary data to be associated with the new task in the queue.
- * @return
- * TRUE if the item was successfully created and was (best effort) added
- * to the queue, otherwise FALSE. We don't guarantee the item was
- * committed to disk etc, but as far as we know, the item is now in the
- * queue.
- */
- public function createItem($data);
-
-
- * Retrieve the number of items in the queue.
- *
- * This is intended to provide a "best guess" count of the number of items in
- * the queue. Depending on the implementation and the setup, the accuracy of
- * the results of this function may vary.
- *
- * e.g. On a busy system with a large number of consumers and items, the
- * result might only be valid for a fraction of a second and not provide an
- * accurate representation.
- *
- * @return
- * An integer estimate of the number of items in the queue.
- */
- public function numberOfItems();
-
-
- * Claim an item in the queue for processing.
- *
- * @param $lease_time
- * How long the processing is expected to take in seconds, defaults to an
- * hour. After this lease expires, the item will be reset and another
- * consumer can claim the item. For idempotent tasks (which can be run
- * multiple times without side effects), shorter lease times would result
- * in lower latency in case a consumer fails. For tasks that should not be
- * run more than once (non-idempotent), a larger lease time will make it
- * more rare for a given task to run multiple times in cases of failure,
- * at the cost of higher latency.
- * @return
- * On success we return an item object. If the queue is unable to claim an
- * item it returns false. This implies a best effort to retrieve an item
- * and either the queue is empty or there is some other non-recoverable
- * problem.
- */
- public function claimItem($lease_time = 3600);
-
-
- * Delete a finished item from the queue.
- *
- * @param $item
- * The item returned by BackdropQueueInterface::claimItem().
- */
- public function deleteItem($item);
-
-
- * Release an item that the worker could not process, so another
- * worker can come in and process it before the timeout expires.
- *
- * @param $item
- * @return boolean
- */
- public function releaseItem($item);
-
-
- * Create a queue.
- *
- * Called during installation and should be used to perform any necessary
- * initialization operations. This should not be confused with the
- * constructor for these objects, which is called every time an object is
- * instantiated to operate on a queue. This operation is only needed the
- * first time a given queue is going to be initialized (for example, to make
- * a new database table or directory to hold tasks for the queue -- it
- * depends on the queue implementation if this is necessary at all).
- */
- public function createQueue();
-
-
- * Delete a queue and every item in the queue.
- */
- public function deleteQueue();
- }
-
- * Reliable queue interface.
- *
- * Classes implementing this interface preserve the order of messages and
- * guarantee that every item will be executed at least once.
- */
- interface BackdropReliableQueueInterface extends BackdropQueueInterface {
- }
-
- * Default queue implementation.
- */
- class SystemQueue implements BackdropReliableQueueInterface {
-
- * The name of the queue this instance is working with.
- *
- * @var string
- */
- protected $name;
-
- public function __construct($name) {
- $this->name = $name;
- }
-
- public function createItem($data) {
-
-
- $query = db_insert('queue')
- ->fields(array(
- 'name' => $this->name,
- 'data' => serialize($data),
-
-
- 'created' => time(),
- ));
- return (bool) $query->execute();
- }
-
- public function numberOfItems() {
- return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
- }
-
- public function claimItem($lease_time = 30) {
-
-
-
-
- while (TRUE) {
- $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
- if ($item) {
-
-
-
-
-
-
- $update = db_update('queue')
- ->fields(array(
- 'expire' => time() + $lease_time,
- ))
- ->condition('item_id', $item->item_id)
- ->condition('expire', 0);
-
- if ($update->execute()) {
- $item->data = unserialize($item->data);
- return $item;
- }
- }
- else {
-
- return FALSE;
- }
- }
- }
-
- public function releaseItem($item) {
- $update = db_update('queue')
- ->fields(array(
- 'expire' => 0,
- ))
- ->condition('item_id', $item->item_id);
- return $update->execute();
- }
-
- public function deleteItem($item) {
- db_delete('queue')
- ->condition('item_id', $item->item_id)
- ->execute();
- }
-
- public function createQueue() {
-
-
-
- }
-
- public function deleteQueue() {
- db_delete('queue')
- ->condition('name', $this->name)
- ->execute();
- }
- }
-
- * Static queue implementation.
- *
- * This allows processes that rely on the Queue interface to be executed
- * immediately and all in one page request. The queue data resides in memory.
- * It should only be used for items that will be queued and dequeued within the
- * same page request.
- */
- class MemoryQueue implements BackdropQueueInterface {
-
- * The queue data.
- *
- * @var array
- */
- protected $queue;
-
-
- * Counter for item ids.
- *
- * @var int
- */
- protected $id_sequence;
-
-
- * Start working with a queue.
- *
- * @param $name
- * Arbitrary string. The name of the queue to work with.
- */
- public function __construct($name) {
- $this->queue = array();
- $this->id_sequence = 0;
- }
-
- public function createItem($data) {
- $item = new stdClass();
- $item->item_id = $this->id_sequence++;
- $item->data = $data;
- $item->created = time();
- $item->expire = 0;
- $this->queue[$item->item_id] = $item;
- return TRUE;
- }
-
- public function numberOfItems() {
- return count($this->queue);
- }
-
- public function claimItem($lease_time = 30) {
- foreach ($this->queue as $key => $item) {
- if ($item->expire == 0) {
- $item->expire = time() + $lease_time;
- $this->queue[$key] = $item;
- return $item;
- }
- }
- return FALSE;
- }
-
- public function deleteItem($item) {
- unset($this->queue[$item->item_id]);
- }
-
- public function releaseItem($item) {
- if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
- $this->queue[$item->item_id]->expire = 0;
- return TRUE;
- }
- return FALSE;
- }
-
- public function createQueue() {
-
- }
-
- public function deleteQueue() {
- $this->queue = array();
- $this->id_sequence = 0;
- }
- }
-
- * @} End of "defgroup queue".
- */