Introduction to Parallel Processing
For parallelization, it's essential to divide a problem into sub-units that are independent of one another (or have only minimal dependencies). A problem whose sub-units are entirely independent of each other is called embarrassingly parallel.
An element-wise operation on an array is a typical example: the operation only needs to be aware of the specific element it is currently working on.
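To make the independence concrete, here is a minimal sketch (the function name is illustrative): each output element depends only on the corresponding input element, so the iterations could run in any order, or all at once.

```python
def increment_all(values):
    # Each iteration touches exactly one element and nothing else,
    # so the loop body is trivially parallelizable.
    return [v + 1 for v in values]

print(increment_all([0, 1, 2]))  # -> [1, 2, 3]
```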
In other scenarios, the sub-units of a problem need to share some data to perform their operations. This leads to performance issues due to communication overhead.
There are two main approaches to writing parallel programs: threads and processes.
Threads are one way to achieve parallelism with shared memory: they are independent sub-tasks that originate from a single process and share its memory. However, because of the Global Interpreter Lock (GIL) in CPython (the standard Python implementation), threads cannot achieve true parallelism for CPU-bound tasks. The GIL is a mechanism that allows only one thread at a time to execute Python bytecode within a single process. Using processes instead of threads sidesteps the GIL entirely, because each process runs its own interpreter. Processes come with some drawbacks, such as more complex inter-process communication compared to the shared memory of threads, but they are more effective for utilizing multiple CPU cores.
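The contrast can be sketched with the standard concurrent.futures module (the count workload and helper names are illustrative, not from the original): the same CPU-bound function runs in a thread pool and in a process pool. Both produce identical results, but on a multicore machine only the process version executes the calls truly in parallel, because the threads serialize on the GIL.

```python
import concurrent.futures
import time

def count(n):
    # CPU-bound busy loop: pure Python bytecode, so threads
    # running it cannot overlap under the GIL.
    total = 0
    for i in range(n):
        total += i
    return total

def timed(executor_cls, n_tasks=4, n=2_000_000):
    # Run the same workload n_tasks times through the given executor
    # and report the results together with the elapsed wall time.
    start = time.perf_counter()
    with executor_cls(max_workers=n_tasks) as ex:
        results = list(ex.map(count, [n] * n_tasks))
    return results, time.perf_counter() - start

if __name__ == '__main__':
    r_threads, t_threads = timed(concurrent.futures.ThreadPoolExecutor)
    r_procs, t_procs = timed(concurrent.futures.ProcessPoolExecutor)
    # Same answers either way; the process pool is typically faster
    # on a multicore machine because the work actually overlaps.
    print("threads: {:.2f}s  processes: {:.2f}s".format(t_threads, t_procs))
```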
Multiprocessing for Parallel Processing
Using the standard multiprocessing module, we can parallelize tasks by creating child processes. The module provides an easy-to-use interface and a set of helper tools for handling task distribution and synchronization.
Process Class and Pool
Process
By subclassing multiprocessing.Process, you can create a process that runs independently. By extending the __init__ method you can initialize resources, and by overriding the Process.run() method you can write the code the subprocess will execute. The following code demonstrates how to create processes that print their assigned IDs:
Python
import multiprocessing
import time

class Process(multiprocessing.Process):
    def __init__(self, id):
        super(Process, self).__init__()
        self.id = id

    def run(self):
        time.sleep(1)
        print("I'm the process with id: {}".format(self.id))

if __name__ == '__main__':  # Important for multiprocessing, especially on Windows
    p = Process(0)
    p.start()
    p.join()  # Wait for process 0 to finish
    p = Process(1)
    p.start()
    p.join()  # Wait for process 1 to finish
To create a process, we initialize our Process object and call the Process.start() method. Process.start() spawns a new process and invokes the Process.run() method inside it. The code after p.start() executes concurrently with the new process; to wait for the process to complete, call Process.join().
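Because start() returns immediately, several processes can run at the same time if you start them all before joining any of them. A small sketch, reusing the same Process subclass as above:

```python
import multiprocessing
import time

class Process(multiprocessing.Process):
    def __init__(self, id):
        super(Process, self).__init__()
        self.id = id

    def run(self):
        time.sleep(1)
        print("I'm the process with id: {}".format(self.id))

if __name__ == '__main__':
    processes = [Process(i) for i in range(4)]
    for p in processes:
        p.start()   # all four processes are now running concurrently
    for p in processes:
        p.join()    # wait for every process to finish
    # Total wall time is roughly 1 second, not 4, because the
    # time.sleep(1) calls overlap.
```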
Pool Class
The Pool class can be used to execute a function in parallel over different input data. multiprocessing.Pool() creates a pool of worker processes and can distribute tasks to them using the apply/apply_async and map/map_async methods. For parallel mapping, you first initialize a multiprocessing.Pool() object; its first argument is the number of workers, which defaults to the number of CPU cores in the system.
Here’s an example demonstrating how to distribute a function that calculates the square of a number:
Python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool()  # Uses all available cores
    # pool = multiprocessing.Pool(processes=4)  # Uses 4 worker processes
    inputs = [0, 1, 2, 3, 4]
    outputs = pool.map(square, inputs)
    print("Input: {}".format(inputs))
    print("Output: {}".format(outputs))
When using the regular map method, program execution blocks until all workers complete their tasks. map_async(), by contrast, immediately returns an AsyncResult object without blocking the main program, and the work proceeds in the background. The result can be retrieved at any time with the AsyncResult.get() method:
Python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    inputs = [0, 1, 2, 3, 4]
    outputs_async = pool.map_async(square, inputs)
    outputs = outputs_async.get()  # Blocks until results are available
    print("Output: {}".format(outputs))
Pool.apply_async assigns a task consisting of a single function call to one of the workers. It takes the function and its arguments and returns an AsyncResult object.
Python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    result_async = [pool.apply_async(square, args=(i,)) for i in range(10)]
    results = [r.get() for r in result_async]
    print("Output: {}".format(results))
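As a small aside not covered above, Pool also works as a context manager, which terminates the workers automatically when the block exits, so no explicit cleanup calls are needed. A minimal sketch:

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # Leaving the with-block terminates the pool, so no explicit
    # close()/join() calls are needed.
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.map(square, range(5))
    print(results)  # -> [0, 1, 4, 9, 16]
```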
IPython Parallel Framework
The IPython parallel package provides a framework for setting up and running computations both on a single (multicore) machine and across multi-node networked clusters. In IPython.parallel, you set up a set of worker processes called Engines, which are managed by a Controller. The Controller is the entity that mediates communication between the client and the engines. In this approach, the worker processes are started separately and wait indefinitely for commands from the client.
The ipcluster shell command is used to start the Controller and Engines:
Bash
ipcluster start
After running the above command, we can use an IPython shell to perform parallel work. IPython offers two main interfaces:
Direct Interface
The direct interface allows you to explicitly send commands to each of the computational units (Engines). It is flexible and easy to use. To interact with the engines, you start them and then open an IPython session in a separate shell. You connect to the Controller by creating a Client. In the following code, we import the Client class and create an instance:
Python
from IPython.parallel import Client
rc = Client()
rc.ids
Here, Client.ids gives a list of integers identifying the available engines.
Using a DirectView instance, you can issue commands to the engines. You can obtain a DirectView in two ways:
- By indexing the Client instance: dview = rc[0] (accesses engine 0)
- With the Client.direct_view method: dview = rc.direct_view('all') (accesses all engines)
As the final step, you can execute commands using the DirectView.execute method:
Python
dview.execute('a = 1')
The above command is executed individually by each engine. Methods such as DirectView.pull return an AsyncResult object; calling its get method blocks until the values are available:
Python
dview.pull('a').get()
dview.push({'a': 2})
As shown above, you can retrieve data from the engines with the DirectView.pull method and send data to them with the DirectView.push method.
Task-Based Interface
The task-based interface provides a smarter way to schedule computational tasks. From the user's perspective it is less flexible, but it balances the load across the engines and can resubmit failed tasks, thereby improving performance.
The LoadBalancedView class provides the task-based interface; an instance is obtained through the Client.load_balanced_view method.
Python
from IPython.parallel import Client
rc = Client()
tview = rc.load_balanced_view()
You can execute tasks using its map and apply methods. With a LoadBalancedView, task assignment depends on the load on the engines at submission time, which ensures that all engines stay busy without any of them being overloaded.