chainlet.concurrency.base module

class chainlet.concurrency.base.ConcurrentBundle(elements)

Bases: chainlet.chainlink.Bundle

A group of chainlets that concurrently process each data chunk

Chainlets are processed using only the requesting threads. This allows thread-safe usage, but blocking actions, such as file I/O or time.sleep(), only run in parallel if concurrency is used explicitly.

Concurrent bundles implement element concurrency: the same data is processed concurrently by multiple elements.
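
As a rough illustration of element concurrency, the following standard-library sketch hands one value to several callables at once. It does not use chainlet and is not how ConcurrentBundle is implemented: send_to_all and the example elements are hypothetical names, and the thread pool is an assumption standing in for whichever executor is in use.

```python
from concurrent.futures import ThreadPoolExecutor


def send_to_all(elements, value):
    """Apply every element to the same value, each in its own worker thread."""
    with ThreadPoolExecutor(max_workers=len(elements)) as pool:
        futures = [pool.submit(element, value) for element in elements]
        # collect results in element order, regardless of completion order
        return [future.result() for future in futures]


# hypothetical elements: three independent transformations of one value
elements = [lambda x: x + 1, lambda x: x * 2, lambda x: x ** 2]
bundle_result = send_to_all(elements, 3)  # [4, 6, 9]
```

Each element sees the same input, so the number of results equals the number of elements.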

chainlet_send(value=None)

Send a value to this element for processing

executor = <chainlet.concurrency.base.LocalExecutor object>
class chainlet.concurrency.base.ConcurrentChain(elements)

Bases: chainlet.chainlink.Chain

A group of chainlets that concurrently process each data chunk

Chainlets are processed using only the requesting threads. This allows thread-safe usage, but blocking actions, such as file I/O or time.sleep(), only run in parallel if concurrency is used explicitly.

Concurrent chains implement data concurrency: multiple data chunks are processed concurrently by the same elements.

Note: A ConcurrentChain will always join and fork to handle all data.
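
Data concurrency can be pictured with a similar standard-library sketch, in which several values pass through the same sequence of callables at the same time. Again, this is not chainlet code: run_chain and send_all are hypothetical helpers, and the thread pool and its size of 4 are assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def run_chain(elements, value):
    """Pass one value through every element in order."""
    for element in elements:
        value = element(value)
    return value


def send_all(elements, values):
    """Process several values concurrently through the same elements."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Executor.map returns results in input order
        return list(pool.map(lambda value: run_chain(elements, value), values))


# hypothetical elements: add one, then double
elements = [lambda x: x + 1, lambda x: x * 2]
chain_results = send_all(elements, [1, 2, 3])  # [4, 6, 8]
```

Here the number of results follows the number of input values, not the number of elements.
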
chainlet_send(value=None)

Send a value to this element for processing

executor = <chainlet.concurrency.base.LocalExecutor object>
class chainlet.concurrency.base.FutureChainResults(futures)

Bases: object

Chain result computation stored for future and concurrent execution

Acts as an iterable for the actual results. Each future may be executed ahead of time by a concurrent executor, with a synchronous fallback as required. Iteration can lazily advance through all available results before blocking.

If any future raises an exception, iteration re-raises the exception at the appropriate position.

Parameters: futures (list[StoredFuture]) – the stored futures for each result chunk
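
The iteration behaviour described above, where results arrive in order and a failure surfaces at its own position, can be sketched with the standard library's concurrent.futures. This is an analogy under stated assumptions, not the FutureChainResults implementation: iter_results and half are hypothetical names, and concurrent.futures.Future stands in for StoredFuture.

```python
from concurrent.futures import ThreadPoolExecutor


def iter_results(futures):
    """Yield each result in order; a failed future re-raises at its position."""
    for future in futures:
        # result() blocks until this future is done and re-raises its exception
        yield future.result()


def half(value):
    """Hypothetical workload that fails for odd input."""
    if value % 2:
        raise ValueError("odd value: %d" % value)
    return value // 2


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(half, value) for value in (4, 8, 5, 2)]
    seen = []
    try:
        for result in iter_results(futures):
            seen.append(result)
    except ValueError as err:
        error_message = str(err)
```

The first two results are consumed normally; the exception appears only when iteration reaches the third future.
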
class chainlet.concurrency.base.LocalExecutor(max_workers, identifier='')

Bases: object

Executor for futures using local execution stacks without concurrency

Parameters:
  • max_workers (int or float) – maximum number of threads in pool
  • identifier (str) – base identifier for all workers
static submit(call, *args, **kwargs)

Submit a call for future execution

Returns: future for the call execution
Return type: StoredFuture
class chainlet.concurrency.base.SafeTee(iterable, n=2)

Bases: object

Thread-safe version of itertools.tee()

Parameters:
  • iterable – source iterable to split
  • n (int) – number of safe iterators to produce for iterable
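
The iterators returned by itertools.tee() are not thread-safe, so one common way to make them safe is to serialise every advance with a shared lock. The sketch below shows that general technique; LockedTee and locked_tee are hypothetical names, and SafeTee itself may be implemented differently.

```python
import itertools
import threading


class LockedTee:
    """One branch of itertools.tee whose next() calls are serialised by a lock."""

    def __init__(self, branch, lock):
        self._branch = branch
        self._lock = lock

    def __iter__(self):
        return self

    def __next__(self):
        # tee branches share internal state, so all branches use the same lock
        with self._lock:
            return next(self._branch)


def locked_tee(iterable, n=2):
    """Return n iterators over iterable that can be advanced from several threads."""
    lock = threading.Lock()
    return tuple(LockedTee(branch, lock) for branch in itertools.tee(iterable, n))


first, second = locked_tee(range(3))
first_items = list(first)    # [0, 1, 2]
second_items = list(second)  # [0, 1, 2]
```

A single lock shared by all branches is the simplest choice; it trades some parallelism for correctness.
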
class chainlet.concurrency.base.StoredFuture(call, *args, **kwargs)

Bases: object

Call stored for future execution

Parameters:
  • call – callable to execute
  • args – positional arguments to call
  • kwargs – keyword arguments to call
await_result()

Wait for the future to be realised

realise()

Realise the future if possible

If the future has not been realised yet, do so in the current thread. This will block execution until the future is realised. Otherwise, do not block but return whether the result is already available.

This neither returns the result nor propagates any exceptions of the future itself.

Returns: whether the future has been realised
Return type: bool
result

The result from realising the future

If the result is not available, block until done.

Returns: result of the future
Raises: any exception encountered while realising the future
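
The contract of realise(), await_result() and result can be approximated by a small standard-library class. This is a minimal sketch of the documented behaviour, not the StoredFuture source: DeferredCall and all of its attributes are hypothetical, and the use of a lock plus an event is an assumption.

```python
import threading


class DeferredCall:
    """A call stored for later execution, realised at most once."""

    def __init__(self, call, *args, **kwargs):
        self._call, self._args, self._kwargs = call, args, kwargs
        self._lock = threading.Lock()    # guards the claim flag
        self._claimed = False
        self._done = threading.Event()   # set once the call has finished
        self._result = None
        self._exception = None

    def realise(self):
        """Run the call in this thread unless another thread already claimed it."""
        with self._lock:
            if self._claimed:
                # do not block: only report whether the result is available
                return self._done.is_set()
            self._claimed = True
        try:
            self._result = self._call(*self._args, **self._kwargs)
        except Exception as err:
            self._exception = err        # stored here, raised only by result
        self._done.set()
        return True

    def await_result(self):
        """Block until the call has finished."""
        self._done.wait()

    @property
    def result(self):
        """Return the result, realising or waiting as needed."""
        self.realise()
        self.await_result()
        if self._exception is not None:
            raise self._exception
        return self._result


future = DeferredCall(pow, 2, 10)
was_realised = future.realise()       # True
value = future.result                 # 1024
failing = DeferredCall(int, "not a number")
failing_realised = failing.realise()  # True, the ValueError is only stored
```

Accessing failing.result would raise the stored ValueError, which mirrors the split between realise() and result described above.
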
chainlet.concurrency.base.multi_iter(iterable, count=2)

Return count independent, thread-safe iterators for iterable