# path: root/python-packages/adb/device.py
# blob: a7d281223179797973bf5cf31634389604783887
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import base64
import logging
import os
import re
import subprocess


class FindDeviceError(RuntimeError):
    """Common base class for the errors raised while locating a device."""


class DeviceNotFoundError(FindDeviceError):
    """Error for a serial number that matched no device.

    Attributes:
        serial: The serial number that was looked for.
    """

    def __init__(self, serial):
        message = 'No device with serial {}'.format(serial)
        super(DeviceNotFoundError, self).__init__(message)
        self.serial = serial


class NoUniqueDeviceError(FindDeviceError):
    """Error for a lookup that could not settle on exactly one device."""

    def __init__(self):
        message = 'No unique device'
        super(NoUniqueDeviceError, self).__init__(message)


class ShellError(RuntimeError):
    """Error carrying the exit code and captured output of a command.

    Attributes:
        cmd: The command that was run.
        stdout: Whatever the command wrote to stdout.
        stderr: Whatever the command wrote to stderr.
        exit_code: The exit code the command finished with.
    """

    def __init__(self, cmd, stdout, stderr, exit_code):
        message = '`{0}` exited with code {1}'.format(cmd, exit_code)
        super(ShellError, self).__init__(message)
        self.cmd, self.stdout = cmd, stdout
        self.stderr, self.exit_code = stderr, exit_code


def get_devices(adb_path='adb'):
    """Lists the serials reported by `adb devices`.

    The adb server is started first (with its output discarded). Blank lines
    and lines containing the text 'offline' are left out of the result.

    Args:
        adb_path: adb executable to run.

    Returns:
        A list with the first whitespace-delimited field of every remaining
        line, in the order adb printed them.
    """
    start_server = [adb_path, 'start-server']
    with open(os.devnull, 'wb') as sink:
        subprocess.check_call(start_server, stdout=sink, stderr=sink)

    listing = split_lines(subprocess.check_output([adb_path, 'devices']))

    serials = []
    # listing[0] is only the "List of attached devices" banner, so start
    # from the second line.
    for entry in listing[1:]:
        if entry.strip() and 'offline' not in entry:
            serial, _ = re.split(r'\s+', entry, maxsplit=1)
            serials.append(serial)
    return serials


def _get_unique_device(product=None, adb_path='adb'):
    """Returns an AndroidDevice for the one device found by get_devices().

    Raises:
        NoUniqueDeviceError: get_devices() did not return exactly one serial.
    """
    serials = get_devices(adb_path=adb_path)
    if len(serials) == 1:
        return AndroidDevice(serials[0], product, adb_path)
    raise NoUniqueDeviceError()


def _get_device_by_serial(serial, product=None, adb_path='adb'):
    """Returns an AndroidDevice for `serial` if get_devices() lists it.

    Raises:
        DeviceNotFoundError: `serial` is not among the listed serials.
    """
    connected = get_devices(adb_path=adb_path)
    if not any(candidate == serial for candidate in connected):
        raise DeviceNotFoundError(serial)
    return AndroidDevice(serial, product, adb_path)


def get_device(serial=None, product=None, adb_path='adb'):
    """Get a uniquely identified AndroidDevice if one is available.

    The device is chosen from the first of these that is not None:

        1) The `serial` argument.
        2) The environment variable $ANDROID_SERIAL.
        3) The single device connected to the system.

    Raises:
        DeviceNotFoundError:
            The serial taken from `serial` or $ANDROID_SERIAL is not
            connected.

        NoUniqueDeviceError:
            Neither `serial` nor $ANDROID_SERIAL was set, and the number of
            connected devices is not exactly 1 (this includes 0 devices).

    Returns:
        The AndroidDevice selected by the rules above.
    """
    # The environment is consulted only when no explicit serial was passed.
    wanted = serial if serial is not None else os.getenv('ANDROID_SERIAL')
    if wanted is None:
        return _get_unique_device(product, adb_path=adb_path)
    return _get_device_by_serial(wanted, product, adb_path)


def _get_device_by_type(flag, adb_path):
    """Returns the device that `adb <flag> get-serialno` identifies.

    Args:
        flag: adb selector flag placed before `get-serialno`.
        adb_path: adb executable to run.

    Raises:
        RuntimeError: the get-serialno command exited with a non-zero code.
        NoUniqueDeviceError: adb printed 'unknown' as the serial.
    """
    with open(os.devnull, 'wb') as sink:
        subprocess.check_call([adb_path, 'start-server'], stdout=sink,
                              stderr=sink)

    query = [adb_path, flag, 'get-serialno']
    try:
        output = subprocess.check_output(query)
    except subprocess.CalledProcessError:
        raise RuntimeError('adb unexpectedly returned nonzero')

    serial = output.strip()
    if serial == 'unknown':
        raise NoUniqueDeviceError()
    return _get_device_by_serial(serial, adb_path=adb_path)


def get_usb_device(adb_path='adb'):
    """Get the unique USB-connected AndroidDevice if it is available.

    The lookup is delegated to `adb -d get-serialno`.

    Raises:
        NoUniqueDeviceError:
            0 or multiple devices are connected via USB.

    Returns:
        An AndroidDevice for the device adb selected with `-d`.
    """
    usb_flag = '-d'
    return _get_device_by_type(usb_flag, adb_path)


def get_emulator_device(adb_path='adb'):
    """Get the unique emulator AndroidDevice if it is available.

    The lookup is delegated to `adb -e get-serialno`.

    Raises:
        NoUniqueDeviceError:
            0 or multiple emulators are running.

    Returns:
        An AndroidDevice for the emulator adb selected with `-e`.
    """
    emulator_flag = '-e'
    return _get_device_by_type(emulator_flag, adb_path)


# If necessary, modifies subprocess.check_output() or subprocess.Popen() args
# to run the subprocess via Windows PowerShell to work-around an issue in
# Python 2's subprocess class on Windows where it doesn't support Unicode.
def _get_subprocess_args(args):
    # Only do this slow work-around if Unicode is in the cmd line on Windows.
    # PowerShell takes 600-700ms to startup on a 2013-2014 machine, which is
    # very slow.
    if os.name != 'nt' or all(not isinstance(arg, unicode) for arg in args[0]):
        return args

    def escape_arg(arg):
        # Escape for the parsing that the C Runtime does in Windows apps. In
        # particular, this will take care of double-quotes.
        arg = subprocess.list2cmdline([arg])
        # Escape single-quote with another single-quote because we're about
        # to...
        arg = arg.replace(u"'", u"''")
        # ...put the arg in a single-quoted string for PowerShell to parse.
        arg = u"'" + arg + u"'"
        return arg

    # Escape command line args.
    argv = map(escape_arg, args[0])
    # Cause script errors (such as adb not found) to stop script immediately
    # with an error.
    ps_code = u'$ErrorActionPreference = "Stop"\r\n'
    # Add current directory to the PATH var, to match cmd.exe/CreateProcess()
    # behavior.
    ps_code += u'$env:Path = ".;" + $env:Path\r\n'
    # Precede by &, the PowerShell call operator, and separate args by space.
    ps_code += u'& ' + u' '.join(argv)
    # Make the PowerShell exit code the exit code of the subprocess.
    ps_code += u'\r\nExit $LastExitCode'
    # Encode as UTF-16LE (without Byte-Order-Mark) which Windows natively
    # understands.
    ps_code = ps_code.encode('utf-16le')

    # Encode the PowerShell command as base64 and use the special
    # -EncodedCommand option that base64 decodes. Base64 is just plain ASCII,
    # so it should have no problem passing through Win32 CreateProcessA()
    # (which python erroneously calls instead of CreateProcessW()).
    return (['powershell.exe', '-NoProfile', '-NonInteractive',
             '-EncodedCommand', base64.b64encode(ps_code)],) + args[1:]


def _subprocess_check_output(*args, **kwargs):
    """Stand-in for subprocess.check_output() that copes with Unicode.

    Use this instead of subprocess.check_output() to work around Python 2's
    subprocess module on Windows not supporting Unicode.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            code. The error names the command the caller asked for rather
            than any powershell.exe wrapper that actually ran.
    """
    real_args = _get_subprocess_args(args)
    try:
        return subprocess.check_output(*real_args, **kwargs)
    except subprocess.CalledProcessError as error:
        requested_cmd = args[0]
        raise subprocess.CalledProcessError(error.returncode, requested_cmd,
                                            output=error.output)


def _subprocess_Popen(*args, **kwargs):
    """Stand-in for subprocess.Popen(); see _subprocess_check_output()."""
    popen_args = _get_subprocess_args(args)
    return subprocess.Popen(*popen_args, **kwargs)


def split_lines(s):
    """Splits text into lines in a way that works on Windows and old devices.

    Windows sees CR LF instead of LF, old devices do the same, and old
    devices on Windows see CR CR LF. Any run of CR/LF characters is therefore
    treated as a single separator.
    """
    # Trailing whitespace is dropped first because re.split, unlike
    # str.splitlines, would otherwise produce a final empty element:
    # >>> 'foo\n'.splitlines()
    # ['foo']
    # >>> re.split(r'\n', 'foo\n')
    # ['foo', '']
    trimmed = s.rstrip()
    return re.compile(r'[\r\n]+').split(trimmed)


def version(adb_path='adb'):
    """Get the version of adb (in terms of ADB_SERVER_VERSION).

    Args:
        adb_path: adb executable to run.

    Returns:
        The number captured from the first line of `adb version` output, or
        0 when that line does not match the expected format.
    """
    first_line = subprocess.check_output([adb_path, 'version']).splitlines()[0]
    match = re.match(r'^Android Debug Bridge version 1.0.(\d+)$', first_line)
    return int(match.group(1)) if match else 0


class AndroidDevice(object):
    """Runs adb commands against one particular device.

    The adb executable and the optional `-s <serial>` / `-p <product>`
    selectors are stored once in `adb_cmd` and prefixed to every command.

    Attributes:
        serial: Serial passed to the constructor (may be None).
        product: Product passed to the constructor (may be None).
        adb_cmd: Command prefix, as a list, used for every adb invocation.
    """

    # Delimiter string to indicate the start of the exit code.
    _RETURN_CODE_DELIMITER = 'x'

    # Follow any shell command with this string to get the exit status of a
    # program when adb does not propagate it itself (i.e. when the shell
    # protocol is not available).
    #
    # The delimiter is needed because `printf 1; echo $?` would print
    # "10", and we wouldn't be able to distinguish the exit code.
    _RETURN_CODE_PROBE = [';', 'echo', '{0}$?'.format(_RETURN_CODE_DELIMITER)]

    # Maximum search distance from the output end to find the delimiter.
    # adb on Windows returns \r\n even if adbd returns \n. Some old devices
    # seem to actually return \r\r\n.
    _RETURN_CODE_SEARCH_LENGTH = len(
        '{0}255\r\r\n'.format(_RETURN_CODE_DELIMITER))

    def __init__(self, serial, product=None, adb_path='adb'):
        """Builds the adb command prefix for this device.

        Args:
            serial: Device serial; adds `-s <serial>` unless None.
            product: Product name; adds `-p <product>` unless None.
            adb_path: adb executable to run.
        """
        self.serial = serial
        self.product = product
        self.adb_cmd = [adb_path]

        if self.serial is not None:
            self.adb_cmd.extend(['-s', serial])
        if self.product is not None:
            self.adb_cmd.extend(['-p', product])
        # Lazily filled caches behind the `linesep` and `features` properties.
        self._linesep = None
        self._features = None

    @property
    def linesep(self):
        """Raw output of `adb shell echo`, fetched once and then cached."""
        if self._linesep is None:
            self._linesep = subprocess.check_output(self.adb_cmd +
                                                    ['shell', 'echo'])
        return self._linesep

    @property
    def features(self):
        """Lines printed by `adb features`, fetched once and then cached.

        An empty list is cached when the command exits with a non-zero code.
        """
        if self._features is None:
            try:
                self._features = split_lines(self._simple_call(['features']))
            except subprocess.CalledProcessError:
                self._features = []
        return self._features

    def has_shell_protocol(self):
        """Tells whether both adb and the device support the shell protocol.

        Returns:
            True when the feature list contains 'shell_v2' and the adb
            version is at least 35, False otherwise.
        """
        # The feature list is cached while version() runs adb on every call,
        # so the cheap check goes first.
        if 'shell_v2' not in self.features:
            return False
        # version() expects the adb executable itself, not the whole command
        # prefix (which may also hold the -s/-p selectors).
        return version(self.adb_cmd[0]) >= 35

    def _make_shell_cmd(self, user_cmd):
        """Builds the full `adb shell` command line for `user_cmd`.

        Args:
            user_cmd: command to execute as a list of strings.

        Returns:
            A new list: the adb prefix, 'shell', the user command and, when
            the shell protocol is unavailable, the exit code probe.
        """
        command = self.adb_cmd + ['shell'] + user_cmd
        # With the shell protocol adb reports the exit code itself; only
        # without it does the code have to be echoed into the output.
        if not self.has_shell_protocol():
            command += self._RETURN_CODE_PROBE
        return command

    def _parse_shell_output(self, out):
        """Finds the exit code string from shell output.

        Args:
            out: Shell output string.

        Returns:
            An (exit_code, output_string) tuple. The output string is
            cleaned of any additional stuff we appended to find the
            exit code.

        Raises:
            RuntimeError: Could not find the exit code in |out|.
        """
        search_text = out
        if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
            # We don't want to search over massive amounts of data when we know
            # the part we want is right at the end.
            search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
        partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
        if partition[1] == '':
            raise RuntimeError('Could not find exit status in shell output.')
        result = int(partition[2])
        # partition[0] won't contain the full text if search_text was
        # truncated, pull from the original string instead.
        out = out[:-len(partition[1]) - len(partition[2])]
        return result, out

    def _simple_call(self, cmd):
        """Runs `adb <cmd>` and returns its output with stderr merged in.

        Args:
            cmd: adb arguments as a list of strings.

        Raises:
            subprocess.CalledProcessError: adb exited with a non-zero code.
        """
        logging.info(' '.join(self.adb_cmd + cmd))
        return _subprocess_check_output(
            self.adb_cmd + cmd, stderr=subprocess.STDOUT)

    def shell(self, cmd):
        """Calls `adb shell`

        Args:
            cmd: command to execute as a list of strings.

        Returns:
            A (stdout, stderr) tuple. Stderr may be combined into stdout
            if the device doesn't support separate streams.

        Raises:
            ShellError: the exit code was non-zero.
        """
        exit_code, stdout, stderr = self.shell_nocheck(cmd)
        if exit_code != 0:
            raise ShellError(cmd, stdout, stderr, exit_code)
        return stdout, stderr

    def shell_nocheck(self, cmd):
        """Calls `adb shell`

        Args:
            cmd: command to execute as a list of strings.

        Returns:
            An (exit_code, stdout, stderr) tuple. Stderr may be combined
            into stdout if the device doesn't support separate streams.
        """
        cmd = self._make_shell_cmd(cmd)
        logging.info(' '.join(cmd))
        p = _subprocess_Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if self.has_shell_protocol():
            exit_code = p.returncode
        else:
            # No shell protocol: the probe added by _make_shell_cmd() put the
            # exit code at the end of stdout.
            exit_code, stdout = self._parse_shell_output(stdout)
        return exit_code, stdout, stderr

    def shell_popen(self, cmd, kill_atexit=True, preexec_fn=None,
                    creationflags=0, **kwargs):
        """Calls `adb shell` and returns a handle to the adb process.

        This function provides direct access to the subprocess used to run the
        command, without special return code handling. Users that need the
        return value must retrieve it themselves.

        Args:
            cmd: Array of command arguments to execute.
            kill_atexit: Whether to kill the process upon exiting.
            preexec_fn: Argument forwarded to subprocess.Popen.
            creationflags: Argument forwarded to subprocess.Popen.
            **kwargs: Arguments forwarded to subprocess.Popen.

        Returns:
            subprocess.Popen handle to the adb shell instance
        """

        command = self.adb_cmd + ['shell'] + cmd

        # Make sure a ctrl-c in the parent script doesn't kill gdbserver.
        if os.name == 'nt':
            creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
        else:
            if preexec_fn is None:
                preexec_fn = os.setpgrp
            elif preexec_fn is not os.setpgrp:
                # Run the caller's hook first, then still detach the child
                # into its own process group.
                fn = preexec_fn
                def _wrapper():
                    fn()
                    os.setpgrp()
                preexec_fn = _wrapper

        p = _subprocess_Popen(command, creationflags=creationflags,
                              preexec_fn=preexec_fn, **kwargs)

        if kill_atexit:
            atexit.register(p.kill)

        return p

    def install(self, filename, replace=False):
        """Runs `adb install [-r] <filename>`; `-r` is added if `replace`."""
        cmd = ['install']
        if replace:
            cmd.append('-r')
        cmd.append(filename)
        return self._simple_call(cmd)

    def push(self, local, remote):
        """Runs `adb push <local> <remote>` and returns its output."""
        return self._simple_call(['push', local, remote])

    def pull(self, remote, local):
        """Runs `adb pull <remote> <local>` and returns its output."""
        return self._simple_call(['pull', remote, local])

    def sync(self, directory=None):
        """Runs `adb sync`, followed by `directory` when it is not None."""
        cmd = ['sync']
        if directory is not None:
            cmd.append(directory)
        return self._simple_call(cmd)

    def tcpip(self, port):
        """Runs `adb tcpip <port>` and returns its output.

        Args:
            port: Port number, as an int or a string.
        """
        # The command line is joined and handed to subprocess as strings, so
        # an integer port has to be converted first.
        return self._simple_call(['tcpip', str(port)])

    def usb(self):
        """Runs `adb usb` and returns its output."""
        return self._simple_call(['usb'])

    def reboot(self):
        """Runs `adb reboot` and returns its output."""
        return self._simple_call(['reboot'])

    def remount(self):
        """Runs `adb remount` and returns its output."""
        return self._simple_call(['remount'])

    def root(self):
        """Runs `adb root` and returns its output."""
        return self._simple_call(['root'])

    def unroot(self):
        """Runs `adb unroot` and returns its output."""
        return self._simple_call(['unroot'])

    def connect(self, host):
        """Runs `adb connect <host>` and returns its output."""
        return self._simple_call(['connect', host])

    def disconnect(self, host):
        """Runs `adb disconnect <host>` and returns its output."""
        return self._simple_call(['disconnect', host])

    def forward(self, local, remote):
        """Runs `adb forward <local> <remote>` and returns its output."""
        return self._simple_call(['forward', local, remote])

    def forward_list(self):
        """Runs `adb forward --list` and returns its output."""
        return self._simple_call(['forward', '--list'])

    def forward_no_rebind(self, local, remote):
        """Runs `adb forward --no-rebind <local> <remote>`."""
        return self._simple_call(['forward', '--no-rebind', local, remote])

    def forward_remove(self, local):
        """Runs `adb forward --remove <local>` and returns its output."""
        return self._simple_call(['forward', '--remove', local])

    def forward_remove_all(self):
        """Runs `adb forward --remove-all` and returns its output."""
        return self._simple_call(['forward', '--remove-all'])

    def reverse(self, remote, local):
        """Runs `adb reverse <remote> <local>` and returns its output."""
        return self._simple_call(['reverse', remote, local])

    def reverse_list(self):
        """Runs `adb reverse --list` and returns its output."""
        return self._simple_call(['reverse', '--list'])

    def reverse_no_rebind(self, local, remote):
        """Runs `adb reverse --no-rebind` with the two arguments in order.

        NOTE(review): the parameters are named (local, remote) while
        reverse() takes (remote, local); the values are forwarded in the
        order given, so confirm which spec callers pass first.
        """
        return self._simple_call(['reverse', '--no-rebind', local, remote])

    def reverse_remove_all(self):
        """Runs `adb reverse --remove-all` and returns its output."""
        return self._simple_call(['reverse', '--remove-all'])

    def reverse_remove(self, remote):
        """Runs `adb reverse --remove <remote>` and returns its output."""
        return self._simple_call(['reverse', '--remove', remote])

    def wait(self):
        """Runs `adb wait-for-device` and returns its output."""
        return self._simple_call(['wait-for-device'])

    def get_props(self):
        """Reads all properties by parsing the output of `getprop`.

        Returns:
            A dict mapping each property name to its value.

        Raises:
            RuntimeError: a line is not of the form `[key]: [value]`, or a
                key appears more than once.
            ShellError: `getprop` exited with a non-zero code.
        """
        result = {}
        output, _ = self.shell(['getprop'])
        output = split_lines(output)
        pattern = re.compile(r'^\[([^]]+)\]: \[(.*)\]')
        for line in output:
            match = pattern.match(line)
            if match is None:
                raise RuntimeError('invalid getprop line: "{}"'.format(line))
            key = match.group(1)
            value = match.group(2)
            if key in result:
                raise RuntimeError('duplicate getprop key: "{}"'.format(key))
            result[key] = value
        return result

    def get_prop(self, prop_name):
        """Reads one property with `getprop <prop_name>`.

        Returns:
            The single output line, or None when that line is blank.

        Raises:
            RuntimeError: the output does not consist of exactly one line.
            ShellError: `getprop` exited with a non-zero code.
        """
        output = split_lines(self.shell(['getprop', prop_name])[0])
        if len(output) != 1:
            raise RuntimeError('Too many lines in getprop output:\n' +
                               '\n'.join(output))
        value = output[0]
        if not value.strip():
            return None
        return value

    def set_prop(self, prop_name, value):
        """Sets a property with `setprop <prop_name> <value>`.

        Raises:
            ShellError: `setprop` exited with a non-zero code.
        """
        self.shell(['setprop', prop_name, value])