aboutsummaryrefslogtreecommitdiff
path: root/examples/at_protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/at_protocol.py')
-rw-r--r--examples/at_protocol.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/examples/at_protocol.py b/examples/at_protocol.py
new file mode 100644
index 0000000..7d43007
--- /dev/null
+++ b/examples/at_protocol.py
@@ -0,0 +1,154 @@
+#! /usr/bin/env python
+# encoding: utf-8
+"""
+Example of a AT command protocol.
+
+https://en.wikipedia.org/wiki/Hayes_command_set
+http://www.itu.int/rec/T-REC-V.250-200307-I/en
+"""
+from __future__ import print_function
+
+import sys
+sys.path.insert(0, '..')
+
+import logging
+import serial
+import serial.threaded
+import threading
+
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+
+
+class ATException(Exception):
+ pass
+
+
+class ATProtocol(serial.threaded.LineReader):
+
+ TERMINATOR = b'\r\n'
+
+ def __init__(self):
+ super(ATProtocol, self).__init__()
+ self.alive = True
+ self.responses = queue.Queue()
+ self.events = queue.Queue()
+ self._event_thread = threading.Thread(target=self._run_event)
+ self._event_thread.daemon = True
+ self._event_thread.name = 'at-event'
+ self._event_thread.start()
+ self.lock = threading.Lock()
+
+ def stop(self):
+ """
+ Stop the event processing thread, abort pending commands, if any.
+ """
+ self.alive = False
+ self.events.put(None)
+ self.responses.put('<exit>')
+
+ def _run_event(self):
+ """
+ Process events in a separate thread so that input thread is not
+ blocked.
+ """
+ while self.alive:
+ try:
+ self.handle_event(self.events.get())
+ except:
+ logging.exception('_run_event')
+
+ def handle_line(self, line):
+ """
+ Handle input from serial port, check for events.
+ """
+ if line.startswith('+'):
+ self.events.put(line)
+ else:
+ self.responses.put(line)
+
+ def handle_event(self, event):
+ """
+ Spontaneous message received.
+ """
+ print('event received:', event)
+
+ def command(self, command, response='OK', timeout=5):
+ """
+ Set an AT command and wait for the response.
+ """
+ with self.lock: # ensure that just one thread is sending commands at once
+ self.write_line(command)
+ lines = []
+ while True:
+ try:
+ line = self.responses.get(timeout=timeout)
+ #~ print("%s -> %r" % (command, line))
+ if line == response:
+ return lines
+ else:
+ lines.append(line)
+ except queue.Empty:
+ raise ATException('AT command timeout ({!r})'.format(command))
+
+
+# test
+if __name__ == '__main__':
+ import time
+
+ class PAN1322(ATProtocol):
+ """
+ Example communication with PAN1322 BT module.
+
+ Some commands do not respond with OK but with a '+...' line. This is
+ implemented via command_with_event_response and handle_event, because
+ '+...' lines are also used for real events.
+ """
+
+ def __init__(self):
+ super(PAN1322, self).__init__()
+ self.event_responses = queue.Queue()
+ self._awaiting_response_for = None
+
+ def connection_made(self, transport):
+ super(PAN1322, self).connection_made(transport)
+ # our adapter enables the module with RTS=low
+ self.transport.serial.rts = False
+ time.sleep(0.3)
+ self.transport.serial.reset_input_buffer()
+
+ def handle_event(self, event):
+ """Handle events and command responses starting with '+...'"""
+ if event.startswith('+RRBDRES') and self._awaiting_response_for.startswith('AT+JRBD'):
+ rev = event[9:9 + 12]
+ mac = ':'.join('{:02X}'.format(ord(x)) for x in rev.decode('hex')[::-1])
+ self.event_responses.put(mac)
+ else:
+ logging.warning('unhandled event: {!r}'.format(event))
+
+ def command_with_event_response(self, command):
+ """Send a command that responds with '+...' line"""
+ with self.lock: # ensure that just one thread is sending commands at once
+ self._awaiting_response_for = command
+ self.transport.write(b'{}\r\n'.format(command.encode(self.ENCODING, self.UNICODE_HANDLING)))
+ response = self.event_responses.get()
+ self._awaiting_response_for = None
+ return response
+
+ # - - - example commands
+
+ def reset(self):
+ self.command("AT+JRES", response='ROK') # SW-Reset BT module
+
+ def get_mac_address(self):
+ # requests hardware / calibration info as event
+ return self.command_with_event_response("AT+JRBD")
+
+ ser = serial.serial_for_url('spy://COM1', baudrate=115200, timeout=1)
+ #~ ser = serial.Serial('COM1', baudrate=115200, timeout=1)
+ with serial.threaded.ReaderThread(ser, PAN1322) as bt_module:
+ bt_module.reset()
+ print("reset OK")
+ print("MAC address is", bt_module.get_mac_address())