2.6 queue: Thread-Safe FIFO Implementation
The queue module provides a first-in, first-out (FIFO) data structure suitable for multi-threaded programming. It can be used to pass messages or other data between producer and consumer threads safely. Locking is handled for the caller, so many threads can work with the same Queue instance safely and easily. The size of a Queue (the number of elements it contains) may be restricted to throttle memory usage or processing.
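For example, a Queue created with a positive maxsize blocks put() once that many items are waiting, and the non-blocking put_nowait() raises queue.Full instead. A minimal single-threaded sketch (the maxsize of 2 is an arbitrary illustrative value):

import queue

# A bounded queue: at most two items may be waiting at once.
q = queue.Queue(maxsize=2)

q.put('a')
q.put('b')

# put() would now block until a consumer frees a slot;
# put_nowait() raises queue.Full immediately instead.
try:
    q.put_nowait('c')
except queue.Full:
    print('queue is full')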
2.6.1 Basic FIFO Queue
The Queue class implements a basic first-in, first-out container. Elements are added to one “end” of the sequence using put(), and removed from the other end using get().
Listing 2.56: queue_fifo.py
import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()
This example uses a single thread to illustrate that elements are removed from the queue in the same order in which they are inserted.
$ python3 queue_fifo.py

0 1 2 3 4
2.6.2 LIFO Queue
In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).
Listing 2.57: queue_lifo.py
import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()
The item most recently put into the queue is removed by get().
$ python3 queue_lifo.py

4 3 2 1 0
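The thread-safety of LifoQueue matters only when several threads share the stack. For single-threaded code like the listing above, a plain list gives the same last-in, first-out ordering without the locking overhead; a minimal sketch:

stack = []
for i in range(5):
    stack.append(i)

# pop() removes from the end of the list, so the output
# matches queue_lifo.py: 4 3 2 1 0.
while stack:
    print(stack.pop(), end=' ')
print()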
2.6.3 Priority Queue
Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order in which they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing that a developer wants to print. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.
Listing 2.58: queue_priority.py
import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.daemon = True
    w.start()

q.join()
This example has multiple threads consuming the jobs, which are processed based on the priority of items in the queue at the time get() was called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
$ python3 queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
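Defining an ordered class is only necessary when the queued items carry extra behavior. Since PriorityQueue simply compares whatever it holds, (priority, payload) tuples are a common lighter-weight alternative, with the caveat that equal priorities fall through to comparing the payloads themselves. A minimal single-threaded sketch:

import queue

q = queue.PriorityQueue()

# Tuples are compared element by element, so the lowest
# priority number is retrieved first.
q.put((3, 'Mid-level job'))
q.put((10, 'Low-level job'))
q.put((1, 'Important job'))

while not q.empty():
    priority, description = q.get()
    print(priority, description)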
2.6.4 Building a Threaded Podcast Client
The source code for the podcasting client in this section demonstrates how to use the Queue class with multiple threads. The program reads one or more RSS feeds, queues up the enclosures for the five most recent episodes from each feed to be downloaded, and processes several downloads in parallel using threads. It does not have enough error handling for production use, but the skeleton implementation illustrates the use of the queue module.
First, some operating parameters are established. Usually, these would come from user inputs (e.g., preferences or a database). The example uses hard-coded values for the number of threads and list of URLs to fetch.
Listing 2.59: fetch_podcasts.py
from queue import Queue
import threading
import urllib.request
from urllib.parse import urlparse

import feedparser

# Set up some global variables.
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data.
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))
The function download_enclosures() runs in the worker thread and processes the downloads using urllib.
def download_enclosures(q):
    """This is the worker thread function.
    It processes items in the queue one after
    another. These daemon threads go into an
    infinite loop, and exit only when the main
    thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()

        # Save the downloaded file to the current directory.
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()
Once the target function for the threads is defined, the worker threads can be started. When download_enclosures() processes the statement url = q.get(), it blocks and waits until the queue has something to return. That means it is safe to start the threads before there is anything in the queue.
# Set up some threads to fetch the enclosures.
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.daemon = True
    worker.start()
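Blocking is also configurable: get() and put() both accept block and timeout arguments, and the non-blocking forms raise queue.Empty or queue.Full rather than waiting. A minimal sketch of the variations, separate from the podcast client:

import queue

q = queue.Queue()

try:
    q.get(block=False)  # equivalent to q.get_nowait()
except queue.Empty:
    print('nothing queued yet')

try:
    q.get(timeout=0.1)  # give up after 0.1 seconds
except queue.Empty:
    print('still nothing after the timeout')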
The next step is to retrieve the feed contents using the feedparser module and enqueue the URLs of the enclosures. As soon as the first URL is added to the queue, one of the worker threads picks it up and starts downloading it. The loop continues to add items until the feed is exhausted, and the worker threads take turns dequeuing URLs to download them.
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])
The only thing left to do is wait for the queue to drain, using join(). The call returns only after task_done() has been called for every item that was put into the queue, so when it unblocks the downloads are not merely dequeued but fully processed.
# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
Running the sample script produces output similar to the following.
$ python3 fetch_podcasts.py

worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done
The actual output will depend on the contents of the RSS feed used.
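Because the workers are daemon threads stuck in an infinite loop, this program simply abandons them when the main thread exits. A common alternative, sketched below with illustrative names (worker, num_workers), is to enqueue one sentinel value per worker so that each thread can finish cleanly:

import queue
import threading

q = queue.Queue()
num_workers = 2


def worker():
    while True:
        item = q.get()
        if item is None:  # sentinel value: time to exit
            q.task_done()
            break
        print('processed', item)
        q.task_done()


threads = [
    threading.Thread(target=worker)
    for _ in range(num_workers)
]
for t in threads:
    t.start()

for i in range(5):
    q.put(i)

# Add one sentinel per worker, then wait for the queue to
# drain and the threads to exit.
for _ in range(num_workers):
    q.put(None)
q.join()
for t in threads:
    t.join()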