chainlet.dataflow module

Helpers to modify the flow of data through a chain

class chainlet.dataflow.NoOp

Bases: chainlet.chainlink.NeutralLink

A noop element that returns any input unchanged

This element is useful when an element is syntactically required, but no action is desired. For example, it can be used to split a pipeline into a modified and an unmodified version:

translate = parse_english >> (NoOp(), to_french, to_german)
Note: Unlike the NeutralLink, this element is not optimized away by linking.

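The split described above can be pictured without chainlet at all. The following is a minimal sketch in plain Python, assuming that every branch of the tuple receives the same input; `noop`, `split_pipeline` and the toy translators are hypothetical stand-ins, and collecting the branch results in a list is an assumption of this sketch, not a statement about how chainlet delivers them.

```python
# Plain-Python illustration only; none of these names are part of chainlet.
FRENCH = {"hello": "bonjour", "world": "monde"}
GERMAN = {"hello": "hallo", "world": "welt"}


def noop(value):
    """Stand-in for NoOp: return the input unchanged."""
    return value


def parse_english(text):
    """Toy parser: lower-case the text and split it into words."""
    return text.lower().split()


def to_french(words):
    """Toy translator: look up each word, keeping unknown words as they are."""
    return [FRENCH.get(word, word) for word in words]


def to_german(words):
    """Toy translator: look up each word, keeping unknown words as they are."""
    return [GERMAN.get(word, word) for word in words]


def split_pipeline(value, branches):
    """Feed the same value to every branch and collect one result per branch."""
    return [branch(value) for branch in branches]


results = split_pipeline(parse_english("Hello world"), (noop, to_french, to_german))
```

The first entry of `results` is the unmodified parse, which is the role NoOp plays in the `translate` chain.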
chainlet.dataflow.joinlet(chainlet)

Decorator to mark a chainlet as joining

Parameters: chainlet (chainlink.ChainLink) – a chainlet to mark as joining
Returns: the chainlet, modified in place
Return type: chainlink.ChainLink

Applying this decorator is equivalent to setting chain_join on chainlet: every data chunk it receives is an iterable containing all data returned by its parents. It is primarily intended for use with decorators that implicitly create a new ChainLink.

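from typing import Iterable, Union

from chainlet import funclet
from chainlet.dataflow import joinlet

@joinlet
@funclet
def average(value: Iterable[Union[int, float]]):
    "Reduce all data of the last step to its average"
    values = list(value)  # value is an iterable of values due to joining
    if not values:
        return 0  # no data from the parents: avoid dividing by zero
    return sum(values) / len(values)
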
chainlet.dataflow.forklet(chainlet)

Decorator to mark a chainlet as forking

Parameters: chainlet (chainlink.ChainLink) – a chainlet to mark as forking
Returns: the chainlet, modified in place
Return type: chainlink.ChainLink

See the note on joinlet() for general features. This decorator sets chain_fork; the decorated implementation must return an iterable of data chunks.

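from chainlet import funclet
from chainlet.dataflow import forklet

@forklet
@funclet
def friends(value):
    "Split operations for every friend of a person"
    # persons is assumed to be a collection of person objects defined elsewhere
    return (person for person in persons if person.is_friend(value))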
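Both decorators are described as setting a flag on the chainlet and returning that same object. A minimal sketch of that behaviour, assuming nothing beyond the `chain_join` and `chain_fork` attributes named in this section, could look as follows. `SketchLink`, `joinlet_sketch` and `forklet_sketch` are hypothetical names for illustration and are not chainlet's implementation.

```python
class SketchLink(object):
    """Hypothetical stand-in for a ChainLink; only the two flags are modelled."""
    chain_join = False
    chain_fork = False


def joinlet_sketch(chainlet):
    """Mark the given object as joining and return the same object."""
    chainlet.chain_join = True
    return chainlet


def forklet_sketch(chainlet):
    """Mark the given object as forking and return the same object."""
    chainlet.chain_fork = True
    return chainlet


link = SketchLink()
marked = forklet_sketch(joinlet_sketch(link))  # decorators can be stacked
```

Because the object is modified in place, `marked` and `link` refer to the same instance, and the class-level defaults on `SketchLink` stay untouched.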

class chainlet.dataflow.MergeLink

Bases: chainlet.chainlink.ChainLink

Element that joins the data flow by merging individual data chunks

Parameters: mergers (tuple[type, callable]) – pairs of (type, merger); instances of type or its subclasses are merged with merger

Merging works on the assumption that all data chunks from the previous step are of the same type. The type is deduced by peeking at the first chunk, based on which a merger is selected to perform the actual merging. The choice of a merger is re-evaluated at every step; a single MergeLink can handle a different type on each step.

Selection of a merger is based on testing issubclass(type(first), merger_type). This check is evaluated in order, iterating through mergers before falling back to default_merger. The order matters when types overlap: for example, Counter precedes dict so that counters use a summation-based merge strategy instead of the generic mapping merge.

Each merger must implement the call signature

merger(base_value: T, iter_values: Iterable[T]) → T

where base_value is the value used for selecting the merger.
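The selection rule and call signature above can be sketched in plain Python. This is an illustration under stated assumptions, not chainlet's code: `merge_sum`, `merge_extend`, `merge_update`, `DEFAULT_MERGERS` and `merge_chunks` are hypothetical names, and the actual behaviour of chainlet's own merge functions may differ in detail.

```python
from collections import Counter
from collections.abc import MutableMapping, MutableSequence
from numbers import Number


def merge_sum(base_value, iter_values):
    """Hypothetical merger: add every remaining value onto the first one."""
    result = base_value
    for value in iter_values:
        result = result + value
    return result


def merge_extend(base_value, iter_values):
    """Hypothetical merger: concatenate sequences into a copy of the first."""
    result = type(base_value)(base_value)  # copy, so the input is not mutated
    for value in iter_values:
        result.extend(value)
    return result


def merge_update(base_value, iter_values):
    """Hypothetical merger: later mappings overwrite keys of earlier ones."""
    result = type(base_value)(base_value)  # copy, so the input is not mutated
    for value in iter_values:
        result.update(value)
    return result


# Order matters: Counter is a dict subclass, so it is listed before MutableMapping.
DEFAULT_MERGERS = (
    (Number, merge_sum),
    (Counter, merge_sum),
    (MutableSequence, merge_extend),
    (MutableMapping, merge_update),
)


def merge_chunks(chunks, mergers=()):
    """Pick a merger from the type of the first chunk, then merge all chunks."""
    iterator = iter(chunks)
    try:
        first = next(iterator)
    except StopIteration:
        raise ValueError("cannot merge an empty sequence of chunks")
    # Custom mergers are tested first, then the defaults, in list order.
    for merger_type, merger in tuple(mergers) + DEFAULT_MERGERS:
        if issubclass(type(first), merger_type):
            return merger(first, iterator)
    raise ValueError("no merger registered for %r" % type(first))
```

In this sketch, `merge_chunks([Counter(a=1), Counter(a=2, b=1)])` sums the counts because the Counter entry is reached before the generic mapping entry, while plain dicts fall through to `merge_update`.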

chain_fork = False
chain_join = True
chainlet_send(value=None)

Send a value to this element for processing

default_merger = [
    (<class 'numbers.Number'>, <function merge_numerical>),
    (<class 'collections.Counter'>, <function merge_numerical>),
    (<class '_abcoll.MutableSequence'>, <function merge_iterable>),
    (<class '_abcoll.MutableSet'>, <function merge_iterable>),
    (<class '_abcoll.MutableMapping'>, <function merge_mappings>),
]

Type-specific merge functions, as a sequence of (type, merger) pairs

chainlet.dataflow.either

alias of chainlet.dataflow.Either