diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 05:19:53 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2023-07-07 05:19:53 +0000 |
commit | 7a9fb41030e1e2c83a6f8e7a93d588eddfe3aec4 (patch) | |
tree | 26c4570f6cc8f5ee36233bd88a2b1236e65a8f20 | |
parent | 5979bcb5027a90f89acb289aefbbbfb51e0d1b4b (diff) | |
parent | c362e05a76d7238f684234fe4ad6768c0c461b8d (diff) | |
download | adb-android14-mainline-sdkext-release.tar.gz |
Snap for 10453563 from c362e05a76d7238f684234fe4ad6768c0c461b8d to mainline-sdkext-releaseaml_sdk_341510000aml_sdk_341410000aml_sdk_341110080aml_sdk_341110000aml_sdk_341010000aml_sdk_340912010android14-mainline-sdkext-release
Change-Id: I5b8c69036ab58b779643c014c99e7d400ae32fab
68 files changed, 1796 insertions, 606 deletions
@@ -62,6 +62,7 @@ cc_defaults { "-lpthread", "-framework CoreFoundation", "-framework IOKit", + "-framework Security", "-lobjc", ], cflags: [ @@ -248,6 +249,7 @@ libadb_test_srcs = [ "adb_listeners_test.cpp", "adb_utils_test.cpp", "fdevent/fdevent_test.cpp", + "shell_service_protocol.cpp", "socket_spec_test.cpp", "socket_test.cpp", "sysdeps_test.cpp", @@ -362,6 +364,7 @@ cc_test_host { defaults: ["adb_defaults"], srcs: libadb_test_srcs + [ "client/mdns_utils_test.cpp", + "test_utils/test_utils.cpp", ], static_libs: [ @@ -393,6 +396,11 @@ cc_test_host { shared_libs: ["AdbWinApi"], }, }, + + test_options: { + // TODO(b/247985207) remove "no-remote" tag when b/247985207 is fixed. + tags: ["no-remote"], + } } cc_binary_host { @@ -465,6 +473,8 @@ cc_binary_host { targets: [ "dist_files", "sdk", + "sdk-repo-platform-tools", + "sdk_repo", "win_sdk", ], }, @@ -753,8 +763,14 @@ phony { "libadbd_fs", "abb", "reboot", - "set-verity-state", - ] + ], + product_variables: { + debuggable: { + required: [ + "remount", + ], + }, + }, } phony { @@ -801,6 +817,13 @@ cc_binary { ], } +ADBD_TEST_LIBS = [ + "libadbd", + "libadbd_auth", + "libbase", + "libusb", +] + cc_test { name: "adbd_test", @@ -808,15 +831,17 @@ cc_test { recovery_available: false, srcs: libadb_test_srcs + [ + "daemon/restart_service.cpp", + "daemon/restart_service_test.cpp", "daemon/services.cpp", "daemon/shell_service.cpp", "daemon/shell_service_test.cpp", - "shell_service_protocol.cpp", + "test_utils/test_utils.cpp", "shell_service_protocol_test.cpp", "mdns_test.cpp", ], - test_config: "adb_test.xml", + test_config: "adbd_test.xml", target: { android: { @@ -830,13 +855,16 @@ cc_test { "liblog", ], - static_libs: [ - "libadbd", - "libadbd_auth", - "libbase", - "libcrypto_utils", - "libusb", - ], + static_libs: ADBD_TEST_LIBS, + + // Shared lib versions of static libs can potentially come from + // libadbd_binary_dependencies (e.g. libbase as a shared_lib, depending on + // library_linking_strategy), which will cause both shared/static versions of + // the same library to be in the link action. + // + // adbd_test uses the static version of these libraries above, so exclude them here. + exclude_shared_libs: ADBD_TEST_LIBS, + test_suites: ["general-tests", "mts-adbd"], require_root: true, } @@ -849,14 +877,6 @@ python_test_host { ], test_config: "adb_integration_test_adb.xml", test_suites: ["general-tests"], - version: { - py2: { - enabled: false, - }, - py3: { - enabled: true, - }, - }, test_options: { unit_test: false, }, @@ -873,14 +893,6 @@ python_test_host { ], test_config: "adb_integration_test_device.xml", test_suites: ["general-tests"], - version: { - py2: { - enabled: false, - }, - py3: { - enabled: true, - }, - }, test_options: { unit_test: false, }, diff --git a/SERVICES.TXT b/SERVICES.TXT index e0c95c42..c8d06a32 100644 --- a/SERVICES.TXT +++ b/SERVICES.TXT @@ -79,9 +79,6 @@ host:<request> interpreted as 'any single device or emulator connected to/running on the host'. -<host-prefix>:get-product - XXX - <host-prefix>:get-serialno Returns the serial number of the corresponding device/emulator. Note that emulator serial numbers are of the form "emulator-5554" @@ -164,6 +161,22 @@ shell: this to implement "adb shell", but will also cook the input before sending it to the device (see interactive_shell() in commandline.c) +shell,v2: (API>=24) + Variant of shell service which uses "shell protocol" in order to + differentiate stdin, stderr, and also retrieve exit code. + +exec: + Variant of shell which uses a raw PTY in order to not mangle output. + +abb: (API>=30) + Direct connection to Binder on device. This service does not use space + for parameter separator but "\u0000". Example: + abb:package0install-create + +abb_exec: (API>=30) + Variant of abb. Use a raw PTY in order to not mangle output. Example: + abb_exec:package0install-write + remount: Ask adbd to remount the device's filesystem in read-write mode, instead of read-only. This is usually necessary before performing @@ -172,6 +185,12 @@ remount: This request may not succeed on certain builds which do not allow that. +dev:<path> + Opens a device file and connects the client directly to it for + read/write purposes. Useful for debugging, but may require special + privileges and thus may not run on all devices. <path> is a full + path from the root of the filesystem. + tcp:<port> Tries to connect to tcp port <port> on localhost. @@ -227,6 +246,28 @@ track-jdwp Note that there is no single-shot service to retrieve the list only once. +track-app: + Improved version of "track-jdwp" service which also mentions whether the + app is profileable and its architecture. Each time the list changes, + a new messeage is sent (this service never stops). + + Each message features a hex4 length prefix followed by a + human-readable protocol buffer. e.g.: + + process { + pid: 18595 + debuggable: true + architecture: "arm64" + } + process { + pid: 18407 + debuggable: true + profileable: true + architecture: "arm64" + } + + Note: Generate a parser from [app_processes.proto]. + sync: This starts the file synchronization service, used to implement "adb push" and "adb pull". Since this service is pretty complex, it will be detailed diff --git a/TEST_MAPPING b/TEST_MAPPING index bb007292..e3e22d7c 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -33,10 +33,16 @@ "name": "adb_tls_connection_test" }, { + "name": "FastDeployTests" + }, + { + "name": "FastDeployHostTests" + }, + { "name": "MicrodroidHostTestCases" } ], - "hwasan-postsubmit": [ + "hwasan-presubmit": [ { "name": "adbd_test" }, @@ -24,7 +24,7 @@ _adb() { _init_completion || return fi - local where i cur serial + local where i cur serial state transport COMPREPLY=() serial="${ANDROID_SERIAL:-none}" @@ -41,6 +41,9 @@ _adb() { -*) where=OPTIONS ;; + wait-for-*) + where=OPTIONS + ;; *) if [[ $where == OPT_SERIAL ]]; then where=OPT_SERIAL_ARG @@ -60,13 +63,21 @@ _adb() { OPTIONS="-d -e -s -p" COMMAND="devices connect disconnect push pull sync shell emu logcat lolcat forward jdwp install uninstall bugreport help version start-server kill-server get-state get-serialno status-window remount reboot reboot-bootloader root usb tcpip disable-verity" + for state in device recovery rescue sideload bootloader disconnect ; do + for transport in -usb -local -any "" ; do + WAIT_COMMAND=wait-for${transport}-${state} + if [[ -n "${cur}" && $WAIT_COMMAND == "${cur}"* ]] ; then + COMMAND+=" $WAIT_COMMAND" + fi + done + done + case $where in OPTIONS|OPT_SERIAL|OPT_PATH) COMPREPLY=( $(compgen -W "$OPTIONS $COMMAND" -- "$cur") ) ;; OPT_SERIAL_ARG) - local devices=$(command adb devices 2> /dev/null | grep -v "List of devices" | awk '{ print $1 }') - COMPREPLY=( $(compgen -W "${devices}" -- ${cur}) ) + _adb_devices ;; COMMAND) if [[ $i -eq $COMP_CWORD ]]; then @@ -74,6 +85,9 @@ _adb() { else i=$((i+1)) case "${cur}" in + disconnect) + _adb_devices + ;; install) _adb_cmd_install "$serial" $i ;; @@ -106,6 +120,12 @@ _adb() { return 0 } +_adb_devices() { + local devices=$(command adb devices 2> /dev/null | grep -v "List of devices" | awk '{ print $1 }') + COMPREPLY=( $(compgen -W "${devices}" -- ${cur}) ) + return +} + _adb_cmd_install() { local serial i cur where @@ -29,6 +29,7 @@ #include <string.h> #include <sys/time.h> #include <time.h> +#include <unistd.h> #include <chrono> #include <condition_variable> @@ -94,12 +95,13 @@ static void DecrementActiveConnections() { std::string adb_version() { // Don't change the format of this --- it's parsed by ddmlib. return android::base::StringPrintf( - "Android Debug Bridge version %d.%d.%d\n" - "Version %s-%s\n" - "Installed as %s\n", - ADB_VERSION_MAJOR, ADB_VERSION_MINOR, ADB_SERVER_VERSION, - PLATFORM_TOOLS_VERSION, android::build::GetBuildNumber().c_str(), - android::base::GetExecutablePath().c_str()); + "Android Debug Bridge version %d.%d.%d\n" + "Version %s-%s\n" + "Installed as %s\n" + "Running on %s\n", + ADB_VERSION_MAJOR, ADB_VERSION_MINOR, ADB_SERVER_VERSION, PLATFORM_TOOLS_VERSION, + android::build::GetBuildNumber().c_str(), android::base::GetExecutablePath().c_str(), + GetOSVersion().c_str()); } uint32_t calculate_apacket_checksum(const apacket* p) { @@ -233,13 +235,18 @@ void print_packet(const char *label, apacket *p) } #endif -static void send_ready(unsigned local, unsigned remote, atransport *t) -{ +void send_ready(unsigned local, unsigned remote, atransport* t, uint32_t ack_bytes) { D("Calling send_ready"); apacket *p = get_apacket(); p->msg.command = A_OKAY; p->msg.arg0 = local; p->msg.arg1 = remote; + if (t->SupportsDelayedAck()) { + p->msg.data_length = sizeof(ack_bytes); + p->payload.resize(sizeof(ack_bytes)); + memcpy(p->payload.data(), &ack_bytes, sizeof(ack_bytes)); + } + send_packet(p, t); } @@ -392,8 +399,6 @@ static void handle_new_connection(atransport* t, apacket* p) { send_auth_request(t); } #endif - - update_transports(); } void handle_packet(apacket *p, atransport *t) @@ -461,39 +466,88 @@ void handle_packet(apacket *p, atransport *t) } break; - case A_OPEN: /* OPEN(local-id, 0, "destination") */ - if (t->online && p->msg.arg0 != 0 && p->msg.arg1 == 0) { - std::string_view address(p->payload.begin(), p->payload.size()); + case A_OPEN: { + /* OPEN(local-id, [send-buffer], "destination") */ + if (!t->online || p->msg.arg0 == 0) { + break; + } + + uint32_t send_bytes = static_cast<uint32_t>(p->msg.arg1); + if (t->SupportsDelayedAck() != static_cast<bool>(send_bytes)) { + LOG(ERROR) << "unexpected value of A_OPEN arg1: " << send_bytes + << " (delayed acks = " << t->SupportsDelayedAck() << ")"; + send_close(0, p->msg.arg0, t); + break; + } - // Historically, we received service names as a char*, and stopped at the first NUL - // byte. The client sent strings with null termination, which post-string_view, start - // being interpreted as part of the string, unless we explicitly strip them. - address = StripTrailingNulls(address); + std::string_view address(p->payload.begin(), p->payload.size()); - asocket* s = create_local_service_socket(address, t); - if (s == nullptr) { - send_close(0, p->msg.arg0, t); - } else { - s->peer = create_remote_socket(p->msg.arg0, t); - s->peer->peer = s; - send_ready(s->id, s->peer->id, t); - s->ready(s); - } + // Historically, we received service names as a char*, and stopped at the first NUL + // byte. The client sent strings with null termination, which post-string_view, start + // being interpreted as part of the string, unless we explicitly strip them. + address = StripTrailingNulls(address); +#if ADB_HOST + // The incoming address (from the payload) might be some other + // target (e.g tcp:<ip>:8000), however we do not allow *any* + // such requests - namely, those from (a potentially compromised) + // adbd (reverse:forward: source) port transport. + if (!t->IsReverseConfigured(address.data())) { + LOG(FATAL) << __func__ << " disallowed connect to " << address << " from " + << t->serial_name(); + } +#endif + asocket* s = create_local_service_socket(address, t); + if (s == nullptr) { + send_close(0, p->msg.arg0, t); + break; } + + s->peer = create_remote_socket(p->msg.arg0, t); + s->peer->peer = s; + + if (t->SupportsDelayedAck()) { + LOG(DEBUG) << "delayed ack available: send buffer = " << send_bytes; + s->available_send_bytes = send_bytes; + + // TODO: Make this adjustable at connection time? + send_ready(s->id, s->peer->id, t, INITIAL_DELAYED_ACK_BYTES); + } else { + LOG(DEBUG) << "delayed ack unavailable"; + send_ready(s->id, s->peer->id, t, 0); + } + + s->ready(s); break; + } case A_OKAY: /* READY(local-id, remote-id, "") */ if (t->online && p->msg.arg0 != 0 && p->msg.arg1 != 0) { asocket* s = find_local_socket(p->msg.arg1, 0); if (s) { - if(s->peer == nullptr) { + std::optional<int32_t> acked_bytes; + if (p->payload.size() == sizeof(int32_t)) { + int32_t value; + memcpy(&value, p->payload.data(), sizeof(value)); + // acked_bytes can be negative! + // + // In the future, we can use this to preemptively supply backpressure, instead + // of waiting for the writer to hit its limit. + acked_bytes = value; + } else if (p->payload.size() != 0) { + LOG(ERROR) << "invalid A_OKAY payload size: " << p->payload.size(); + return; + } + + if (s->peer == nullptr) { /* On first READY message, create the connection. */ s->peer = create_remote_socket(p->msg.arg0, t); s->peer->peer = s; + + local_socket_ack(s, acked_bytes); s->ready(s); } else if (s->peer->id == p->msg.arg0) { /* Other READY messages must use the same local-id */ - s->ready(s); + local_socket_ack(s, acked_bytes); } else { D("Invalid A_OKAY(%d,%d), expected A_OKAY(%d,%d) on transport %s", p->msg.arg0, p->msg.arg1, s->peer->id, p->msg.arg1, t->serial.c_str()); @@ -535,11 +589,7 @@ void handle_packet(apacket *p, atransport *t) if (t->online && p->msg.arg0 != 0 && p->msg.arg1 != 0) { asocket* s = find_local_socket(p->msg.arg1, p->msg.arg0); if (s) { - unsigned rid = p->msg.arg0; - if (s->enqueue(s, std::move(p->payload)) == 0) { - D("Enqueue the socket"); - send_ready(s->id, rid, t); - } + s->enqueue(s, std::move(p->payload)); } } break; @@ -827,7 +877,7 @@ int launch_server(const std::string& socket_spec, const char* one_device) { return -1; } - WCHAR args[64]; + WCHAR args[4096]; if (one_device) { snwprintf(args, arraysize(args), L"adb -L %s fork-server server --reply-fd %d --one-device %s", @@ -989,6 +1039,11 @@ int launch_server(const std::string& socket_spec, const char* one_device) { if (one_device) { child_argv.push_back("--one-device"); child_argv.push_back(one_device); + } else if (access("/etc/adb/one_device_required", F_OK) == 0) { + fprintf(stderr, + "adb: cannot start server: --one-device option is required for this system in " + "order to start adb.\n"); + return -1; } child_argv.push_back(nullptr); int result = execv(path.c_str(), const_cast<char* const*>(child_argv.data())); @@ -33,6 +33,10 @@ constexpr size_t MAX_PAYLOAD_V1 = 4 * 1024; constexpr size_t MAX_PAYLOAD = 1024 * 1024; constexpr size_t MAX_FRAMEWORK_PAYLOAD = 64 * 1024; +// When delayed acks are supported, the initial number of unacknowledged bytes we're willing to +// receive on a socket before the other side should block. +constexpr size_t INITIAL_DELAYED_ACK_BYTES = 32 * 1024 * 1024; + constexpr size_t LINUX_MAX_SOCKET_SIZE = 4194304; #define A_SYNC 0x434e5953 @@ -107,12 +111,14 @@ enum ConnectionState { kCsDetached, // USB device that's detached from the adb server. kCsOffline, - kCsBootloader, - kCsDevice, - kCsHost, - kCsRecovery, - kCsSideload, - kCsRescue, + // After CNXN packet, the ConnectionState describes not a state but the type of service + // on the other end of the transport. + kCsBootloader, // Device running fastboot OS (fastboot) or userspace fastboot (fastbootd). + kCsDevice, // Device running Android OS (adbd). + kCsHost, // What a device sees from its end of a Transport (adb host). + kCsRecovery, // Device with bootloader loaded but no ROM OS loaded (adbd). + kCsSideload, // Device running Android OS Sideload mode (minadbd sideload mode). + kCsRescue, // Device running Android OS Rescue mode (minadbd rescue mode). }; std::string to_string(ConnectionState state); @@ -159,21 +165,13 @@ asocket* host_service_to_socket(std::string_view name, std::string_view serial, #endif #if !ADB_HOST -asocket* daemon_service_to_socket(std::string_view name); +asocket* daemon_service_to_socket(std::string_view name, atransport* transport); #endif #if !ADB_HOST unique_fd execute_abb_command(std::string_view command); #endif -#if !ADB_HOST -int init_jdwp(void); -asocket* create_jdwp_service_socket(); -asocket* create_jdwp_tracker_service_socket(); -asocket* create_app_tracker_service_socket(); -unique_fd create_jdwp_connection_fd(int jdwp_pid); -#endif - bool handle_forward_request(const char* service, atransport* transport, int reply_fd); bool handle_forward_request(const char* service, std::function<atransport*(std::string* error)> transport_acquirer, @@ -235,6 +233,7 @@ void handle_offline(atransport* t); void send_connect(atransport* t); void send_tls_request(atransport* t); +void send_ready(unsigned local, unsigned remote, atransport* t, uint32_t ack_bytes); void parse_banner(const std::string&, atransport* t); @@ -252,9 +251,7 @@ void update_transport_status(); // Wait until device scan has completed and every transport is ready, or a timeout elapses. void adb_wait_for_device_initialization(); -#endif // ADB_HOST -#if ADB_HOST // When ssh-forwarding to a remote adb server, kill-server is almost never what you actually want, // and unfortunately, many other tools issue it. This adds a knob to reject kill-servers. void adb_set_reject_kill_server(bool reject); diff --git a/adb_test.xml b/adbd_test.xml index ce359323..ce359323 100644 --- a/adb_test.xml +++ b/adbd_test.xml diff --git a/apex/adbd.rc b/apex/adbd.rc index 9cb072bc..f2fcef38 100644 --- a/apex/adbd.rc +++ b/apex/adbd.rc @@ -4,3 +4,4 @@ service adbd /apex/com.android.adbd/bin/adbd --root_seclabel=u:r:su:s0 disabled override seclabel u:r:adbd:s0 + user root diff --git a/client/adb_install.cpp b/client/adb_install.cpp index 1619e855..b861ce22 100644 --- a/client/adb_install.cpp +++ b/client/adb_install.cpp @@ -39,6 +39,7 @@ #include "commandline.h" #include "fastdeploy.h" #include "incremental.h" +#include "sysdeps.h" using namespace std::literals; @@ -855,7 +856,8 @@ int install_multi_package(int argc, const char** argv) { session_ids.push_back(session_id); // Support splitAPKs by allowing the notation split1.apk:split2.apk:split3.apk as argument. - std::vector<std::string> splits = android::base::Split(file, ":"); + // The character used as separator is OS-dependent, see ENV_PATH_SEPARATOR_STR. + std::vector<std::string> splits = android::base::Split(file, ENV_PATH_SEPARATOR_STR); for (const std::string& split : splits) { struct stat sb; diff --git a/client/auth.cpp b/client/auth.cpp index db4c4790..10b835e5 100644 --- a/client/auth.cpp +++ b/client/auth.cpp @@ -331,9 +331,8 @@ int adb_auth_pubkey(const char* filename) { if (!pubkey_from_privkey(&pubkey, filename)) { return 1; } - pubkey.push_back('\n'); - - return WriteFdExactly(STDOUT_FILENO, pubkey.data(), pubkey.size()) ? 0 : 1; + fprintf(stdout, "%s\n", pubkey.data()); + return 0; } #if defined(__linux__) diff --git a/client/commandline.cpp b/client/commandline.cpp index 018bfd78..2823cbaf 100644 --- a/client/commandline.cpp +++ b/client/commandline.cpp @@ -46,7 +46,6 @@ #if !defined(_WIN32) #include <sys/ioctl.h> #include <termios.h> -#include <unistd.h> #else #define _POSIX #include <signal.h> @@ -120,12 +119,12 @@ static void help() { " localabstract:<unix domain socket name>\n" " localreserved:<unix domain socket name>\n" " localfilesystem:<unix domain socket name>\n" + " dev:<character device name>\n" " jdwp:<process pid> (remote only)\n" " vsock:<CID>:<port> (remote only)\n" " acceptfd:<fd> (listen only)\n" " forward --remove LOCAL remove specific forward socket connection\n" " forward --remove-all remove all forward socket connections\n" - " ppp TTY [PARAMETER...] run PPP over USB\n" " reverse --list list all reverse socket connections from device\n" " reverse [--no-rebind] REMOTE LOCAL\n" " reverse socket connection using:\n" @@ -253,6 +252,9 @@ static void help() { " $ANDROID_LOG_TAGS tags to be used by logcat (see logcat --help)\n" " $ADB_LOCAL_TRANSPORT_MAX_PORT max emulator scan port (default 5585, 16 emus)\n" " $ADB_MDNS_AUTO_CONNECT comma-separated list of mdns services to allow auto-connect (default adb-tls-connect)\n" + "\n" + "Online documentation: https://android.googlesource.com/platform/packages/modules/adb/+/refs/heads/master/docs/user/adb.1.md\n" + "\n" ); // clang-format on } @@ -1009,53 +1011,6 @@ static int adb_wipe_devices() { return 1; } -static int ppp(int argc, const char** argv) { -#if defined(_WIN32) - error_exit("adb %s not implemented on Win32", argv[0]); - __builtin_unreachable(); -#else - if (argc < 2) error_exit("usage: adb %s <adb service name> [ppp opts]", argv[0]); - - const char* adb_service_name = argv[1]; - std::string error_message; - int fd = adb_connect(adb_service_name, &error_message); - if (fd < 0) { - error_exit("could not open adb service %s: %s", adb_service_name, error_message.c_str()); - } - - pid_t pid = fork(); - if (pid == -1) { - perror_exit("fork failed"); - } - - if (pid == 0) { - // child side - int i; - - // copy args - const char** ppp_args = (const char**)alloca(sizeof(char*) * argc + 1); - ppp_args[0] = "pppd"; - for (i = 2 ; i < argc ; i++) { - //argv[2] and beyond become ppp_args[1] and beyond - ppp_args[i - 1] = argv[i]; - } - ppp_args[i-1] = nullptr; - - dup2(fd, STDIN_FILENO); - dup2(fd, STDOUT_FILENO); - adb_close(STDERR_FILENO); - adb_close(fd); - - execvp("pppd", (char* const*)ppp_args); - perror_exit("exec pppd failed"); - } - - // parent side - adb_close(fd); - return 0; -#endif /* !defined(_WIN32) */ -} - static bool wait_for_device(const char* service, std::optional<std::chrono::milliseconds> timeout = std::nullopt) { std::vector<std::string> components = android::base::Split(service, "-"); @@ -1280,7 +1235,9 @@ static int backup(int argc, const char** argv) { static int restore(int argc, const char** argv) { fprintf(stdout, "WARNING: adb restore is deprecated and may be removed in a future release\n"); - if (argc != 2) error_exit("restore requires an argument"); + if (argc < 2) { + error_exit("usage: adb restore FILENAME [ARG]..."); + } const char* filename = argv[1]; unique_fd tarFd(adb_open(filename, O_RDONLY)); @@ -1289,8 +1246,17 @@ static int restore(int argc, const char** argv) { return -1; } + std::string cmd = "restore:"; + argc -= 2; + argv += 2; + while (argc-- > 0) { + cmd += " " + escape_arg(*argv++); + } + + D("restore. filename=%s cmd=%s", filename, cmd.c_str()); + std::string error; - unique_fd fd(adb_connect("restore:", &error)); + unique_fd fd(adb_connect(cmd, &error)); if (fd < 0) { fprintf(stderr, "adb: unable to connect for restore: %s\n", error.c_str()); return -1; @@ -1592,6 +1558,7 @@ int adb_commandline(int argc, const char** argv) { if (isdigit(argv[0][2])) { id = argv[0] + 2; } else { + if (argc < 2 || argv[0][2] != '\0') error_exit("-t requires an argument"); id = argv[1]; --argc; ++argv; @@ -1980,7 +1947,10 @@ int adb_commandline(int argc, const char** argv) { parse_push_pull_args(&argv[1], argc - 1, &srcs, &dst, ©_attrs, &sync, &compression, &dry_run); - if (srcs.empty() || !dst) error_exit("push requires an argument"); + if (srcs.empty() || !dst) { + error_exit("push requires <source> and <destination> arguments"); + } + return do_sync_push(srcs, dst, sync, compression, dry_run) ? 0 : 1; } else if (!strcmp(argv[0], "pull")) { bool copy_attrs = false; @@ -2067,8 +2037,6 @@ int adb_commandline(int argc, const char** argv) { else if (!strcmp(argv[0], "logcat") || !strcmp(argv[0], "lolcat") || !strcmp(argv[0], "longcat")) { return logcat(argc, argv); - } else if (!strcmp(argv[0], "ppp")) { - return ppp(argc, argv); } else if (!strcmp(argv[0], "start-server")) { std::string error; const int result = adb_connect("host:start-server", &error); diff --git a/client/file_sync_client.cpp b/client/file_sync_client.cpp index 5a587dbc..4c8fd042 100644 --- a/client/file_sync_client.cpp +++ b/client/file_sync_client.cpp @@ -811,30 +811,45 @@ class SyncConnection { while (!deferred_acknowledgements_.empty()) { bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks; - ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0); - if (rc == 0) { - CHECK(!should_block); - return true; - } - - if (acknowledgement_buffer_.size() < sizeof(sync_status)) { - const ssize_t header_bytes_left = sizeof(sync_status) - buf.size(); - ssize_t rc = adb_read(fd, buf.end(), header_bytes_left); - if (rc <= 0) { - Error("failed to read copy response"); - return false; + enum class ReadStatus { + Success, + Failure, + TryLater, + }; + + // Read until the acknowledgement buffer has at least `amount` bytes in it (or there is + // an I/O error, or the socket has no data ready to read). + auto read_until_amount = [&](size_t amount) -> ReadStatus { + while (buf.size() < amount) { + // The fd is blocking, so if we want to avoid blocking in this function, we must + // poll first to verify that some data is available before trying to read it. + if (!should_block) { + ssize_t rc = adb_poll(&pfd, 1, 0); + if (rc == 0) { + return ReadStatus::TryLater; + } + } + const ssize_t bytes_left = amount - buf.size(); + ssize_t rc = adb_read(fd, buf.end(), bytes_left); + if (rc <= 0) { + Error("failed to read copy response"); + return ReadStatus::Failure; + } + buf.resize(buf.size() + rc); + if (!should_block && buf.size() < amount) { + return ReadStatus::TryLater; + } } + return ReadStatus::Success; + }; - buf.resize(buf.size() + rc); - if (rc != header_bytes_left) { - // Early exit if we run out of data in the socket. + switch (read_until_amount(sizeof(sync_status))) { + case ReadStatus::TryLater: return true; - } - - if (!should_block) { - // We don't want to read again yet, because the socket might be empty. - continue; - } + case ReadStatus::Failure: + return false; + case ReadStatus::Success: + break; } auto* hdr = reinterpret_cast<sync_status*>(buf.data()); @@ -854,29 +869,19 @@ class SyncConnection { return false; } - const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size(); - CHECK_GE(msg_bytes_left, 0); - if (msg_bytes_left > 0) { - ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left); - if (rc <= 0) { - Error("failed to read copy failure message"); + switch (read_until_amount(sizeof(sync_status) + hdr->msglen)) { + case ReadStatus::TryLater: + return true; + case ReadStatus::Failure: return false; - } - - buf.resize(buf.size() + rc); - if (rc != msg_bytes_left) { - if (should_block) { - continue; - } else { - return true; - } - } - - std::string msg(buf.begin() + sizeof(sync_status), buf.end()); - ReportDeferredCopyFailure(msg); - buf.resize(0); - return false; + case ReadStatus::Success: + break; } + + std::string msg(buf.begin() + sizeof(sync_status), buf.end()); + ReportDeferredCopyFailure(msg); + buf.resize(0); + return false; } return true; diff --git a/client/incremental_utils.cpp b/client/incremental_utils.cpp index c7cd3869..2f6958b6 100644 --- a/client/incremental_utils.cpp +++ b/client/incremental_utils.cpp @@ -50,9 +50,9 @@ Size verity_tree_blocks_for_file(Size fileSize) { Size total_tree_block_count = 0; - auto block_count = 1 + (fileSize - 1) / kBlockSize; - auto hash_block_count = block_count; - for (auto i = 0; hash_block_count > 1; i++) { + const Size block_count = 1 + (fileSize - 1) / kBlockSize; + Size hash_block_count = block_count; + while (hash_block_count > 1) { hash_block_count = (hash_block_count + hash_per_block - 1) / hash_per_block; total_tree_block_count += hash_block_count; } diff --git a/client/main.cpp b/client/main.cpp index cc89734f..e66c0150 100644 --- a/client/main.cpp +++ b/client/main.cpp @@ -107,9 +107,7 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, const char* o // act like Ctrl-C. signal(SIGBREAK, [](int) { raise(SIGINT); }); #endif - signal(SIGINT, [](int) { - fdevent_run_on_main_thread([]() { exit(0); }); - }); + signal(SIGINT, [](int) { fdevent_run_on_looper([]() { exit(0); }); }); if (one_device) { transport_set_one_device(one_device); @@ -220,7 +218,7 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, const char* o // We don't accept() client connections until this point: this way, clients // can't see wonky state early in startup even if they're connecting directly // to the server instead of going through the adb program. - fdevent_run_on_main_thread([] { enable_server_sockets(); }); + fdevent_run_on_looper([] { enable_server_sockets(); }); }); notify_thread.detach(); diff --git a/client/mdnsresponder_client.cpp b/client/mdnsresponder_client.cpp index 8ff38b55..9eab1b70 100644 --- a/client/mdnsresponder_client.cpp +++ b/client/mdnsresponder_client.cpp @@ -545,7 +545,7 @@ void init_mdns_transport_discovery_thread(void) { if (error_codes[i] != kDNSServiceErr_NoError) { D("Got %d browsing for mDNS service %s.", error_codes[i], kADBDNSServices[i]); } else { - fdevent_run_on_main_thread([i]() { + fdevent_run_on_looper([i]() { g_service_ref_fdes[i] = fdevent_create(adb_DNSServiceRefSockFD(g_service_refs[i]), pump_service_ref, &g_service_refs[i]); fdevent_set(g_service_ref_fdes[i], FDE_READ); diff --git a/client/openscreen/platform/task_runner.cpp b/client/openscreen/platform/task_runner.cpp index 167a1c90..a7d15483 100644 --- a/client/openscreen/platform/task_runner.cpp +++ b/client/openscreen/platform/task_runner.cpp @@ -31,7 +31,7 @@ using namespace openscreen; namespace mdns { AdbOspTaskRunner::AdbOspTaskRunner() { - check_main_thread(); + fdevent_check_looper(); thread_id_ = android::base::GetThreadId(); task_handler_ = std::thread([this]() { TaskExecutorWorker(); }); } @@ -112,7 +112,7 @@ void AdbOspTaskRunner::TaskExecutorWorker() { return 0; }); - fdevent_run_on_main_thread([&]() { waitable_task(); }); + fdevent_run_on_looper([&]() { waitable_task(); }); waitable_task.get_future().wait(); } diff --git a/client/pairing/tests/pairing_server.cpp b/client/pairing/tests/pairing_server.cpp index 9201e7a0..fc917266 100644 --- a/client/pairing/tests/pairing_server.cpp +++ b/client/pairing/tests/pairing_server.cpp @@ -212,12 +212,6 @@ bool PairingServerImpl::setupServer() { if (server_fd_.get() == -1) { PLOG(ERROR) << "Failed to start pairing connection server"; return false; - } else if (fcntl(server_fd_.get(), F_SETFD, FD_CLOEXEC) != 0) { - PLOG(ERROR) << "Failed to make server socket cloexec"; - return false; - } else if (fcntl(server_fd_.get(), F_SETFD, O_NONBLOCK) != 0) { - PLOG(ERROR) << "Failed to make server socket nonblocking"; - return false; } startConnectionEventsThread(); diff --git a/client/usb_libusb.cpp b/client/usb_libusb.cpp index e0b0104a..4cb697d8 100644 --- a/client/usb_libusb.cpp +++ b/client/usb_libusb.cpp @@ -695,7 +695,7 @@ struct LibusbConnection : public Connection { Stop(); - fdevent_run_on_main_thread([device]() { + fdevent_run_on_looper([device]() { process_device(device); libusb_unref_device(device); }); @@ -902,7 +902,7 @@ static void device_disconnected(libusb_device* device) { libusb_device* device = it->first; std::weak_ptr<LibusbConnection> connection_weak = it->second; usb_handles.erase(it); - fdevent_run_on_main_thread([connection_weak]() { + fdevent_run_on_looper([connection_weak]() { auto connection = connection_weak.lock(); if (connection) { connection->Stop(); diff --git a/client/usb_linux.cpp b/client/usb_linux.cpp index 7f926266..b2e4634a 100644 --- a/client/usb_linux.cpp +++ b/client/usb_linux.cpp @@ -59,7 +59,7 @@ using namespace std::literals; struct usb_handle { ~usb_handle() { - if (fd != -1) unix_close(fd); + if (fd != -1) unix_close(fd); } std::string path; @@ -71,8 +71,14 @@ struct usb_handle { unsigned zero_mask; unsigned writeable = 1; + // The usbdevfs_urb structure ends in a variable length array of + // usbdevfs_iso_packet_desc. Since none of the usb calls ever attempt + // to fill in those values, ignore this warning. +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wgnu-variable-sized-type-not-at-end" usbdevfs_urb urb_in; usbdevfs_urb urb_out; +#pragma clang diagnostic pop bool urb_in_busy = false; bool urb_out_busy = false; diff --git a/client/usb_osx.cpp b/client/usb_osx.cpp index e72d719a..99857e26 100644 --- a/client/usb_osx.cpp +++ b/client/usb_osx.cpp @@ -110,6 +110,16 @@ static void AndroidInterfaceAdded(io_iterator_t iterator); static std::unique_ptr<usb_handle> CheckInterface(IOUSBInterfaceInterface550** iface, UInt16 vendor, UInt16 product); +// Flag-guarded (using host env variable) feature that turns on +// the ability to clear the device-side endpoint also before +// starting. See public bug https://issuetracker.google.com/issues/37055927 +// for historical context. +static bool clear_endpoints() { + static const char* env(getenv("ADB_OSX_USB_CLEAR_ENDPOINTS")); + static bool result = env && strcmp("1", env) == 0; + return result; +} + static bool FindUSBDevices() { // Create the matching dictionary to find the Android device's adb interface. CFMutableDictionaryRef matchingDict = IOServiceMatching(kIOUSBInterfaceClassName); @@ -176,9 +186,8 @@ AndroidInterfaceAdded(io_iterator_t iterator) kr = (*iface)->GetInterfaceSubClass(iface, &subclass); kr = (*iface)->GetInterfaceProtocol(iface, &protocol); if (!is_adb_interface(if_class, subclass, protocol)) { - // Ignore non-ADB devices. - LOG(DEBUG) << "Ignoring interface with incorrect class/subclass/protocol - " << if_class - << ", " << subclass << ", " << protocol; + // Ignore non-ADB devices (interface with incorrect + // class/subclass/protocol). (*iface)->Release(iface); continue; } @@ -323,7 +332,17 @@ AndroidInterfaceAdded(io_iterator_t iterator) // Used to clear both the endpoints before starting. // When adb quits, we might clear the host endpoint but not the device. // So we make sure both sides are clear before starting up. +// Returns true if: +// - the feature is disabled (OSX/host only) +// - the feature is enabled and successfully clears both endpoints +// Returns false otherwise (if an error is encountered) static bool ClearPipeStallBothEnds(IOUSBInterfaceInterface550** interface, UInt8 bulkEp) { + // If feature-disabled, (silently) bypass clearing both + // endpoints (including device-side). + if (!clear_endpoints()) { + return true; + } + IOReturn rc = (*interface)->ClearPipeStallBothEnds(interface, bulkEp); if (rc != kIOReturnSuccess) { LOG(ERROR) << "Could not clear pipe stall both ends: " << std::hex << rc; @@ -377,19 +396,34 @@ static std::unique_ptr<usb_handle> CheckInterface(IOUSBInterfaceInterface550** i //* Iterate over the endpoints for this interface and find the first //* bulk in/out pipes available. These will be our read/write pipes. - for (endpoint = 1; endpoint <= interfaceNumEndpoints; endpoint++) { + for (endpoint = 1; endpoint <= interfaceNumEndpoints; ++endpoint) { UInt8 transferType; - UInt16 maxPacketSize; + UInt16 endPointMaxPacketSize = 0; UInt8 interval; + + // Attempt to retrieve the 'true' packet-size from supported interface. + kr = (*interface) + ->GetEndpointProperties(interface, 0, endpoint, + kUSBOut, + &transferType, + &endPointMaxPacketSize, &interval); + if (kr == kIOReturnSuccess) { + CHECK_NE(0, endPointMaxPacketSize); + } + + UInt16 pipePropMaxPacketSize; UInt8 number; UInt8 direction; UInt8 maxBurst; UInt8 mult; UInt16 bytesPerInterval; - kr = (*interface) - ->GetPipePropertiesV2(interface, endpoint, &direction, &number, &transferType, - &maxPacketSize, &interval, &maxBurst, &mult, + // Proceed with extracting the transfer direction, so we can fill in the + // appropriate fields (bulkIn or bulkOut). + kr = (*interface)->GetPipePropertiesV2(interface, endpoint, + &direction, &number, &transferType, + &pipePropMaxPacketSize, &interval, + &maxBurst, &mult, &bytesPerInterval); if (kr != kIOReturnSuccess) { LOG(ERROR) << "FindDeviceInterface - could not get pipe properties: " @@ -401,23 +435,31 @@ static std::unique_ptr<usb_handle> CheckInterface(IOUSBInterfaceInterface550** i if (kUSBIn == direction) { handle->bulkIn = endpoint; - if (!ClearPipeStallBothEnds(interface, handle->bulkIn)) goto err_get_pipe_props; + + if (!ClearPipeStallBothEnds(interface, handle->bulkIn)) { + goto err_get_pipe_props; + } } if (kUSBOut == direction) { handle->bulkOut = endpoint; - if (!ClearPipeStallBothEnds(interface, handle->bulkOut)) goto err_get_pipe_props; + + if (!ClearPipeStallBothEnds(interface, handle->bulkOut)) { + goto err_get_pipe_props; + } } - if (maxBurst != 0) + // Compute the packet-size, in case the system did not return the correct value. + if (endPointMaxPacketSize == 0 && maxBurst != 0) { // bMaxBurst is the number of additional packets in the burst. - maxPacketSize /= (maxBurst + 1); + endPointMaxPacketSize = pipePropMaxPacketSize / (maxBurst + 1); + } // mult is only relevant for isochronous endpoints. CHECK_EQ(0, mult); - handle->zero_mask = maxPacketSize - 1; - handle->max_packet_size = maxPacketSize; + handle->zero_mask = endPointMaxPacketSize - 1; + handle->max_packet_size = endPointMaxPacketSize; } handle->interface = interface; diff --git a/crypto/x509_generator.cpp b/crypto/x509_generator.cpp index 43b81530..a64e49af 100644 --- a/crypto/x509_generator.cpp +++ b/crypto/x509_generator.cpp @@ -24,6 +24,8 @@ #include <openssl/pem.h> #include <openssl/rsa.h> +#include <string.h> + namespace adb { namespace crypto { diff --git a/daemon/adb_wifi.cpp b/daemon/adb_wifi.cpp index 2f9e9b4d..2de219d8 100644 --- a/daemon/adb_wifi.cpp +++ b/daemon/adb_wifi.cpp @@ -69,7 +69,7 @@ TlsServer::TlsServer(int port) : port_(port) {} TlsServer::~TlsServer() { fdevent* fde = fd_event_; - fdevent_run_on_main_thread([fde]() { + fdevent_run_on_looper([fde]() { if (fde != nullptr) { fdevent_destroy(fde); } @@ -104,7 +104,7 @@ bool TlsServer::Start() { LOG(INFO) << "adbwifi started on port " << port_; std::unique_lock<std::mutex> lock(mutex); - fdevent_run_on_main_thread([&]() { + fdevent_run_on_looper([&]() { fd_event_ = fdevent_create(fd.release(), &TlsServer::StaticOnFdEvent, this); if (fd_event_ == nullptr) { LOG(ERROR) << "Failed to create fd event for TlsServer."; @@ -194,6 +194,7 @@ static void start_wifi_enabled_observer() { while (true) { std::string toggled_val = wifi_enabled ? "0" : "1"; LOG(INFO) << "Waiting for " << kWifiEnabledProp << "=" << toggled_val; + if (WaitForProperty(kWifiEnabledProp, toggled_val)) { wifi_enabled = !wifi_enabled; LOG(INFO) << kWifiEnabledProp << " changed to " << toggled_val; diff --git a/daemon/auth.cpp b/daemon/auth.cpp index 2c3844ba..d2f6d6bb 100644 --- a/daemon/auth.cpp +++ b/daemon/auth.cpp @@ -209,7 +209,7 @@ void adbd_cloexec_auth_socket() { static void adbd_auth_key_authorized(void* arg, uint64_t id) { LOG(INFO) << "adb client " << id << " authorized"; - fdevent_run_on_main_thread([=]() { + fdevent_run_on_looper([=]() { auto* transport = transport_from_callback_arg(arg); if (!transport) { LOG(ERROR) << "authorization received for deleted transport (" << id << "), ignoring"; diff --git a/daemon/file_sync_service.cpp b/daemon/file_sync_service.cpp index b59da972..f594d646 100644 --- a/daemon/file_sync_service.cpp +++ b/daemon/file_sync_service.cpp @@ -317,7 +317,7 @@ static bool handle_send_file_data(borrowed_fd s, unique_fd fd, uint32_t* timesta std::span<char> output; DecodeResult result = decoder->Decode(&output); if (result == DecodeResult::Error) { - SendSyncFailErrno(s, "decompress failed"); + SendSyncFail(s, "decompress failed"); return false; } diff --git a/daemon/jdwp_service.cpp b/daemon/jdwp_service.cpp index adae9f7f..2501688c 100644 --- a/daemon/jdwp_service.cpp +++ b/daemon/jdwp_service.cpp @@ -14,8 +14,6 @@ * limitations under the License. */ -#if !ADB_HOST - #if !defined(__ANDROID_RECOVERY__) #define TRACE_TAG JDWP @@ -461,12 +459,16 @@ static int jdwp_tracker_enqueue(asocket* s, apacket::payload_type) { } static asocket* create_process_tracker_service_socket(TrackerKind kind) { - auto t = std::make_unique<JdwpTracker>(kind, true); + std::unique_ptr<JdwpTracker> t = std::make_unique<JdwpTracker>(kind, true); if (!t) { LOG(FATAL) << "failed to allocate JdwpTracker"; } - memset(t.get(), 0, sizeof(asocket)); + /* Object layout (with an inheritance hierarchy) varies across arch (e.g + * armv7a/Android TV vs aarch64), so no assumptions can be made about + * accessing fields based on offsets (e.g memset(t.get(), 0, sizeof(asocket)) + * might clobber an unintended memory location). + */ install_local_socket(t.get()); D("LS(%d): created new jdwp tracker service", t->id); @@ -495,7 +497,7 @@ int init_jdwp(void) { adb_thread_setname("jdwp control"); adbconnection_listen([](int fd, ProcessInfo process) { LOG(INFO) << "jdwp connection from " << process.pid; - fdevent_run_on_main_thread([fd, process] { + fdevent_run_on_looper([fd, process] { unique_fd ufd(fd); auto proc = std::make_unique<JdwpProcess>(std::move(ufd), process); if (!proc) { @@ -534,4 +536,3 @@ int init_jdwp() { } #endif /* defined(__ANDROID_RECOVERY__) */ -#endif /* !ADB_HOST */ diff --git a/daemon/jdwp_service.h b/daemon/jdwp_service.h new file mode 100644 index 00000000..1daa0f9d --- /dev/null +++ b/daemon/jdwp_service.h @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "adb_unique_fd.h" +#include "socket.h" + +int init_jdwp(void); +asocket* create_jdwp_service_socket(); +asocket* create_jdwp_tracker_service_socket(); +asocket* create_app_tracker_service_socket(); +unique_fd create_jdwp_connection_fd(int jdwp_pid);
\ No newline at end of file diff --git a/daemon/main.cpp b/daemon/main.cpp index 1efa797c..677e853b 100644 --- a/daemon/main.cpp +++ b/daemon/main.cpp @@ -57,6 +57,7 @@ #include "socket_spec.h" #include "transport.h" +#include "daemon/jdwp_service.h" #include "daemon/mdns.h" #include "daemon/watchdog.h" @@ -159,6 +160,9 @@ static void drop_privileges(int server_port) { if (root_seclabel != nullptr) { if (selinux_android_setcon(root_seclabel) < 0) { + // If we failed to become root, don't try again to avoid a + // restart loop. + android::base::SetProperty("service.adb.root", "0"); LOG(FATAL) << "Could not set SELinux context"; } } @@ -317,6 +321,7 @@ int main(int argc, char** argv) { {"device_banner", required_argument, nullptr, 'b'}, {"version", no_argument, nullptr, 'v'}, {"logpostfsdata", no_argument, nullptr, 'l'}, + {nullptr, no_argument, nullptr, 0}, }; int option_index = 0; diff --git a/daemon/restart_service_test.cpp b/daemon/restart_service_test.cpp new file mode 100644 index 00000000..d2ae67cf --- /dev/null +++ b/daemon/restart_service_test.cpp @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2023 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "restart_service.h" + +#include <time.h> +#include <string> + +#include <android-base/properties.h> +#include <android-base/stringprintf.h> +#include <android-base/strings.h> +#include <gtest/gtest.h> + +#include "services.h" +#include "sysdeps.h" +#include "test_utils/test_utils.h" + +using namespace test_utils; + +// Test successful execution of tcp restart. +TEST(RestartServiceTest, RestartTcpServiceValidPortSuccess) { + // Identify an available port from the system allocated pool. + // The objective is to do a best guess attempt at randomizing the + // specified port on which the restart service listens. + unique_fd dontcare; + const int assigned_port(test_utils::GetUnassignedPort(dontcare)); + + unique_fd command_fd_ = create_service_thread( + "tcp", std::bind(restart_tcp_service, std::placeholders::_1, assigned_port)); + EXPECT_GE(command_fd_.get(), 0); + test_utils::ExpectLinesEqual( + ReadRaw(command_fd_), + {android::base::StringPrintf("restarting in TCP mode port: %d", assigned_port)}); + + EXPECT_EQ(android::base::GetProperty("service.adb.tcp.port", ""), + std::to_string(assigned_port)); +} + +// Test failure path of tcp restart. +TEST(RestartServiceTest, RestartTcpServiceInvalidPortFailure) { + const std::string port_str = android::base::GetProperty("service.adb.tcp.port", ""); + + const int port = -5; + unique_fd command_fd_ = create_service_thread( + "tcp", std::bind(restart_tcp_service, std::placeholders::_1, port)); + EXPECT_GE(command_fd_, 0); + test_utils::ExpectLinesEqual(ReadRaw(command_fd_), + {android::base::StringPrintf("invalid port %d", port)}); + + // Validate that there's no mutation. + EXPECT_EQ(android::base::GetProperty("service.adb.tcp.port", ""), port_str); +} + +// Test successful execution of usb restart. +TEST(RestartServiceTest, RestartUsbServiceSuccess) { + unique_fd command_fd_ = create_service_thread("usb", restart_usb_service); + EXPECT_GE(command_fd_, 0); + + test_utils::ExpectLinesEqual(ReadRaw(command_fd_), + {android::base::StringPrintf("restarting in USB mode")}); + + EXPECT_EQ(android::base::GetProperty("service.adb.tcp.port", ""), "0"); +} diff --git a/daemon/services.cpp b/daemon/services.cpp index d86b54a0..07fbe328 100644 --- a/daemon/services.cpp +++ b/daemon/services.cpp @@ -49,10 +49,12 @@ #include "adb_utils.h" #include "services.h" #include "socket_spec.h" +#include "sysdeps.h" #include "transport.h" #include "daemon/file_sync_service.h" #include "daemon/framebuffer_service.h" +#include "daemon/jdwp_service.h" #include "daemon/logging.h" #include "daemon/restart_service.h" #include "daemon/shell_service.h" @@ -131,7 +133,7 @@ static void spin_service(unique_fd fd) { return; } - fdevent_run_on_main_thread([fd = pipe_read.release()]() { + fdevent_run_on_looper([fd = pipe_read.release()]() { fdevent* fde = fdevent_create( fd, [](int, unsigned, void*) {}, nullptr); fdevent_add(fde, FDE_READ); @@ -161,9 +163,14 @@ static void spin_service(unique_fd fd) { } struct ServiceSocket : public asocket { - ServiceSocket() { + ServiceSocket() = delete; + explicit ServiceSocket(atransport* transport) { + CHECK(transport); install_local_socket(this); + this->transport = transport; this->enqueue = [](asocket* self, apacket::payload_type data) { + // TODO: This interface currently can't give any backpressure. + send_ready(self->id, self->peer->id, self->transport, data.size()); return static_cast<ServiceSocket*>(self)->Enqueue(std::move(data)); }; this->ready = [](asocket* self) { return static_cast<ServiceSocket*>(self)->Ready(); }; @@ -171,6 +178,11 @@ struct ServiceSocket : public asocket { } virtual ~ServiceSocket() = default; + ServiceSocket(const ServiceSocket& copy) = delete; + ServiceSocket(ServiceSocket&& move) = delete; + ServiceSocket& operator=(const ServiceSocket& copy) = delete; + ServiceSocket& operator=(ServiceSocket&& move) = delete; + virtual int Enqueue(apacket::payload_type data) { return -1; } virtual void Ready() {} virtual void Close() { @@ -188,9 +200,9 @@ struct ServiceSocket : public asocket { }; struct SinkSocket : public ServiceSocket { - explicit SinkSocket(size_t byte_count) { + explicit SinkSocket(atransport* transport, size_t byte_count) + : ServiceSocket(transport), bytes_left_(byte_count) { LOG(INFO) << "Creating new SinkSocket with capacity " << byte_count; - bytes_left_ = byte_count; } virtual ~SinkSocket() { LOG(INFO) << "SinkSocket destroyed"; } @@ -210,9 +222,9 @@ struct SinkSocket : public ServiceSocket { }; struct SourceSocket : public ServiceSocket { - explicit SourceSocket(size_t byte_count) { + explicit SourceSocket(atransport* transport, size_t byte_count) + : ServiceSocket(transport), bytes_left_(byte_count) { LOG(INFO) << "Creating new SourceSocket with capacity " << byte_count; - bytes_left_ = byte_count; } virtual ~SourceSocket() { LOG(INFO) << "SourceSocket destroyed"; } @@ -235,7 +247,7 @@ struct SourceSocket : public ServiceSocket { size_t bytes_left_; }; -asocket* daemon_service_to_socket(std::string_view name) { +asocket* daemon_service_to_socket(std::string_view name, atransport* transport) { if (name == "jdwp") { return create_jdwp_service_socket(); } else if (name == "track-jdwp") { @@ -247,13 +259,13 @@ asocket* daemon_service_to_socket(std::string_view name) { if (!ParseUint(&byte_count, name)) { return nullptr; } - return new SinkSocket(byte_count); + return new SinkSocket(transport, byte_count); } else if (android::base::ConsumePrefix(&name, "source:")) { uint64_t byte_count = 0; if (!ParseUint(&byte_count, name)) { return nullptr; } - return new SourceSocket(byte_count); + return new SourceSocket(transport, byte_count); } return nullptr; @@ -308,7 +320,9 @@ unique_fd daemon_service_to_fd(std::string_view name, atransport* transport) { } #endif - if (android::base::ConsumePrefix(&name, "jdwp:")) { + if (android::base::ConsumePrefix(&name, "dev:")) { + return unique_fd{unix_open(name, O_RDWR | O_CLOEXEC)}; + } else if (android::base::ConsumePrefix(&name, "jdwp:")) { pid_t pid; if (!ParseUint(&pid, name)) { return unique_fd{}; diff --git a/daemon/shell_service_test.cpp b/daemon/shell_service_test.cpp index cdd8dbed..f61aaebd 100644 --- a/daemon/shell_service_test.cpp +++ b/daemon/shell_service_test.cpp @@ -29,6 +29,9 @@ #include "adb_io.h" #include "shell_protocol.h" #include "sysdeps.h" +#include "test_utils/test_utils.h" + +using namespace test_utils; class ShellServiceTest : public ::testing::Test { public: @@ -74,81 +77,6 @@ void ShellServiceTest::StartTestCommandInProcess(std::string name, Command comma ASSERT_TRUE(command_fd_ >= 0); } -namespace { - -// Reads raw data from |fd| until it closes or errors. -std::string ReadRaw(borrowed_fd fd) { - char buffer[1024]; - char *cur_ptr = buffer, *end_ptr = buffer + sizeof(buffer); - - while (1) { - int bytes = adb_read(fd, cur_ptr, end_ptr - cur_ptr); - if (bytes <= 0) { - return std::string(buffer, cur_ptr); - } - cur_ptr += bytes; - } -} - -// Reads shell protocol data from |fd| until it closes or errors. Fills -// |stdout| and |stderr| with their respective data, and returns the exit code -// read from the protocol or -1 if an exit code packet was not received. -int ReadShellProtocol(borrowed_fd fd, std::string* stdout, std::string* stderr) { - int exit_code = -1; - stdout->clear(); - stderr->clear(); - - auto protocol = std::make_unique<ShellProtocol>(fd.get()); - while (protocol->Read()) { - switch (protocol->id()) { - case ShellProtocol::kIdStdout: - stdout->append(protocol->data(), protocol->data_length()); - break; - case ShellProtocol::kIdStderr: - stderr->append(protocol->data(), protocol->data_length()); - break; - case ShellProtocol::kIdExit: - EXPECT_EQ(-1, exit_code) << "Multiple exit packets received"; - EXPECT_EQ(1u, protocol->data_length()); - exit_code = protocol->data()[0]; - break; - default: - ADD_FAILURE() << "Unidentified packet ID: " << protocol->id(); - } - } - - return exit_code; -} - -// Checks if each line in |lines| exists in the same order in |output|. Blank -// lines in |output| are ignored for simplicity. -bool ExpectLinesEqual(const std::string& output, - const std::vector<std::string>& lines) { - auto output_lines = android::base::Split(output, "\r\n"); - size_t i = 0; - - for (const std::string& line : lines) { - // Skip empty lines in output. - while (i < output_lines.size() && output_lines[i].empty()) { - ++i; - } - if (i >= output_lines.size()) { - ADD_FAILURE() << "Ran out of output lines"; - return false; - } - EXPECT_EQ(line, output_lines[i]); - ++i; - } - - while (i < output_lines.size() && output_lines[i].empty()) { - ++i; - } - EXPECT_EQ(i, output_lines.size()) << "Found unmatched output lines"; - return true; -} - -} // namespace - // Tests a raw subprocess with no protocol. TEST_F(ShellServiceTest, RawNoProtocolSubprocess) { // [ -t 0 ] checks if stdin is connected to a terminal. diff --git a/daemon/usb.cpp b/daemon/usb.cpp index 6c3f735e..f9e085fb 100644 --- a/daemon/usb.cpp +++ b/daemon/usb.cpp @@ -61,10 +61,10 @@ using android::base::StringPrintf; // Also, each submitted operation does an allocation in the kernel of that size, so we want to // minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed. static constexpr size_t kUsbReadQueueDepth = 8; -static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE; +static constexpr size_t kUsbReadSize = 16384; static constexpr size_t kUsbWriteQueueDepth = 8; -static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE; +static constexpr size_t kUsbWriteSize = 16384; static const char* to_string(enum usb_functionfs_event_type type) { switch (type) { @@ -567,6 +567,10 @@ struct UsbFfsConnection : public Connection { memcpy(&msg, block->payload.data(), sizeof(msg)); LOG(DEBUG) << "USB read:" << dump_header(&msg); incoming_header_ = msg; + + if (msg.command == A_CNXN) { + CancelWrites(); + } } else { size_t bytes_left = incoming_header_->data_length - incoming_payload_.size(); if (block->payload.size() > bytes_left) { @@ -676,6 +680,17 @@ struct UsbFfsConnection : public Connection { } } + void CancelWrites() { + std::lock_guard<std::mutex> lock(write_mutex_); + for (size_t i = 0; i < writes_submitted_; ++i) { + struct io_event res; + if (write_requests_[i].pending == true) { + LOG(INFO) << "cancelling pending write# " << i; + io_cancel(aio_context_.get(), &write_requests_[i].control, &res); + } + } + } + void HandleError(const std::string& error) { std::call_once(error_flag_, [&]() { if (transport_) { diff --git a/docs/user/adb.1.md b/docs/user/adb.1.md new file mode 100644 index 00000000..6070a953 --- /dev/null +++ b/docs/user/adb.1.md @@ -0,0 +1,385 @@ +# ADB(1) MAN PAGE + +# VERSION + +1.0.41 + +# NAME + +**adb** + CLI Client for ADB (Android Debug Bridge) Server. + +# SYNOPSIS + +**adb** [*GLOBAL_OPTIONS*] command [*COMMAND_OPTIONS*] + +# DESCRIPTION + +Connects to the ADB Server via its smart socket interface. Allows sending requests, receives responses and manages lifecycle of the adb server. + +Tasks are performed via commands. Some commands are fulfilled directly by the server while others are "forwarded over to the adbd(ADB daemon) running on the device. + +# GLOBAL OPTIONS: + +**-a** + Listen on all network interfaces, not just localhost. + +**-d** + Use USB device (error if multiple devices connected). + +**-e** + Use TCP/IP device (error if multiple TCP/IP devices available). + +**-s** **SERIAL** + Use device with given **SERIAL** (overrides $ANDROID_SERIAL). + +**-t** **ID** + Use device with given transport **ID**. + +**-H** + Name of adb server host [default=localhost]. + +**-P** **PORT** + Smart socket **PORT** of adb server [default=5037]. + +**-L** **SOCKET** + Listen on given socket for adb server [default=tcp:localhost:5037]. + +**\-\-one-device** **SERIAL**|**USB** + Server will only connect to one USB device, specified by a **SERIAL** number or **USB** device address (only with 'start-server' or 'server nodaemon'). + +**\-\-exit-on-write-error** + Exit if stdout is closed. + + +# GENERAL COMMANDS: + +devices [**-l**] + List connected devices. + +**-l** + Use long output. + +help + Show this help message. + +version + Show version number. + +# NETWORKING + +connect **HOST**[:**PORT**] + Connect to a device via TCP/IP [default **PORT**=5555]. + +disconnect [**HOST**[:**PORT**]] + Disconnect from given TCP/IP device [default **PORT**=5555], or all. + +pair **HOST**[:**PORT**] [**PAIRING_CODE**] + Pair with a device for secure TCP/IP communication. + +forward **\-\-list** | [**--no-rebind**] **LOCAL_REMOTE** | **\-\-remove** **LOCAL** | **\-\-remove-all** + +**\-\-list** + List all forward socket connections. + +[**--no-rebind**] **LOCAL_REMOTE** + Forward socket connection using one of the followings. + + **tcp**:**PORT** (local may be "tcp:0" to pick any open port. + **localreserved**:**UNIX_DOMAIN_SOCKET_NAME**. + **localfilesystem**:**UNIX_DOMAIN_SOCKET_NAME**. + **jdwp**:**PROCESS PID** (remote only). + **vsock**:**CID**:**PORT** (remote only). + **acceptfd**:**FD** (listen only). + +**\-\-remove** **LOCAL** + Remove specific forward socket connection. + +**\-\-remove-all** + Remove all forward socket connections. + +reverse **\-\-list** | [**\-\-no-rebind**] **REMOTE** **LOCAL** | **\-\-remove** **REMOTE** | **\-\-remove-all** + +**\-\-list** + List all reverse socket connections from device. + +[**\-\-no-rebind**] **REMOTE** **LOCAL** + Reverse socket connection using one of the following. + + tcp:**PORT** (**REMOTE** may be "tcp:0" to pick any open port). + localabstract:**UNIX_DOMAIN_SOCKET_NAME**. + localreserved:**UNIX_DOMAIN_SOCKET_NAME**. + localfilesystem:**UNIX_DOMAIN_SOCKET_NAME**. + +**\-\-remove** **REMOTE** + Remove specific reverse socket connection. + +**\-\-remove-all** + Remove all reverse socket connections from device. + +mdns **check** | **services** + Perform mDNS subcommands. + +**check** + Check if mdns discovery is available. + +**services** + List all discovered services. + + +# FILE TRANSFER: + +push [**--sync**] [**-z** **ALGORITHM**] [**-Z**] **LOCAL**... **REMOTE** + Copy local files/directories to device. + +**--sync** + Only push files that are newer on the host than the device. + +**-n** + Dry run, push files to device without storing to the filesystem. + +**-z** + enable compression with a specified algorithm (any/none/brotli/lz4/zstd). + +**-Z** + Disable compression. + +pull [**-a**] [**-z** **ALGORITHM**] [**-Z**] **REMOTE**... **LOCAL** + Copy files/dirs from device + +**-a** + preserve file timestamp and mode. + +**-z** + enable compression with a specified algorithm (**any**/**none**/**brotli**/**lz4**/**zstd**) + +**-Z** + disable compression + +sync [**-l**] [**-z** **ALGORITHM**] [**-Z**] [**all**|**data**|**odm**|**oem**|**product**|**system**|**system_ext**|**vendor**] + Sync a local build from $ANDROID_PRODUCT_OUT to the device (default all) + +**-n** + Dry run. Push files to device without storing to the filesystem. + +**-l** + List files that would be copied, but don't copy them. + +**-z** +Enable compression with a specified algorithm (**any**/**none**/**brotli**/**lz4**/**zstd**) + +**-Z** +Disable compression. + +# SHELL: + +shell [**-e** **ESCAPE**] [**-n**] [**-Tt**] [**-x**] [**COMMAND**...] + Run remote shell command (interactive shell if no command given). + +**-e** + Choose escape character, or "**none**"; default '**~**'. + +**-n** + Don't read from stdin. + +**-T**: + Disable pty allocation. + +**-t**: + Allocate a pty if on a tty (-tt: force pty allocation). + +**-x** + Disable remote exit codes and stdout/stderr separation. + +emu **COMMAND** + Run emulator console **COMMAND** + +# APP INSTALLATION +(see also `adb shell cmd package help`): + +install [**-lrtsdg**] [**--instant**] **PACKAGE** + Push a single package to the device and install it + +install-multiple [**-lrtsdpg**] [**--instant**] **PACKAGE**... + Push multiple APKs to the device for a single package and install them + +install-multi-package [**-lrtsdpg**] [**--instant**] **PACKAGE**... + Push one or more packages to the device and install them atomically + +**-r**: + Replace existing application. + +**-t** + Allow test packages. + +**-d** + Allow version code downgrade (debuggable packages only). + +**-p** + Partial application install (install-multiple only). + +**-g** + Grant all runtime permissions. + +**\-\-abi** **ABI** + Override platform's default ABI. + +**\-\-instant** + Cause the app to be installed as an ephemeral install app. + +**\-\-no-streaming** + Always push APK to device and invoke Package Manager as separate steps. + +**\-\-streaming** + Force streaming APK directly into Package Manager. + +**\-\-fastdeploy** + Use fast deploy. + +**-no-fastdeploy** + Prevent use of fast deploy. + +**-force-agent** + Force update of deployment agent when using fast deploy. + +**-date-check-agent** + Update deployment agent when local version is newer and using fast deploy. + +**\-\-version-check-agent** + Update deployment agent when local version has different version code and using fast deploy. + +**\-\-local-agent** + Locate agent files from local source build (instead of SDK location). See also `adb shell pm help` for more options. + +uninstall [**-k**] **APPLICATION_ID** + Remove this **APPLICATION_ID** from the device. + +**-k** + Keep the data and cache directories. + +# DEBUGGING: + +bugreport [**PATH**] + Write bugreport to given PATH [default=bugreport.zip]; if **PATH** is a directory, the bug report is saved in that directory. devices that don't support zipped bug reports output to stdout. + +jdwp + List pids of processes hosting a JDWP transport. + +logcat + Show device log (logcat --help for more). + + +# SECURITY: + +disable-verity + Disable dm-verity checking on userdebug builds. + +enable-verity + Re-enable dm-verity checking on userdebug builds. + +keygen **FILE** + Generate adb public/private key; private key stored in **FILE**. + +# SCRIPTING: + +wait-for [-**TRANSPORT**] -**STATE**... + Wait for device to be in a given state. + + **STATE**: device, recovery, rescue, sideload, bootloader, or disconnect. + **TRANSPORT**: **usb**, **local**, or **any** [default=**any**]. + +get-state + Print offline | bootloader | device. + +get-serialno + Print **SERIAL_NUMBER**. + +get-devpath + Print **DEVICE_PATH**. + +remount [**-R**] + Remount partitions read-write. + +**-R** + Automatically reboot the device. + +reboot [**bootloader**|**recovery**|**sideload**|**sideload-auto-reboot**] + Reboot the device; defaults to booting system image but supports **bootloader** and **recovery** too. + +**sideload** + Reboots into recovery and automatically starts sideload mode. + +**sideload-auto-reboot** + Same as **sideload** but reboots after sideloading. + + +sideload **OTAPACKAGE** + Sideload the given full OTA package **OTAPACKAGE**. + +root + Restart adbd with root permissions. + +unroot + Restart adbd without root permissions. + +usb + Restart adbd listening on USB. + +tcpip **PORT** + Restart adbd listening on TCP on **PORT**. + +# INTERNAL DEBUGGING: + +start-server + Ensure that there is a server running. + +kill-server + Kill the server if it is running. + +reconnect + Close connection from host side to force reconnect. + +reconnect device + Close connection from device side to force reconnect. + +reconnect offline + Reset offline/unauthorized devices to force reconnect. + +# USB: + +Only valid when running with libusb backend. + +attach **SERIAL** + Attach a detached USB device identified by its **SERIAL** number. + +detach **SERIAL** + Detach from a USB device identified by its **SERIAL** to allow use by other processes. + + +# ENVIRONMENT VARIABLES + +$ADB_TRACE + Comma-separated list of debug info to log: all,adb,sockets,packets,rwx,usb,sync,sysdeps,transport,jdwp. + +$ADB_VENDOR_KEYS + Colon-separated list of keys (files or directories). + +$ANDROID_SERIAL + Serial number to connect to (see -s). + +$ANDROID_LOG_TAGS + Tags to be used by logcat (see logcat --help). + +$ADB_LOCAL_TRANSPORT_MAX_PORT + Max emulator scan port (default 5585, 16 emulators). + +$ADB_MDNS_AUTO_CONNECT + Comma-separated list of mdns services to allow auto-connect (default adb-tls-connect). + +# BUGS + +See Issue Tracker: [here](https://issuetracker.google.com/issues/new?component=192795&template=1310483). + +# AUTHORS + +See [OWNERS](../../OWNERS) file in ADB AOSP repo. diff --git a/fastdeploy/Android.bp b/fastdeploy/Android.bp index c2daf9b5..84e62c62 100644 --- a/fastdeploy/Android.bp +++ b/fastdeploy/Android.bp @@ -36,6 +36,7 @@ java_library { java_binary { name: "deployagent", + wrapper: "deployagent/deployagent.sh", sdk_version: "24", static_libs: [ "deployagent_lib", @@ -76,6 +77,12 @@ android_test { optimize: { enabled: false, }, + + test_config: "FastDeployTests.xml", + + test_suites: [ + "general-tests", + ], } java_test_host { @@ -86,6 +93,8 @@ java_test_host { data: [ "testdata/helloworld5.apk", "testdata/helloworld7.apk", + "testdata/sample.apk", + "testdata/sample.cd", ], libs: [ "compatibility-host-util", @@ -95,4 +104,5 @@ java_test_host { test_suites: [ "general-tests", ], + test_config: "FastDeployHostDrivenTests.xml", } diff --git a/fastdeploy/FastDeployHostDrivenTests.xml b/fastdeploy/FastDeployHostDrivenTests.xml new file mode 100644 index 00000000..7208230e --- /dev/null +++ b/fastdeploy/FastDeployHostDrivenTests.xml @@ -0,0 +1,22 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- + ~ Copyright (C) 2019 The Android Open Source Project + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License + --> +<configuration description="Runs Device Tests for FastDeploy."> + <option name="test-suite-tag" value="FastDeployTests"/> + <test class="com.android.compatibility.common.tradefed.testtype.JarHostTest" > + <option name="jar" value="FastDeployHostTests.jar" /> + </test> +</configuration> diff --git a/fastdeploy/AndroidTest.xml b/fastdeploy/FastDeployTests.xml index 24a72bc8..5eb3999e 100644 --- a/fastdeploy/AndroidTest.xml +++ b/fastdeploy/FastDeployTests.xml @@ -24,7 +24,6 @@ </target_preparer> <target_preparer class="com.android.tradefed.targetprep.PushFilePreparer"> - <option name="cleanup" value="false" /> <option name="push-file" key="sample.apk" value="/data/local/tmp/FastDeployTests/sample.apk" /> <option name="push-file" key="sample.cd" value="/data/local/tmp/FastDeployTests/sample.cd" /> </target_preparer> @@ -33,8 +32,4 @@ <option name="package" value="com.android.fastdeploytests"/> <option name="runner" value="androidx.test.runner.AndroidJUnitRunner"/> </test> - - <test class="com.android.compatibility.common.tradefed.testtype.JarHostTest" > - <option name="jar" value="FastDeployHostTests.jar" /> - </test> </configuration> diff --git a/fastdeploy/deployagent/src/com/android/fastdeploy/DeployAgent.java b/fastdeploy/deployagent/src/com/android/fastdeploy/DeployAgent.java index 3812307c..8f72419f 100644 --- a/fastdeploy/deployagent/src/com/android/fastdeploy/DeployAgent.java +++ b/fastdeploy/deployagent/src/com/android/fastdeploy/DeployAgent.java @@ -245,7 +245,7 @@ public final class DeployAgent { outputStream.flush(); if (bytesWritten != newSize) { throw new PatchFormatException(String.format( - "output size mismatch (expected %ld but wrote %ld)", newSize, bytesWritten)); + "output size mismatch (expected %d but wrote %d)", newSize, bytesWritten)); } return bytesWritten; } @@ -324,7 +324,7 @@ public final class DeployAgent { p.waitFor(); if (bytesWritten != newSize) { throw new PatchFormatException( - String.format("output size mismatch (expected %d but wrote %)", newSize, + String.format("output size mismatch (expected %d but wrote %d)", newSize, bytesWritten)); } return p.exitValue(); diff --git a/fdevent/fdevent.cpp b/fdevent/fdevent.cpp index 70cb9b3a..e20a2953 100644 --- a/fdevent/fdevent.cpp +++ b/fdevent/fdevent.cpp @@ -61,7 +61,8 @@ std::string dump_fde(const fdevent* fde) { } fdevent* fdevent_context::Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) { - CheckMainThread(); + CheckLooperThread(); + CHECK_GE(fd.get(), 0); int fd_num = fd.get(); @@ -82,12 +83,14 @@ fdevent* fdevent_context::Create(unique_fd fd, std::variant<fd_func, fd_func2> f LOG(ERROR) << "failed to set non-blocking mode for fd " << fde->fd.get(); } + this->fdevent_set_.insert(fde); this->Register(fde); return fde; } unique_fd fdevent_context::Destroy(fdevent* fde) { - CheckMainThread(); + CheckLooperThread(); + if (!fde) { return {}; } @@ -98,6 +101,8 @@ unique_fd fdevent_context::Destroy(fdevent* fde) { auto erased = this->installed_fdevents_.erase(fd.get()); CHECK_EQ(1UL, erased); + erased = this->fdevent_set_.erase(fde); + CHECK_EQ(1UL, erased); return fd; } @@ -113,7 +118,8 @@ void fdevent_context::Del(fdevent* fde, unsigned events) { } void fdevent_context::SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) { - CheckMainThread(); + CheckLooperThread(); // Caller thread is expected to have already + // initialized the looper thread instance variable. fde->timeout = timeout; fde->last_active = std::chrono::steady_clock::now(); } @@ -121,7 +127,8 @@ void fdevent_context::SetTimeout(fdevent* fde, std::optional<std::chrono::millis std::optional<std::chrono::milliseconds> fdevent_context::CalculatePollDuration() { std::optional<std::chrono::milliseconds> result = std::nullopt; auto now = std::chrono::steady_clock::now(); - CheckMainThread(); + + CheckLooperThread(); for (const auto& [fd, fde] : this->installed_fdevents_) { UNUSED(fd); @@ -146,7 +153,12 @@ std::optional<std::chrono::milliseconds> fdevent_context::CalculatePollDuration( void fdevent_context::HandleEvents(const std::vector<fdevent_event>& events) { for (const auto& event : events) { - invoke_fde(event.fde, event.events); + // Verify the fde is still installed before invoking it. It could have been unregistered + // and destroyed inside an earlier event handler. + if (this->fdevent_set_.find(event.fde) != this->fdevent_set_.end()) { + invoke_fde(event.fde, event.events); + break; + } } FlushRunQueue(); } @@ -168,9 +180,9 @@ void fdevent_context::FlushRunQueue() { } } -void fdevent_context::CheckMainThread() { - if (main_thread_id_) { - CHECK_EQ(*main_thread_id_, android::base::GetThreadId()); +void fdevent_context::CheckLooperThread() const { + if (looper_thread_id_) { + CHECK_EQ(*looper_thread_id_, android::base::GetThreadId()); } } @@ -239,7 +251,7 @@ void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> fdevent_get_ambient()->SetTimeout(fde, timeout); } -void fdevent_run_on_main_thread(std::function<void()> fn) { +void fdevent_run_on_looper(std::function<void()> fn) { fdevent_get_ambient()->Run(std::move(fn)); } @@ -247,8 +259,8 @@ void fdevent_loop() { fdevent_get_ambient()->Loop(); } -void check_main_thread() { - fdevent_get_ambient()->CheckMainThread(); +void fdevent_check_looper() { + fdevent_get_ambient()->CheckLooperThread(); } void fdevent_terminate_loop() { diff --git a/fdevent/fdevent.h b/fdevent/fdevent.h index bb3af746..7f8e1a2c 100644 --- a/fdevent/fdevent.h +++ b/fdevent/fdevent.h @@ -24,9 +24,10 @@ #include <chrono> #include <deque> #include <functional> +#include <map> #include <mutex> #include <optional> -#include <unordered_map> +#include <set> #include <variant> #include <android-base/thread_annotations.h> @@ -56,7 +57,6 @@ struct fdevent final { uint64_t id; unique_fd fd; - int force_eof = 0; uint16_t state = 0; std::optional<std::chrono::milliseconds> timeout; @@ -108,11 +108,11 @@ struct fdevent_context { // terminate_loop_ to determine whether to stop. virtual void Loop() = 0; - // Assert that the caller is either running on the context's main thread, or that there is no - // active main thread. - void CheckMainThread(); + // Assert that the caller is executing in the context of the execution + // thread that invoked Loop(). + void CheckLooperThread() const; - // Queue an operation to be run on the main thread. + // Queue an operation to be run on the looper thread. void Run(std::function<void()> fn); // Test-only functionality: @@ -123,16 +123,17 @@ struct fdevent_context { // Interrupt the run loop. virtual void Interrupt() = 0; - std::optional<uint64_t> main_thread_id_ = std::nullopt; + std::optional<uint64_t> looper_thread_id_ = std::nullopt; std::atomic<bool> terminate_loop_ = false; - protected: - std::unordered_map<int, fdevent> installed_fdevents_; + std::map<int, fdevent> installed_fdevents_; private: uint64_t fdevent_id_ = 0; std::mutex run_queue_mutex_; std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_); + + std::set<fdevent*> fdevent_set_; }; // Backwards compatibility shims that forward to the global fdevent_context. @@ -147,10 +148,14 @@ void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout); void fdevent_loop(); -void check_main_thread(); -// Queue an operation to run on the main thread. -void fdevent_run_on_main_thread(std::function<void()> fn); +// Delegates to the member function that checks for the initialization +// of Loop() so that fdevent_context requests can be serially processed +// by the global instance robustly. +void fdevent_check_looper(); + +// Queue an operation to run on the looper event thread. +void fdevent_run_on_looper(std::function<void()> fn); // The following functions are used only for tests. void fdevent_terminate_loop(); diff --git a/fdevent/fdevent_epoll.cpp b/fdevent/fdevent_epoll.cpp index 4ef41d1e..147913a5 100644 --- a/fdevent/fdevent_epoll.cpp +++ b/fdevent/fdevent_epoll.cpp @@ -105,17 +105,21 @@ void fdevent_context_epoll::Set(fdevent* fde, unsigned events) { } void fdevent_context_epoll::Loop() { - main_thread_id_ = android::base::GetThreadId(); + looper_thread_id_ = android::base::GetThreadId(); std::vector<fdevent_event> fde_events; + std::unordered_map<fdevent*, fdevent_event*> event_map; std::vector<epoll_event> epoll_events; - epoll_events.resize(this->installed_fdevents_.size()); while (true) { if (terminate_loop_) { break; } + if (epoll_events.size() < this->installed_fdevents_.size()) { + epoll_events.resize(this->installed_fdevents_.size()); + } + int rc = -1; while (rc == -1) { std::optional<std::chrono::milliseconds> timeout = CalculatePollDuration(); @@ -133,7 +137,10 @@ void fdevent_context_epoll::Loop() { } auto post_poll = std::chrono::steady_clock::now(); - std::unordered_map<fdevent*, unsigned> event_map; + fde_events.reserve(installed_fdevents_.size()); + fde_events.clear(); + event_map.clear(); + for (int i = 0; i < rc; ++i) { fdevent* fde = static_cast<fdevent*>(epoll_events[i].data.ptr); @@ -152,13 +159,16 @@ void fdevent_context_epoll::Loop() { events |= FDE_READ | FDE_ERROR; } - event_map[fde] = events; + LOG(DEBUG) << dump_fde(fde) << " got events " << std::hex << std::showbase << events; + auto& fde_event = fde_events.emplace_back(fde, events); + event_map[fde] = &fde_event; + fde->last_active = post_poll; } for (auto& [fd, fde] : installed_fdevents_) { unsigned events = 0; if (auto it = event_map.find(&fde); it != event_map.end()) { - events = it->second; + events = it->second->events; } if (events == 0) { @@ -166,22 +176,18 @@ void fdevent_context_epoll::Loop() { auto deadline = fde.last_active + *fde.timeout; if (deadline < post_poll) { events |= FDE_TIMEOUT; + LOG(DEBUG) << dump_fde(&fde) << " timed out"; + fde_events.emplace_back(&fde, events); + fde.last_active = post_poll; } } } - - if (events != 0) { - LOG(DEBUG) << dump_fde(&fde) << " got events " << std::hex << std::showbase - << events; - fde_events.push_back({&fde, events}); - fde.last_active = post_poll; - } } this->HandleEvents(fde_events); fde_events.clear(); } - main_thread_id_.reset(); + looper_thread_id_.reset(); } size_t fdevent_context_epoll::InstalledCount() { diff --git a/fdevent/fdevent_poll.cpp b/fdevent/fdevent_poll.cpp index 21c1ba09..9b66518b 100644 --- a/fdevent/fdevent_poll.cpp +++ b/fdevent/fdevent_poll.cpp @@ -80,7 +80,7 @@ fdevent_context_poll::~fdevent_context_poll() { } void fdevent_context_poll::Set(fdevent* fde, unsigned events) { - CheckMainThread(); + CheckLooperThread(); fde->state = events; D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events); } @@ -101,7 +101,7 @@ static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { } void fdevent_context_poll::Loop() { - main_thread_id_ = android::base::GetThreadId(); + looper_thread_id_ = android::base::GetThreadId(); std::vector<adb_pollfd> pollfds; std::vector<fdevent_event> poll_events; @@ -193,7 +193,7 @@ void fdevent_context_poll::Loop() { poll_events.clear(); } - main_thread_id_.reset(); + looper_thread_id_.reset(); } size_t fdevent_context_poll::InstalledCount() { diff --git a/fdevent/fdevent_test.cpp b/fdevent/fdevent_test.cpp index e06b3b32..4c334061 100644 --- a/fdevent/fdevent_test.cpp +++ b/fdevent/fdevent_test.cpp @@ -18,6 +18,7 @@ #include <gtest/gtest.h> +#include <unistd.h> #include <chrono> #include <limits> #include <memory> @@ -26,6 +27,8 @@ #include <thread> #include <vector> +#include <android-base/threads.h> + #include "adb_io.h" #include "fdevent_test.h" @@ -135,7 +138,7 @@ TEST_F(FdeventTest, smoke) { PrepareThread(); std::vector<std::unique_ptr<FdHandler>> fd_handlers; - fdevent_run_on_main_thread([&thread_arg, &fd_handlers, use_new_callback]() { + fdevent_run_on_looper([&thread_arg, &fd_handlers, use_new_callback]() { std::vector<int> read_fds; std::vector<int> write_fds; @@ -163,7 +166,7 @@ TEST_F(FdeventTest, smoke) { ASSERT_EQ(read_buffer, write_buffer); } - fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); }); + fdevent_run_on_looper([&fd_handlers]() { fd_handlers.clear(); }); WaitForFdeventLoop(); TerminateThread(); @@ -172,20 +175,20 @@ TEST_F(FdeventTest, smoke) { } } -TEST_F(FdeventTest, run_on_main_thread) { +TEST_F(FdeventTest, run_on_looper_thread_queued) { std::vector<int> vec; PrepareThread(); - // Block the main thread for a long time while we queue our callbacks. - fdevent_run_on_main_thread([]() { - check_main_thread(); + // Block the looper thread for a long time while we queue our callbacks. + fdevent_run_on_looper([]() { + fdevent_check_looper(); std::this_thread::sleep_for(std::chrono::seconds(1)); }); for (int i = 0; i < 1000000; ++i) { - fdevent_run_on_main_thread([i, &vec]() { - check_main_thread(); + fdevent_run_on_looper([i, &vec]() { + fdevent_check_looper(); vec.push_back(i); }); } @@ -198,29 +201,22 @@ TEST_F(FdeventTest, run_on_main_thread) { } } -static std::function<void()> make_appender(std::vector<int>* vec, int value) { - return [vec, value]() { - check_main_thread(); - if (value == 100) { - return; - } +TEST_F(FdeventTest, run_on_looper_thread_reentrant) { + bool b = false; - vec->push_back(value); - fdevent_run_on_main_thread(make_appender(vec, value + 1)); - }; -} + PrepareThread(); -TEST_F(FdeventTest, run_on_main_thread_reentrant) { - std::vector<int> vec; + fdevent_run_on_looper([&b]() { + fdevent_check_looper(); + fdevent_run_on_looper([&b]() { + fdevent_check_looper(); + b = true; + }); + }); - PrepareThread(); - fdevent_run_on_main_thread(make_appender(&vec, 0)); TerminateThread(); - ASSERT_EQ(100u, vec.size()); - for (int i = 0; i < 100; ++i) { - ASSERT_EQ(i, vec[i]); - } + EXPECT_EQ(b, true); } TEST_F(FdeventTest, timeout) { @@ -242,7 +238,7 @@ TEST_F(FdeventTest, timeout) { int fds[2]; ASSERT_EQ(0, adb_socketpair(fds)); static constexpr auto delta = 100ms; - fdevent_run_on_main_thread([&]() { + fdevent_run_on_looper([&]() { test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) { auto test = static_cast<TimeoutTest*>(arg); auto now = std::chrono::steady_clock::now(); @@ -319,3 +315,86 @@ TEST_F(FdeventTest, timeout) { ASSERT_LT(diff[1], delta.count() * 0.5); ASSERT_LT(diff[2], delta.count() * 0.5); } + +TEST_F(FdeventTest, unregister_with_pending_event) { + fdevent_reset(); + + int fds1[2]; + int fds2[2]; + ASSERT_EQ(0, adb_socketpair(fds1)); + ASSERT_EQ(0, adb_socketpair(fds2)); + + struct Test { + fdevent* fde1; + fdevent* fde2; + bool should_not_happen; + }; + Test test{}; + + test.fde1 = fdevent_create( + fds1[0], + [](fdevent* fde, unsigned events, void* arg) { + auto test = static_cast<Test*>(arg); + // Unregister fde2 from inside the fde1 event + fdevent_destroy(test->fde2); + // Unregister fde1 so it doesn't get called again + fdevent_destroy(test->fde1); + }, + &test); + + test.fde2 = fdevent_create( + fds2[0], + [](fdevent* fde, unsigned events, void* arg) { + auto test = static_cast<Test*>(arg); + test->should_not_happen = true; + }, + &test); + + fdevent_add(test.fde1, FDE_READ | FDE_ERROR); + fdevent_add(test.fde2, FDE_READ | FDE_ERROR); + + PrepareThread(); + WaitForFdeventLoop(); + + std::mutex m; + std::condition_variable cv; + bool main_thread_latch = false; + bool looper_thread_latch = false; + + fdevent_run_on_looper([&]() { + std::unique_lock lk(m); + // Notify the main thread that the looper is in this lambda + main_thread_latch = true; + cv.notify_one(); + // Pause the looper to ensure both events occur in the same epoll_wait + cv.wait(lk, [&] { return looper_thread_latch; }); + }); + + // Wait for the looper thread to pause to ensure it is not in epoll_wait + { + std::unique_lock lk(m); + cv.wait(lk, [&] { return main_thread_latch; }); + } + + // Write to one end of the sockets to trigger events on the other ends + adb_write(fds1[1], "a", 1); + adb_write(fds2[1], "a", 1); + + // Unpause the looper thread to let it loop back into epoll_wait, which should return + // both fde1 and fde2. + { + std::lock_guard lk(m); + looper_thread_latch = true; + } + cv.notify_one(); + + WaitForFdeventLoop(); + TerminateThread(); + + adb_close(fds1[0]); + adb_close(fds1[1]); + adb_close(fds2[0]); + adb_close(fds2[1]); + + ASSERT_FALSE(test.should_not_happen); +} diff --git a/fdevent/fdevent_test.h b/fdevent/fdevent_test.h index fcbf181d..2e7df2c1 100644 --- a/fdevent/fdevent_test.h +++ b/fdevent/fdevent_test.h @@ -30,13 +30,13 @@ static void WaitForFdeventLoop() { // Sleep for a bit to make sure that network events have propagated. std::this_thread::sleep_for(100ms); - // fdevent_run_on_main_thread has a guaranteed ordering, and is guaranteed to happen after + // fdevent_run_on_looper has a guaranteed ordering, and is guaranteed to happen after // socket events, so as soon as our function is called, we know that we've processed all // previous events. std::mutex mutex; std::condition_variable cv; std::unique_lock<std::mutex> lock(mutex); - fdevent_run_on_main_thread([&]() { + fdevent_run_on_looper([&]() { mutex.lock(); mutex.unlock(); cv.notify_one(); @@ -46,8 +46,6 @@ static void WaitForFdeventLoop() { class FdeventTest : public ::testing::Test { protected: - unique_fd dummy; - ~FdeventTest() { if (thread_.joinable()) { TerminateThread(); @@ -65,34 +63,14 @@ class FdeventTest : public ::testing::Test { ASSERT_EQ(0u, fdevent_installed_count()); } - // Register a placeholder socket used to wake up the fdevent loop to tell it to die. void PrepareThread() { - int dummy_fds[2]; - if (adb_socketpair(dummy_fds) != 0) { - FAIL() << "failed to create socketpair: " << strerror(errno); - } - - asocket* dummy_socket = create_local_socket(unique_fd(dummy_fds[1])); - if (!dummy_socket) { - FAIL() << "failed to create local socket: " << strerror(errno); - } - dummy_socket->ready(dummy_socket); - dummy.reset(dummy_fds[0]); - thread_ = std::thread([]() { fdevent_loop(); }); WaitForFdeventLoop(); } - size_t GetAdditionalLocalSocketCount() { - // placeholder socket installed in PrepareThread() - return 1; - } - void TerminateThread() { fdevent_terminate_loop(); - ASSERT_TRUE(WriteFdExactly(dummy, "", 1)); thread_.join(); - dummy.reset(); } std::thread thread_; diff --git a/file_sync_protocol.h b/file_sync_protocol.h index 5234c20a..d9f7f845 100644 --- a/file_sync_protocol.h +++ b/file_sync_protocol.h @@ -16,6 +16,8 @@ #pragma once +#include <stdint.h> + #define MKID(a, b, c, d) ((a) | ((b) << 8) | ((c) << 16) | ((d) << 24)) #define ID_LSTAT_V1 MKID('S', 'T', 'A', 'T') diff --git a/libs/adbconnection/adbconnection_server.cpp b/libs/adbconnection/adbconnection_server.cpp index aac9615e..e3d00313 100644 --- a/libs/adbconnection/adbconnection_server.cpp +++ b/libs/adbconnection/adbconnection_server.cpp @@ -69,7 +69,7 @@ void adbconnection_listen(void (*callback)(int fd, ProcessInfo process)) { events[0].events = EPOLLIN; events[0].data.fd = -1; if (epoll_ctl(epfd.get(), EPOLL_CTL_ADD, s.get(), &events[0]) != 0) { - LOG(FATAL) << "failed to register event with epoll fd"; + PLOG(FATAL) << "failed to register socket " << s.get() << " with epoll fd"; } while (true) { @@ -94,7 +94,7 @@ void adbconnection_listen(void (*callback)(int fd, ProcessInfo process)) { register_event.data.fd = client.get(); if (epoll_ctl(epfd.get(), EPOLL_CTL_ADD, client.get(), ®ister_event) != 0) { - PLOG(FATAL) << "failed to register JDWP client with epoll"; + PLOG(FATAL) << "failed to register JDWP client " << client.get() << " with epoll"; } pending_connections.emplace_back(std::move(client)); @@ -118,7 +118,7 @@ void adbconnection_listen(void (*callback)(int fd, ProcessInfo process)) { } if (epoll_ctl(epfd.get(), EPOLL_CTL_DEL, event.data.fd, nullptr) != 0) { - LOG(FATAL) << "failed to delete fd from JDWP epoll fd"; + PLOG(FATAL) << "failed to delete fd " << event.data.fd << " from JDWP epoll fd"; } pending_connections.erase(it); diff --git a/libs/adbconnection/libadbconnection_client.map.txt b/libs/adbconnection/libadbconnection_client.map.txt index 153a0e41..a5e126dc 100644 --- a/libs/adbconnection/libadbconnection_client.map.txt +++ b/libs/adbconnection/libadbconnection_client.map.txt @@ -16,10 +16,10 @@ LIBADBCONNECTION_CLIENT_1 { global: - adbconnection_client_new; - adbconnection_client_destroy; - adbconnection_client_pollfd; - adbconnection_client_receive_jdwp_fd; + adbconnection_client_new; # apex + adbconnection_client_destroy; # apex + adbconnection_client_pollfd; # apex + adbconnection_client_receive_jdwp_fd; # apex local: *; }; diff --git a/libs/libadbd_fs/libadbd_fs.map.txt b/libs/libadbd_fs/libadbd_fs.map.txt index 1454e963..bace3637 100644 --- a/libs/libadbd_fs/libadbd_fs.map.txt +++ b/libs/libadbd_fs/libadbd_fs.map.txt @@ -1,6 +1,6 @@ LIBADBD_FS { global: - adbd_fs_config; # apex + adbd_fs_config; # systemapi local: *; }; diff --git a/pairing_auth/aes_128_gcm.cpp b/pairing_auth/aes_128_gcm.cpp index 51520d81..2d5f86fa 100644 --- a/pairing_auth/aes_128_gcm.cpp +++ b/pairing_auth/aes_128_gcm.cpp @@ -23,6 +23,8 @@ #include <openssl/hkdf.h> #include <openssl/rand.h> +#include <string.h> + namespace adb { namespace pairing { diff --git a/pairing_auth/pairing_auth.cpp b/pairing_auth/pairing_auth.cpp index 0ac04e69..0cf43a3b 100644 --- a/pairing_auth/pairing_auth.cpp +++ b/pairing_auth/pairing_auth.cpp @@ -27,6 +27,8 @@ #include "adb/pairing/aes_128_gcm.h" +#include <string.h> + using namespace adb::pairing; static constexpr spake2_role_t kClientRole = spake2_role_alice; diff --git a/pairing_connection/Android.bp b/pairing_connection/Android.bp index 3557810d..2b33bee8 100644 --- a/pairing_connection/Android.bp +++ b/pairing_connection/Android.bp @@ -176,7 +176,7 @@ cc_library { static_libs: [ // Statically link libadb_crypto because it is not - // ABI-stable. + // ABI-stable. "libadb_crypto", "libadb_protos", ], diff --git a/pairing_connection/pairing_server.cpp b/pairing_connection/pairing_server.cpp index 7218eacf..c9742895 100644 --- a/pairing_connection/pairing_server.cpp +++ b/pairing_connection/pairing_server.cpp @@ -231,12 +231,6 @@ uint16_t PairingServerCtx::SetupServer() { if (server_fd_.get() == -1) { PLOG(ERROR) << "Failed to start pairing connection server"; return 0; - } else if (fcntl(server_fd_.get(), F_SETFD, FD_CLOEXEC) != 0) { - PLOG(ERROR) << "Failed to make server socket cloexec"; - return 0; - } else if (fcntl(server_fd_.get(), F_SETFD, O_NONBLOCK) != 0) { - PLOG(ERROR) << "Failed to make server socket nonblocking"; - return 0; } StartConnectionEventsThread(); diff --git a/pairing_connection/tests/Android.bp b/pairing_connection/tests/Android.bp index a6eb35f4..4199cabc 100644 --- a/pairing_connection/tests/Android.bp +++ b/pairing_connection/tests/Android.bp @@ -39,17 +39,9 @@ cc_test { "libcrypto_utils", "libprotobuf-cpp-lite", "libssl", - ], - - // Let's statically link them so we don't have to install it onto the - // system image for testing. - static_libs: [ - "libadb_pairing_auth_static", - "libadb_pairing_connection_static", - "libadb_pairing_server_static", - "libadb_crypto_static", - "libadb_protos_static", - "libadb_tls_connection_static", + "libadb_pairing_auth", + "libadb_pairing_connection", + "libadb_pairing_server", ], test_suites: ["general-tests", "mts-adbd"], @@ -21,6 +21,7 @@ #include <deque> #include <memory> +#include <optional> #include <string> #include "adb_unique_fd.h" @@ -30,43 +31,58 @@ class atransport; /* An asocket represents one half of a connection between a local and - * remote entity. A local asocket is bound to a file descriptor. A - * remote asocket is bound to the protocol engine. + remote entity. A local asocket is bound to a file descriptor. A + remote asocket is bound to the protocol engine. + + Example (two local_sockets) : + + ASOCKET(THIS) + ┌────────────────────────────────────────────────┐ +┌──┐ write(3) │ ┌─────┐ enqueue() │ +│ │◄─────────┼──┤Queue├─────────────◄────────────┐ │ +│fd│ │ └─────┘ ▲ │ +│ ├──────────►─────────────────┐ │ │ +└──┘ read(3) └─────────────────┼─────────────────┼────────────┘ + outgoing │ │ incoming + ┌─────────────────▼─────────────────▲────────────┐ read(3) ┌──┐ + │ │ └────────────┼─────────◄─┤ │ + │ │ ┌─────┐ │ │fd│ + │ └─────────────────────►│Queue├─┼─────────►─┤ │ + │ enqueue() └─────┘ │ write(3) └──┘ + └────────────────────────────────────────────────┘ + ASOCKET(PEER) + + Note that sockets can be peered regardless of their kind. A remote socket can be peered with + a smart socket, a local socket can be peered with a remote socket and so on. */ struct asocket { /* the unique identifier for this asocket */ unsigned id = 0; + // Start Local socket fields + // TODO: move all the local socket fields together + /* flag: set when the socket's peer has closed * but packets are still queued for delivery + * TODO: This should be a boolean. */ - int closing = 0; + bool closing = false; // flag: set when the socket failed to write, so the socket will not wait to // write packets and close directly. - bool has_write_error = 0; + bool has_write_error = false; /* flag: quit adbd when both ends close the * local service socket */ int exit_on_close = 0; + // End Local socket fields + // the asocket we are connected to asocket* peer = nullptr; - /* For local asockets, the fde is used to bind - * us to our fd event system. For remote asockets - * these fields are not used. - */ - fdevent* fde = nullptr; - int fd = -1; - - // queue of data waiting to be written - IOVector packet_queue; - - std::string smart_socket_data; - /* enqueue is called by our peer when it has data * for us. It should return 0 if we can accept more * data or 1 if not. If we return 1, we must call @@ -96,6 +112,27 @@ struct asocket { atransport* transport = nullptr; size_t get_max_payload() const; + + // TODO: Make asocket an actual class and use inheritance instead of having an ever-growing + // struct with random use-specific fields stuffed into it. + + // Start Local socket fields + fdevent* fde = nullptr; + int fd = -1; + + // Queue of data that we've received from our peer, and are waiting to write into fd. + IOVector packet_queue; + // End Local socket fields + + // The number of bytes that have been acknowledged by the other end if delayed_ack is available. + // This value can go negative: if we have a MAX_PAYLOAD's worth of bytes available to send, + // we'll send out a full packet. + std::optional<int64_t> available_send_bytes; + + // Start Smart socket fields + // A temporary buffer used to hold a partially-read service string for smartsockets. + std::string smart_socket_data; + // End Smart socket fields }; asocket *find_local_socket(unsigned local_id, unsigned remote_id); @@ -103,6 +140,8 @@ void install_local_socket(asocket *s); void remove_socket(asocket *s); void close_all_sockets(atransport *t); +void local_socket_ack(asocket* s, std::optional<int32_t> acked_bytes); + asocket* create_local_socket(unique_fd fd); asocket* create_local_service_socket(std::string_view destination, atransport* transport); diff --git a/socket_spec_test.cpp b/socket_spec_test.cpp index 93fc191b..a4b73c49 100644 --- a/socket_spec_test.cpp +++ b/socket_spec_test.cpp @@ -24,14 +24,20 @@ #include <android-base/stringprintf.h> #include <gtest/gtest.h> -TEST(socket_spec, parse_tcp_socket_spec_failure) { +// If the socket spec is incorrectly specified (i.e w/o a "tcp:" prefix), +// check for the contents of the returned error string. +TEST(socket_spec, parse_tcp_socket_spec_failure_error_check) { std::string hostname, error, serial; int port; - EXPECT_FALSE(parse_tcp_socket_spec("sneakernet:5037", &hostname, &port, &serial, &error)); + + // spec needs to be prefixed with "tcp:" + const std::string spec("sneakernet:5037"); + EXPECT_FALSE(parse_tcp_socket_spec(spec, &hostname, &port, &serial, &error)); EXPECT_TRUE(error.find("sneakernet") != std::string::npos); + EXPECT_EQ(error, "specification is not tcp: " + spec); } -TEST(socket_spec, parse_tcp_socket_spec_just_port) { +TEST(socket_spec, parse_tcp_socket_spec_just_port_success) { std::string hostname, error, serial; int port; EXPECT_TRUE(parse_tcp_socket_spec("tcp:5037", &hostname, &port, &serial, &error)); @@ -40,7 +46,7 @@ TEST(socket_spec, parse_tcp_socket_spec_just_port) { EXPECT_EQ("", serial); } -TEST(socket_spec, parse_tcp_socket_spec_bad_ports) { +TEST(socket_spec, parse_tcp_socket_spec_bad_ports_failure) { std::string hostname, error, serial; int port; EXPECT_FALSE(parse_tcp_socket_spec("tcp:", &hostname, &port, &serial, &error)); @@ -48,7 +54,7 @@ TEST(socket_spec, parse_tcp_socket_spec_bad_ports) { EXPECT_FALSE(parse_tcp_socket_spec("tcp:65536", &hostname, &port, &serial, &error)); } -TEST(socket_spec, parse_tcp_socket_spec_host_and_port) { +TEST(socket_spec, parse_tcp_socket_spec_host_and_port_success) { std::string hostname, error, serial; int port; EXPECT_TRUE(parse_tcp_socket_spec("tcp:localhost:1234", &hostname, &port, &serial, &error)); @@ -57,7 +63,7 @@ TEST(socket_spec, parse_tcp_socket_spec_host_and_port) { EXPECT_EQ("localhost:1234", serial); } -TEST(socket_spec, parse_tcp_socket_spec_host_no_port) { +TEST(socket_spec, parse_tcp_socket_spec_host_no_port_success) { std::string hostname, error, serial; int port; EXPECT_TRUE(parse_tcp_socket_spec("tcp:localhost", &hostname, &port, &serial, &error)); @@ -66,7 +72,16 @@ TEST(socket_spec, parse_tcp_socket_spec_host_no_port) { EXPECT_EQ("localhost:5555", serial); } -TEST(socket_spec, parse_tcp_socket_spec_host_bad_ports) { +TEST(socket_spec, parse_tcp_socket_spec_host_ipv4_no_port_success) { + std::string hostname, error, serial; + int port; + EXPECT_TRUE(parse_tcp_socket_spec("tcp:127.0.0.1", &hostname, &port, &serial, &error)); + EXPECT_EQ("127.0.0.1", hostname); + EXPECT_EQ(5555, port); + EXPECT_EQ("127.0.0.1:5555", serial); +} + +TEST(socket_spec, parse_tcp_socket_spec_host_bad_ports_failure) { std::string hostname, error, serial; int port; EXPECT_FALSE(parse_tcp_socket_spec("tcp:localhost:", &hostname, &port, &serial, &error)); @@ -74,33 +89,85 @@ TEST(socket_spec, parse_tcp_socket_spec_host_bad_ports) { EXPECT_FALSE(parse_tcp_socket_spec("tcp:localhost:65536", &hostname, &port, &serial, &error)); } -TEST(socket_spec, parse_tcp_socket_spec_ipv6_and_port) { +TEST(socket_spec, parse_tcp_socket_spec_host_ipv4_bad_ports_failure) { + std::string hostname, error, serial; + int port; + EXPECT_FALSE(parse_tcp_socket_spec("tcp:127.0.0.1:", &hostname, &port, &serial, &error)); + EXPECT_FALSE(parse_tcp_socket_spec("tcp:127.0.0.1:-1", &hostname, &port, &serial, &error)); + EXPECT_FALSE(parse_tcp_socket_spec("tcp:127.0.0.1:65536", &hostname, &port, &serial, &error)); +} + +TEST(socket_spec, parse_tcp_socket_spec_host_ipv6_bad_ports_failure) { + std::string hostname, error, serial; + int port; + EXPECT_FALSE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:c63:50c9:8a91:8efa:", &hostname, + &port, &serial, &error)); + EXPECT_FALSE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:c63:50c9:8a91:8efa:-1", &hostname, + &port, &serial, &error)); + EXPECT_FALSE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:c63:50c9:8a91:8efa:65536", &hostname, + &port, &serial, &error)); +} + +TEST(socket_spec, parse_tcp_socket_spec_ipv6_and_port_success) { std::string hostname, error, serial; int port; EXPECT_TRUE(parse_tcp_socket_spec("tcp:[::1]:1234", &hostname, &port, &serial, &error)); EXPECT_EQ("::1", hostname); EXPECT_EQ(1234, port); EXPECT_EQ("[::1]:1234", serial); + + // Repeat with different format of ipv6 + EXPECT_TRUE(parse_tcp_socket_spec("tcp:[2601:644:8e80:620::fbbc]:2345", &hostname, &port, + &serial, &error)); + EXPECT_EQ("2601:644:8e80:620::fbbc", hostname); + EXPECT_EQ(2345, port); + EXPECT_EQ("[2601:644:8e80:620::fbbc]:2345", serial); } -TEST(socket_spec, parse_tcp_socket_spec_ipv6_no_port) { +TEST(socket_spec, parse_tcp_socket_spec_ipv6_no_port_success) { std::string hostname, error, serial; int port; EXPECT_TRUE(parse_tcp_socket_spec("tcp:::1", &hostname, &port, &serial, &error)); EXPECT_EQ("::1", hostname); EXPECT_EQ(5555, port); EXPECT_EQ("[::1]:5555", serial); + + // Repeat with other supported formats of ipv6. + EXPECT_TRUE(parse_tcp_socket_spec("tcp:2601:644:8e80:620::fbbc", &hostname, &port, &serial, + &error)); + EXPECT_EQ("2601:644:8e80:620::fbbc", hostname); + EXPECT_EQ(5555, port); + EXPECT_EQ("[2601:644:8e80:620::fbbc]:5555", serial); + + EXPECT_TRUE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:c63:50c9:8a91:8efa", &hostname, &port, + &serial, &error)); + EXPECT_EQ("2601:644:8e80:620:c63:50c9:8a91:8efa", hostname); + EXPECT_EQ(5555, port); + EXPECT_EQ("[2601:644:8e80:620:c63:50c9:8a91:8efa]:5555", serial); + + EXPECT_TRUE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:2d0e:b944:5288:97df", &hostname, &port, + &serial, &error)); + EXPECT_EQ("2601:644:8e80:620:2d0e:b944:5288:97df", hostname); + EXPECT_EQ(5555, port); + EXPECT_EQ("[2601:644:8e80:620:2d0e:b944:5288:97df]:5555", serial); } -TEST(socket_spec, parse_tcp_socket_spec_ipv6_bad_ports) { +TEST(socket_spec, parse_tcp_socket_spec_ipv6_bad_ports_failure) { std::string hostname, error, serial; int port; EXPECT_FALSE(parse_tcp_socket_spec("tcp:[::1]", &hostname, &port, &serial, &error)); EXPECT_FALSE(parse_tcp_socket_spec("tcp:[::1]:", &hostname, &port, &serial, &error)); EXPECT_FALSE(parse_tcp_socket_spec("tcp:[::1]:-1", &hostname, &port, &serial, &error)); + + EXPECT_TRUE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:2d0e:b944:5288:97df", &hostname, &port, + &serial, &error)); + EXPECT_FALSE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:2d0e:b944:5288:97df:", &hostname, + &port, &serial, &error)); + EXPECT_FALSE(parse_tcp_socket_spec("tcp:2601:644:8e80:620:2d0e:b944:5288:97df:-1", &hostname, + &port, &serial, &error)); } -TEST(socket_spec, get_host_socket_spec_port) { +TEST(socket_spec, get_host_socket_spec_port_success) { std::string error; EXPECT_EQ(5555, get_host_socket_spec_port("tcp:5555", &error)); EXPECT_EQ(5555, get_host_socket_spec_port("tcp:localhost:5555", &error)); diff --git a/socket_test.cpp b/socket_test.cpp index 1601ff0a..f1b64b7a 100644 --- a/socket_test.cpp +++ b/socket_test.cpp @@ -33,6 +33,7 @@ #include "socket.h" #include "sysdeps.h" #include "sysdeps/chrono.h" +#include "test_utils/test_utils.h" using namespace std::string_literals; using namespace std::string_view_literals; @@ -99,7 +100,7 @@ TEST_F(LocalSocketTest, smoke) { // Wait until the local sockets are closed. WaitForFdeventLoop(); - ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); + ASSERT_EQ(0u, fdevent_installed_count()); TerminateThread(); } @@ -110,7 +111,7 @@ struct CloseWithPacketArg { }; static void CreateCloser(CloseWithPacketArg* arg) { - fdevent_run_on_main_thread([arg]() { + fdevent_run_on_looper([arg]() { asocket* s = create_local_socket(std::move(arg->socket_fd)); ASSERT_TRUE(s != nullptr); arg->bytes_written = 0; @@ -125,6 +126,8 @@ static void CreateCloser(CloseWithPacketArg* arg) { data.resize(MAX_PAYLOAD); arg->bytes_written += data.size(); int ret = s->enqueue(s, std::move(data)); + + // Return value of 0 implies that more data can be accepted. if (ret == 1) { socket_filled = true; break; @@ -163,11 +166,11 @@ TEST_F(LocalSocketTest, close_socket_with_packet) { ASSERT_EQ(0, adb_close(cause_close_fd[0])); WaitForFdeventLoop(); - EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); + EXPECT_EQ(1u, fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); WaitForFdeventLoop(); - ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); + ASSERT_EQ(0u, fdevent_installed_count()); TerminateThread(); } @@ -188,16 +191,18 @@ TEST_F(LocalSocketTest, read_from_closing_socket) { ASSERT_EQ(0, adb_close(cause_close_fd[0])); WaitForFdeventLoop(); - EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); + EXPECT_EQ(1u, fdevent_installed_count()); // Verify if we can read successfully. std::vector<char> buf(arg.bytes_written); ASSERT_NE(0u, arg.bytes_written); - ASSERT_EQ(true, ReadFdExactly(socket_fd[0], buf.data(), buf.size())); + + ASSERT_EQ(true, ReadFdExactly(socket_fd[0], buf.data(), buf.size())); // TODO: b/237341044 + ASSERT_EQ(0, adb_close(socket_fd[0])); WaitForFdeventLoop(); - ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); + ASSERT_EQ(0u, fdevent_installed_count()); TerminateThread(); } @@ -218,13 +223,13 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { CreateCloser(&arg); WaitForFdeventLoop(); - EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); + EXPECT_EQ(2u, fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); std::this_thread::sleep_for(2s); WaitForFdeventLoop(); - ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); + ASSERT_EQ(0u, fdevent_installed_count()); TerminateThread(); } @@ -261,15 +266,15 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { adb_close(tail_fd[0]); WaitForFdeventLoop(); - ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); + ASSERT_EQ(0u, fdevent_installed_count()); TerminateThread(); } #if defined(__linux__) -static void ClientThreadFunc() { +static void ClientThreadFunc(const int assigned_port) { std::string error; - int fd = network_loopback_client(5038, SOCK_STREAM, &error); + const int fd = network_loopback_client(assigned_port, SOCK_STREAM, &error); ASSERT_GE(fd, 0) << error; std::this_thread::sleep_for(1s); ASSERT_EQ(0, adb_close(fd)); @@ -278,29 +283,30 @@ static void ClientThreadFunc() { // This test checks if we can close sockets in CLOSE_WAIT state. TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) { std::string error; - int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error); - ASSERT_GE(listen_fd, 0); + // Allow the system to allocate an available port. + unique_fd listen_fd; + const int assigned_port(test_utils::GetUnassignedPort(listen_fd)); - std::thread client_thread(ClientThreadFunc); + std::thread client_thread(ClientThreadFunc, assigned_port); + const int accept_fd = adb_socket_accept(listen_fd.get(), nullptr, nullptr); - int accept_fd = adb_socket_accept(listen_fd, nullptr, nullptr); ASSERT_GE(accept_fd, 0); PrepareThread(); - fdevent_run_on_main_thread([accept_fd]() { + fdevent_run_on_looper([accept_fd]() { asocket* s = create_local_socket(unique_fd(accept_fd)); ASSERT_TRUE(s != nullptr); }); WaitForFdeventLoop(); - EXPECT_EQ(1u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); + EXPECT_EQ(1u, fdevent_installed_count()); // Wait until the client closes its socket. client_thread.join(); WaitForFdeventLoop(); - ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); + ASSERT_EQ(0u, fdevent_installed_count()); TerminateThread(); } diff --git a/sockets.cpp b/sockets.cpp index 33b95241..630a2eba 100644 --- a/sockets.cpp +++ b/sockets.cpp @@ -20,6 +20,7 @@ #include <ctype.h> #include <errno.h> +#include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -119,33 +120,56 @@ enum class SocketFlushResult { }; static SocketFlushResult local_socket_flush_incoming(asocket* s) { + D("LS(%u) %s: %zu bytes in queue", s->id, __func__, s->packet_queue.size()); + uint32_t bytes_flushed = 0; if (!s->packet_queue.empty()) { std::vector<adb_iovec> iov = s->packet_queue.iovecs(); ssize_t rc = adb_writev(s->fd, iov.data(), iov.size()); - if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) { - s->packet_queue.clear(); - } else if (rc > 0) { - s->packet_queue.drop_front(rc); - fdevent_add(s->fde, FDE_WRITE); - return SocketFlushResult::TryAgain; + D("LS(%u) %s: rc = %zd", s->id, __func__, rc); + if (rc > 0) { + bytes_flushed = rc; + if (static_cast<size_t>(rc) == s->packet_queue.size()) { + s->packet_queue.clear(); + } else { + s->packet_queue.drop_front(rc); + } } else if (rc == -1 && errno == EAGAIN) { - fdevent_add(s->fde, FDE_WRITE); - return SocketFlushResult::TryAgain; + // fd is full. } else { - // We failed to write, but it's possible that we can still read from the socket. - // Give that a try before giving up. + // rc == 0, probably. + // The other side closed its read side of the fd, but it's possible that we can still + // read from the socket. Give that a try before giving up. s->has_write_error = true; } } + bool fd_full = !s->packet_queue.empty() && !s->has_write_error; + if (s->transport && s->peer) { + if (s->available_send_bytes.has_value()) { + // Deferred acks are available. + send_ready(s->id, s->peer->id, s->transport, bytes_flushed); + } else { + // Deferred acks aren't available, we should ask for more data as long as we've made any + // progress. + if (bytes_flushed != 0) { + send_ready(s->id, s->peer->id, s->transport, 0); + } + } + } + // If we sent the last packet of a closing socket, we can now destroy it. - if (s->closing) { + if (s->closing && !fd_full) { s->close(s); return SocketFlushResult::Destroyed; } - fdevent_del(s->fde, FDE_WRITE); - return SocketFlushResult::Completed; + if (fd_full) { + fdevent_add(s->fde, FDE_WRITE); + return SocketFlushResult::TryAgain; + } else { + fdevent_del(s->fde, FDE_WRITE); + return SocketFlushResult::Completed; + } } // Returns false if the socket has been closed and destroyed as a side-effect of this function. @@ -176,8 +200,7 @@ static bool local_socket_flush_outgoing(asocket* s) { is_eof = 1; break; } - D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof, - s->fde->force_eof); + D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d", s->id, s->fd, r, is_eof); if (avail != max_payload && s->peer) { data.resize(max_payload - avail); @@ -186,6 +209,11 @@ static bool local_socket_flush_outgoing(asocket* s) { // so save variables for debug printing below. unsigned saved_id = s->id; int saved_fd = s->fd; + + if (s->available_send_bytes) { + *s->available_send_bytes -= data.size(); + } + r = s->peer->enqueue(s->peer, std::move(data)); D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); @@ -200,17 +228,20 @@ static bool local_socket_flush_outgoing(asocket* s) { } if (r > 0) { - /* if the remote cannot accept further events, - ** we disable notification of READs. They'll - ** be enabled again when we get a call to ready() - */ - fdevent_del(s->fde, FDE_READ); + if (s->available_send_bytes) { + if (*s->available_send_bytes <= 0) { + D("LS(%u): send buffer full (%" PRId64 ")", saved_id, *s->available_send_bytes); + fdevent_del(s->fde, FDE_READ); + } + } else { + D("LS(%u): acks not deferred, blocking", saved_id); + fdevent_del(s->fde, FDE_READ); + } } } - // Don't allow a forced eof if data is still there. - if ((s->fde->force_eof && !r) || is_eof) { - D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde->force_eof); + if (is_eof) { + D(" closing because is_eof=%d", is_eof); s->close(s); return false; } @@ -340,7 +371,7 @@ static void local_socket_close(asocket* s) { /* otherwise, put on the closing list */ D("LS(%d): closing", s->id); - s->closing = 1; + s->closing = true; fdevent_del(s->fde, FDE_READ); remove_socket(s); D("LS(%d): put on socket_closing_list fd=%d", s->id, s->fd); @@ -364,7 +395,6 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { break; case SocketFlushResult::Completed: - s->peer->ready(s->peer); break; } } @@ -385,6 +415,32 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { } } +void local_socket_ack(asocket* s, std::optional<int32_t> acked_bytes) { + // acked_bytes can be negative! + // + // In the future, we can use this to preemptively supply backpressure, instead + // of waiting for the writer to hit its limit. + if (s->available_send_bytes.has_value() != acked_bytes.has_value()) { + LOG(ERROR) << "delayed ack mismatch: socket = " << s->available_send_bytes.has_value() + << ", payload = " << acked_bytes.has_value(); + return; + } + + if (s->available_send_bytes.has_value()) { + D("LS(%d) received delayed ack, available bytes: %" PRId64 " += %" PRIu32, s->id, + *s->available_send_bytes, *acked_bytes); + + // This can't (reasonably) overflow: available_send_bytes is 64-bit. + *s->available_send_bytes += *acked_bytes; + if (*s->available_send_bytes > 0) { + s->ready(s); + } + } else { + D("LS(%d) received ack", s->id); + s->ready(s); + } +} + asocket* create_local_socket(unique_fd ufd) { int fd = ufd.release(); asocket* s = new asocket(); @@ -402,7 +458,7 @@ asocket* create_local_socket(unique_fd ufd) { asocket* create_local_service_socket(std::string_view name, atransport* transport) { #if !ADB_HOST - if (asocket* s = daemon_service_to_socket(name); s) { + if (asocket* s = daemon_service_to_socket(name, transport); s) { return s; } #endif @@ -413,6 +469,7 @@ asocket* create_local_service_socket(std::string_view name, atransport* transpor int fd_value = fd.get(); asocket* s = create_local_socket(std::move(fd)); + s->transport = transport; LOG(VERBOSE) << "LS(" << s->id << "): bound to '" << name << "' via " << fd_value; #if !ADB_HOST @@ -501,6 +558,12 @@ asocket* create_remote_socket(unsigned id, atransport* t) { } void connect_to_remote(asocket* s, std::string_view destination) { +#if ADB_HOST + // Snoop reverse:forward: requests to track them so that an + // appropriate filter (to figure out whether the remote is + // allowed to connect locally) can be applied. + s->transport->UpdateReverseConfig(destination); +#endif D("Connect_to_remote call RS(%d) fd=%d", s->id, s->fd); apacket* p = get_apacket(); @@ -508,6 +571,11 @@ void connect_to_remote(asocket* s, std::string_view destination) { p->msg.command = A_OPEN; p->msg.arg0 = s->id; + if (s->transport->SupportsDelayedAck()) { + p->msg.arg1 = INITIAL_DELAYED_ACK_BYTES; + s->available_send_bytes = 0; + } + // adbd used to expect a null-terminated string. // Keep doing so to maintain backward compatibility. p->payload.resize(destination.size() + 1); @@ -51,10 +51,6 @@ static inline void* mempcpy(void* dst, const void* src, size_t n) { #ifdef _WIN32 -// Clang-only nullability specifiers -#define _Nonnull -#define _Nullable - #include <ctype.h> #include <direct.h> #include <dirent.h> @@ -789,6 +785,9 @@ static inline void disable_tcp_nagle(borrowed_fd fd) { // configured to drop after 10 missed keepalives. Returns true on success. bool set_tcp_keepalive(borrowed_fd fd, int interval_sec); +// Returns a human-readable OS version string. +extern std::string GetOSVersion(void); + #if defined(_WIN32) // Win32 defines ERROR, which we don't need, but which conflicts with google3 logging. #undef ERROR diff --git a/sysdeps/uio.h b/sysdeps/uio.h index ced884ba..5f7ad5c4 100644 --- a/sysdeps/uio.h +++ b/sysdeps/uio.h @@ -24,7 +24,7 @@ // Layout of this struct must match struct WSABUF (verified via static assert in sysdeps_win32.cpp) struct adb_iovec { - size_t iov_len; + unsigned int iov_len; void* iov_base; }; diff --git a/sysdeps_unix.cpp b/sysdeps_unix.cpp index e5657067..66757cd9 100644 --- a/sysdeps_unix.cpp +++ b/sysdeps_unix.cpp @@ -16,6 +16,10 @@ #include "sysdeps.h" +#include <sys/utsname.h> + +#include <android-base/stringprintf.h> + bool set_tcp_keepalive(borrowed_fd fd, int interval_sec) { int enable = (interval_sec > 0); if (adb_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable))) { @@ -90,3 +94,16 @@ Process adb_launch_process(std::string_view executable, std::vector<std::string> } exit(execv(copies.front().data(), rawArgs.data())); } + +// For Unix variants (Linux, OSX), the underlying uname() system call +// is utilized to extract out a version string comprising of: +// 1.) "Linux" or "Darwin" +// 2.) OS system release (e.g. "5.19.11") +// 3.) machine (e.g. "x86_64") +// like: "Linux 5.19.11-1<snip>1-amd64 (x86_64)" +std::string GetOSVersion(void) { + utsname name; + uname(&name); + + return android::base::StringPrintf("%s %s (%s)", name.sysname, name.release, name.machine); +} diff --git a/sysdeps_win32.cpp b/sysdeps_win32.cpp index a43a3fc6..bbf3ee5f 100644 --- a/sysdeps_win32.cpp +++ b/sysdeps_win32.cpp @@ -2353,6 +2353,34 @@ int unix_read_interruptible(borrowed_fd fd, void* buf, size_t len) { /**************************************************************************/ /**************************************************************************/ /***** *****/ +/***** Versioning support *****/ +/***** *****/ +/**************************************************************************/ +/**************************************************************************/ +std::string GetOSVersion() { + // We also considered GetVersionInfoEx(), however the internal + // API below is preferable since Windows-8 (and above) returns + // the manifest windows data (as opposed to the actual version info). + typedef FARPROC(WINAPI * RtlGetVersionPtr)(PRTL_OSVERSIONINFOW); + RtlGetVersionPtr RtlGetVersionInternal = reinterpret_cast<RtlGetVersionPtr>( + GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "RtlGetVersion")); + + if (!RtlGetVersionInternal) { + return "<Could not get handle to RtlGetVersion in ntdll.dll>"; + } + + OSVERSIONINFO version; + version.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); + + RtlGetVersionInternal(static_cast<PRTL_OSVERSIONINFOW>(&version)); + + return android::base::StringPrintf("Windows %lu.%lu.%lu", version.dwMajorVersion, + version.dwMinorVersion, version.dwBuildNumber); +} + +/**************************************************************************/ +/**************************************************************************/ +/***** *****/ /***** Unicode support *****/ /***** *****/ /**************************************************************************/ diff --git a/test_device.py b/test_device.py index 0d635917..c606adf2 100755 --- a/test_device.py +++ b/test_device.py @@ -39,26 +39,6 @@ from datetime import datetime import adb -def requires_root(func): - def wrapper(self, *args): - if self.device.get_prop('ro.debuggable') != '1': - raise unittest.SkipTest('requires rootable build') - - was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' - if not was_root: - self.device.root() - self.device.wait() - - try: - func(self, *args) - finally: - if not was_root: - self.device.unroot() - self.device.wait() - - return wrapper - - def requires_non_root(func): def wrapper(self, *args): was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' @@ -629,7 +609,6 @@ class ArgumentEscapingTest(DeviceTest): os.remove(tf.name) -@unittest.skip("b/172372960: temporarily disabled due to flakiness") class RootUnrootTest(DeviceTest): def _test_root(self): message = self.device.root() @@ -680,14 +659,15 @@ class SystemPropertiesTest(DeviceTest): def test_get_prop(self): self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') - @requires_root def test_set_prop(self): - prop_name = 'foo.bar' + # debug.* prop does not require root privileges + prop_name = 'debug.foo' self.device.shell(['setprop', prop_name, '""']) - self.device.set_prop(prop_name, 'qux') + val = random.random() + self.device.set_prop(prop_name, str(val)) self.assertEqual( - self.device.shell(['getprop', prop_name])[0].strip(), 'qux') + self.device.shell(['getprop', prop_name])[0].strip(), str(val)) def compute_md5(string): @@ -965,7 +945,7 @@ class FileOperationsTest: Disabled because this broken on the adbd side as well: b/141943968 """ with tempfile.NamedTemporaryFile() as tmp_file: - tmp_file.write('\0' * 1024 * 1024) + tmp_file.write(b'\0' * 1024 * 1024) tmp_file.flush() remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' self.device.shell(['rm', '-rf', remote_path]) @@ -1330,6 +1310,7 @@ class FileOperationsTest: for file_size in [8, 1024 * 1024]: try: + host_dir = tempfile.mkdtemp() device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') device_file = posixpath.join(device_dir, 'file') @@ -1337,7 +1318,6 @@ class FileOperationsTest: self.device.shell(['mkdir', '-p', device_dir]) self.device.shell(['echo', 'foo', '>', device_file]) - host_dir = tempfile.mkdtemp() host_file = posixpath.join(host_dir, 'file') with open(host_file, "w") as f: @@ -1576,7 +1556,6 @@ class SocketTest(DeviceTest): class FramebufferTest(DeviceTest): - @requires_root def test_framebuffer(self): """Test that we get something from the framebuffer service.""" output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"]) diff --git a/test_utils/test_utils.cpp b/test_utils/test_utils.cpp new file mode 100644 index 00000000..9e722447 --- /dev/null +++ b/test_utils/test_utils.cpp @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2023 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "test_utils.h" + +#include <android-base/strings.h> +#include <android-base/test_utils.h> + +#include <cutils/sockets.h> +#include <gtest/gtest.h> + +#include "shell_protocol.h" +#include "sysdeps.h" + +namespace test_utils { + +// Reads raw data from |fd| until it closes or errors. +std::string ReadRaw(android::base::borrowed_fd fd) { + char buffer[1024]; + char *cur_ptr = buffer, *end_ptr = buffer + sizeof(buffer); + + while (1) { + int bytes = adb_read(fd, cur_ptr, end_ptr - cur_ptr); + if (bytes <= 0) { + return std::string(buffer, cur_ptr); + } + cur_ptr += bytes; + } +} + +// Reads shell protocol data from |fd| until it closes or errors. Fills +// |stdout| and |stderr| with their respective data, and returns the exit code +// read from the protocol or -1 if an exit code packet was not received. +int ReadShellProtocol(android::base::borrowed_fd fd, std::string* std_out, std::string* std_err) { + int exit_code = -1; + std_out->clear(); + std_err->clear(); + + auto protocol = std::make_unique<ShellProtocol>(fd.get()); + while (protocol->Read()) { + switch (protocol->id()) { + case ShellProtocol::kIdStdout: + std_out->append(protocol->data(), protocol->data_length()); + break; + case ShellProtocol::kIdStderr: + std_err->append(protocol->data(), protocol->data_length()); + break; + case ShellProtocol::kIdExit: + EXPECT_EQ(-1, exit_code) << "Multiple exit packets received"; + EXPECT_EQ(1u, protocol->data_length()); + exit_code = protocol->data()[0]; + break; + default: + ADD_FAILURE() << "Unidentified packet ID: " << protocol->id(); + } + } + + return exit_code; +} + +// Checks if each line in |lines| exists in the same order in |output|. Blank +// lines in |output| are ignored for simplicity. +bool ExpectLinesEqual(const std::string& output, const std::vector<std::string>& lines) { + auto output_lines = android::base::Split(output, "\r\n"); + size_t i = 0; + + for (const std::string& line : lines) { + // Skip empty lines in output. + while (i < output_lines.size() && output_lines[i].empty()) { + ++i; + } + if (i >= output_lines.size()) { + ADD_FAILURE() << "Ran out of output lines"; + return false; + } + EXPECT_EQ(output_lines[i], line); + ++i; + } + + while (i < output_lines.size() && output_lines[i].empty()) { + ++i; + } + EXPECT_EQ(i, output_lines.size()) << "Found unmatched output lines"; + return true; +} + +// Relies on the device to allocate an available port, and +// returns it to the caller. Also returns the associated fd. +// Existing client (LocalSocketTest) of this interface is +// implemented only on Linux, hence using cutils. +int GetUnassignedPort(android::base::unique_fd& fd) { + fd.reset(socket_inaddr_any_server(0, SOCK_STREAM)); + EXPECT_NE(static_cast<cutils_socket_t>(fd.get()), INVALID_SOCKET); + + const int port = socket_get_local_port(fd.get()); + EXPECT_GT(port, 0); + + return port; +} + +} // namespace test_utils diff --git a/test_utils/test_utils.h b/test_utils/test_utils.h new file mode 100644 index 00000000..6edcbcef --- /dev/null +++ b/test_utils/test_utils.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2023 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <vector> + +#include <android-base/file.h> +#include <android-base/unique_fd.h> + +namespace test_utils { + +// Reads raw data from |fd| until it closes or errors. +std::string ReadRaw(android::base::borrowed_fd fd); + +// / Reads shell protocol data from |fd| until it closes or errors. Fills +// |stdout| and |stderr| with their respective data, and returns the exit code +// read from the protocol or -1 if an exit code packet was not received. +int ReadShellProtocol(android::base::borrowed_fd fd, std::string* std_out, std::string* std_err); + +// Checks if each line in |lines| exists in the same order in |output|. Blank +// lines in |output| are ignored for simplicity. +bool ExpectLinesEqual(const std::string& output, const std::vector<std::string>& lines); + +// Allows the device to allocate a port, which is returned to the caller. +// Also returns the associated fd. +int GetUnassignedPort(android::base::unique_fd& fd); + +} // namespace test_utils diff --git a/tls/include/adb/tls/tls_connection.h b/tls/include/adb/tls/tls_connection.h index bc5b98ab..a112756c 100644 --- a/tls/include/adb/tls/tls_connection.h +++ b/tls/include/adb/tls/tls_connection.h @@ -19,6 +19,7 @@ #include <stddef.h> #include <stdint.h> +#include <functional> #include <string_view> #include <vector> diff --git a/tls/tls_connection.cpp b/tls/tls_connection.cpp index 853cdac0..d87993d0 100644 --- a/tls/tls_connection.cpp +++ b/tls/tls_connection.cpp @@ -16,6 +16,8 @@ #include "adb/tls/tls_connection.h" +#include <limits.h> + #include <algorithm> #include <vector> diff --git a/transport.cpp b/transport.cpp index 0dfad0e1..35db37c7 100644 --- a/transport.cpp +++ b/transport.cpp @@ -33,6 +33,7 @@ #include <memory> #include <mutex> #include <set> +#include <string> #include <thread> #include <adb/crypto/rsa_2048_key.h> @@ -60,6 +61,7 @@ using namespace adb::crypto; using namespace adb::tls; +using namespace std::string_literals; using android::base::ScopedLockAssertion; using TlsError = TlsConnection::TlsError; @@ -90,6 +92,7 @@ const char* const kFeatureSendRecv2Brotli = "sendrecv_v2_brotli"; const char* const kFeatureSendRecv2LZ4 = "sendrecv_v2_lz4"; const char* const kFeatureSendRecv2Zstd = "sendrecv_v2_zstd"; const char* const kFeatureSendRecv2DryRunSend = "sendrecv_v2_dry_run_send"; +const char* const kFeatureDelayedAck = "delayed_ack"; // TODO(joshuaduong): Bump to v2 when openscreen discovery is enabled by default const char* const kFeatureOpenscreenMdns = "openscreen_mdns"; @@ -149,12 +152,12 @@ class ReconnectHandler { }; void ReconnectHandler::Start() { - check_main_thread(); + fdevent_check_looper(); handler_thread_ = std::thread(&ReconnectHandler::Run, this); } void ReconnectHandler::Stop() { - check_main_thread(); + fdevent_check_looper(); { std::lock_guard<std::mutex> lock(reconnect_mutex_); running_ = false; @@ -172,7 +175,7 @@ void ReconnectHandler::Stop() { } void ReconnectHandler::TrackTransport(atransport* transport) { - check_main_thread(); + fdevent_check_looper(); { std::lock_guard<std::mutex> lock(reconnect_mutex_); if (!running_) return; @@ -723,9 +726,27 @@ void update_transports() { #endif // ADB_HOST +// The transport listeners communicate with the transports list via fdevent. tmsg structure is a +// container used as message unit and written to a pipe in order to communicate the transport +// (pointer) and the action to perform. +// +// Transport listener FDEVENT Transports list +// -------------------------------------------------------------- +// (&transport,action) -> tmsg -> (&transport, action) +// +// TODO: Figure out if this fdevent bridge really is necessary? With the re-entrant lock to sync +// access, what prevents us from "simply" updating the transport_list directly? + struct tmsg { atransport* transport; - int action; + + enum struct Action : int { + UNREGISTER = 0, // Unregister the transport from transport list (typically a device has + // been unplugged from USB or disconnected from TCP. + REGISTER = 1, // Register the transport to the transport list (typically a device has been + // plugged via USB or connected via TCP. + }; + Action action; }; static int transport_read_action(int fd, struct tmsg* m) { @@ -788,7 +809,7 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { t = m.transport; - if (m.action == 0) { + if (m.action == tmsg::Action::UNREGISTER) { D("transport: %s deleting", t->serial.c_str()); { @@ -885,7 +906,7 @@ void kick_all_transports_by_auth_key(std::string_view auth_key) { void register_transport(atransport* transport) { tmsg m; m.transport = transport; - m.action = 1; + m.action = tmsg::Action::REGISTER; D("transport: %s registered", transport->serial.c_str()); if (transport_write_action(transport_registration_send, &m)) { PLOG(FATAL) << "cannot write transport registration socket"; @@ -895,7 +916,7 @@ void register_transport(atransport* transport) { static void remove_transport(atransport* transport) { tmsg m; m.transport = transport; - m.action = 0; + m.action = tmsg::Action::UNREGISTER; D("transport: %s removed", transport->serial.c_str()); if (transport_write_action(transport_registration_send, &m)) { PLOG(FATAL) << "cannot write transport registration socket"; @@ -903,7 +924,7 @@ static void remove_transport(atransport* transport) { } static void transport_destroy(atransport* t) { - check_main_thread(); + fdevent_check_looper(); CHECK(t != nullptr); std::lock_guard<std::recursive_mutex> lock(transport_lock); @@ -1010,7 +1031,7 @@ atransport* acquire_one_transport(TransportType type, const char* serial, Transp } else if (serial) { if (t->MatchesTarget(serial)) { if (result) { - *error_out = "more than one device"; + *error_out = "more than one device with serial "s + serial; if (is_ambiguous) *is_ambiguous = true; result = nullptr; break; @@ -1020,7 +1041,7 @@ atransport* acquire_one_transport(TransportType type, const char* serial, Transp } else { if (type == kTransportUsb && t->type == kTransportUsb) { if (result) { - *error_out = "more than one device"; + *error_out = "more than one USB device"; if (is_ambiguous) *is_ambiguous = true; result = nullptr; break; @@ -1141,7 +1162,7 @@ ConnectionState atransport::GetConnectionState() const { } void atransport::SetConnectionState(ConnectionState state) { - check_main_thread(); + fdevent_check_looper(); connection_state_ = state; update_transports(); } @@ -1149,7 +1170,7 @@ void atransport::SetConnectionState(ConnectionState state) { #if ADB_HOST bool atransport::Attach(std::string* error) { D("%s: attach", serial.c_str()); - check_main_thread(); + fdevent_check_looper(); if (!should_use_libusb()) { *error = "attach/detach only implemented for libusb backend"; @@ -1176,7 +1197,7 @@ bool atransport::Attach(std::string* error) { bool atransport::Detach(std::string* error) { D("%s: detach", serial.c_str()); - check_main_thread(); + fdevent_check_looper(); if (!should_use_libusb()) { *error = "attach/detach only implemented for libusb backend"; @@ -1216,14 +1237,16 @@ bool atransport::HandleRead(std::unique_ptr<apacket> p) { VLOG(TRANSPORT) << dump_packet(serial.c_str(), "from remote", p.get()); apacket* packet = p.release(); - // TODO: Does this need to run on the main thread? - fdevent_run_on_main_thread([packet, this]() { handle_packet(packet, this); }); + // This needs to run on the looper thread since the associated fdevent + // message pump exists in that context. + fdevent_run_on_looper([packet, this]() { handle_packet(packet, this); }); + return true; } void atransport::HandleError(const std::string& error) { LOG(INFO) << serial_name() << ": connection terminated: " << error; - fdevent_run_on_main_thread([this]() { + fdevent_run_on_looper([this]() { handle_offline(this); transport_destroy(this); }); @@ -1246,31 +1269,51 @@ size_t atransport::get_max_payload() const { return max_payload; } +#if ADB_HOST +static bool delayed_ack_enabled() { + static const char* env = getenv("ADB_DELAYED_ACK"); + static bool result = env && strcmp(env, "1") == 0; + return result; +} +#endif + const FeatureSet& supported_features() { - static const android::base::NoDestructor<FeatureSet> features([] { - return FeatureSet{ - kFeatureShell2, - kFeatureCmd, - kFeatureStat2, - kFeatureLs2, - kFeatureFixedPushMkdir, - kFeatureApex, - kFeatureAbb, - kFeatureFixedPushSymlinkTimestamp, - kFeatureAbbExec, - kFeatureRemountShell, - kFeatureTrackApp, - kFeatureSendRecv2, - kFeatureSendRecv2Brotli, - kFeatureSendRecv2LZ4, - kFeatureSendRecv2Zstd, - kFeatureSendRecv2DryRunSend, - kFeatureOpenscreenMdns, - // Increment ADB_SERVER_VERSION when adding a feature that adbd needs - // to know about. Otherwise, the client can be stuck running an old - // version of the server even after upgrading their copy of adb. - // (http://b/24370690) + static const android::base::NoDestructor<FeatureSet> features([]() { + // Increment ADB_SERVER_VERSION when adding a feature that adbd needs + // to know about. Otherwise, the client can be stuck running an old + // version of the server even after upgrading their copy of adb. + // (http://b/24370690) + + // clang-format off + FeatureSet result { + kFeatureShell2, + kFeatureCmd, + kFeatureStat2, + kFeatureLs2, + kFeatureFixedPushMkdir, + kFeatureApex, + kFeatureAbb, + kFeatureFixedPushSymlinkTimestamp, + kFeatureAbbExec, + kFeatureRemountShell, + kFeatureTrackApp, + kFeatureSendRecv2, + kFeatureSendRecv2Brotli, + kFeatureSendRecv2LZ4, + kFeatureSendRecv2Zstd, + kFeatureSendRecv2DryRunSend, + kFeatureOpenscreenMdns, }; + // clang-format on + +#if ADB_HOST + if (delayed_ack_enabled()) { + result.push_back(kFeatureDelayedAck); + } +#else + result.push_back(kFeatureDelayedAck); +#endif + return result; }()); return *features; @@ -1303,6 +1346,7 @@ bool atransport::has_feature(const std::string& feature) const { void atransport::SetFeatures(const std::string& features_string) { features_ = StringToFeatureSet(features_string); + delayed_ack_ = CanUseFeature(features_, kFeatureDelayedAck); } void atransport::AddDisconnect(adisconnect* disconnect) { @@ -1448,12 +1492,28 @@ void close_usb_devices(bool reset) { } #endif +bool validate_transport_list(const std::list<atransport*>& list, const std::string& serial, + atransport* t, int* error) { + for (const auto& transport : list) { + if (serial == transport->serial) { + const std::string list_name(&list == &pending_list ? "pending" : "transport"); + VLOG(TRANSPORT) << "socket transport " << transport->serial << " is already in the " + << list_name << " list and fails to register"; + delete t; + if (error) *error = EALREADY; + return false; + } + } + return true; +} + bool register_socket_transport(unique_fd s, std::string serial, int port, int local, atransport::ReconnectCallback reconnect, bool use_tls, int* error) { atransport* t = new atransport(std::move(reconnect), kCsOffline); t->use_tls = use_tls; + t->serial = std::move(serial); - D("transport: %s init'ing for socket %d, on port %d", serial.c_str(), s.get(), port); + D("transport: %s init'ing for socket %d, on port %d", t->serial.c_str(), s.get(), port); if (init_socket_transport(t, std::move(s), port, local) < 0) { delete t; if (error) *error = errno; @@ -1461,27 +1521,14 @@ bool register_socket_transport(unique_fd s, std::string serial, int port, int lo } std::unique_lock<std::recursive_mutex> lock(transport_lock); - for (const auto& transport : pending_list) { - if (serial == transport->serial) { - VLOG(TRANSPORT) << "socket transport " << transport->serial - << " is already in pending_list and fails to register"; - delete t; - if (error) *error = EALREADY; - return false; - } + if (!validate_transport_list(pending_list, t->serial, t, error)) { + return false; } - for (const auto& transport : transport_list) { - if (serial == transport->serial) { - VLOG(TRANSPORT) << "socket transport " << transport->serial - << " is already in transport_list and fails to register"; - delete t; - if (error) *error = EALREADY; - return false; - } + if (!validate_transport_list(transport_list, t->serial, t, error)) { + return false; } - t->serial = std::move(serial); pending_list.push_front(t); lock.unlock(); @@ -1590,6 +1637,63 @@ void unregister_usb_transport(usb_handle* usb) { return t->GetUsbHandle() == usb && t->GetConnectionState() == kCsNoPerm; }); } + +// Track reverse:forward commands, so that info can be used to develop +// an 'allow-list': +// - adb reverse tcp:<device_port> localhost:<host_port> : responds with the +// device_port +// - adb reverse --remove tcp:<device_port> : responds OKAY +// - adb reverse --remove-all : responds OKAY +void atransport::UpdateReverseConfig(std::string_view service_addr) { + fdevent_check_looper(); + if (!android::base::ConsumePrefix(&service_addr, "reverse:")) { + return; + } + + if (android::base::ConsumePrefix(&service_addr, "forward:")) { + // forward:[norebind:]<remote>;<local> + bool norebind = android::base::ConsumePrefix(&service_addr, "norebind:"); + auto it = service_addr.find(';'); + if (it == std::string::npos) { + return; + } + std::string remote(service_addr.substr(0, it)); + + if (norebind && reverse_forwards_.find(remote) != reverse_forwards_.end()) { + // This will fail, don't update the map. + LOG(DEBUG) << "ignoring reverse forward that will fail due to norebind"; + return; + } + + std::string local(service_addr.substr(it + 1)); + reverse_forwards_[remote] = local; + } else if (android::base::ConsumePrefix(&service_addr, "killforward:")) { + // kill-forward:<remote> + auto it = service_addr.find(';'); + if (it != std::string::npos) { + return; + } + reverse_forwards_.erase(std::string(service_addr)); + } else if (service_addr == "killforward-all") { + reverse_forwards_.clear(); + } else if (service_addr == "list-forward") { + LOG(DEBUG) << __func__ << " ignoring --list"; + } else { // Anything else we need to know about? + LOG(FATAL) << "unhandled reverse service: " << service_addr; + } +} + +// Is this an authorized :connect request? +bool atransport::IsReverseConfigured(const std::string& local_addr) { + fdevent_check_looper(); + for (const auto& [remote, local] : reverse_forwards_) { + if (local == local_addr) { + return true; + } + } + return false; +} + #endif bool check_header(apacket* p, atransport* t) { diff --git a/transport.h b/transport.h index 86186936..fc0e322d 100644 --- a/transport.h +++ b/transport.h @@ -31,6 +31,7 @@ #include <string> #include <string_view> #include <thread> +#include <unordered_map> #include <vector> #include <android-base/macros.h> @@ -98,6 +99,8 @@ extern const char* const kFeatureSendRecv2LZ4; extern const char* const kFeatureSendRecv2Zstd; // adbd supports dry-run send for send/recv v2. extern const char* const kFeatureSendRecv2DryRunSend; +// adbd supports delayed acks. +extern const char* const kFeatureDelayedAck; TransportId NextTransportId(); @@ -296,6 +299,10 @@ class atransport : public enable_weak_from_this<atransport> { #if ADB_HOST void SetUsbHandle(usb_handle* h) { usb_handle_ = h; } usb_handle* GetUsbHandle() { return usb_handle_; } + + // Interface for management/filter on forward:reverse: configuration. + void UpdateReverseConfig(std::string_view service_addr); + bool IsReverseConfigured(const std::string& local_addr); #endif const TransportId id; @@ -344,6 +351,10 @@ class atransport : public enable_weak_from_this<atransport> { bool has_feature(const std::string& feature) const; + bool SupportsDelayedAck() const { + return delayed_ack_; + } + // Loads the transport's feature set from the given string. void SetFeatures(const std::string& features_string); @@ -419,6 +430,15 @@ class atransport : public enable_weak_from_this<atransport> { std::mutex mutex_; + bool delayed_ack_ = false; + +#if ADB_HOST + // Track remote addresses against local addresses (configured) + // through `adb reverse` commands. + // Access constrained to primary thread by virtue of check_main_thread(). + std::unordered_map<std::string, std::string> reverse_forwards_; +#endif + DISALLOW_COPY_AND_ASSIGN(atransport); }; @@ -307,12 +307,12 @@ struct weak_ptr { } T* get() const { - check_main_thread(); + fdevent_check_looper(); return ptr_; } void reset(T* ptr = nullptr) { - check_main_thread(); + fdevent_check_looper(); if (ptr == ptr_) { return; @@ -338,7 +338,7 @@ template <typename T> struct enable_weak_from_this { ~enable_weak_from_this() { if (!weak_ptrs_.empty()) { - check_main_thread(); + fdevent_check_looper(); for (auto& weak : weak_ptrs_) { weak->ptr_ = nullptr; } @@ -349,7 +349,7 @@ struct enable_weak_from_this { weak_ptr<T> weak() { return weak_ptr<T>(static_cast<T*>(this)); } void schedule_deletion() { - fdevent_run_on_main_thread([this]() { delete static_cast<T*>(this); }); + fdevent_run_on_looper([this]() { delete static_cast<T*>(this); }); } private: diff --git a/types_test.cpp b/types_test.cpp index 086a35dd..5cb21336 100644 --- a/types_test.cpp +++ b/types_test.cpp @@ -47,6 +47,13 @@ TEST(IOVector, empty) { CHECK_EQ(0ULL, bc.coalesce().size()); } +TEST(IOVector, move_constructor) { + IOVector x; + size_t xsize = x.coalesce().size(); + IOVector y(std::move(x)); + CHECK_EQ(xsize, y.coalesce().size()); +} + TEST(IOVector, single_block) { // A single block. auto block = create_block('x', 100); @@ -179,7 +186,7 @@ TEST_F(weak_ptr_test, smoke) { bool destroyed = false; std::optional<weak_ptr<Destructor>> p; - fdevent_run_on_main_thread([&p, &destructor, &destroyed]() { + fdevent_run_on_looper([&p, &destructor, &destroyed]() { destructor = new Destructor(&destroyed); p = destructor->weak(); ASSERT_TRUE(p->get()); @@ -198,7 +205,7 @@ TEST_F(weak_ptr_test, smoke) { WaitForFdeventLoop(); ASSERT_TRUE(destroyed); - fdevent_run_on_main_thread([&p]() { + fdevent_run_on_looper([&p]() { ASSERT_FALSE(p->get()); p.reset(); }); |