NAME
    MongoDBx::Queue - A message queue implemented with MongoDB

VERSION
    version 2.001

SYNOPSIS
        use v5.10;
        use MongoDBx::Queue;

        my $queue = MongoDBx::Queue->new(
            version => 2,
            database_name => "queue_db",
            client_options => {
                host => "mongodb://example.net:27017",
                username => "willywonka",
                password => "ilovechocolate",
            }
        );

        $queue->add_task( { msg => "Hello World" } );
        $queue->add_task( { msg => "Goodbye World" } );

        while ( my $task = $queue->reserve_task ) {
            say $task->{msg};
            $queue->remove_task( $task );
        }

DESCRIPTION
    MongoDBx::Queue implements a simple, prioritized message queue using
    MongoDB as a backend. By default, messages are prioritized by insertion
    time, creating a FIFO queue.

    On a single host with MongoDB, it provides a zero-configuration message
    service across local applications. Alternatively, it can use a MongoDB
    database cluster that provides replication and fail-over for an even
    more durable, multi-host message queue.

    Features:

    *   messages as hash references, not objects

    *   arbitrary message fields

    *   arbitrary scheduling on insertion

    *   atomic message reservation

    *   stalled reservations can be timed-out

    *   task rescheduling

    *   automatically creates correct index

    *   fork-safe

    Not yet implemented:

    *   parameter checking

    *   error handling

    Warning: do not use with capped collections, as the queued messages will
    not meet the constraints required by a capped collection.

ATTRIBUTES
  database_name
    A MongoDB database name. Unless a "db_name" is provided in the
    "client_options" attribute, this database will be the default for
    authentication. Defaults to 'test'.

  client_options
    A hash reference of MongoDB::MongoClient options that will be passed to
    its "connect" method.

  collection_name
    A collection name for the queue. Defaults to 'queue'. The collection
    must only be used by MongoDBx::Queue or unpredictable awful things will
    happen.

  version
    The implementation version to use as a backend. Defaults to '1', which
    is the legacy implementation for backwards compatibility.
    Version '2' has better index coverage and will perform better for very
    large queues.

    WARNING: Versions are not compatible. You MUST NOT have V1 and V2
    clients using the same database+collection name. See "MIGRATION BETWEEN
    VERSIONS" for more.

METHODS
  new
        $queue = MongoDBx::Queue->new(
            version => 2,
            database_name => "my_app",
            client_options => {
                host => "mongodb://example.net:27017",
                username => "willywonka",
                password => "ilovechocolate",
            },
        );

    Creates and returns a new queue object.

  add_task
        $queue->add_task( \%message, \%options );

    Adds a task to the queue. The "\%message" hash reference will be shallow
    copied into the task and must not include objects except as described by
    MongoDB::DataTypes. Top-level keys must not start with underscores,
    which are reserved for MongoDBx::Queue.

    The "\%options" hash reference is optional and may contain the following
    key:

    *   "priority": sets the priority for the task. Defaults to "time()".

    Note that setting a "future" priority may cause a task to be invisible
    to "reserve_task". See that method for more details.

  reserve_task
        $task = $queue->reserve_task;
        $task = $queue->reserve_task( \%options );

    Atomically marks and returns a task. The task is marked in the queue as
    "reserved" (in-progress) so it cannot be reserved again unless it is
    rescheduled or timed out. The task returned is a hash reference
    containing the data added in "add_task", including private keys for use
    by MongoDBx::Queue methods.

    Tasks are returned in priority order from lowest to highest. If multiple
    tasks have identical, lowest priorities, their ordering is undefined. If
    no tasks are available or visible, it will return "undef".

    The "\%options" hash reference is optional and may contain the following
    key:

    *   "max_priority": sets the maximum priority for the task. Defaults to
        "time()".

    The "max_priority" option controls whether "future" tasks are visible.
    If the lowest task priority is greater than the "max_priority", this
    method returns "undef".
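    The interaction between the "priority" option of "add_task" and the
    "max_priority" option of "reserve_task" can be sketched as a pair of
    helpers for delayed tasks. This is only an illustration, not part of
    MongoDBx::Queue: the names "add_delayed_task" and "reserve_within" are
    hypothetical, and the sketch assumes priorities are epoch seconds, as
    the "time()" defaults suggest.

```perl
use strict;
use warnings;

# Hypothetical helper (not provided by MongoDBx::Queue): add a task whose
# priority lies $delay seconds in the future, so that a default
# reserve_task call (max_priority of time()) should not see it yet.
sub add_delayed_task {
    my ( $queue, $message, $delay ) = @_;
    return $queue->add_task( $message, { priority => time() + $delay } );
}

# Hypothetical helper: reserve the lowest-priority task, treating tasks
# scheduled up to $lookahead seconds from now as visible.
sub reserve_within {
    my ( $queue, $lookahead ) = @_;
    return $queue->reserve_task( { max_priority => time() + $lookahead } );
}

# Usage, assuming $queue is a MongoDBx::Queue object connected to a
# reachable MongoDB server:
#
#   add_delayed_task( $queue, { msg => "later" }, 60 );
#   my $task = $queue->reserve_task;        # does not see the delayed task
#   $task = reserve_within( $queue, 120 );  # delayed task is now visible
```

    Both helpers only pass documented options through to the queue, so any
    object that implements "add_task" and "reserve_task" with the semantics
    described above should work with them.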
  reschedule_task
        $queue->reschedule_task( $task );
        $queue->reschedule_task( $task, \%options );

    Releases the reservation on a task so it can be reserved again.

    The "\%options" hash reference is optional and may contain the following
    key:

    *   "priority": sets the priority for the task. Defaults to the task's
        original priority.

    Note that setting a "future" priority may cause a task to be invisible
    to "reserve_task". See that method for more details.

  remove_task
        $queue->remove_task( $task );

    Removes a task from the queue (i.e. indicating the task has been
    processed).

  apply_timeout
        $queue->apply_timeout( $seconds );

    Removes reservations that occurred more than $seconds ago. If no
    argument is given, the timeout defaults to 120 seconds. The timeout
    should be set longer than the expected task processing time, so that
    only dead/hung tasks are returned to the active queue.

  search
        my @results = $queue->search( \%query, \%options );

    Returns a list of tasks in the queue based on search criteria. The query
    should be expressed in the usual MongoDB fashion. In addition to MongoDB
    options (e.g. "limit", "skip" and "sort") as described in the MongoDB
    documentation for "find" in MongoDB::Collection, this method supports a
    "reserved" option. If present, results will be limited to reserved tasks
    if true or unreserved tasks if false.

  peek
        $task = $queue->peek( $task );

    Retrieves a full copy of the task from the queue. This is useful to
    retrieve all fields from a projected result from "search". It is
    equivalent to:

        $self->search( { _id => $task->{_id} } );

    Returns undef if the task is not found.

  size
        $queue->size;

    Returns the number of tasks in the queue, including in-progress ones.

  waiting
        $queue->waiting;

    Returns the number of tasks in the queue that have not been reserved.

MIGRATION BETWEEN VERSIONS
    Implementation versions are not compatible. Migration of active tasks
    from version '1' to version '2' is an exercise left to end users.
    One approach to migration could be to run a script with two
    "MongoDBx::Queue" clients, one using version '1' and one using version
    '2', using different "database_name" attributes. Such a script could
    iteratively reserve a task with the v1 client, add the task via the v2
    client, then remove it via the v1 client. Workers could be operating on
    one or both versions of the queue while migration is going on, depending
    on your needs.

SUPPORT
  Bugs / Feature Requests
    Please report any bugs or feature requests through the issue tracker at
    <https://github.com/dagolden/MongoDBx-Queue/issues>. You will be
    notified automatically of any progress on your issue.

  Source Code
    This is open source software. The code repository is available for
    public review and contribution under the terms of the license.

    <https://github.com/dagolden/MongoDBx-Queue>

      git clone https://github.com/dagolden/MongoDBx-Queue.git

AUTHOR
    David Golden <dagolden@cpan.org>

COPYRIGHT AND LICENSE
    This software is Copyright (c) 2012 by David Golden.

    This is free software, licensed under:

      The Apache License, Version 2.0, January 2004