aboutsummaryrefslogtreecommitdiff
path: root/catapult/telemetry/telemetry/internal/backends/chrome_inspector/inspector_page.py
blob: 0c37347fc88986c731aad32efb9812dc17fb2e3e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import time

from telemetry.util import image_util


class InspectorPage(object):
  """Class that controls a page connected by an inspector_websocket.

  This class provides utility methods for controlling a page connected by an
  inspector_websocket. It does not perform any exception handling. All
  inspector_websocket exceptions must be handled by the caller.
  """
  def __init__(self, inspector_websocket, timeout):
    self._inspector_websocket = inspector_websocket
    self._inspector_websocket.RegisterDomain('Page', self._OnNotification)

    self._navigation_pending = False
    self._navigation_url = ''  # Support for legacy backends.
    self._navigation_frame_id = ''
    self._navigated_frame_ids = None  # Holds frame ids while navigating.
    self._script_to_evaluate_on_commit = None
    # Turn on notifications. We need them to get the Page.frameNavigated event.
    self._EnablePageNotifications(timeout=timeout)

  def _OnNotification(self, msg):
    if msg['method'] == 'Page.frameNavigated':
      url = msg['params']['frame']['url']
      if not self._navigated_frame_ids == None:
        frame_id = msg['params']['frame']['id']
        if self._navigation_frame_id == frame_id:
          self._navigation_frame_id = ''
          self._navigated_frame_ids = None
          self._navigation_pending = False
        else:
          self._navigated_frame_ids.add(frame_id)
      elif self._navigation_url == url:
        # TODO(tonyg): Remove this when Chrome 38 goes stable.
        self._navigation_url = ''
        self._navigation_pending = False
      elif (not url == 'chrome://newtab/' and not url == 'about:blank'
        and not 'parentId' in msg['params']['frame']):
        # Marks the navigation as complete and unblocks the
        # WaitForNavigate call.
        self._navigation_pending = False

  def _SetScriptToEvaluateOnCommit(self, source, timeout):
    existing_source = (self._script_to_evaluate_on_commit and
                       self._script_to_evaluate_on_commit['source'])
    if source == existing_source:
      return
    if existing_source:
      request = {
          'method': 'Page.removeScriptToEvaluateOnLoad',
          'params': {
              'identifier': self._script_to_evaluate_on_commit['id'],
              }
          }
      self._inspector_websocket.SyncRequest(request, timeout)
      self._script_to_evaluate_on_commit = None
    if source:
      request = {
          'method': 'Page.addScriptToEvaluateOnLoad',
          'params': {
              'scriptSource': source,
              }
          }
      res = self._inspector_websocket.SyncRequest(request, timeout)
      self._script_to_evaluate_on_commit = {
          'id': res['result']['identifier'],
          'source': source
          }

  def _EnablePageNotifications(self, timeout=60):
    request = {
        'method': 'Page.enable'
        }
    res = self._inspector_websocket.SyncRequest(request, timeout)
    assert len(res['result'].keys()) == 0

  def WaitForNavigate(self, timeout=60):
    """Waits for the navigation to complete.

    The current page is expect to be in a navigation. This function returns
    when the navigation is complete or when the timeout has been exceeded.
    """
    start_time = time.time()
    remaining_time = timeout
    self._navigation_pending = True
    while self._navigation_pending and remaining_time > 0:
      remaining_time = max(timeout - (time.time() - start_time), 0.0)
      self._inspector_websocket.DispatchNotifications(remaining_time)

  def Navigate(self, url, script_to_evaluate_on_commit=None, timeout=60):
    """Navigates to |url|.

    If |script_to_evaluate_on_commit| is given, the script source string will be
    evaluated when the navigation is committed. This is after the context of
    the page exists, but before any script on the page itself has executed.
    """

    self._SetScriptToEvaluateOnCommit(script_to_evaluate_on_commit, timeout)
    request = {
        'method': 'Page.navigate',
        'params': {
            'url': url,
            }
        }
    self._navigated_frame_ids = set()
    res = self._inspector_websocket.SyncRequest(request, timeout)
    if 'frameId' in res['result']:
      # Modern backends are returning frameId from Page.navigate.
      # Use it here to unblock upon precise navigation.
      frame_id = res['result']['frameId']
      if self._navigated_frame_ids and frame_id in self._navigated_frame_ids:
        self._navigated_frame_ids = None
        return
      self._navigation_frame_id = frame_id
    else:
      # TODO(tonyg): Remove this when Chrome 38 goes stable.
      self._navigated_frame_ids = None
      self._navigation_url = url
    self.WaitForNavigate(timeout)

  def GetCookieByName(self, name, timeout=60):
    """Returns the value of the cookie by the given |name|."""
    request = {
        'method': 'Page.getCookies'
        }
    res = self._inspector_websocket.SyncRequest(request, timeout)
    cookies = res['result']['cookies']
    for cookie in cookies:
      if cookie['name'] == name:
        return cookie['value']
    return None

  def CaptureScreenshot(self, timeout=60):
    request = {
        'method': 'Page.captureScreenshot'
        }
    # "Google API are missing..." infobar might cause a viewport resize
    # which invalidates screenshot request. See crbug.com/459820.
    for _ in range(2):
      res = self._inspector_websocket.SyncRequest(request, timeout)
      if res and ('result' in res) and ('data' in res['result']):
        return image_util.FromBase64Png(res['result']['data'])
    return None

  def CollectGarbage(self, timeout=60):
    request = {
        'method': 'HeapProfiler.collectGarbage'
        }
    res = self._inspector_websocket.SyncRequest(request, timeout)
    assert 'result' in res