- 2.1 collections: Container Data Types
- 2.2 array: Sequence of Fixed-Type Data
- 2.3 heapq: Heap Sort Algorithm
- 2.4 bisect: Maintain Lists in Sorted Order
- 2.5 Queue: Thread-Safe FIFO Implementation
- 2.6 struct: Binary Data Structures
- 2.7 weakref: Impermanent References to Objects
- 2.8 copy: Duplicate Objects
- 2.9 pprint: Pretty-Print Data Structures
2.5 Queue—Thread-Safe FIFO Implementation
- Purpose: Provides a thread-safe FIFO implementation.
- Python Version: At least 1.4
The Queue module provides a first-in, first-out (FIFO) data structure suitable for multithreaded programming. It can be used to pass messages or other data safely between producer and consumer threads. Locking is handled for the caller, so many threads can work with the same Queue instance safely. The size of a Queue (the number of elements it contains) may be restricted to throttle memory usage or processing.
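As a minimal sketch of the size restriction mentioned above, the following example creates a queue capped at two items with the maxsize argument and shows that a non-blocking put() on a full queue raises Full. The try/except import is an assumption made so the sketch runs on both Python 2 (module Queue, as used in this section) and Python 3 (where the module is named queue); the item values are arbitrary.

```python
try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name, as used in this section

# maxsize=2 caps the number of items the queue will hold.
q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')

# The queue is now full. put_nowait() raises Full instead of blocking.
try:
    q.put_nowait('c')
    rejected = False
except queue.Full:
    rejected = True

print(q.full())
print(rejected)
print(q.qsize())
```

A plain put() on a full queue would block until a consumer removes an item, which is what throttles a fast producer.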
2.5.1 Basic FIFO Queue
The Queue class implements a basic first-in, first-out container. Elements are added to one end of the sequence using put(), and removed from the other end using get().
import Queue

q = Queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get(),
print
This example uses a single thread to illustrate that elements are removed from the queue in the same order they are inserted.
$ python Queue_fifo.py

0 1 2 3 4
2.5.2 LIFO Queue
In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out (LIFO) ordering (normally associated with a stack data structure).
import Queue

q = Queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get(),
print

The item most recently put into the queue is the first one removed by get().

$ python Queue_lifo.py

4 3 2 1 0
2.5.3 Priority Queue
Sometimes, the processing order of the items in a queue needs to be based on characteristics of those items, rather than just on the order in which they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing printed by a developer. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve: the item that sorts lowest is returned first.
import Queue
import threading

class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'New job:', description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()

q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )

def process_job(q):
    while True:
        next_job = q.get()
        print 'Processing job:', next_job.description
        q.task_done()

workers = [ threading.Thread(target=process_job, args=(q,)),
            threading.Thread(target=process_job, args=(q,)),
            ]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()
This example has multiple threads consuming the jobs, which are processed based on the priority of the items in the queue at the time get() is called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
$ python Queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
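The Job class above relies on __cmp__() and cmp(), which exist only in Python 2. As a hedged alternative that does not depend on them, the sketch below stores plain tuples of (priority, sequence number, description) and lets PriorityQueue order them by ordinary tuple comparison. The sequence number is an addition not present in the original example; it is assumed here only to break ties between equal priorities. A single thread drains the queue so that the order is deterministic.

```python
try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name, as used in this section

q = queue.PriorityQueue()

jobs = [
    (3, 'Mid-level job'),
    (10, 'Low-level job'),
    (1, 'Important job'),
]
for seq, (priority, description) in enumerate(jobs):
    # seq (an assumption of this sketch) breaks ties between equal
    # priorities in insertion order.
    q.put((priority, seq, description))

processed = []
while not q.empty():
    priority, seq, description = q.get()
    processed.append(description)
    q.task_done()

print(processed)
# ['Important job', 'Mid-level job', 'Low-level job']
```

The lowest priority value comes out first, matching the output of the Job-based example.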
2.5.4 Building a Threaded Podcast Client
The source code for the podcasting client in this section demonstrates how to use the Queue class with multiple threads. The program reads one or more RSS feeds, queues up the enclosures for the five most recent episodes to be downloaded, and processes several downloads in parallel using threads. It does not have enough error handling for production use, but the skeleton implementation provides an example of how to use the Queue module.
First, some operating parameters are established. Normally, these would come from user inputs (preferences, a database, etc.). The example uses hard-coded values for the number of threads and a list of URLs to fetch.
from Queue import Queue
from threading import Thread
import time
import urllib
import urlparse

import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [ 'http://advocacy.python.org/podcasts/littlebit.rss',
              ]
The function downloadEnclosures() will run in the worker thread and process the downloads using urllib.
def downloadEnclosures(i, q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and only exit when
    the main thread ends.
    """
    while True:
        print '%s: Looking for the next enclosure' % i
        url = q.get()
        parsed_url = urlparse.urlparse(url)
        print '%s: Downloading:' % i, parsed_url.path
        response = urllib.urlopen(url)
        data = response.read()
        # Save the downloaded file to the current directory
        outfile_name = url.rpartition('/')[-1]
        with open(outfile_name, 'wb') as outfile:
            outfile.write(data)
        q.task_done()
Once the threads' target function is defined, the worker threads can be started. When downloadEnclosures() processes the statement url = q.get(), it blocks and waits until the queue has something to return. That means it is safe to start the threads before there is anything in the queue.
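The blocking behavior of get() can be seen in isolation with a small sketch that involves no network access. The worker() function here is a hypothetical stand-in for downloadEnclosures(), and the file names are made up; the thread is started while the queue is still empty and simply waits in get() until the main thread adds items. The daemon attribute is used in place of setDaemon(), and the try/except import is an assumption so that the sketch also runs on Python 3.

```python
import threading
try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name, as used in this section

q = queue.Queue()
results = []

def worker(q, results):
    # Hypothetical stand-in for downloadEnclosures(): no network access.
    while True:
        item = q.get()              # blocks until an item is available
        results.append(item.upper())
        q.task_done()

# Start the worker while the queue is still empty.
t = threading.Thread(target=worker, args=(q, results))
t.daemon = True
t.start()

# Only now is any work queued; the waiting worker picks it up.
for name in ['a.mp3', 'b.mp3', 'c.mp3']:
    q.put(name)

q.join()
print(results)
# ['A.MP3', 'B.MP3', 'C.MP3']
```

With a single worker the items are handled in FIFO order, so the result list is deterministic.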
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = Thread(target=downloadEnclosures,
                    args=(i, enclosure_queue,))
    worker.setDaemon(True)
    worker.start()
The next step is to retrieve the feed contents using Mark Pilgrim's feedparser module (www.feedparser.org) and enqueue the URLs of the enclosures. As soon as the first URL is added to the queue, one of the worker threads picks it up and starts downloading it. The loop will continue to add items until the feed is exhausted, and the worker threads will take turns dequeuing URLs to download them.
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][-5:]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse.urlparse(enclosure['url'])
            print 'Queuing:', parsed_url.path
            enclosure_queue.put(enclosure['url'])

The only thing left to do is wait until every queued download has been processed, using join(). The call blocks until task_done() has been called once for each item that was put into the queue.

# Now wait for the queue to be empty, indicating that we have
# processed all the downloads.
print '*** Main thread waiting'
enclosure_queue.join()
print '*** Done'
Running the sample script produces the following.
$ python fetch_podcasts.py

0: Looking for the next enclosure
1: Looking for the next enclosure
Queuing: /podcasts/littlebit/2010-04-18.mp3
Queuing: /podcasts/littlebit/2010-05-22.mp3
Queuing: /podcasts/littlebit/2010-06-06.mp3
Queuing: /podcasts/littlebit/2010-07-26.mp3
Queuing: /podcasts/littlebit/2010-11-25.mp3
*** Main thread waiting
0: Downloading: /podcasts/littlebit/2010-04-18.mp3
0: Looking for the next enclosure
0: Downloading: /podcasts/littlebit/2010-05-22.mp3
0: Looking for the next enclosure
0: Downloading: /podcasts/littlebit/2010-06-06.mp3
0: Looking for the next enclosure
0: Downloading: /podcasts/littlebit/2010-07-26.mp3
0: Looking for the next enclosure
0: Downloading: /podcasts/littlebit/2010-11-25.mp3
0: Looking for the next enclosure
*** Done
The actual output will depend on the contents of the RSS feed used.
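Because join() returns only after task_done() has been called for every queued item, a worker that dies on an exception before reaching task_done() would leave the main thread waiting forever. The sketch below shows one possible way to guard against that, by calling task_done() in a finally clause. It is not part of the original client: fetch() is a hypothetical stand-in for the urllib download, the example.com URLs are placeholders, and the failure is simulated rather than caused by a real network error.

```python
import threading
try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name, as used in this section

q = queue.Queue()
lock = threading.Lock()
done, failed = [], []

def fetch(url):
    # Hypothetical stand-in for the download; fails for one URL.
    if 'bad' in url:
        raise IOError('could not fetch %s' % url)
    return b'data'

def worker(q):
    while True:
        url = q.get()
        try:
            fetch(url)
            with lock:
                done.append(url)
        except IOError:
            with lock:
                failed.append(url)
        finally:
            # Always mark the item finished so join() can return.
            q.task_done()

for _ in range(2):
    t = threading.Thread(target=worker, args=(q,))
    t.daemon = True
    t.start()

# Placeholder URLs, not real feeds.
for url in ['http://example.com/1.mp3',
            'http://example.com/bad.mp3',
            'http://example.com/2.mp3']:
    q.put(url)

q.join()
print(sorted(done))
print(failed)
```

Both successful and failed items count as finished, so join() returns once all three URLs have been handled, and the failures remain available for reporting or retrying.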
See Also:
- Queue (http://docs.python.org/lib/module-Queue.html) Standard library documentation for this module.
- Deque (page 75) from collections (page 70) The collections module includes a deque (double-ended queue) class.
- Queue data structures (http://en.wikipedia.org/wiki/Queue_(data_structure)) Wikipedia article explaining queues.
- FIFO (http://en.wikipedia.org/wiki/FIFO) Wikipedia article explaining first-in, first-out data structures.