
"""Model algorithms as a list of functions operating on a shared state dict.


Installation
------------

:py:mod:`state_chain` is available on `GitHub`_ and on `PyPI`_::

    $ pip install state_chain

The version of :py:mod:`state_chain` documented here has been `tested`_ against
Python 2.7, 3.4, and 3.5 on Ubuntu.

:py:mod:`state_chain` is MIT-licensed.


.. _GitHub: https://github.com/AspenWeb/state_chain.py
.. _PyPI: https://pypi.python.org/pypi/state_chain
.. _tested: https://travis-ci.org/AspenWeb/state_chain.py


Tutorial
--------

This module provides an abstraction for implementing arbitrary algorithms as a
list of functions that operate on a shared state dictionary. Algorithms defined
this way are easy to modify at run time, and they provide cascading exception
handling.

To get started, define some functions:

    >>> def foo():
    ...     return {'baz': 1}
    ...
    >>> def bar():
    ...     return {'buz': 2}
    ...
    >>> def bloo(baz, buz):
    ...     return {'sum': baz + buz}
    ...


Each function returns a :py:class:`dict`, which is used to update the state of
the current run of the algorithm. Names from the state dictionary are made
available to downstream functions as parameters of the same name, via
:py:mod:`dependency_injection`. Now make a :py:class:`StateChain` object:

    >>> from state_chain import StateChain
    >>> blah = StateChain(foo, bar, bloo)

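The mechanism described above has two parts: returned mappings update the state, and parameters are looked up in the state by name. Both can be sketched with the standard library alone. This is a simplified Python 3 illustration, not how :py:mod:`state_chain` is implemented. The library delegates the lookup to :py:mod:`dependency_injection`, whereas the hypothetical ``call_with_state`` helper below uses :py:func:`inspect.signature` directly.

```python
import inspect


def call_with_state(func, state):
    # Hypothetical helper: fill each parameter from the state dict by name.
    # A parameter with a default may be missing from the state; a required
    # one may not.
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if name in state:
            kwargs[name] = state[name]
        elif param.default is inspect.Parameter.empty:
            raise TypeError('state has no value for {0!r}'.format(name))
    return func(**kwargs)


def foo():
    return {'baz': 1}


def bar():
    return {'buz': 2}


def bloo(baz, buz):
    return {'sum': baz + buz}


state = {}
for func in (foo, bar, bloo):
    new_state = call_with_state(func, state)
    if new_state is not None:  # a function may also return None
        state.update(new_state)

print(state['sum'])  # prints 3
```
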

The functions you passed to the constructor are loaded into a list:

    >>> blah.functions          #doctest: +ELLIPSIS
    [<function foo ...>, <function bar ...>, <function bloo ...>]


Now you can use :py:meth:`~StateChain.run` to run the functions. You'll get back
a dictionary representing the algorithm's final state:

    >>> state = blah.run()
    >>> state['sum']
    3

Okay!


Modifying a State Chain
+++++++++++++++++++++++

Let's add two functions to the state chain. First let's define the functions:

    >>> def uh_oh(baz):
    ...     if baz == 2:
    ...         raise heck
    ...
    >>> def deal_with_it(exception):
    ...     print("I am dealing with it!")
    ...     return {'exception': None}
    ...


Now let's interpolate them into our state chain. Let's put the ``uh_oh``
function between ``bar`` and ``bloo``:

    >>> blah.insert_before('bloo', uh_oh)
    >>> blah.functions      #doctest: +ELLIPSIS
    [<function foo ...>, <function bar ...>, <function uh_oh ...>, <function bloo ...>]


Then let's add our exception handler at the end:

    >>> blah.insert_after('bloo', deal_with_it)
    >>> blah.functions      #doctest: +ELLIPSIS
    [<function foo ...>, <function bar ...>, <function uh_oh ...>, <function bloo ...>, <function deal_with_it ...>]


Just for kicks, let's remove the ``foo`` function while we're at it:

    >>> blah.remove('foo')
    >>> blah.functions      #doctest: +ELLIPSIS
    [<function bar ...>, <function uh_oh ...>, <function bloo ...>, <function deal_with_it ...>]

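The three methods above are thin wrappers around ordinary list operations. Here is a hedged sketch on a plain Python list. The helpers are hypothetical free functions that merely mirror the method names. They raise a bare :py:exc:`KeyError` where :py:mod:`state_chain` raises its :py:exc:`KeyError` subclass ``FunctionNotFound``. Insertion is done with empty-slice assignment, which is also how the library's own methods splice in new functions:

```python
def foo(): pass
def bar(): pass
def uh_oh(): pass
def bloo(): pass
def deal_with_it(): pass


def index_of(functions, name):
    # Hypothetical helper: position of the first function with this __name__.
    for i, func in enumerate(functions):
        if func.__name__ == name:
            return i
    raise KeyError(name)


def insert_before(functions, name, *newfuncs):
    i = index_of(functions, name)
    functions[i:i] = newfuncs  # assigning to an empty slice inserts in place


def insert_after(functions, name, *newfuncs):
    i = index_of(functions, name) + 1
    functions[i:i] = newfuncs


def remove(functions, *names):
    for name in names:
        del functions[index_of(functions, name)]


functions = [foo, bar, bloo]
insert_before(functions, 'bloo', uh_oh)
insert_after(functions, 'bloo', deal_with_it)
remove(functions, 'foo')
print([f.__name__ for f in functions])
# prints ['bar', 'uh_oh', 'bloo', 'deal_with_it']
```
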

If you're making extensive changes to a state chain, you should feel free to
directly manipulate the list of functions, rather than using the more
cumbersome :py:meth:`~state_chain.StateChain.insert_before`,
:py:meth:`~state_chain.StateChain.insert_after`, and
:py:meth:`~state_chain.StateChain.remove` methods. We could have achieved the
same result like so:

    >>> blah.functions = [ blah['bar']
    ...                  , uh_oh
    ...                  , blah['bloo']
    ...                  , deal_with_it
    ...                   ]
    >>> blah.functions      #doctest: +ELLIPSIS
    [<function bar ...>, <function uh_oh ...>, <function bloo ...>, <function deal_with_it ...>]


Either way, what happens when we run it? Since we no longer have the ``foo``
function providing a value for ``baz``, we'll need to supply that using a
keyword argument to :py:meth:`~StateChain.run`:

    >>> state = blah.run(baz=2)
    I am dealing with it!

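Keyword arguments to :py:meth:`~StateChain.run` become the starting state. The :py:meth:`~StateChain.run` docstring in the API reference lists three entries (``chain``, ``state``, and ``exception``) that are added only when the caller doesn't supply them. Here is a minimal sketch of that seeding step. It uses a hypothetical ``seed_state`` helper, not the library's code:

```python
def seed_state(_chain, **state):
    # Hypothetical helper: the keyword arguments become the initial state, and
    # the three documented defaults are filled in only if the caller omitted
    # them, so a caller may still pass chain=..., state=... or exception=...
    state.setdefault('chain', _chain)
    state.setdefault('state', state)  # circular reference to the dict itself
    state.setdefault('exception', None)
    return state


state = seed_state('my chain', baz=2)
print(sorted(state))  # prints ['baz', 'chain', 'exception', 'state']
```
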

Exception Handling
++++++++++++++++++

Whenever a function raises an exception, like ``uh_oh`` did in the example
above, :py:meth:`~StateChain.run` captures the exception and populates an
``exception`` key in the current run's state dictionary. While ``exception`` is
not ``None``, every function that doesn't take an ``exception`` parameter is
skipped, and only functions that ask for ``exception`` get called. It's like a
fast-forward. So in our example ``deal_with_it`` got called, but ``bloo``
didn't, which is why there is no ``sum``:

    >>> 'sum' in state
    False


If we run without tripping the exception in ``uh_oh`` then we have ``sum`` at
the end:

    >>> blah.run(baz=5)['sum']
    7

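The fast-forward rule can be sketched in a few lines. This is a simplified model, not the library's implementation, and it differs in three ways. The hypothetical ``run_chain`` helper uses :py:func:`inspect.signature` instead of :py:mod:`dependency_injection`. It catches only :py:exc:`Exception`. It also calls handlers after the ``except`` block has finished, whereas :py:meth:`~StateChain.run` calls them from inside it, so they can inspect ``sys.exc_info()``. It does follow the documented rules:

- A function that requires ``exception`` runs only while an exception is pending.
- A function without an ``exception`` parameter runs only when none is pending.
- A function with a default for ``exception`` always runs.
- Returning ``{'exception': None}`` resumes normal flow.
- An exception still pending at the end is re-raised.

```python
import inspect


def run_chain(functions, **state):
    # Hypothetical simplified runner illustrating the fast-forward rule.
    state.setdefault('exception', None)
    for func in functions:
        params = inspect.signature(func).parameters
        param = params.get('exception')
        pending = state['exception'] is not None
        required = param is not None and param.default is inspect.Parameter.empty
        if pending and param is None:
            continue  # fast-forward past normal functions
        if not pending and required:
            continue  # handlers that require 'exception' wait for one
        kwargs = {name: state[name] for name in params if name in state}
        try:
            new_state = func(**kwargs)
            if new_state is not None:
                state.update(new_state)
        except Exception as exc:
            state['exception'] = exc  # start (or keep) fast-forwarding
    if state['exception'] is not None:
        raise state['exception']  # nothing handled it
    return state


def bar():
    return {'buz': 2}


def uh_oh(baz):
    if baz == 2:
        raise ValueError('heck')  # the tutorial's `raise heck` is a NameError


def bloo(baz, buz):
    return {'sum': baz + buz}


def deal_with_it(exception):
    print('I am dealing with it!')
    return {'exception': None}


chain = [bar, uh_oh, bloo, deal_with_it]
state = run_chain(chain, baz=2)        # prints: I am dealing with it!
print('sum' in state)                  # prints False
print(run_chain(chain, baz=5)['sum'])  # prints 7
```
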

API Reference
-------------

"""
from __future__ import absolute_import, division, print_function, unicode_literals

import collections
import opcode
import sys
import types

from dependency_injection import get_signature, resolve_dependencies


__version__ = '1.3.0-dev'
PYTHON_2 = sys.version_info < (3, 0, 0)


if PYTHON_2:
    def exec_(some_python, namespace):
        # Have to double-exec because the Python 2 form is SyntaxError in 3.
        exec("exec some_python in namespace")
else:
    def exec_(some_python, namespace):
        exec(some_python, namespace)


class FunctionNotFound(KeyError):
    """Used when a function is not found in a state_chain function list
    (subclasses :py:exc:`KeyError`).
    """

    def __str__(self):
        return "The function '{0}' isn't in this state chain.".format(*self.args)

_NO_PREVIOUS = object()


def _iter_with_previous(iterable):
    # Yield (item, previous_item) pairs; the first item is paired with the
    # _NO_PREVIOUS sentinel.
    prev = _NO_PREVIOUS
    for o in iterable:
        yield o, prev
        prev = o

class StateChain(object):
    """Model an algorithm as a list of functions operating on a shared state
    dictionary.

    :param functions: the functions to run, passed positionally in the order
        they are to be run
    :param bool raise_immediately: whether to re-raise exceptions immediately;
        ``False`` by default, and it can only be set as a keyword argument

    Each function in the state chain must return a mapping or ``None``. If it
    returns a mapping, the mapping will be used to update a state dictionary
    for the current run of the algorithm. Functions in the state chain can use
    any name from the current state dictionary as a parameter, and the value
    will then be supplied dynamically via :py:mod:`dependency_injection`. See
    the :py:meth:`run` method for details on exception handling.

    """

    functions = None  #: A list of functions comprising the algorithm.
    default_raise_immediately = False

    START = -1
    END = -2

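    def __init__(self, *functions, **kw):
        self.default_raise_immediately = kw.pop('raise_immediately', False)
        # Only the first argument is checked, which catches mistakes such as
        # passing a list instead of separate functions. The builtin callable()
        # works on Python 2 and 3, whereas collections.Callable was removed in
        # Python 3.10.
        if functions and not callable(functions[0]):
            raise TypeError("Not a function: {0}".format(repr(functions[0])))
        self.functions = list(functions)
        self._signatures = {}
        self.debug = _DebugMethod(self)
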
    def run(self, _raise_immediately=None, _return_after=None, **state):
        """Run through the functions in the :py:attr:`functions` list.

        :param bool _raise_immediately: if not ``None``, will override any
            default for ``raise_immediately`` that was set in the constructor

        :param str _return_after: if not ``None``, return after calling the
            function with this name

        :param dict state: remaining keyword arguments are used for the initial
            state dictionary for this run of the state chain

        :raises: :py:exc:`FunctionNotFound`, if there is no function named
            ``_return_after``

        :returns: a dictionary representing the final state

        The state dictionary is initialized with three items (their default
        values can be overridden using keyword arguments to :py:meth:`run`):

        - ``chain`` - a reference to the parent :py:class:`StateChain` instance
        - ``state`` - a circular reference to the state dictionary
        - ``exception`` - ``None``

        For each function in the :py:attr:`functions` list, we look at the
        function signature and compare it to the current value of
        ``exception`` in the state dictionary. If ``exception`` is ``None``
        then we skip any function that asks for ``exception``, and if
        ``exception`` is *not* ``None`` then we only call functions that *do*
        ask for it. The upshot is that any function that raises an exception
        will cause us to fast-forward to the next exception-handling function
        in the list.

        Here are some further notes on exception handling:

        - If a function provides a default value for ``exception``, then that
          function will be called whether or not there is an exception being
          handled.

        - You should return ``{'exception': None}`` to reset exception
          handling. Under Python 2 we will call ``sys.exc_clear`` for you
          (under Python 3 exceptions are cleared automatically at the end of
          except blocks).

        - If an exception is raised by a function handling another exception,
          then ``exception`` is set to the new one and we look for the next
          exception handler.

        - If ``exception`` is not ``None`` after all functions have been run,
          then we re-raise it.

        - If ``raise_immediately`` evaluates to ``True`` (looking first at any
          per-call ``_raise_immediately`` and then at the instance default),
          then we re-raise any exception immediately instead of
          fast-forwarding to the next exception handler.

        - When an exception occurs, the functions that accept an ``exception``
          argument will be called from inside the ``except:`` block, so you
          can access ``sys.exc_info`` (which contains the traceback) even
          under Python 3.

        """
        if _raise_immediately is None:
            _raise_immediately = self.default_raise_immediately

        if _return_after is not None:
            if _return_after not in self.get_names():
                raise FunctionNotFound(_return_after)

        if 'chain' not in state:
            state['chain'] = self
        if 'state' not in state:
            state['state'] = state
        if 'exception' not in state:
            state['exception'] = None

        # The `for` loop in the `loop()` function below can be entered multiple
        # times since that function calls itself when an exception is raised.
        # If we looped over the `functions` list we'd be starting from the top
        # at each exception, and that's not what we want, so we use an iterator
        # instead to keep track of where we are in the state chain.
        functions_iter = _iter_with_previous(self.functions)

        def loop(in_except):
            signatures = self._signatures
            for function, prev_func in functions_iter:
                if _return_after is not None and prev_func is not _NO_PREVIOUS:
                    if prev_func.__name__ == _return_after:
                        break
                try:
                    if function not in signatures:
                        signatures[function] = get_signature(function)
                    deps = resolve_dependencies(signatures[function], state)
                    skip = (
                        # When function wants exception but we don't have it.
                        not in_except and 'exception' in deps.signature.required
                        or
                        # When function doesn't want exception but we have it.
                        in_except and 'exception' not in deps.signature.parameters
                    )
                    if not skip:
                        new_state = function(**deps.as_kwargs)
                        if new_state is not None:
                            state.update(new_state)
                        if in_except and state['exception'] is None:
                            # exception is cleared, return to normal flow
                            if PYTHON_2:
                                sys.exc_clear()
                            return
                except:
                    if _raise_immediately:
                        raise
                    state['exception'] = sys.exc_info()[1]
                    loop(True)
                    if in_except:
                        # an exception occurred while we were handling another
                        # exception, but now it's been cleared, so we return to
                        # the normal flow
                        return
            if in_except:
                raise  # exception hasn't been handled, reraise
        loop(False)

        return state

    def __getitem__(self, name):
        """Return the function in the :py:attr:`functions` list named ``name``,
        or raise :py:exc:`FunctionNotFound`.

        >>> def foo(): pass
        >>> algo = StateChain(foo)
        >>> algo['foo'] is foo
        True
        >>> algo['bar']
        Traceback (most recent call last):
          ...
        FunctionNotFound: The function 'bar' isn't in this state chain.

        """
        func = None
        for candidate in self.functions:
            if candidate.__name__ == name:
                func = candidate
                break
        if func is None:
            raise FunctionNotFound(name)
        return func

    def get_names(self):
        """Return a list of the names of the functions in the
        :py:attr:`functions` list.
        """
        return [f.__name__ for f in self.functions]

    def insert_before(self, name, *newfuncs):
        """Insert ``newfuncs`` in the :py:attr:`functions` list before the
        function named ``name``, or raise :py:exc:`FunctionNotFound`. ``name``
        may also be :py:attr:`StateChain.START` or :py:attr:`StateChain.END`.

        >>> def foo(): pass
        >>> algo = StateChain(foo)
        >>> def bar(): pass
        >>> algo.insert_before('foo', bar)
        >>> algo.get_names()
        ['bar', 'foo']
        >>> def baz(): pass
        >>> algo.insert_before('foo', baz)
        >>> algo.get_names()
        ['bar', 'baz', 'foo']
        >>> def bal(): pass
        >>> algo.insert_before(StateChain.START, bal)
        >>> algo.get_names()
        ['bal', 'bar', 'baz', 'foo']
        >>> def bah(): pass
        >>> algo.insert_before(StateChain.END, bah)
        >>> algo.get_names()
        ['bal', 'bar', 'baz', 'foo', 'bah']

        """
        if name == self.START:
            i = 0
        elif name == self.END:
            i = len(self.functions)
        else:
            i = self.functions.index(self[name])
        self.functions[i:i] = newfuncs

    def insert_after(self, name, *newfuncs):
        """Insert ``newfuncs`` in the :py:attr:`functions` list after the
        function named ``name``, or raise :py:exc:`FunctionNotFound`. ``name``
        may also be :py:attr:`StateChain.START` or :py:attr:`StateChain.END`.

        >>> def foo(): pass
        >>> algo = StateChain(foo)
        >>> def bar(): pass
        >>> algo.insert_after('foo', bar)
        >>> algo.get_names()
        ['foo', 'bar']
        >>> def baz(): pass
        >>> algo.insert_after('bar', baz)
        >>> algo.get_names()
        ['foo', 'bar', 'baz']
        >>> def bal(): pass
        >>> algo.insert_after(StateChain.START, bal)
        >>> algo.get_names()
        ['bal', 'foo', 'bar', 'baz']
        >>> def bah(): pass
        >>> algo.insert_after(StateChain.END, bah)
        >>> algo.get_names()
        ['bal', 'foo', 'bar', 'baz', 'bah']

        """
        if name == self.START:
            i = 0
        elif name == self.END:
            i = len(self.functions)
        else:
            i = self.functions.index(self[name]) + 1
        self.functions[i:i] = newfuncs

    def remove(self, *names):
        """Remove the functions named ``names`` from the :py:attr:`functions`
        list, or raise :py:exc:`FunctionNotFound`.
        """
        for name in names:
            func = self[name]
            self.functions.remove(func)

    @classmethod
    def from_dotted_name(cls, dotted_name, **kw):
        """Construct a new instance from functions defined in a Python module.

        :param dotted_name: the dotted name of a Python module that contains
            functions that will be added to a state chain in the order of
            appearance.

        :param kw: keyword arguments are passed through to the default
            constructor

        This is a convenience constructor to instantiate a state chain based on
        functions defined in a regular Python file. For example, create a file
        named ``blah_state_chain.py`` on your ``PYTHONPATH``::

            def foo():
                return {'baz': 1}

            def bar():
                return {'buz': 2}

            def bloo(baz, buz):
                return {'sum': baz + buz}


        Then pass the dotted name of the file to this constructor:

        >>> blah = StateChain.from_dotted_name('blah_state_chain')

        All module-level functions whose names don't begin with ``_`` are
        loaded into a list in the order they're defined in the file, and this
        list is passed to the default class constructor. Note that functions
        imported into the module are picked up as well, ordered by their line
        number in their own source file.

        >>> blah.functions #doctest: +ELLIPSIS
        [<function foo ...>, <function bar ...>, <function bloo ...>]

        For this specific module, the code above is equivalent to:

        >>> from blah_state_chain import foo, bar, bloo
        >>> blah = StateChain(foo, bar, bloo)

        """
        module = cls._load_module_from_dotted_name(dotted_name)
        functions = cls._load_functions_from_module(module)
        return cls(*functions, **kw)

    def debug(self, function):
        """Given a function, return a copy of the function with a breakpoint
        immediately inside it.

        :param function function: a function object

        This method wraps the module-level function
        :py:func:`state_chain.debug`, adding three conveniences.

        First, calling this method not only returns a copy of the function with
        a breakpoint installed, it actually replaces the old function in the
        state chain with the copy. So you can do:

        >>> def foo():
        ...     pass
        ...
        >>> algo = StateChain(foo)
        >>> algo.debug(foo) #doctest: +ELLIPSIS
        <function foo at ...>
        >>> algo.run() #doctest: +SKIP
        (Pdb)

        Second, it provides a method on itself to install via function name
        instead of function object:

        >>> algo = StateChain(foo)
        >>> algo.debug.by_name('foo') #doctest: +ELLIPSIS
        <function foo at ...>
        >>> algo.run() #doctest: +SKIP
        (Pdb)

        Third, it aliases the :py:meth:`~_DebugMethod.by_name` method as
        :py:meth:`~_DebugMethod.__getitem__` so you can use mapping access as
        well:

        >>> algo = StateChain(foo)
        >>> algo.debug['foo'] #doctest: +ELLIPSIS
        <function foo at ...>
        >>> algo.run() #doctest: +SKIP
        (Pdb)

        Why would you want to do that? Well, let's say you've written a library
        that includes a state chain:

        >>> def foo(): pass
        ...
        >>> def bar(): pass
        ...
        >>> def baz(): pass
        ...
        >>> blah = StateChain(foo, bar, baz)

        And now some user of your library ends up rebuilding the functions list
        using some of the original functions and some of their own:

        >>> def mine(): pass
        ...
        >>> def precious(): pass
        ...
        >>> blah.functions = [ blah['foo']
        ...                  , mine
        ...                  , blah['bar']
        ...                  , precious
        ...                  , blah['baz']
        ...                   ]

        Now the user of your library wants to debug ``blah['bar']``, but since
        they're using your code as a library it's inconvenient for them to drop
        a breakpoint in your source code. With this feature, they can just
        insert ``.debug`` in their own source code like so:

        >>> blah.functions = [ blah['foo']
        ...                  , mine
        ...                  , blah.debug['bar']
        ...                  , precious
        ...                  , blah['baz']
        ...                   ]

        Now when they run the state chain they'll hit a pdb breakpoint just
        inside your ``bar`` function:

        >>> blah.run() #doctest: +SKIP
        (Pdb)

        """
        raise NotImplementedError  # Should be overridden by _DebugMethod in constructor.

    # Helpers for loading from a file.
    # ================================

    @staticmethod
    def _load_module_from_dotted_name(dotted_name):
        class RootModule(object):
            pass
        module = RootModule()  # lets us use getattr to traverse down
        exec_('import {0}'.format(dotted_name), module.__dict__)
        for name in dotted_name.split('.'):
            module = getattr(module, name)
        return module

    @staticmethod
    def _load_functions_from_module(module):
        """Given a module object, return a list of functions from the module,
        sorted by lineno.
        """
        functions_with_lineno = []
        for name in dir(module):
            if name.startswith('_'):
                continue
            obj = getattr(module, name)
            if type(obj) != types.FunctionType:
                continue
            func = obj
            lineno = func.__code__.co_firstlineno
            functions_with_lineno.append((lineno, func))
        # Sort on the line number alone: functions imported from different
        # files can share a line number, and function objects aren't orderable
        # under Python 3.
        functions_with_lineno.sort(key=lambda pair: pair[0])
        return [f for i, f in functions_with_lineno]

# Debugging Helpers
# =================

class _DebugMethod(object):
    # See docstring at StateChain.debug.

    def __init__(self, chain):
        self.chain = chain

    def __call__(self, function):
        debugging_function = debug(function)
        for i, candidate in enumerate(self.chain.functions):
            if candidate is function:
                self.chain.functions[i] = debugging_function
        return debugging_function

    def by_name(self, name):
        return self(self.chain[name])

    __getitem__ = by_name

def debug(function):
    """Given a function, return a copy of the function with a breakpoint
    immediately inside it.

    :param function function: a function object

    Okay! This is fun. :-)

    This is a decorator, because it takes a function and returns a function.
    But it would be useless in situations where you could actually decorate a
    function using the normal decorator syntax, because then you have the
    source code in front of you and you could just insert the breakpoint
    yourself. It's also pretty useless when you have a function object that
    you're about to call, because you can simply add a ``set_trace`` before
    the function call and then step into the function. No: this helper is
    only useful when you've got a function object that you want to debug, and
    you have neither the definition nor the call conveniently at hand. See the
    method :py:meth:`StateChain.debug` for an explanation of how this
    situation arises with the :py:mod:`state_chain` module.

    For our purposes here, it's enough to know that you can wrap any function:

    >>> def foo(bar, baz):
    ...     return bar + baz
    ...
    >>> func = debug(foo)

    And then calling the function will drop you into pdb:

    >>> func(1, 2) #doctest: +SKIP
    (Pdb)

    The fun part is how this is implemented: we dynamically modify the
    function's bytecode to insert the statements ``import pdb;
    pdb.set_trace()``. Neat, huh? :-)

    """

    # Build bytecode for a set_trace call.
    # ====================================
    # This assumes the CPython bytecode layout used before Python 3.6: a
    # one-byte opcode, followed by a two-byte little-endian argument for
    # opcodes >= opcode.HAVE_ARGUMENT. Python 3.6+ uses two-byte "wordcode".

    NOARG = object()

    codes = ( ('LOAD_CONST', 0)
            , ('LOAD_CONST', None)
            , ('IMPORT_NAME', 'pdb')
            , ('STORE_GLOBAL', 'pdb')
            , ('LOAD_GLOBAL', 'pdb')
            , ('LOAD_ATTR', 'set_trace')
            , ('CALL_FUNCTION', 0)
            , ('POP_TOP', NOARG)
             )

    new_names = function.__code__.co_names
    new_consts = function.__code__.co_consts
    new_code = b''
    addr_pad = 0

    if PYTHON_2:
        _chr = chr
    else:
        # In Python 3 chr returns a str (== 2's unicode), not a bytes (== 2's
        # str). However, the code object constructor wants bytes for both code
        # and lnotab. We use latin-1 to encode these to bytes, per the docs:
        #
        #   The simplest method is to map the codepoints 0-255 to the bytes
        #   0x0-0xff. This means that a string object that contains codepoints
        #   above U+00FF can't be encoded with this method (which is called
        #   'latin-1' or 'iso-8859-1').
        #
        #   http://docs.python.org/3/library/codecs.html#encodings-and-unicode
        _chr = lambda x: chr(x).encode('latin-1')

    for name, arg in codes:

        # This is the inverse of the subset of dis.disassemble needed to
        # handle our use case.

        addr_pad += 1
        op = opcode.opmap[name]
        new_code += _chr(op)
        if op >= opcode.HAVE_ARGUMENT:
            addr_pad += 2
            if op in opcode.hasconst:
                if arg not in new_consts:
                    new_consts += (arg,)
                val = new_consts.index(arg)
            elif op in opcode.hasname:
                if PYTHON_2:
                    # Python 2 code objects want byte strings (str) for names;
                    # under Python 3 they want str (== 2's unicode).
                    arg = arg.encode('ASCII')
                if arg not in new_names:
                    new_names += (arg,)
                val = new_names.index(arg)
            elif name == 'CALL_FUNCTION':
                val = arg  # number of args
            new_code += _chr(val) + _chr(0)

    # Finish inserting our new bytecode in front of the old.
    # ======================================================
    # Loop over old_code and append it to new_code, fixing up absolute jump
    # references along the way. Then fix up the line number table.

    old_code = function.__code__.co_code
    i = 0
    n = len(old_code)
    while i < n:
        c = old_code[i]
        if type(c) is int:
            # In Python 3, index access on a bytestring returns an int.
            c = _chr(c)
        op = ord(c)
        i += 1
        new_code += c
        if op >= opcode.HAVE_ARGUMENT:
            if PYTHON_2:
                oparg = ord(old_code[i]) + ord(old_code[i+1])*256
            else:
                oparg = old_code[i] + old_code[i+1]*256
            if op in opcode.hasjabs:
                oparg += addr_pad
            i += 2
            # Write both bytes back (low, then high) so that arguments of 256
            # or more survive the round trip.
            new_code += _chr(oparg & 0xFF) + _chr(oparg >> 8)

    # Add the prologue size to the second lnotab entry's bytecode increment,
    # so later line-number entries shift along with the displaced original
    # bytecode (the prologue shares the first entry's line).
    old = function.__code__.co_lnotab
    if len(old) > 2:
        # In Python 3, index access on a bytestring returns an int.
        second_incr = ord(old[2]) if PYTHON_2 else old[2]
    else:
        second_incr = 0
    new_lnotab = old[:2] + _chr(second_incr + addr_pad) + old[3:]

    # Now construct new code and function objects.
    # ============================================
    # See Objects/codeobject.c in Python source.

    common_args = ( function.__code__.co_nlocals
                  # Our prologue pushes at most two values onto the stack.
                  , max(function.__code__.co_stacksize, 2)
                  , function.__code__.co_flags
                  , new_code
                  , new_consts
                  , new_names
                  , function.__code__.co_varnames
                  , function.__code__.co_filename
                  , function.__code__.co_name
                  , function.__code__.co_firstlineno
                  , new_lnotab
                  , function.__code__.co_freevars
                  , function.__code__.co_cellvars
                   )

    if PYTHON_2:
        new_code = type(function.__code__)(function.__code__.co_argcount, *common_args)
        new_function = type(function)( new_code
                                     , function.func_globals
                                     , function.func_name
                                     , function.func_defaults
                                     , function.func_closure
                                      )
    else:
        new_code = type(function.__code__)( function.__code__.co_argcount
                                          , function.__code__.co_kwonlyargcount
                                          , *common_args
                                           )
        new_function = type(function)( new_code
                                     , function.__globals__
                                     , function.__name__
                                     , function.__defaults__
                                     , function.__closure__
                                      )
        # The function constructor doesn't take keyword-only defaults.
        new_function.__kwdefaults__ = function.__kwdefaults__

    return new_function

if __name__ == '__main__':
    import doctest
    import sys
    sys.exit(min(doctest.testmod()[0], 1))