aboutsummaryrefslogtreecommitdiff
path: root/bumble/profiles/vcp.py
diff options
context:
space:
mode:
Diffstat (limited to 'bumble/profiles/vcp.py')
-rw-r--r--bumble/profiles/vcp.py228
1 files changed, 228 insertions, 0 deletions
diff --git a/bumble/profiles/vcp.py b/bumble/profiles/vcp.py
new file mode 100644
index 0000000..0788219
--- /dev/null
+++ b/bumble/profiles/vcp.py
@@ -0,0 +1,228 @@
+# Copyright 2021-2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+from __future__ import annotations
+import enum
+
+from bumble import att
+from bumble import device
+from bumble import gatt
+from bumble import gatt_client
+
+from typing import Optional
+
+# -----------------------------------------------------------------------------
+# Constants
+# -----------------------------------------------------------------------------
+
+MIN_VOLUME = 0
+MAX_VOLUME = 255
+
+
+class ErrorCode(enum.IntEnum):
+ '''
+ See Volume Control Service 1.6. Application error codes.
+ '''
+
+ INVALID_CHANGE_COUNTER = 0x80
+ OPCODE_NOT_SUPPORTED = 0x81
+
+
+class VolumeFlags(enum.IntFlag):
+ '''
+ See Volume Control Service 3.3. Volume Flags.
+ '''
+
+ VOLUME_SETTING_PERSISTED = 0x01
+ # RFU
+
+
+class VolumeControlPointOpcode(enum.IntEnum):
+ '''
+ See Volume Control Service Table 3.3: Volume Control Point procedure requirements.
+ '''
+
+ # fmt: off
+ RELATIVE_VOLUME_DOWN = 0x00
+ RELATIVE_VOLUME_UP = 0x01
+ UNMUTE_RELATIVE_VOLUME_DOWN = 0x02
+ UNMUTE_RELATIVE_VOLUME_UP = 0x03
+ SET_ABSOLUTE_VOLUME = 0x04
+ UNMUTE = 0x05
+ MUTE = 0x06
+
+
+# -----------------------------------------------------------------------------
+# Server
+# -----------------------------------------------------------------------------
+class VolumeControlService(gatt.TemplateService):
+ UUID = gatt.GATT_VOLUME_CONTROL_SERVICE
+
+ volume_state: gatt.Characteristic
+ volume_control_point: gatt.Characteristic
+ volume_flags: gatt.Characteristic
+
+ volume_setting: int
+ muted: int
+ change_counter: int
+
+ def __init__(
+ self,
+ step_size: int = 16,
+ volume_setting: int = 0,
+ muted: int = 0,
+ change_counter: int = 0,
+ volume_flags: int = 0,
+ ) -> None:
+ self.step_size = step_size
+ self.volume_setting = volume_setting
+ self.muted = muted
+ self.change_counter = change_counter
+
+ self.volume_state = gatt.Characteristic(
+ uuid=gatt.GATT_VOLUME_STATE_CHARACTERISTIC,
+ properties=(
+ gatt.Characteristic.Properties.READ
+ | gatt.Characteristic.Properties.NOTIFY
+ ),
+ permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+ value=gatt.CharacteristicValue(read=self._on_read_volume_state),
+ )
+ self.volume_control_point = gatt.Characteristic(
+ uuid=gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC,
+ properties=gatt.Characteristic.Properties.WRITE,
+ permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
+ value=gatt.CharacteristicValue(write=self._on_write_volume_control_point),
+ )
+ self.volume_flags = gatt.Characteristic(
+ uuid=gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC,
+ properties=gatt.Characteristic.Properties.READ,
+ permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+ value=bytes([volume_flags]),
+ )
+
+ super().__init__(
+ [
+ self.volume_state,
+ self.volume_control_point,
+ self.volume_flags,
+ ]
+ )
+
+ @property
+ def volume_state_bytes(self) -> bytes:
+ return bytes([self.volume_setting, self.muted, self.change_counter])
+
+ @volume_state_bytes.setter
+ def volume_state_bytes(self, new_value: bytes) -> None:
+ self.volume_setting, self.muted, self.change_counter = new_value
+
+ def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes:
+ return self.volume_state_bytes
+
+ def _on_write_volume_control_point(
+ self, connection: Optional[device.Connection], value: bytes
+ ) -> None:
+ assert connection
+
+ opcode = VolumeControlPointOpcode(value[0])
+ change_counter = value[1]
+
+ if change_counter != self.change_counter:
+ raise att.ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
+
+ handler = getattr(self, '_on_' + opcode.name.lower())
+ if handler(*value[2:]):
+ self.change_counter = (self.change_counter + 1) % 256
+ connection.abort_on(
+ 'disconnection',
+ connection.device.notify_subscribers(
+ attribute=self.volume_state,
+ value=self.volume_state_bytes,
+ ),
+ )
+ self.emit(
+ 'volume_state', self.volume_setting, self.muted, self.change_counter
+ )
+
+ def _on_relative_volume_down(self) -> bool:
+ old_volume = self.volume_setting
+ self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME)
+ return self.volume_setting != old_volume
+
+ def _on_relative_volume_up(self) -> bool:
+ old_volume = self.volume_setting
+ self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME)
+ return self.volume_setting != old_volume
+
+ def _on_unmute_relative_volume_down(self) -> bool:
+ old_volume, old_muted_state = self.volume_setting, self.muted
+ self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME)
+ self.muted = 0
+ return (self.volume_setting, self.muted) != (old_volume, old_muted_state)
+
+ def _on_unmute_relative_volume_up(self) -> bool:
+ old_volume, old_muted_state = self.volume_setting, self.muted
+ self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME)
+ self.muted = 0
+ return (self.volume_setting, self.muted) != (old_volume, old_muted_state)
+
+ def _on_set_absolute_volume(self, volume_setting: int) -> bool:
+ old_volume_setting = self.volume_setting
+ self.volume_setting = volume_setting
+ return old_volume_setting != self.volume_setting
+
+ def _on_unmute(self) -> bool:
+ old_muted_state = self.muted
+ self.muted = 0
+ return self.muted != old_muted_state
+
+ def _on_mute(self) -> bool:
+ old_muted_state = self.muted
+ self.muted = 1
+ return self.muted != old_muted_state
+
+
+# -----------------------------------------------------------------------------
+# Client
+# -----------------------------------------------------------------------------
+class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
+ SERVICE_CLASS = VolumeControlService
+
+ volume_control_point: gatt_client.CharacteristicProxy
+
+ def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
+ self.service_proxy = service_proxy
+
+ self.volume_state = gatt.PackedCharacteristicAdapter(
+ service_proxy.get_characteristics_by_uuid(
+ gatt.GATT_VOLUME_STATE_CHARACTERISTIC
+ )[0],
+ 'BBB',
+ )
+
+ self.volume_control_point = service_proxy.get_characteristics_by_uuid(
+ gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
+ )[0]
+
+ self.volume_flags = gatt.PackedCharacteristicAdapter(
+ service_proxy.get_characteristics_by_uuid(
+ gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
+ )[0],
+ 'B',
+ )