From 78213d893855f0908eb1733fd8a50c43fa82e50d Mon Sep 17 00:00:00 2001 From: Anton Sarukhanov <code@ant.sr> Date: Fri, 15 Jun 2018 16:17:42 -0400 Subject: [PATCH] Allow reading name/battery while listening for button presses. --- setup.py | 2 +- turntouch/turntouch.py | 134 ++++++++++++++++++++++++++++------------- 2 files changed, 92 insertions(+), 44 deletions(-) diff --git a/setup.py b/setup.py index 2d3bb61..712c1a6 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ def read(filename): setup( name='TurnTouch', - version='0.4.1', + version='0.4.2', url='https://github.com/antsar/python-turntouch', author='Anton Sarukhanov', author_email='code@ant.sr', diff --git a/turntouch/turntouch.py b/turntouch/turntouch.py index a7fcd63..85e4c9b 100644 --- a/turntouch/turntouch.py +++ b/turntouch/turntouch.py @@ -3,8 +3,9 @@ from concurrent.futures import ThreadPoolExecutor import time import logging -from bluepy import btle +from functools import partial from typing import List, Union +from bluepy import btle logger = logging.getLogger('TurnTouch') @@ -158,6 +159,7 @@ class TurnTouch(btle.Peripheral): DEVICE_NAME_LENGTH = 32 MAX_DELAY = 0.75 LISTEN_TIMEOUT = 0.1 + LISTEN_PERIOD = 1 BUTTON_NORTH = Button('North', 'north') BUTTON_EAST = Button('East', 'east') BUTTON_WEST = Button('West', 'west') @@ -258,6 +260,7 @@ class TurnTouch(btle.Peripheral): self.withDelegate(self.NotificationDelegate(turn_touch=self)) self.handler = handler or DefaultActionHandler self.debounce = debounce + self._listening = False self._combo_action = set() if listen: self.listen_forever() @@ -265,14 +268,7 @@ class TurnTouch(btle.Peripheral): @property def name(self) -> str: """Read the nickname of this remote.""" - try: - name_bytes = self.getCharacteristics( - uuid=self.DEVICE_NAME_CHARACTERISTIC_UUID)[0].read() - except btle.BTLEException: - raise TurnTouchException("Failed to read name of device {addr}" - .format(addr=self.addr)) - logger.debug("Read name of device {address}: '{name}'".format( - address=self.addr, name=name_bytes)) + name_bytes = self._read(self.DEVICE_NAME_CHARACTERISTIC_UUID) return name_bytes.decode('utf-8').rstrip('\0') @name.setter @@ -281,29 +277,58 @@ class TurnTouch(btle.Peripheral): if len(name) > self.DEVICE_NAME_LENGTH: raise(TurnTouchException("Name must be {limit} characters or less." .format(limit=self.DEVICE_NAME_LENGTH))) - name_characteristic = self.getCharacteristics( - uuid=self.DEVICE_NAME_CHARACTERISTIC_UUID)[0] name_bytes = name.encode('utf-8').ljust(self.DEVICE_NAME_LENGTH, b'\0') - try: - name_characteristic.write(name_bytes, withResponse=True) - except btle.BTLEException: - raise TurnTouchException("Failed to set name of device {addr}" - .format(addr=self.addr)) - logger.debug("Set name for device {address} to '{name}'".format( - address=self.addr, name=name_bytes)) + self._write(self.DEVICE_NAME_CHARACTERISTIC_UUID, name_bytes) @property def battery(self) -> int: """Read the battery level (percentage) of this remote.""" + battery_bytes = self._read(self.BATTERY_LEVEL_CHARACTERISTIC_UUID) + return int.from_bytes(battery_bytes, byteorder='big') + + def _read(self, uuid) -> bytes: + """Read some characteristic from the device. + If the device is currently listening, we have to queue the read and + wait for self.listen() to pause listening and invoke the read. + Attempting to read while listening would cause a bluepy exception.""" + if self._listening: + while self._pending_read: + # wait for any other pending reads to occur + pass + self._pending_read = partial(self._read_now, uuid=uuid) + while not self._read_value: + # wait for the read to occur + pass + read_value = self._read_value + self._read_value = None + return read_value + else: + return self._read_now(uuid) + + + def _read_now(self, uuid) -> bytes: + """Read some characteristic from the device.""" try: - battery_bytes = self.getCharacteristics( - uuid=self.BATTERY_LEVEL_CHARACTERISTIC_UUID)[0].read() + read_bytes = self.getCharacteristics(uuid=uuid)[0].read() except btle.BTLEException: - raise TurnTouchException("Failed to read battery of device {addr}" - .format(addr=self.addr)) - logger.debug("Read device {address} battery level: '{battery}'".format( - address=self.addr, battery=battery_bytes)) - return int.from_bytes(battery_bytes, byteorder='big') + raise TurnTouchException("Failed to read device {address} " + "characteristic {uuid}" + .format(address=self.addr, uuid=uuid)) + logger.debug("Read device {address} characteristic {uuid}: '{value}'" + .format(address=self.addr, uuid=uuid, value=read_bytes)) + return read_bytes + + def _write(self, uuid, value_bytes): + """Write some characteristic to the device.""" + characteristic = self.getCharacteristics(uuid=uuid)[0] + try: + characteristic.write(value_bytes, withResponse=True) + except btle.BTLEException: + raise TurnTouchException("Failed to write device {address} " + "characteristic {uuid}" + .format(address=self.addr, uuid=uuid)) + logger.debug("Wrote device {address} characteristic {uuid}: '{value}'" + .format(address=self.addr, uuid=uuid, value=value_bytes)) def listen_forever(self): """Listen for button press events indefinitely.""" @@ -313,6 +338,9 @@ class TurnTouch(btle.Peripheral): """Listen for a button press event. Will listen indefinitely if `only_one` is False.""" self._enable_notifications() + self._pending_read = None + self._read_value = None + self._listening = True if self.debounce: self.executor = ThreadPoolExecutor(5) try: @@ -320,29 +348,51 @@ class TurnTouch(btle.Peripheral): self.waitForNotifications(0) else: while True: - self.waitForNotifications(0) + self.waitForNotifications(self.LISTEN_PERIOD) + if self._pending_read: + while self._read_value: + # wait for previously read value to be consumed + pass + self._read_value = self._pending_read() + self._pending_read = None except btle.BTLEException as e: raise TurnTouchException(e) - self._enable_notifications(enable=False) - - def _enable_notifications(self, enabled=True): - """Tell the remote to start sending button press notifications.""" + finally: + self._listening = False + self._enable_notifications(enabled=False) + + def _enable_notifications(self, enabled=True, button=True, battery=False): + """Tell the remote to start sending notifications for button presses + and battery level updates.""" + if button: + self._enable_notification( + self.BUTTON_STATUS_CHARACTERISTIC_UUID, enabled) + if battery: + self._enable_notification( + self.BATTERY_LEVEL_CHARACTERISTIC_UUID, enabled) + + def _enable_notification(self, uuid, enabled=True): + """Tell the remote to start sending notifications for a particular + characteristic (uuid).""" try: notification_handle = self.getCharacteristics( - uuid=self.BUTTON_STATUS_CHARACTERISTIC_UUID)[0].getHandle() + uuid=uuid)[0].getHandle() notification_enable_handle = notification_handle + 1 - logger.debug("{action} notifications for device {address}..." + logger.debug("{action} notifications for device {address}, " + "characteristic {uuid}..." .format(action="Enabling" if enabled else "Disabling", - address=self.addr)) + address=self.addr, uuid=uuid)) self.writeCharacteristic(notification_enable_handle, bytes([0x01 if enabled else 0x00, 0x00]), withResponse=True) - logger.debug("Notifications {action} for device {address}." + logger.debug("Notifications {action} for device {address}, " + "characteristic {uuid}" .format(action="enabled" if enabled else "disabled", - address=self.addr)) + address=self.addr, uuid=uuid)) except btle.BTLEException: - raise TurnTouchException("Failed to enable notifications for" - "device {addr}".format(addr=self.addr)) + raise TurnTouchException("Failed to enable notifications for " + "device {addr}, characteristic {uuid}" + .format(addr=self.addr, uuid=uuid)) class NotificationDelegate(btle.DefaultDelegate): """Handle callbacks for notifications from the device. @@ -413,29 +463,27 @@ class TurnTouch(btle.Peripheral): def handleNotification(self, cHandle, data): """Call the appropriate button press handler method(s).""" - logger.debug("Got notification {notification}".format( - notification=data)) type_int = int.from_bytes(data, byteorder='big') try: action = self.turn_touch.ACTIONS[type_int] except IndexError: - raise TurnTouchException('Unknown notification received: {}' + raise TurnTouchException('Unknown action received: {}' .format(data)) if self.turn_touch.debounce: if action.is_combo: self._handle_combo(action) elif action.is_multi: - logger.debug("Debounce: delaying action {action}.".format( + logger.debug("Debounce: delaying {action}.".format( action=action)) self.turn_touch.executor.submit( self._handle_multi, (action)) elif action.is_off: - logger.debug("Debounce: delaying action {action}.".format( + logger.debug("Debounce: delaying {action}.".format( action=action)) self.turn_touch.executor.submit( self._handle_off, (action)) else: - logger.debug("Debounce: delaying action {action}.".format( + logger.debug("Debounce: delaying {action}.".format( action=action)) self.turn_touch.executor.submit( self._handle_single, (action)) -- GitLab