Working with parallel processing and Python

Python programming

Introduction to Parallel Processing

For parallelization, it’s essential to divide a problem into sub-units that are independent of one another, or that have only minimal dependencies on one another. A problem whose sub-units are entirely independent is called embarrassingly parallel.

An element-wise operation on an array is a typical example: the operation only needs to be aware of the specific element it’s currently working with.
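
As a minimal sketch of why element-wise work is embarrassingly parallel, the following splits a list into chunks and processes them in reverse order, mimicking workers that finish at different times. The helper names (double, split_into_chunks, process_chunk) are hypothetical and chosen only for illustration. The chunks are processed serially here; the point is that no chunk needs anything from another one.

```python
def double(x):
    """Element-wise operation: depends only on the element it receives."""
    return 2 * x


def split_into_chunks(items, n_chunks):
    """Split items into n_chunks contiguous chunks of near-equal size."""
    size, remainder = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def process_chunk(chunk):
    """Apply the element-wise operation to every element of one chunk."""
    return [double(x) for x in chunk]


data = list(range(10))
chunks = split_into_chunks(data, 3)

# Process the chunks in reverse order to mimic workers finishing at different times.
results_by_index = {}
for index in reversed(range(len(chunks))):
    results_by_index[index] = process_chunk(chunks[index])

# Reassemble by chunk index; the processing order did not matter.
combined = [y for index in sorted(results_by_index) for y in results_by_index[index]]
print(combined)
```

Because each chunk is self-contained, any chunk could be handed to a separate worker without coordination.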

In other scenarios, the sub-units of a problem need to share some data to perform their operations. The resulting communication overhead can reduce performance.

There are two main approaches to managing parallel programs:

  • Shared Memory: In shared memory, sub-units can communicate with each other through the same memory space. The advantage is that you don’t need to explicitly manage communication, as simply reading from or writing to the shared memory is sufficient. However, problems arise when multiple processes access the same memory location and try to modify it simultaneously. These conflicts can be prevented using synchronization techniques (like locks, semaphores, etc.).
  • Distributed Memory: In distributed memory, each process is completely isolated and has its own memory space. In this scenario, communication is explicitly handled between processes, often using message passing over a network. Since communication happens over a network interface, it’s generally more expensive compared to shared memory.
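
The shared-memory case can be sketched with threads standing in for the sub-units, and a lock as the synchronization technique. The names counter, counter_lock, and add_many are hypothetical. Without the lock, the read-modify-write in counter += 1 could interleave between threads and lose updates.

```python
import threading

counter = 0                      # memory shared by all threads
counter_lock = threading.Lock()  # guards every modification of counter


def add_many(n):
    """Increment the shared counter n times, one locked update at a time."""
    global counter
    for _ in range(n):
        with counter_lock:  # only one thread may modify counter at a time
            counter += 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

With the lock in place, every increment is applied, so the final value equals the number of threads times the increments per thread.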

Threads are one way to achieve parallelism with shared memory. They are independent sub-tasks that originate from a single process and share its memory. However, due to the Global Interpreter Lock (GIL) in CPython (the standard Python implementation), threads cannot achieve true parallelism for CPU-bound tasks. The GIL is a mechanism that allows only one thread to execute Python bytecode at a time within a single process. Using processes instead of threads sidesteps this limitation, because each process has its own interpreter and its own GIL. Processes have some drawbacks, such as a higher start-up cost and more complex inter-process communication than threads sharing memory, but they are more effective at utilizing multiple CPU cores for CPU-bound work.
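
One way to observe this is to run the same CPU-bound function through a thread pool and a process pool and compare the wall-clock times. The following is a rough sketch, not a benchmark: cpu_bound and timed_map are hypothetical helpers, and the timings depend on the machine, the core count, and the Python build, so no expected numbers are given.

```python
import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def cpu_bound(n):
    """Pure-Python loop: sum of the squares of the integers below n."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed_map(pool_factory, func, inputs, workers):
    """Map func over inputs with the given pool type; return (results, seconds)."""
    start = time.perf_counter()
    with pool_factory(workers) as pool:
        results = pool.map(func, inputs)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    inputs = [2_000_000] * 4
    thread_results, thread_time = timed_map(ThreadPool, cpu_bound, inputs, 4)
    process_results, process_time = timed_map(Pool, cpu_bound, inputs, 4)
    assert thread_results == process_results  # same answers either way
    print("threads:   {:.2f} s".format(thread_time))
    print("processes: {:.2f} s".format(process_time))
```

On a multi-core machine running standard CPython, the process pool would typically finish sooner, since the threads take turns holding the GIL.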

Multiprocessing for Parallel Processing

Using the standard multiprocessing module, we can parallelize tasks by creating child processes. This module provides an easy-to-use interface and includes a set of helper tools for handling task distribution and synchronization.

Process Class and Pool

Process

By subclassing multiprocessing.Process, you can create a process that runs independently. By extending the __init__ method, you can initialize resources, and by overriding the run() method, you can write the code that the child process executes. The following code demonstrates how to create processes that print their assigned IDs:

Python

import multiprocessing
import time

class Process(multiprocessing.Process):
    def __init__(self, id):
        super(Process, self).__init__()
        self.id = id

    def run(self):
        time.sleep(1)
        print("I'm the process with id: {}".format(self.id))

if __name__ == '__main__':  # Important for multiprocessing, especially on Windows
    p = Process(0)
    p.start()
    p.join()  # Wait for process 0 to finish

    p = Process(1)
    p.start()
    p.join()  # Wait for process 1 to finish

To create a process, we initialize our Process object and call the Process.start() method. Process.start() creates a new process and calls the Process.run() method within it.

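The code after p.start() executes concurrently with the new process. To wait for the process to complete, use Process.join(). Note that the example above joins each process before starting the next one, so its two child processes run one after the other rather than in parallel.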
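
To let the child processes actually overlap, one option is to start all of them first and join them afterwards. The sketch below uses the target argument of multiprocessing.Process instead of a subclass, which is a simplification for brevity. The functions describe and worker are hypothetical names.

```python
import multiprocessing
import time


def describe(worker_id):
    """Build the message a worker prints."""
    return "I'm the process with id: {}".format(worker_id)


def worker(worker_id):
    """Code that runs inside each child process."""
    time.sleep(1)
    print(describe(worker_id))


if __name__ == "__main__":
    start = time.perf_counter()
    processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()  # returns immediately; the child runs in the background
    for p in processes:
        p.join()   # wait for every child only after all have been started
    print("elapsed: {:.1f} s".format(time.perf_counter() - start))
```

Since the three one-second sleeps overlap, the elapsed time should be much closer to one second than to three, although the order of the printed lines is not guaranteed.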

Pool Class

The Pool class can be used to execute a function in parallel for different input data. The multiprocessing.Pool() class creates a pool of processes (workers) and can distribute tasks using the apply/apply_async and map/map_async methods. For parallel mapping, you first initialize a multiprocessing.Pool() object. The first argument is the number of workers. If not given, it defaults to the number of CPU cores in the system.

Here’s an example demonstrating how to distribute a function that calculates the square of a number:

Python

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    inputs = [0, 1, 2, 3, 4]
    # The with-block shuts the worker processes down once the work is done
    with multiprocessing.Pool() as pool:  # Uses all available cores
        # with multiprocessing.Pool(processes=4) as pool:  # Uses 4 processes
        outputs = pool.map(square, inputs)
    print("Input: {}".format(inputs))
    print("Output: {}".format(outputs))

When using the regular map method, the program execution blocks until all workers complete their tasks. Using map_async(), an AsyncResult object is returned immediately without blocking the main program, and the work is done in the background. The result can be retrieved later with the AsyncResult.get() method, which blocks until the results are ready:

Python

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    inputs = [0, 1, 2, 3, 4]
    with multiprocessing.Pool() as pool:
        outputs_async = pool.map_async(square, inputs)  # Returns immediately
        outputs = outputs_async.get()  # Blocks until results are available
    print("Output: {}".format(outputs))
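
The benefit of map_async shows up when the main process has something else to do while the workers are busy. The sketch below polls the AsyncResult with ready() and wait(). The function slow_square and the sleep durations are hypothetical stand-ins for real work.

```python
import multiprocessing
import time


def slow_square(x):
    """Square a number after a short artificial delay."""
    time.sleep(0.2)
    return x * x


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        async_result = pool.map_async(slow_square, [0, 1, 2, 3, 4])
        # The main process is not blocked; it can do other work in this loop.
        while not async_result.ready():
            print("main process is free to do other work...")
            async_result.wait(timeout=0.1)  # sleep at most 0.1 s per check
        print("Output: {}".format(async_result.get()))
```

The number of status lines printed depends on timing, so it can differ between runs.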

Pool.apply_async submits a single function call to one of the workers. It takes the function and its arguments and returns an AsyncResult object.

Python

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        # Submit ten independent calls; each returns an AsyncResult
        result_async = [pool.apply_async(square, args=(i,)) for i in range(10)]
        results = [r.get() for r in result_async]
    print("Output: {}".format(results))
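
For comparison, here is a small sketch that contrasts the blocking Pool.apply with Pool.apply_async for a function taking more than one argument. The function power is a hypothetical example. The args and kwds parameters carry positional and keyword arguments, respectively.

```python
import multiprocessing


def power(base, exponent):
    """Raise base to the given exponent."""
    return base ** exponent


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # apply blocks until this single call has finished
        print(pool.apply(power, args=(2, 10)))

        # apply_async returns at once; kwds passes keyword arguments
        pending = pool.apply_async(power, args=(3,), kwds={"exponent": 4})
        print(pending.get(timeout=10))  # wait at most 10 s for the result
```

Passing a timeout to get() is optional. It is used here so that a stuck worker raises an error instead of hanging the program.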

IPython Parallel Framework

The IPython parallel package provides a framework for setting up and running computations on multi-core machines and multi-node networked clusters. Since IPython 4.0 it has been distributed as the separate ipyparallel package; earlier releases bundled it as IPython.parallel. In IPython parallel, you set up a set of worker processes called Engines, which are managed by a Controller. The Controller is an entity that helps establish communication between the client and the engines. In this approach, worker processes are started separately and wait indefinitely for commands from the client.

The ipcluster shell command is used to start the Controller and Engines.

Bash

ipcluster start

Once the cluster is running, we can use an IPython shell to perform parallel work. IPython parallel has two main interfaces:

  • Direct Interface
  • Task-Based Interface

Direct Interface

The direct interface allows you to explicitly send commands to each of the computational units (Engines). It’s flexible and easy to use. To interact with the units, you need to start the engines and then an IPython session in a separate shell. You can connect to the Controller by creating a Client. In the following code, we import the Client class and create an instance:

Python

from ipyparallel import Client  # was IPython.parallel before IPython 4.0
rc = Client()
rc.ids

Here, Client.ids provides a list of integers representing the available engine IDs.

Using a Direct View instance, you can issue commands to the engines. You can get a Direct View instance in two ways:

  • By indexing the client instance: dview = rc[0] (accesses engine 0)
  • By calling the Client.direct_view method: dview = rc.direct_view('all') (accesses all engines)

As the final step, you can execute commands using the DirectView.execute method:

Python

dview.execute('a = 1')

The above command is executed individually by each engine. Calls such as pull return an AsyncResult object, whose value you can retrieve using the get method:

Python

dview.pull('a').get()
dview.push({'a': 2})

As shown above, you can retrieve data using the DirectView.pull method and send data using the DirectView.push method.

Task-Based Interface

The task-based interface provides a more intelligent way to perform computational tasks. From the user’s perspective, this interface is less flexible, because you do not choose which engine runs a given task, but it balances load across the engines efficiently and can resubmit failed tasks.

The LoadBalancedView class provides the task-based interface. You obtain an instance by calling the Client.load_balanced_view method.

Python

from ipyparallel import Client  # was IPython.parallel before IPython 4.0
rc = Client()
tview = rc.load_balanced_view()

You can execute tasks using the map and apply methods. In LoadBalancedView, task assignment depends on the load on each engine at that time. This helps keep all engines busy without overloading any of them.
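
A minimal sketch of the map and apply calls might look like the following. It assumes the ipyparallel package is installed and that a cluster has already been started with ipcluster start. The function names square and run_on_cluster are hypothetical. The map_sync and apply_sync variants are used so that the calls block and return plain results.

```python
def square(x):
    """Task that the engines execute."""
    return x * x


def run_on_cluster():
    """Send tasks to a running cluster through the load-balanced view.

    Assumes ipyparallel is installed and `ipcluster start` is running.
    """
    # Imported here so this file still loads where ipyparallel is missing.
    from ipyparallel import Client

    rc = Client()
    tview = rc.load_balanced_view()

    # map_sync spreads the inputs over whichever engines are free
    squares = tview.map_sync(square, range(8))

    # apply_sync runs one call on a single engine chosen by the scheduler
    single = tview.apply_sync(square, 12)
    return squares, single
```

With a cluster running, calling run_on_cluster() from a script or an IPython session returns the mapped results together with the result of the single call.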