chainlet.concurrency.base module¶
-
class
chainlet.concurrency.base.
ConcurrentBundle
(elements)¶ Bases:
chainlet.chainlink.Bundle
A group of chainlets that concurrently process each data chunk
Processing of chainlets is performed using only the requesting threads. This allows thread-safe usage, but requires explicit concurrent usage for blocking actions, such as file I/O or
time.sleep()
, to be run in parallel.Concurrent bundles implement element concurrency: the same data is processed concurrently by multiple elements.
-
chainlet_send
(value=None)¶ Send a value to this element for processing
-
executor
= <chainlet.concurrency.base.LocalExecutor object>¶
-
-
class
chainlet.concurrency.base.
ConcurrentChain
(elements)¶ Bases:
chainlet.chainlink.Chain
A group of chainlets that concurrently process each data chunk
Processing of chainlets is performed using only the requesting threads. This allows thread-safe usage, but requires explicit concurrent usage for blocking actions, such as file I/O or
time.sleep()
, to be run in parallel.Concurrent chains implement data concurrency: multiple data is processed concurrently by the same elements.
Note: A ConcurrentChain
will always join and fork to handle all data.-
chainlet_send
(value=None)¶ Send a value to this element for processing
-
executor
= <chainlet.concurrency.base.LocalExecutor object>¶
-
-
class
chainlet.concurrency.base.
FutureChainResults
(futures)¶ Bases:
object
Chain result computation stored for future and concurrent execution
Acts as an iterable for the actual results. Each future can be executed prematurely by a concurrent executor, with a synchronous fallback as required. Iteration can lazily advance through all available results before blocking.
If any future raises an exception, iteration re-raises the exception at the appropriate position.
Parameters: futures (list[StoredFuture]) – the stored futures for each result chunk
-
class
chainlet.concurrency.base.
LocalExecutor
(max_workers, identifier='')¶ Bases:
object
Executor for futures using local execution stacks without concurrency
Parameters: -
static
submit
(call, *args, **kwargs)¶ Submit a call for future execution
Returns: future for the call execution Return type: StoredFuture
-
static
-
class
chainlet.concurrency.base.
SafeTee
(iterable, n=2)¶ Bases:
object
Thread-safe version of
itertools.tee()
Parameters: - iterable – source iterable to split
- n (int) – number of safe iterators to produce for iterable
-
class
chainlet.concurrency.base.
StoredFuture
(call, *args, **kwargs)¶ Bases:
object
Call stored for future execution
Parameters: - call – callable to execute
- args – positional arguments to
call
- kwargs – keyword arguments to
call
-
await_result
()¶ Wait for the future to be realised
-
realise
()¶ Realise the future if possible
If the future has not been realised yet, do so in the current thread. This will block execution until the future is realised. Otherwise, do not block but return whether the result is already available.
This will not return the result nor propagate any exceptions of the future itself.
Returns: whether the future has been realised Return type: bool
-
result
¶ The result from realising the future
If the result is not available, block until done.
Returns: result of the future Raises: any exception encountered during realising the future
-
chainlet.concurrency.base.
multi_iter
(iterable, count=2)¶ Return count independent, thread-safe iterators for iterable