145 lines
5.0 KiB
Python
145 lines
5.0 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
|
||
|
from collections import namedtuple
|
||
|
from contextlib import contextmanager
|
||
|
import logging
|
||
|
import serial
|
||
|
from threading import Lock
|
||
|
import time
|
||
|
import traceback
|
||
|
|
||
|
from odoo import _
|
||
|
from odoo.addons.hw_drivers.event_manager import event_manager
|
||
|
from odoo.addons.hw_drivers.driver import Driver
|
||
|
|
||
|
_logger = logging.getLogger(__name__)
|
||
|
|
||
|
SerialProtocol = namedtuple(
|
||
|
'SerialProtocol',
|
||
|
"name baudrate bytesize stopbits parity timeout writeTimeout measureRegexp statusRegexp "
|
||
|
"commandTerminator commandDelay measureDelay newMeasureDelay "
|
||
|
"measureCommand emptyAnswerValid")
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def serial_connection(path, protocol, is_probing=False):
|
||
|
"""Opens a serial connection to a device and closes it automatically after use.
|
||
|
|
||
|
:param path: path to the device
|
||
|
:type path: string
|
||
|
:param protocol: an object containing the serial protocol to connect to a device
|
||
|
:type protocol: namedtuple
|
||
|
:param is_probing: a flag thet if set to `True` makes the timeouts longer, defaults to False
|
||
|
:type is_probing: bool, optional
|
||
|
"""
|
||
|
|
||
|
PROBING_TIMEOUT = 1
|
||
|
port_config = {
|
||
|
'baudrate': protocol.baudrate,
|
||
|
'bytesize': protocol.bytesize,
|
||
|
'stopbits': protocol.stopbits,
|
||
|
'parity': protocol.parity,
|
||
|
'timeout': PROBING_TIMEOUT if is_probing else protocol.timeout, # longer timeouts for probing
|
||
|
'writeTimeout': PROBING_TIMEOUT if is_probing else protocol.writeTimeout # longer timeouts for probing
|
||
|
}
|
||
|
connection = serial.Serial(path, **port_config)
|
||
|
yield connection
|
||
|
connection.close()
|
||
|
|
||
|
|
||
|
class SerialDriver(Driver):
|
||
|
"""Abstract base class for serial drivers."""
|
||
|
|
||
|
_protocol = None
|
||
|
connection_type = 'serial'
|
||
|
|
||
|
STATUS_CONNECTED = 'connected'
|
||
|
STATUS_ERROR = 'error'
|
||
|
STATUS_CONNECTING = 'connecting'
|
||
|
|
||
|
def __init__(self, identifier, device):
|
||
|
""" Attributes initialization method for `SerialDriver`.
|
||
|
|
||
|
:param device: path to the device
|
||
|
:type device: str
|
||
|
"""
|
||
|
|
||
|
super(SerialDriver, self).__init__(identifier, device)
|
||
|
self._actions.update({
|
||
|
'get_status': self._push_status,
|
||
|
})
|
||
|
self.device_connection = 'serial'
|
||
|
self._device_lock = Lock()
|
||
|
self._status = {'status': self.STATUS_CONNECTING, 'message_title': '', 'message_body': ''}
|
||
|
self._set_name()
|
||
|
|
||
|
def _get_raw_response(connection):
|
||
|
pass
|
||
|
|
||
|
def _push_status(self):
|
||
|
"""Updates the current status and pushes it to the frontend."""
|
||
|
|
||
|
self.data['status'] = self._status
|
||
|
event_manager.device_changed(self)
|
||
|
|
||
|
def _set_name(self):
|
||
|
"""Tries to build the device's name based on its type and protocol name but falls back on a default name if that doesn't work."""
|
||
|
|
||
|
try:
|
||
|
name = ('%s serial %s' % (self._protocol.name, self.device_type)).title()
|
||
|
except Exception:
|
||
|
name = 'Unknown Serial Device'
|
||
|
self.device_name = name
|
||
|
|
||
|
def _take_measure(self):
|
||
|
pass
|
||
|
|
||
|
def _do_action(self, data):
|
||
|
"""Helper function that calls a specific action method on the device.
|
||
|
|
||
|
:param data: the `_actions` key mapped to the action method we want to call
|
||
|
:type data: string
|
||
|
"""
|
||
|
|
||
|
try:
|
||
|
with self._device_lock:
|
||
|
self._actions[data['action']](data)
|
||
|
time.sleep(self._protocol.commandDelay)
|
||
|
except Exception:
|
||
|
msg = _('An error occurred while performing action %s on %s', data, self.device_name)
|
||
|
_logger.exception(msg)
|
||
|
self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()}
|
||
|
self._push_status()
|
||
|
|
||
|
def action(self, data):
|
||
|
"""Establish a connection with the device if needed and have it perform a specific action.
|
||
|
|
||
|
:param data: the `_actions` key mapped to the action method we want to call
|
||
|
:type data: string
|
||
|
"""
|
||
|
|
||
|
if self._connection and self._connection.isOpen():
|
||
|
self._do_action(data)
|
||
|
else:
|
||
|
with serial_connection(self.device_identifier, self._protocol) as connection:
|
||
|
self._connection = connection
|
||
|
self._do_action(data)
|
||
|
|
||
|
def run(self):
|
||
|
"""Continuously gets new measures from the device."""
|
||
|
|
||
|
try:
|
||
|
with serial_connection(self.device_identifier, self._protocol) as connection:
|
||
|
self._connection = connection
|
||
|
self._status['status'] = self.STATUS_CONNECTED
|
||
|
self._push_status()
|
||
|
while not self._stopped.is_set():
|
||
|
self._take_measure()
|
||
|
time.sleep(self._protocol.newMeasureDelay)
|
||
|
except Exception:
|
||
|
msg = _('Error while reading %s', self.device_name)
|
||
|
_logger.exception(msg)
|
||
|
self._status = {'status': self.STATUS_ERROR, 'message_title': msg, 'message_body': traceback.format_exc()}
|
||
|
self._push_status()
|