
poreflow.parallel

MemoryMonitor

Context manager for monitoring memory usage and progress during parallel processing.

Attributes:

n_tasks (int): Total number of tasks to be completed.
pool (Pool): The multiprocessing pool being monitored.
update_interval (int): Time interval in seconds between updates.
stop_event (Event): Event used to signal that monitoring should stop.
pbar (tqdm): The progress bar to be updated.
thread: The monitoring thread.
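The attributes above describe a common pattern: a background thread that wakes every `update_interval` seconds until `stop_event` is set. A minimal stdlib-only sketch of that pattern follows, assuming a plain `report` callable in place of the tqdm progress bar and memory sampling; `IntervalMonitor` and `report` are hypothetical names, not poreflow's implementation.

```python
import threading
import time


class IntervalMonitor:
    """Hypothetical stand-in for MemoryMonitor's thread handling (no tqdm)."""

    def __init__(self, report, update_interval=10):
        self.report = report  # called on every tick, e.g. to refresh a progress bar
        self.update_interval = update_interval
        self.stop_event = threading.Event()
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # Event.wait() returns False on timeout and True once the event is set,
        # so the loop ticks every update_interval seconds and exits promptly on stop.
        while not self.stop_event.wait(self.update_interval):
            self.report()

    def __enter__(self):
        self.thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop_event.set()
        self.thread.join()
        self.report()  # one final update with end-of-run statistics
```

Waiting on the event instead of calling `time.sleep(update_interval)` means that leaving the context does not stall for up to a full interval, which matters with a default of 10 seconds.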

__enter__()

Enters the context and starts monitoring.

Creates a progress bar and starts the monitoring thread.

Returns:

tqdm: The progress bar instance.

__exit__(exc_type, exc_val, exc_tb)

Exits the context and stops monitoring.

Stops the monitoring thread and updates the progress bar with final statistics.

Parameters:

exc_type (required): Exception type if an exception occurred, otherwise None.
exc_val (required): Exception value if an exception occurred, otherwise None.
exc_tb (required): Exception traceback if an exception occurred, otherwise None.

Returns:

None
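Because `__exit__` returns None (a falsy value), an exception raised inside the `with` block is not suppressed: cleanup runs, then the exception propagates. A small sketch of that contract, using a hypothetical `StopOnExit` class rather than poreflow code:

```python
class StopOnExit:
    """Hypothetical context manager whose cleanup mirrors MemoryMonitor.__exit__."""

    def __init__(self):
        self.stopped = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stopped = True  # e.g. stop the monitoring thread here
        return None  # falsy return: exceptions are not suppressed
```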

__init__(n_tasks, pool, update_interval=10)

Initializes the MemoryMonitor.

Parameters:

n_tasks (int, required): Total number of tasks to be completed.
pool (Pool, required): The multiprocessing pool being monitored.
update_interval (int, default 10): Time interval in seconds between updates.

MonitoredPool

Bases: Pool

A multiprocessing Pool with built-in memory and progress monitoring.

This class extends the standard multiprocessing Pool to include automatic memory usage and progress monitoring through a context manager.

Attributes:

n_tasks (int): Total number of tasks to complete.
pbar: The tqdm progress bar instance.

__enter__()

Enters the context and starts monitoring.

Initializes the parent Pool context and starts the memory monitor.

Returns:

MonitoredPool: The pool instance for use in the context.

__exit__(exc_type, exc_val, exc_tb)

Exits the context and stops monitoring.

Stops the memory monitor and cleans up the parent Pool context.

Parameters:

exc_type (required): Exception type if an exception occurred, otherwise None.
exc_val (required): Exception value if an exception occurred, otherwise None.
exc_tb (required): Exception traceback if an exception occurred, otherwise None.

Returns:

None

__init__(n_tasks, *args, **kwargs)

Initializes the MonitoredPool.

Parameters:

n_tasks (int, required): Total number of tasks to complete.
*args (default ()): Additional positional arguments passed to Pool.__init__; see See Also.
**kwargs (default {}): Additional keyword arguments passed to Pool.__init__; see See Also.

See Also

multiprocessing.pool.Pool: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
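A minimal sketch of the subclassing pattern described for MonitoredPool: take `n_tasks`, forward everything else to the parent pool, and start or stop monitoring in `__enter__`/`__exit__`. Assumptions: a plain `events` list stands in for MemoryMonitor, and the class subclasses `multiprocessing.pool.ThreadPool` (itself a `Pool` subclass) so the example runs without spawning processes; the same overrides apply to `Pool`. `MonitoredThreadPool` is a hypothetical name, not poreflow's API.

```python
from multiprocessing.pool import ThreadPool


class MonitoredThreadPool(ThreadPool):
    """Hypothetical sketch: a pool that starts/stops a monitor with its context."""

    def __init__(self, n_tasks, *args, **kwargs):
        self.n_tasks = n_tasks
        self.events = []  # placeholder for a real monitor such as MemoryMonitor
        # Everything else (processes, initializer, ...) goes to the parent pool.
        super().__init__(*args, **kwargs)

    def __enter__(self):
        pool = super().__enter__()  # parent Pool context; returns the pool itself
        self.events.append(f"monitor started for {self.n_tasks} tasks")
        return pool

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append("monitor stopped")
        # Pool.__exit__ terminates the workers and returns None,
        # so exceptions from the with-block still propagate.
        return super().__exit__(exc_type, exc_val, exc_tb)
```

Usage: `with MonitoredThreadPool(3, processes=2) as pool: pool.map(abs, [-1, -2, -3])` behaves like an ordinary pool while recording when monitoring starts and stops.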

mapper(worker_func, fname, pointers, callback=None, processes=None, verbose=0, kwargs=None)

Executes a worker function in parallel across multiple items.

This function provides parallel processing capabilities with optional progress monitoring and callback functionality. It handles argument packing, process management, and result collection.

This is useful for processing a large number of channels or events. Typically, each worker reads data from a file (e.g. Fast5), processes it, and returns an output. HDF5 supports a Single Writer Multiple Reader (SWMR) arrangement, so every worker is free to read from the file. Writes to the annotations file, however, must be made by one writer at a time; this is handled by a callback function that runs in the main process whenever a worker completes a task.
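A minimal sketch of the read-in-workers, write-in-callback pattern described above. Assumptions: `read_and_process` and `run` are hypothetical names, an in-memory list stands in for the annotations file, and `ThreadPool` replaces a process `Pool` so the example runs without spawning processes. A process `Pool` invokes `apply_async` callbacks the same way, from a result-handler thread in the parent process.

```python
import threading
from multiprocessing.pool import ThreadPool


def read_and_process(fname, pointer):
    # Hypothetical worker: a real one would open `fname` read-only
    # (e.g. in SWMR mode) and process the data at `pointer`.
    return pointer, len(fname) * pointer


def run(fname, pointers, processes=2):
    annotations = []  # stands in for the annotations file
    lock = threading.Lock()  # serialises writes made from the callback

    def write_result(result):
        # Runs in the parent process (the pool's result-handler thread),
        # so only one writer touches the annotations at a time.
        with lock:
            annotations.append(result)

    with ThreadPool(processes) as pool:
        pending = [
            pool.apply_async(read_and_process, (fname, p), callback=write_result)
            for p in pointers
        ]
        for job in pending:
            job.get()  # wait for each task; re-raises any worker error
    return annotations
```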

Parameters:

worker_func (Callable, required): The function to execute in parallel. It must accept (fname, pointer, **kwargs) as arguments.
fname (PathLike | str, required): The filename or path to pass to worker_func.
pointers (list, required): Iterable of pointers to process in parallel; worker_func is called once per pointer.
callback (Callable, default None): Function to call with each result. If provided, it is called as callback(result, lock=lock) or callback(result), depending on its signature.
processes (int | bool, default None): Number of worker processes. If None, uses multiprocessing.cpu_count(). If False, runs the workers sequentially without a pool.
verbose (int, default 0): Verbosity level: 0 for silent operation, 1 to print worker startup information, 2 to show a progress bar with memory monitoring.
kwargs (dict, default None): Additional keyword arguments to pass to worker_func.
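The callback description says only that `lock` is passed "depending on its signature". One plausible way to make that check, assuming `inspect.signature` and a hypothetical helper `call_callback` (this is not necessarily how poreflow does it):

```python
import inspect
import threading


def call_callback(callback, result, lock):
    """Hypothetical helper: pass `lock` only if the callback can accept it."""
    params = inspect.signature(callback).parameters.values()
    accepts_lock = any(
        # a `lock` parameter that can be passed by keyword, or a **kwargs catch-all
        (p.name == "lock" and p.kind is not inspect.Parameter.POSITIONAL_ONLY)
        or p.kind is inspect.Parameter.VAR_KEYWORD
        for p in params
    )
    if accepts_lock:
        callback(result, lock=lock)
    else:
        callback(result)
```

A callback that writes to a shared file can then take the lock explicitly (`def write(result, lock): ...`), while a simple collector can stay `def collect(result): ...`.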

Returns:

list: List of results from worker_func calls, in order of completion.
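A minimal sketch of how the `processes` and `kwargs` options and the "order of completion" result could fit together, assuming `imap_unordered` for the pooled path; `run_tasks` and its `pool_cls` parameter are hypothetical and not part of mapper's signature.

```python
import functools
from multiprocessing.pool import Pool


def run_tasks(worker_func, fname, pointers, processes=None, kwargs=None, pool_cls=Pool):
    """Hypothetical sketch of mapper-style dispatch; not poreflow's implementation."""
    # Bind fname and kwargs so each task is worker_func(fname, pointer, **kwargs).
    task = functools.partial(worker_func, fname, **(kwargs or {}))
    if processes is False:
        # Sequential path: no pool, results come back in input order.
        return [task(p) for p in pointers]
    # processes=None lets the pool choose its size from the CPU count.
    with pool_cls(processes) as pool:
        # imap_unordered yields each result as soon as its task finishes,
        # so the output is in order of completion, not input order.
        return list(pool.imap_unordered(task, pointers))
```

With a real process `Pool`, `worker_func` must be defined at module level so it can be pickled; the results are collected inside the `with` block because leaving it terminates the pool.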

Single Writer Multiple Reader (SWMR): https://docs.h5py.org/en/latest/swmr.html