# -*- encoding: utf-8 -*-
"""
hio.help.helping module
"""
import types
import functools
import inspect
import os
import errno
import stat
import json
from collections.abc import Iterable, Sequence, Generator
from abc import ABCMeta
import msgpack
import cbor2 as cbor
[docs]def copyfunc(f, name=None):
"""
Copy a function in detail.
To change name of func provide name parameter
functools to update_wrapper assigns and updates following attributes
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
'__annotations__')
WRAPPER_UPDATES = ('__dict__',)
Based on
https://stackoverflow.com/questions/6527633/how-can-i-make-a-deepcopy-of-a-function-in-python
https://stackoverflow.com/questions/13503079/how-to-create-a-copy-of-a-python-function
"""
g = types.FunctionType(f.__code__,
f.__globals__,
name=f.__name__,
argdefs=f.__defaults__,
closure=f.__closure__
)
g = functools.update_wrapper(g, f)
g.__kwdefaults__ = f.__kwdefaults__
if name:
g.__name__ = name
return g
[docs]def attributize(genie):
"""
Decorator function:
Python generators do not support adding attributes.
Adding support for attributes provides a way to pass information
from a WSGI App that returns a generator to a WSGI server via the generator
after the WSGI app has already started returning its body. The hio.http.Server
WSGI server looks for the attributes ._status and ._headers and substitutes
these if present. This allows a streaming WSGI App body iterator to later
modify the headers and status taht will be returned before the body iterator
began iterating. This is useful for web hooks or backend requests that are
serviced by an async coroutine based WSGI app so that they may leverage
the streaming support of standard WSGI but use a the coroutine based
hio.http.Server as an async WSGI server.
This decorator takes a Duck Typing approach to decorating
a generator function or method that returns a new function type instance that
when called will return a generator like object that supports attributes.
the new wrapped object acts like a generator but with attributes.
Parameters:
genie (generator function, generator method): is either
a generator function that returns a generator object
a generator method that returns a generator object
If genie is a generator function then a reference to its wrapper
is injected as the first positional argument to the orginal
generator function. The convention is to use the parameter 'me' to refer
to the injected reference to the wrapper.
If genie is a generator method, that is, its first parameter is 'self'
then a reference to its wrapper is injected as the second
positional argument to the original generator method. the convention
is to use the parameter 'me' to refer to the injected reference to the
wrapper so as not to collide with the 'self' instance reference.
When wrapped the new type is AttributiveGenerator
Usage:
# decorated generator function
@attributize
def bar(me, req=None, rep=None):
me._status = 400 # or copy from rep.status
me._headers = odict(example="Hi") # or copy from rep.headers
yield b""
yield b""
yield b"Hello There"
return b"Goodbye"
gen = bar()
msg = next(gen) # attributes set after first next
gen._status
gen._headers
# decorated generator method
class R:
@attributize
def bar(self, me, req=None, rep=None):
self.name = "Peter"
me._status = 400 # or copy from rep.status
me._headers = odict(example="Hi") # or copy from rep.headers
yield b""
yield b""
yield b"Hello There " + self.name.encode()
return b"Goodbye"
r = R()
gen = r.bar()
msg = next(gen) # attributes set after first next
gen._status
gen._headers
# use as function wrapper directly instead of as decorator
def gf(me, x): # convention injected reference to attributed wrapper is 'me'
me.x = 5
me.y = 'a'
cnt = 0
while cnt < x:
yield cnt
cnt += 1
agf = attributize(gf)
ag = agf(3)
# body of gf is not run until first next call
assert isIterator(ag)
assert not hasattr(ag, 'x')
assert not hasattr(ag, 'y')
n = next(ag) # first run here which sets up attributes
assert n == 0
assert hasattr(ag, 'x')
assert hasattr(ag, 'y')
assert ag.x == 5
assert ag.y =='a'
n = next(ag)
assert n == 1
Adding attributes to this injected reference makes them available
as attributes of the resultant wrapper.
The HTTP WSGI server at hio.core.http.serving.Server uses an instance of
hio.core.http.serving.Responder to generate the response for each WSGI request.
The Responder instance checks its WSGI app generator for existence of the
attributes ._status and ._headers. If so then it overrides its default
response status with ._status and updates its default response headers
with the headers in ._header. This allows a backend (webhook) to conveniently
influence the response status and headers. The response body is returned by
the generator itself.
Background:
Unlike Python functions, Python generators do not support custom attributes
and the generator locals dict at .gi_frame.f_locals dissappears once the
generator is complete so its inconvenient.
Fixed attributes of generator objects.
['.__next__', '__iter__', 'close', 'gi_code', 'gi_frame', 'gi_running',
'gi_yieldfrom', 'send', 'throw']
"""
def wrapper(*args, **kwargs):
"""
When called returns instance of AttributiveGenerator instead of generator.
"""
def __iter__(self): # default attribute
return self
def send(self): # default attribute
raise NotImplementedError
def throw(self): # default attribute
raise NotImplementedError
# use type() to create dynamic instance of class AttributiveGenerator
# dict spec for type() is {'__iter__': lambda self: self, 'send': ,}
tdict = { '__iter__': __iter__, 'send': send, 'throw': throw,}
AG = type("AttributiveGenerator", (Generator,), tdict) # make custom type
ag = AG() # create instance so we can inject it into genfunc
# create generator and inject ag ref for me parameter ag = me
fargs = inspect.getfullargspec(genie).args
if fargs and fargs[0] == 'self':
gen = genie(args[0], ag, *args[1:], **kwargs) # inject self and me
else:
gen = genie(ag, *args, **kwargs) # inject me only
# now replace default class references with real gen attributes. "duckify"
for attr in ('__next__', 'close', 'send', 'throw',
'gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom'):
setattr(AG, attr, getattr(gen, attr))
functools.update_wrapper(wrapper=ag, wrapped=gen) # fix up wrapper
return ag
return wrapper
[docs]def repack(n, seq, default=None):
""" Repacks seq into a generator of len n and returns the generator.
The purpose is to enable unpacking into n variables.
The first n-1 elements of seq are returned as the first n-1 elements of the
generator and any remaining elements are returned in a tuple as the
last element of the generator
default (None) is substituted for missing elements when len(seq) < n
Example:
x = (1, 2, 3, 4)
tuple(repack(3, x))
(1, 2, (3, 4))
x = (1, 2, 3)
tuple(repack(3, x))
(1, 2, (3,))
x = (1, 2)
tuple(repack(3, x))
(1, 2, ())
x = (1, )
tuple(repack(3, x))
(1, None, ())
x = ()
tuple(repack(3, x))
(None, None, ())
"""
it = iter(seq)
for i in range(n - 1):
yield next(it, default)
yield tuple(it)
[docs]def just(n, seq, default=None):
""" Returns a generator of just the first n elements of seq and substitutes
default (None) for any missing elements. This guarantees that a generator of exactly
n elements is returned. This is to enable unpacking into n varaibles
Example:
x = (1, 2, 3, 4)
tuple(just(3, x))
(1, 2, 3)
x = (1, 2, 3)
tuple(just(3, x))
(1, 2, 3)
x = (1, 2)
tuple(just(3, x))
(1, 2, None)
x = (1, )
tuple(just(3, x))
(1, None, None)
x = ()
tuple(just(3, x))
(None, None, None)
"""
it = iter(seq)
for i in range(n):
yield next(it, default)
[docs]class NonStringIterable(metaclass=ABCMeta):
"""
Allows isinstance check for iterable that is not a string
if isinstance(x, NonStringIterable):
"""
@classmethod
[docs] def __subclasshook__(cls, C):
if cls is NonStringIterable:
if (not issubclass(C, (str, bytes)) and issubclass(C, Iterable)):
return True
return NotImplemented
[docs]class NonStringSequence(metaclass=ABCMeta):
"""
Allows isinstance check for sequence that is not a string
if isinstance(x, NonStringSequence):
"""
@classmethod
[docs] def __subclasshook__(cls, C):
if cls is NonStringSequence:
if (not issubclass(C, (str, bytes)) and issubclass(C, Sequence)):
return True
return NotImplemented
[docs]def nonStringIterable(obj):
"""
Returns: (bool) True if obj is non-string iterable, False otherwise
Another way that is less future proof
return (hasattr(x, '__iter__') and not isinstance(x, (str, bytes)))
"""
return (not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable))
[docs]def nonStringSequence(obj):
"""
Returns: (bool) True if obj is non-string sequence, False otherwise
"""
return (not isinstance(obj, (str, bytes)) and isinstance(obj, Sequence) )
[docs]def isIterator(obj):
"""
Returns True if obj is an iterator object, that is,
has an __iter__ method
has a __next__ method
.__iter__ is callable and returns obj
Otherwise returns False
"""
if (hasattr(obj, "__iter__") and
hasattr(obj, "__next__") and
callable(obj.__iter__) and
obj.__iter__() is obj
):
return True
return False
[docs]def ocfn(path, mode='r+', perm=(stat.S_IRUSR | stat.S_IWUSR)):
"""
Atomically open or create file from filepath.
If file already exists, Then open file using openMode
Else create file using write update mode If not binary Else
write update binary mode
Returns file object
If binary Then If new file open with write update binary mode
x = stat.S_IRUSR | stat.S_IWUSR
384 == 0o600
436 == octal 0664
"""
try:
newfd = os.open(path, os.O_EXCL | os.O_CREAT | os.O_RDWR, perm)
if "b" in mode:
file = os.fdopen(newfd,"w+b") # w+ truncate read and/or write
else:
file = os.fdopen(newfd,"w+") # w+ truncate read and/or write
except OSError as ex:
if ex.errno == errno.EEXIST:
file = open(path, mode) # r+ do not truncate read and/or write
else:
raise
return file
[docs]def dump(data, path):
"""
Serialize data dict and write to file given by path where serialization is
given by path's extension of either JSON, MsgPack, or CBOR for extension
.json, .mgpk, or .cbor respectively
"""
if ' ' in path:
raise IOError(f"Invalid file path '{path}' contains space.")
root, ext = os.path.splitext(path)
if ext == '.json':
with ocfn(path, "w+b") as f:
json.dump(data, f, indent=2)
f.flush()
os.fsync(f.fileno())
elif ext == '.mgpk':
with ocfn(path, "w+b") as f:
msgpack.dump(data, f)
f.flush()
os.fsync(f.fileno())
elif ext == '.cbor':
with ocfn(path, "w+b") as f:
cbor.dump(data, f)
f.flush()
os.fsync(f.fileno())
else:
raise IOError(f"Invalid file path ext '{path}' "
f"not '.json', '.mgpk', or 'cbor'.")
[docs]def load(path):
"""
Return data read from file path as dict
file may be either json, msgpack, or cbor given by extension .json, .mgpk, or
.cbor respectively
Otherwise raise IOError
"""
root, ext = os.path.splitext(path)
if ext == '.json':
with ocfn(path, "rb") as f:
it = json.load(f)
elif ext == '.mgpk':
with ocfn(path, "rb") as f:
it = msgpack.load(f)
elif ext == '.cbor':
with ocfn(path, "rb") as f:
it = cbor.load(f)
else:
raise IOError(f"Invalid file path ext '{path}' "
f"not '.json', '.mgpk', or 'cbor'.")
return it