diff options
Diffstat (limited to 'automation/common/command_executer.py')
-rw-r--r-- | automation/common/command_executer.py | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/automation/common/command_executer.py b/automation/common/command_executer.py new file mode 100644 index 00000000..c0f314f5 --- /dev/null +++ b/automation/common/command_executer.py @@ -0,0 +1,230 @@ +# Copyright 2011 Google Inc. All Rights Reserved. +# +"""Classes that help running commands in a subshell. + +Commands can be run locally, or remotly using SSH connection. You may log the +output of a command to a terminal or a file, or any other destination. +""" + +__author__ = 'kbaclawski@google.com (Krystian Baclawski)' + +import fcntl +import logging +import os +import select +import subprocess +import time + +from automation.common import logger + + +class CommandExecuter(object): + DRY_RUN = False + + def __init__(self, dry_run=False): + self._logger = logging.getLogger(self.__class__.__name__) + self._dry_run = dry_run or self.DRY_RUN + + @classmethod + def Configure(cls, dry_run): + cls.DRY_RUN = dry_run + + def RunCommand(self, + cmd, + machine=None, + username=None, + command_terminator=None, + command_timeout=None): + cmd = str(cmd) + + if self._dry_run: + return 0 + + if not command_terminator: + command_terminator = CommandTerminator() + + if command_terminator.IsTerminated(): + self._logger.warning('Command has been already terminated!') + return 1 + + # Rewrite command for remote execution. + if machine: + if username: + login = '%s@%s' % (username, machine) + else: + login = machine + + self._logger.debug("Executing '%s' on %s.", cmd, login) + + # FIXME(asharif): Remove this after crosbug.com/33007 is fixed. + cmd = "ssh -t -t %s -- '%s'" % (login, cmd) + else: + self._logger.debug("Executing: '%s'.", cmd) + + child = self._SpawnProcess(cmd, command_terminator, command_timeout) + + self._logger.debug('{PID: %d} Finished with %d code.', child.pid, + child.returncode) + + return child.returncode + + def _Terminate(self, child, command_timeout, wait_timeout=10): + """Gracefully shutdown the child by sending SIGTERM.""" + + if command_timeout: + self._logger.warning('{PID: %d} Timeout of %s seconds reached since ' + 'process started.', child.pid, command_timeout) + + self._logger.warning('{PID: %d} Terminating child.', child.pid) + + try: + child.terminate() + except OSError: + pass + + wait_started = time.time() + + while not child.poll(): + if time.time() - wait_started >= wait_timeout: + break + time.sleep(0.1) + + return child.poll() + + def _Kill(self, child): + """Kill the child with immediate result.""" + self._logger.warning('{PID: %d} Process still alive.', child.pid) + self._logger.warning('{PID: %d} Killing child.', child.pid) + child.kill() + child.wait() + + def _SpawnProcess(self, cmd, command_terminator, command_timeout): + # Create a child process executing provided command. + child = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + shell=True) + + # Close stdin so the child won't be able to block on read. + child.stdin.close() + + started_time = time.time() + + # Watch for data on process stdout, stderr. + pipes = [child.stdout, child.stderr] + + # Put pipes into non-blocking mode. + for pipe in pipes: + fd = pipe.fileno() + fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK) + + already_terminated = False + + while pipes: + # Maybe timeout reached? + if command_timeout and time.time() - started_time > command_timeout: + command_terminator.Terminate() + + # Check if terminate request was received. + if command_terminator.IsTerminated() and not already_terminated: + if not self._Terminate(child, command_timeout): + self._Kill(child) + # Don't exit the loop immediately. Firstly try to read everything that + # was left on stdout and stderr. + already_terminated = True + + # Wait for pipes to become ready. + ready_pipes, _, _ = select.select(pipes, [], [], 0.1) + + # Handle file descriptors ready to be read. + for pipe in ready_pipes: + fd = pipe.fileno() + + data = os.read(fd, 4096) + + # check for end-of-file + if not data: + pipes.remove(pipe) + continue + + # read all data that's available + while data: + if pipe == child.stdout: + self.DataReceivedOnOutput(data) + elif pipe == child.stderr: + self.DataReceivedOnError(data) + + try: + data = os.read(fd, 4096) + except OSError: + # terminate loop if EWOULDBLOCK (EAGAIN) is received + data = '' + + if not already_terminated: + self._logger.debug('Waiting for command to finish.') + child.wait() + + return child + + def DataReceivedOnOutput(self, data): + """Invoked when the child process wrote data to stdout.""" + sys.stdout.write(data) + + def DataReceivedOnError(self, data): + """Invoked when the child process wrote data to stderr.""" + sys.stderr.write(data) + + +class LoggingCommandExecuter(CommandExecuter): + + def __init__(self, *args, **kwargs): + super(LoggingCommandExecuter, self).__init__(*args, **kwargs) + + # Create a logger for command's stdout/stderr streams. + self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output')) + + def OpenLog(self, log_path): + """The messages are going to be saved to gzip compressed file.""" + formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s', + '%Y-%m-%d %H:%M:%S') + handler = logger.CompressedFileHandler(log_path, delay=True) + handler.setFormatter(formatter) + self._output.addHandler(handler) + + # Set a flag to prevent log records from being propagated up the logger + # hierarchy tree. We don't want for command output messages to appear in + # the main log. + self._output.propagate = 0 + + def CloseLog(self): + """Remove handlers and reattach the logger to its parent.""" + for handler in list(self._output.handlers): + self._output.removeHandler(handler) + handler.flush() + handler.close() + + self._output.propagate = 1 + + def DataReceivedOnOutput(self, data): + """Invoked when the child process wrote data to stdout.""" + for line in data.splitlines(): + self._output.info(line, extra={'prefix': 'STDOUT'}) + + def DataReceivedOnError(self, data): + """Invoked when the child process wrote data to stderr.""" + for line in data.splitlines(): + self._output.warning(line, extra={'prefix': 'STDERR'}) + + +class CommandTerminator(object): + + def __init__(self): + self.terminated = False + + def Terminate(self): + self.terminated = True + + def IsTerminated(self): + return self.terminated |