Source code for hio.core.http.serving

# -*- encoding: utf-8 -*-
"""
hio.core.http.serving classes

nonblocking http server
"""
import sys
import os
import io
import json
import copy
import datetime
import mimetypes

from urllib.parse import urlsplit, unquote
from contextlib import contextmanager

from ... import help
from ...base import doing
from .. import tcp

from . import httping

[docs]logger = help.ogler.getLogger()
[docs]CRLF = b"\r\n"
[docs]LF = b"\n"
[docs]CR = b"\r"
# Class Definitions
[docs]class Requestant(httping.Parsent): """ Nonblocking HTTP Server Requestant class Parses request msg """ def __init__(self, remoter=None, **kwa): """ Initialize Instance Parameters: remoter = Remoter incoming connection instance """ super(Requestant, self).__init__(**kwa) self.remoter = remoter self.url = u'' # full path in request line either relative or absolute self.scheme = u'' # scheme used in request line path self.hostname = u'' # hostname used in request line path self.port = u'' # port used in request line path self.path = u'' # partial path in request line without scheme host port query fragment self.query = u'' # query string from full path self.fragment = u'' # fragment from full path # self.headers = None # never received a request
[docs] def checkPersisted(self): """ Checks headers to determine if connection should be kept open until client closes it Sets the .persisted flag """ connection = self.headers.get("connection") # check connection header if self.version == (1, 1): # rules for http v1.1 self.persisted = True # connections default to persisted connection = self.headers.get("connection") if connection and "close" in connection.lower(): self.persisted = False # unless connection set to close. # non-chunked but persistent connections should have non None for # content-length Otherwise assume not persisted elif (not self.chunked and self.length is None): self.persisted = False elif self.version == (1, 0): # rules for http v1.0 self.persisted = False # connections default to non-persisted # HTTP/1.0 Connection: keep-alive indicates persistent connection. if connection and "keep-alive" in connection.lower(): self.persisted = True if self.persisted: # override timeout so server never timesout self.remoter.tymeout = 0.0 # never timesout
[docs] def parseHead(self): """ Generator to parse headers in heading of .msg Yields None if more to parse Yields True if done parsing """ if self.headed: return # already parsed the head self.headers = help.Hict() # create generator lineParser = httping.parseLine(raw=self.msg, eols=(CRLF, LF), kind="status line") while True: # parse until we get full start line if self.closed: # connection closed prematurely raise httping.PrematureClosure("Connection closed unexpectedly " "while parsing request start line") line = next(lineParser) if line is None: (yield None) continue lineParser.close() # close generator break method, url, version = httping.parseRequestLine(line) self.method = method self.url = url.strip() if not version.startswith(u"HTTP/1."): raise httping.UnknownProtocol(version) if version.startswith(u"HTTP/1.0"): self.version = (1, 0) else: self.version = (1, 1) # use HTTP/1.1 code for HTTP/1.x where x>=1 pathSplits = urlsplit(self.url) self.path = unquote(pathSplits.path) # unquote non query path portion here self.scheme = pathSplits.scheme self.hostname = pathSplits.hostname self.port = pathSplits.port self.query = pathSplits.query # WSGI spec leaves it quoted do not unquote self.fragment = pathSplits.fragment leaderParser = httping.parseLeader(raw=self.msg, eols=(CRLF, LF), kind="leader header line") while True: if self.closed: # connection closed prematurely raise httping.PrematureClosure("Connection closed unexpectedly " "while parsing request header") headers = next(leaderParser) if headers is not None: leaderParser.close() break (yield None) self.headers.update(headers) # are we using the chunked-style of transfer encoding? transferEncoding = self.headers.get("transfer-encoding") if transferEncoding and transferEncoding.lower() == "chunked": self.chunked = True else: self.chunked = False # NOTE: RFC 2616, S4.4, #3 says ignore if transfer-encoding is "chunked" contentLength = self.headers.get("content-length") if not self.chunked: if contentLength: try: self.length = int(contentLength) except ValueError: self.length = None else: if self.length < 0: # ignore nonsensical negative lengths self.length = None else: # if no body then neither content-length or chunked required self.length = 0 # assume no body so length 0 else: # ignore content-length if chunked self.length = None contentType = self.headers.get("content-type") if contentType: if u';' in contentType: # should also parse out charset for decoding contentType, sep, encoding = contentType.rpartition(u';') if encoding: self.encoding = encoding if 'application/json' in contentType.lower(): self.jsoned = True else: self.jsoned = False # Should connection be kept open until client closes self.checkPersisted() # sets .persisted self.headed = True yield True return
[docs] def parseBody(self): """ Parse body """ if self.bodied: return # already parsed the body if self.length and self.length < 0: raise ValueError("Invalid content length of {0}".format(self.length)) del self.body[:] # self.body.clear() clear body python2 bytearrays don't clear if self.chunked: # chunked takes precedence over length self.parms = dict() while True: # parse all chunks here if self.closed: # connection closed prematurely raise httping.PrematureClosure("Connection closed unexpectedly" " while parsing request body chunk") chunkParser = httping.parseChunk(raw=self.msg) while True: # parse another chunk result = next(chunkParser) if result is not None: chunkParser.close() break (yield None) size, parms, trails, chunk = result if parms: # chunk extension parms self.parms.update(parms) if size: # size non zero so append chunk but keep iterating self.body.extend(chunk) if self.closed: # no more data so finish chunkParser.close() break else: # last chunk when empty chunk so done if trails: self.trails = trails chunkParser.close() break elif self.length != None: # known content length while len(self.msg) < self.length: if self.closed: # connection closed prematurely raise httping.PrematureClosure("Connection closed unexpectedly" " while parsing request body") (yield None) self.body = self.msg[:self.length] del self.msg[:self.length] else: # unknown content length invalid raise httping.HTTPException("Invalid body, content-length not provided!") # only gets to here once content length has become finite # closed or not chunked or chunking has ended self.length = len(self.body) self.bodied = True (yield True) return
[docs]class Responder(): """ Nonblocking HTTP WSGI Responder class """
[docs] HttpVersionString = httping.HTTP_11_VERSION_STRING # http version string
[docs] Delay = 1.0
def __init__(self, incomer, app, environ, chunkable=False, delay=None): """ Initialize Instance Parameters: incomer = incomer connection instance app = wsgi app callable environ = wsgi environment dict chunkable = True if may send body in chunks """ status = "200 OK" # integer or string with reason, WSGI is string with reason self.incomer = incomer self.app = app self.environ = environ self.chunkable = True if chunkable else False self.started = False # True once start called (start_response) self.headed = False # True once headers sent self.chunked = False # True if should send in chunks self.ended = False # True if response body completely sent self.closed = False # True if connection closed by far side self.iterator = None # iterator on application body self.status = status self.headers = help.Hict() # headers self.length = None # if content-length provided must not exceed self.size = 0 # number of body bytes sent so far self.evented = False # True if response is event-stream
[docs] def close(self): """ Close any resources """ if not self.closed and not self.ended: self.write(b'') # in case chunked send empty chunk to terminate self.ended = True self.closed = True
[docs] def reset(self, environ, chunkable=None): """ Reset attributes for another request-response """ self.environ = environ if self.chunkable is not None: self.chunkable = chunkable self.started = False self.headed = False self.chunked = False self.ended = False self.iterator = None self.status = "200 OK" self.headers = help.Hict() self.length = None self.size = 0
[docs] def build(self): """ Return built head bytes from .status and .headers """ lines = [] _status = getattr(self.iterator, '_status', None) # if AttributiveGenerator if _status is not None: # override self.status = _status if isinstance(self.status, int): # replace int with str self.status = "{0} {1}".format(self.status, httping.STATUS_DESCRIPTIONS[self.status]) startLine = "{0} {1}".format(self.HttpVersionString, self.status) try: startLine = startLine.encode('ascii') except UnicodeEncodeError: startLine = startLine.encode('idna') lines.append(startLine) # Override if AttributiveGenerator self.headers.update(getattr(self.iterator, '_headers', help.Hict())) if u'server' not in self.headers: # create Server header self.headers[u'server'] = "Ioflo WSGI Server" if u'date' not in self.headers: # create Date header self.headers[u'date'] = httping.httpDate1123(datetime.datetime.utcnow()) if self.chunkable and 'transfer-encoding' not in self.headers: self.chunked = True self.headers[u'transfer-encoding'] = u'chunked' for name, value in self.headers.items(): lines.append(httping.packHeader(name, value)) lines.extend((b"", b"")) head = CRLF.join(lines) # b'/r/n' return head
[docs] def write(self, msg): """ WSGI write callback This writes out the headers the first time its called otherwise writes the msg bytes """ if not self.started: raise AssertionError("WSGI write() before start_response()") if not self.headed: # head not written yet head = self.build() self.incomer.tx(head) self.headed = True if self.chunked: msg = httping.packChunk(msg) if self.length is not None: # limit total size to length size = self.size + len(msg) if size > self.length: msg = msg[:self.length - size] self.size += len(msg) if msg: self.incomer.tx(msg)
[docs] def start(self, status, response_headers, exc_info=None): """ WSGI application start_response callable Parameters: status is string of status code and status reason '200 OK' or simple status code int which will be replaced with string response_headers is list of tuples of strings of the form (field, value) one tuple for each header example: [ ('Content-type', 'text/plain'), ('X-Some-Header', 'value') ] exc_info is optional exception info if exception occurred while processing request in wsgi application If exc_info is supplied, and no HTTP headers have been output yet, start_response should replace the currently-stored HTTP response headers with the newly-supplied ones, thus allowing the application to "change its mind" about the output when an error has occurred. However, if exc_info is provided, and the HTTP headers have already been sent, start_response must raise an error, and should re-raise using the exc_info tuple. That is: raise exc_info[1].with_traceback(exc_info[2]) (python3) Nonstandard modifiction to allow for iterable/generator of body to change headers and status before first write to support async processing of responses whose iterator/generator yields empty before first non-empty yield. In .service yielding empty does not cause write so status line and headers are not sent until first non-empty write. The mode is that the app.headers and app.status are consulted to see if changed from when .start = wsgi start_response was first called. """ if exc_info: try: if self.headed: # Re-raise original exception if headers sent raise exc_info[1].with_traceback(exc_info[2]) finally: exc_info = None # avoid dangling circular ref elif self.started: # may not call start_response again without exc_info raise AssertionError("Already started!") self.status = status self.headers = help.Hict(response_headers) if u'content-length' in self.headers: self.length = int(self.headers['content-length']) self.chunkable = False # cannot use chunking with finite content-length else: self.length = None if u'content-type' in self.headers: if self.headers['content-type'].startswith('text/event-stream'): self.evented = True self.started = True return self.write
[docs] def service(self): """ Service wsgi compatible application """ if not self.closed and not self.ended: if self.iterator is None: # initiate application self.iterator = iter(self.app(self.environ, start_response=self.start)) try: msg = next(self.iterator) except StopIteration as ex: if hasattr(ex, "value") and ex.value: self.write(ex.value) # new style generators in python3.3+ self.write(b'') # in case chunked send empty chunk to terminate self.ended = True except httping.HTTPError as ex: if not self.headed: headers = help.Hict() headers.update(ex.headers.items()) if 'content-type' not in headers: headers['content-type'] = 'text/plain' msg = ex.render() headers['content-length'] = str(len(msg)) # WSGI status is string of status code and reason status = "{} {}".format(ex.status, ex.reason) self.start(status, headers.items(), sys.exc_info()) self.write(msg) self.ended = True else: logger.error("HTTPError streaming body after headers sent.\n" "%s\n", ex) except Exception as ex: # handle http exceptions not caught by app logger.error("Unexcepted Server Error.\n%s\n", ex) else: if msg: # only write if not empty allows async processing self.write(msg) if self.length is not None and self.size >= self.length: self.ended = True
@contextmanager
[docs]def openServer(cls=None, **kwa): """ Wrapper to create and open HTTP Server instances When used in with statement block, calls .close() on exit of with block Parameters: cls is Class instance of subclass instance Usage: with openServer() as server0: server0. with openServer(cls=BareServer) as server0: server0. """ server = None cls = cls if cls is not None else Server try: server = cls(**kwa) server.reopen() # opens accept socket yield server finally: if server: server.close()
[docs]class Server(): """ Server WSGI HTTP Server Class """
[docs] Tymeout = 5.0 # default tcp server connection tymeout
def __init__(self, name="hio.wsgi.server", app=None, reqs=None, reps=None, servant=None, bufsize=8096, wl=None, ha=None, host=u'', port=None, eha=None, scheme=u'', tymeout=None, **kwa): """ Initialization method for instance. Parameters: name is wsgi server name app is wsgi application callable reqs is dict of Requestant instances keyed by ca reps is dict of running Wsgi Responder instances keyed by ca servant is instance of Server or ServerTls or None bufsize is buffer size for servant wl is WireLog instance if any for servant ha is host address duple (host, port) for local servant listen socket host is host address for local servant listen socket, '' means any interface on host port is socket port for local servant listen socket eha is external destination address for servant for incoming connections used in TLS scheme is http scheme u'http' or u'https' or empty for servant and WSGI environment kwa needed to pass additional parameters to servant tymeout is tymeout in seconds for dropping idle connections Attributes: .app is wsgi application callable .reqs is dict of Requestant instances keyed by ca .reps is dict of running Wsgi Responder instances keyed by ca .servant is instance of Server or ServerTls or None .tymeout is tymeout in seconds for dropping idle connections .scheme is http scheme http or https for servant and environment .secured is Boolean true if TLS """ self.name = name self.app = app self.reqs = reqs if reqs is not None else dict() # allows external view self.reqs.clear() # items should only be assigned by valet self.reps = reps if reps is not None else dict() # allows external view self.reps.clear() # items should only be assigned by valet if tymeout is None: tymeout = self.Tymeout ha = ha or (host, port) # ha = host address takes precendence over host, port if servant: if isinstance(servant, tcp.ServerTls): if scheme and scheme != u'https': raise ValueError("Provided scheme '{0}' incompatible with servant".format(scheme)) secured = True scheme = u'https' defaultPort = 443 elif isinstance(servant, tcp.Server): if scheme and scheme != u'http': raise ValueError("Provided scheme '{0}' incompatible with servant".format(scheme)) secured = False scheme = 'http' defaultPort = 80 else: raise ValueError("Invalid servant type {0}".format(type(servant))) else: scheme = u'https' if scheme == u'https' else u'http' if scheme == u'https': secured = True # use tls socket connection defaultPort = 443 else: secured = False # non tls socket connection defaultPort = 80 self.scheme = scheme host, port = ha port = port or defaultPort # if port not specified ha = (host, port) if servant: if servant.ha != ha: ValueError("Provided ha '{0}:{1}' incompatible with servant".format(ha[0], ha[1])) # at some point may want to support changing the ha of provided servant else: # what about timeouts for servant connections if secured: servant = tcp.ServerTls(ha=ha, eha=eha, bufsize=bufsize, wl=wl, tymeout=tymeout, **kwa) else: servant = tcp.Server(ha=ha, eha=eha, bufsize=bufsize, wl=wl, tymeout=tymeout, **kwa) self.secured = secured self.servant = servant
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ if self.servant: self.servant.wind(tymth)
[docs] def reopen(self): """ Return result of .servant.reopen() """ return self.servant.reopen()
[docs] def close(self): """ Close all reqs, reps, and ixes """ # start with reqs for ca in list(self.reqs.keys()): # need list as closeConnection deletes self.closeConnection(ca) # just in case there is an orphan rep for ca in list(self.reps.keys()): # need list as closeConnection deletes self.closeConnection(ca) # just in case there is an orphan ix self.servant.close()
[docs] def idle(self): """ Returns True if no connections have requests in process Useful for debugging """ idle = True for requestant in self.reqs.values(): if not requestant.ended: idle = False break if idle: for responder in self.reps.values(): if not responder.ended: idle = False break return idle
[docs] def buildEnviron(self, requestant): """ Returns wisgi environment dictionary for supplied requestant """ environ = dict() # maybe should be mudict for cookies or other repeated headers # WSGI variables environ['wsgi.version'] = (1, 0) environ['wsgi.url_scheme'] = self.scheme environ['wsgi.input'] = io.BytesIO(requestant.body) environ['wsgi.errors'] = sys.stderr environ['wsgi.multithread'] = False environ['wsgi.multiprocess'] = False environ['wsgi.run_once'] = False environ["wsgi.server_name"] = self.name environ["wsgi.server_version"] = (1, 0) # Required CGI variables environ['REQUEST_METHOD'] = requestant.method # GET environ['SERVER_NAME'] = self.servant.eha[0] # localhost environ['SERVER_PORT'] = str(self.servant.eha[1]) # 8888 environ['SERVER_PROTOCOL'] = "HTTP/{0}.{1}".format(*requestant.version) # used by request http/1.1 environ['SCRIPT_NAME'] = u'' environ['PATH_INFO'] = requestant.path # /hello?name=john # Optional CGI variables environ['QUERY_STRING'] = requestant.query # name=john environ['REMOTE_ADDR'] = requestant.remoter.ca environ['CONTENT_TYPE'] = requestant.headers.get('content-type', '') if requestant.length is not None: environ['CONTENT_LENGTH'] = str(requestant.length) # recieved http headers mapped to all caps with HTTP_ prepended for key, value in requestant.headers.items(): key = "HTTP_" + key.replace("-", "_").upper() environ[key] = value return environ
[docs] def closeConnection(self, ca): """ Close and remove connection given by ca """ if ca in self.reqs: self.reqs[ca].close() # this signals request parser del self.reqs[ca] if ca in self.reps: self.reps[ca].close() # this signals response handler if ca in self.servant.ixes: self.servant.ixes[ca].serviceSends() # send final bytes to socket del self.reps[ca] self.servant.removeIx(ca)
[docs] def serviceConnects(self): """ Service new incoming connections Create requestants Timeout stale connections """ self.servant.serviceConnects() for ca, ix in list(self.servant.ixes.items()): # ixes changes during iteration if ix.cutoff: self.closeConnection(ca) continue if ca not in self.reqs: # point requestant.msg to incomer.rxbs self.reqs[ca] = Requestant(msg=ix.rxbs, remoter=ix) if ix.tymeout > 0.0 and ix.tymer.expired: self.closeConnection(ca)
[docs] def serviceReqs(self): """ Service pending requestants """ for ca, requestant in list(self.reqs.items()): if requestant.parser: try: requestant.parse() except httping.HTTPException as ex: # unkown error cause this may be superfluous #requestant.errored = True #requestant.error = str(ex) #requestant.ended = True sys.stderr.write(str(ex)) self.closeConnection(ca) continue # give up on request since shouldn't be here if requestant.ended: if requestant.errored: # parse may swallow error but set .errored and .error sys.stderr.write(requestant.error) self.closeConnection(ca) continue logger.info("Parsed Request:\n%s %s {%s\n" "%s\n%s\n", requestant.method, requestant.path, requestant.version, requestant.headers, requestant.body) # create or restart wsgi app responder here environ = self.buildEnviron(requestant) if ca not in self.reps: chunkable = True if requestant.version >= (1, 1) else False responder = Responder(incomer=requestant.remoter, app=self.app, environ=environ, chunkable=chunkable) self.reps[ca] = responder else: # reuse responder = self.reps[ca] responder.reset(environ=environ)
[docs] def serviceReps(self): """ Service pending responders """ for ca, responder in list(self.reps.items()): if responder.closed: self.closeConnection(ca) continue if not responder.ended: responder.service() if responder.ended: requestant = self.reqs[ca] if requestant.persisted: if requestant.parser is None: # reuse requestant.makeParser() # resets requestant parser else: # not persistent so close and remove requestant and responder ix = self.servant.ixes[ca] if not ix.txbs: # wait for outgoing txbs to be empty self.closeConnection(ca)
[docs] def service(self): """ Service request response """ self.serviceConnects() self.servant.serviceReceivesAllIx() self.serviceReqs() self.serviceReps() self.servant.serviceSendsAllIx()
[docs]WsgiServer = Server # alias
[docs]class CustomResponder(): """ Nonblocking HTTP Server Response class for non-wsgi applications Used by Steward HTTP/1.1 200 OK\r\n Content-Length: 122\r\n Content-Type: application/json\r\n Date: Thu, 30 Apr 2015 19:37:17 GMT\r\n Server: IoBook.local\r\n\r\n """
[docs] HttpVersionString = httping.HTTP_11_VERSION_STRING # http version string
def __init__(self, steward=None, status=200, # integer headers=None, body=b'', data=None): """ Initialize Instance steward = managing Steward instance status = response status code headers = http response headers body = http response body data = dict to jsonify as body if provided """ self.steward = steward self.status = status self.headers = help.Hict(headers) if headers else help.Hict() if body and isinstance(body, str): # use default # RFC 2616 Section 3.7.1 default charset of iso-8859-1. body = body.encode('iso-8859-1') self.body = body or b'' self.data = data self.ended = False # True if response generated completed self.msg = b"" # for debugging self.lines = [] # for debugging self.head = b"" # for debugging
[docs] def reinit(self, status=None, # integer headers=None, body=None, data=None): """ Reinitialize anything that is not None This enables creating another response on a connection """ if status is not None: self.status = status if headers is not None: self.headers = help.Hict(headers) if body is not None: # body should be bytes if isinstance(body, str): # RFC 2616 Section 3.7.1 default charset of iso-8859-1. body = body.encode('iso-8859-1') self.body = body else: self.body = b'' if data is not None: self.data = data else: self.data = None
[docs] def build(self, status=None, headers=None, body=None, data=None): """ Build and return response message """ self.reinit(status=status, headers=headers, body=body, data=data) self.lines = [] startLine = "{0} {1} {2}".format(self.HttpVersionString, self.status, httping.STATUS_DESCRIPTIONS[self.status]) try: startLine = startLine.encode('ascii') except UnicodeEncodeError: startLine = startLine.encode('idna') self.lines.append(startLine) if u'server' not in self.headers: # create Server header self.headers[u'server'] = "Ioflo Server" if u'date' not in self.headers: # create Date header self.headers[u'date'] = httping.httpDate1123(datetime.datetime.utcnow()) if self.data is not None: body = json.dumps(self.data, separators=(',', ':')).encode("utf-8") self.headers[u'content-type'] = u'application/json; charset=utf-8' else: body = self.body if body and (u'content-length' not in self.headers): self.headers[u'content-length'] = str(len(body)) for name, value in self.headers.items(): self.lines.append(httping.packHeader(name, value)) self.lines.extend((b"", b"")) self.head = CRLF.join(self.lines) # b'/r/n' self.msg = self.head + body self.ended = True return self.msg
[docs]class Steward(): """ Manages the associated requestant and responder for an incoming connection for BareServer (non-wsgi) HTTP server """ def __init__(self, remoter, requestant=None, responder=None, dictable=False): """ incomer = Incomer instance for connection requestant = Requestant instance for connection responder = Responder instance for connection dictable = True if should attempt to convert request body as json """ self.remoter = remoter if requestant is None: requestant = Requestant(msg=self.remoter.rxbs, remoter=remoter, dictable=dictable) self.requestant = requestant if responder is None: responder = CustomResponder(steward=self) self.responder = responder self.waited = False # True if waiting for reponse to finish self.msg = b"" # outgoing msg bytes
[docs] def refresh(self): """ Restart incomer tymer """ self.remoter.tymer.restart()
[docs] def respond(self): """ Respond to request Override in subclass Echo request """ logger.info("Responding to Request:\n%s %s %s\n" "%s\n%s\n", self.requestant.method, self.requestant.path, self.requestant.version, self.requestant.headers, self.requestant.body) data = dict() data['version'] = "HTTP/{0}.{1}".format(*self.requestant.version) data['method'] = self.requestant.method pathSplits = urlsplit(unquote(self.requestant.url)) path = pathSplits.path data['path'] = path query = pathSplits.query qargs = dict() qargs, query = httping.updateQargsQuery(qargs, query) data['qargs'] = qargs fragment = pathSplits.fragment data['fragment'] = fragment data['headers'] = list(self.requestant.headers.items()) # copy.copy(self.requestant.headers) # make copy data['body'] = self.requestant.body.decode('utf-8') data['data'] = copy.copy(self.requestant.data) # make copy msg = self.responder.build(status=200, data=data) self.remoter.tx(msg) self.waited = not self.responder.ended
[docs] def pour(self): """ Run generator to stream response message """ # putnext generator here if self.responder.ended: self.waited = False else: self.refresh()
[docs]class BareServer(): """ BareServer class nonblocking Bare (non-WSGI) HTTP server Define CustomResponder subclass to respond to requests as per Steward """
[docs] Timeout = 5.0 # default tcp server (servant) connection timeout
def __init__(self, servant=None, stewards=None, name='', bufsize=8096, wl=None, ha=None, host=u'', port=None, eha=None, scheme=u'', dictable=False, timeout=None, **kwa): """ Initialization method for instance. servant = instance of Server or ServerTls or None stewards = dict of Steward instances kwa needed to pass additional parameters to servant if servantinstances are not provided (None) some or all of these parameters will be used for initialization name = user friendly name for servant bufsize = buffer size wl = WireLog instance if any ha = host address duple (host, port) for local servant listen socket host = host address for local servant listen socket, '' means any interface on host port = socket port for local servant listen socket eha = external destination address for incoming connections used in TLS scheme = http scheme u'http' or u'https' or empty dictable = Boolean flag If True attempt to convert body from json for requestants """ self.stewards = stewards if stewards is not None else dict() self.dictable = True if dictable else False # for stewards if timeout is None: timeout = self.Timeout ha = ha or (host, port) # ha = host address takes precendence over host, port if servant: if isinstance(servant, tcp.ServerTls): if scheme and scheme != u'https': raise ValueError("Provided scheme '{0}' incompatible with servant".format(scheme)) secured = True scheme = u'https' defaultPort = 443 elif isinstance(servant, tcp.Server): if scheme and scheme != u'http': raise ValueError("Provided scheme '{0}' incompatible with servant".format(scheme)) secured = False scheme = 'http' defaultPort = 80 else: raise ValueError("Invalid servant type {0}".format(type(servant))) else: scheme = u'https' if scheme == u'https' else u'http' if scheme == u'https': secured = True # use tls socket connection defaultPort = 443 else: secured = False # non tls socket connection defaultPort = 80 host, port = ha port = port or defaultPort # if port not specified ha = (host, port) if servant: if servant.ha != ha: ValueError("Provided ha '{0}:{1}' incompatible with servant".format(ha[0], ha[1])) # at some point may want to support changing the ha of provided servant else: # what about timeouts for servant connections if secured: servant = tcp.ServerTls(name=name, ha=ha, eha=eha, bufsize=bufsize, wl=wl, tymeout=timeout, **kwa) else: servant = tcp.Server(name=name, ha=ha, eha=eha, bufsize=bufsize, wl=wl, tymeout=timeout, **kwa) self.secured = secured self.servant = servant
[docs] def reopen(self): """ Return result of .servant.reopen() """ return self.servant.reopen()
[docs] def idle(self): """ Returns True if no connections have requests in process Useful for debugging """ idle = True for steward in self.stewards.values(): if not steward.requestant.ended: idle = False break return idle
[docs] def close(self): """ Close all ixes for all stewards """ for ca, steward in list(self.stewards.items()): self.closeConnection(ca) # just in case there is an orphan ix self.servant.close()
[docs] def closeConnection(self, ca): """ Close and remove connection and associated steward given by ca """ self.servant.removeIx(ca) del self.stewards[ca]
[docs] def serviceConnects(self): """ Service new incoming connections Create requestants Timeout stale connections """ self.servant.serviceConnects() for ca, ix in self.servant.ixes.items(): # check for and handle cutoff connections by client here if ca not in self.stewards: self.stewards[ca] = Steward(remoter=ix, dictable=self.dictable) if ix.tymeout > 0.0 and ix.tymer.expired: self.closeConnection(ca)
[docs] def serviceStewards(self): """ Service pending requestants and responders """ for ca, steward in self.stewards.items(): if not steward.waited: steward.requestant.parse() if steward.requestant.ended: steward.requestant.dictify() logger.info("Parsed Request:\n%s %s %s\n" "%s\n%s\n", steward.requestant.method, steward.requestant.path, steward.requestant.version, steward.requestant.headers, steward.requestant.body) steward.respond() if steward.waited: steward.pour() if not steward.waited and steward.requestant.ended: if steward.requestant.persisted: steward.requestant.makeParser() #set up for next time else: # remove and close connection self.closeConnection(ca)
[docs] def service(self): """ Service request response """ self.serviceConnects() self.servant.serviceReceivesAllIx() self.serviceStewards() self.servant.serviceSendsAllIx()
try: import falcon except ImportError as ex: logger.error("Falcon not available.\n") else: # falcon import succeeded
[docs] class StaticSink(): """ Class that provides Falcon sink endpoint for serving static files in support of a client side web app. """
[docs] StaticSinkBasePath = "/static"
[docs] DefaultStaticSinkBasePath = "/"
def __init__(self, staticDirPath=None): """ Parameters: staticDirPath (str): path to static sink directory Example computation of staticDirPath: WEB_DIR_PATH = os.path.dirname( os.path.abspath( sys.modules.get(__name__).__file__)) STATIC_DIR_PATH = os.path.join(WEB_DIR_PATH, 'static') /Users/Load/Data/Code/public/hio/src/hio/demo/web/static """ if staticDirPath is None: staticDirPath == os.path.join( os.path.dirname( os.path.abspath( sys.modules.get(__name__).__file__)), self.StaticSinkBasePath[1:]) self.staticDirPath = staticDirPath
[docs] def __call__(self, req, rep): path = req.path.removeprefix(self.StaticSinkBasePath) path = path.removeprefix(self.DefaultStaticSinkBasePath) if not path: # return default path = "index.html" path = os.path.join(self.staticDirPath, path) if not os.path.exists(path): raise falcon.HTTPError(falcon.HTTP_NOT_FOUND, title='Missing Resource', description='File "{}" not found or forbidden'.format(path)) filetype = mimetypes.guess_type(path, strict=True)[0] # get first guess rep.set_header("Content-Type", "{}; charset=UTF-8".format(filetype)) rep.status = falcon.HTTP_200 # This is the default status # for better stream handling provide "wsgi.file_wrapper" in wsgi environ # rep.stream = open(filepath, 'rb') # the following works faster and more consistently than rep.stream above # Maybe Falcon's default is to throttle the reads too much for rep.stream with open(path, 'rb') as f: rep.data = f.read()
[docs]class ServerDoer(doing.Doer): """ HTTP WSGI Server Doer See Doer for inherited attributes, properties, and methods. Attributes: .server is HTTP WSGI Server instance Properties: """ def __init__(self, server, **kwa): """ Initialize Parameters: server is HTTP Server instance """ super(ServerDoer, self).__init__(**kwa) self.server = server if self.tymth: self.server.wind(self.tymth)
[docs] def wind(self, tymth): """ Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base. Updates winds .tymer .tymth """ super(ServerDoer, self).wind(tymth) self.server.wind(tymth)
[docs] def enter(self): """""" self.server.reopen()
[docs] def recur(self, tyme): """""" self.server.service()
[docs] def exit(self): """""" self.server.close()