Threads allow Python programs to handle multiple functions at once as opposed to running a sequence of commands individually. This topic explains the principles behind threading and demonstrates its usage.
# Basics of multithreading
threading module, a new thread of execution may be started by creating a new
threading.Thread and assigning it a function to execute:
import threading def foo(): print "Hello threading!" my_thread = threading.Thread(target=foo)
target parameter references the function (or callable object) to be run. The thread will not begin execution until
start is called on the
Starting a Thread
my_thread.start() # prints 'Hello threading!'
my_thread has run and terminated, calling
start again will produce a
RuntimeError. If you'd like to run your thread as a daemon, passing the
daemon=True kwarg, or setting
True before calling
start(), causes your
Thread to run silently in the background as a daemon.
Joining a Thread
In cases where you split up one big job into several small ones and want to run them concurrently, but need to wait for all of them to finish before continuing,
Thread.join() is the method you're looking for.
For example, let's say you want to download several pages of a website and compile them into a single page. You'd do this:
import requests from threading import Thread from queue import Queue q = Queue(maxsize=20) def put_page_to_q(page_num): q.put(requests.get('http://some-website.com/page_%s.html' % page_num) def compile(q): # magic function that needs all pages before being able to be executed if not q.full(): raise ValueError else: print("Done compiling!") threads =  for page_num in range(20): t = Thread(target=requests.get, args=(page_num,)) t.start() threads.append(t) # Next, join all threads to make sure all threads are done running before # we continue. join() is a blocking call (unless specified otherwise using # the kwarg blocking=False when calling join) for t in threads: t.join() # Call compile() now, since all threads have completed compile(q)
A closer look at how
join() works can be found here (opens new window).
Create a Custom Thread Class
threading.Thread class we can subclass new custom Thread class.
we must override
run method in a subclass.
from threading import Thread import time class Sleepy(Thread): def run(self): time.sleep(5) print("Hello form Thread") if __name__ == "__main__": t = Sleepy() t.start() # start method automatic call Thread class run method. # print 'The main program continues to run in foreground.' t.join() print("The main program continues to run in the foreground.")
# Communicating between threads
There are multiple threads in your code and you need to safely communicate between them.
You can use a
Queue from the
from queue import Queue from threading import Thread # create a data producer def producer(output_queue): while True: data = data_computation() output_queue.put(data) # create a consumer def consumer(input_queue): while True: # retrieve data (blocking) data = input_queue.get() # do something with the data # indicate data has been consumed input_queue.task_done()
Creating producer and consumer threads with a shared queue
q = Queue() t1 = Thread(target=consumer, args=(q,)) t2 = Thread(target=producer, args=(q,)) t1.start() t2.start()
# Creating a worker pool
from socket import socket, AF_INET, SOCK_STREAM from threading import Thread from queue import Queue def echo_server(addr, nworkers): print('Echo server running at', addr) # Launch the client workers q = Queue() for n in range(nworkers): t = Thread(target=echo_client, args=(q,)) t.daemon = True t.start() # Run the server sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() q.put((client_sock, client_addr)) echo_server(('',15000), 128)
from socket import AF_INET, SOCK_STREAM, socket from concurrent.futures import ThreadPoolExecutor def echo_server(addr): print('Echo server running at', addr) pool = ThreadPoolExecutor(128) sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() pool.submit(echo_client, client_sock, client_addr) echo_server(('',15000))
Python Cookbook, 3rd edition, by David Beazley and Brian K. Jones (O’Reilly). Copyright 2013 David Beazley and Brian Jones, 978-1-449-34037-7.
# Advanced use of multithreads
This section will contain some of the most advanced examples realized using Multithreading.
# Advanced printer (logger)
A thread that prints everything is received and modifies the output according to the terminal width. The nice part is that also the "already written" output is modified when the width of the terminal changes.
#!/usr/bin/env python2 import threading import Queue import time import sys import subprocess from backports.shutil_get_terminal_size import get_terminal_size printq = Queue.Queue() interrupt = False lines =  def main(): ptt = threading.Thread(target=printer) # Turn the printer on ptt.daemon = True ptt.start() # Stupid example of stuff to print for i in xrange(1,100): printq.put(' '.join([str(x) for x in range(1,i)])) # The actual way to send stuff to the printer time.sleep(.5) def split_line(line, cols): if len(line) > cols: new_line = '' ww = line.split() i = 0 while len(new_line) <= (cols - len(ww[i]) - 1): new_line += ww[i] + ' ' i += 1 print len(new_line) if new_line == '': return (line, '') return (new_line, ' '.join(ww[i:])) else: return (line, '') def printer(): while True: cols, rows = get_terminal_size() # Get the terminal dimensions msg = '#' + '-' * (cols - 2) + '#\n' # Create the try: new_line = str(printq.get_nowait()) if new_line != '!@#EXIT#@!': # A nice way to turn the printer # thread out gracefully lines.append(new_line) printq.task_done() else: printq.task_done() sys.exit() except Queue.Empty: pass # Build the new message to show and split too long lines for line in lines: res = line # The following is to split lines which are # longer than cols. while len(res) !=0: toprint, res = split_line(res, cols) msg += '\n' + toprint # Clear the shell and print the new output subprocess.check_call('clear') # Keep the shell clean sys.stdout.write(msg) sys.stdout.flush() time.sleep(.5)
# Stoppable Thread with a while Loop
import threading import time class StoppableThread(threading.Thread): """Thread class with a stop() method. The thread itself has to check regularly for the stopped() condition.""" def __init__(self): super(StoppableThread, self).__init__() self._stop_event = threading.Event() def stop(self): self._stop_event.set() def join(self, *args, **kwargs): self.stop() super(StoppableThread,self).join(*args, **kwargs) def run() while not self._stop_event.is_set(): print("Still running!") time.sleep(2) print("stopped!"
Based on this Question (opens new window).