aboutsummaryrefslogtreecommitdiff
path: root/apps/CameraITS/pymodules/its/device.py
diff options
context:
space:
mode:
Diffstat (limited to 'apps/CameraITS/pymodules/its/device.py')
-rw-r--r--apps/CameraITS/pymodules/its/device.py131
1 files changed, 62 insertions, 69 deletions
diff --git a/apps/CameraITS/pymodules/its/device.py b/apps/CameraITS/pymodules/its/device.py
index ea763ef..3134b47 100644
--- a/apps/CameraITS/pymodules/its/device.py
+++ b/apps/CameraITS/pymodules/its/device.py
@@ -41,12 +41,6 @@ class ItsSession(object):
sock: The open socket.
"""
- # TODO: Handle multiple connected devices.
- # The adb program is used for communication with the device. Need to handle
- # the case of multiple devices connected. Currently, uses the "-d" param
- # to adb, which causes it to fail if there is more than one device.
- ADB = "adb -d"
-
# Open a connection to localhost:6000, forwarded to port 6000 on the device.
# TODO: Support multiple devices running over different TCP ports.
IPADDR = '127.0.0.1'
@@ -73,39 +67,59 @@ class ItsSession(object):
CAP_RAW_YUV_JPEG = [{"format":"raw"}, {"format":"yuv"}, {"format":"jpeg"}]
CAP_DNG_YUV_JPEG = [{"format":"dng"}, {"format":"yuv"}, {"format":"jpeg"}]
- def __init__(self):
- reboot_device_on_argv()
- # Get the camera ID to open as an argument.
- camera_id = 0
+ # Method to handle the case where the service isn't already running.
+ # This occurs when a test is invoked directly from the command line, rather
+ # than as a part of a separate test harness which is setting up the device
+ # and the TCP forwarding.
+ def __pre_init(self):
+ # TODO: Handle multiple connected devices.
+ adb = "adb -d"
+
+ # This also includes the optional reboot handling: if the user
+ # provides a "reboot" or "reboot=N" arg, then reboot the device,
+ # waiting for N seconds (default 30) before returning.
for s in sys.argv[1:]:
- if s[:7] == "camera=" and len(s) > 7:
- camera_id = int(s[7:])
+ if s[:6] == "reboot":
+ duration = 30
+ if len(s) > 7 and s[6] == "=":
+ duration = int(s[7:])
+ print "Rebooting device"
+ _run("%s reboot" % (adb));
+ _run("%s wait-for-device" % (adb))
+ time.sleep(duration)
+ print "Reboot complete"
+
# TODO: Figure out why "--user 0" is needed, and fix the problem.
- _run('%s logcat -c' % (self.ADB))
- _run('%s shell am force-stop --user 0 %s' % (self.ADB, self.PACKAGE))
_run(('%s shell am startservice --user 0 -t text/plain '
- '-a %s -d %d') % (self.ADB, self.INTENT_START, camera_id))
- self._wait_until_socket_ready()
- _run('%s forward tcp:%d tcp:%d' % (self.ADB,self.PORT,self.PORT))
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect((self.IPADDR, self.PORT))
- self.sock.settimeout(self.SOCK_TIMEOUT)
+ '-a %s') % (adb, self.INTENT_START))
- def _wait_until_socket_ready(self):
+ # Wait until the socket is ready to accept a connection.
proc = subprocess.Popen(
- self.ADB.split() + ["logcat"],
+ adb.split() + ["logcat"],
stdout=subprocess.PIPE)
logcat = proc.stdout
while True:
line = logcat.readline().strip()
- if line.find('Waiting for client to connect to socket') >= 0:
+ if line.find('ItsService ready') >= 0:
break
proc.kill()
+ # Setup the TCP-over-ADB forwarding.
+ _run('%s forward tcp:%d tcp:%d' % (adb,self.PORT,self.PORT))
+
+ def __init__(self):
+ if "noinit" not in sys.argv:
+ self.__pre_init()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.connect((self.IPADDR, self.PORT))
+ self.sock.settimeout(self.SOCK_TIMEOUT)
+ self.__close_camera()
+ self.__open_camera()
+
def __del__(self):
if hasattr(self, 'sock') and self.sock:
+ self.__close_camera()
self.sock.close()
- _run('%s shell am force-stop --user 0 %s' % (self.ADB, self.PACKAGE))
def __enter__(self):
return self
@@ -117,7 +131,11 @@ class ItsSession(object):
# Read a line (newline-terminated) string serialization of JSON object.
chars = []
while len(chars) == 0 or chars[-1] != '\n':
- chars.append(self.sock.recv(1))
+ ch = self.sock.recv(1)
+ if len(ch) == 0:
+ # Socket was probably closed; otherwise don't get empty strings
+ raise its.error.Error('Problem with socket on device side')
+ chars.append(ch)
line = ''.join(chars)
jobj = json.loads(line)
# Optionally read a binary buffer of a fixed size.
@@ -133,6 +151,25 @@ class ItsSession(object):
buf = numpy.frombuffer(buf, dtype=numpy.uint8)
return jobj, buf
+ def __open_camera(self):
+ # Get the camera ID to open as an argument.
+ camera_id = 0
+ for s in sys.argv[1:]:
+ if s[:7] == "camera=" and len(s) > 7:
+ camera_id = int(s[7:])
+ cmd = {"cmdName":"open", "cameraId":camera_id}
+ self.sock.send(json.dumps(cmd) + "\n")
+ data,_ = self.__read_response_from_socket()
+ if data['tag'] != 'cameraOpened':
+ raise its.error.Error('Invalid command response')
+
+ def __close_camera(self):
+ cmd = {"cmdName":"close"}
+ self.sock.send(json.dumps(cmd) + "\n")
+ data,_ = self.__read_response_from_socket()
+ if data['tag'] != 'cameraClosed':
+ raise its.error.Error('Invalid command response')
+
def do_vibrate(self, pattern):
"""Cause the device to vibrate to a specific pattern.
@@ -454,50 +491,6 @@ def _run(cmd):
subprocess.check_call(
cmd.split(), stdout=devnull, stderr=subprocess.STDOUT)
-def reboot_device(sleep_duration=30):
- """Function to reboot a device and block until it is ready.
-
- Can be used at the start of a test to get the device into a known good
- state. Will disconnect any other adb sessions, so this function is not
- a part of the ItsSession class (which encapsulates a session with a
- device.)
-
- Args:
- sleep_duration: (Optional) the length of time to sleep (seconds) after
- the device comes online before returning; this gives the device
- time to finish booting.
- """
- print "Rebooting device"
- _run("%s reboot" % (ItsSession.ADB));
- _run("%s wait-for-device" % (ItsSession.ADB))
- time.sleep(sleep_duration)
- print "Reboot complete"
-
-def reboot_device_on_argv():
- """Examine sys.argv, and reboot if the "reboot" arg is present.
-
- If the script command line contains either:
-
- reboot
- reboot=30
-
- then the device will be rebooted, and if the optional numeric arg is
- present, then that will be the sleep duration passed to the reboot
- call.
-
- Returns:
- Boolean, indicating whether the device was rebooted.
- """
- for s in sys.argv[1:]:
- if s[:6] == "reboot":
- if len(s) > 7 and s[6] == "=":
- duration = int(s[7:])
- reboot_device(duration)
- elif len(s) == 6:
- reboot_device()
- return True
- return False
-
class __UnitTest(unittest.TestCase):
"""Run a suite of unit tests on this module.
"""