Python threads: communication and stopping

  eli.thegreenplace.ne        2011-12-28 07:38:32       3,216        0    

A very common doubt developers new to Python have is how to use its threads correctly. Specifically, a large amount of questions on StackOverflow show that people struggle most with two aspects:

  1. How to stop / kill a thread
  2. How to safely pass data to a thread and back

I already have a blog post touching on these issues right here, but I feel it’s too task-specific for sockets, and a more basic and general post would be appropriate. I assume the reader has a basic familiarity with Python threads, i.e. has at least went over the documentation.

So, without further ado, here’s a sample "worker" thread implementation. It can be given tasks, where each task is a directory name, and it does useful work. This work is recursively listing all the files contained in the given directory and its sub-directories.

import os, time
import threading, Queue

class WorkerThread(threading.Thread):
    """ A worker thread that takes directory names from a queue, finds all
        files in them recursively and reports the result.

        Input is done by placing directory names (as strings) into the
        Queue passed in dir_q.

        Output is done by placing tuples into the Queue passed in result_q.
        Each tuple is (thread name, dirname, [list of files]).

        Ask the thread to stop by calling its join() method.
    """
    def __init__(self, dir_q, result_q):
        super(WorkerThread, self).__init__()
        self.dir_q = dir_q
        self.result_q = result_q
        self.stoprequest = threading.Event()

    def run(self):
        # As long as we weren't asked to stop, try to take new tasks from the
        # queue. The tasks are taken with a blocking 'get', so no CPU
        # cycles are wasted while waiting.
        # Also, 'get' is given a timeout, so stoprequest is always checked,
        # even if there's nothing in the queue.
        while not self.stoprequest.isSet():
            try:
                dirname = self.dir_q.get(True, 0.05)
                filenames = list(self._files_in_dir(dirname))
                self.result_q.put((self.name, dirname, filenames))
            except Queue.Empty:
                continue

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerThread, self).join(timeout)

    def _files_in_dir(self, dirname):
        """ Given a directory name, yields the names of all files (not dirs)
            contained in this directory and its sub-directories.
        """
        for path, dirs, files in os.walk(dirname):
            for file in files:
                yield os.path.join(path, file)

For experienced programmers, going over the code and its documentation and comments should be enough. For folks with less experience, I will elaborate. Let’s address the two issues mentioned at the top of the post.

First, killing a thread. This is accomplished by politely asking the thread to die. The join method of Thread is overridden, and before calling the actual join of the parent class, it "sets" the self.stoprequest attribute, which is a threading.Event. The main loop in the thread’s run method checks this flag, and exits when it’s set. You can think of threading.Event as a synchronized boolean flag. Keep in mind that the join method is called in the context of the main thread, while the body of the run method is executed in the context of the worker thread.

Second, passing data into and out of a thread. This is best done with the help of Queue objects from the Queue module (yep, it’s Queue.Queue – a bit awkward, I agree. In Python 3 this was fixed and the module is named in lowercase – queue). When the worker thread is created, it is given a reference to one queue for input, and one queue for output. Queue objects can be safely shared between threads (any amount of threads, actually), and provide a synchronized FIFO queue interface.

Probably the most important part of the code to understand is these lines:

while not self.stoprequest.isSet():
    try:
        dirname = self.dir_q.get(True, 0.05)
        ... # do work
    except Queue.Empty:
        continue

This is a simple way to wait on two conditions simultaneously. The thread does work only if wasn’t asked to stop, and there’s work to do in the queue. Therefore, the Queue.get method is used as follows:

  • Its blocking argument is set to true, meaning that the call will block the worker thread until there’s an item in the queue, or…
  • The timeout argument is also set, meaning that get will block for at most timeout seconds. If no item appeared in the queue within this timeout, the method throws a Queue.Empty exception.

So what we have here is a way to both wait on the queue without wasting CPU cycles (get uses special OS services deep underneath to implement a non-spinning wait), and check the stoprequest event occasionally. There’s just one gotcha, however. If the work is taking long to perform, the next check of stoprequest may be delayed. If this is important for your application, consider doing the work in short chunks, checking stoprequest after each chunk.

Here’s some simple code that uses this worker thread. It creates a thread pool with 4 threads, feeds them work and waits for all the results to arrive:

def main(args):
    # Create a single input and a single output queue for all threads.
    dir_q = Queue.Queue()
    result_q = Queue.Queue()

    # Create the "thread pool"
    pool = [WorkerThread(dir_q=dir_q, result_q=result_q) for i in range(4)]

    # Start all threads
    for thread in pool:
        thread.start()

    # Give the workers some work to do
    work_count = 0
    for dir in args:
        if os.path.exists(dir):
            work_count += 1
            dir_q.put(dir)

    print 'Assigned %s dirs to workers' % work_count

    # Now get all the results
    while work_count > 0:
        # Blocking 'get' from a Queue.
        result = result_q.get()
        print 'From thread %s: %s files found in dir %s' % (
            result[0], len(result[2]), result[1])
        work_count -= 1

    # Ask threads to die and wait for them to do it
    for thread in pool:
        thread.join()

if __name__ == '__main__':
    import sys
    main(sys.argv[1:])

All worker threads in the pool share the same input queue and output queue. There’s absolutely no problem with this. On the contrary, as you can see it enables a very simple implementation of a thread pool to be quite functional.

Please treat this sample as a template rather than a one-size-fits-all solution for every possible threading scenario. Concurrency is a complex topic, and there are a lot of tradeoffs involved in the design of even the simplest multi-threaded programs.

Finally, note that such worker threads in Python are only useful if the work they do is not CPU bound. The thread shown here is a good example – listing directories and files is mostly an I/O bound task (well, less so if you have a really fast SSD). Other good candidates are socket I/O, user interaction, and anything involving the web (i.e. fetching data from HTTP or RPC services).

CPU-bound tasks are not a good fit for Python threads, due to the Global Interpreter Lock (GIL). Parallel computations in Python should be done in multiple processes, not threads. In a future article I’ll discuss how to use the multiprocessing module to manage worker processes in a similar manner.

Source : http://eli.thegreenplace.net/2011/12/27/python-threads-communication-and-stopping/

PYTHON  MULTITHREADING  COMMUNICATION  SYNCHRONIZE 

       

  RELATED


  0 COMMENT


No comment for this article.



  RANDOM FUN

Peter is fired again

Since last time we posted Don't call me Peter again, today we see that Peter is fired again.