chainlet.dataflow module¶
Helpers to modify the flow of data through a chain
-
class
chainlet.dataflow.NoOp¶ Bases:
chainlet.primitives.neutral.NeutralLinkA noop element that returns any input unchanged
This element is useful when an element is syntactically required, but no action is desired. For example, it can be used to split a pipeline into a modified and unmodifed version:
translate = parse_english >> (NoOp(), to_french, to_german)
Note: Unlike the NeutralLink, this element is not optimized away by linking.
-
chainlet.dataflow.joinlet(chainlet)¶ Decorator to mark a chainlet as joining
Parameters: chainlet ( ChainLink) – a chainlet to mark as joiningReturns: the chainlet modified inplace Return type: ChainLinkApplying this decorator is equivalent to setting
chain_joinonchainlet: every data chunk is an iterable containing all data returned by the parents. It is primarily intended for use with decorators that implicitly create a newChainLink.@joinlet @funclet def average(value: Iterable[Union[int, float]]): "Reduce all data of the last step to its average" values = list(value) # value is an iterable of values due to joining if not values: return 0 return sum(values) / len(values)
-
chainlet.dataflow.forklet(chainlet)¶ Decorator to mark a chainlet as forking
Parameters: chainlet ( ChainLink) – a chainlet to mark as forkingReturns: the chainlet modified inplace Return type: ChainLinkSee the note on
joinlet()for general features. This decorator setschain_fork, and implementations must provide an iterable.@forklet @funclet def friends(value, persons): "Split operations for every friend of a person" return (person for person in persons if person.is_friend(value))
-
class
chainlet.dataflow.MergeLink(*mergers)¶ Bases:
chainlet.primitives.link.ChainLinkElement that joins the data flow by merging individual data chunks
Parameters: mergers (tuple[type, callable]) – pairs of type, mergerto merge subclasses oftypewithmergerMerging works on the assumption that all data chunks from the previous step are of the same type. The type is deduced by peeking at the first chunk, based on which a
mergeris selected to perform the actual merging. The choice of amergeris re-evaluated at every step; a singleMergeLinkcan handle a different type on each step.Selection of a
mergeris based on testingissubclass(type(first), merger_type). This check is evaluated in order, iterating throughmergersbefore usingdefault_merger. For example,Counterprecedesdictto use a summation based merge strategy.Each
mergermust implement the call signature-
merger(base_value: T, iter_values: Iterable[T]) → T¶
where
base_valueis the value used for selecting themerger.-
chain_fork= False¶
-
chain_join= True¶
-
chainlet_send(value=None)¶ Send a value to this element for processing
-
default_merger= [(<class 'numbers.Number'>, <function merge_numerical>), (<class 'collections.Counter'>, <function merge_numerical>), (<class '_abcoll.MutableSequence'>, <function merge_iterable>), (<class '_abcoll.MutableSet'>, <function merge_iterable>), (<class '_abcoll.MutableMapping'>, <function merge_mappings>)]¶ type specific merge function mapping of the form
(type, merger)
-
-
chainlet.dataflow.either¶ alias of
chainlet.dataflow.Either