chainlet.dataflow module¶
Helpers to modify the flow of data through a chain
-
class
chainlet.dataflow.
NoOp
¶ Bases:
chainlet.chainlink.NeutralLink
A noop element that returns any input unchanged
This element is useful when an element is syntactically required, but no action is desired. For example, it can be used to split a pipeline into a modified and unmodifed version:
translate = parse_english >> (NoOp(), to_french, to_german)
Note: Unlike the NeutralLink
, this element is not optimized away by linking.
-
chainlet.dataflow.
joinlet
(chainlet)¶ Decorator to mark a chainlet as joining
Parameters: chainlet (chainlink.ChainLink) – a chainlet to mark as joining Returns: the chainlet modified inplace Return type: chainlink.ChainLink Applying this decorator is equivalent to setting
chain_join
onchainlet
: every data chunk is an iterable containing all data returned by the parents. It is primarily intended for use with decorators that implicitly create a newChainLink
.@joinlet @funclet def average(value: Iterable[Union[int, float]]): "Reduce all data of the last step to its average" values = list(value) # value is an iterable of values due to joining if not values: return 0 return sum(values) / len(values)
-
chainlet.dataflow.
forklet
(chainlet)¶ Decorator to mark a chainlet as forking
Parameters: chainlet (chainlink.ChainLink) – a chainlet to mark as forking Returns: the chainlet modified inplace Return type: chainlink.ChainLink See the note on
joinlet()
for general features. This decorator setschain_fork
, and implementations must provide an iterable.@forklet @funclet def friends(value): "Split operations for every friend of a person" return (person for person in persons if person.is_friend(value))
-
class
chainlet.dataflow.
MergeLink
(*mergers)¶ Bases:
chainlet.chainlink.ChainLink
Element that joins the data flow by merging individual data chunks
Parameters: mergers (tuple[type, callable]) – pairs of type, merger
to merge subclasses oftype
withmerger
Merging works on the assumption that all data chunks from the previous step are of the same type. The type is deduced by peeking at the first chunk, based on which a
merger
is selected to perform the actual merging. The choice of amerger
is re-evaluated at every step; a singleMergeLink
can handle a different type on each step.Selection of a
merger
is based on testingissubclass(type(first), merger_type)
. This check is evaluated in order, iterating throughmergers
before usingdefault_merger
. For example,Counter
precedesdict
to use a summation based merge strategy.Each
merger
must implement the call signature-
merger
(base_value: T, iter_values: Iterable[T]) → T¶
where
base_value
is the value used for selecting themerger
.-
chain_fork
= False¶
-
chain_join
= True¶
-
chainlet_send
(value=None)¶ Send a value to this element for processing
-
default_merger
= [(<class 'numbers.Number'>, <function merge_numerical>), (<class 'collections.Counter'>, <function merge_numerical>), (<class '_abcoll.MutableSequence'>, <function merge_iterable>), (<class '_abcoll.MutableSet'>, <function merge_iterable>), (<class '_abcoll.MutableMapping'>, <function merge_mappings>)]¶ type specific merge function mapping of the form
(type, merger)
-
-
chainlet.dataflow.
either
¶ alias of
chainlet.dataflow.Either