From 1d25e8bffe5bada0f188b56215ef21e9bb2ca482 Mon Sep 17 00:00:00 2001 From: Michael Bryan Date: Fri, 3 Jun 2016 15:24:21 +0800 Subject: [PATCH] Added a bit about concurrent.futures --- docs/scenarios/speed.rst | 127 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 125 insertions(+), 2 deletions(-) diff --git a/docs/scenarios/speed.rst b/docs/scenarios/speed.rst index 6050c98..9cba3e2 100644 --- a/docs/scenarios/speed.rst +++ b/docs/scenarios/speed.rst @@ -226,10 +226,131 @@ Numba ----- .. todo:: Write about Numba and the autojit compiler for NumPy -Threading -::::::::: +Concurrency +::::::::::: +Concurrent.futures +------------------ + +The `concurrent.futures`_ module is a module in the standard library that +provides a "high-level interface for asynchronously executing callables". It +abstracts away a lot of the more complicated details about using multiple +threads or processes for concurrency, and allows the user to focus on +accomplishing the task at hand. + +The `concurrent.futures`_ module exposes two main classes, the +`ThreadPoolExecutor` and the `ProcessPoolExecutor`. The ThreadPoolExecutor +will create a pool of worker threads that a user can submit jobs to. These jobs +will then be executed in another thread when the next worker thread becomes +available. + +The ProcessPoolExecutor works in the same way, except instead of using multiple +threads for its workers, it will use multiple processes. This makes it possible +to side-step the GIL, however because of the way things are passed to worker +processes, only picklable objects can be executed and returned. + +Because of the way the GIL works, a good rule of thumb is to use a +ThreadPoolExecutor when the task being executed involves a lot of blocking +(i.e. making requests over the network) and to use a ProcessPoolExecutor +executor when the task is computationally expensive. + +There are two main ways of executing things in parallel using the two +Executors. One way is with the `map(func, iterables)` method. This works +almost exactly like the builtin `map()` function, except it will execute +everything in parallel. :: + + from concurrent.futures import ThreadPoolExecutor + import requests + + def get_webpage(url): + """ + Some blocking function. + """ + page = requests.get(url) + return page + + pool = ThreadPoolExecutor(max_workers=5) + + my_urls = ['http://google.com/']*10 # Create a list of urls + + for page in pool.map(get_webpage, my_urls): + # Do something with the result + print(page.text) + +For even more control, the `submit(func, *args, **kwargs)` method will schedule +a callable to be executed ( as `func(*args, **kwargs)`) and returns a `Future`_ +object that represents the execution of the callable. + +The Future object provides various methods that can be used to check on the +progress of the scheduled callable. These include: + +cancel() + Attempt to cancel the call. +cancelled() + Return True if the call was successfully cancelled. +running() + Return True if the call is currently being executed and cannot be + cancelled. +done() + Return True if the call was successfully cancelled or finished running. +result() + Return the value returned by the call. Note that this call will block until + the scheduled callable returns by default. +exception() + Return the exception raised by the call. If no exception was raised then + this returns `None`. Note that this will block just like `result()`. +add_done_callback(fn) + Attach a callback function that will be executed (as `fn(future)`) when the + scheduled callable returns. + +:: + + from concurrent.futures import ProcessPoolExecutor, as_completed + + def is_prime(n): + if n % 2 == 0: + return n, False + + sqrt_n = int(n**0.5) + for i in range(3, sqrt_n + 1, 2): + if n % i == 0: + return n, False + return n, True + + PRIMES = [ + 112272535095293, + 112582705942171, + 112272535095293, + 115280095190773, + 115797848077099, + 1099726899285419] + + futures = [] + with ProcessPoolExecutor(max_workers=4) as pool: + # Schedule the ProcessPoolExecutor to check if a number is prime + # and add the returned Future to our list of futures + for p in PRIMES: + fut = pool.submit(is_prime, p) + futures.append(fut) + + # As the jobs are completed, print out the results + for number, result in as_completed(futures): + if result: + print("{} is prime".format(number)) + else: + print("{} is not prime".format(number)) + +The `concurrent.futures`_ module contains two helper functions for working with +Futures. The `as_completed(futures)` function returns an iterator over the list +of futures, yielding the futures as they complete. + +The `wait(futures)` function will simply block until all futures in the list of +futures provided have completed. + +For more information, on using the `concurrent.futures`_ module, consult the +official documentation. + Threading --------- @@ -248,3 +369,5 @@ Multiprocessing .. _`New GIL`: http://www.dabeaz.com/python/NewGIL.pdf .. _`Special care`: http://docs.python.org/c-api/init.html#threads .. _`David Beazley's`: http://www.dabeaz.com/GIL/gilvis/measure2.py +.. _`concurrent.futures`: https://docs.python.org/3/library/concurrent.futures.html +.. _`Future`: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future