Diffstat (limited to 'deprecated/automation/common/command_executer.py')
-rw-r--r--  deprecated/automation/common/command_executer.py  230
1 files changed, 230 insertions, 0 deletions
diff --git a/deprecated/automation/common/command_executer.py b/deprecated/automation/common/command_executer.py
new file mode 100644
index 00000000..c0f314f5
--- /dev/null
+++ b/deprecated/automation/common/command_executer.py
@@ -0,0 +1,230 @@
+# Copyright 2011 Google Inc. All Rights Reserved.
+#
+"""Classes that help running commands in a subshell.
+
+Commands can be run locally, or remotly using SSH connection. You may log the
+output of a command to a terminal or a file, or any other destination.
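+
+Illustrative usage sketch (the host name and log path are placeholders, not
+values used anywhere in this module):
+
+  executer = LoggingCommandExecuter()
+  executer.OpenLog('/tmp/example_command.log')
+  status = executer.RunCommand('uname -a', machine='example-host')
+  executer.CloseLog()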
+"""
+
+__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
+
+import fcntl
+import logging
+import os
+import select
+import subprocess
+import sys
+import time
+
+from automation.common import logger
+
+
+class CommandExecuter(object):
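+  """Runs a shell command in a child process, locally or remotely over SSH."""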
+ DRY_RUN = False
+
+ def __init__(self, dry_run=False):
+ self._logger = logging.getLogger(self.__class__.__name__)
+ self._dry_run = dry_run or self.DRY_RUN
+
+ @classmethod
+ def Configure(cls, dry_run):
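+    """Sets the class-wide dry-run default applied to new executer instances."""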
+ cls.DRY_RUN = dry_run
+
+ def RunCommand(self,
+ cmd,
+ machine=None,
+ username=None,
+ command_terminator=None,
+ command_timeout=None):
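+    """Runs cmd in a subshell, locally or (when machine is set) over SSH.
+
+    Returns the command's exit status, 0 on a dry run, or 1 when the supplied
+    command_terminator was triggered before the command started.
+    """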
+ cmd = str(cmd)
+
+ if self._dry_run:
+ return 0
+
+ if not command_terminator:
+ command_terminator = CommandTerminator()
+
+ if command_terminator.IsTerminated():
+      self._logger.warning('Command has already been terminated!')
+ return 1
+
+ # Rewrite command for remote execution.
+ if machine:
+ if username:
+ login = '%s@%s' % (username, machine)
+ else:
+ login = machine
+
+ self._logger.debug("Executing '%s' on %s.", cmd, login)
+
+ # FIXME(asharif): Remove this after crosbug.com/33007 is fixed.
+ cmd = "ssh -t -t %s -- '%s'" % (login, cmd)
+ else:
+ self._logger.debug("Executing: '%s'.", cmd)
+
+ child = self._SpawnProcess(cmd, command_terminator, command_timeout)
+
+    self._logger.debug('{PID: %d} Finished with code %d.', child.pid,
+                       child.returncode)
+
+ return child.returncode
+
+ def _Terminate(self, child, command_timeout, wait_timeout=10):
+ """Gracefully shutdown the child by sending SIGTERM."""
+
+ if command_timeout:
+ self._logger.warning('{PID: %d} Timeout of %s seconds reached since '
+ 'process started.', child.pid, command_timeout)
+
+ self._logger.warning('{PID: %d} Terminating child.', child.pid)
+
+ try:
+ child.terminate()
+ except OSError:
+ pass
+
+ wait_started = time.time()
+
+    while child.poll() is None:
+      if time.time() - wait_started >= wait_timeout:
+        break
+      time.sleep(0.1)
+
+    # poll() returns None while the process is running and may return 0 on a
+    # clean exit, so report explicitly whether the child has terminated.
+    return child.poll() is not None
+
+ def _Kill(self, child):
+ """Kill the child with immediate result."""
+ self._logger.warning('{PID: %d} Process still alive.', child.pid)
+ self._logger.warning('{PID: %d} Killing child.', child.pid)
+ child.kill()
+ child.wait()
+
+ def _SpawnProcess(self, cmd, command_terminator, command_timeout):
+    # Create a child process executing the provided command.
+ child = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ shell=True)
+
+ # Close stdin so the child won't be able to block on read.
+ child.stdin.close()
+
+ started_time = time.time()
+
+ # Watch for data on process stdout, stderr.
+ pipes = [child.stdout, child.stderr]
+
+    # Put pipes into non-blocking mode, so the reads below return immediately
+    # (with EAGAIN) once a pipe has been drained instead of blocking.
+ for pipe in pipes:
+ fd = pipe.fileno()
+ fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK)
+
+ already_terminated = False
+
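+    # Pump the child's stdout/stderr until both pipes reach end-of-file,
+    # terminating the child early on timeout or on a terminate request.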
+ while pipes:
+      # Has the command timeout been reached?
+ if command_timeout and time.time() - started_time > command_timeout:
+ command_terminator.Terminate()
+
+ # Check if terminate request was received.
+ if command_terminator.IsTerminated() and not already_terminated:
+ if not self._Terminate(child, command_timeout):
+ self._Kill(child)
+        # Don't exit the loop immediately. First try to read everything that
+        # is still left on stdout and stderr.
+ already_terminated = True
+
+ # Wait for pipes to become ready.
+ ready_pipes, _, _ = select.select(pipes, [], [], 0.1)
+
+ # Handle file descriptors ready to be read.
+ for pipe in ready_pipes:
+ fd = pipe.fileno()
+
+ data = os.read(fd, 4096)
+
+        # Check for end-of-file.
+ if not data:
+ pipes.remove(pipe)
+ continue
+
+        # Read all the data that is available.
+ while data:
+ if pipe == child.stdout:
+ self.DataReceivedOnOutput(data)
+ elif pipe == child.stderr:
+ self.DataReceivedOnError(data)
+
+ try:
+ data = os.read(fd, 4096)
+ except OSError:
+            # Terminate the loop when EWOULDBLOCK (EAGAIN) is received.
+ data = ''
+
+ if not already_terminated:
+ self._logger.debug('Waiting for command to finish.')
+ child.wait()
+
+ return child
+
+ def DataReceivedOnOutput(self, data):
+ """Invoked when the child process wrote data to stdout."""
+ sys.stdout.write(data)
+
+ def DataReceivedOnError(self, data):
+ """Invoked when the child process wrote data to stderr."""
+ sys.stderr.write(data)
+
+
+class LoggingCommandExecuter(CommandExecuter):
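+  """A CommandExecuter that routes the child's stdout/stderr to a log file."""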
+
+ def __init__(self, *args, **kwargs):
+ super(LoggingCommandExecuter, self).__init__(*args, **kwargs)
+
+    # Create a logger for the command's stdout/stderr streams.
+ self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output'))
+
+ def OpenLog(self, log_path):
+ """The messages are going to be saved to gzip compressed file."""
+ formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S')
+ handler = logger.CompressedFileHandler(log_path, delay=True)
+ handler.setFormatter(formatter)
+ self._output.addHandler(handler)
+
+    # Set a flag to prevent log records from being propagated up the logger
+    # hierarchy. We don't want command output messages to appear in the
+    # main log.
+ self._output.propagate = 0
+
+ def CloseLog(self):
+ """Remove handlers and reattach the logger to its parent."""
+ for handler in list(self._output.handlers):
+ self._output.removeHandler(handler)
+ handler.flush()
+ handler.close()
+
+ self._output.propagate = 1
+
+ def DataReceivedOnOutput(self, data):
+ """Invoked when the child process wrote data to stdout."""
+ for line in data.splitlines():
+ self._output.info(line, extra={'prefix': 'STDOUT'})
+
+ def DataReceivedOnError(self, data):
+ """Invoked when the child process wrote data to stderr."""
+ for line in data.splitlines():
+ self._output.warning(line, extra={'prefix': 'STDERR'})
+
+
+class CommandTerminator(object):
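+  """Shared flag used to ask a running command to stop.
+
+  Pass an instance to CommandExecuter.RunCommand() and call Terminate(),
+  for example from another thread, to have the executer shut down the
+  child process while it is still running.
+  """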
+
+ def __init__(self):
+ self.terminated = False
+
+ def Terminate(self):
+ self.terminated = True
+
+ def IsTerminated(self):
+ return self.terminated