382 lines
15 KiB
Python
382 lines
15 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
|
||
|
from base64 import b64decode
|
||
|
from cups import IPPError, IPP_PRINTER_IDLE, IPP_PRINTER_PROCESSING, IPP_PRINTER_STOPPED
|
||
|
import dbus
|
||
|
import io
|
||
|
import logging
|
||
|
import netifaces as ni
|
||
|
import os
|
||
|
from PIL import Image, ImageOps
|
||
|
import re
|
||
|
import subprocess
|
||
|
from uuid import getnode as get_mac
|
||
|
|
||
|
from odoo import http
|
||
|
from odoo.addons.hw_drivers.connection_manager import connection_manager
|
||
|
from odoo.addons.hw_drivers.controllers.proxy import proxy_drivers
|
||
|
from odoo.addons.hw_drivers.driver import Driver
|
||
|
from odoo.addons.hw_drivers.event_manager import event_manager
|
||
|
from odoo.addons.hw_drivers.iot_handlers.interfaces.PrinterInterface_L import PPDs, conn, cups_lock
|
||
|
from odoo.addons.hw_drivers.main import iot_devices
|
||
|
from odoo.addons.hw_drivers.tools import helpers
|
||
|
|
||
|
_logger = logging.getLogger(__name__)
|
||
|
|
||
|
RECEIPT_PRINTER_COMMANDS = {
|
||
|
'star': {
|
||
|
'center': b'\x1b\x1d\x61\x01', # ESC GS a n
|
||
|
'cut': b'\x1b\x64\x02', # ESC d n
|
||
|
'title': b'\x1b\x69\x01\x01%s\x1b\x69\x00\x00', # ESC i n1 n2
|
||
|
'drawers': [b'\x07', b'\x1a'] # BEL & SUB
|
||
|
},
|
||
|
'escpos': {
|
||
|
'center': b'\x1b\x61\x01', # ESC a n
|
||
|
'cut': b'\x1d\x56\x41\n', # GS V m
|
||
|
'title': b'\x1b\x21\x30%s\x1b\x21\x00', # ESC ! n
|
||
|
'drawers': [b'\x1b\x3d\x01', b'\x1b\x70\x00\x19\x19', b'\x1b\x70\x01\x19\x19'] # ESC = n then ESC p m t1 t2
|
||
|
}
|
||
|
}
|
||
|
|
||
|
def cups_notification_handler(message, uri, device_identifier, state, reason, accepting_jobs):
|
||
|
if device_identifier in iot_devices:
|
||
|
reason = reason if reason != 'none' else None
|
||
|
state_value = {
|
||
|
IPP_PRINTER_IDLE: 'connected',
|
||
|
IPP_PRINTER_PROCESSING: 'processing',
|
||
|
IPP_PRINTER_STOPPED: 'stopped'
|
||
|
}
|
||
|
iot_devices[device_identifier].update_status(state_value[state], message, reason)
|
||
|
|
||
|
# Create a Cups subscription if it doesn't exist yet
|
||
|
try:
|
||
|
conn.getSubscriptions('/printers/')
|
||
|
except IPPError:
|
||
|
conn.createSubscription(
|
||
|
uri='/printers/',
|
||
|
recipient_uri='dbus://',
|
||
|
events=['printer-state-changed']
|
||
|
)
|
||
|
|
||
|
# Listen for notifications from Cups
|
||
|
bus = dbus.SystemBus()
|
||
|
bus.add_signal_receiver(cups_notification_handler, signal_name="PrinterStateChanged", dbus_interface="org.cups.cupsd.Notifier")
|
||
|
|
||
|
|
||
|
class PrinterDriver(Driver):
|
||
|
connection_type = 'printer'
|
||
|
|
||
|
def __init__(self, identifier, device):
|
||
|
super(PrinterDriver, self).__init__(identifier, device)
|
||
|
self.device_type = 'printer'
|
||
|
self.device_connection = device['device-class'].lower()
|
||
|
self.device_name = device['device-make-and-model']
|
||
|
self.state = {
|
||
|
'status': 'connecting',
|
||
|
'message': 'Connecting to printer',
|
||
|
'reason': None,
|
||
|
}
|
||
|
self.send_status()
|
||
|
|
||
|
self._actions.update({
|
||
|
'cashbox': self.open_cashbox,
|
||
|
'print_receipt': self.print_receipt,
|
||
|
'': self._action_default,
|
||
|
})
|
||
|
|
||
|
self.receipt_protocol = 'star' if 'STR_T' in device['device-id'] else 'escpos'
|
||
|
if 'direct' in self.device_connection and any(cmd in device['device-id'] for cmd in ['CMD:STAR;', 'CMD:ESC/POS;']):
|
||
|
self.print_status()
|
||
|
|
||
|
@classmethod
|
||
|
def supported(cls, device):
|
||
|
if device.get('supported', False):
|
||
|
return True
|
||
|
protocol = ['dnssd', 'lpd', 'socket']
|
||
|
if any(x in device['url'] for x in protocol) and device['device-make-and-model'] != 'Unknown' or 'direct' in device['device-class']:
|
||
|
model = cls.get_device_model(device)
|
||
|
ppdFile = ''
|
||
|
for ppd in PPDs:
|
||
|
if model and model in PPDs[ppd]['ppd-product']:
|
||
|
ppdFile = ppd
|
||
|
break
|
||
|
with cups_lock:
|
||
|
if ppdFile:
|
||
|
conn.addPrinter(name=device['identifier'], ppdname=ppdFile, device=device['url'])
|
||
|
else:
|
||
|
conn.addPrinter(name=device['identifier'], device=device['url'])
|
||
|
conn.setPrinterInfo(device['identifier'], device['device-make-and-model'])
|
||
|
conn.enablePrinter(device['identifier'])
|
||
|
conn.acceptJobs(device['identifier'])
|
||
|
conn.setPrinterUsersAllowed(device['identifier'], ['all'])
|
||
|
conn.addPrinterOptionDefault(device['identifier'], "usb-no-reattach", "true")
|
||
|
conn.addPrinterOptionDefault(device['identifier'], "usb-unidir", "true")
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
@classmethod
|
||
|
def get_device_model(cls, device):
|
||
|
device_model = ""
|
||
|
if device.get('device-id'):
|
||
|
for device_id in [device_lo for device_lo in device['device-id'].split(';')]:
|
||
|
if any(x in device_id for x in ['MDL', 'MODEL']):
|
||
|
device_model = device_id.split(':')[1]
|
||
|
break
|
||
|
elif device.get('device-make-and-model'):
|
||
|
device_model = device['device-make-and-model']
|
||
|
return re.sub("[\(].*?[\)]", "", device_model).strip()
|
||
|
|
||
|
@classmethod
|
||
|
def get_status(cls):
|
||
|
status = 'connected' if any(iot_devices[d].device_type == "printer" and iot_devices[d].device_connection == 'direct' for d in iot_devices) else 'disconnected'
|
||
|
return {'status': status, 'messages': ''}
|
||
|
|
||
|
def disconnect(self):
|
||
|
self.update_status('disconnected', 'Printer was disconnected')
|
||
|
super(PrinterDriver, self).disconnect()
|
||
|
|
||
|
def update_status(self, status, message, reason=None):
|
||
|
"""Updates the state of the current printer.
|
||
|
|
||
|
Args:
|
||
|
status (str): The new value of the status
|
||
|
message (str): A comprehensive message describing the status
|
||
|
reason (str): The reason fo the current status
|
||
|
"""
|
||
|
if self.state['status'] != status or self.state['reason'] != reason:
|
||
|
self.state = {
|
||
|
'status': status,
|
||
|
'message': message,
|
||
|
'reason': reason,
|
||
|
}
|
||
|
self.send_status()
|
||
|
|
||
|
def send_status(self):
|
||
|
""" Sends the current status of the printer to the connected Odoo instance.
|
||
|
"""
|
||
|
self.data = {
|
||
|
'value': '',
|
||
|
'state': self.state,
|
||
|
}
|
||
|
event_manager.device_changed(self)
|
||
|
|
||
|
def print_raw(self, data):
|
||
|
process = subprocess.Popen(["lp", "-d", self.device_identifier], stdin=subprocess.PIPE)
|
||
|
process.communicate(data)
|
||
|
if process.returncode != 0:
|
||
|
# The stderr isn't meaningful so we don't log it ('No such file or directory')
|
||
|
_logger.error('Printing failed: printer with the identifier "%s" could not be found',
|
||
|
self.device_identifier)
|
||
|
|
||
|
def print_receipt(self, data):
|
||
|
receipt = b64decode(data['receipt'])
|
||
|
im = Image.open(io.BytesIO(receipt))
|
||
|
|
||
|
# Convert to greyscale then to black and white
|
||
|
im = im.convert("L")
|
||
|
im = ImageOps.invert(im)
|
||
|
im = im.convert("1")
|
||
|
|
||
|
print_command = getattr(self, 'format_%s' % self.receipt_protocol)(im)
|
||
|
self.print_raw(print_command)
|
||
|
|
||
|
def format_star(self, im):
|
||
|
width = int((im.width + 7) / 8)
|
||
|
|
||
|
raster_init = b'\x1b\x2a\x72\x41'
|
||
|
raster_page_length = b'\x1b\x2a\x72\x50\x30\x00'
|
||
|
raster_send = b'\x62'
|
||
|
raster_close = b'\x1b\x2a\x72\x42'
|
||
|
|
||
|
raster_data = b''
|
||
|
dots = im.tobytes()
|
||
|
while len(dots):
|
||
|
raster_data += raster_send + width.to_bytes(2, 'little') + dots[:width]
|
||
|
dots = dots[width:]
|
||
|
|
||
|
return raster_init + raster_page_length + raster_data + raster_close
|
||
|
|
||
|
def format_escpos_bit_image_raster(self, im):
|
||
|
""" prints with the `GS v 0`-command """
|
||
|
width = int((im.width + 7) / 8)
|
||
|
|
||
|
raster_send = b'\x1d\x76\x30\x00'
|
||
|
max_slice_height = 255
|
||
|
|
||
|
raster_data = b''
|
||
|
dots = im.tobytes()
|
||
|
while len(dots):
|
||
|
im_slice = dots[:width*max_slice_height]
|
||
|
slice_height = int(len(im_slice) / width)
|
||
|
raster_data += raster_send + width.to_bytes(2, 'little') + slice_height.to_bytes(2, 'little') + im_slice
|
||
|
dots = dots[width*max_slice_height:]
|
||
|
|
||
|
return raster_data
|
||
|
|
||
|
def extract_columns_from_picture(self, im, line_height):
|
||
|
# Code inspired from python esc pos library:
|
||
|
# https://github.com/python-escpos/python-escpos/blob/4a0f5855ef118a2009b843a3a106874701d8eddf/src/escpos/image.py#L73-L89
|
||
|
width_pixels, height_pixels = im.size
|
||
|
for left in range(0, width_pixels, line_height):
|
||
|
box = (left, 0, left + line_height, height_pixels)
|
||
|
im_chunk = im.transform(
|
||
|
(line_height, height_pixels),
|
||
|
Image.EXTENT,
|
||
|
box
|
||
|
)
|
||
|
yield im_chunk.tobytes()
|
||
|
|
||
|
def format_escpos_bit_image_column(self, im, high_density_vertical=True,
|
||
|
high_density_horizontal=True,
|
||
|
size_scale=100):
|
||
|
""" prints with the `ESC *`-command
|
||
|
reference: https://reference.epson-biz.com/modules/ref_escpos/index.php?content_id=88
|
||
|
|
||
|
:param im: PIL image to print
|
||
|
:param high_density_vertical: print in high density in vertical direction
|
||
|
:param high_density_horizontal: print in high density in horizontal direction
|
||
|
:param size_scale: picture scale in percentage,
|
||
|
e.g: 50 -> half the size (horizontally and vertically)
|
||
|
"""
|
||
|
size_scale_ratio = size_scale / 100
|
||
|
size_scale_width = int(im.width * size_scale_ratio)
|
||
|
size_scale_height = int(im.height * size_scale_ratio)
|
||
|
im = im.resize((size_scale_width, size_scale_height))
|
||
|
# escpos ESC * command print column per column
|
||
|
# (instead of usual row by row).
|
||
|
# So we transpose the picture to ease the calculations
|
||
|
im = im.transpose(Image.ROTATE_270).transpose(Image.FLIP_LEFT_RIGHT)
|
||
|
|
||
|
# Most of the code here is inspired from python escpos library
|
||
|
# https://github.com/python-escpos/python-escpos/blob/4a0f5855ef118a2009b843a3a106874701d8eddf/src/escpos/escpos.py#L237C9-L251
|
||
|
ESC = b'\x1b'
|
||
|
density_byte = (1 if high_density_horizontal else 0) + \
|
||
|
(32 if high_density_vertical else 0)
|
||
|
nL = im.height & 0xFF
|
||
|
nH = (im.height >> 8) & 0xFF
|
||
|
HEADER = ESC + b'*' + bytes([density_byte, nL, nH])
|
||
|
|
||
|
raster_data = ESC + b'3\x10' # Adjust line-feed size
|
||
|
line_height = 24 if high_density_vertical else 8
|
||
|
for column in self.extract_columns_from_picture(im, line_height):
|
||
|
raster_data += HEADER + column + b'\n'
|
||
|
raster_data += ESC + b'2' # Reset line-feed size
|
||
|
return raster_data
|
||
|
|
||
|
def format_escpos(self, im):
|
||
|
# Epson support different command to print pictures.
|
||
|
# We use by default "GS v 0", but it is incompatible with certain
|
||
|
# printer models (like TM-U2x0)
|
||
|
# As we are pretty limited in the information that we have, we will
|
||
|
# use the printer name to parse some configuration value
|
||
|
# Printer name examples:
|
||
|
# EpsonTMM30
|
||
|
# -> Print using raster mode
|
||
|
# TM-U220__IMC_LDV_LDH_SCALE70__
|
||
|
# -> Print using column bit image mode (without vertical and
|
||
|
# horizontal density and a scale of 70%)
|
||
|
|
||
|
# Default image printing mode
|
||
|
image_mode = 'raster'
|
||
|
|
||
|
options_str = self.device_name.split('__')
|
||
|
option_str = ""
|
||
|
if len(options_str) > 2:
|
||
|
option_str = options_str[1].upper()
|
||
|
if option_str.startswith('IMC'):
|
||
|
image_mode = 'column'
|
||
|
|
||
|
if image_mode == 'column':
|
||
|
# Default printing mode parameters
|
||
|
high_density_vertical = True
|
||
|
high_density_horizontal = True
|
||
|
scale = 100
|
||
|
|
||
|
# Parse the printer name to get the needed parameters
|
||
|
# The separator need to not be filtered by `get_identifier`
|
||
|
options = option_str.split('_')
|
||
|
for option in options:
|
||
|
if option == 'LDV':
|
||
|
high_density_vertical = False
|
||
|
elif option == 'LDH':
|
||
|
high_density_horizontal = False
|
||
|
elif option.startswith('SCALE'):
|
||
|
scale_value_str = re.search(r'\d+$', option)
|
||
|
if scale_value_str is not None:
|
||
|
scale = int(scale_value_str.group())
|
||
|
else:
|
||
|
raise ValueError(
|
||
|
"Missing printer SCALE parameter integer "
|
||
|
"value in option: " + option)
|
||
|
|
||
|
res = self.format_escpos_bit_image_column(im,
|
||
|
high_density_vertical,
|
||
|
high_density_horizontal,
|
||
|
scale)
|
||
|
else:
|
||
|
res = self.format_escpos_bit_image_raster(im)
|
||
|
return res + RECEIPT_PRINTER_COMMANDS['escpos']['cut']
|
||
|
|
||
|
def print_status(self):
|
||
|
"""Prints the status ticket of the IoTBox on the current printer."""
|
||
|
wlan = ''
|
||
|
ip = ''
|
||
|
mac = ''
|
||
|
homepage = ''
|
||
|
pairing_code = ''
|
||
|
|
||
|
ssid = helpers.get_ssid()
|
||
|
wlan = '\nWireless network:\n%s\n\n' % ssid
|
||
|
|
||
|
interfaces = ni.interfaces()
|
||
|
ips = []
|
||
|
for iface_id in interfaces:
|
||
|
iface_obj = ni.ifaddresses(iface_id)
|
||
|
ifconfigs = iface_obj.get(ni.AF_INET, [])
|
||
|
for conf in ifconfigs:
|
||
|
if conf.get('addr') and conf.get('addr'):
|
||
|
ips.append(conf.get('addr'))
|
||
|
if len(ips) == 0:
|
||
|
ip = '\nERROR: Could not connect to LAN\n\nPlease check that the IoTBox is correc-\ntly connected with a network cable,\n that the LAN is setup with DHCP, and\nthat network addresses are available'
|
||
|
elif len(ips) == 1:
|
||
|
ip = '\nIP Address:\n%s\n' % ips[0]
|
||
|
else:
|
||
|
ip = '\nIP Addresses:\n%s\n' % '\n'.join(ips)
|
||
|
|
||
|
if len(ips) >= 1:
|
||
|
ips_filtered = [i for i in ips if i != '127.0.0.1']
|
||
|
main_ips = ips_filtered and ips_filtered[0] or '127.0.0.1'
|
||
|
mac = '\nMAC Address:\n%s\n' % helpers.get_mac_address()
|
||
|
homepage = '\nHomepage:\nhttp://%s:8069\n\n' % main_ips
|
||
|
|
||
|
code = connection_manager.pairing_code
|
||
|
if code:
|
||
|
pairing_code = '\nPairing Code:\n%s\n' % code
|
||
|
|
||
|
commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
|
||
|
title = commands['title'] % b'IoTBox Status'
|
||
|
self.print_raw(commands['center'] + title + b'\n' + wlan.encode() + mac.encode() + ip.encode() + homepage.encode() + pairing_code.encode() + commands['cut'])
|
||
|
|
||
|
def open_cashbox(self, data):
|
||
|
"""Sends a signal to the current printer to open the connected cashbox."""
|
||
|
commands = RECEIPT_PRINTER_COMMANDS[self.receipt_protocol]
|
||
|
for drawer in commands['drawers']:
|
||
|
self.print_raw(drawer)
|
||
|
|
||
|
def _action_default(self, data):
|
||
|
self.print_raw(b64decode(data['document']))
|
||
|
|
||
|
|
||
|
class PrinterController(http.Controller):
|
||
|
|
||
|
@http.route('/hw_proxy/default_printer_action', type='json', auth='none', cors='*')
|
||
|
def default_printer_action(self, data):
|
||
|
printer = next((d for d in iot_devices if iot_devices[d].device_type == 'printer' and iot_devices[d].device_connection == 'direct'), None)
|
||
|
if printer:
|
||
|
iot_devices[printer].action(data)
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
proxy_drivers['printer'] = PrinterDriver
|