# Copyright 2012 Google Inc. All Rights Reserved.
# Author: mrdmnd@ (Matt Redmond)
"""A unit test for sending data to Bartlett.

Launches a local dev_appserver.py instance, uploads a random profile blob and
then exercises the list, serve and delete endpoints.

Requires poster module.
"""

import cookielib
import os
import signal
import subprocess
import tempfile
import time
import unittest
import urllib2

from poster.encode import multipart_encode
from poster.streaminghttp import register_openers

SERVER_DIR = '../.'
SERVER_URL = 'http://localhost:8080/'
GET = '_ah/login?email=googler@google.com&action=Login&continue=%s'
AUTH_URL = SERVER_URL + GET

# Upper bounds (in seconds) on how long we wait for the local server to start
# answering requests and to exit after being interrupted.
SERVER_START_TIMEOUT_SECS = 60
SERVER_STOP_TIMEOUT_SECS = 10


class ServerTest(unittest.TestCase):
  """A unit test for the bartlett server. Tests upload, serve, and delete."""

  def setUp(self):
    """Instantiate the files and server needed to test upload functionality."""
    self._server_proc = LaunchLocalServer()
    self._jar = cookielib.LWPCookieJar()
    self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._jar))

    # delete=False keeps the file on disk until tearDown removes it
    # explicitly, regardless of when the handle gets closed.
    self.profile_data = tempfile.NamedTemporaryFile(delete=False)
    size = 16 * 1024
    self.profile_data.write(os.urandom(size))
    # The same handle is handed to the multipart encoder, so push the bytes
    # to disk and rewind; otherwise the encoder may see a short or empty file.
    self.profile_data.flush()
    self.profile_data.seek(0)

  def tearDown(self):
    """Remove the temporary profile file and shut the local server down."""
    try:
      self.profile_data.close()
      os.remove(self.profile_data.name)
    finally:
      # Always stop the server, even if file cleanup failed, so that the
      # port is free again for the next test.
      _StopServer(self._server_proc)

  def testIntegration(self):  # pylint: disable-msg=C6409
    """Run upload, list, serve and delete in order against one server."""
    key = self._testUpload()
    self._testListAll()
    self._testServeKey(key)
    self._testDelKey(key)

  def _testUpload(self):  # pylint: disable-msg=C6409
    """Upload the profile file and return the server's response body.

    The response body is later used by the caller as the key of the upload.
    """
    # Installs poster's streaming handlers into urllib2's global opener.
    register_openers()
    data = {'profile_data': self.profile_data,
            'board': 'x86-zgb',
            'chromeos_version': '2409.0.2012_06_08_1114'}
    datagen, headers = multipart_encode(data)
    request = urllib2.Request(SERVER_URL + 'upload', datagen, headers)
    response = urllib2.urlopen(request).read()
    self.assertTrue(response)
    return response

  def _testListAll(self):  # pylint: disable-msg=C6409
    """Check that the listing page returns a non-empty body."""
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
    response = self.opener.open(request).read()
    self.assertTrue(response)

  def _testServeKey(self, key):  # pylint: disable-msg=C6409
    """Check that serving the uploaded key returns a non-empty body."""
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve/' + key))
    response = self.opener.open(request).read()
    self.assertTrue(response)

  def _testDelKey(self, key):  # pylint: disable-msg=C6409
    """Delete the uploaded key and check that the listing page is empty."""
    # There is no response to a delete request.
    # We will check the listAll page to ensure there is no data.
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'del/' + key))
    self.opener.open(request).read()
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
    response = self.opener.open(request).read()
    self.assertFalse(response)


def LaunchLocalServer(timeout=SERVER_START_TIMEOUT_SECS):
  """Launch a local dev_appserver.py and wait until it answers requests.

  Args:
    timeout: Maximum number of seconds to wait for the server to come up.

  Returns:
    The subprocess.Popen object of the running server.

  Raises:
    RuntimeError: If the server process exits before answering a request, or
      does not answer within `timeout` seconds.
  """
  # The output is never consumed by the test. Sending it to the null device
  # instead of a pipe avoids the server blocking on a full pipe buffer. The
  # child keeps its own copy of the descriptor, so ours can be closed at once.
  with open(os.devnull, 'w') as devnull:
    proc = subprocess.Popen(
        ['dev_appserver.py', '--clear_datastore', SERVER_DIR],
        stdout=devnull, stderr=devnull)

  # Wait for server to come up
  deadline = time.time() + timeout
  while time.time() < deadline:
    if proc.poll() is not None:
      raise RuntimeError('dev_appserver.py exited early with code %s' %
                         proc.returncode)
    time.sleep(1)
    try:
      request = urllib2.Request(SERVER_URL + 'serve')
      response = urllib2.urlopen(request).read()
    except urllib2.URLError:
      continue
    if response:
      return proc

  # Do not leave a half-started server behind when giving up.
  _StopServer(proc)
  raise RuntimeError('dev_appserver.py did not come up within %s seconds' %
                     timeout)


def _StopServer(proc, timeout=SERVER_STOP_TIMEOUT_SECS):
  """Interrupt a server process and reap it, force-killing it if necessary.

  Args:
    proc: The subprocess.Popen object of the server.
    timeout: Seconds to wait after SIGINT before the process is killed.
  """
  if proc.poll() is not None:
    return  # Already exited and reaped.
  proc.send_signal(signal.SIGINT)
  deadline = time.time() + timeout
  while proc.poll() is None and time.time() < deadline:
    time.sleep(0.1)
  if proc.poll() is None:
    proc.kill()
  # Reap the child so it does not linger as a zombie.
  proc.wait()


if __name__ == '__main__':
  unittest.main()