mirror of
https://github.com/kennethreitz/python-guide.git
synced 2026-06-05 23:00:18 +00:00
+206
-2
@@ -226,13 +226,212 @@ Numba
-----

.. todo:: Write about Numba and the autojit compiler for NumPy


Concurrency
:::::::::::

Concurrent.futures
------------------

The `concurrent.futures`_ module in the standard library provides a
"high-level interface for asynchronously executing callables". It abstracts
away many of the complicated details of using multiple threads or processes
for concurrency, and allows the user to focus on accomplishing the task at
hand.

The `concurrent.futures`_ module exposes two main classes, the
`ThreadPoolExecutor` and the `ProcessPoolExecutor`. The ThreadPoolExecutor
creates a pool of worker threads that a user can submit jobs to. Each job is
executed in another thread as soon as a worker thread becomes available.

The ProcessPoolExecutor works in the same way, except that it uses multiple
processes instead of multiple threads for its workers. This makes it possible
to side-step the GIL. However, because of the way things are passed to worker
processes, only picklable objects can be executed and returned.

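
As a rough illustration of what "picklable" means here, the sketch below
checks a few objects with the standard `pickle` module. The helper name
`is_picklable` is made up for this example and is not part of
`concurrent.futures`.

.. code-block:: python

    import math
    import pickle
    import threading


    def is_picklable(obj):
        """Return True if ``obj`` survives ``pickle.dumps()``, else False."""
        try:
            pickle.dumps(obj)
        except (pickle.PicklingError, AttributeError, TypeError):
            return False
        return True


    # A function that can be imported by name is pickled as a reference.
    print(is_picklable(math.sqrt))
    # Plain data such as a list of numbers works as an argument or result.
    print(is_picklable([1, 2, 3]))
    # A lambda has no importable name, so it cannot be sent to a worker.
    print(is_picklable(lambda x: x * 2))
    # Objects wrapping OS-level resources, such as locks, cannot be pickled.
    print(is_picklable(threading.Lock()))

In practice this means that jobs for a ProcessPoolExecutor are usually plain
module-level functions that take and return simple data.
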
Because of the way the GIL works, a good rule of thumb is to use a
ThreadPoolExecutor when the task being executed involves a lot of blocking
(e.g. making requests over the network) and to use a ProcessPoolExecutor
when the task is computationally expensive.

There are two main ways of executing things in parallel using the two
Executors. One way is with the `map(func, *iterables)` method. This works
almost exactly like the builtin `map()` function, except that the calls are
executed concurrently by the pool's workers:

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor
    import requests

    def get_webpage(url):
        page = requests.get(url)
        return page

    pool = ThreadPoolExecutor(max_workers=5)

    my_urls = ['http://google.com/']*10  # Create a list of urls

    for page in pool.map(get_webpage, my_urls):
        # Do something with the result
        print(page.text)

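
The similarity to the builtin `map()` can be sketched without any network
access. In the example below, `slow_power` is a made-up task that sleeps
longest for the first input; the results still come back in input order, and
several iterables are consumed in step just as with the builtin.

.. code-block:: python

    import time
    from concurrent.futures import ThreadPoolExecutor


    def slow_power(base, exponent):
        """Hypothetical task: smaller bases sleep longer before answering."""
        time.sleep(0.05 * (3 - base))
        return base ** exponent


    bases = [1, 2, 3]
    exponents = [2, 2, 2]

    with ThreadPoolExecutor(max_workers=3) as pool:
        # Results are yielded in the order of the inputs, not of completion.
        concurrent_results = list(pool.map(slow_power, bases, exponents))

    # The same calculation done sequentially, for comparison.
    builtin_results = [b ** e for b, e in zip(bases, exponents)]

    print(concurrent_results)
    print(concurrent_results == builtin_results)
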
For even more control, the `submit(func, *args, **kwargs)` method schedules
a callable to be executed (as `func(*args, **kwargs)`) and returns a `Future`_
object that represents the execution of the callable.

The Future object provides various methods that can be used to check on the
progress of the scheduled callable. These include:

cancel()
    Attempt to cancel the call.
cancelled()
    Return True if the call was successfully cancelled.
running()
    Return True if the call is currently being executed and cannot be
    cancelled.
done()
    Return True if the call was successfully cancelled or finished running.
result()
    Return the value returned by the call. By default, this blocks until the
    scheduled callable returns.
exception()
    Return the exception raised by the call. If no exception was raised then
    this returns `None`. Note that this will block just like `result()`.
add_done_callback(fn)
    Attach a callback function that will be executed (as `fn(future)`) when
    the future is cancelled or finishes running.


.. code-block:: python

    from concurrent.futures import ProcessPoolExecutor, as_completed

    def is_prime(n):
        # Numbers below 2 are not prime, and 2 is the only even prime
        if n < 2:
            return n, False
        if n == 2:
            return n, True
        if n % 2 == 0:
            return n, False

        sqrt_n = int(n**0.5)
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return n, False
        return n, True

    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]

    # The guard keeps worker processes from re-running this block when they
    # import the module (required where processes are spawned, e.g. Windows)
    if __name__ == '__main__':
        futures = []
        with ProcessPoolExecutor(max_workers=4) as pool:
            # Schedule the ProcessPoolExecutor to check if a number is prime
            # and add the returned Future to our list of futures
            for p in PRIMES:
                fut = pool.submit(is_prime, p)
                futures.append(fut)

            # As the jobs are completed, print out the results
            for fut in as_completed(futures):
                # as_completed() yields Futures; result() gives the tuple
                number, result = fut.result()
                if result:
                    print("{} is prime".format(number))
                else:
                    print("{} is not prime".format(number))

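
The Future methods listed above can be tried out on a small scale. The
following is only a sketch: `divide` is a made-up task, and a
ThreadPoolExecutor is used so that the example runs without spawning
processes.

.. code-block:: python

    from concurrent.futures import ThreadPoolExecutor


    def divide(a, b):
        """Hypothetical task that fails when ``b`` is zero."""
        return a / b


    finished = []

    with ThreadPoolExecutor(max_workers=2) as pool:
        ok = pool.submit(divide, 10, 2)
        failed = pool.submit(divide, 1, 0)
        # The callback receives the Future itself once it has finished.
        ok.add_done_callback(lambda fut: finished.append(fut.result()))

    # Leaving the with block waits for all submitted calls to finish.
    print(ok.done(), ok.cancelled(), ok.result())
    # exception() returns the exception object instead of raising it.
    print(type(failed.exception()).__name__)
    print(finished)

Note that calling `result()` on the failed future would re-raise the
`ZeroDivisionError` in the calling thread.
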
The `concurrent.futures`_ module contains two helper functions for working with
Futures. The `as_completed(futures)` function returns an iterator over the list
of futures, yielding the futures as they complete.

The `wait(futures)` function will simply block until all futures in the list of
futures provided have completed.

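
A minimal sketch of `wait()` follows, using a made-up `nap` task. With its
default arguments, `wait()` returns a named tuple of two sets, the futures
that are done and those that are not.

.. code-block:: python

    import time
    from concurrent.futures import ThreadPoolExecutor, wait


    def nap(seconds):
        """Hypothetical task: sleep for a while and report how long."""
        time.sleep(seconds)
        return seconds


    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(nap, s) for s in (0.01, 0.02, 0.03)]
        # Blocks until every future has completed.
        done, not_done = wait(futures)
        results = sorted(f.result() for f in done)

    print(len(done), len(not_done))
    print(results)

`wait()` also accepts `timeout` and `return_when` arguments for cases where
blocking until everything has finished is not wanted.
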
For more information on using the `concurrent.futures`_ module, consult the
official documentation.

Threading
---------

The standard library comes with a `threading`_ module that allows a user to
work with multiple threads manually.

Running a function in another thread is as simple as passing a callable and
its arguments to `Thread`'s constructor and then calling `start()`:

.. code-block:: python

    from threading import Thread
    import requests

    def get_webpage(url):
        page = requests.get(url)
        return page

    # Thread takes the callable as ``target`` and its arguments as ``args``
    some_thread = Thread(target=get_webpage, args=('http://google.com/',))
    some_thread.start()

To wait until the thread has terminated, call `join()`:

.. code-block:: python

    some_thread.join()

When `join()` is called with a timeout, it returns as soon as the thread
terminates or the timeout expires, whichever comes first. In that case it is
always a good idea to check afterwards whether the thread is still alive
(because the join call timed out):

.. code-block:: python

    if some_thread.is_alive():
        print("join() must have timed out.")
    else:
        print("Our thread has terminated.")

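
The timed-out case can be reproduced with a thread that deliberately takes
longer than the timeout. This is a sketch only; `slow_task` and the chosen
durations are made up for the example.

.. code-block:: python

    import time
    from threading import Thread


    def slow_task():
        """Hypothetical task that takes half a second."""
        time.sleep(0.5)


    worker = Thread(target=slow_task)
    worker.start()

    # Give up waiting after 50 ms; the thread itself keeps running.
    worker.join(timeout=0.05)
    still_running = worker.is_alive()

    # Without a timeout, join() blocks until the thread has terminated.
    worker.join()
    finished = not worker.is_alive()

    print(still_running, finished)
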
Because multiple threads have access to the same section of memory, there
can be situations where two or more threads try to write to the same resource
at the same time, or where the output depends on the sequence or timing of
certain events. This is called a `data race`_ or race condition. When this
happens, the output may be garbled or you may encounter problems which are
difficult to debug. A good example is this `stackoverflow post`_.

This can be avoided by using a `Lock`_ that each thread needs to acquire
before writing to a shared resource. Locks can be acquired and released
through either the context manager protocol (`with` statement), or by using
`acquire()` and `release()` directly. Here is a (rather contrived) example:


.. code-block:: python

    from threading import Lock, Thread

    file_lock = Lock()

    def log(msg):
        # Only one thread at a time may hold the lock and write to the file
        with file_lock:
            # Append, so that earlier entries are not overwritten
            with open('website_changes.log', 'a') as f:
                f.write(msg)

    def monitor_website(some_website):
        """
        Monitor a website and then if there are any changes,
        log them to disk.
        """
        while True:
            # check_for_changes() is assumed to be defined elsewhere
            changes = check_for_changes(some_website)
            if changes:
                log(changes)

    websites = ['http://google.com/', ... ]
    for website in websites:
        t = Thread(target=monitor_website, args=(website,))
        t.start()

Here, we have a bunch of threads checking for changes on a list of sites and
whenever there are any changes, they attempt to write those changes to a file
by calling `log(changes)`. When `log()` is called, it will wait to acquire
the lock with `with file_lock:`. This ensures that at any one time, only one
thread is writing to the file.

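
Both ways of taking a lock can be compared in a self-contained sketch. The
shared counter and the function names below are made up for the example; the
point is that the `with` form and the explicit `acquire()` / `release()`
form protect the update in the same way.

.. code-block:: python

    from threading import Lock, Thread


    def add_using_with(totals, lock, times):
        for _ in range(times):
            # The lock is released automatically when the block is left.
            with lock:
                totals["count"] += 1


    def add_using_acquire(totals, lock, times):
        for _ in range(times):
            lock.acquire()
            try:
                totals["count"] += 1
            finally:
                # Always release, even if the update raises an exception.
                lock.release()


    totals = {"count": 0}
    counter_lock = Lock()

    threads = [
        Thread(target=add_using_with, args=(totals, counter_lock, 10000)),
        Thread(target=add_using_acquire, args=(totals, counter_lock, 10000)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(totals["count"])

The `with` form is usually preferable, since it cannot forget to release the
lock.
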
Spawning Processes
------------------

@@ -248,3 +447,8 @@ Multiprocessing
.. _`New GIL`: http://www.dabeaz.com/python/NewGIL.pdf
.. _`Special care`: http://docs.python.org/c-api/init.html#threads
.. _`David Beazley's`: http://www.dabeaz.com/GIL/gilvis/measure2.py
.. _`concurrent.futures`: https://docs.python.org/3/library/concurrent.futures.html
.. _`Future`: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future
.. _`threading`: https://docs.python.org/3/library/threading.html
.. _`stackoverflow post`: http://stackoverflow.com/questions/26688424/python-threads-are-printing-at-the-same-time-messing-up-the-text-output
.. _`data race`: https://en.wikipedia.org/wiki/Race_condition