Source code for hio.core.tcp.clienting

# -*- encoding: utf-8 -*-
"""
hio.core.tcp.clienting Module
"""

import sys
import os
import errno
import socket
import ssl

from contextlib import contextmanager


from ... import help
from ...base import tyming, doing
from .. import coring, wiring


# Module-level logger used by the client classes below for error reporting.
# It is obtained through help.ogler rather than directly from logging.
logger = help.ogler.getLogger()
@contextmanager
def openClient(cls=None, **kwa):
    """
    Wrapper to create and open TCP Client instances.
    When used in a with statement block, calls .close() on exit of the block.

    Parameters:
        cls is Client class or subclass to instantiate. When None defaults
            to Client
        kwa is dict of keyword arguments passed through as cls(**kwa)

    Yields:
        client is instance of cls on which .reopen() has already been called

    Usage:
        with openClient() as client0:
            client0.accept()

        with openClient(cls=ClientTls) as client0:
            client0.accept()
    """
    if cls is None:
        cls = Client

    # Instantiate outside of the try block. If the constructor raises there is
    # nothing to close yet, and the original exception must propagate instead
    # of an UnboundLocalError raised by the finally clause.
    client = cls(**kwa)
    try:
        client.reopen()
        yield client
    finally:
        client.close()
class Client(tyming.Tymee):
    """
    Nonblocking TCP Socket Client Class.

    See tyming.Tymee for inherited attributes, properties, and methods

    Class Attributes:
        Tymeout is default reconnect retry tymeout in seconds. 0.0 means
            ignore tymeout
        Reconnectable is default auto reconnect flag

    Attributes:
        tymeout is reconnect retry tymeout in seconds
        tymer is tyming.Tymer instance used as reconnect retry timer
        ha is duple (host, port) of remote server
        hostname is host as originally provided (before normalizeHost)
        cs is connection socket or None when closed
        ca is duple address of local end of connection
        cutoff is True when connection detected as closed on far side
        reconnectable is True when .serviceConnect should reopen on tymeout
        opened is True after .open() and False after .close()
        bs is buffer size used for socket buffers and for each recv
        txbs is bytearray of data to send
        rxbs is bytearray of data received
        wl is WireLog object if any

    Properties:
        host is host part of .ha
        port is port part of .ha
        accepted is True when TCP connection has been accepted
        connected is True when connection is ready for use. For non-TLS
            this is the same as accepted
    """
    Tymeout = 0.0  # tymeout in seconds, tymeout of 0.0 means ignore tymeout
    Reconnectable = False  # auto reconnect flag

    def __init__(self,
                 tymeout=None,
                 ha=None,
                 host='127.0.0.1',
                 port=56000,
                 reconnectable=None,
                 bs=8096,
                 txbs=None,
                 rxbs=None,
                 wl=None,
                 **kwa):
        """
        Initialization method for instance.

        Parameters:
            tymth is injected function wrapper closure returned by .tymen() of
                Tymist instance. Calling tymth() returns associated Tymist .tyme.
                Passed through **kwa to the superclass.
            tymeout = auto reconnect retry tymeout
            ha = host address duple (host, port) of remote server
            host = host address or tcp server to connect to
            port = socket port
            reconnectable = Boolean retry auto reconnect if timed out
            bs = buffer size
            txbs = bytearray of data to send
            rxbs = bytearray of data received
            wl = WireLog object if any
        """
        super().__init__(**kwa)
        self.tymeout = tymeout if tymeout is not None else self.Tymeout
        # reconnect retry timer
        self.tymer = tyming.Tymer(tymth=self.tymth, duration=self.tymeout)

        # Sets .ha and .hostname. The same assignments used to be repeated
        # inline right after this call which normalized the host twice.
        self.reinitHostPort(ha=ha, hostname=host, port=port)

        self.cs = None  # connection socket
        self.ca = (None, None)  # host address of local connection
        self._accepted = False  # attribute to support accepted property
        self.cutoff = False  # True when detect connection closed on far side
        self.reconnectable = (reconnectable if reconnectable is not None
                              else self.Reconnectable)
        self.opened = False
        self.bs = bs
        self.txbs = txbs if txbs is not None else bytearray()  # data to send
        self.rxbs = rxbs if rxbs is not None else bytearray()  # data received
        self.wl = wl

    @property
    def host(self):
        """
        Property that returns host in .ha duple
        """
        return self.ha[0]

    @host.setter
    def host(self, value):
        """
        Setter for host property. Replaces host part of .ha
        """
        self.ha = (value, self.port)

    @property
    def port(self):
        """
        Property that returns port in .ha duple
        """
        return self.ha[1]

    @port.setter
    def port(self, value):
        """
        Setter for port property. Replaces port part of .ha
        """
        self.ha = (self.host, value)

    @property
    def accepted(self):
        """
        Property that returns accepted state of TCP socket
        """
        return self._accepted

    @accepted.setter
    def accepted(self, value):
        """
        Setter for accepted property
        """
        self._accepted = value

    @property
    def connected(self):
        """
        Property that returns connected state of TCP socket
        Non-tls tcp is connected when accepted
        """
        return self.accepted

    @connected.setter
    def connected(self, value):
        """
        Setter for connected property. For non-tls sets .accepted
        """
        self.accepted = value

    def wind(self, tymth):
        """
        Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
        Also winds .tymer with the same tymth
        """
        super().wind(tymth)
        self.tymer.wind(tymth)

    def reinitHostPort(self, ha=None, hostname=u'127.0.0.1', port=56000):
        """
        Reinit self.ha and self.hostname from ha = (host, port) or hostname port
        self.ha is of form (host, port) where host is either dns name or ip address
        self.hostname is hostname as dns name
        host eventually is host ip address output from normalizeHost()

        Parameters:
            ha is duple (host, port). When truthy takes precedence over
                hostname and port
            hostname is host name or address used when ha not provided
            port is port number used when ha not provided
        """
        hostname, port = ha or (hostname, port)
        self.hostname = hostname  # host domain name
        self.ha = (coring.normalizeHost(hostname), port)  # ip host address

    def actualBufSizes(self):
        """
        Returns duple of the the actual socket send and receive buffer size
        (send, receive). Returns (0, 0) when there is no socket.
        """
        if not self.cs:
            return (0, 0)

        return (self.cs.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF),
                self.cs.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))

    def open(self):
        """
        Opens connection socket in non blocking mode.

        if socket not closed properly, binding socket gets error
            OSError: (48, 'Address already in use')

        Returns True
        """
        self.accepted = False
        self.connected = False
        self.cutoff = False

        # create connection socket
        self.cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # make socket address reusable.
        # the SO_REUSEADDR flag tells the kernel to reuse a local socket in
        # TIME_WAIT state, without waiting for its natural timeout to expire.
        self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Linux TCP allocates twice the requested size so the value read back
        # is twice the value set
        if sys.platform.startswith('linux'):
            bs = 2 * self.bs
        else:
            bs = self.bs

        # only ever grow the socket buffers, never shrink them
        if self.cs.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) < bs:
            self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.bs)
        if self.cs.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) < bs:
            self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.bs)

        self.cs.setblocking(0)  # non blocking socket
        self.opened = True
        return True

    def reopen(self):
        """
        Idempotently opens socket by closing any existing socket first
        Returns result of .open()
        """
        self.close()
        return self.open()

    def shutdown(self, how=socket.SHUT_RDWR):
        """
        Shutdown connected socket .cs
        Does nothing when there is no socket.

        Parameters:
            how is one of socket.SHUT_RD, socket.SHUT_WR, socket.SHUT_RDWR
        """
        if self.cs:
            try:
                self.cs.shutdown(how)  # shutdown socket
            except OSError:
                pass  # best effort, deliberately ignore failure to shutdown

    def shutdownSend(self):
        """
        Shutdown send on connected socket .cs
        """
        # .shutdown already guards against missing socket and OSError
        self.shutdown(how=socket.SHUT_WR)

    def shutdownReceive(self):
        """
        Shutdown receive on connected socket .cs
        """
        # .shutdown already guards against missing socket and OSError
        self.shutdown(how=socket.SHUT_RD)

    def close(self):
        """
        Shutdown and close connected socket .cs
        Resets .accepted, .connected and .opened to False
        """
        if self.cs:
            self.shutdown()
            self.cs.close()  # close socket
            self.cs = None
        self.accepted = False
        self.connected = False
        self.opened = False

    def refresh(self):
        """
        Restart timer
        """
        self.tymer.restart()

    def accept(self):
        """
        Attempt nonblocking acceptance connect to .ha
        Returns True if successful
        Returns False if not so try again later

        Raises:
            OSError re-raised after logging when connect_ex raises
        """
        if not self.cs:
            self.reopen()

        try:
            result = self.cs.connect_ex(self.ha)  # async connect
        except OSError as ex:
            logger.error("OSError = %s\n", ex)
            raise

        if result not in [0, errno.EISCONN]:  # not connected
            if result in (errno.EINVAL, errno.ECONNREFUSED):
                # server not listening so must reopen
                self.reopen()
            return False  # try again later

        # now self.cs has new virtual port see self.cs.getsockname()
        self.ca = self.cs.getsockname()  # resolved local connection address
        self.ha = self.cs.getpeername()  # resolved remote connection address
        self.accepted = True  # for non-tls also makes .connected True
        self.cutoff = False
        return True

    def connect(self):
        """
        Attempt nonblocking connect to .ha
        Returns True if successful
        Returns False if not so try again later
        For non-TLS tcp connect is done when accepted
        This is placeholder for subclass Tls
        """
        return self.accept()

    def serviceConnect(self):
        """
        Service connection attempt
        If not already connected make a nonblocking attempt
        When still not connected and .reconnectable, reopens the socket and
        restarts .tymer once .tymeout > 0.0 and .tymer has expired
        Returns .connected
        """
        if not self.connected:
            self.connect()  # if successful sets .accepted .connected to True

            if not self.connected and self.reconnectable:
                if self.tymeout > 0.0 and self.tymer.expired:  # timed out
                    self.reopen()
                    self.tymer.restart()

        return self.connected

    def receive(self):
        """
        Perform non blocking receive from connected socket .cs

        If no data then returns None
        If connection closed then returns empty and sets .cutoff
        Otherwise returns data

        data is string in python2 and bytes in python3

        Raises:
            OSError re-raised after logging for any unhandled errno
        """
        try:
            data = self.cs.recv(self.bs)
        except OSError as ex:
            # ex.args[0] == ex.errno for better os compatibility.
            # the value of a given errno.XXXXX may be different on each os
            # EAGAIN: BSD 35, Linux 11, Windows 11
            # EWOULDBLOCK: BSD 35 Linux 11 Windows 140
            if ex.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
                return None  # blocked waiting for data

            if ex.args[0] in (errno.ECONNRESET,
                              errno.ENETRESET,
                              errno.ENETUNREACH,
                              errno.EHOSTUNREACH,
                              errno.ENETDOWN,
                              errno.EHOSTDOWN,
                              errno.ETIMEDOUT,
                              errno.ECONNREFUSED):
                self.cutoff = True  # this signals need to close/reopen connection
                return bytes()  # data empty

            logger.error("Error: Receive on HTTP Client '%s'."
                         " '%s'\n", self.ha, ex)
            raise  # re-raise

        if data:  # connection open
            if self.wl:  # log over the wire rx
                self.wl.writeRx(data, self.ha)
        else:
            # data empty so connection closed on other end,
            # whereas blocked is handled above and returns None
            self.cutoff = True

        return data

    def serviceReceives(self):
        """
        Service receives until no more
        Appends each received chunk to .rxbs
        """
        while self.connected and not self.cutoff:
            data = self.receive()
            if not data:
                break
            self.rxbs.extend(data)

    def serviceReceiveOnce(self):
        '''
        Retrieve from server only one reception
        Appends received data if any to .rxbs
        '''
        if self.connected and not self.cutoff:
            data = self.receive()
            if data:
                self.rxbs.extend(data)

    def clearRxbs(self):
        """
        Clear .rxbs in place
        """
        del self.rxbs[:]

    def send(self, data):
        """
        Perform non blocking send on connected socket .cs.
        Return number of bytes sent. Returns 0 when blocked or when the
        connection is detected as cut off (in which case sets .cutoff)

        data is string in python2 and bytes in python3

        Raises:
            OSError re-raised after logging for any unhandled errno
        """
        try:
            count = self.cs.send(data)  # result is number of bytes sent
        except OSError as ex:
            # ex.args[0] == ex.errno for better os compatibility.
            # the value of a given errno.XXXXX may be different on each os
            # EAGAIN: BSD 35, Linux 11, Windows 11
            # EWOULDBLOCK: BSD 35 Linux 11 Windows 140
            if ex.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
                count = 0  # blocked try again
            elif ex.args[0] in (errno.ECONNRESET,
                                errno.ENETRESET,
                                errno.ENETUNREACH,
                                errno.EHOSTUNREACH,
                                errno.ENETDOWN,
                                errno.EHOSTDOWN,
                                errno.ETIMEDOUT,
                                errno.ECONNREFUSED):
                self.cutoff = True  # this signals need to close/reopen connection
                count = 0
            else:
                logger.error("Error: Send on HTTP Client '%s'."
                             " '%s'\n", self.ha, ex)
                raise

        if count and self.wl:  # log over the wire tx
            self.wl.writeTx(data[:count], self.ha)

        return count

    def tx(self, data):
        """
        Copy data onto .txbs, .extend copies data.
        """
        self.txbs.extend(data)

    def serviceSends(self):
        """
        Service sends (transmits) of data in .txbs bytearray
        Makes a single attempt to send all of .txbs and deletes what is
        actually sent. Anything left over is tried again on a later call.
        """
        if self.txbs and self.connected and not self.cutoff:
            count = self.send(self.txbs)
            del self.txbs[:count]

    def service(self):
        """
        Service connect, txbs, and receives.
        """
        self.serviceConnect()
        self.serviceSends()
        self.serviceReceives()
class ClientTls(Client):
    """
    Outgoer with Nonblocking TLS/SSL support
    Nonblocking TCP Socket Client Class.

    See Client for inherited attributes, properties, and methods

    Attributes:
        context is ssl context object used to wrap the connection socket
        certedhost is server hostname passed as server_hostname when wrapping

    Properties:
        connected is True once TLS handshake has completed
    """

    def __init__(self,
                 context=None,
                 version=None,
                 certify=None,
                 hostify=None,
                 certedhost="",
                 keypath=None,
                 certpath=None,
                 cafilepath=None,
                 **kwa):
        """
        Initialization method for instance.

        IF no context THEN create one
        IF no version THEN create using library default
        IF certify is not None then use certify else use default
        IF hostify is not none the use hostify else use default

        Parameters:
            context = context object for tls/ssl If None use default
            version = ssl version If None use default
            certify = cert requirement If None use default
                      ssl.CERT_NONE = 0
                      ssl.CERT_OPTIONAL = 1
                      ssl.CERT_REQUIRED = 2
            keypath = pathname of local client side PKI private key file path
                      If given apply to context
            certpath = pathname of local client side PKI public cert file path
                      If given apply to context
            cafilepath = Cert Authority file path to use to verify server cert
                      If given apply to context
            hostify = verify server hostName If None use default
            certedhost = server's certificate common name (hostname) to check against

        Raises:
            ValueError when hostify is truthy but certify is ssl.CERT_NONE
        """
        super().__init__(**kwa)

        self._connected = False  # attribute supporting connected property

        if context is None:  # create context
            if not version:  # use default context
                context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
                hostify = hostify if hostify is not None else context.check_hostname
                context.check_hostname = hostify
                certify = certify if certify is not None else context.verify_mode
                context.verify_mode = certify

            else:  # create context with specified protocol version
                context = ssl.SSLContext(version)
                # disable bad protocols versions
                context.options |= ssl.OP_NO_SSLv2
                context.options |= ssl.OP_NO_SSLv3
                # disable compression to prevent CRIME attacks (OpenSSL 1.0+)
                context.options |= getattr(ssl, "OP_NO_COMPRESSION", 0)

                certify = ssl.CERT_REQUIRED if certify is None else certify
                hostify = True if hostify else False
                # Some protocol versions create the context with
                # check_hostname already enabled and ssl refuses to lower
                # verify_mode to CERT_NONE while it is. So switch hostname
                # checking off first, then apply the requested settings.
                context.check_hostname = False
                context.verify_mode = certify
                context.check_hostname = hostify

        self.context = context
        self.certedhost = certedhost or self.hostname

        if cafilepath:
            context.load_verify_locations(cafile=cafilepath,
                                          capath=None,
                                          cadata=None)
        elif context.verify_mode != ssl.CERT_NONE:
            context.load_default_certs(purpose=ssl.Purpose.SERVER_AUTH)

        if keypath or certpath:
            context.load_cert_chain(certfile=certpath, keyfile=keypath)

        if hostify and certify == ssl.CERT_NONE:
            raise ValueError("Check Hostname needs a SSL context with "
                             "either CERT_OPTIONAL or CERT_REQUIRED")

    @property
    def connected(self):
        """
        Property that returns connected state of TCP socket
        TLS tcp is connected when accepted and handshake completed
        """
        return self._connected

    @connected.setter
    def connected(self, value):
        """
        Setter for connected property. Unlike the non-tls client this is
        tracked separately from .accepted
        """
        self._connected = value

    def close(self):
        """
        Shutdown and close connected socket .cs
        Resets .accepted, .connected and .opened to False
        """
        # Same steps as the superclass. Its assignment to .connected goes
        # through the property overridden here so ._connected is reset too.
        super().close()

    def wrap(self):
        """
        Wrap socket .cs in ssl context
        Handshake is not performed here, see .handshake
        """
        self.cs = self.context.wrap_socket(self.cs,
                                           server_side=False,
                                           do_handshake_on_connect=False,
                                           server_hostname=self.certedhost)

    def handshake(self):
        """
        Attempt nonblocking ssl handshake to .ha
        Returns True if successful
        Returns False if not so try again later

        Raises:
            Any exception from do_handshake other than an ssl want read or
            want write is re-raised after closing the socket
        """
        try:
            self.cs.do_handshake()
        except ssl.SSLError as ex:
            # The want read/write codes are only meaningful on ssl errors.
            # Comparing them against the errno of a plain OSError would
            # mistake unrelated os errors for handshake still in progress.
            if ex.errno in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
                return False  # handshake still in progress, try again later
            self.close()
            raise  # should give up here nicely
        except Exception:
            self.close()
            raise

        self.connected = True
        return True

    def connect(self):
        """
        Attempt nonblocking connect to .ha
        Returns True if successful
        Returns False if not so try again later
        Connected when both accepted connection and TLS handshake complete
        """
        if not self.accepted:
            self.accept()

            if self.accepted:  # only do this once immediately after accepted
                if not self.certedhost:
                    self.certedhost = self.ha[0]
                self.wrap()

        if self.accepted and not self.connected:
            self.handshake()

        return self.connected

    def receive(self):
        """
        Perform non blocking receive from connected socket .cs

        If no data then returns None
        If connection closed then returns empty and sets .cutoff
        Otherwise returns data

        data is string in python2 and bytes in python3

        Raises:
            OSError re-raised after logging for any unhandled error
        """
        try:
            data = self.cs.recv(self.bs)
        except OSError as ex:  # ssl.SSLError is a subtype of OSError
            # ex.args[0] == ex.errno for better os compatibility.
            # the value of a given errno.XXXXX may be different on each os
            if ex.args[0] in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
                return None  # blocked waiting for data

            # ssl.SSLEOFError is an exception class so it has to be matched
            # by type. It can never be equal to the integer in ex.args[0].
            if (isinstance(ex, ssl.SSLEOFError) or
                    ex.args[0] in (errno.ECONNRESET,
                                   errno.ENETRESET,
                                   errno.ENETUNREACH,
                                   errno.EHOSTUNREACH,
                                   errno.ENETDOWN,
                                   errno.EHOSTDOWN,
                                   errno.ETIMEDOUT,
                                   errno.ECONNREFUSED)):
                self.cutoff = True  # this signals need to close/reopen connection
                return bytes()  # data empty

            logger.error("Error: Receive on HTTP ClientTLS '%s'."
                         " '%s'\n", self.ha, ex)
            raise  # re-raise

        if data:  # connection open
            if self.wl:  # log over the wire rx
                self.wl.writeRx(data, self.ha)
        else:  # data empty so connection closed on other end
            self.cutoff = True

        return data

    def send(self, data):
        """
        Perform non blocking send on connected socket .cs.
        Return number of bytes sent. Returns 0 when blocked or when the
        connection is detected as cut off (in which case sets .cutoff)

        data is string in python2 and bytes in python3

        Raises:
            OSError re-raised after logging for any unhandled error
        """
        try:
            result = self.cs.send(data)  # result is number of bytes sent
        except OSError as ex:  # ssl.SSLError is a subtype of OSError
            # ex.args[0] == ex.errno for better os compatibility.
            # the value of a given errno.XXXXX may be different on each os
            if ex.args[0] in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
                result = 0  # blocked try again
            elif (isinstance(ex, ssl.SSLEOFError) or
                    ex.args[0] in (errno.ECONNRESET,
                                   errno.ENETRESET,
                                   errno.ENETUNREACH,
                                   errno.EHOSTUNREACH,
                                   errno.ENETDOWN,
                                   errno.EHOSTDOWN,
                                   errno.ETIMEDOUT,
                                   errno.ECONNREFUSED)):
                # ssl.SSLEOFError is matched by type since an exception class
                # can never be equal to the integer in ex.args[0]
                self.cutoff = True  # this signals need to close/reopen connection
                result = 0
            else:
                logger.error("Error: Send on HTTP ClientTLS '%s'."
                             " '%s'\n", self.ha, ex)
                raise

        if result and self.wl:  # log over the wire tx
            self.wl.writeTx(data[:result], self.ha)

        return result
class ClientDoer(doing.Doer):
    """
    Basic TCP Client Doer. Drives a TCP Client instance by reopening it on
    enter, servicing it on each recur and closing it on exit.

    See Doer for inherited attributes, properties, and methods.

    Attributes:
        .client is TCP Client instance
    """

    def __init__(self, client, **kwa):
        """
        Initialize instance.

        Parameters:
            client is TCP Client instance
            kwa is dict of keyword arguments passed through to the superclass
        """
        super().__init__(**kwa)
        self.client = client
        if self.tymth:  # share this doer's tymth with the client when set
            client.wind(self.tymth)

    def wind(self, tymth):
        """
        Inject new tymist.tymth as new ._tymth. Changes tymist.tyme base.
        Also winds .client with the same tymth
        """
        super().wind(tymth)
        self.client.wind(tymth)

    def enter(self):
        """
        Reopen .client
        """
        self.client.reopen()

    def recur(self, tyme):
        """
        Service .client once. Returns None.

        Parameters:
            tyme is not used by this method
        """
        self.client.service()

    def exit(self):
        """
        Close .client
        """
        self.client.close()