Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
TurnTouch Python Library
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Wiki
Code
Merge requests
0
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Anton Sarukhanov
TurnTouch Python Library
Commits
78213d89
Commit
78213d89
authored
6 years ago
by
Anton Sarukhanov
Browse files
Options
Downloads
Patches
Plain Diff
Allow reading name/battery while listening for button presses.
parent
7b0d84da
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
setup.py
+1
-1
1 addition, 1 deletion
setup.py
turntouch/turntouch.py
+91
-43
91 additions, 43 deletions
turntouch/turntouch.py
with
92 additions
and
44 deletions
setup.py
+
1
−
1
View file @
78213d89
...
@@ -9,7 +9,7 @@ def read(filename):
...
@@ -9,7 +9,7 @@ def read(filename):
setup
(
setup
(
name
=
'
TurnTouch
'
,
name
=
'
TurnTouch
'
,
version
=
'
0.4.
1
'
,
version
=
'
0.4.
2
'
,
url
=
'
https://github.com/antsar/python-turntouch
'
,
url
=
'
https://github.com/antsar/python-turntouch
'
,
author
=
'
Anton Sarukhanov
'
,
author
=
'
Anton Sarukhanov
'
,
author_email
=
'
code@ant.sr
'
,
author_email
=
'
code@ant.sr
'
,
...
...
This diff is collapsed.
Click to expand it.
turntouch/turntouch.py
+
91
−
43
View file @
78213d89
...
@@ -3,8 +3,9 @@
...
@@ -3,8 +3,9 @@
from
concurrent.futures
import
ThreadPoolExecutor
from
concurrent.futures
import
ThreadPoolExecutor
import
time
import
time
import
logging
import
logging
from
bluepy
import
btle
from
functools
import
partial
from
typing
import
List
,
Union
from
typing
import
List
,
Union
from
bluepy
import
btle
logger
=
logging
.
getLogger
(
'
TurnTouch
'
)
logger
=
logging
.
getLogger
(
'
TurnTouch
'
)
...
@@ -158,6 +159,7 @@ class TurnTouch(btle.Peripheral):
...
@@ -158,6 +159,7 @@ class TurnTouch(btle.Peripheral):
DEVICE_NAME_LENGTH
=
32
DEVICE_NAME_LENGTH
=
32
MAX_DELAY
=
0.75
MAX_DELAY
=
0.75
LISTEN_TIMEOUT
=
0.1
LISTEN_TIMEOUT
=
0.1
LISTEN_PERIOD
=
1
BUTTON_NORTH
=
Button
(
'
North
'
,
'
north
'
)
BUTTON_NORTH
=
Button
(
'
North
'
,
'
north
'
)
BUTTON_EAST
=
Button
(
'
East
'
,
'
east
'
)
BUTTON_EAST
=
Button
(
'
East
'
,
'
east
'
)
BUTTON_WEST
=
Button
(
'
West
'
,
'
west
'
)
BUTTON_WEST
=
Button
(
'
West
'
,
'
west
'
)
...
@@ -258,6 +260,7 @@ class TurnTouch(btle.Peripheral):
...
@@ -258,6 +260,7 @@ class TurnTouch(btle.Peripheral):
self
.
withDelegate
(
self
.
NotificationDelegate
(
turn_touch
=
self
))
self
.
withDelegate
(
self
.
NotificationDelegate
(
turn_touch
=
self
))
self
.
handler
=
handler
or
DefaultActionHandler
self
.
handler
=
handler
or
DefaultActionHandler
self
.
debounce
=
debounce
self
.
debounce
=
debounce
self
.
_listening
=
False
self
.
_combo_action
=
set
()
self
.
_combo_action
=
set
()
if
listen
:
if
listen
:
self
.
listen_forever
()
self
.
listen_forever
()
...
@@ -265,14 +268,7 @@ class TurnTouch(btle.Peripheral):
...
@@ -265,14 +268,7 @@ class TurnTouch(btle.Peripheral):
@property
@property
def
name
(
self
)
->
str
:
def
name
(
self
)
->
str
:
"""
Read the nickname of this remote.
"""
"""
Read the nickname of this remote.
"""
try
:
name_bytes
=
self
.
_read
(
self
.
DEVICE_NAME_CHARACTERISTIC_UUID
)
name_bytes
=
self
.
getCharacteristics
(
uuid
=
self
.
DEVICE_NAME_CHARACTERISTIC_UUID
)[
0
].
read
()
except
btle
.
BTLEException
:
raise
TurnTouchException
(
"
Failed to read name of device {addr}
"
.
format
(
addr
=
self
.
addr
))
logger
.
debug
(
"
Read name of device {address}:
'
{name}
'"
.
format
(
address
=
self
.
addr
,
name
=
name_bytes
))
return
name_bytes
.
decode
(
'
utf-8
'
).
rstrip
(
'
\0
'
)
return
name_bytes
.
decode
(
'
utf-8
'
).
rstrip
(
'
\0
'
)
@name.setter
@name.setter
...
@@ -281,29 +277,58 @@ class TurnTouch(btle.Peripheral):
...
@@ -281,29 +277,58 @@ class TurnTouch(btle.Peripheral):
if
len
(
name
)
>
self
.
DEVICE_NAME_LENGTH
:
if
len
(
name
)
>
self
.
DEVICE_NAME_LENGTH
:
raise
(
TurnTouchException
(
"
Name must be {limit} characters or less.
"
raise
(
TurnTouchException
(
"
Name must be {limit} characters or less.
"
.
format
(
limit
=
self
.
DEVICE_NAME_LENGTH
)))
.
format
(
limit
=
self
.
DEVICE_NAME_LENGTH
)))
name_characteristic
=
self
.
getCharacteristics
(
uuid
=
self
.
DEVICE_NAME_CHARACTERISTIC_UUID
)[
0
]
name_bytes
=
name
.
encode
(
'
utf-8
'
).
ljust
(
self
.
DEVICE_NAME_LENGTH
,
b
'
\0
'
)
name_bytes
=
name
.
encode
(
'
utf-8
'
).
ljust
(
self
.
DEVICE_NAME_LENGTH
,
b
'
\0
'
)
try
:
self
.
_write
(
self
.
DEVICE_NAME_CHARACTERISTIC_UUID
,
name_bytes
)
name_characteristic
.
write
(
name_bytes
,
withResponse
=
True
)
except
btle
.
BTLEException
:
raise
TurnTouchException
(
"
Failed to set name of device {addr}
"
.
format
(
addr
=
self
.
addr
))
logger
.
debug
(
"
Set name for device {address} to
'
{name}
'"
.
format
(
address
=
self
.
addr
,
name
=
name_bytes
))
@property
@property
def
battery
(
self
)
->
int
:
def
battery
(
self
)
->
int
:
"""
Read the battery level (percentage) of this remote.
"""
"""
Read the battery level (percentage) of this remote.
"""
battery_bytes
=
self
.
_read
(
self
.
BATTERY_LEVEL_CHARACTERISTIC_UUID
)
return
int
.
from_bytes
(
battery_bytes
,
byteorder
=
'
big
'
)
def
_read
(
self
,
uuid
)
->
bytes
:
"""
Read some characteristic from the device.
If the device is currently listening, we have to queue the read and
wait for self.listen() to pause listening and invoke the read.
Attempting to read while listening would cause a bluepy exception.
"""
if
self
.
_listening
:
while
self
.
_pending_read
:
# wait for any other pending reads to occur
pass
self
.
_pending_read
=
partial
(
self
.
_read_now
,
uuid
=
uuid
)
while
not
self
.
_read_value
:
# wait for the read to occur
pass
read_value
=
self
.
_read_value
self
.
_read_value
=
None
return
read_value
else
:
return
self
.
_read_now
(
uuid
)
def
_read_now
(
self
,
uuid
)
->
bytes
:
"""
Read some characteristic from the device.
"""
try
:
try
:
battery_bytes
=
self
.
getCharacteristics
(
read_bytes
=
self
.
getCharacteristics
(
uuid
=
uuid
)[
0
].
read
()
uuid
=
self
.
BATTERY_LEVEL_CHARACTERISTIC_UUID
)[
0
].
read
()
except
btle
.
BTLEException
:
except
btle
.
BTLEException
:
raise
TurnTouchException
(
"
Failed to read battery of device {addr}
"
raise
TurnTouchException
(
"
Failed to read device {address}
"
.
format
(
addr
=
self
.
addr
))
"
characteristic {uuid}
"
logger
.
debug
(
"
Read device {address} battery level:
'
{battery}
'"
.
format
(
.
format
(
address
=
self
.
addr
,
uuid
=
uuid
))
address
=
self
.
addr
,
battery
=
battery_bytes
))
logger
.
debug
(
"
Read device {address} characteristic {uuid}:
'
{value}
'"
return
int
.
from_bytes
(
battery_bytes
,
byteorder
=
'
big
'
)
.
format
(
address
=
self
.
addr
,
uuid
=
uuid
,
value
=
read_bytes
))
return
read_bytes
def
_write
(
self
,
uuid
,
value_bytes
):
"""
Write some characteristic to the device.
"""
characteristic
=
self
.
getCharacteristics
(
uuid
=
uuid
)[
0
]
try
:
characteristic
.
write
(
value_bytes
,
withResponse
=
True
)
except
btle
.
BTLEException
:
raise
TurnTouchException
(
"
Failed to write device {address}
"
"
characteristic {uuid}
"
.
format
(
address
=
self
.
addr
,
uuid
=
uuid
))
logger
.
debug
(
"
Wrote device {address} characteristic {uuid}:
'
{value}
'"
.
format
(
address
=
self
.
addr
,
uuid
=
uuid
,
value
=
value_bytes
))
def
listen_forever
(
self
):
def
listen_forever
(
self
):
"""
Listen for button press events indefinitely.
"""
"""
Listen for button press events indefinitely.
"""
...
@@ -313,6 +338,9 @@ class TurnTouch(btle.Peripheral):
...
@@ -313,6 +338,9 @@ class TurnTouch(btle.Peripheral):
"""
Listen for a button press event.
"""
Listen for a button press event.
Will listen indefinitely if `only_one` is False.
"""
Will listen indefinitely if `only_one` is False.
"""
self
.
_enable_notifications
()
self
.
_enable_notifications
()
self
.
_pending_read
=
None
self
.
_read_value
=
None
self
.
_listening
=
True
if
self
.
debounce
:
if
self
.
debounce
:
self
.
executor
=
ThreadPoolExecutor
(
5
)
self
.
executor
=
ThreadPoolExecutor
(
5
)
try
:
try
:
...
@@ -320,29 +348,51 @@ class TurnTouch(btle.Peripheral):
...
@@ -320,29 +348,51 @@ class TurnTouch(btle.Peripheral):
self
.
waitForNotifications
(
0
)
self
.
waitForNotifications
(
0
)
else
:
else
:
while
True
:
while
True
:
self
.
waitForNotifications
(
0
)
self
.
waitForNotifications
(
self
.
LISTEN_PERIOD
)
if
self
.
_pending_read
:
while
self
.
_read_value
:
# wait for previously read value to be consumed
pass
self
.
_read_value
=
self
.
_pending_read
()
self
.
_pending_read
=
None
except
btle
.
BTLEException
as
e
:
except
btle
.
BTLEException
as
e
:
raise
TurnTouchException
(
e
)
raise
TurnTouchException
(
e
)
self
.
_enable_notifications
(
enable
=
False
)
finally
:
self
.
_listening
=
False
def
_enable_notifications
(
self
,
enabled
=
True
):
self
.
_enable_notifications
(
enabled
=
False
)
"""
Tell the remote to start sending button press notifications.
"""
def
_enable_notifications
(
self
,
enabled
=
True
,
button
=
True
,
battery
=
False
):
"""
Tell the remote to start sending notifications for button presses
and battery level updates.
"""
if
button
:
self
.
_enable_notification
(
self
.
BUTTON_STATUS_CHARACTERISTIC_UUID
,
enabled
)
if
battery
:
self
.
_enable_notification
(
self
.
BATTERY_LEVEL_CHARACTERISTIC_UUID
,
enabled
)
def
_enable_notification
(
self
,
uuid
,
enabled
=
True
):
"""
Tell the remote to start sending notifications for a particular
characteristic (uuid).
"""
try
:
try
:
notification_handle
=
self
.
getCharacteristics
(
notification_handle
=
self
.
getCharacteristics
(
uuid
=
self
.
BUTTON_STATUS_CHARACTERISTIC_UUID
)[
0
].
getHandle
()
uuid
=
uuid
)[
0
].
getHandle
()
notification_enable_handle
=
notification_handle
+
1
notification_enable_handle
=
notification_handle
+
1
logger
.
debug
(
"
{action} notifications for device {address}...
"
logger
.
debug
(
"
{action} notifications for device {address},
"
"
characteristic {uuid}...
"
.
format
(
action
=
"
Enabling
"
if
enabled
else
"
Disabling
"
,
.
format
(
action
=
"
Enabling
"
if
enabled
else
"
Disabling
"
,
address
=
self
.
addr
))
address
=
self
.
addr
,
uuid
=
uuid
))
self
.
writeCharacteristic
(
notification_enable_handle
,
self
.
writeCharacteristic
(
notification_enable_handle
,
bytes
([
0x01
if
enabled
else
0x00
,
0x00
]),
bytes
([
0x01
if
enabled
else
0x00
,
0x00
]),
withResponse
=
True
)
withResponse
=
True
)
logger
.
debug
(
"
Notifications {action} for device {address}.
"
logger
.
debug
(
"
Notifications {action} for device {address},
"
"
characteristic {uuid}
"
.
format
(
action
=
"
enabled
"
if
enabled
else
"
disabled
"
,
.
format
(
action
=
"
enabled
"
if
enabled
else
"
disabled
"
,
address
=
self
.
addr
))
address
=
self
.
addr
,
uuid
=
uuid
))
except
btle
.
BTLEException
:
except
btle
.
BTLEException
:
raise
TurnTouchException
(
"
Failed to enable notifications for
"
raise
TurnTouchException
(
"
Failed to enable notifications for
"
"
device {addr}
"
.
format
(
addr
=
self
.
addr
))
"
device {addr}, characteristic {uuid}
"
.
format
(
addr
=
self
.
addr
,
uuid
=
uuid
))
class
NotificationDelegate
(
btle
.
DefaultDelegate
):
class
NotificationDelegate
(
btle
.
DefaultDelegate
):
"""
Handle callbacks for notifications from the device.
"""
Handle callbacks for notifications from the device.
...
@@ -413,29 +463,27 @@ class TurnTouch(btle.Peripheral):
...
@@ -413,29 +463,27 @@ class TurnTouch(btle.Peripheral):
def
handleNotification
(
self
,
cHandle
,
data
):
def
handleNotification
(
self
,
cHandle
,
data
):
"""
Call the appropriate button press handler method(s).
"""
"""
Call the appropriate button press handler method(s).
"""
logger
.
debug
(
"
Got notification {notification}
"
.
format
(
notification
=
data
))
type_int
=
int
.
from_bytes
(
data
,
byteorder
=
'
big
'
)
type_int
=
int
.
from_bytes
(
data
,
byteorder
=
'
big
'
)
try
:
try
:
action
=
self
.
turn_touch
.
ACTIONS
[
type_int
]
action
=
self
.
turn_touch
.
ACTIONS
[
type_int
]
except
IndexError
:
except
IndexError
:
raise
TurnTouchException
(
'
Unknown
notific
ation received: {}
'
raise
TurnTouchException
(
'
Unknown a
c
tion received: {}
'
.
format
(
data
))
.
format
(
data
))
if
self
.
turn_touch
.
debounce
:
if
self
.
turn_touch
.
debounce
:
if
action
.
is_combo
:
if
action
.
is_combo
:
self
.
_handle_combo
(
action
)
self
.
_handle_combo
(
action
)
elif
action
.
is_multi
:
elif
action
.
is_multi
:
logger
.
debug
(
"
Debounce: delaying
action
{action}.
"
.
format
(
logger
.
debug
(
"
Debounce: delaying {action}.
"
.
format
(
action
=
action
))
action
=
action
))
self
.
turn_touch
.
executor
.
submit
(
self
.
turn_touch
.
executor
.
submit
(
self
.
_handle_multi
,
(
action
))
self
.
_handle_multi
,
(
action
))
elif
action
.
is_off
:
elif
action
.
is_off
:
logger
.
debug
(
"
Debounce: delaying
action
{action}.
"
.
format
(
logger
.
debug
(
"
Debounce: delaying {action}.
"
.
format
(
action
=
action
))
action
=
action
))
self
.
turn_touch
.
executor
.
submit
(
self
.
turn_touch
.
executor
.
submit
(
self
.
_handle_off
,
(
action
))
self
.
_handle_off
,
(
action
))
else
:
else
:
logger
.
debug
(
"
Debounce: delaying
action
{action}.
"
.
format
(
logger
.
debug
(
"
Debounce: delaying {action}.
"
.
format
(
action
=
action
))
action
=
action
))
self
.
turn_touch
.
executor
.
submit
(
self
.
turn_touch
.
executor
.
submit
(
self
.
_handle_single
,
(
action
))
self
.
_handle_single
,
(
action
))
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment