chainlet package

Bases: object

BaseClass for elements in a chain

A chain is created by binding ChainLinks together. This is a directional process: a binding is always made between parent and child. Each child can be the parent to another child, and vice versa.

The direction dictates how data is passed along the chain:

  • A parent may send() a data chunk to a child.
  • A child may pull the next() data chunk from the parent.

Chaining is done with >> and << operators as parent >> child and child << parent. Forking and joining of chains requires a sequence of multiple elements as parent or child.

parent >> child
child << parent

Bind child and parent. Both directions of the statement are equivalent: if a is made a child of b, then b` is made a parent of a, and vice versa.

parent >> (child_a, child_b, ...)
parent >> [child_a, child_b, ...]
parent >> {child_a, child_b, ...}

Bind child_a, child_b, etc. as children of parent.

(parent_a, parent_b, ...) >> child
[parent_a, parent_b, ...] >> child
{parent_a, parent_b, ...} >> child

Bind parent_a, parent_b, etc. as parents of child.

Aside from binding, every ChainLink implements the Generator-Iterator Methods interface:

iter(link)

Create an iterator over all data chunks that can be created. Empty results are ignored.

link.__next__()
link.send(None)
next(link)

Create a new chunk of data. Raise StopIteration if there are no more chunks. Implicitly used by next(link).

link.send(chunk)

Process a data chunk, and return the result.

Note

The next variants contrast with iter by also returning empty chunks. Use variations of next(iter(link)) for an explicit iteration.

link.chainlet_send(chunk)

Process a data chunk locally, and return the result.

This method implements data processing in an element; subclasses must overwrite it to define how they handle data.

This method should only be called to explicitly traverse elements in a chain. Client code should use next(link) and link.send(chunk) instead.

link.throw(type[, value[, traceback]])

Raises an exception of type inside the link. The link may either return a final result (including None), raise StopIteration if there are no more results, or propagate any other, unhandled exception.

link.close()

Close the link, cleaning up any resources.. A closed link may raise RuntimeError if data is requested via next or processed via send.

When used in a chain, each ChainLink is distinguished by its handling of input and output. There are two attributes to signal the behaviour when chained. These specify whether the element performs a 1 -> 1, n -> 1, 1 -> m or n -> m processing of data.

chain_join

A bool indicating that the element expects the values of all preceding elements at once. That is, the chunk passed in via send() is an iterable providing the return values of the previous elements.

chain_fork

A bool indicating that the element produces several values at once. That is, the return value is an iterable of data chunks, each of which should be passed on independently.

To prematurely stop the traversal of a chain, 1 -> n and n -> m elements should return an empty container. Any 1 -> 1 and n -> 1 element must raise StopTraversal.

chain_fork = False

whether this element produces several data chunks at once

chain_join = False

whether this element processes several data chunks at once

chain_types = <chainlet.primitives.linker.LinkPrimitives object>
chainlet_send(value=None)

Send a value to this element for processing

close()

Close this element, freeing resources and possibly blocking further interactions

dispatch(values)

Dispatch multiple values to this element for processing

next()
send(value=None)

Send a single value to this element for processing

static throw(type, value=None, traceback=None)

Throw an exception in this element

exception chainlet.StopTraversal

Bases: exceptions.Exception

Stop the traversal of a chain

Any chain element raising StopTraversal signals that subsequent elements of the chain should not be visited with the current value.

Raising StopTraversal does not mean the element is exhausted. It may still produce values regularly on future traversal. If an element will never produce values again, it should raise ChainExit.

Note:This signal explicitly affects the current chain only. It does not affect other, parallel chains of a graph.

Changed in version 1.3: The return_value parameter was removed.

chainlet.funclet(function)

Convert a function to a ChainLink

@funclet
def square(value):
    "Convert every data chunk to its numerical square"
    return value ** 2

The data chunk value is passed anonymously as the first positional parameter. In other words, the wrapped function should have the signature:

.slave(value, *args, **kwargs)
chainlet.genlet(generator_function=None, prime=True)

Decorator to convert a generator function to a ChainLink

Parameters:
  • generator_function (generator) – the generator function to convert
  • prime (bool) – advance the generator to the next/first yield

When used as a decorator, this function can also be called with and without keywords.

@genlet
def pingpong():
    "Chainlet that passes on its value"
    last = yield
    while True:
        last = yield last

@genlet(prime=True)
def produce():
    "Chainlet that produces a value"
    while True:
        yield time.time()

@genlet(True)
def read(iterable):
    "Chainlet that reads from an iterable"
    for item in iterable:
        yield item
chainlet.joinlet(chainlet)

Decorator to mark a chainlet as joining

Parameters:chainlet (ChainLink) – a chainlet to mark as joining
Returns:the chainlet modified inplace
Return type:ChainLink

Applying this decorator is equivalent to setting chain_join on chainlet: every data chunk is an iterable containing all data returned by the parents. It is primarily intended for use with decorators that implicitly create a new ChainLink.

@joinlet
@funclet
def average(value: Iterable[Union[int, float]]):
    "Reduce all data of the last step to its average"
    values = list(value)  # value is an iterable of values due to joining
    if not values:
        return 0
    return sum(values) / len(values)
chainlet.forklet(chainlet)

Decorator to mark a chainlet as forking

Parameters:chainlet (ChainLink) – a chainlet to mark as forking
Returns:the chainlet modified inplace
Return type:ChainLink

See the note on joinlet() for general features. This decorator sets chain_fork, and implementations must provide an iterable.

@forklet
@funclet
def friends(value, persons):
    "Split operations for every friend of a person"
    return (person for person in persons if person.is_friend(value))