Source code for hio.core.serial.serialing

"""
Asynchronous (nonblocking) serial io

"""
from __future__ import absolute_import, division, print_function

import sys
import os
import errno
from collections import deque


from ... import hioing, help
from ...base import doing

[docs]logger = help.ogler.getLogger()
[docs]class LineError(hioing.HioError):
""" Serial line error. Too big for buffer. Usage: raise LineError("error message") """
[docs]class Console(): """ Class to manage non blocking interactive I/O on serial console Opens non blocking read file descriptor on console Use instance method .close to close file descriptor Use instance methods .getline to read & .put to write to console Needs os module Attributes: bs (int): max buffer size for each read, defaults to 256 fd (int): file descriptor for console opened (bool): True means .fd opened, False means .fd closed rxbs (bytearray): of received characters (bytes) Methods: reopen (): closes and reopens .fd, sets .opened close (): closes .fd unsets .opened get (): returns chars including newline but no more than bs characters put (): puts characters Hidden: ._line is bytearray of line buffer """
[docs] MaxBufSize = 256
def __init__(self, bs=None): """ Initialization method for instance. Creates attributes. """ self.bs = bs if bs is not None else self.MaxBufSize self.fd = None # console file descriptor needs to be opened self.opened = False self.rxbs = bytearray()
[docs] def open(self, port=''): """ Opens fd on terminal console in non blocking mode. port is the serial port device path name or if '' then use os.ctermid() which returns path name of console usually '/dev/tty' os.O_NONBLOCK makes non blocking io os.O_RDWR allows both read and write. os.O_NOCTTY don't make this the controlling terminal of the process O_NOCTTY is only for cross platform portability BSD never makes it the controlling terminal Don't use print at same time since it will mess up non blocking reads. Works in both canonical and non-canonical input mode. In canonical mode, no chars are available to read until eol newline is entered and eol is included in the read characters. It appears that canonical mode is the default for fd console os.ctermid(). For other serial port fds the characters may be available immediately. To debug use os.isatty(fd) which returns True if the file descriptor fd is open and connected to a tty-like device, else False. """ if not port: port = os.ctermid() # default to console try: self.fd = os.open(port, os.O_NONBLOCK | os.O_RDWR | os.O_NOCTTY) except OSError as ex: logger.error("Error opening console serial port, %s\n", ex) return False self.opened = True return self.opened
[docs] def reopen(self): """ Idempotently opens console """ self.close() return self.open()
[docs] def close(self): """ Closes fd. """ if self.fd: os.close(self.fd) self.fd = None del self.rxbs[:] self.opened = False
[docs] def put(self, data = b'\n'): """ Writes data bytes to console and return number of bytes from data written. """ return (os.write(self.fd, data)) # returns number of bytes written
[docs] def get(self, bs=None): """ Gets nonblocking line of bytes from console of up to bs characters including eol newline if in bs characters otherwise must repeat get until a newline appears. Returns empty string if no characters available else returns line. Works in both canonical and non-canonical mode In canonical mode, no chars are available to read until eol newline is entered and eol is included in the read characters. Strips eol newline before returning line. """ bs = bs if bs is not None else self.bs line = bytearray() try: self.rxbs.extend(os.read(self.fd, bs)) except OSError as ex1: # if no chars available generates exception # ex.args[0] == ex.errno for better os compatibility # the value of a given errno.XXXXX may be different on each os # EAGAIN: BSD 35, Linux 11, Windows 11 # EWOULDBLOCK: BSD 35 Linux 11 Windows 140 if ex1.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK): pass # No characters available else: logger.error("Error: Get on Console '%s'." " '%s'\n", self.fd, ex1) raise # re-raise exception ex1 else: if (idx := self.rxbs.find(ord(b'\n'))) != -1: line.extend(self.rxbs[:idx]) # copy all but newline del self.rxbs[:idx+1] # delete including newline return line
[docs]class ConsoleDoer(doing.Doer): """ Basic Console Doer. Wraps console in doer context so opens and closes console To test in WingIde must configure Debug I/O to use external console See Doer for inherited attributes, properties, and methods. Attributes: .console is serial Console instance """ def __init__(self, console, **kwa): """ Initialize instance. Parameters: console is serial Console instance """ super(ConsoleDoer, self).__init__(**kwa) self.console = console
[docs] def enter(self): """""" result = self.console.reopen()
[docs] def exit(self): """""" self.console.close()
[docs]class EchoConsoleDoer(doing.Doer): """ Basic Terminal Console IO to buffers. Echos input back to output To test in WingIde must configure Debug I/O to use external console See Doer for inherited attributes, properties, and methods. Attributes: .console is serial Console instance """ def __init__(self, console, lines=None, txbs=None, **kwa): """ Initialize instance. Parameters: console is serial Console instance lines is deque of input bytes bytearrays of each line from console txbs is ouput bytes bytearray to send to console """ super(EchoConsoleDoer, self).__init__(**kwa) self.console = console self.lines = lines if lines is not None else deque() self.txbs = txbs if txbs is not None else bytearray()
[docs] def enter(self): """""" self.console.reopen() self.txbs.extend(b"Cmds: q=quit, h=help otherwise echoes.\n") self.txbs.extend(b"Type cmd & \n: ")
[docs] def recur(self, tyme): """""" done = False prompt = False while self.lines: line = self.lines.popleft() #process line here if line == b'q': self.txbs.extend(b"Goodbye\n.") done = True # all done so indicate exit break elif line == b'h': self.txbs.extend(b"Help: type q to quit or h for help.\n") else: self.txbs.extend(b"Echo: %s\n" % line ) prompt = True if prompt: self.txbs.extend(b"Type cmd & \n: ") if self.txbs: count = self.console.put(self.txbs) # write del self.txbs[:count] line = self.console.get() # read if line: self.lines.append(line) return done # keep going if done == False else ends
[docs] def exit(self): """""" self.console.close()
[docs]class Device(): """ Class to manage non blocking IO on serial device port. Opens non blocking read file descriptor on serial port Use instance method close to close file descriptor Use instance methods get & put to read & write to serial device Needs os module """ def __init__(self, port=None, speed=9600, bs=1024): """ Initialization method for instance. port = serial device port path string speed = serial port speed in bps bs = buffer size for reads """ self.fd = None # serial device port file descriptor, must be opened first self.port = port or os.ctermid() #default to console self.speed = speed or 9600 self.bs = bs or 1024 self.opened = False
[docs] def reopen(self, port=None, speed=None, bs=None): """ Idempotently open serial device port Opens fd on serial port in non blocking mode. port is the serial port device path name or if '' then use os.ctermid() which returns path name of console usually '/dev/tty' os.O_NONBLOCK makes non blocking io os.O_RDWR allows both read and write. os.O_NOCTTY don't make this the controlling terminal of the process O_NOCTTY is only for cross platform portability BSD never makes it the controlling terminal Don't use print and console at same time since it will mess up non blocking reads. The input mode, canonical or noncanonical, is controlled by the ICANON flag see termios module. Raw mode def setraw(fd, when=TCSAFLUSH): Put terminal into a raw mode. mode = tcgetattr(fd) mode[IFLAG] = mode[IFLAG] & ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON) mode[OFLAG] = mode[OFLAG] & ~(OPOST) mode[CFLAG] = mode[CFLAG] & ~(CSIZE | PARENB) mode[CFLAG] = mode[CFLAG] | CS8 mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON | IEXTEN | ISIG) mode[CC][VMIN] = 1 mode[CC][VTIME] = 0 tcsetattr(fd, when, mode) # set up raw mode / no echo / binary cflag |= (TERMIOS.CLOCAL|TERMIOS.CREAD) lflag &= ~(TERMIOS.ICANON|TERMIOS.ECHO|TERMIOS.ECHOE|TERMIOS.ECHOK|TERMIOS.ECHONL| TERMIOS.ISIG|TERMIOS.IEXTEN) #|TERMIOS.ECHOPRT for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk if hasattr(TERMIOS, flag): lflag &= ~getattr(TERMIOS, flag) oflag &= ~(TERMIOS.OPOST) iflag &= ~(TERMIOS.INLCR|TERMIOS.IGNCR|TERMIOS.ICRNL|TERMIOS.IGNBRK) if hasattr(TERMIOS, 'IUCLC'): iflag &= ~TERMIOS.IUCLC if hasattr(TERMIOS, 'PARMRK'): iflag &= ~TERMIOS.PARMRK """ self.close() if port is not None: self.port = port if speed is not None: self.speed = speed if bs is not None: self.bs = bs self.fd = os.open(self.port, os.O_NONBLOCK | os.O_RDWR | os.O_NOCTTY) system = platform.system() if (system == 'Darwin') or (system == 'Linux'): # use termios to set values import termios iflag, oflag, cflag, lflag, ispeed, ospeed, cc = range(7) settings = termios.tcgetattr(self.fd) #print(settings) settings[lflag] = (settings[lflag] & ~termios.ICANON) settings[lflag] = (settings[lflag] & ~termios.ECHO) # no echo #ignore carriage returns on input #settings[iflag] = (settings[iflag] | (termios.IGNCR)) #ignore cr # 8N1 8bit word no parity one stop bit nohardware handshake ctsrts # to set size have to mask out(clear) CSIZE bits and or in size settings[cflag] = ((settings[cflag] & ~termios.CSIZE) | termios.CS8) # no parity clear PARENB settings[cflag] = (settings[cflag] & ~termios.PARENB) #one stop bit clear CSTOPB settings[cflag] = (settings[cflag] & ~termios.CSTOPB) #no hardware handshake clear crtscts settings[cflag] = (settings[cflag] & ~termios.CRTSCTS) # in linux the speed flag does not equal value so always set it speedattr = "B{0}".format(self.speed) # convert numeric speed to attribute name string speed = getattr(termios, speedattr) settings[ispeed] = speed settings[ospeed] = speed termios.tcsetattr(self.fd, termios.TCSANOW, settings) #print(settings) self.opened = True return self.opened
[docs] def close(self): """ Closes fd. """ if self.fd: os.close(self.fd) self.fd = None self.opened = False
[docs] def receive(self): """ Reads nonblocking characters from serial device up to bs characters Returns empty bytes if no characters available else returns all available. In canonical mode no chars are available until newline is entered. """ data = b'' try: data = os.read(self.fd, self.bs) #if no chars available generates exception except OSError as ex1: # ex1 is the target instance of the exception if ex1.errno == errno.EAGAIN: #BSD 35, Linux 11 pass #No characters available else: logger.error("Error: Receive on Device '%s'." " '%s'\n", self.port, ex) raise #re raise exception ex1 return data
[docs] def send(self, data=b'\n'): """ Writes data bytes to serial device port. Returns number of bytes sent """ try: count = os.write(self.fd, data) except OSError as ex1: # ex1 is the target instance of the exception if ex1.errno == errno.EAGAIN: # BSD 35, Linux 11 count = 0 # buffer full can't write else: logger.error("Error: Send on Device '%s'." " '%s'\n", self.port, ex) raise #re raise exception ex1 return count
[docs]class Serial(): """ Class to manage non blocking IO on serial device port using pyserial Opens non blocking read file descriptor on serial port Use instance method close to close file descriptor Use instance methods get & put to read & write to serial device Needs os module """ def __init__(self, port=None, speed=9600, bs=1024): """ Initialization method for instance. port = serial device port path string speed = serial port speed in bps bs = buffer size for reads """ self.serial = None # Serial instance self.port = port or os.ctermid() #default to console self.speed = speed or 9600 self.bs = bs or 1024 self.opened = False
[docs] def reopen(self, port=None, speed=None, bs=None): """ Opens fd on serial port in non blocking mode. port is the serial port device path name or if None then use os.ctermid() which returns path name of console usually '/dev/tty' """ self.close() if port is not None: self.port = port if speed is not None: self.speed = speed if bs is not None: self.bs = bs import serial # import pyserial self.serial = serial.Serial(port=self.port, baudrate=self.speed, timeout=0, writeTimeout=0) #self.serial.nonblocking() self.serial.reset_input_buffer() self.opened = True return self.opened
[docs] def close(self): """ Closes .serial """ if self.serial: self.serial.reset_output_buffer() self.serial.close() self.serial = None self.opened = False
[docs] def receive(self): """ Reads nonblocking characters from serial device up to bs characters Returns empty bytes if no characters available else returns all available. In canonical mode no chars are available until newline is entered. """ data = b'' try: data = self.serial.read(self.bs) #if no chars available generates exception except OSError as ex1: # ex1 is the target instance of the exception if ex1.errno == errno.EAGAIN: #BSD 35, Linux 11 pass #No characters available else: logger.error("Error: Receive on Serial '%s'." " '%s'\n", self.port, ex) raise #re raise exception ex1 return data
[docs] def send(self, data=b'\n'): """ Writes data bytes to serial device port. Returns number of bytes sent """ try: count = self.serial.write(data) except OSError as ex1: # ex1 is the target instance of the exception if ex1.errno == errno.EAGAIN: #BSD 35, Linux 11 count = 0 # buffer full can't write else: logger.error("Error: Send on Serial '%s'." " '%s'\n", self.port, ex) raise #re raise exception ex1 return count
[docs]class Driver(): """ Nonblocking Serial Device Port Driver """ def __init__(self, name=u'', uid=0, port=None, speed=9600, bs=1024, server=None): """ Initialization method for instance. Parameters: name = user friendly name for driver uid = unique identifier for driver port = serial device port path string speed = serial port speed in bps canonical = canonical mode True or False bs = buffer size for reads server = serial port device server if any Attributes: name = user friendly name for driver uid = unique identifier for driver server = serial device server nonblocking txbs = bytearray of data bytes to send rxbs = bytearray of data bytes received """ self.name = name self.uid = uid if not server: try: import serial self.server = Serial(port=port, speed=speed, bs=bs) except ImportError as ex: logger.error("Error: importing pyserial\n%s\n", ex) self.server = Device(port=port, speed=speed, bs=bs) else: self.server = server self.txbs = bytearray() # bytearray of data to send self.rxbs = bytearray() # bytearray of data received
[docs] def serviceReceives(self): """ Service receives until no more """ while self.server.opened: data = self.server.receive() # bytes if not data: break self.rxbs.extend(data)
[docs] def clearRxbs(self): """ Clear .rxbs """ del self.rxbs[:]
[docs] def scan(self, start): """ Returns offset of given start byte in self.rxbs Returns None if start is not given or not found If strip then remove any bytes before offset """ offset = self.rxbs.find(start) if offset < 0: return None return offset
[docs] def send(self, data): """ Handle one tx data """ count = self.server.send(data) return count
[docs] def tx(self, data): """ Queue data onto .txbs """ self.txbs.extend(data)
[docs] def serviceSends(self): """ Service .txbs """ while self.txbs and self.server.opened: count = self.send(self.txbs) del self.txbs[:count] break # try again later
[docs] def service(self): """ Sevice receives and sends """ self.serviceReceives() self.serviceSends()