aboutsummaryrefslogtreecommitdiff
path: root/cwp/bartlett/test/server_tester.py
blob: 585da43aec77d9f1416a3d019dad8680c9f7b1cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright 2012 Google Inc. All Rights Reserved.
# Author: mrdmnd@ (Matt Redmond)
"""A unit test for sending data to Bartlett. Requires poster module."""

import cookielib
import os
import signal
import subprocess
import tempfile
import time
import unittest
import urllib2

from poster.encode import multipart_encode
from poster.streaminghttp import register_openers

SERVER_DIR = '../.'
SERVER_URL = 'http://localhost:8080/'
GET = '_ah/login?email=googler@google.com&action=Login&continue=%s'
AUTH_URL = SERVER_URL + GET


class ServerTest(unittest.TestCase):
  """A unit test for the bartlett server. Tests upload, serve, and delete."""

  def setUp(self):
    """Instantiate the files and server needed to test upload functionality."""
    self._server_proc = LaunchLocalServer()
    self._jar = cookielib.LWPCookieJar()
    self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._jar))

    # We need these files to not delete when closed, because we have to reopen
    # them in read mode after we write and close them.
    self.profile_data = tempfile.NamedTemporaryFile(delete=False)

    size = 16 * 1024
    self.profile_data.write(os.urandom(size))

  def tearDown(self):
    self.profile_data.close()
    os.remove(self.profile_data.name)
    os.kill(self._server_proc.pid, signal.SIGINT)

  def testIntegration(self):  # pylint: disable-msg=C6409
    key = self._testUpload()
    self._testListAll()
    self._testServeKey(key)
    self._testDelKey(key)

  def _testUpload(self):  # pylint: disable-msg=C6409
    register_openers()
    data = {'profile_data': self.profile_data,
            'board': 'x86-zgb',
            'chromeos_version': '2409.0.2012_06_08_1114'}
    datagen, headers = multipart_encode(data)
    request = urllib2.Request(SERVER_URL + 'upload', datagen, headers)
    response = urllib2.urlopen(request).read()
    self.assertTrue(response)
    return response

  def _testListAll(self):  # pylint: disable-msg=C6409
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
    response = self.opener.open(request).read()
    self.assertTrue(response)

  def _testServeKey(self, key):  # pylint: disable-msg=C6409
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve/' + key))
    response = self.opener.open(request).read()
    self.assertTrue(response)

  def _testDelKey(self, key):  # pylint: disable-msg=C6409
    # There is no response to a delete request.
    # We will check the listAll page to ensure there is no data.
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'del/' + key))
    response = self.opener.open(request).read()
    request = urllib2.Request(AUTH_URL % (SERVER_URL + 'serve'))
    response = self.opener.open(request).read()
    self.assertFalse(response)


def LaunchLocalServer():
  """Launch and store an authentication cookie with a local server."""
  proc = subprocess.Popen(
      ['dev_appserver.py', '--clear_datastore', SERVER_DIR],
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE)
  # Wait for server to come up
  while True:
    time.sleep(1)
    try:
      request = urllib2.Request(SERVER_URL + 'serve')
      response = urllib2.urlopen(request).read()
      if response:
        break
    except urllib2.URLError:
      continue
  return proc


if __name__ == '__main__':
  unittest.main()