diff options
Diffstat (limited to 'apps/CameraITS/pymodules/its/device.py')
-rw-r--r-- | apps/CameraITS/pymodules/its/device.py | 131 |
1 files changed, 62 insertions, 69 deletions
diff --git a/apps/CameraITS/pymodules/its/device.py b/apps/CameraITS/pymodules/its/device.py index ea763ef..3134b47 100644 --- a/apps/CameraITS/pymodules/its/device.py +++ b/apps/CameraITS/pymodules/its/device.py @@ -41,12 +41,6 @@ class ItsSession(object): sock: The open socket. """ - # TODO: Handle multiple connected devices. - # The adb program is used for communication with the device. Need to handle - # the case of multiple devices connected. Currently, uses the "-d" param - # to adb, which causes it to fail if there is more than one device. - ADB = "adb -d" - # Open a connection to localhost:6000, forwarded to port 6000 on the device. # TODO: Support multiple devices running over different TCP ports. IPADDR = '127.0.0.1' @@ -73,39 +67,59 @@ class ItsSession(object): CAP_RAW_YUV_JPEG = [{"format":"raw"}, {"format":"yuv"}, {"format":"jpeg"}] CAP_DNG_YUV_JPEG = [{"format":"dng"}, {"format":"yuv"}, {"format":"jpeg"}] - def __init__(self): - reboot_device_on_argv() - # Get the camera ID to open as an argument. - camera_id = 0 + # Method to handle the case where the service isn't already running. + # This occurs when a test is invoked directly from the command line, rather + # than as a part of a separate test harness which is setting up the device + # and the TCP forwarding. + def __pre_init(self): + # TODO: Handle multiple connected devices. + adb = "adb -d" + + # This also includes the optional reboot handling: if the user + # provides a "reboot" or "reboot=N" arg, then reboot the device, + # waiting for N seconds (default 30) before returning. for s in sys.argv[1:]: - if s[:7] == "camera=" and len(s) > 7: - camera_id = int(s[7:]) + if s[:6] == "reboot": + duration = 30 + if len(s) > 7 and s[6] == "=": + duration = int(s[7:]) + print "Rebooting device" + _run("%s reboot" % (adb)); + _run("%s wait-for-device" % (adb)) + time.sleep(duration) + print "Reboot complete" + # TODO: Figure out why "--user 0" is needed, and fix the problem. - _run('%s logcat -c' % (self.ADB)) - _run('%s shell am force-stop --user 0 %s' % (self.ADB, self.PACKAGE)) _run(('%s shell am startservice --user 0 -t text/plain ' - '-a %s -d %d') % (self.ADB, self.INTENT_START, camera_id)) - self._wait_until_socket_ready() - _run('%s forward tcp:%d tcp:%d' % (self.ADB,self.PORT,self.PORT)) - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((self.IPADDR, self.PORT)) - self.sock.settimeout(self.SOCK_TIMEOUT) + '-a %s') % (adb, self.INTENT_START)) - def _wait_until_socket_ready(self): + # Wait until the socket is ready to accept a connection. proc = subprocess.Popen( - self.ADB.split() + ["logcat"], + adb.split() + ["logcat"], stdout=subprocess.PIPE) logcat = proc.stdout while True: line = logcat.readline().strip() - if line.find('Waiting for client to connect to socket') >= 0: + if line.find('ItsService ready') >= 0: break proc.kill() + # Setup the TCP-over-ADB forwarding. + _run('%s forward tcp:%d tcp:%d' % (adb,self.PORT,self.PORT)) + + def __init__(self): + if "noinit" not in sys.argv: + self.__pre_init() + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.IPADDR, self.PORT)) + self.sock.settimeout(self.SOCK_TIMEOUT) + self.__close_camera() + self.__open_camera() + def __del__(self): if hasattr(self, 'sock') and self.sock: + self.__close_camera() self.sock.close() - _run('%s shell am force-stop --user 0 %s' % (self.ADB, self.PACKAGE)) def __enter__(self): return self @@ -117,7 +131,11 @@ class ItsSession(object): # Read a line (newline-terminated) string serialization of JSON object. chars = [] while len(chars) == 0 or chars[-1] != '\n': - chars.append(self.sock.recv(1)) + ch = self.sock.recv(1) + if len(ch) == 0: + # Socket was probably closed; otherwise don't get empty strings + raise its.error.Error('Problem with socket on device side') + chars.append(ch) line = ''.join(chars) jobj = json.loads(line) # Optionally read a binary buffer of a fixed size. @@ -133,6 +151,25 @@ class ItsSession(object): buf = numpy.frombuffer(buf, dtype=numpy.uint8) return jobj, buf + def __open_camera(self): + # Get the camera ID to open as an argument. + camera_id = 0 + for s in sys.argv[1:]: + if s[:7] == "camera=" and len(s) > 7: + camera_id = int(s[7:]) + cmd = {"cmdName":"open", "cameraId":camera_id} + self.sock.send(json.dumps(cmd) + "\n") + data,_ = self.__read_response_from_socket() + if data['tag'] != 'cameraOpened': + raise its.error.Error('Invalid command response') + + def __close_camera(self): + cmd = {"cmdName":"close"} + self.sock.send(json.dumps(cmd) + "\n") + data,_ = self.__read_response_from_socket() + if data['tag'] != 'cameraClosed': + raise its.error.Error('Invalid command response') + def do_vibrate(self, pattern): """Cause the device to vibrate to a specific pattern. @@ -454,50 +491,6 @@ def _run(cmd): subprocess.check_call( cmd.split(), stdout=devnull, stderr=subprocess.STDOUT) -def reboot_device(sleep_duration=30): - """Function to reboot a device and block until it is ready. - - Can be used at the start of a test to get the device into a known good - state. Will disconnect any other adb sessions, so this function is not - a part of the ItsSession class (which encapsulates a session with a - device.) - - Args: - sleep_duration: (Optional) the length of time to sleep (seconds) after - the device comes online before returning; this gives the device - time to finish booting. - """ - print "Rebooting device" - _run("%s reboot" % (ItsSession.ADB)); - _run("%s wait-for-device" % (ItsSession.ADB)) - time.sleep(sleep_duration) - print "Reboot complete" - -def reboot_device_on_argv(): - """Examine sys.argv, and reboot if the "reboot" arg is present. - - If the script command line contains either: - - reboot - reboot=30 - - then the device will be rebooted, and if the optional numeric arg is - present, then that will be the sleep duration passed to the reboot - call. - - Returns: - Boolean, indicating whether the device was rebooted. - """ - for s in sys.argv[1:]: - if s[:6] == "reboot": - if len(s) > 7 and s[6] == "=": - duration = int(s[7:]) - reboot_device(duration) - elif len(s) == 6: - reboot_device() - return True - return False - class __UnitTest(unittest.TestCase): """Run a suite of unit tests on this module. """ |