mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-10-31 21:14:37 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			431 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			431 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| #
 | |
| # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #     http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| #
 | |
| 
 | |
| # Creating GATT Application which then becomes available to remote devices.
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| import sys
 | |
| 
 | |
| try:
 | |
|     import dbus
 | |
|     import dbus.service
 | |
| except ImportError as e:
 | |
|     if 'linux' not in sys.platform:
 | |
|         raise e
 | |
|     print(e)
 | |
|     print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
 | |
|     print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
 | |
|     raise
 | |
| 
 | |
# Standard freedesktop D-Bus interfaces
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

# BlueZ GATT interfaces
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'


# UUID of the service exported by this module
SERVICE_UUIDS = dict(
    ALERT_NOTIF_SVC_UUID='00001811-0000-1000-8000-00805f9b34fb',
)

# UUIDs of the characteristics attached to the service above
CHAR_UUIDS = dict(
    SUPPORT_NEW_ALERT_UUID='00002A47-0000-1000-8000-00805f9b34fb',
    ALERT_NOTIF_UUID='00002A44-0000-1000-8000-00805f9b34fb',
    UNREAD_ALERT_STATUS_UUID='00002A45-0000-1000-8000-00805f9b34fb',
)

# UUIDs of the descriptors attached to the characteristics above
DESCR_UUIDS = dict(
    CCCD_UUID='00002902-0000-1000-8000-00805f9b34fb',
)
 | |
| 
 | |
| 
 | |
class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the standard InvalidArgs error name."""

    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
 | |
| 
 | |
| 
 | |
class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the BlueZ NotSupported error name."""

    _dbus_error_name = 'org.bluez.Error.NotSupported'
 | |
| 
 | |
| 
 | |
class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation

    Holds the list of registered services and reports them, together with
    their characteristics and descriptors, via GetManagedObjects.
    """

    def __init__(self, bus, path):
        """Export this object on `bus` at the object path `path`."""
        self.path = path
        self.services = []
        super(Application, self).__init__(bus, self.path)

    def __del__(self):
        """Nothing to clean up."""
        pass

    def add_service(self, service):
        """Append `service` to the list reported by GetManagedObjects."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a dict mapping every object path to its properties."""
        managed = {}

        for svc in self.services:
            managed[svc.get_path()] = svc.get_properties()
            for characteristic in svc.get_characteristics():
                managed[characteristic.get_path()] = characteristic.get_properties()
                # Descriptors hang off the characteristic that owns them.
                managed.update(
                    (descriptor.get_path(), descriptor.get_properties())
                    for descriptor in characteristic.get_descriptors())

        return managed

    def get_path(self):
        """Return the application's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def Release(self):
        """No-op."""
        pass
 | |
| 
 | |
| 
 | |
class AlertNotificationApp(Application):
    """
    Alert Notification Application

    An Application that registers one AlertNotificationService.
    """
    def __init__(self, bus, path):
        super(AlertNotificationApp, self).__init__(bus, path)
        alert_service = AlertNotificationService(bus, '0001')
        # Keep a direct handle on the service in addition to the services list.
        self.service = alert_service
        self.add_service(alert_service)
 | |
| 
 | |
| 
 | |
class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation

    The object path is PATH_BASE followed by the string form of `index`.
    """
    PATH_BASE = '/org/bluez/hci0/service'

    def __init__(self, bus, index, uuid, primary=False):
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        self.path = self.PATH_BASE + str(index)
        super(Service, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the service properties keyed by the GATT service interface."""
        chrc_paths = dbus.Array(self.get_characteristic_paths(), signature='o')
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': chrc_paths,
        }
        return {GATT_SERVICE_IFACE: service_props}

    def get_path(self):
        """Return the service's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Attach `characteristic` to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all attached characteristics."""
        return [item.get_path() for item in self.characteristics]

    def get_characteristics(self):
        """Return the list of attached characteristics."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all service properties; reject any other interface name."""
        if interface == GATT_SERVICE_IFACE:
            return self.get_properties()[GATT_SERVICE_IFACE]
        raise InvalidArgsException()
 | |
| 
 | |
| 
 | |
class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation

    The default Read/Write/Notify handlers reject the request with
    NotSupportedException; subclasses override the ones they support.
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.service = service
        self.value = [0]
        self.descriptors = []
        # The characteristic lives underneath the owning service's path.
        self.path = service.path + '/char' + str(index)
        super(Characteristic, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the properties keyed by the GATT characteristic interface."""
        desc_paths = dbus.Array(self.get_descriptor_paths(), signature='o')
        chrc_props = {
            'Service': self.service.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
            'Value': self.value,
            'Descriptors': desc_paths,
        }
        return {GATT_CHRC_IFACE: chrc_props}

    def get_path(self):
        """Return the characteristic's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Attach `descriptor` to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all attached descriptors."""
        return [item.get_path() for item in self.descriptors]

    def get_descriptors(self):
        """Return the list of attached descriptors."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all characteristic properties; reject other interfaces."""
        if interface == GATT_CHRC_IFACE:
            return self.get_properties()[GATT_CHRC_IFACE]
        raise InvalidArgsException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('\nDefault ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('\nDefault WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal stub; the body intentionally does nothing."""
        pass
 | |
| 
 | |
| 
 | |
class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation

    The default Read/Write handlers reject the request with
    NotSupportedException; subclasses override the ones they support.
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        # The descriptor lives underneath the owning characteristic's path.
        self.path = characteristic.path + '/desc' + str(index)
        super(Descriptor, self).__init__(bus, self.path)

    def get_properties(self):
        """Return the properties keyed by the GATT descriptor interface."""
        desc_props = {
            'Characteristic': self.chrc.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
        }
        return {GATT_DESC_IFACE: desc_props}

    def get_path(self):
        """Return the descriptor's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all descriptor properties; reject other interfaces."""
        if interface == GATT_DESC_IFACE:
            return self.get_properties()[GATT_DESC_IFACE]
        raise InvalidArgsException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal stub; the body intentionally does nothing."""
        pass
 | |
| 
 | |
| 
 | |
class AlertNotificationService(Service):
    """Primary service carrying the three alert notification characteristics."""

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, SERVICE_UUIDS['ALERT_NOTIF_SVC_UUID'], primary=True)
        # Characteristics are created in index order, one after the other.
        new_alert_chrc = SupportedNewAlertCategoryCharacteristic(bus, '0001', self)
        self.add_characteristic(new_alert_chrc)
        control_point_chrc = AlertNotificationControlPointCharacteristic(bus, '0002', self)
        self.add_characteristic(control_point_chrc)
        unread_status_chrc = UnreadAlertStatusCharacteristic(bus, '0003', self)
        self.add_characteristic(unread_status_chrc)

    def get_char_status(self, uuid, status):
        """
        Return True when a characteristic with the given `uuid` has recorded
        `status` in its status list, False otherwise.
        """
        return any(
            chrc.uuid == uuid and status in chrc.status
            for chrc in self.characteristics)
 | |
| 
 | |
| 
 | |
class SupportedNewAlertCategoryCharacteristic(Characteristic):
    """Read-only characteristic; each read is recorded in `status`."""

    def __init__(self, bus, index, service):
        chrc_uuid = CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID']
        Characteristic.__init__(self, bus, index, chrc_uuid, ['read'], service)
        self.value = [dbus.Byte(2)]
        self.status = []

    def ReadValue(self, options):
        """Log the request and return the current value as dbus bytes."""
        payload = [dbus.Byte(item) for item in self.value]
        print('Read Request received\n', '\tSupportedNewAlertCategoryCharacteristic')
        print('\tValue:', '\t', payload)
        self.status.append('read')
        return payload
 | |
| 
 | |
| 
 | |
class AlertNotificationControlPointCharacteristic(Characteristic):
    """Read/write characteristic; each access is recorded in `status`."""

    def __init__(self, bus, index, service):
        chrc_uuid = CHAR_UUIDS['ALERT_NOTIF_UUID']
        Characteristic.__init__(self, bus, index, chrc_uuid, ['read', 'write'], service)
        self.value = [dbus.Byte(0)]
        self.status = []

    def ReadValue(self, options):
        """Log the request and return the current value as dbus bytes."""
        payload = [dbus.Byte(item) for item in self.value]
        print('Read Request received\n', '\tAlertNotificationControlPointCharacteristic')
        print('\tValue:', '\t', payload)
        self.status.append('read')
        return payload

    def WriteValue(self, value, options):
        """Store `value`, emit PropertiesChanged and log the outcome."""
        print('Write Request received\n', '\tAlertNotificationControlPointCharacteristic')
        print('\tCurrent value:', '\t', self.value)
        self.value = list(value)
        self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
        print('\tNew value:', '\t', self.value)
        # Check if new value is written
        written = self.value == value
        if not written:
            print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
        self.status.append('write')
 | |
| 
 | |
| 
 | |
class UnreadAlertStatusCharacteristic(Characteristic):
    """
    Read/write/notify characteristic with an attached CCCD descriptor;
    each access is recorded in `status`.
    """

    def __init__(self, bus, index, service):
        chrc_uuid = CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']
        Characteristic.__init__(self, bus, index, chrc_uuid, ['read', 'write', 'notify'], service)
        self.value = [dbus.Byte(0)]
        self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
        self.add_descriptor(self.cccd_obj)
        self.notifying = False
        self.status = []

    def StartNotify(self):
        """
        Turn notifications on: read the current value, then write [1, 0].

        Any exception raised along the way is printed rather than propagated.
        """
        try:
            if not self.notifying:
                print('Notify Started')
                self.notifying = True
                self.ReadValue()
                self.WriteValue([dbus.Byte(1), dbus.Byte(0)])
                self.status.append('notify')
            else:
                print('\nAlready notifying, nothing to do')
        except Exception as err:
            print(err)

    def StopNotify(self):
        """Turn notifications off if they are currently on."""
        if self.notifying:
            self.notifying = False
            print('\nNotify Stopped')
        else:
            print('\nNot notifying, nothing to do')

    def ReadValue(self, options=None):
        """Record the read, log the value and return it as dbus bytes."""
        payload = [dbus.Byte(item) for item in self.value]
        self.status.append('read')
        print('\tValue:', '\t', payload)
        return payload

    def WriteValue(self, value, options=None):
        """Store `value`, emit PropertiesChanged and log the outcome."""
        self.value = list(value)
        self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
        # Check if new value is written
        written = self.value == value
        if not written:
            print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
        print('New value:', '\t', self.value)
        self.status.append('write')
 | |
| 
 | |
| 
 | |
class ClientCharacteristicConfigurationDescriptor(Descriptor):
    """Read/write descriptor using the CCCD UUID; initial value is [1]."""

    def __init__(self, bus, index, characteristic):
        # The value is assigned before the base initialiser runs.
        self.value = [dbus.Byte(1)]
        descr_uuid = DESCR_UUIDS['CCCD_UUID']
        Descriptor.__init__(self, bus, index, descr_uuid, ['read', 'write'], characteristic)

    def ReadValue(self, options=None):
        """Return the stored value as-is."""
        return self.value

    def WriteValue(self, value, options=None):
        """Store `value` and emit PropertiesChanged for the descriptor."""
        self.value = list(value)
        self.PropertiesChanged(GATT_DESC_IFACE, {'Value': self.value}, [])
        # Check if new value is written
        written = self.value == value
        if not written:
            print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
 | 
