# -*- encoding: utf-8 -*-
"""
hio.core.http.clienting module
nonblocking http client
"""
import json
import copy
import random
from collections import deque, namedtuple
from urllib.parse import urlsplit, quote, quote_plus, unquote, unquote_plus
from contextlib import contextmanager
from ... import help
from ...base import tyming, doing
from .. import coring, wiring
from .. import tcp
from . import httping
[docs]logger = help.ogler.getLogger()
[docs]Response = namedtuple('Response', 'version status reason headers body data request errored error')
# Class Definitions
[docs]class Requester(object):
"""
Nonblocking HTTP Client Requester class
"""
[docs] HttpVersionString = httping.HTTP_11_VERSION_STRING # http version string
[docs] Port = httping.HTTP_PORT # default port
def __init__(self,
hostname='127.0.0.1',
port=None,
scheme=u'http',
method=u'GET', # unicode
path=u'/', # unicode
qargs=None,
fragment=u'', #unicode
headers=None,
body=b'',
data=None,
fargs=None):
"""
Initialize Instance
hostname = remote server hostname (may include port as hostname:port)
port = remote server port
scheme = http scheme 'http' or 'https' usually
method = http request method verb
path = http url path
qargs = http query args
fragment = http fragment
headers = http request headers
body = http request body
data = dict to jsonify as body if provided
fargs = dict to url form encode as body if provided
"""
self.hostname, self.port = httping.normalizeHostPort(hostname, port, 80)
self.scheme = scheme
self.method = method.upper() if method else u'GET'
self.path = path or u'/'
self.qargs = qargs if qargs is not None else dict()
self.fragment = fragment
self.headers = help.Hict(headers) if headers else help.Hict()
if body and isinstance(body, str): # use default
# RFC 2616 Section 3.7.1 default charset of iso-8859-1.
body = body.encode('iso-8859-1')
self.body = body or b''
self.data = data
self.fargs = fargs
self.lines = [] # keep around for testing
self.head = b"" # keep around for testing
self.msg = b""
[docs] def reinit(self,
hostname=None,
port=None,
scheme=None,
method=None, # unicode
path=None, # unicode
qargs=None,
fragment=None,
headers=None,
body=None,
data=None,
fargs=None):
"""
Reinitialize anything that is not None
This enables creating another request on a connection to the same host port
"""
if hostname is not None: # may need to renormalize host port
self.hostname = hostname
if port is not None:
self.port = port
if scheme is not None:
self.scheme = scheme
if method is not None:
self.method = method.upper()
if path is not None:
self.path = path
if qargs is not None:
self.qargs = qargs
if fragment is not None:
self.fragment = fragment
if headers is not None:
self.headers = help.Hict(headers)
if body is not None: # body should be bytes
if isinstance(body, str):
# RFC 2616 Section 3.7.1 default charset of iso-8859-1.
body = body.encode('iso-8859-1')
self.body = body
else:
self.body = b''
if data is not None:
self.data = data
else:
self.data = None
if fargs is not None:
self.fargs = fargs
else:
self.fargs = None
[docs] def rebuild(self,
method=None,
path=None,
qargs=None,
fragment=None,
headers=None,
body=None,
data=None,
fargs=None):
"""
Reinit then build and return request message
This allows sending another request to same destination
"""
self.reinit(method=method,
path=path,
qargs=qargs,
fragment=fragment,
headers=headers,
body=body,
data=data,
fargs=fargs)
return self.build()
[docs] def build(self):
"""
Build and return request message from attributes
"""
self.lines = []
# need to check for proxy as the start line is different if proxied
pathSplits = urlsplit(self.path) # path should not include scheme host port
path = pathSplits.path
self.path = path
path = quote(path)
scheme = pathSplits.scheme
if scheme and scheme != self.scheme:
raise ValueError("Already open connection attempt to change scheme "
" to '{0}'".format(scheme))
port = pathSplits.port
if port and port != self.port:
raise ValueError("Already open connection attempt to change port "
" to '{0}'".format(port))
hostname = pathSplits.hostname
if hostname and hostname != self.hostname:
raise ValueError("Already open connection attempt to change hostname "
" to '{0}'".format(hostname))
query = pathSplits.query
self.qargs, query = httping.updateQargsQuery(self.qargs, query)
fragment = pathSplits.fragment
if fragment:
self.fragment = fragment
# we should quote the fragment here
combine = u"{0}?{1}#".format(path, query, fragment)
combine = urlsplit(combine).geturl() # strips blank parts
startLine = "{0} {1} {2}".format(self.method, combine, self.HttpVersionString)
try:
startLine = startLine.encode('ascii')
except UnicodeEncodeError:
startLine = startLine.encode('idna')
self.lines.append(startLine)
if u'host' not in self.headers: # create Host header
host = self.hostname
port = self.port
# As per RFC 273, IPv6 address should be wrapped with []
# when used as Host header
if host.find(u':') >= 0:
host = u'[' + host + u']'
value = "{0}:{1}".format(host, port)
try:
value = value.encode("ascii")
except UnicodeEncodeError:
value = value.encode("idna")
self.lines.append(httping.packHeader('Host', value))
# we only want a Content-Encoding of "identity" since we don't
# support encodings such as x-gzip or x-deflate.
if u'accept-encoding' not in self.headers:
self.lines.append(httping.packHeader(u'Accept-Encoding', u'identity'))
if self.method == u"GET": # do not send body on GET
body = b''
else:
if self.data is not None: # data takes precedence
body = json.dumps(self.data, separators=(',', ':')).encode("utf-8")
self.headers[u'content-type'] = u'application/json; charset=utf-8'
elif self.fargs is not None: # form data args next in precendence
if ((u'content-type' in self.headers and
self.headers[u'content-type'].startswith(u'multipart/form-data'))):
boundary = '____________{0:012x}'.format(random.randint(123456789,
0xffffffffffff))
formParts = []
# mime parts always start with --
for key, val in self.fargs.items():
formParts.append('\r\n--{0}\r\nContent-Disposition: '
'form-data; name="{1}"\r\n'
'Content-Type: text/plain; charset=utf-8\r\n'
'\r\n{2}'.format(boundary, key, val))
formParts.append('\r\n--{0}--'.format(boundary))
form = "".join(formParts)
body = form.encode(encoding='utf-8')
self.headers[u'content-type'] = u'multipart/form-data; boundary={0}'.format(boundary)
else:
formParts = [u"{0}={1}".format(key, val) for key, val in self.fargs.items()]
form = u'&'.join(formParts)
form = quote_plus(form, '&=')
body = form.encode(encoding='utf-8')
self.headers[u'content-type'] = u'application/x-www-form-urlencoded; charset=utf-8'
else: # body last in precendence
body = self.body
if body and (u'content-length' not in self.headers):
self.lines.append(httping.packHeader(u'Content-Length', str(len(body))))
for name, value in self.headers.items():
self.lines.append(httping.packHeader(name, value))
self.lines.extend((b"", b""))
self.head = CRLF.join(self.lines) # b'/r/n'
self.msg = self.head + body
return self.msg
[docs]class Respondent(httping.Parsent):
"""
Nonblocking HTTP Client Respondent class
"""
[docs] Retry = 100 # sse retry timeout in milliseconds if evented
def __init__(self,
redirects=None,
redirectable=True,
events=None,
retry=None,
leid=None,
**kwa):
"""
Initialize Instance:
redirects = list of redirects if any
redirectable = Boolean allow redirects
events = deque of events if any
retry = sse retry timeout in seconds if any if evented
leid = last event id if any if evented
"""
super(Respondent, self).__init__(**kwa)
self.status = None # Status-Code from status line (consider making this code reason)
self.code = None # Status-Code from status line
self.reason = None # Reason-Phrase from status line
self.redirectant = None # Boolean True if received redirect status, need to redirect
self.redirected = None # attempted a redirection
self.redirects = redirects if redirects is not None else []
self.redirectable = True if redirectable else False
self.evented = None # are server sent events being used
self.events = events if events is not None else deque()
self.retry = retry if retry is not None else self.Retry # retry timeout in milliseconds if evented
self.leid = None # non None if evented with event ids sent
self.eventSource = None # httping.EventSource instance when .evented
[docs] def reinit(self,
redirectable=None,
**kwa):
"""
Reinitialize Instance
See super class
redirectable means allow redirection
"""
super(Respondent, self).reinit(**kwa)
if redirectable is not None:
self.redirectable = True if redirectable else False
self.status = None
self.code = None
self.reason = None
self.evented = None
[docs] def close(self):
"""
Call super to assign True to .closed
Also close event source
"""
super(Respondent, self).close()
if self.eventSource: # assign True to .eventSource.closed
self.eventSource.close()
[docs] def checkPersisted(self):
"""
Checks headers to determine if connection should be kept open until
server closes it
Sets the .persisted flag
"""
connection = self.headers.get("connection") # check connection header
if self.version == (1, 1): # rules for http v1.1
self.persisted = True # connections default to persisted
# An HTTP/1.1 proxy is assumed to stay open unless
# explicitly closed.
connection = self.headers.get("connection")
if connection and "close" in connection.lower():
self.persisted = False
# non-chunked but persistent connections should have non None for
# content-length Otherwise assume not persisted
elif (not self.chunked and self.length is None):
self.persisted = False
elif self.version == (1, 0):
self.persisted = False # connections default to non-persisted
# Some HTTP/1.0 implementations have support for persistent
# connections, using rules different than HTTP/1.1.
if self.evented: # server sent events
self.persisted = True
# For older HTTP, Keep-Alive indicates persistent connection.
elif self.headers.get("keep-alive"):
self.persisted = True
# At least Akamai returns a "Connection: Keep-Alive" header,
# which was supposed to be sent by the client.
elif connection and "keep-alive" in connection.lower():
self.persisted = True
else: # Proxy-Connection is a netscape hack.
proxy = self.headers.get("proxy-connection")
if proxy and "keep-alive" in proxy.lower():
self.persisted = True
[docs] def parseHead(self):
"""
Generator to parse headers in heading of .msg
Yields None if more to parse
Yields True if done parsing
"""
if self.headed:
return # already parsed the head
self.headers = help.Hict()
# create generator
lineParser = httping.parseLine(raw=self.msg, eols=(CRLF, LF), kind="status line")
while True: # parse until we get a non-100 status
if self.closed and not self.msg: # connection closed prematurely
raise httping.PrematureClosure("Connection closed unexpectedly"
" while parsing response start line")
line = next(lineParser)
if line is None:
(yield None)
continue
lineParser.close() # close generator
version, status, reason = httping.parseStatusLine(line)
if status != httping.CONTINUE: # 100 continue (with request or ignore)
break
leaderParser = httping.parseLeader(raw=self.msg,
eols=(CRLF, LF),
kind="continue header line")
while True:
if self.closed and not self.msg: # connection closed prematurely
raise httping.PrematureClosure("Connection closed unexpectedly"
" while parsing response header")
headers = next(leaderParser)
if headers is not None:
leaderParser.close()
break
(yield None)
self.code = self.status = status
self.reason = reason.strip()
if version in ("HTTP/1.0", "HTTP/0.9"):
# Some servers might still return "0.9", treat it as 1.0 anyway
self.version = (1, 0)
elif version.startswith("HTTP/1."):
self.version = (1, 1) # use HTTP/1.1 code for HTTP/1.x where x>=1
else:
raise httping.UnknownProtocol(version)
leaderParser = httping.parseLeader(raw=self.msg,
eols=(CRLF, LF),
kind="leader header line")
while True:
if self.closed and not self.msg: # connection closed prematurely
raise httping.PrematureClosure("Connection closed unexpectedly"
" while parsing response header")
headers = next(leaderParser)
if headers is not None:
leaderParser.close()
break
(yield None)
self.headers.update(headers)
# are we using the chunked-style of transfer encoding?
transferEncoding = self.headers.get("transfer-encoding")
if transferEncoding and transferEncoding.lower() == "chunked":
self.chunked = True
else:
self.chunked = False
# NOTE: RFC 2616, S4.4, #3 says ignore if transfer-encoding is "chunked"
contentLength = self.headers.get("content-length")
if contentLength and not self.chunked:
try:
self.length = int(contentLength)
except ValueError:
self.length = None
else:
if self.length < 0: # ignore nonsensical negative lengths
self.length = None
else:
self.length = None
# does the body have a fixed length? (of zero)
if ((self.status == httping.NO_CONTENT or self.status == httping.NOT_MODIFIED) or
(100 <= self.status < 200) or # 1xx codes
(self.method == "HEAD")):
self.length = 0
contentType = self.headers.get("content-type")
if contentType:
if u';' in contentType: # should also parse out charset for decoding
contentType, sep, encoding = contentType.rpartition(u';')
if encoding:
self.encoding = encoding
if 'text/event-stream' in contentType.lower():
self.evented = True
self.eventSource = httping.EventSource(raw=self.body,
events=self.events,
dictable=self.dictable)
else:
self.evented = False
if 'application/json' in contentType.lower():
self.jsoned = True
else:
self.jsoned = False
# Should connection be kept open until server closes
self.checkPersisted() # sets .persisted
if self.status in (httping.MULTIPLE_CHOICES,
httping.MOVED_PERMANENTLY,
httping.FOUND,
httping.SEE_OTHER,
httping.TEMPORARY_REDIRECT):
self.redirectant = True
self.headed = True
yield True
return
[docs] def parseBody(self):
"""
Parse body
"""
if self.bodied:
return # already parsed the body
if self.length and self.length < 0:
raise ValueError("Invalid content length of {0}".format(self.length))
del self.body[:] # self.body.clear() clear body python2 bytearrays don't clear
if self.chunked: # content-length is ignored if chunked
self.parms = dict()
while True: # parse all chunks here
chunkParser = httping.parseChunk(raw=self.msg)
while True: # parse another chunk
if self.closed and not self.msg: # connection closed prematurely
raise httping.PrematureClosure("Connection closed "
"unexpectedly while parsing response body chunk")
result = next(chunkParser)
if result is not None:
chunkParser.close()
break
(yield None)
size, parms, trails, chunk = result
if parms: # chunk extension parms
self.parms.update(parms)
if size: # size non zero so append chunk but keep iterating
self.body.extend(chunk)
if self.evented:
self.eventSource.parse() # parse events here
if (self.eventSource.retry is not None and
self.retry != self.eventSource.retry):
self.retry = self.eventSource.retry
if (self.eventSource.leid is not None and
self.leid != self.eventSource.leid):
self.leid = self.eventSource.leid
if self.closed and not self.msg: # no more data so finish
chunkParser.close()
break
else: # last chunk when empty chunk so done
if trails:
self.trails = trails
chunkParser.close()
break
elif self.length != None: # known content length
while len(self.msg) < self.length:
if self.closed and not self.msg: # connection closed prematurely
raise httping.PrematureClosure("Connection closed unexpectedly"
" while parsing response body")
(yield None)
self.body = self.msg[:self.length]
del self.msg[:self.length]
else: # unknown content length so parse forever until closed
while True:
if self.msg:
self.body.extend(self.msg[:])
del self.msg[:] # python2 bytearrays dont have clear self.msg.clear()
if self.evented:
self.eventSource.parse() # parse events here
if (self.eventSource.retry is not None and
self.retry != self.eventSource.retry):
self.retry = self.eventSource.retry
if (self.eventSource.leid is not None and
self.leid != self.eventSource.leid):
self.leid = self.eventSource.leid
if self.closed and not self.msg: # no more data so finish
break
(yield None)
# only gets to here once content length has become finite
# closed, not chunked/streamed, or chunking/streaming has ended
self.length = len(self.body)
self.bodied = True
(yield True)
return
@contextmanager
[docs]def openClient(cls=None, **kwa):
"""
Wrapper to create and open HTTP Client instances
When used in with statement block, calls .close() on exit of with block
Parameters:
cls is Class instance of subclass instance
Usage:
with openClient() as client0:
client0.accept()
with openClient(cls=Client) as client0:
client0.accept()
"""
client = None
if cls is None:
cls = Client
try:
client = cls(**kwa)
client.reopen()
yield client
finally:
if client:
client.close()
[docs]class Client():
"""
Client class nonblocking HTTP client connection manager and HTTP client
request and response processor
"""
def __init__(self,
connector=None,
requester=None,
respondent=None,
name='',
uid=0,
bufsize=8096,
wl=None,
hostname='127.0.0.1',
port=None,
scheme=u'',
method=u'GET', # unicode
path=u'/', # unicode
headers=None,
qargs=None,
fragment=u'',
body=b'',
data=None,
fargs=None,
requests=None,
msg=None,
dictable=None,
events=None,
redirectable=True,
redirects=None,
responses=None,
**kwa):
"""
Initialization method for instance.
Parameters:
connector is instance of tcp.Client or tcp.ClientTls or None
requester is instance of Requester or None
respondent is instance of Respondent or None
name is user friendly name for connector (connection)
uid is unique identifier for connector (connection)
bufsize is buffer size for connector
wl is opened WireLog instance if any for connector
hostname is host address or hostname of remote server for connector
connector.hostname is used for requester
port = socket port of remote server for connector
connector.port is used for requester
kwa are passed as other init parameters to connector
scheme is http scheme for requester
method is http request method verb for requester and respondent
path is http url path section for requester
path may include scheme and netloc which takes priority
headers is dict of http headers if any for requester
qargs is dict of http query args if any for requester
fragment is http fragment if any for requester
body is byte or bytearray of request body for requester
data is dict of request body json if any for requester
fargs is dict of request body form args if any for requester
requests is deque of requests if any to be processed by requester
each request is dict
msg is bytearray of response msg to parse by respondent
dictable is Boolean flag for respondent
If True attempt to convert body from json
events is reference to deque of events if any processed by respondent
redirectable is Boolean to allow redirects to be processed by respondent
redirects is list of redirects if any processed by respondent
each redirect is dict
responses is deque of responses if any processed by respondent
each response is dict
**kwa are passed through to init .connector tcp.Client or tcp.ClientTLS
"""
# .requests is deque of dicts of request data
self.requests = requests if requests is not None else deque()
# .responses is deque of dicts of response data
self.responses = responses if responses is not None else deque()
# .redicrest is list of dicts of response data when response is redirect
self.redirects = redirects if redirects is not None else list()
# .events is deque of dicts of response server sent event data
self.events = events if events is not None else deque()
self.waited = False # Boolean True If sent request but waiting for response
self.latest = None # latest request dict from .requests in process if any
# see if path also includes scheme, netloc, host, port, query, fragment
splits = urlsplit(path)
scheme = splits.scheme or scheme # is scheme provided
scheme = scheme.lower()
if connector:
if isinstance(connector, tcp.ClientTls):
if scheme and scheme != u'https':
raise ValueError("Provided scheme '{0}' incompatible with connector".format(scheme))
secured = True
scheme = u'https'
defaultPort = 443
elif isinstance(connector, tcp.Client):
if scheme and scheme != u'http':
raise ValueError("Provided scheme '{0}' incompatible with connector".format(scheme))
secured = False
scheme = 'http'
defaultPort = 80
else:
raise ValueError("Invalid connector type {0}".format(type(connector)))
else:
scheme = u'https' if scheme == u'https' else u'http'
if scheme == u'https':
secured = True # use tls socket connection
defaultPort = 443
else:
secured = False # non tls socket connection
defaultPort = 80
hostname = splits.hostname or hostname # is host or port provided
port = splits.port or port # is port provided
hostname, port = httping.normalizeHostPort(host=hostname, port=port, defaultPort=defaultPort)
host = coring.normalizeHost(hostname)
ha = (host, port)
if connector:
if connector.hostname != hostname:
ValueError("Provided hostname '{0}' incompatible with connector".format(hostname))
if connector.ha != ha:
ValueError("Provided ha '{0}' incompatible with connector".format(hostname))
# at some point may want to support changing the hostname and ha of provided connector
if name:
connector.name = name
else:
if secured:
connector = tcp.ClientTls( name=name,
uid=uid,
host=hostname,
port=port,
bufsize=bufsize,
wl=wl,
**kwa)
else:
connector = tcp.Client( name=name,
uid=uid,
host=hostname,
port=port,
bufsize=bufsize,
wl=wl,
**kwa)
self.secured = secured
self.connector = connector
path = splits.path # only path component
query = splits.query # is query in original path
qargs = qargs or dict()
qargs, query = httping.updateQargsQuery(qargs, query)
fragment = splits.fragment or fragment # fragment in path prioritized
if requester is None:
requester = Requester(hostname=self.connector.hostname,
port=self.connector.port,
scheme=scheme,
method=method,
path=path, # unicode
headers=headers,
qargs=qargs,
fragment=fragment,
body=body,
data=data,
fargs=fargs)
else:
requester.reinit(hostname=self.connector.hostname,
port=self.connector.port,
scheme=scheme,
method=method,
path=path,
qargs=qargs,
fragment=fragment,
headers=headers,
body=body,
data=data,
fargs=fargs)
self.requester = requester
if respondent is None:
respondent = Respondent(msg=self.connector.rxbs,
method=method,
dictable=dictable,
events=self.events,
redirectable=redirectable,
redirects=self.redirects)
else:
# do we need to assign the events, redirects also?
respondent.reinit(msg=self.connector.rxbs,
method=method,
dictable=dictable,
redirectable=redirectable)
self.respondent = respondent
[docs] def wind(self, tymth):
"""
Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
Updates winds .tymer .tymth
"""
if self.connector:
self.connector.wind(tymth)
[docs] def reopen(self):
"""
Return result of .connector.reopen()
"""
return self.connector.reopen()
[docs] def close(self):
"""
Call .connecter.close (shutclose)
"""
self.connector.close()
@staticmethod
[docs] def attrify(response):
"""
Convert response dict into named tuple Response so can access fields
as attributes
"""
return Response(**{k:response[k] for k in Response._fields})
[docs] def respond(self):
"""
Pops and returns next response from .responses if any
Otherwise returns None
"""
if self.responses:
return self.attrify(self.responses.popleft())
return None
[docs] def request(self,
method=None,
path=None,
qargs=None,
fragment=None,
headers=None,
body=None,
data=None,
fargs=None,
reply=None,
**kwa):
"""
Create and append request dict onto .requests with updated values from
parameters. Use existing .requester values if not provided except for
body. Body/Data/fargs must be newly provided.
This is a differential request that reuses latest values if not changed.
Requires that patron already be setup with scheme host port
request = dict([('method', method),
('path', path),
('qargs', dict([("auth", self.token.value)])),
('headers', dict([('Accept', 'application/json'),
('Connection', 'Keep-Alive'),
('Keep-Alive', 'timeout=60, max=100'),
])),
('body', body),
('reply', dict([('rid', rid), ('rdeck', replies)])),
])
self.patron.value.requests.append(request)
"""
request = dict()
request['method'] = method.upper() if method is not None else self.requester.method
request["path"] = path if path is not None else self.requester.path
request["qargs"] = qargs if qargs is not None else self.requester.qargs.copy()
request["fragment"] = fragment if qargs is not None else self.requester.fragment
request["headers"] = help.Hict(headers) if headers is not None else self.requester.headers.copy()
request["body"] = body if body is not None else b''
if body is not None: # body should be bytes
if isinstance(body, str):
# RFC 2616 Section 3.7.1 default charset of iso-8859-1.
body = body.encode('iso-8859-1')
else:
body = b''
request["body"] = body
request["data"] = data
request["fargs"] = fargs
# not sent but supports associating requests with responses
if reply is not None:
request["reply"] = reply
for k, v in kwa.items(): # extra stuff not sent
if v is not None:
request[k] = v
self.requests.append(request)
[docs] def transmit(self,
method=None,
path=None,
qargs=None,
fragment=None,
headers=None,
body=None,
data=None,
fargs=None,
**kwa):
"""
Rebuild and transmit request
Assumes that .waited is not True. Do not use bare unless know that there
is no current request/reponse in process otherwise it clobbers .requester
attributes
If the parameters are all None then use existing
.requester and .respondent attributes otherwise reinit .requester and
.respondent if method is not None
Should only use with all None first time after that
change one of the parameters.
"""
self.waited = True
reinit = any(True for parm in (method, path, qargs, fragment, headers,
body, data, fargs)
if parm is not None)
if reinit:
# rebuild calls reinit to enable repeated requests same destination
request = self.requester.rebuild(method=method,
path=path,
qargs=qargs,
fragment=fragment,
headers=headers,
body=body,
data=data,
fargs=fargs)
else:
request = self.requester.build()
self.connector.tx(request)
if method is not None:
self.respondent.reinit(method=self.requester.method)
else:
self.respondent.reinit() # reset code status reason
[docs] def redirect(self):
"""
Perform redirect
"""
if self.redirects:
redirect = self.redirects[-1]
location = redirect['headers'].get('location')
path, sep, query = location.partition('?')
path = unquote(path)
if sep:
location = sep.join([path, query])
else:
location = path
splits = urlsplit(location)
hostname = splits.hostname
port = splits.port
scheme = splits.scheme
scheme = 'https' if scheme.lower() == 'https' else 'http'
if scheme == 'https':
secured = True # use tls socket connection
defaultPort = 443
else:
secured = False # non tls socket connection
defaultPort = 80
hostname, port = httping.normalizeHostPort(hostname, port=port, defaultPort=defaultPort)
path = splits.path
query = splits.query
fragment = splits.fragment
method = redirect.get('method')
host = coring.normalizeHost(hostname)
ha = (host, port)
if ha != self.connector.ha or scheme != self.requester.scheme:
if self.requester.scheme == 'https' and scheme != 'https':
raise ValueError("Attempt to redirect to non secure "
"host '{0}'".format(location))
self.connector.close()
if secured:
context = getattr(self.connector, 'context')
connector = tcp.ClientTls(tymth=self.connector.tymth,
ha=(hostname, port),
bufsize=self.connector.bs,
wl=self.connector.wl,
context=context)
else:
connector = tcp.Client(tymth=self.connector.tymth,
ha=(hostname, port),
bufsize=self.connector.bs,
wl=self.connector.wl,)
self.secured = secured
self.connector = connector
self.connector.reopen()
self.requester.reinit(hostname=hostname,
port=port,
scheme=scheme)
self.respondent.reinit(msg=self.connector.rxbs,
method=method)
qargs = dict()
qargs, query = httping.updateQargsQuery(qargs, query)
self.transmit(method=method, path=path, qargs=qargs, fragment=fragment)
self.respondent.redirectant = False
self.respondent.redirected = True
self.respondent.ended = False # since redirecting not done
[docs] def serviceRequests(self):
"""
Service requests deque
"""
if not self.waited:
if self.requests:
self.latest = request = self.requests.popleft()
# future check host port scheme if need to reconnect on new ha
# reconnect here
self.transmit(**request) # expand items in request
[docs] def serviceResponse(self):
"""
Service Rx on connection and parse
"""
self.connector.serviceReceives()
if self.waited:
try:
self.respondent.parse()
except httping.HTTPException as ex:
self.respondent.errored = True
self.respondent.error = str(ex)
self.respondent.ended = True
if self.respondent.ended:
self.respondent.dictify()
if not self.respondent.evented:
if self.latest: # use saved request attribute
request = copy.copy(self.latest)
request.update([
('host', self.requester.hostname),
('port', self.requester.port),
('scheme', self.requester.scheme),
('method', self.requester.method),
('path', self.requester.path),
('fragment', self.requester.fragment),
('qargs', copy.copy(self.requester.qargs)),
('headers', copy.copy(self.requester.headers)),
('body', self.requester.body),
('data', self.requester.data),
('fargs', copy.copy(self.requester.fargs)),
])
self.latest = None
else:
request = dict([
('host', self.requester.hostname),
('port', self.requester.port),
('scheme', self.requester.scheme),
('method', self.requester.method),
('path', self.requester.path),
('fragment', self.requester.fragment),
('qargs', copy.copy(self.requester.qargs)),
('headers', copy.copy(self.requester.headers)),
('body', self.requester.body),
('data', self.requester.data),
('fargs', copy.copy(self.requester.fargs)),
])
response = dict([('version', self.respondent.version),
('status', self.respondent.status),
('reason', self.respondent.reason),
('headers', copy.copy(self.respondent.headers)),
('body', self.respondent.body),
('data', self.respondent.data),
('request', request),
('errored', self.respondent.errored),
('error', self.respondent.error),
])
if self.respondent.redirectable and self.respondent.redirectant:
self.redirects.append(copy.copy(response))
self.redirect()
else:
if self.redirects:
response['redirects'] = copy.copy(self.redirects)
self.redirects = []
self.responses.append(response)
self.waited = False
self.respondent.makeParser() #set up for next time
[docs] def service(self):
"""
Service request response
"""
if self.connector.cutoff:
if self.respondent:
self.respondent.close() # close any pending or current response parsing
if self.connector.reconnectable: # useful for server sent event stream
if self.connector.tymeout > 0.0 and self.connector.tymer.expired: # timed out
self.connector.reopen()
if self.respondent.evented:
duration = float(self.respondent.retry) / 1000.0 # convert to seconds
else:
duration = None # reused current duration
self.connector.tymer.restart(duration=duration)
if not self.connector.connected:
self.connector.serviceConnect()
if self.connector.connected:
if self.respondent:
if self.respondent.evented and self.respondent.leid is not None: # update Last-Event-ID header
self.requester.headers['Last-Event-ID'] = self.respondent.leid
self.connector.txbs.clear() # remove any stale request leftovers
self.transmit() # rebuilds and queues up most recent http request here
self.serviceRequests()
self.connector.serviceSends()
self.serviceResponse()
[docs] def serviceWhileGen(self, tymeout=0.5):
"""
Generator Method.
ServiceAll while pending requests or not a response or not tymeout
Usage:
response = yield from .serviceWhileGen(tymeout=0.5)
Runs one iteration of .service() on next and yields empty
bytes while not done.
Assumes associated tymist is advanced and realtime sleep delay
is incurred elsewhere.
Returns response as namedtuple or None if tymeout.
"""
tymer = tyming.Tymer(tymth=self.tymth, duration=tymeout)
while ((self.requests or self.connector.txbs or not self.responses)
and not tymer.expired):
try:
self.service()
except Exception as ex:
logger.error("Error: Servicing HTTP Client '%s'."
" '%s'\n", self.connector.name, ex)
raise ex
yield b'' # this is eventually yielded by wsgi app while waiting
return self.respond()
[docs]def backendRequest(tymth, *,
method=u'GET',
scheme=u'', #default if not in path
host=u'localhost', # default if not in path
port=None, # default if not in path
path=u'/',
qargs=None,
data=None,
buffered=False,
tymeout=2.0,
):
"""
Perform Async ReST request to Backend Server
Parameters:
Usage: (Inside a generator function)
response = yield from backendRequest()
response is the response if valid else None
before response is completed the yield from yields up an empty string ''
once completed then response has a value
path can be full url with host port etc path takes precedence over others
"""
try:
if buffered:
wl = wiring.WireLog(name='backend', filed=True, samed=True)
wl.reopen()
else:
wl = None
headers = help.Hict([('Accept', 'application/json'),
('Connection', 'close')])
client = Client(bufsize=131072,
wl=wl,
scheme=scheme,
hostname=host,
port=port,
method=method,
path=path,
qargs=qargs,
headers=headers,
data=data,
tymeout=tymeout,
reconnectable=False,
tymth=tymth,
)
client.transmit()
# assumes store clock is advanced elsewhere
tymer = tyming.Tymer(tymth=tymth, duration=tymeout)
while ((client.requests or client.connector.txbs or not client.responses)
and not tymer.expired):
try:
client.service()
except Exception as ex:
logger.error("Error: Servicing backend client. '{0}'\n".format(ex))
raise ex
yield b'' # yield from yields up to wsgi app while waiting
response = None # in case timed out
if client.responses:
response = client.responses.popleft()
finally:
client.close()
if wl:
wl.close()
return response
[docs]class ClientDoer(doing.Doer):
"""
HTTP Client Doer
See Doer for inherited attributes, properties, and methods.
Attributes:
.client is HTTP Client instance
"""
def __init__(self, client, **kwa):
"""
Initialize instance.
Parameters:
client is HTTP Client instance
"""
super(ClientDoer, self).__init__(**kwa)
self.client = client
if self.tymth:
self.client.wind(self.tymth)
[docs] def wind(self, tymth):
"""
Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
Updates winds .tymer .tymth
"""
super(ClientDoer, self).wind(tymth)
self.client.wind(tymth)
[docs] def enter(self):
""""""
self.client.reopen()
[docs] def recur(self, tyme):
""""""
self.client.service()
[docs] def exit(self):
""""""
self.client.close()