MetaFunctions is a function composition library for Python.
It lets you build a data pipeline separately from executing it, so instead of writing:
result = step3(step2(step1(data)))
# or
result_1 = step1(data)
result_2 = step2(result_1)
result_3 = step3(result_2)
You can write:
pipeline = step1 | step2 | step3
result = pipeline(data)
You may not need a new syntax, but being able to compose a data pipeline before executing it has some advantages, such as reuse:
# load, parse, clean, validate, and format are functions
preprocess = load | parse | clean | validate | format
# preprocess is now a MetaFunction, and can be reused
clean_data1 = preprocess('path/to/data/file')
clean_data2 = preprocess('path/to/different/file')
# Preprocess can be included in larger pipelines
pipeline = preprocess | step1 | step2 | step3
step1 | step2 | step3 is both read and executed from left to right, unlike step3(step2(step1())), which is executed from the innermost function outwards.
Calling str on a MetaFunction will tell you what it contains:
>>> str(preprocess)
"(load | parse | clean | validate | format)"
What if you want to send the output of step1 to both step2 and step3, then sum the results? The traditional approach requires an intermediate variable and can quickly become unwieldy:
result1 = step1(data)
result2 = step2(result1) + step3(result1)
Using metafunctions, you can declare a pipeline that does the same thing:
pipeline = step1 | step2 + step3
result = pipeline(data)
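To see why no parentheses are needed in that pipeline, note that Python's + binds more tightly than |, so the expression groups as step1 | (step2 + step3). The sketch below checks that grouping with the standard ast module and spells out the equivalent plain-Python function. The step bodies are hypothetical and chosen only to make the arithmetic easy to follow.

```python
import ast

# Hypothetical steps, not part of metafunctions.
def step1(x):
    return x + 1

def step2(x):
    return x * 2

def step3(x):
    return x * 10

def pipeline(data):
    shared = step1(data)                   # computed once
    return step2(shared) + step3(shared)   # fanned out, then summed

# Check how Python groups the operators in the pipeline expression.
expr = ast.parse("step1 | step2 + step3", mode="eval").body
top_is_pipe = isinstance(expr.op, ast.BitOr)
right_is_sum = isinstance(expr.right, ast.BinOp) and isinstance(expr.right.op, ast.Add)

print(pipeline(1))                 # 24
print(top_is_pipe, right_is_sum)   # True True
```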
MetaFunctions supports Python 3.5+ (tested to 3.10+).
pip install metafunctions
Conceptually, a MetaFunction is a function that contains other functions. When you call a MetaFunction, the MetaFunction calls the functions it contains.
You can create a MetaFunction using the node decorator:
from metafunctions import node

@node
def get_name(prompt):
    return input(prompt)

@node
def say_hello(name):
    return 'Hello {}!'.format(name)
MetaFunctions override certain operators to allow for composition. For example, the following creates a new MetaFunction that combines get_name and say_hello:
greet = get_name | say_hello
When we call the greet MetaFunction, it calls both of its internal functions in turn.
# First, `get_name` is called, which prints our prompt to the screen.
# If we enter 'Tom' at the prompt, the second function returns the string 'Hello Tom!'
greeting = greet('Please enter your name ')
print(greeting) # Hello Tom!
MetaFunctions can also upgrade regular functions to MetaFunctions at composition time, so we can simplify our example by composing say_hello directly with the builtin input and print functions:
>>> greet = input | say_hello | print
>>> greet('Please enter your name: ')
# Please enter your name: Tom
# Hello Tom!
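One way this kind of composition-time upgrading can work in Python is through the reflected operator method __ror__: when the left operand (a plain function) does not define |, Python asks the right operand instead. The sketch below uses a hypothetical Pipe class to illustrate the idea. It is not the metafunctions implementation, and its names are assumptions made for this example.

```python
class Pipe:
    """Hypothetical stand-in for a composable function."""

    def __init__(self, *funcs):
        self.funcs = funcs

    def __call__(self, value):
        # Feed the value through each function from left to right.
        for func in self.funcs:
            value = func(value)
        return value

    def __or__(self, other):
        # Pipe | callable: accept either another Pipe or a plain callable.
        other_funcs = other.funcs if isinstance(other, Pipe) else (other,)
        return Pipe(*self.funcs, *other_funcs)

    def __ror__(self, other):
        # callable | Pipe: Python falls back to this method because plain
        # functions do not implement `|` themselves.
        return Pipe(other, *self.funcs)

    def __str__(self):
        names = (getattr(f, "__name__", repr(f)) for f in self.funcs)
        return "({})".format(" | ".join(names))


def say_hello(name):
    return "Hello {}!".format(name)


# Only say_hello is wrapped; str.strip and str.upper are upgraded on the fly.
greet = str.strip | Pipe(say_hello) | str.upper

print(greet("  Tom "))   # HELLO TOM!
print(str(greet))        # (strip | say_hello | upper)
```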
Errors in composed functions can be confusing. If an exception occurs in a MetaFunction, the exception traceback will tell you which function the exception occurred in. But what if that function appears multiple times in the data pipeline?
Imagine this pipeline, which downloads stringified numeric data from a web API:
>>> compute_value = (query_volume | float) * (query_price | float)
>>> compute_value('http://prices.com/123')
Here we've assumed that query_volume and query_price will return strings that convert cleanly to floats, but what if something goes wrong?
>>> compute_value('http://prices.com/123')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
...
builtins.ValueError: could not convert string to float: '$800'
We can deduce that float conversion failed, but which float call raised the exception? MetaFunctions addresses this with the locate_error decorator, which adds a location string to any exception raised within the pipeline:
>>> from metafunctions import locate_error
>>> with_location = locate_error(compute_value)
>>> with_location('http://prices.com/123')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
...
builtins.ValueError: could not convert string to float: '$800'
Occured in the following function: ((query_volume | float) * (query_price | ->float<-))
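The general idea of annotating an exception with its position in a pipeline can be sketched in plain Python. The helper below, run_with_location, is hypothetical and is not how locate_error is implemented. It handles only a simple linear chain and assumes the exception type can be rebuilt from a single message string.

```python
def run_with_location(steps, value):
    """Call each step in order; on failure, re-raise with the failing step marked."""
    for index, step in enumerate(steps):
        try:
            value = step(value)
        except Exception as exc:
            names = [s.__name__ for s in steps]
            names[index] = "->{}<-".format(names[index])
            message = "{}\nOccurred in: ({})".format(exc, " | ".join(names))
            # Assumption: the exception class accepts one message argument.
            raise type(exc)(message) from exc
    return value


try:
    run_with_location([str.strip, float], " $800 ")
except ValueError as error:
    located_message = str(error)

print(located_message)
```

Because the marker is attached by position rather than by name, the same function can appear several times in the chain and the failing call is still unambiguous.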
MetaFunctions provides utilities for constructing advanced function pipelines.
store / recall: store the output of the preceding function, and recall it later to pass to a different function. For example:
# The following pipeline sends the output of `a` to `b`, and also adds it to the output of `c`
p = a | store('a') | b | recall('a') + c
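For comparison, here is a plain-Python sketch of what that pipeline computes, assuming that store passes its input through unchanged, that recall ignores its input and returns the saved value, and that recall('a') + c is evaluated as a single step after b. The make_store_recall helper and the bodies of a, b, and c are hypothetical.

```python
def make_store_recall():
    """Return a (store, recall) pair that share one private dict."""
    saved = {}

    def store(key):
        def _store(value):
            saved[key] = value
            return value          # pass the value through unchanged
        return _store

    def recall(key):
        def _recall(_incoming):
            return saved[key]     # ignore the incoming value
        return _recall

    return store, recall


store, recall = make_store_recall()

# Hypothetical steps.
def a(x):
    return x + 1

def b(x):
    return x * 2

def c(x):
    return x - 3

def p(x):
    out_a = store('a')(a(x))                 # a, then remember its output
    out_b = b(out_a)                         # a's output goes to b
    return recall('a')(out_b) + c(out_b)     # a's output added to c's output

print(p(1))  # 3
```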
mmap: A MetaFunction decorator that wraps a function (or MetaFunction) and calls it once per item in the input it receives. This allows you to create loops in function pipelines:
(1, 2, 3) | mmap(process) # <- equivalent to (1, 2, 3) | (process & process & process)
mmap duplicates the behaviour of the builtin map function.
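As a rough picture of that behaviour, a map-style wrapper can be written in a few lines of plain Python. The mmap_sketch helper below is hypothetical, and returning a tuple is an assumption made for this example rather than a statement about what mmap returns.

```python
def mmap_sketch(func):
    """Wrap func so it is called once per item of its input."""
    def mapped(items):
        return tuple(func(item) for item in items)
    return mapped


def double(x):
    return x * 2


doubled = mmap_sketch(double)((1, 2, 3))
print(doubled)                          # (2, 4, 6)
print(tuple(map(double, (1, 2, 3))))    # (2, 4, 6)
```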
star: Calls the wrapped MetaFunction with *args instead of args (it's analogous to lambda args, **kwargs: metafunction(*args, **kwargs)). This allows you to incorporate functions that accept more than one parameter into your function pipeline:
@node
def f(result1, result2):
    ...
# When cmp is called, f will receive the results of both a and b as positional args
cmp = (a & b) | star(f)
star can be combined with the above mmap to duplicate the behaviour of itertools.starmap:
starmap = star(mmap(f))
For more discussion of star, see this pull request.
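To make the analogy concrete, the sketch below shows argument unpacking and itertools.starmap in plain Python. The star_sketch helper and the area function are hypothetical and only mirror the lambda described above.

```python
from itertools import starmap


def star_sketch(func):
    """Call func with its single input unpacked as positional arguments."""
    def unpacking(args):
        return func(*args)
    return unpacking


def area(width, height):
    return width * height


single = star_sketch(area)((3, 4))
many = list(starmap(area, [(3, 4), (5, 6)]))

print(single)  # 12
print(many)    # [12, 30]
```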
concurrent: (experimental; requires an OS that provides os.fork())
Consider the following long running MetaFunction:
process_companies = get_company_data | filter | process
process_customers = get_customer_data | filter | process
process_all = process_companies & process_customers
Assuming the component functions in the process_all MetaFunction follow good functional practices and do not have side effects, it's easy to see that process_companies and process_customers are independent of each other. If that's the case, we can safely execute them in parallel. The concurrent decorator allows you to specify steps in the function pipeline to execute in parallel:
from metafunctions import concurrent
do_large_calculation_async = concurrent(process_companies + process_customers)
concurrent can be combined with mmap to create an asynchronous map, similar to multiprocessing.Pool.map:
map_async = concurrent(mmap(f))
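To illustrate why os.fork() matters here, the sketch below runs two independent calls in forked child processes and collects their results through pipes. It is a simplified illustration for POSIX systems, not the implementation of concurrent. The run_in_fork helper is hypothetical, and error handling is minimal: if the function raises in the child, the parent sees an empty payload and unpickling fails.

```python
import os
import pickle


def run_in_fork(func, arg):
    """Start func(arg) in a forked child; return a callable that waits for the result."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: compute, send the pickled result, then exit immediately.
        os.close(read_fd)
        try:
            payload = pickle.dumps(func(arg))
            with os.fdopen(write_fd, "wb") as writer:
                writer.write(payload)
        finally:
            os._exit(0)

    # Parent process: keep only the read end of the pipe.
    os.close(write_fd)

    def wait():
        with os.fdopen(read_fd, "rb") as reader:
            data = reader.read()      # blocks until the child closes its end
        os.waitpid(pid, 0)            # reap the child process
        return pickle.loads(data)

    return wait


def slow_square(x):
    return x * x


# Both children are started before either result is awaited.
wait_a = run_in_fork(slow_square, 3)
wait_b = run_in_fork(slow_square, 4)
results = (wait_a(), wait_b())
print(results)  # (9, 16)
```

Each child works on its own copy of the parent's memory, which is why the approach is only safe when the steps have no side effects that the parent needs to observe.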