summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXin Li <delphij@google.com>2024-01-23 22:22:13 -0800
committerXin Li <delphij@google.com>2024-01-23 22:22:13 -0800
commit44138172a9ab8021ac7a5b683a06c3b85a42d547 (patch)
tree67fad8b817c1885f6499f680018859619740e81d
parentf294e9ffc0c7dabd663593a1bb8b8f3d2ad0fbbb (diff)
parentc7df9f42dccf5304ef000cca649c82c5fe84ce66 (diff)
downloadconnectivity-44138172a9ab8021ac7a5b683a06c3b85a42d547.tar.gz
Merge Android 24Q1 Release (ab/11220357)temp_319669529
Bug: 319669529 Merged-In: I327b552cb0ff2a5157f298ec36e2ffe068307a70 Change-Id: Ib54a280b9f4bae3b8f946a1a2d46b896e2628c53
-rw-r--r--acts/framework/acts/controllers/cellular_lib/PresetSimulation.py19
-rw-r--r--acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py72
-rw-r--r--acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py158
-rw-r--r--acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py187
-rw-r--r--acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py21
-rw-r--r--acts_tests/acts_contrib/test_utils/gnss/device_doze.py67
-rw-r--r--acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py54
-rw-r--r--acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py409
-rw-r--r--acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py3
-rw-r--r--acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py9
-rw-r--r--acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py540
-rw-r--r--acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py590
-rw-r--r--acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py178
-rw-r--r--acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py131
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py18
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py2
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py4
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py76
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py4
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py4
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py4
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py4
-rw-r--r--acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py324
-rw-r--r--acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py143
-rw-r--r--acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py34
-rw-r--r--acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py88
-rw-r--r--acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py211
-rw-r--r--acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py10
-rw-r--r--acts_tests/tests/google/gnss/GnssConcurrencyTest.py46
-rw-r--r--acts_tests/tests/google/gnss/GnssFunctionTest.py160
-rw-r--r--acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py1
-rw-r--r--acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py6
-rw-r--r--acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py7
-rw-r--r--acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py46
-rw-r--r--acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py7
-rw-r--r--acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py7
-rw-r--r--acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py109
-rw-r--r--acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py9
-rw-r--r--acts_tests/tests/google/wifi/WifiPingTest.py33
-rw-r--r--acts_tests/tests/google/wifi/WifiRssiTest.py45
-rw-r--r--acts_tests/tests/google/wifi/WifiRvrTest.py13
-rw-r--r--acts_tests/tests/google/wifi/WifiSensitivityTest.py2
-rw-r--r--acts_tests/tests/google/wifi/WifiSoftApTest.py2
-rw-r--r--acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py18
-rw-r--r--acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py5
45 files changed, 2939 insertions, 941 deletions
diff --git a/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py b/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py
index d536b0921..35e715e92 100644
--- a/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py
+++ b/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py
@@ -13,8 +13,6 @@
# limitations under the License.
from acts.controllers.cellular_lib.BaseSimulation import BaseSimulation
-from acts.controllers.cellular_lib import BaseCellularDut
-
class PresetSimulation(BaseSimulation):
"""5G preset simulation.
@@ -27,12 +25,6 @@ class PresetSimulation(BaseSimulation):
KEY_CELL_INFO = "cell_info"
KEY_SCPI_FILE_NAME = "scpi_file"
- NETWORK_BIT_MASK = {
- 'nr_lte': '11000001000000000000'
- }
- ADB_CMD_LOCK_NETWORK = 'cmd phone set-allowed-network-types-for-users -s 0 {network_bit_mask}'
- NR_LTE_BIT_MASK_KEY = 'nr_lte'
-
def __init__(self,
simulator,
log,
@@ -61,9 +53,6 @@ class PresetSimulation(BaseSimulation):
self.dut.set_apn('Keysight', 'Keysight')
self.num_carriers = None
- # Enable roaming on the phone
- self.dut.toggle_data_roaming(True)
-
def setup_simulator(self):
"""Do initial configuration in the simulator. """
self.log.info('This simulation does not require initial setup.')
@@ -110,11 +99,9 @@ class PresetSimulation(BaseSimulation):
RuntimeError: attaching fail
due to unable to connect dut and cells.
"""
- try:
- self.simulator.wait_until_attached(self.dut, self.attach_timeout,
- self.attach_retries)
- except Exception as exc:
- raise RuntimeError('Could not attach to base station.') from exc
+ self.simulator.wait_until_attached(self.dut,
+ self.attach_timeout,
+ self.attach_retries)
def calibrated_downlink_rx_power(self, bts_config, rsrp):
"""Convert RSRP to total signal power from the basestation.
diff --git a/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py b/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py
index 5ef7eac89..ddca8d581 100644
--- a/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py
+++ b/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py
@@ -91,8 +91,8 @@ class UXMCellularSimulator(AbstractCellularSimulator):
"""A cellular simulator for UXM callbox."""
# Keys to obtain data from cell_info dictionary.
- KEY_CELL_NUMBER = "cell_number"
- KEY_CELL_TYPE = "cell_type"
+ _KEY_CELL_NUMBER = "cell_number"
+ _KEY_CELL_TYPE = "cell_type"
# UXM socket port
UXM_SOCKET_PORT = 5125
@@ -102,6 +102,8 @@ class UXMCellularSimulator(AbstractCellularSimulator):
SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n'
SCPI_CHECK_CONNECTION_CMD = '*IDN?\n'
SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister'
+ _SCPI_CHANGE_DL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:DL:TDOMain:APOLicy ONMac'
+ _SCPI_CHANGE_UL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:UL:NUL:TDOMain:APOLicy ONSRbsr'
# require: path to SCPI file
SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n'
# require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1)
@@ -399,7 +401,7 @@ class UXMCellularSimulator(AbstractCellularSimulator):
It is required to create a dedicated bearer setup
with EPS bearer ID 10.
"""
- cell_number = self.cells[0][self.KEY_CELL_NUMBER]
+ cell_number = self.cells[0][self._KEY_CELL_NUMBER]
self._socket_send_SCPI_command(
self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number))
@@ -433,6 +435,20 @@ class UXMCellularSimulator(AbstractCellularSimulator):
f' cell type: {cell_type}\n' +
f' cell number: {cell_number}\n')
+ def get_all_cell_status(self):
+ """Gets status of all cells.
+
+ Returns:
+ List of tuples which has values (cell_type, cell_number, cell_status)
+ """
+ res = []
+ for cell in self.cells:
+ cell_type = cell[self._KEY_CELL_TYPE]
+ cell_number = cell[self._KEY_CELL_NUMBER]
+ cell_status = self.get_cell_status(cell_type, cell_number)
+ res.append((cell_type, cell_number, cell_status))
+ return res
+
def get_cell_status(self, cell_type, cell_number):
"""Get status of cell.
@@ -546,23 +562,6 @@ class UXMCellularSimulator(AbstractCellularSimulator):
"""
self.import_configuration(path)
- def dut_rockbottom(self, dut):
- """Set the dut to rockbottom state.
-
- Args:
- dut: a CellularAndroid controller.
- """
- # The rockbottom script might include a device reboot, so it is
- # necessary to stop SL4A during its execution.
- dut.ad.stop_services()
- self.log.info('Executing rockbottom script for ' + dut.ad.model)
- os.chmod(self.rockbottom_script, 0o777)
- os.system('{} {}'.format(self.rockbottom_script, dut.ad.serial))
- # Make sure the DUT is in root mode after coming back
- dut.ad.root_adb()
- # Restart SL4A
- dut.ad.start_services()
-
def set_sim_type(self, is_3gpp_sim):
sim_type = 'KEYSight'
if is_3gpp_sim:
@@ -610,7 +609,7 @@ class UXMCellularSimulator(AbstractCellularSimulator):
interval = 10
# waits for device to camp
- for index in range(1, attach_retries):
+ for index in range(1, attach_retries+1):
count = 0
# airplane mode off
dut.toggle_airplane_mode(False)
@@ -636,12 +635,7 @@ class UXMCellularSimulator(AbstractCellularSimulator):
# reboot device
if (index % 2) == 0:
dut.ad.reboot()
- if self.rockbottom_script:
- self.dut_rockbottom(dut)
- else:
- self.log.warning(
- f'Rockbottom script was not executed after reboot.'
- )
+
# toggle APM and cell on/off
elif (index % 1) == 0:
# Toggle APM on
@@ -676,11 +670,11 @@ class UXMCellularSimulator(AbstractCellularSimulator):
to connect to 1 basestation.
"""
# get cell info
- first_cell_type = self.cells[0][self.KEY_CELL_TYPE]
- first_cell_number = self.cells[0][self.KEY_CELL_NUMBER]
+ first_cell_type = self.cells[0][self._KEY_CELL_TYPE]
+ first_cell_number = self.cells[0][self._KEY_CELL_NUMBER]
if len(self.cells) == 2:
- second_cell_type = self.cells[1][self.KEY_CELL_TYPE]
- second_cell_number = self.cells[1][self.KEY_CELL_NUMBER]
+ second_cell_type = self.cells[1][self._KEY_CELL_TYPE]
+ second_cell_number = self.cells[1][self._KEY_CELL_NUMBER]
# connect to 1st cell
self.wait_until_attached_one_cell(first_cell_type,
@@ -694,7 +688,7 @@ class UXMCellularSimulator(AbstractCellularSimulator):
second_cell_number,
)
- for _ in range(1, attach_retries):
+ for _ in range(1, attach_retries+1):
self.log.info('Try to aggregate to NR.')
self._socket_send_SCPI_command(
'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None')
@@ -722,6 +716,12 @@ class UXMCellularSimulator(AbstractCellularSimulator):
raise RuntimeError(f'Fail to aggregate to NR from LTE.')
+ def modify_dl_ul_mac_padding(self):
+ """Disables dl/ul mac padding packets."""
+ self.log.info('modifying dl ul mac padding')
+ self._socket_send_SCPI_command(self._SCPI_CHANGE_DL_TDOMAIN)
+ self._socket_send_SCPI_command(self._SCPI_CHANGE_UL_TDOMAIN)
+
def set_lte_rrc_state_change_timer(self, enabled, time=10):
"""Configures the LTE RRC state change timer.
@@ -950,8 +950,8 @@ class UXMCellularSimulator(AbstractCellularSimulator):
CellularSimulatorError exception. Default is 120 seconds.
"""
# turn on RRC release
- cell_type = self.cells[0][self.KEY_CELL_TYPE]
- cell_number = self.cells[0][self.KEY_CELL_NUMBER]
+ cell_type = self.cells[0][self._KEY_CELL_TYPE]
+ cell_number = self.cells[0][self._KEY_CELL_NUMBER]
# choose cmd base on cell type
cmd = None
@@ -981,8 +981,8 @@ class UXMCellularSimulator(AbstractCellularSimulator):
def detach(self):
""" Turns off all the base stations so the DUT loose connection."""
for cell in self.cells:
- cell_type = cell[self.KEY_CELL_TYPE]
- cell_number = cell[self.KEY_CELL_NUMBER]
+ cell_type = cell[self._KEY_CELL_TYPE]
+ cell_number = cell[self._KEY_CELL_NUMBER]
self.turn_cell_off(cell_type, cell_number)
time.sleep(5)
diff --git a/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py b/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py
index 8e4d10ab6..f4ba74f41 100644
--- a/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py
+++ b/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py
@@ -15,6 +15,7 @@
# limitations under the License.
import collections
+import itertools
import pyvisa
import time
from acts import logger
@@ -38,6 +39,12 @@ class Keysight5GTestApp(object):
def __init__(self, config):
self.config = config
+ self.test_app_settings = {
+ 'lte_cell_count': 0,
+ 'nr_cell_count': 0,
+ 'lte_cell_configs': [],
+ 'nr_cell_configs': []
+ }
self.log = logger.create_tagged_trace_logger("{}{}".format(
self.config['brand'], self.config['model']))
self.resource_manager = pyvisa.ResourceManager(self.VISA_LOCATION)
@@ -57,6 +64,9 @@ class Keysight5GTestApp(object):
else:
self.log.info("Test App ID: {}".format(inst_id))
+ self.test_app_settings['lte_cell_count'] = self.get_cell_count('LTE')
+ self.test_app_settings['nr_cell_count'] = self.get_cell_count('NR5G')
+
def destroy(self):
self.test_app.close()
@@ -194,6 +204,18 @@ class Keysight5GTestApp(object):
self.select_cell(cell_type, cell)
self.send_cmd('DISPlay:{} {},{}'.format(cell_type, tab, subtab))
+ def get_cell_count(self, cell_type):
+ """Function to get cell count
+
+ Args:
+ cell_type: LTE or NR5G cell
+ Returns:
+ cell_count: number of cells of cell_type supported.
+ """
+ cell_count = int(
+ self.send_cmd('BSE:CONFig:{}:CELL:COUNt?'.format(cell_type), 1))
+ return cell_count
+
def get_cell_state(self, cell_type, cell):
"""Function to get cell on/off state.
@@ -249,17 +271,23 @@ class Keysight5GTestApp(object):
self.send_cmd('BSE:CONFig:{}:{}:ACTive:STATe {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), state))
- def set_cell_type(self, cell_type, cell, sa_or_nsa):
+ def turn_all_cells_off(self):
+ for cell in range(self.test_app_settings['lte_cell_count']):
+ self.set_cell_state('LTE', cell + 1, 0)
+ for cell in range(self.test_app_settings['nr_cell_count']):
+ self.set_cell_state('NR5G', cell + 1, 0)
+
+ def set_nr_cell_type(self, cell_type, cell, nr_cell_type):
"""Function to set cell duplex mode
Args:
cell_type: LTE or NR5G cell
cell: cell/carrier number
- sa_or_nsa: SA or NSA
+ nr_cell_type: SA or NSA
"""
self.assert_cell_off(cell_type, cell)
- self.send_cmd('BSE: CONFig:NR5G:{}:TYPE {}'.format(
- Keysight5GTestApp._format_cells(cell), sa_or_nsa))
+ self.send_cmd('BSE:CONFig:{}:{}:TYPE {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell), nr_cell_type))
def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
"""Function to set cell duplex mode
@@ -297,7 +325,8 @@ class Keysight5GTestApp(object):
if cell_type == 'NR5G' and isinstance(
channel, str) and channel.lower() in ['low', 'mid', 'high']:
self.send_cmd('BSE:CONFig:{}:{}:TESTChanLoc {}'.format(
- cell_type, Keysight5GTestApp._format_cells(cell), channel.upper()))
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ channel.upper()))
elif arfcn == 1:
self.send_cmd('BSE:CONFig:{}:{}:DL:CHANnel {}'.format(
cell_type, Keysight5GTestApp._format_cells(cell), channel))
@@ -308,7 +337,8 @@ class Keysight5GTestApp(object):
def toggle_contiguous_nr_channels(self, force_contiguous):
self.assert_cell_off('NR5G', 1)
- self.log.warning('Forcing contiguous NR channels overrides channel config.')
+ self.log.warning(
+ 'Forcing contiguous NR channels overrides channel config.')
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
if force_contiguous:
self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
@@ -415,6 +445,40 @@ class Keysight5GTestApp(object):
cell_type, Keysight5GTestApp._format_cells(cell), power))
self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
+ def set_cell_ul_power_control(self, cell_type, cell, mode, target_power=0):
+ """Function configure UL power control
+
+ Args:
+ cell_type: LTE or NR5G cell
+ cell: cell/carrier number
+ mode: UL power control mode. One of [TARget | MANual | UP | DOWN | DISabled]
+ target_power: target power for PUSCH
+ """
+ self.send_cmd('BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:MODE {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell), mode))
+ if cell_type == 'NR5G' and mode == 'TARget':
+ self.send_cmd(
+ 'BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:TARGet:POWer {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ target_power))
+ elif cell_type == 'LTE' and mode == 'TARget':
+ self.send_cmd(
+ 'BSE:CONFig:{}:{}:UL:CLPControl:TARGet:POWer:PUSCH {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell),
+ target_power))
+
+ def set_cell_input_power(self, cell_type, cell, power):
+ """Function to set cell input power
+
+ Args:
+ cell_type: LTE or NR5G cell
+ cell: cell/carrier number
+ power: expected input power
+ """
+ self.send_cmd('BSE:CONFIG:{}:{}:MANual:POWer {}'.format(
+ cell_type, Keysight5GTestApp._format_cells(cell), power))
+ self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
+
def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
"""Function to set cell power
@@ -459,6 +523,18 @@ class Keysight5GTestApp(object):
Keysight5GTestApp._format_cells(cell), scenario))
self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
+ def set_nr_schedule_slot_ratio(self, cell, slot_ratio):
+ """Function to set NR schedule to one of predefince quick configs.
+
+ Args:
+ cell: cell number to address. schedule will apply to all cells
+ slot_ratio: downlink slot ratio
+ """
+ self.assert_cell_off('NR5G', cell)
+ self.send_cmd('BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:RATIo {}'.format(
+ Keysight5GTestApp._format_cells(cell), slot_ratio))
+ self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
+
def set_nr_cell_mcs(self, cell, dl_mcs, ul_mcs):
"""Function to set NR cell DL & UL MCS
@@ -468,13 +544,58 @@ class Keysight5GTestApp(object):
ul_mcs: mcs index to use on UL
"""
self.assert_cell_off('NR5G', cell)
- self.send_cmd(
- 'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"'
- .format(dl_mcs))
+ frame_config_count = 5
+ slot_config_count = 8
+ if isinstance(dl_mcs, dict):
+ self.configure_nr_link_adaptation(cell, link_config=dl_mcs)
+ else:
+ for frame, slot in itertools.product(range(frame_config_count),
+ range(slot_config_count)):
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy FIXed'
+ .format(Keysight5GTestApp._format_cells(cell), frame,
+ slot))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"'
+ .format(dl_mcs))
self.send_cmd(
'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "UL:IMCS", "{}"'
.format(ul_mcs))
+ def configure_nr_link_adaptation(self, cell, link_config):
+ frame_config_count = 5
+ slot_config_count = 8
+ for frame, slot in itertools.product(range(frame_config_count),
+ range(slot_config_count)):
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy {}'
+ .format(Keysight5GTestApp._format_cells(cell), frame, slot,
+ link_config['link_policy']))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:IMCS {}'.
+ format(Keysight5GTestApp._format_cells(cell), frame, slot,
+ link_config['initial_mcs']))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:MAXimum:IMCS {}'
+ .format(Keysight5GTestApp._format_cells(cell), frame, slot,
+ link_config['maximum_mcs']))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:MAC:LADaptation:NTX:BEValuation {}'.format(
+ Keysight5GTestApp._format_cells(cell),
+ link_config.get('adaptation_interval', 10000)))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:COUNt {}'.format(
+ Keysight5GTestApp._format_cells(cell),
+ link_config.get('target_nack_count', 1000)))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:MARGin {}'.format(
+ Keysight5GTestApp._format_cells(cell),
+ link_config.get('target_nack_margin', 100)))
+ self.send_cmd(
+ 'BSE:CONFig:NR5G:{}:MAC:DL:LADaptation:MCS:INCRement {}'.format(
+ Keysight5GTestApp._format_cells(cell),
+ link_config.get('mcs_step', 1)))
+
def set_lte_cell_mcs(
self,
cell,
@@ -500,9 +621,16 @@ class Keysight5GTestApp(object):
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "DL:MCS:TABle", "{}"'
.format(dl_mcs_table_formatted))
- self.send_cmd(
- 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"'
- .format(dl_mcs))
+ self.configure_lte_periodic_csi_reporting(cell, 1)
+ if dl_mcs == 'WCQI':
+ self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE WCQI'.format(
+ Keysight5GTestApp._format_cells(cell)))
+ else:
+ self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE EXPLicit'.format(
+ Keysight5GTestApp._format_cells(cell)))
+ self.send_cmd(
+ 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"'
+ .format(dl_mcs))
self.send_cmd(
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MCS:TABle", "{}"'
.format(ul_mcs_table))
@@ -510,6 +638,12 @@ class Keysight5GTestApp(object):
'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL", "UL:IMCS", "{}"'
.format(ul_mcs))
+ def configure_lte_periodic_csi_reporting(self, cell, enable):
+ """Function to enable/disable LTE CSI reporting."""
+
+ self.send_cmd('BSE:CONFig:LTE:{}:PHY:CSI:PERiodic:STATe {}'.format(
+ Keysight5GTestApp._format_cells(cell), enable))
+
def set_lte_control_region_size(self, cell, num_symbols):
self.assert_cell_off('LTE', cell)
self.send_cmd('BSE:CONFig:LTE:{}:PHY:PCFich:CFI {}'.format(
diff --git a/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py b/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py
index 57e66ff72..fd2d79490 100644
--- a/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py
+++ b/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py
@@ -29,6 +29,7 @@ from acts import base_test
from acts import utils
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts.controllers.utils_lib import ssh
+from acts.controllers.android_lib.tel import tel_utils
from acts.controllers import iperf_server as ipf
from acts_contrib.test_utils.cellular.keysight_5g_testapp import Keysight5GTestApp
from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
@@ -87,18 +88,20 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
self.user_params['retry_tests'] = [self.__class__.__name__]
# Turn Airplane mode on
- asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- 'Can not turn on airplane mode.')
+ #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ # 'Can not turn on airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, True)
def teardown_class(self):
self.log.info('Turning airplane mode on')
try:
- asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- 'Can not turn on airplane mode.')
+ #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ # 'Can not turn on airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, True)
except:
self.log.warning('Cannot perform teardown operations on DUT.')
try:
- self.keysight_test_app.set_cell_state('LTE', 1, 0)
+ self.keysight_test_app.turn_all_cells_off()
self.keysight_test_app.destroy()
except:
self.log.warning('Cannot perform teardown operations on tester.')
@@ -112,11 +115,11 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
def teardown_test(self):
self.retry_flag = False
self.log.info('Turing airplane mode on')
- asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- 'Can not turn on airplane mode.')
- if self.keysight_test_app.get_cell_state('LTE', 'CELL1'):
- self.log.info('Turning LTE off.')
- self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0)
+ #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ # 'Can not turn on airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ self.log.info('Turning all cells off.')
+ self.keysight_test_app.turn_all_cells_off()
log_path = os.path.join(
context.get_current_context().get_full_output_path(), 'pixel_logs')
os.makedirs(self.log_path, exist_ok=True)
@@ -133,8 +136,9 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
and sets a retry_flag to enable further tweaking the test logic on
second attempts.
"""
- asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- 'Can not turn on airplane mode.')
+ #asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ # 'Can not turn on airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, True)
if self.keysight_test_app.get_cell_state('LTE', 'CELL1'):
self.log.info('Turning LTE off.')
self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0)
@@ -318,6 +322,10 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
# Configure all cells
for cell_idx, cell in enumerate(
testcase_params['endc_combo_config']['cell_list']):
+ if cell['cell_type'] == 'NR5G':
+ self.keysight_test_app.set_nr_cell_type(
+ cell['cell_type'], cell['cell_number'],
+ cell['nr_cell_type'])
self.keysight_test_app.set_cell_duplex_mode(
cell['cell_type'], cell['cell_number'], cell['duplex_mode'])
self.keysight_test_app.set_cell_band(cell['cell_type'],
@@ -326,6 +334,14 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
self.keysight_test_app.set_cell_dl_power(
cell['cell_type'], cell['cell_number'],
testcase_params['cell_power_sweep'][cell_idx][0], 1)
+ self.keysight_test_app.set_cell_input_power(
+ cell['cell_type'], cell['cell_number'],
+ self.testclass_params['input_power'][cell['cell_type']])
+ self.keysight_test_app.set_cell_ul_power_control(
+ cell['cell_type'], cell['cell_number'],
+ self.testclass_params['ul_power_control_mode'],
+ self.testclass_params.get('ul_power_control_target',0)
+ )
if cell['cell_type'] == 'NR5G':
self.keysight_test_app.set_nr_subcarrier_spacing(
cell['cell_number'], cell['subcarrier_spacing'])
@@ -361,10 +377,15 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
self.testclass_params['lte_ul_mac_padding'])
if testcase_params['endc_combo_config']['nr_cell_count']:
+
if 'schedule_scenario' in testcase_params:
self.keysight_test_app.set_nr_cell_schedule_scenario(
'CELL1',
testcase_params['schedule_scenario'])
+ if testcase_params['schedule_scenario'] == 'FULL_TPUT':
+ self.keysight_test_app.set_nr_schedule_slot_ratio(
+ 'CELL1',
+ testcase_params['schedule_slot_ratio'])
self.keysight_test_app.set_nr_ul_dft_precoding(
'CELL1', testcase_params['transform_precoding'])
self.keysight_test_app.set_nr_cell_mcs(
@@ -375,47 +396,80 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
self.keysight_test_app.set_ul_carriers(
testcase_params['endc_combo_config']['nr_ul_carriers'])
- # Turn on LTE cells
- for cell in testcase_params['endc_combo_config']['cell_list']:
- if cell['cell_type'] == 'LTE' and not self.keysight_test_app.get_cell_state(
- cell['cell_type'], cell['cell_number']):
- self.log.info('Turning LTE Cell {} on.'.format(
- cell['cell_number']))
- self.keysight_test_app.set_cell_state(cell['cell_type'],
- cell['cell_number'], 1)
-
- # Activate LTE aggregation
- if testcase_params['endc_combo_config']['lte_scc_list']:
- self.keysight_test_app.apply_lte_carrier_agg(
- testcase_params['endc_combo_config']['lte_scc_list'])
-
- self.log.info('Waiting for LTE connections')
- # Turn airplane mode off
- num_apm_toggles = 5
- for idx in range(num_apm_toggles):
- self.log.info('Turning off airplane mode')
- asserts.assert_true(utils.force_airplane_mode(self.dut, False),
- 'Can not turn off airplane mode.')
- if self.keysight_test_app.wait_for_cell_status(
- 'LTE', 'CELL1', 'CONN', 180):
- break
- elif idx < num_apm_toggles - 1:
- self.log.info('Turning on airplane mode')
- asserts.assert_true(utils.force_airplane_mode(self.dut, True),
- 'Can not turn on airplane mode.')
- time.sleep(MEDIUM_SLEEP)
- else:
- asserts.fail('DUT did not connect to LTE.')
-
- if testcase_params['endc_combo_config']['nr_cell_count']:
- self.keysight_test_app.apply_carrier_agg()
- self.log.info('Waiting for 5G connection')
- connected = self.keysight_test_app.wait_for_cell_status(
- 'NR5G', testcase_params['endc_combo_config']['nr_cell_count'],
- ['ACT', 'CONN'], 60)
- if not connected:
- asserts.fail('DUT did not connect to NR.')
- time.sleep(SHORT_SLEEP)
+ if testcase_params['endc_combo_config']['lte_cell_count']:
+ # Connect flow for LTE and LTE+FR1 ENDC
+ # Turn on LTE cells
+ for cell in testcase_params['endc_combo_config']['cell_list']:
+ if cell['cell_type'] == 'LTE' and not self.keysight_test_app.get_cell_state(
+ cell['cell_type'], cell['cell_number']):
+ self.log.info('Turning LTE Cell {} on.'.format(
+ cell['cell_number']))
+ self.keysight_test_app.set_cell_state(cell['cell_type'],
+ cell['cell_number'], 1)
+ # Activate LTE aggregation if applicable
+ if testcase_params['endc_combo_config']['lte_scc_list']:
+ self.keysight_test_app.apply_lte_carrier_agg(
+ testcase_params['endc_combo_config']['lte_scc_list'])
+ self.log.info('Waiting for LTE connections')
+ # Turn airplane mode off
+ num_apm_toggles = 10
+ for idx in range(num_apm_toggles):
+ self.log.info('Turning off airplane mode')
+ #asserts.assert_true(utils.force_airplane_mode(self.dut, False),
+ # 'Can not turn off airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, False)
+ if self.keysight_test_app.wait_for_cell_status(
+ 'LTE', 'CELL1', 'CONN', 10*(idx+1)):
+ self.log.info('Connected! Waiting for {} seconds.'.format(LONG_SLEEP))
+ time.sleep(LONG_SLEEP)
+ break
+ elif idx < num_apm_toggles - 1:
+ self.log.info('Turning on airplane mode')
+ # asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ # 'Can not turn on airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ time.sleep(MEDIUM_SLEEP)
+ else:
+ asserts.fail('DUT did not connect to LTE.')
+
+ if testcase_params['endc_combo_config']['nr_cell_count']:
+ self.keysight_test_app.apply_carrier_agg()
+ self.log.info('Waiting for 5G connection')
+ connected = self.keysight_test_app.wait_for_cell_status(
+ 'NR5G', testcase_params['endc_combo_config']['nr_cell_count'],
+ ['ACT', 'CONN'], 60)
+ if not connected:
+ asserts.fail('DUT did not connect to NR.')
+ time.sleep(SHORT_SLEEP)
+ elif testcase_params['endc_combo_config']['nr_cell_count']:
+ # Connect flow for NR FR1 Standalone
+ # Turn on NR cells
+ for cell in testcase_params['endc_combo_config']['cell_list']:
+ if cell['cell_type'] == 'NR5G' and not self.keysight_test_app.get_cell_state(
+ cell['cell_type'], cell['cell_number']):
+ self.log.info('Turning NR Cell {} on.'.format(
+ cell['cell_number']))
+ self.keysight_test_app.set_cell_state(cell['cell_type'],
+ cell['cell_number'], 1)
+ num_apm_toggles = 10
+ for idx in range(num_apm_toggles):
+ self.log.info('Turning off airplane mode now.')
+ #asserts.assert_true(utils.force_airplane_mode(self.dut, False),
+ # 'Can not turn off airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, False)
+ if self.keysight_test_app.wait_for_cell_status(
+ 'NR5G', 'CELL1', 'CONN', 10*(idx+1)):
+ self.log.info('Connected! Waiting for {} seconds.'.format(LONG_SLEEP))
+ time.sleep(LONG_SLEEP)
+ break
+ elif idx < num_apm_toggles - 1:
+ self.log.info('Turning on airplane mode now.')
+ # asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ # 'Can not turn on airplane mode.')
+ tel_utils.toggle_airplane_mode(self.log, self.dut, True)
+ time.sleep(MEDIUM_SLEEP)
+ else:
+ asserts.fail('DUT did not connect to NR.')
def _test_throughput_bler(self, testcase_params):
"""Test function to run cellular throughput and BLER measurements.
@@ -449,27 +503,34 @@ class CellularThroughputBaseTest(base_test.BaseTestClass):
for power_idx in range(len(testcase_params['cell_power_sweep'][0])):
result = collections.OrderedDict()
# Set DL cell power
- result['cell_power'] = []
for cell_idx, cell in enumerate(
testcase_params['endc_combo_config']['cell_list']):
+ cell_power_array = []
current_cell_power = testcase_params['cell_power_sweep'][
cell_idx][power_idx]
- result['cell_power'].append(current_cell_power)
+ cell_power_array.append(current_cell_power)
self.keysight_test_app.set_cell_dl_power(
cell['cell_type'], cell['cell_number'], current_cell_power,
1)
-
+ result['cell_power'] = cell_power_array
# Start BLER and throughput measurements
- result = self.run_single_throughput_measurement(testcase_params)
- testcase_results['results'].append(result)
+ current_throughput = self.run_single_throughput_measurement(testcase_params)
+ lte_rx_meas = cputils.get_rx_measurements(self.dut, 'LTE')
+ nr_rx_meas = cputils.get_rx_measurements(self.dut, 'NR5G')
+ result['throughput_measurements'] = current_throughput
+ result['lte_rx_measurements'] = lte_rx_meas
+ result['nr_rx_measurements'] = nr_rx_meas
- self.print_throughput_result(result)
+ self.print_throughput_result(current_throughput)
+ self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas))
+ self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas))
- if (('lte_bler_result' in result
- and result['lte_bler_result']['total']['DL']['nack_ratio'] *
+ testcase_results['results'].append(result)
+ if (('lte_bler_result' in result['throughput_measurements']
+ and result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio'] *
100 > 99) or
- ('nr_bler_result' in result
- and result['nr_bler_result']['total']['DL']['nack_ratio'] *
+ ('nr_bler_result' in result['throughput_measurements']
+ and result['throughput_measurements']['nr_bler_result']['total']['DL']['nack_ratio'] *
100 > 99)):
stop_counter = stop_counter + 1
else:
diff --git a/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py b/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py
index 4aaff91ef..2a725902c 100644
--- a/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py
+++ b/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py
@@ -17,6 +17,7 @@
import collections
import logging
import os
+import re
import time
PCC_PRESET_MAPPING = {
@@ -228,3 +229,23 @@ def log_system_power_metrics(ad, verbose=1):
battery_meaurements['bat_{}'.format(par)] = ad.adb.shell(
'cat /sys/class/power_supply/maxfg/{}'.format(par))
logging.debug('Battery readings: {}'.format(battery_meaurements))
+
+
+def send_at_command(ad, at_command):
+ at_cmd_output = ad.adb.shell('am instrument -w -e request {} -e response wait '
+ '"com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'.format(at_command))
+ return at_cmd_output
+
+def get_rx_measurements(ad, cell_type):
+ cell_type_int = 7 if cell_type == 'LTE' else 8
+ rx_meas = send_at_command(ad, 'AT+GOOGGETRXMEAS\={}?'.format(cell_type_int))
+ rsrp_regex = r"RSRP\[\d+\]\s+(-?\d+)"
+ rsrp_values = [float(x) for x in re.findall(rsrp_regex, rx_meas)]
+ rsrq_regex = r"RSRQ\[\d+\]\s+(-?\d+)"
+ rsrq_values = [float(x) for x in re.findall(rsrq_regex, rx_meas)]
+ rssi_regex = r"RSSI\[\d+\]\s+(-?\d+)"
+ rssi_values = [float(x) for x in re.findall(rssi_regex, rx_meas)]
+ sinr_regex = r"SINR\[\d+\]\s+(-?\d+)"
+ sinr_values = [float(x) for x in re.findall(sinr_regex, rx_meas)]
+ return {'rsrp': rsrp_values, 'rsrq': rsrq_values, 'rssi': rssi_values, 'sinr': sinr_values}
+
diff --git a/acts_tests/acts_contrib/test_utils/gnss/device_doze.py b/acts_tests/acts_contrib/test_utils/gnss/device_doze.py
new file mode 100644
index 000000000..558037b27
--- /dev/null
+++ b/acts_tests/acts_contrib/test_utils/gnss/device_doze.py
@@ -0,0 +1,67 @@
+"""A module to trigger device into doze mode."""
+import enum
+from retry import retry
+
+
+_UNPLUG_POWER = "dumpsys battery unplug"
+_RESET_POWER = "dumpsys battery reset"
+_GET_DOZE_STATUS = "dumpsys deviceidle get {status}"
+_FORCE_IDLE = "dumpsys deviceidle force-idle {status}"
+_LEAVE_IDLE = "dumpsys deviceidle disable"
+
+
+class DozeState(enum.Enum):
+ """Doze state."""
+ INACTIVE = "INACTIVE"
+ ACTIVE = "ACTIVE"
+ IDLE = "IDLE"
+
+
+class DozeType(enum.Enum):
+ DEEP = "deep"
+ LIGHT = "light"
+
+
+def _check_doze_status(
+ dut,
+ doze_type: DozeType,
+ doze_state: DozeState):
+ command = _GET_DOZE_STATUS.format(status=doze_type.value)
+ doze_status = dut.adb.shell(command).strip()
+ dut.log.info("%s doze status is %s" % (doze_type.value, doze_status))
+ if doze_status != doze_state.value:
+ raise ValueError("Unexpected doze status.")
+
+
+@retry(exceptions=ValueError, tries=3, delay=1)
+def enter_doze_mode(
+ dut,
+ doze_type: DozeType):
+ """Sets device into doze mode according to given doze type.
+ Args:
+ dut: The device under test.
+ doze_type: The desired doze type.
+ Raises:
+ ValueError: When fail to sets device into doze mode.
+ """
+ dut.adb.shell(_UNPLUG_POWER)
+ dut.adb.shell(
+ _FORCE_IDLE.format(status=doze_type.value),
+ )
+ _check_doze_status(dut, doze_type, DozeState.IDLE)
+
+
+@retry(exceptions=ValueError, tries=3, delay=1)
+def leave_doze_mode(
+ dut,
+ doze_type: DozeType):
+ """Sets device out of doze mode.
+ Args:
+ dut: The device under test.
+ doze_type: The desired doze type.
+ Raises:
+ ValueError: When fail to sets device out of doze mode.
+ """
+ dut.adb.shell(_RESET_POWER)
+ dut.adb.shell(_LEAVE_IDLE)
+ _check_doze_status(dut, doze_type, DozeState.ACTIVE)
diff --git a/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py b/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py
index a9f957947..d2930a680 100644
--- a/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py
+++ b/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py
@@ -1,3 +1,4 @@
+from datetime import datetime
import os
import pathlib
import re
@@ -6,6 +7,13 @@ import tempfile
from collections import defaultdict
+_EVENT_TIME_FORMAT = "%Y/%m/%d %H:%M:%S.%f"
+_EVENT_TIME_PATTERN = re.compile(r"(\d+(?:\/\d+){2}\s\d{2}(?::\d{2}){2}\.\d+)\sRead:")
+_GNSS_CLOCK_START_LOG_PATTERN = re.compile(r"^GnssClock:")
+_GNSS_CLOCK_TIME_NANOS_PATTERN = re.compile(f"^\s+TimeNanos\s*=\s*([-]?\d*)")
+_GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN = re.compile(f"^\s+FullBiasNanos\s*=\s*([-]?\d*)")
+_GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN = re.compile(f"^\s+elapsedRealtimeNanos\s*=\s*([-]?\d*)")
+
class AdrInfo:
"""Represent one ADR value
An ADR value is a decimal number range from 0 - 31
@@ -120,6 +128,34 @@ class AdrStatistic:
self.total_count += adr_info.count
+class GnssClockSubEvent:
+ time_nanos: int
+ full_bias_nanos: int
+ elapsed_real_time_nanos: int
+
+ def __init__(self, event_time):
+ self.event_time = event_time
+
+ def parse(self, line):
+ if _GNSS_CLOCK_TIME_NANOS_PATTERN.search(line):
+ self.time_nanos = int(_GNSS_CLOCK_TIME_NANOS_PATTERN.search(line).group(1))
+ elif _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line):
+ self.full_bias_nanos = int(
+ _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line).group(1))
+ elif _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line):
+ self.elapsed_real_time_nanos = int(
+ _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line).group(1))
+
+ def __repr__(self) -> str:
+ return (f"event time: {self.event_time}, "
+ f"timenanos: {self.time_nanos}, "
+ f"full_bias: {self.full_bias_nanos}, "
+ f"elapsed_realtime: {self.elapsed_real_time_nanos}")
+
+ @property
+ def gps_elapsed_realtime_diff(self):
+ return self.time_nanos - self.full_bias_nanos - self.elapsed_real_time_nanos
+
class GnssMeasurement:
"""Represent the content of measurement file generated by gps tool"""
@@ -206,3 +242,21 @@ class GnssMeasurement:
adr_info = AdrInfo(key, value)
adr_static.add_adr_info(adr_info)
return adr_static
+
+ def get_gnss_clock_info(self):
+ sub_events = []
+ event_time = None
+ with tempfile.TemporaryDirectory() as folder:
+ local_measurement_file = os.path.join(folder, "measurement_file")
+ self.ad.pull_files(self._get_latest_measurement_file_path(), local_measurement_file)
+ with open(local_measurement_file) as f:
+ for line in f:
+ if re.search(_EVENT_TIME_PATTERN, line):
+ event_time = re.search(_EVENT_TIME_PATTERN, line).group(1)
+ event_time = datetime.strptime(event_time, _EVENT_TIME_FORMAT)
+ elif re.search(_GNSS_CLOCK_START_LOG_PATTERN, line):
+ sub_events.append(GnssClockSubEvent(event_time))
+ elif line.startswith(" "):
+ sub_events[-1].parse(line)
+ return sub_events
+
diff --git a/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py b/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py
index ec9ba616e..50202fce0 100644
--- a/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py
+++ b/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py
@@ -23,9 +23,10 @@ import fnmatch
import posixpath
import subprocess
import tempfile
+import functools
from retry import retry
from collections import namedtuple
-from datetime import datetime
+from datetime import datetime, timedelta
from xml.etree import ElementTree
from contextlib import contextmanager
from statistics import median
@@ -43,6 +44,7 @@ from acts_contrib.test_utils.gnss.gnss_measurement import GnssMeasurement
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.tel import tel_logging_utils as tlutils
from acts_contrib.test_utils.tel import tel_test_utils as tutils
+from acts_contrib.test_utils.gnss import device_doze
from acts_contrib.test_utils.gnss import gnssstatus_utils
from acts_contrib.test_utils.gnss import gnss_constant
from acts_contrib.test_utils.gnss import supl
@@ -115,7 +117,8 @@ XTRA_SERVER_2="http://"
XTRA_SERVER_3="http://"
"""
_BRCM_DUTY_CYCLE_PATTERN = re.compile(r".*PGLOR,\d+,STA.*")
-
+_WEARABLE_QCOM_VENDOR_REGEX = re.compile(r"init.svc.qcom")
+_GPS_ELAPSED_REALTIME_DIFF_TOLERANCE = 500_000
class GnssTestUtilsError(Exception):
pass
@@ -266,11 +269,11 @@ def enable_vendor_orbit_assistance_data(ad):
ad: An AndroidDevice object.
"""
ad.root_adb()
- if is_device_wearable(ad):
- lto_mode_wearable(ad, True)
- elif check_chipset_vendor_by_qualcomm(ad):
+ if check_chipset_vendor_by_qualcomm(ad):
disable_xtra_throttle(ad)
reboot(ad)
+ elif is_device_wearable(ad):
+ lto_mode_wearable(ad, True)
else:
lto_mode(ad, True)
@@ -285,10 +288,10 @@ def disable_vendor_orbit_assistance_data(ad):
ad: An AndroidDevice object.
"""
ad.root_adb()
- if is_device_wearable(ad):
- lto_mode_wearable(ad, False)
- elif check_chipset_vendor_by_qualcomm(ad):
+ if check_chipset_vendor_by_qualcomm(ad):
disable_qualcomm_orbit_assistance_data(ad)
+ elif is_device_wearable(ad):
+ lto_mode_wearable(ad, False)
else:
lto_mode(ad, False)
@@ -625,6 +628,9 @@ def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60):
Args:
ad: An AndroidDevice object.
dwelltime: Waiting time for modem reset. Default is 60 seconds.
+
+ Returns:
+ ssr_crash_time: The epoch time SSR is crashed
"""
mds_check = ad.adb.shell("pm path com.google.mdstest")
if not mds_check:
@@ -634,6 +640,7 @@ def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60):
'"com.google.mdstest/com.google.mdstest.instrument'
'.ModemCommandInstrumentation"')
ad.log.info("Triggering modem SSR crash by MDS")
+ ssr_crash_time = get_current_epoch_time()
output = ad.adb.shell(cmd, ignore_status=True)
ad.log.debug(output)
time.sleep(dwelltime)
@@ -643,6 +650,7 @@ def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60):
else:
raise signals.TestError(
"Failed to trigger modem SSR crash by MDS. \n%s" % output)
+ return ssr_crash_time
def check_xtra_download(ad, begin_time):
@@ -1027,8 +1035,11 @@ def gnss_tracking_via_gtw_gpstool(ad,
set to False.
freq: An integer to set location update frequency. Default set to 0.
is_screen_off: whether to turn off during tracking
+
+ Returns:
+ The datetime obj of first fixed
"""
- process_gnss_by_gtw_gpstool(
+ first_fixed_time = process_gnss_by_gtw_gpstool(
ad, criteria=criteria, api_type=api_type, meas_flag=meas_flag, freq=freq,
bg_display=is_screen_off)
ad.log.info("Start %s tracking test for %d minutes" % (api_type.upper(),
@@ -1039,6 +1050,8 @@ def gnss_tracking_via_gtw_gpstool(ad,
ad.log.info("Successfully tested for %d minutes" % testtime)
start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type)
+ return first_fixed_time
+
def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss",
ignore_hal_crash=False):
@@ -1056,7 +1069,9 @@ def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss",
# add sleep here to avoid too many request and cause device not responding
time.sleep(1)
-def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location):
+def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location,
+ raninterval: bool = False, mininterval: int = 10,
+ maxinterval: int = 40):
"""Run GNSS TTFF test with selected mode and parse the results.
Args:
@@ -1069,7 +1084,12 @@ def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location):
# Before running TTFF, we will run tracking and try to get first fixed.
# But the TTFF before TTFF doesn't apply to any criteria, so we set a maximum value.
process_gnss_by_gtw_gpstool(ad, criteria=FIRST_FIXED_MAX_WAITING_TIME)
- ttff_start_time = start_ttff_by_gtw_gpstool(ad, mode, test_cycle)
+ ttff_start_time = start_ttff_by_gtw_gpstool(ad,
+ mode,
+ iteration=test_cycle,
+ raninterval=raninterval,
+ mininterval=mininterval,
+ maxinterval=maxinterval)
ttff_data = process_ttff_by_gtw_gpstool(ad, ttff_start_time, true_location)
result = check_ttff_data(ad, ttff_data, gnss_constant.TTFF_MODE.get(mode), criteria)
asserts.assert_true(
@@ -1207,7 +1227,7 @@ def verify_gps_time_should_be_close_to_device_time(ad, tracking_result):
tracking_result: The result we get from GNSS tracking.
"""
ad.log.info("Validating GPS/Device time difference")
- max_time_diff_in_seconds = 2.0
+ max_time_diff_in_seconds = 3.0 if is_device_wearable() else 2.0
exceed_report = []
for report in tracking_result.values():
time_diff_in_seconds = abs((report.report_time - report.device_time).total_seconds())
@@ -1973,10 +1993,13 @@ def check_chipset_vendor_by_qualcomm(ad):
Returns:
True if it's by Qualcomm. False irf not.
"""
- ad.root_adb()
- soc = str(ad.adb.shell("getprop gsm.version.ril-impl"))
- ad.log.debug("SOC = %s" % soc)
- return "Qualcomm" in soc
+ if is_device_wearable(ad):
+ props = str(ad.adb.shell("getprop"))
+ return True if _WEARABLE_QCOM_VENDOR_REGEX.search(props) else False
+ else:
+ soc = str(ad.adb.shell("getprop gsm.version.ril-impl"))
+ ad.log.debug("SOC = %s" % soc)
+ return "Qualcomm" in soc
def delete_lto_file(ad):
@@ -2248,30 +2271,114 @@ def get_process_pid(ad, process_name):
return pid
-def restart_gps_daemons(ad):
+def restart_gps_daemons(ad, service):
"""Restart GPS daemons by killing services of gpsd, lhd and scd.
Args:
ad: An AndroidDevice object.
+
+ Returns:
+ kill_start_time: The time GPSd being killed.
"""
- gps_daemons_list = ["gpsd", "lhd", "scd"]
+ kill_start_time = 0
ad.root_adb()
- for service in gps_daemons_list:
- time.sleep(3)
- ad.log.info("Kill GPS daemon \"%s\"" % service)
- service_pid = get_process_pid(ad, service)
- ad.log.debug("%s PID: %s" % (service, service_pid))
- ad.adb.shell(f"kill -9 {service_pid}")
- # Wait 3 seconds for daemons and services to start.
- time.sleep(3)
+ ad.log.info("Kill GPS daemon \"%s\"" % service)
+ service_pid = get_process_pid(ad, service)
+ ad.log.debug("%s PID: %s" % (service, service_pid))
+ ad.adb.shell(f"kill -9 {service_pid}")
+ kill_start_time = get_current_epoch_time()
+ new_pid, recover_time = get_new_pid_process_time(ad, service_pid, service, 20)
+
+ ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % (
+ service, service_pid, new_pid))
+ ad.log.info("\t- \"%s\" process recovered time: %d ms" % (service, recover_time))
+ return kill_start_time
+
- new_pid = get_process_pid(ad, service)
- ad.log.debug("%s new PID: %s" % (service, new_pid))
- if not new_pid or service_pid == new_pid:
- raise signals.TestError("Unable to restart \"%s\"" % service)
+def get_new_pid_process_time(ad, origin_pid, process_name, timeout):
+ """Get the new process PID and the time it took for restarting
+
+ Args:
+ ad: An AndroidDevice object.
+ origin_pid: The original pid of specified process
+ process_name: Name of process
+ timeout: Timeout of checking
+
+ Returns:
+ 1. How long takes for restarting the specified process.
+ 2. New PID
+ """
+ begin_time = get_current_epoch_time()
+ pid = None
+ while not pid and get_current_epoch_time() - begin_time < timeout * 1000:
+ pid = get_process_pid(ad, process_name)
+ if pid and origin_pid != pid:
+ ad.log.debug("%s new PID: %s" % (process_name, pid))
+ return pid, get_current_epoch_time() - begin_time
+ raise ValueError("Unable to restart \"%s\"" % process_name)
- ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % (
- service, service_pid, new_pid))
+
+def get_gpsd_update_time(ad, begin_time, dwelltime=30):
+ """Get the UTC time of first GPSd status update shows up after begin_time
+
+ Args:
+ ad: An AndroidDevice object.
+ begin_time: The start time of the log.
+ dwelltime: Waiting time for gnss status update. Default is 30 seconds.
+
+ Returns:
+ The datetime object which indicates when is first GPSd status update
+ """
+ ad.log.info("Checking GNSS status after %s",
+ datetime.fromtimestamp( begin_time / 1000))
+ time.sleep(dwelltime)
+
+ gnss_status = ad.search_logcat("Gnss status update",
+ begin_time=begin_time)
+ if not gnss_status:
+ raise ValueError("No \"GNSS status update\" found in logs.")
+ ad.log.info("GNSS status update found.")
+ return int(gnss_status[0]["datetime_obj"].timestamp() * 1000)
+
+
+def get_location_fix_time_via_gpstool_log(ad, begin_time):
+ """Get the UTC time of location update with given
+ device time from the log output by GPSTool.
+
+ Args:
+ ad: An AndroidDevice object.
+ begin_time: The start time of the log.
+
+ Returns:
+ The datetime object which indicates when is the
+ first location fix time shows up
+ """
+ location_fix_time = ad.search_logcat("GPSService: Time",
+ begin_time=begin_time)
+ if not location_fix_time:
+ raise ValueError("No \"Location fix time\" found in logs.")
+ return int(location_fix_time[0]["datetime_obj"].timestamp() * 1000)
+
+
+def get_gps_process_and_kill_function_by_vendor(ad):
+ """Get process to be killed by vendor and
+ return the kill function accordingly
+
+ Args:
+ ad: An AndroidDevice object.
+
+ Returns:
+ killed_processes: What processes to be killed
+ functions: The methods for killing each process
+ """
+ if check_chipset_vendor_by_qualcomm(ad):
+ ad.log.info("Triggered modem SSR")
+ return {"ssr": functools.partial(gnss_trigger_modem_ssr_by_mds, ad=ad)}
+ else:
+ ad.log.info("Triggered restarting GPS daemons")
+ return {"gpsd": functools.partial(restart_gps_daemons, ad=ad, service="gpsd"),
+ "scd": functools.partial(restart_gps_daemons, ad=ad, service="scd"),
+ "lhd": functools.partial(restart_gps_daemons, ad=ad, service="lhd"),}
def is_device_wearable(ad):
@@ -2585,7 +2692,8 @@ def is_wearable_btwifi(ad):
"""
package = ad.adb.getprop("ro.product.product.name")
ad.log.debug("[ro.product.product.name]: [%s]" % package)
- return "btwifi" in package
+ # temp solution. Will check with dev team if there is a command to check.
+ return "btwifi" in package or ad.model == 'aurora'
def compare_watch_phone_location(ad,watch_file, phone_file):
@@ -2681,103 +2789,73 @@ def delete_bcm_nvmem_sto_file(ad):
ad.log.info("Delete BCM's NVMEM ephemeris files.\n%s" % status)
-def bcm_gps_xml_update_option(ad,
- option,
- search_line=None,
- append_txt=None,
- update_txt=None,
- delete_txt=None,
- gps_xml_path=BCM_GPS_XML_PATH):
- """Append parameter setting in gps.xml for BCM solution
+def bcm_gps_xml_update_option(
+ ad, child_tag, items_to_update={}, items_to_delete=[], gps_xml_path=BCM_GPS_XML_PATH):
+ """Updates gps.xml attributes.
+
+ The process will go through update first then delete.
Args:
- option: A str to identify the operation (add/update/delete).
- ad: An AndroidDevice object.
- search_line: Pattern matching of target
- line for appending new line data.
- append_txt: New lines that will be appended after the search_line.
- update_txt: New line to update the original file.
- delete_txt: lines to delete from the original file.
- gps_xml_path: gps.xml file location of DUT
+ ad: Device under test.
+ child_tag: (str) Which child node should be updated.
+ items_to_update: (dict) The attributes to be updated.
+ items_to_delete: (list) The attributes to be deleted.
+ gps_xml_path: (str) The gps.xml file path. Default is BCM_GPS_XML_PATH.
"""
remount_device(ad)
- #Update gps.xml
- tmp_log_path = tempfile.mkdtemp()
- ad.pull_files(gps_xml_path, tmp_log_path)
- gps_xml_tmp_path = os.path.join(tmp_log_path, "gps.xml")
- gps_xml_file = open(gps_xml_tmp_path, "r")
- lines = gps_xml_file.readlines()
- gps_xml_file.close()
- fout = open(gps_xml_tmp_path, "w")
- if option == "add":
- for line in lines:
- if line.strip() in append_txt:
- ad.log.info("{} is already in the file. Skip".format(append_txt))
- continue
- fout.write(line)
- if search_line in line:
- for add_txt in append_txt:
- fout.write(add_txt)
- ad.log.info("Add new line: '{}' in gps.xml.".format(add_txt))
- elif option == "update":
- for line in lines:
- if search_line in line:
- ad.log.info(line)
- fout.write(update_txt)
- ad.log.info("Update line: '{}' in gps.xml.".format(update_txt))
- continue
- fout.write(line)
- elif option == "delete":
- for line in lines:
- if delete_txt in line:
- ad.log.info("Delete line: '{}' in gps.xml.".format(line.strip()))
- continue
- fout.write(line)
- fout.close()
+ # to prevent adding nso into xml file
+ ElementTree.register_namespace("", "http://www.glpals.com/")
+ with tempfile.TemporaryDirectory() as temp_dir:
+ local_xml = os.path.join(temp_dir, "gps.xml.ori")
+ modified_xml = os.path.join(temp_dir, "gps.xml")
+ ad.pull_files(gps_xml_path, local_xml)
+ xml_data = ElementTree.parse(local_xml)
+ root_data = xml_data.getroot()
+ child_node = None
+
+ for node in root_data:
+ if node.tag.endswith(child_tag):
+ child_node = node
+ break
- # Update gps.xml with gps_new.xml
- ad.push_system_file(gps_xml_tmp_path, gps_xml_path)
+ if child_node is None:
+ raise LookupError(f"Couldn't find node with {child_tag}")
- # remove temp folder
- shutil.rmtree(tmp_log_path, ignore_errors=True)
+ for key, value in items_to_update.items():
+ child_node.attrib[key] = value
+
+ for key in items_to_delete:
+ if key in child_node.attrib:
+ child_node.attrib.pop(key)
+
+ xml_data.write(modified_xml, xml_declaration=True, encoding="utf-8", method="xml")
+ ad.push_system_file(modified_xml, gps_xml_path)
+ ad.log.info("Finish modify gps.xml")
def bcm_gps_ignore_warmstandby(ad):
""" remove warmstandby setting in BCM gps.xml to reset tracking filter
Args:
ad: An AndroidDevice object.
"""
- search_line_tag = '<gll\n'
- delete_line_str = 'WarmStandbyTimeout1Seconds'
+ search_line_tag = 'gll'
+ delete_line_str = ['WarmStandbyTimeout1Seconds', 'WarmStandbyTimeout2Seconds']
bcm_gps_xml_update_option(ad,
- "delete",
- search_line_tag,
- append_txt=None,
- update_txt=None,
- delete_txt=delete_line_str)
-
- search_line_tag = '<gll\n'
- delete_line_str = 'WarmStandbyTimeout2Seconds'
- bcm_gps_xml_update_option(ad,
- "delete",
- search_line_tag,
- append_txt=None,
- update_txt=None,
- delete_txt=delete_line_str)
+ child_tag=search_line_tag,
+ items_to_delete=delete_line_str)
def bcm_gps_ignore_rom_alm(ad):
""" Update BCM gps.xml with ignoreRomAlm="True"
Args:
ad: An AndroidDevice object.
"""
- search_line_tag = '<hal\n'
- append_line_str = [' IgnoreJniTime=\"true\"\n']
- bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str)
+ search_line_tag = 'hal'
+ append_line_str = {"IgnoreJniTime":"true",
+ "AutoColdStartSignal":"SIMULATED"}
+ bcm_gps_xml_update_option(ad, child_tag=search_line_tag, items_to_update=append_line_str)
- search_line_tag = '<gll\n'
- append_line_str = [' IgnoreRomAlm=\"true\"\n',
- ' AutoColdStartSignal=\"SIMULATED\"\n',
- ' IgnoreJniTime=\"true\"\n']
- bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str)
+ search_line_tag = "gll"
+ append_line_str = {"IgnoreRomAlm":"true"}
+ bcm_gps_xml_update_option(ad, child_tag=search_line_tag, items_to_update=append_line_str)
def check_inject_time(ad):
@@ -2866,49 +2944,6 @@ def disable_ramdump(ad):
ad.start_adb_logcat()
-def deep_suspend_device(ad):
- """Force DUT to enter deep suspend mode
-
- When DUT is connected to PCs, it won't enter deep suspend mode
- by pressing power button.
-
- To force DUT enter deep suspend mode, we need to send the
- following command to DUT "echo mem >/sys/power/state"
-
- To make sure the DUT stays in deep suspend mode for a while,
- it will send the suspend command 3 times with 15s interval
-
- Args:
- ad: An AndroidDevice object.
- """
- ad.log.info("Ready to go to deep suspend mode")
- begin_time = get_device_time(ad)
- ad.droid.goToSleepNow()
- ensure_power_manager_is_dozing(ad, begin_time)
- ad.stop_services()
- try:
- command = "echo deep > /sys/power/mem_sleep && echo mem > /sys/power/state"
- for i in range(1, 4):
- ad.log.debug(f"Send deep suspend command round {i}")
- ad.adb.shell(command, ignore_status=True)
- # sleep here to ensure the device stays enough time in deep suspend mode
- time.sleep(15)
- if not _is_device_enter_deep_suspend(ad):
- raise signals.TestFailure("Device didn't enter deep suspend mode")
- ad.log.info("Wake device up now")
- except Exception:
- # when exception happen, it's very likely the device is rebooting
- # to ensure the test can go on, wait for the device is ready
- ad.log.warn("Device may be rebooting, wait for it")
- ad.wait_for_boot_completion()
- ad.root_adb()
- raise
- finally:
- tutils.bring_up_sl4a(ad)
- ad.start_adb_logcat()
- ad.droid.wakeUpNow()
-
-
def get_device_time(ad):
"""Get current datetime from device
@@ -2942,27 +2977,23 @@ def ensure_power_manager_is_dozing(ad, begin_time):
else:
ad.log.warn("Power manager didn't enter dozing")
-
-
-def _is_device_enter_deep_suspend(ad):
- """Check device has been enter deep suspend mode
-
- If device has entered deep suspend mode, we should be able to find keyword
- "suspend entry (deep)"
+def enter_deep_doze_mode(ad, lasting_time_in_seconds: int):
+ """Puts the device into deep doze mode.
Args:
- ad: An AndroidDevice object.
-
- Returns:
- bool: True / False -> has / has not entered deep suspend
+ ad: The device under test.
+ lasting_time_in_seconds: How long does the doze mode last.
"""
- cmd = f"adb -s {ad.serial} logcat -d|grep \"suspend entry (deep)\""
- process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, shell=True)
- result, _ = process.communicate()
- ad.log.debug(f"suspend result = {result}")
+ target_time = datetime.now() + timedelta(seconds=lasting_time_in_seconds)
- return bool(result)
+ try:
+ ad.log.info("Enter deep doze mode for %d seconds" % lasting_time_in_seconds)
+ device_doze.enter_doze_mode(ad, device_doze.DozeType.DEEP)
+ while datetime.now() < target_time:
+ time.sleep(1)
+ finally:
+ ad.log.info("Leave deep doze mode")
+ device_doze.leave_doze_mode(ad, device_doze.DozeType.DEEP)
def check_location_report_interval(ad, location_reported_time_src, total_seconds, tolerance):
@@ -3026,6 +3057,7 @@ def set_screen_status(ad, off=True):
def full_gnss_measurement(ad):
"""Context manager function to enable full gnss measurement"""
try:
+ ad.adb.shell("settings put global development_settings_enabled 1")
ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 1")
yield ad
finally:
@@ -3268,4 +3300,39 @@ def log_current_epoch_time(ad, sponge_key):
sponge_key: The key name of the sponge property.
"""
current_epoch_time = get_current_epoch_time() // 1000
- ad.log.info(f"TestResult {sponge_key} {current_epoch_time}") \ No newline at end of file
+ ad.log.info(f"TestResult {sponge_key} {current_epoch_time}")
+
+
+def validate_diff_of_gps_clock_elapsed_realtime(ad, start_time):
+ """Validates the diff of gps clock and elapsed realtime should be stable.
+
+ Args:
+ ad: The device under test.
+ start_time: When should the validation start. For BRCM devices, the PPS feature takes some
+ time after first fixed to start working. Therefore we should ignore some data.
+ """
+ last_gps_elapsed_realtime_diff = 0
+ variation_diff = {}
+
+ for clock in GnssMeasurement(ad).get_gnss_clock_info():
+ if clock.event_time < start_time:
+ continue
+
+ if not bool(last_gps_elapsed_realtime_diff):
+ last_gps_elapsed_realtime_diff = clock.gps_elapsed_realtime_diff
+ continue
+
+ current_gps_elapsed_realtime_diff = clock.gps_elapsed_realtime_diff
+ variation_diff[clock.event_time] = abs(
+ current_gps_elapsed_realtime_diff - last_gps_elapsed_realtime_diff)
+ last_gps_elapsed_realtime_diff = current_gps_elapsed_realtime_diff
+
+ over_criteria_data = [
+ (event_time, diff) for (event_time, diff) in variation_diff.items() if (
+ diff > _GPS_ELAPSED_REALTIME_DIFF_TOLERANCE)
+ ]
+
+ asserts.assert_true(
+ [] == over_criteria_data,
+ msg=f"Following data are over criteria: {over_criteria_data}",
+ )
diff --git a/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py b/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py
index 43f792073..51b96f232 100644
--- a/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py
+++ b/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py
@@ -1,7 +1,7 @@
TEST_NAME_BY_TESTTRACKER_UUID = {
# GnssFunctionTest
"test_cs_first_fixed_system_server_restart": "8169c19d-ba2a-4fef-969b-87f793f4e699",
- "test_cs_ttff_after_gps_service_restart": "247110d9-1c9e-429e-8e73-f16dd4a1ac74",
+ "test_recovery_and_location_time_after_gnss_services_restart": "247110d9-1c9e-429e-8e73-f16dd4a1ac74",
"test_gnss_one_hour_tracking": "b3d20ecb-3727-48ed-8a03-19694cc726c1",
"test_duty_cycle_function": "0bbfb818-da93-41d7-8d83-15bc53d8d2cf",
"test_gnss_init_error": "c661780d-4864-4292-9988-88f64448fb78",
@@ -24,6 +24,7 @@ TEST_NAME_BY_TESTTRACKER_UUID = {
"test_measure_adr_rate_after_10_mins_tracking": "7ebf3b52-229a-4eaf-bbff-7c527e4a1d7c",
"test_hal_crashing_should_resume_tracking": "0aee4450-edce-4e1a-8744-70d8c01937b0",
"test_power_save_mode_should_apply_latest_measurement_setting": "59a14da2-40df-4106-a190-dcbcd2e877e0",
+ "test_the_diff_of_gps_clock_and_elapsed_realtime_should_be_stable": "ea6ba987-216c-4cd0-89bc-eace8a691210",
# GnssConcurrencyTest
"test_gnss_concurrency_location_1_chre_1": "cbd9ff54-4405-44a4-ac20-de33278406d1",
"test_gnss_concurrency_location_1_chre_8": "ab56cb47-384e-4269-b2d8-6e80ce066de2",
diff --git a/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py b/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py
index 7af1e127e..bb59513b1 100644
--- a/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py
+++ b/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py
@@ -155,7 +155,8 @@ class PowerBaseTest(base_test.BaseTestClass):
pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT,
ap_dtim_period=None,
- bits_root_rail_csv_export=False)
+ bits_root_rail_csv_export=False,
+ is_odpm_supported=False)
# Setup the must have controllers, phone and monsoon
self.dut = self.android_devices[0]
@@ -165,7 +166,11 @@ class PowerBaseTest(base_test.BaseTestClass):
# Make odpm path for P21 or later
platform = self.dut.adb.shell(GET_PROPERTY_HARDWARE_PLATFORM)
self.log.info('The hardware platform is {}'.format(platform))
- if platform.startswith('gs'):
+ if (
+ platform.startswith('gs')
+ or platform.startswith('z')
+ or self.is_odpm_supported
+ ):
self.odpm_folder = os.path.join(self.log_path, 'odpm')
os.makedirs(self.odpm_folder, exist_ok=True)
self.log.info('For P21 or later, create odpm folder {}'.format(
diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py b/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py
index 15043d4f2..720affc85 100644
--- a/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py
+++ b/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py
@@ -1,10 +1,13 @@
+import json
import os
from typing import Optional, List
import time
-from acts import context
+
+from acts import asserts
from acts import signals
-from acts.controllers.cellular_lib import AndroidCellularDut
import acts_contrib.test_utils.power.cellular.cellular_power_base_test as PWCEL
+from acts_contrib.test_utils.tel import tel_test_utils as telutils
+from acts_contrib.test_utils.power.cellular import modem_logs
# TODO: b/261639867
class AtUtil():
@@ -14,6 +17,11 @@ class AtUtil():
dut: AndroidDevice controller object.
"""
ADB_CMD_DISABLE_TXAS = 'am instrument -w -e request at+googtxas=2 -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ ADB_CMD_GET_TXAS = 'am instrument -w -e request at+googtxas? -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ ADB_MODEM_STATUS = 'cat /sys/bus/platform/devices/cpif/modem_state'
+ ADB_CMD_SET_NV = ('am instrument -w '
+ '-e request \'at+googsetnv=\"{nv_name}\",{nv_index},\"{nv_value}\"\' '
+ '-e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"')
def __init__(self, dut, log) -> None:
self.dut = dut
@@ -21,33 +29,43 @@ class AtUtil():
# TODO: to be remove when b/261639867 complete,
# and we are using parent method.
- def send(self, cmd: str,) -> Optional[str]:
- res = str(self.dut.adb.shell(cmd))
- self.log.info(f'cmd sent: {cmd}')
- self.log.info(f'response: {res}')
- if 'SUCCESS' in res:
- self.log.info('Command executed.')
- else:
- self.log.error('Fail to executed command.')
+ def send(self, cmd: str, retries: int=5) -> Optional[str]:
+ for _ in range(30):
+ modem_status = self.dut.adb.shell(self.ADB_MODEM_STATUS)
+ self.log.debug(f'Modem status: {modem_status}')
+ if modem_status == 'ONLINE':
+ break
+ time.sleep(1)
+
+ wait_for_device_ready_time = 2
+ for i in range(retries):
+ res = self.dut.adb.shell(cmd)
+ self.log.info(f'cmd sent: {cmd}')
+ self.log.debug(f'response: {res}')
+ if 'SUCCESS' in res and 'OK' in res:
+ return res
+ else:
+ self.log.warning('Fail to execute cmd, retry to send again.')
+ time.sleep(wait_for_device_ready_time)
+ self.log.error(f'Fail to execute cmd: {cmd}')
return res
- def lock_LTE(self):
+ def lock_band(self):
+ """Lock lte and nr bands.
+
+ LTE bands to be locked include B1, B2, B4.
+ NR bands to belocked include n71, n78, n260.
+ """
adb_enable_band_lock_lte = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",00,\"01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
- adb_set_band_lock_mode_lte = r'am instrument -w -e request at+GOOGSETNV=\"NASU.SIPC.NetworkMode.ManualMode\",0,\"0D\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
- adb_set_band_lock_bitmap_0 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",0,\"09,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ adb_set_band_lock_bitmap_0 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",0,\"0B,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
adb_set_band_lock_bitmap_1 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",1,\"00,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
adb_set_band_lock_bitmap_2 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",2,\"00,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
adb_set_band_lock_bitmap_3 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",3,\"00,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
-
# enable lte
self.send(adb_enable_band_lock_lte)
time.sleep(2)
- # OD is for NR/LTE in 4412 menu
- self.send(adb_set_band_lock_mode_lte)
- time.sleep(2)
-
- # lock to B1 and B4
+ # lock to B1, B2 and B4
self.send(adb_set_band_lock_bitmap_0)
time.sleep(2)
self.send(adb_set_band_lock_bitmap_1)
@@ -57,28 +75,119 @@ class AtUtil():
self.send(adb_set_band_lock_bitmap_3)
time.sleep(2)
- def clear_lock_band(self):
- adb_set_band_lock_mode_auto = r'am instrument -w -e request at+GOOGSETNV=\"NASU.SIPC.NetworkMode.ManualMode\",0,\"03\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
- adb_disable_band_lock_lte = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",0,\"00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
-
- # band lock mode auto
- self.send(adb_set_band_lock_mode_auto)
+ adb_enable_band_lock_nr = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_BASED_BAND_LIST_SUPPORT\",00,\"01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ self.send(adb_enable_band_lock_nr)
+ time.sleep(2)
+ adb_add_band_list_n71 = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",00,\"47,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ self.send(adb_add_band_list_n71)
time.sleep(2)
+ adb_add_band_list_n78 = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",01,\"4E,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ self.send(adb_add_band_list_n78)
+ time.sleep(2)
+ adb_add_band_list_n260 = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",02,\"04,01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ self.send(adb_add_band_list_n260)
+ time.sleep(2)
+
+ def disable_lock_band_lte(self):
+ adb_disable_band_lock_lte = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",0,\"01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
# disable band lock lte
self.send(adb_disable_band_lock_lte)
time.sleep(2)
def disable_txas(self):
+ res = self.send(self.ADB_CMD_GET_TXAS)
+ if '+GOOGGETTXAS:2' in res:
+ self.log.info('TXAS is in default.')
+ return res
cmd = self.ADB_CMD_DISABLE_TXAS
response = self.send(cmd)
return 'OK' in response
+ def get_band_lock_info(self):
+ cmd = r'am instrument -w -e request at+GOOGGETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ res = self.send(cmd)
+ cmd = r'am instrument -w -e request at+GOOGGETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ res += self.send(cmd)
+ cmd = r'am instrument -w -e request at+GOOGGETNV=\"!NRRRC.SIM_BASED_BAND_LIST_SUPPORT\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ res += self.send(cmd)
+ cmd = r'am instrument -w -e request at+GOOGGETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ res += self.send(cmd)
+ return res
+
+ def set_nv(self, nv_name, index, value):
+ cmd = self.ADB_CMD_SET_NV.format(
+ nv_name=nv_name,
+ nv_index=index,
+ nv_value=value
+ )
+ res = self.send(cmd)
+ return res
+
+ def get_sim_slot_mapping(self):
+ cmd = r'am instrument -w -e request at+slotmap? -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ return self.send(cmd)
+
+ def set_single_psim(self):
+ cmd = r'am instrument -w -e request at+slotmap=1 -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ return self.send(cmd)
+
+ def disable_dsp(self):
+ cmd = r'am instrument -w -e request at+googsetnv=\"NASU\.LCPU\.LOG\.SWITCH\",0,\"00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ return self.send(cmd)
+
+ def get_dsp_status(self):
+ cmd = r'am instrument -w -e request at+googgetnv=\"NASU\.LCPU\.LOG\.SWITCH\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
+ return self.send(cmd)
+
+ def enable_ims_nr(self):
+ # set !NRCAPA.Gen.VoiceOverNr
+ self.set_nv(
+ nv_name = '!NRCAPA.Gen.VoiceOverNr',
+ index = '0',
+ value = '01'
+ )
+ # set PSS.AIMS.Enable.NRSACONTROL
+ self.set_nv(
+ nv_name = 'PSS.AIMS.Enable.NRSACONTROL',
+ index = '0',
+ value = '00'
+ )
+ # set DS.PSS.AIMS.Enable.NRSACONTROL
+ self.set_nv(
+ nv_name = 'DS.PSS.AIMS.Enable.NRSACONTROL',
+ index = '0',
+ value = '00'
+ )
+ if self.dut.model == 'oriole':
+ # For P21, NR.CONFIG.MODE/DS.NR.CONFIG.MODE
+ self.set_nv(
+ nv_name = 'NR.CONFIG.MODE',
+ index = '0',
+ value = '11'
+ )
+ # set DS.NR.CONFIG.MODE
+ self.set_nv(
+ nv_name = 'DS.NR.CONFIG.MODE',
+ index = '0',
+ value = '11'
+ )
+ else:
+ # For P22, NASU.NR.CONFIG.MODE to 11
+ self.set_nv(
+ nv_name = 'NASU.NR.CONFIG.MODE',
+ index = '0',
+ value = '11'
+ )
+
class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
# Key for ODPM report
ODPM_ENERGY_TABLE_NAME = 'PowerStats HAL 2.0 energy meter'
ODPM_MODEM_CHANNEL_NAME = '[VSYS_PWR_MODEM]:Modem'
+ # Pass fail threshold lower bound
+ THRESHOLD_TOLERANCE_LOWER_BOUND_DEFAULT = 0.3
+
# Key for custom_property in Sponge
CUSTOM_PROP_KEY_BUILD_ID = 'build_id'
CUSTOM_PROP_KEY_INCR_BUILD_ID = 'incremental_build_id'
@@ -94,6 +203,7 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
CUSTOM_PROP_KEY_MODEM_KIBBLE_PCIE_POWER = 'modem_kibble_pcie_power'
CUSTOM_PROP_KEY_RFFE_POWER = 'rffe_power'
CUSTOM_PROP_KEY_MMWAVE_POWER = 'mmwave_power'
+ CUSTOM_PROP_KEY_CURRENT_REFERENCE_TARGET = 'reference_target'
# kibble report
KIBBLE_SYSTEM_RECORD_NAME = '- name: default_device.C10_EVT_1_1.Monsoon:mA'
MODEM_PCIE_RAIL_NAME_LIST = [
@@ -102,27 +212,41 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
'PP0850_L8C_PCIE'
]
- MODEM_RFFE_RAIL_NAME_LIST = [
- 'PP1200_L31C_RFFE',
- 'VSYS_PWR_RFFE',
- 'PP2800_L33C_RFFE'
- ]
+ MODEM_RFFE_RAIL_NAME = 'VSYS_PWR_RFFE'
MODEM_POWER_RAIL_NAME = 'VSYS_PWR_MODEM'
+ MODEM_POWER_RAIL_WO_PCIE_NAME = 'VSYS_PWR_MODEM_W_O_PCIE'
+
+ WEARABLE_POWER_RAIL = 'LTE_DC'
+
+ WEARABLE_SOC_MODEM_RAIL = 'SOC_MODEM_USBHS'
+
MODEM_MMWAVE_RAIL_NAME = 'VSYS_PWR_MMWAVE'
- MONSOON_RAIL_NAME = 'Monsoon'
+ MONSOON_RAIL_NAME = 'Monsoon:mW'
# params key
MONSOON_VOLTAGE_KEY = 'mon_voltage'
MDSTEST_APP_APK_NAME = 'mdstest.apk'
- ADB_CMD_INSTALL = 'install {apk_path}'
- ADB_CMD_DISABLE_TXAS = 'am instrument -w -e request at+googtxas=2 -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'
- ADB_CMD_SET_NV = ('am instrument -w '
- '-e request at+googsetnv=\"{nv_name}\",{nv_index},\"{nv_value}\" '
- '-e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"')
+
+ ADB_CMD_ENABLE_ALWAYS_ON_LOGGING = (
+ 'am broadcast -n com.android.pixellogger/.receiver.AlwaysOnLoggingReceiver '
+ '-a com.android.pixellogger.service.logging.LoggingService.ACTION_CONFIGURE_ALWAYS_ON_LOGGING '
+ '-e intent_key_enable "true" '
+ '-e intent_key_config "Lassen\ default" '
+ '--ei intent_key_max_log_size_mb 100 '
+ '--ei intent_key_max_number_of_files 20'
+ )
+ ADB_CMD_DISABLE_ALWAYS_ON_LOGGING = (
+ 'am start-foreground-service -a '
+ 'com.android.pixellogger.service.logging.LoggingService.ACTION_STOP_LOGGING')
+
+ ADB_CMD_TOGGLE_MODEM_LOG = 'setprop persist.vendor.sys.modem.logging.enable {state}'
+
+ _ADB_GET_ACTIVE_NETWORK = ('dumpsys connectivity | '
+ 'grep \'Active default network\'')
def __init__(self, controllers):
super().__init__(controllers)
@@ -133,6 +257,9 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
self.mmwave_power = 0
self.modem_power = 0
self.monsoon_power = 0
+ self.kibble_error_range = 2
+ self.system_power = 0
+ self.odpm_power = 0
def setup_class(self):
super().setup_class()
@@ -144,21 +271,85 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
self.at_util = AtUtil(self.cellular_dut.ad, self.log)
# preset UE.
+ self.log.info(f'Bug report mode: {self.bug_report}')
+ self.toggle_modem_log(False)
self.log.info('Installing mdstest app.')
self.install_apk()
- self.log.info('Disable antenna switch.')
- is_txas_disabled = self.at_util.disable_txas()
- self.log.info('Disable txas: ' + str(is_txas_disabled))
+ self.unpack_userparams(is_mdstest_supported=True)
+ self.log.info(f'Supports mdstest: {self.is_mdstest_supported}')
+ if self.is_mdstest_supported:
+ # UE preset
+ self.log.info('Disable antenna switch.')
+ self.at_util.disable_txas()
+ time.sleep(10)
+
+ # set device to be data centric
+ nv_result = self.at_util.set_nv(
+ nv_name = '!SAEL3.SAE_UE_OPERATION_MODE',
+ index = '0',
+ value = '03'
+ )
+ self.log.info(nv_result)
+
+ self.at_util.lock_band()
+ self.log.info('Band lock info: \n%s',self.at_util.get_band_lock_info())
- # get sim type
+ self.at_util.set_single_psim()
+
+ self.unpack_userparams(is_wifi_only_device=False)
+
+ # extract log only flag
+ self.unpack_userparams(collect_log_only=False)
+ # get sdim type
self.unpack_userparams(has_3gpp_sim=True)
+ # extract time to take log after test
+ self.unpack_userparams(post_test_log_duration=30)
+
+ # toggle on/off APM for all devices
+ self.log.info('Toggle APM on/off for all devices.')
+ for ad in self.android_devices:
+ telutils.toggle_airplane_mode_by_adb(self.log, ad, False)
+ time.sleep(2)
+ telutils.toggle_airplane_mode_by_adb(self.log, ad, True)
+ time.sleep(2)
+
+ # clear modem logs
+ modem_logs.clear_modem_logging(self.cellular_dut.ad)
+
+ def collect_power_data_and_validate(self):
+ cells_status_before = sorted(self.cellular_simulator.get_all_cell_status())
+ self.log.info('UXM cell status before collect power: %s', cells_status_before)
+
+ super().collect_power_data()
+ cells_status_after = sorted(self.cellular_simulator.get_all_cell_status())
+ self.log.info('UXM cell status after collect power: %s', cells_status_after)
+
+ # power measurement results
+ odpm_power_results = self.get_odpm_values()
+ self.odpm_power = odpm_power_results.get(
+ self.ODPM_MODEM_CHANNEL_NAME.lower(), 0)
+ if hasattr(self, 'bitses'):
+ self.parse_power_rails_csv()
+
+ asserts.assert_true(cells_status_before == cells_status_after,
+ 'Cell status before {} and after {} the test run are not the same.'.format(
+ cells_status_before, cells_status_after
+ ))
+ self.threshold_check()
def setup_test(self):
- self.cellular_simulator.set_sim_type(self.has_3gpp_sim)
try:
- if 'LTE' in self.test_name:
- self.at_util.lock_LTE()
+ if self.collect_log_only:
+ self.log.info('Collect log only mode on.')
+ # set log mask
+ modem_logs.set_modem_log_profle(self.cellular_dut.ad, modem_logs.ModemLogProfile.LASSEN_TCP_DSP)
+ # start log
+ modem_logs.start_modem_logging(self.cellular_dut.ad)
+ modem_log_dir = os.path.join(self.root_output_path, 'modem_log')
+ os.makedirs(modem_log_dir, exist_ok=True)
+ self.modem_log_path = os.path.join(modem_log_dir, self.test_name)
+ os.makedirs(self.modem_log_path, exist_ok=True)
super().setup_test()
except BrokenPipeError:
self.log.info('TA crashed test need retry.')
@@ -166,16 +357,37 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
self.cellular_simulator.recovery_ta()
self.cellular_simulator.socket_connect()
raise signals.TestFailure('TA crashed mid test, retry needed.')
- # except:
- # # self.log.info('Waiting for device to on.')
- # # self.dut.adb.wait_for_device()
- # # self.cellular_dut = AndroidCellularDut.AndroidCellularDut(
- # # self.android_devices[0], self.log)
- # # self.dut.root_adb()
- # # # Restart SL4A
- # # self.dut.start_services()
- # # self.need_retry = True
- # raise signals.TestError('Device reboot mid test, retry needed.')
+
+ def toggle_modem_log(self, new_state: bool, timeout: int=30):
+ new_state = str(new_state).lower()
+ current_state = self.cellular_dut.ad.adb.shell('getprop persist.vendor.sys.modem.logging.enable')
+ cmd = self.ADB_CMD_TOGGLE_MODEM_LOG.format(state=new_state)
+ if new_state != current_state:
+ self.cellular_dut.ad.adb.shell(cmd)
+ for _ in range(timeout):
+ self.log.debug(f'Wait for modem logging status to be {new_state}.')
+ time.sleep(1)
+ current_state = self.cellular_dut.ad.adb.shell('getprop persist.vendor.sys.modem.logging.enable')
+ if new_state == current_state:
+ self.log.info(f'Always-on modem logging status is {new_state}.')
+ return
+ raise RuntimeError(f'Fail to set modem logging to {new_state}.')
+
+ def collect_modem_log(self, out_path, duration: int=30):
+ # set log mask
+ modem_logs.set_modem_log_profle(self.cellular_dut.ad, modem_logs.ModemLogProfile.LASSEN_TCP_DSP)
+
+ # start log
+ modem_logs.start_modem_logging(self.cellular_dut.ad)
+ time.sleep(duration)
+ # stop log
+ modem_logs.stop_modem_logging(self.cellular_dut.ad)
+ try:
+ # pull log
+ modem_logs.pull_logs(self.cellular_dut.ad, out_path)
+ finally:
+ # clear log
+ modem_logs.clear_modem_logging(self.cellular_dut.ad)
def install_apk(self):
sleep_time = 3
@@ -189,55 +401,6 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
else:
self.log.warning('fail to install mdstest.')
- def set_nv(self, nv_name, index, value):
- cmd = self.ADB_CMD_SET_NV.format(
- nv_name=nv_name,
- nv_index=index,
- nv_value=value
- )
- response = str(self.cellular_dut.ad.adb.shell(cmd))
- self.log.info(response)
-
- def enable_ims_nr(self):
- # set !NRCAPA.Gen.VoiceOverNr
- self.set_nv(
- nv_name = '!NRCAPA.Gen.VoiceOverNr',
- index = '0',
- value = '01'
- )
- # set PSS.AIMS.Enable.NRSACONTROL
- self.set_nv(
- nv_name = 'PSS.AIMS.Enable.NRSACONTROL',
- index = '0',
- value = '00'
- )
- # set DS.PSS.AIMS.Enable.NRSACONTROL
- self.set_nv(
- nv_name = 'DS.PSS.AIMS.Enable.NRSACONTROL',
- index = '0',
- value = '00'
- )
- if self.cellular_dut.ad.model == 'oriole':
- # For P21, NR.CONFIG.MODE/DS.NR.CONFIG.MODE
- self.set_nv(
- nv_name = 'NR.CONFIG.MODE',
- index = '0',
- value = '11'
- )
- # set DS.NR.CONFIG.MODE
- self.set_nv(
- nv_name = 'DS.NR.CONFIG.MODE',
- index = '0',
- value = '11'
- )
- else:
- # For P22, NASU.NR.CONFIG.MODE to 11
- self.set_nv(
- nv_name = 'NASU.NR.CONFIG.MODE',
- index = '0',
- value = '11'
- )
-
def get_odpm_values(self):
"""Get power measure from ODPM.
@@ -289,6 +452,7 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
# example result of line.strip().split()
# ['[VSYS_PWR_DISPLAY]:Display', '1039108.42', 'mWs', '(', '344.69)']
channel, _, _, _, delta_str = line.strip().split()
+ channel = channel.lower()
delta = float(delta_str[:-2].strip())
# calculate OPDM power
@@ -308,36 +472,44 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
def parse_power_rails_csv(self):
kibble_dir = os.path.join(self.root_output_path, 'Kibble')
- kibble_csv_path = None
+ kibble_json_path = None
if os.path.exists(kibble_dir):
for f in os.listdir(kibble_dir):
- if self.test_name in f and '.csv' in f:
- kibble_csv_path = os.path.join(kibble_dir, f)
- self.log.info('Kibble csv file path: ' + kibble_csv_path)
+ if self.test_name in f and '.json' in f:
+ kibble_json_path = os.path.join(kibble_dir, f)
+ self.log.info('Kibble json file path: ' + kibble_json_path)
break
self.log.info('Parsing power rails from csv.')
- if kibble_csv_path:
- with open(kibble_csv_path, 'r') as f:
- for line in f:
- # railname,val,mA,val,mV,val,mW
- railname, _, _, _, _, power, _ = line.split(',')
+ if kibble_json_path:
+ with open(kibble_json_path, 'r') as f:
+ rails_data_json = json.load(f)
+ if rails_data_json:
+ for record in rails_data_json:
+ unit = record['unit']
+ if unit != 'mW':
+ continue
+ railname = record['name']
+ power = record['avg']
# parse pcie power
if self._is_any_substring(railname, self.MODEM_PCIE_RAIL_NAME_LIST):
- self.log.info(railname + ': ' + power)
- self.pcie_power += float(power)
+ self.log.info('%s: %f',railname, power)
+ self.pcie_power += power
elif self.MODEM_POWER_RAIL_NAME in railname:
- self.log.info(railname + ': ' + power)
- self.modem_power = float(power)
- elif self._is_any_substring(railname, self.MODEM_RFFE_RAIL_NAME_LIST):
- self.log.info(railname + ': ' + power)
- self.rffe_power = float(power)
+ self.log.info('%s: %f',railname, power)
+ self.modem_power = power
+ elif self.MODEM_RFFE_RAIL_NAME in railname:
+ self.log.info('%s: %f',railname, power)
+ self.rffe_power = power
elif self.MODEM_MMWAVE_RAIL_NAME in railname:
- self.log.info(railname + ': ' + power)
- self.mmwave_power = float(power)
- elif self.MONSOON_RAIL_NAME == railname:
- self.log.info(railname + ': ' + power)
- self.monsoon_power = float(power)
+ self.log.info('%s: %f',railname, power)
+ self.mmwave_power = power
+ elif self.MONSOON_RAIL_NAME in railname:
+ self.log.info('%s: %f',railname, power)
+ self.monsoon_power = power
+ elif self.WEARABLE_POWER_RAIL in railname or self.WEARABLE_SOC_MODEM_RAIL in railname:
+ self.log.info('%s: %f',railname, power)
+ self.modem_power += power
if self.modem_power:
self.power_results[self.test_name] = self.modem_power
@@ -366,29 +538,28 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
'ro.boot.hardware.revision'
)
- # power measurement results
- odpm_power_results = self.get_odpm_values()
- odpm_power = odpm_power_results.get(self.ODPM_MODEM_CHANNEL_NAME, 0)
- system_power = 0
-
# if kibbles are using, get power from kibble
modem_kibble_power_wo_pcie = 0
if hasattr(self, 'bitses'):
- self.parse_power_rails_csv()
modem_kibble_power_wo_pcie = self.modem_power - self.pcie_power
- system_power = self.monsoon_power
+ self.system_power = self.monsoon_power
else:
- system_power = self.power_results.get(self.test_name, 0)
+ self.system_power = self.power_results.get(self.test_name, 0)
+
+ # record reference target, if it exists
+ self.reference_target = ''
+ if self.threshold and self.test_name in self.threshold:
+ self.reference_target = self.threshold[self.test_name]
self.record_data({
'Test Name': self.test_name,
'sponge_properties': {
- self.CUSTOM_PROP_KEY_SYSTEM_POWER: system_power,
+ self.CUSTOM_PROP_KEY_SYSTEM_POWER: self.system_power,
self.CUSTOM_PROP_KEY_BUILD_ID: build_id,
self.CUSTOM_PROP_KEY_INCR_BUILD_ID: incr_build_id,
self.CUSTOM_PROP_KEY_MODEM_BASEBAND: modem_base_band,
self.CUSTOM_PROP_KEY_BUILD_TYPE: build_type,
- self.CUSTOM_PROP_KEY_MODEM_ODPM_POWER: odpm_power,
+ self.CUSTOM_PROP_KEY_MODEM_ODPM_POWER: self.odpm_power,
self.CUSTOM_PROP_KEY_DEVICE_NAME: device_name,
self.CUSTOM_PROP_KEY_DEVICE_BUILD_PHASE: device_build_phase,
self.CUSTOM_PROP_KEY_MODEM_KIBBLE_POWER: self.modem_power,
@@ -396,19 +567,112 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest):
self.CUSTOM_PROP_KEY_MODEM_KIBBLE_WO_PCIE_POWER: modem_kibble_power_wo_pcie,
self.CUSTOM_PROP_KEY_MODEM_KIBBLE_PCIE_POWER: self.pcie_power,
self.CUSTOM_PROP_KEY_RFFE_POWER: self.rffe_power,
- self.CUSTOM_PROP_KEY_MMWAVE_POWER: self.mmwave_power
+ self.CUSTOM_PROP_KEY_MMWAVE_POWER: self.mmwave_power,
+ self.CUSTOM_PROP_KEY_CURRENT_REFERENCE_TARGET: self.reference_target
},
})
+ def threshold_check(self):
+ """Check the test result and decide if it passed or failed.
+
+ The threshold is provided in the config file. In this class, result is
+ current in mA.
+ """
+
+ if not self.threshold or self.test_name not in self.threshold:
+ self.log.error("No threshold is provided for the test '{}' in "
+ "the configuration file.".format(self.test_name))
+ return
+
+ if not hasattr(self, 'bitses'):
+ self.log.error("No bitses attribute found, threshold cannot be"
+ "checked against system power.")
+ return
+
+ average_current = self.modem_power
+ if ('modem_rail' in self.threshold.keys() and self.threshold['modem_rail'] == self.MODEM_POWER_RAIL_WO_PCIE_NAME):
+ average_current = average_current - self.pcie_power
+ current_threshold = self.threshold[self.test_name]
+
+ acceptable_upper_difference = max(
+ self.threshold[self.test_name] * self.pass_fail_tolerance,
+ self.kibble_error_range
+ )
+ self.log.info('acceptable upper difference' + str(acceptable_upper_difference))
+
+ self.unpack_userparams(pass_fail_tolerance_lower_bound=self.THRESHOLD_TOLERANCE_LOWER_BOUND_DEFAULT)
+ acceptable_lower_difference = max(
+ self.threshold[self.test_name] * self.pass_fail_tolerance_lower_bound,
+ self.kibble_error_range)
+ self.log.info('acceptable lower diff ' + str(acceptable_lower_difference))
+
+ if average_current:
+ asserts.assert_true(
+ average_current < current_threshold + acceptable_upper_difference,
+ 'Measured average current in [{}]: {:.2f}mW, which is '
+ 'out of the acceptable upper range {:.2f}+{:.2f}mW'.format(
+ self.test_name, average_current, current_threshold,
+ acceptable_upper_difference))
+
+ asserts.assert_true(
+ average_current > current_threshold - acceptable_lower_difference,
+ 'Measured average current in [{}]: {:.2f}mW, which is '
+ 'out of the acceptable lower range {:.2f}-{:.2f}mW'.format(
+ self.test_name, average_current, current_threshold,
+ acceptable_lower_difference))
+
+ asserts.explicit_pass(
+ 'Measured average current in [{}]: {:.2f}mW, which is '
+ 'within the acceptable range of {:.2f}-{:.2f} and {:.2f}+{:.2f}'.format(
+ self.test_name, average_current, current_threshold,
+ acceptable_lower_difference, current_threshold, acceptable_upper_difference))
+ else:
+ asserts.fail(
+ 'Something happened, measurement is not complete, test failed')
+
+ def _get_device_network(self) -> str:
+ """Get active network on device.
+
+ Returns:
+ Information of active network in string.
+ """
+ return self.dut.adb.shell(
+ self._ADB_GET_ACTIVE_NETWORK)
+
def teardown_test(self):
+ if self.collect_log_only:
+ try:
+ # stop log
+ modem_logs.stop_modem_logging(self.cellular_dut.ad)
+ # pull log
+ modem_logs.pull_logs(self.cellular_dut.ad, self.modem_log_path)
+ finally:
+ # clear log
+ modem_logs.clear_modem_logging(self.cellular_dut.ad)
+ else:
+ if self.is_mdstest_supported:
+ try:
+ self.collect_modem_log(self.modem_log_path, self.post_test_log_duration)
+ except RuntimeError:
+ self.log.warning('Fail to collect log before test end.')
+ self.log.info('===>Before test end info.<====')
+ cells_status = self.cellular_simulator.get_all_cell_status()
+ self.log.info('UXM cell status: %s', cells_status)
+ active_network = self._get_device_network()
+ self.log.info('Device network: %s', active_network)
super().teardown_test()
# restore device to ready state for next test
- self.log.info('Enable mobile data.')
- self.dut.adb.shell('svc data enable')
+ if not self.is_wifi_only_device:
+ self.log.info('Enable mobile data.')
+ self.cellular_dut.ad.adb.shell('svc data enable')
self.cellular_simulator.detach()
self.cellular_dut.toggle_airplane_mode(True)
+ if self.is_mdstest_supported:
+ self.at_util.disable_dsp()
+ self.log.info('Band lock info: \n%s', self.at_util.get_band_lock_info())
+ self.log.info('Sim slot map: \n%s', self.at_util.get_sim_slot_mapping())
+ self.log.info('DSP status: \n%s', self.at_util.get_dsp_status)
+
# processing result
self.sponge_upload()
- if 'LTE' in self.test_name:
- self.at_util.clear_lock_band() \ No newline at end of file
diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py b/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py
index cfdc67a1f..ecdb61869 100644
--- a/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py
+++ b/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py
@@ -1,228 +1,370 @@
-# TODO(hmtuan): add type annotation.
-import requests
+"""Abstraction for interacting with Keysight IMS emulator.
+
+Keysight provided an API Connector application which is a HTTP server
+running on the same host as Keysight IMS server app and client app.
+It allows IMS simulator/app to be controlled via HTTP request.
+"""
+import enum
+import json
+import logging
import time
+from typing import List, Optional, Any
+import uuid
+import requests
+
+from acts_contrib.test_utils.power.cellular.ssh_library import SshLibrary
+
+_LOG = logging.getLogger(__name__)
+
+
+class ImsAppName(enum.Enum):
+ """IMS app name predefined by Keysight."""
+
+ CLIENT = 'client'
+ SERVER = 'server'
+
+
+class ImsApiConnector:
+ """A wrapper class for Keysight Ims API Connector.
+
+ This class provides high-level interface to control Keysigt IMS app.
+ Each instance of this class conresponding to one IMS app process.
+
+ Attributes:
+ log: An logger object.
+ api_connector_ip: An IP of host where API Connector reside.
+ api_connector_port: A port number of API Connector server.
+ ims_app: An ImsAppName enum to specify which emulator/app to control.
+ _api_token: An arbitrary and unique string to identify the link between API
+ connector and ims app.
+ _ims_app_ip: An IP of IMS emulator/app, usually the value of localhost.
+ _ims_app_port: Listening port of IMS emulator/app.
+ ssh: An ssh connection object that can be used to check app status, close
+ and reopen apps.
+ """
+
+ _BASE_URL_FORMAT = 'http://{addr}:{port}/ims/api/{app}s/{api_token}'
+
+ _SSH_USERNAME = 'User'
+ _APP_BOOT_TIME = 30
+
+ _IMS_CLIENT_IDLE_STATUS = 'Idle'
+ _IMS_CLIENT_APP = 'Keysight.ImsSip.Client.exe'
+ _IMS_CLIENT_APP_LOC = (
+ r'C:\Program Files (x86)\Keysight\C8700201A\IMS-SIP Client\\'
+ )
+ _IMS_SERVER_APP = 'Keysight.ImsSip.Server.exe'
+ _IMS_SERVER_APP_LOC = (
+ r'C:\Program Files (x86)\Keysight\C8700201A\IMS-SIP Server\\'
+ )
+ _IMS_API_APP = "Keysight.ImsSip.ApiConnector.exe"
+ _IMS_API_APP_LOC = (
+ r"C:\Program Files (x86)\Keysight\C8700201A\IMS-SIP API Connector\\"
+ )
+ _IMS_APP_DEFAULT_PORT_MAPPING = {
+ ImsAppName.CLIENT: 8250,
+ ImsAppName.SERVER: 8240,
+ }
+
+ _IMS_APP_DEFAULT_IP = '127.0.0.1'
+
+ def __init__(
+ self,
+ api_connector_ip: str,
+ api_connector_port: int,
+ ims_app: ImsAppName,
+ ):
+ self.log = _LOG
+
+ # create ssh connection to host pc
+ self.ssh = SshLibrary(api_connector_ip, self._SSH_USERNAME)
+
+ # api connector info
+ self.api_connector_ip = api_connector_ip
+ self.api_connector_port = api_connector_port
+
+ # ims app info
+ self._api_token = str(uuid.uuid4())[0:7] # api token can't contain '-'
+ self.log.debug('API token: %s', self._api_token)
+ self.ims_app = ims_app.value
+ self._ims_app_ip = self._IMS_APP_DEFAULT_IP
+ self._ims_app_port = self._IMS_APP_DEFAULT_PORT_MAPPING[ims_app]
+
+ # construct base url
+ self._base_url = self._BASE_URL_FORMAT.format(
+ addr=self.api_connector_ip,
+ port=self.api_connector_port,
+ app=self.ims_app,
+ api_token=self._api_token,
+ )
+
+ # start server and client if they are not started
+ self._start_apps_if_down()
+ # create IMS-Client API link
+ is_app_linked = self.create_ims_app_link()
+
+ if not is_app_linked:
+ raise RuntimeError('Fail to create link to IMS app.')
+
+ def log_response_info(self, r: requests.Response):
+ self.log.debug('HTTP request sent:')
+ self.log.debug('-> method: %s', str(r.request.method))
+ self.log.debug('-> url: %s', str(r.url))
+ self.log.debug('-> status_code: %s', str(r.status_code))
+
+ def create_ims_app_link(self) -> bool:
+ """Creates link between Keysight API Connector to ims app.
+
+ Returns:
+ True if API connector server linked/connected with ims app,
+ False otherwise.
+ """
+ self.log.info('Creating ims_%s link: ', self.ims_app)
+ self.log.info(
+ '%s:%s:%s', self._api_token, self._ims_app_ip, self._ims_app_port
+ )
+
+ request_data = {
+ 'targetIpAddress': self._ims_app_ip,
+ 'targetWcfPort': self._ims_app_port,
+ }
+
+ r = requests.post(url=self._base_url, json=request_data)
+ self.log_response_info(r)
+
+ return r.status_code == requests.codes.created
+
+ def _remove_ims_app_link(self) -> bool:
+ """Removes link between Keysight API Connector to ims app.
+
+ Returns:
+ True if successfully disconnected/unlinked,
+ False otherwise.
+ """
+ self.log.info('Remove ims_%s link: %s', self.ims_app, self._api_token)
+
+ r = requests.delete(url=self._base_url)
+ self.log_response_info(r)
+
+ return r.status_code == requests.codes.ok
+
+ def get_ims_app_property(self, property_name: str) -> Optional[str]:
+ """Gets property value of IMS app.
+
+ Args:
+ property_name: Name of property to get value.
+
+ Returns:
+ Value of property which is inquired.
+ """
+ self.log.info('Getting ims app property: %s', property_name)
+
+ request_url = self._base_url + '/get_property'
+ request_params = {'propertyName': property_name}
+ r = requests.get(url=request_url, params=request_params)
+ self.log_response_info(r)
+
+ try:
+ res_json = r.json()
+ except json.JSONDecodeError:
+ res_json = {'propertyValue': None}
+ prop_value = res_json['propertyValue']
+
+ return prop_value
+
+ def set_ims_app_property(
+ self, property_name: str, property_value: Optional[Any]
+ ) -> bool:
+ """Sets property value of IMS app.
+
+ Args:
+ property_name: Name of property to set value.
+ property_value: Value to be set.
-class ImsApiConnector():
- """A wrapper class for Keysight Ims API Connector.
-
- Keysight provided an API connector application
- which is a HTTP server running on the same host
- as Keysight IMS server simulator and client simulator.
- It allows IMS simulator/app to be controlled via HTTP request.
-
- Attributes:
- api_connector_ip: ip of http server.
- api_connector_port: port of http server.
- ims_app: type of ims app (client/server).
- api_token: an arbitrary and unique token-string
- to identify the link between API connector
- and ims app.
- log: logger object.
+ Returns:
+ True if success, False otherwise.
"""
+ self.log.info(
+ 'Setting ims property: %s = %s', property_name, str(property_value)
+ )
+
+ request_url = self._base_url + '/set_property'
+ data = {'propertyName': property_name, 'propertyValue': property_value}
+ r = requests.post(url=request_url, json=data)
+ self.log_response_info(r)
+
+ return r.status_code == requests.codes.ok
+
+ def ims_api_call_method(
+ self, method_name: str, method_args: List = []
+ ) -> Optional[str]:
+ """Call Keysight API to control simulator.
+
+ API Connector allows us to call Keysight Simulators' API without using C#.
+ To invoke an API, we are sending post request to API Connector (http
+ server).
+
+ Args:
+ method_name: A name of method from Keysight API in string.
+ method_args: A python-array contains arguments for the called API.
+
+ Returns:
+ A string value parse from response.
+
+ Raises:
+ HTTPError: Response status code is different than requests.codes.ok.
+ """
+ self.log.info('Calling Keysight simulator API: %s', method_name)
+
+ if not isinstance(method_args, list):
+ method_args = [method_args]
+ request_url = self._base_url + '/call_method'
+ request_data = {'methodName': method_name, 'arguments': method_args}
+ r = requests.post(url=request_url, json=request_data)
+
+ ret_val = None
+
+ if r.status_code == requests.codes.ok:
+ return_value_key = 'returnValue'
+ if ('Content-Type' in r.headers) and r.headers[
+ 'Content-Type'
+ ] == 'application/json':
+ response_body = r.json()
+ if response_body:
+ ret_val = response_body.get(return_value_key, None)
+ else:
+ raise requests.HTTPError(r.status_code, r.text)
+
+ self.log_response_info(r)
+
+ return ret_val
+
+ def _is_line_idle(self, call_line_number) -> bool:
+ is_line_idle_prop = self.get_ims_app_property(
+ f'IVoip.CallLineParams({call_line_number}).SessionState'
+ )
+ return is_line_idle_prop == self._IMS_CLIENT_IDLE_STATUS
+
+ def _is_ims_client_app_registered(self) -> bool:
+ is_registered_prop = self.get_ims_app_property(
+ 'IComponentControl.IsRegistered'
+ )
+ self.log.info('Registered: %s', str(is_registered_prop))
+ return is_registered_prop == 'True'
+
+ def restart_server(self) -> bool:
+ """Restarts the ims server application.
+
+ Returns:
+ A boolean representing if the application was successfully started
+ """
+ self.create_ims_app_link()
+ self.log.info('Stopping and starting server')
+ self.ims_api_call_method('IServer.StopListeners()')
+ result = self.ims_api_call_method('IServer.Start()')
+ self.log.info(result)
+ return result == 'True'
+
+ def reregister_client(self):
+ """Re-registers the ims client with the server.
+
+ Attempts to unregister, then register the client. If this fails, Attempts
+ to restart both the client and server apps.
+ """
+ self.ims_api_call_method('ISipConnection.Unregister()')
+ self.ims_api_call_method('ISipConnection.Register()')
+
+ # failed to re-register client, so try restarting server and client
+ if not self._is_ims_client_app_registered():
+ self._restart_client_server_app()
+ self.ims_api_call_method('ISipConnection.Register()')
+
+ def _restart_client_server_app(self):
+ """Restarts the client and server app."""
+ self.ssh.close_app(self._IMS_CLIENT_APP)
+ self.ssh.close_app(self._IMS_SERVER_APP)
+ self.ssh.start_app(self._IMS_CLIENT_APP, self._IMS_CLIENT_APP_LOC)
+ self.ssh.start_app(self._IMS_SERVER_APP, self._IMS_SERVER_APP_LOC)
+ time.sleep(self._APP_BOOT_TIME)
+ self.create_ims_app_link()
+
+ def _start_apps_if_down(self):
+ """Starts the client, server, api connector app if they are down."""
+ started = False
+
+ if not self.ssh.check_app_running(self._IMS_API_APP):
+ self.log.info('api connector was not running, starting now')
+ self.ssh.start_app(self._IMS_API_APP, self._IMS_API_APP_LOC)
+ started = True
+
+ if not self.ssh.check_app_running(self._IMS_CLIENT_APP):
+ self.log.info('client was not running, starting now')
+ self.ssh.start_app(self._IMS_CLIENT_APP, self._IMS_CLIENT_APP_LOC)
+ started = True
+
+ if not self.ssh.check_app_running(self._IMS_SERVER_APP):
+ self.log.info('server was not running, starting now')
+ self.ssh.start_app(self._IMS_SERVER_APP, self._IMS_SERVER_APP_LOC)
+ started = True
+
+ if started:
+ time.sleep(self._APP_BOOT_TIME)
+
+ def initiate_call(self, callee_number: str, call_line_idx: int = 0):
+ """Dials to callee_number.
+
+ Args:
+ callee_number: A string value of number to be dialed to.
+ call_line_idx: An inteter index for call line.
+
+ Raises:
+ RuntimeError: If ims client is cannot be registered.
+ RuntimeError: If ims client is not idle when it attempts to dial.
+ RuntimeError: If ims client is still in idle after starting dial.
+ """
+ sleep_time = 5
+
+ self.log.info('checking if server/client/api connector are registered and running')
+ self._start_apps_if_down()
+
+ # Reregister client to server
+ self.reregister_client()
+
+ # clear logs
+ self.ims_api_call_method('ILogs.ClearResults()')
+
+ # switch to call-line #1 (idx = 0)
+ self.log.info('Switching to call-line #1.')
+ self.set_ims_app_property('IVoip.SelectedCallLine', call_line_idx)
+
+ # check whether the call-line #1 is ready for dialling
+ is_line1_idle = self._is_line_idle(call_line_idx)
+ if not is_line1_idle:
+ raise RuntimeError('Call-line not is not in indle state.')
+
+ # entering callee number for call-line #1
+ self.log.info('Enter callee number: %s.', callee_number)
+ self.set_ims_app_property(
+ 'IVoip.CallLineParams(0).CallLocation', callee_number
+ )
+
+ # dial entered callee number
+ self.log.info('Dialing call.')
+ self.ims_api_call_method('IVoip.Dial()')
+
+ time.sleep(sleep_time)
+
+ # check if dial success (not idle)
+ if self._is_line_idle(call_line_idx):
+ raise RuntimeError('Fail to dial.')
+
+ def hangup_call(self):
+ self.ims_api_call_method('IVoip.HangUp()')
+ # get logs
+ self.log.info("Call Logs: ")
+ call_logs = self.get_ims_app_property('ILogs.Results')
+ self.log.info(call_logs)
- def __init__(self, api_connector_ip,
- api_connector_port, ims_app,
- api_token, ims_app_ip,
- ims_app_port, log):
- # api connector info
- self.api_connector_ip = api_connector_ip
- self.api_connector_port = api_connector_port
-
- # ims app info
- self.ims_app = ims_app
- self.api_token = api_token
- self.ims_app_ip = ims_app_ip
- self.ims_app_port = ims_app_port
-
- self.log = log
- # construct base url
- self.base_url = 'http://{addr}:{port}/ims/api/{app}s/{api_token}'.format(
- addr = self.api_connector_ip,
- port = self.api_connector_port,
- app = self.ims_app,
- api_token = self.api_token
- )
-
- def get_base_url(self):
- return self.base_url
-
- def create_ims_app_link(self):
- """Create link between Keysight API Connector to ims app."""
- self.log.info('Create ims app link: token:ip:port')
- self.log.info('Creating ims_{app} link: {token}:{target_ip}:{target_port}'.format(
- app = self.ims_app,
- token = self.api_token,
- target_ip = self.ims_app_ip,
- target_port= self.ims_app_port)
- )
-
- request_data = {
- "targetIpAddress": self.ims_app_ip,
- "targetWcfPort": self.ims_app_port
- }
- self.log.debug(f'Payload to create ims app link: {request_data}')
- r = requests.post(url = self.get_base_url(), json = request_data)
-
- self.log.info('HTTP request sent:')
- self.log.info('-> method: ' + str(r.request.method))
- self.log.info('-> url: ' + str(r.url))
- self.log.info('-> status_code: ' + str(r.status_code))
-
- return (r.status_code == 201)
-
- def remove_ims_app_link(self):
- """Remove link between Keysight API Connector to ims app."""
- self.log.info('Remove ims_{app} link: {token}'.format(
- app = self.ims_app,
- token = self.api_token)
- )
-
- r = requests.delete(url = self.get_base_url())
-
- self.log.info('-> method: ' + str(r.request.method))
- self.log.info('-> url: ' + str(r.url))
- self.log.info('-> status_code: ' + str(r.status_code))
-
- return (r.status_code == 200)
-
- def get_ims_app_property(self, property_name):
- """Get property value of IMS app.
-
- Attributes:
- property_name: name of property to get value.
- """
- self.log.info('Getting ims app property: ' + property_name)
-
- request_url = self.get_base_url() + '/get_property'
- request_params = {"propertyName": property_name}
- r = requests.get(url = request_url, params = request_params)
-
- self.log.info('-> method: ' + str(r.request.method))
- self.log.info('-> url: ' + str(r.url))
- self.log.info('-> status_code: ' + str(r.status_code))
-
- try:
- res_json = r.json()
- except:
- res_json = {'propertyValue': None }
- prop_value = res_json['propertyValue']
-
- return prop_value
-
- def set_ims_app_property(self, property_name, property_value):
- """Set property value of IMS app.
-
- Attributes:
- property_name: name of property to set value.
- property_value: value to be set.
- """
- self.log.info('Setting ims property: ' + property_name + ' = ' + str(property_value))
-
- request_url = self.get_base_url() + '/set_property'
- data = {
- 'propertyName': property_name,
- 'propertyValue': property_value
- }
- r = requests.post(url = request_url, json = data)
-
- self.log.info('-> method: ' + str(r.request.method))
- self.log.info('-> url: ' + str(r.url))
- self.log.info('-> status_code: ' + str(r.status_code))
-
- return (r.status_code == 200)
-
- def ims_api_call_method(self, method_name, method_args=None):
- """
- Attributes:
- method_name: a name of method from Keysight API in string.
- method_args: a python-array contains
- arguments for the called API method.
- Returns:
- a tuple of (STATUS_BOOL, FUNC_RET_VAL),
- if STATUS_BOOL is false, FUNC_RET_VAL is questionable/undefined,
- if STATUS_BOOL is true, FUNC_RET_VAL will be the API function return value
- or FUNC_RET_VAL is None if the called method return nothing.
- """
- self.log.info('Calling ims method: ' + method_name)
-
- if (method_args == None):
- method_args = []
- elif (type(method_args) != list):
- method_args = [method_args]
- data = {
- 'methodName': method_name,
- 'arguments': method_args
- }
- request_url = self.get_base_url() + '/call_method'
- r = requests.post(url = request_url, json = data)
-
- ret_val = None
-
- if ( ('Content-Type' in r.headers.keys()) and r.headers['Content-Type'] == 'application/json'):
- # TODO(hmtuan): try json.loads() instead
- response_body = r.json()
- if ((response_body != None) and ('returnValue' in response_body.keys())) :
- ret_val = response_body['returnValue']
-
- self.log.info('-> method: ' + str(r.request.method))
- self.log.info('-> url: ' + str(r.url))
- self.log.info('-> status_code: ' + str(r.status_code))
- self.log.info('-> ret_val: ' + str(ret_val))
-
- return (r.status_code == 200), ret_val
-
- def _is_line_idle(self, call_line_number):
- is_line_idle_prop = self.get_ims_app_property(
- f'IVoip.CallLineParams({call_line_number}).SessionState')
- return is_line_idle_prop == 'Idle'
-
- def _is_ims_client_app_registered(self):
- is_registered_prop = self.get_ims_app_property('IComponentControl.IsRegistered')
- return is_registered_prop == 'True'
-
- def initiate_call(self, callee_number, call_line_idx=0):
- """Dial to callee_number.
-
- Attributes:
- callee_number: number to be dialed to.
- """
- # create IMS-Client API link
- ret_val = self.create_ims_app_link()
-
- if not ret_val:
- raise RuntimeError('Fail to create link to IMS app.')
-
- # check if IMS-Client is registered, and if not, request client to perform Registration
- self.log.info('Ensuring client registered.')
- is_registered = self._is_ims_client_app_registered()
- if not is_registered:
- self.log.info('Client not currently registered - registering.')
- self.ims_api_call_method('ISipConnection.Register()')
-
- is_registered = self._is_ims_client_app_registered()
- if not is_registered:
- raise RuntimeError('Failed to register IMS-client to IMS-server.')
-
- # switch to call-line #1 (idx = 0)
- self.log.info('Switching to call-line #1.')
- self.set_ims_app_property('IVoip.SelectedCallLine', call_line_idx)
-
- # check whether the call-line #1 is ready for dialling
- is_line1_idle = self._is_line_idle(call_line_idx)
- if not is_line1_idle:
- raise RuntimeError('Call-line not is not in indle state.')
-
- # entering callee number for call-line #1
- self.log.info(f'Enter callee number: {callee_number}.')
- self.set_ims_app_property('IVoip.CallLineParams(0).CallLocation', callee_number)
-
- # dial entered callee number
- self.log.info('Dialling call.')
- self.ims_api_call_method('IVoip.Dial()')
-
- time.sleep(5)
-
- # check if dial success (not idle)
- is_line1_idle = self._is_line_idle(call_line_idx)
- if is_line1_idle:
- raise RuntimeError('Fail to dial.')
+ def tear_down(self):
+ self._remove_ims_app_link()
+ self.ssh.close_ssh_connection()
diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py b/acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py
new file mode 100644
index 000000000..eb21bd09c
--- /dev/null
+++ b/acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py
@@ -0,0 +1,178 @@
+"""Functions to interact with modem log.
+
+Different modem logging profile can be found here:
+cs/vendor/google/apps/PixelLogger/log_profile/GFT_Call_Performance.xml
+"""
+import enum
+import logging
+import time
+
+from mobly.controllers import android_device # type: ignore
+
+_LOG = logging.getLogger(__name__)
+
+
+class ModemLogAction(enum.Enum):
+ """All possible valid PILOT logging actions."""
+
+ START = 'ACTION_START_LOGGING'
+ STOP = 'ACTION_STOP_LOGGING'
+ CLEAR = 'ACTION_CLEAR_LOG'
+
+
+class ModemLogProfile(enum.Enum):
+ """All possible modem logging profiles."""
+
+ LASSEN_AUDIO_TCP_DSP = 'Call_Performance.xml'
+ LASSEN_TCP_DSP = 'Data_Performance.xml'
+
+
+_MODEM_PILOT_ENABLE_PROP_NAME = 'vendor.pixellogger.pilot.logging_enable'
+
+_MODEM_LOG_PATH = '/sdcard/Android/data/com.android.pixellogger/files/logs'
+
+_ADB_SET_LOG_PROFILE_TEMPLATE = (
+ 'am broadcast '
+ '-a com.android.pixellogger.experiment.ACTION_LOAD_PROFILE '
+ '-n com.android.pixellogger/.receiver.ExperimentLoggingReceiver '
+ '--es name "{log_profile_name}"'
+)
+
+_ADB_LOG_ACTION = (
+ 'am broadcast '
+ '-a com.android.pixellogger.experiment.{log_action} '
+ '-n com.android.pixellogger/.receiver.ExperimentLoggingReceiver'
+)
+
+_MODEM_LOGGING_PROFILE_PROP_NAME = (
+ 'persist.vendor.pixellogger.pilot.profile_name'
+)
+
+
+def start_modem_logging(
+ dut: android_device.AndroidDevice,
+ timeout: int = 20,
+ polling_interval: int = 1,
+) -> bool:
+ """Starts modem PILOT logging.
+
+ Args:
+ dut: A mobly AndroidDevice controller object.
+ timeout: Seconds to try to confirm logging before giving up.
+ polling_interval: Seconds to wait between confirmation attempts.
+
+ Raises:
+ RuntimeError: If unable to enable PILOT modem logging within timeout.
+ """
+ dut.adb.root()
+ cmd = _ADB_LOG_ACTION.format(log_action=ModemLogAction.START.value)
+ dut.adb.shell(cmd)
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ time.sleep(polling_interval)
+ res = dut.adb.getprop(_MODEM_PILOT_ENABLE_PROP_NAME).strip()
+ _LOG.debug('PILOT modem logging enable: %s', res)
+ if res == 'true':
+ return
+ raise RuntimeError('Fail to start modem logging in PILOT mode.')
+
+
+def stop_modem_logging(
+ dut: android_device.AndroidDevice,
+ timeout: int = 20,
+ polling_interval: int = 1,
+) -> bool:
+ """Stops modem PILOT logging.
+
+ Args:
+ dut: A mobly AndroidDevice controller object.
+ timeout: An integer of time in second to wait for modem log to stop.
+ polling_interval: Interval in second to check if modem logging stopped.
+
+ Raises:
+ RuntimeError: If unable to disable PILOT modem logging within timeout.
+ """
+ dut.adb.root()
+ cmd = _ADB_LOG_ACTION.format(log_action=ModemLogAction.STOP.value)
+ dut.adb.shell(cmd)
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ time.sleep(polling_interval)
+ res = dut.adb.getprop(_MODEM_PILOT_ENABLE_PROP_NAME).strip()
+ _LOG.debug('PILOT modem logging enable: %s', res)
+ if res == 'false' or not res:
+ return
+ raise RuntimeError('Fail to stop modem logging in PILOT mode.')
+
+
+def clear_modem_logging(dut: android_device.AndroidDevice) -> None:
+ """Stops modem PILOT logging.
+
+ Args:
+ dut: A mobly AndroidDevice controller object.
+ """
+ dut.adb.root()
+ cmd = _ADB_LOG_ACTION.format(log_action=ModemLogAction.CLEAR.value)
+ dut.adb.shell(cmd)
+ _LOG.debug('Cleared modem logs.')
+
+
+def set_modem_log_profle(
+ dut: android_device.AndroidDevice,
+ profile: ModemLogProfile,
+ timeout: int = 10,
+ polling_interval: int = 1,
+) -> bool:
+ """Set modem log profile.
+
+ Args:
+ dut: An mobly AndroidDevice controller object.
+ profile: An ModemLogProfile enum represent modem logging profile.
+ timeout: Time waiting for modem log profile to be set.
+ polling_interval: Interval in second to check if log profile change.
+
+ Returns:
+ True if successfully set modem log profile within timeout. Fail otherwise.
+ """
+ dut.adb.root()
+ cmd = _ADB_SET_LOG_PROFILE_TEMPLATE.format(log_profile_name=profile.value)
+ dut.adb.shell(cmd)
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ time.sleep(polling_interval)
+ if profile.value in get_modem_log_profile(dut):
+ return True
+ return False
+
+
+def get_modem_log_profile(dut: android_device.AndroidDevice) -> str:
+ """Get modem log profile.
+
+ Args:
+ dut: An mobly AndroidDevice controller object.
+
+ Returns:
+ String value of modem logging profile name.
+
+ Raises:
+ RuntimeError: If get empty response from adb shell.
+ """
+ dut.adb.root()
+ res = dut.adb.getprop(_MODEM_LOGGING_PROFILE_PROP_NAME)
+ if not res:
+ raise RuntimeError('Fail to get modem logging profile from device.')
+ return res
+
+
+def pull_logs(dut: android_device.AndroidDevice, out_path: str, pull_timeout = 300) -> None:
+ """Pulls logs on device.
+
+ Args:
+ dut: An mobly AndroidDevice controller object.
+ out_path: A path to extract logs to.
+ pull_timeout: Seconds to wait for pulling complete.
+ """
+ dut.adb.root()
+ dut.adb.pull(
+ "%s %s" % (_MODEM_LOG_PATH, out_path), timeout=pull_timeout)
+ _LOG.debug('Modem logs exported to %s', out_path) \ No newline at end of file
diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py b/acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py
new file mode 100644
index 000000000..8bc932cfb
--- /dev/null
+++ b/acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py
@@ -0,0 +1,131 @@
+"""Ssh module for checking status, starting and closing apps."""
+import logging
+import re
+import paramiko # type: ignore
+
+_LOG = logging.getLogger(__name__)
+
+
+class SshLibrary:
+ """Library for creating a ssh connection, closing and opening apps."""
+
+ _PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}'
+
+ _SSH_START_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"'
+ _SSH_CHECK_APP_RUNNING_CMD_FORMAT = (
+ 'tasklist /fi "ImageName eq {regex_app_name}"'
+ )
+ _SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F'
+
+ def __init__(self, hostname: str, username: str):
+ self.log = _LOG
+ self.ssh = self.create_ssh_socket(hostname, username)
+
+ def create_ssh_socket(
+ self, hostname: str, username: str
+ ) -> paramiko.SSHClient:
+ """Creates ssh session to host.
+
+ Args:
+ hostname: IP address of the host machine.
+ username: Username of the host ims account.
+
+ Returns:
+ An SSHClient object connected the hostname.
+ """
+
+ self.log.info('Creating ssh session to hostname:%s ', hostname)
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.load_system_host_keys()
+ ssh.connect(hostname=hostname, username=username)
+ self.log.info('SSH client to hostname:%s is connected', hostname)
+ return ssh
+
+ def run_command_paramiko(self, command: str) -> tuple[str, str, str]:
+ """Runs a command using Paramiko and return stdout code.
+
+ Args:
+ command: Command to run on the connected host.
+
+ Returns:
+ A tuple containing the command result output, error information, and exit
+ status.
+ """
+
+ self.log.info('Running command: command:%s', command)
+ stdin, stdout, stderr = self.ssh.exec_command(command, timeout=10)
+ stdin.close()
+ err = ''.join(stderr.readlines())
+ out = ''.join(stdout.readlines())
+
+ # psexec return process ID as part of the exit code
+ exit_status = stderr.channel.recv_exit_status()
+ if err:
+ self.log.error(str(err))
+ else:
+ self.log.info(str(out))
+ return out, err, str(exit_status)
+
+ def close_ssh_connection(self):
+ """Closes ssh connection."""
+
+ self.log.info('Closing ssh connection')
+ self.ssh.close()
+
+ def close_app(self, app: str) -> str:
+ """Closes any app whose name passed as an argument.
+
+ Args:
+ app: Application name.
+
+ Returns:
+ Resulting output of closing the application.
+ """
+
+ command = self._SSH_KILL_PROCESS_BY_NAME.format(process_name=app)
+ result, _, _ = self.run_command_paramiko(command)
+ return result
+
+ def start_app(self, app: str, location: str) -> str:
+ """Starts any app whose name passed as an argument.
+
+ Args:
+ app: Application name.
+ location: Directory location of the application.
+
+ Returns:
+ Resulting output of starting the application.
+
+ Raises:
+ RuntimeError:
+ Application failed to start.
+ """
+
+ command = self._SSH_START_APP_CMD_FORMAT.format(exe_path=location + app)
+ results, err, exit_status = self.run_command_paramiko(command)
+
+ id_in_err = re.search(
+ self._PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status),
+ err[-1],
+ )
+ if id_in_err:
+ raise RuntimeError('Fail to start app: ' + results + err)
+
+ return results
+
+ def check_app_running(self, app: str) -> bool:
+ """Checks if the given app is running.
+
+ Args:
+ app: Application name.
+
+ Returns:
+ A boolean representing if the application is running or not.
+ """
+ is_running_cmd1 = self._SSH_CHECK_APP_RUNNING_CMD_FORMAT.format(
+ regex_app_name=app
+ )
+
+ result, _, _ = self.run_command_paramiko(is_running_cmd1)
+ return 'PID' in result
diff --git a/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py b/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py
index b9bbf093d..d2133ad2d 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py
@@ -100,6 +100,9 @@ class WifiBaseTest(BaseTestClass):
self.country_code = WifiEnums.CountryCode.US
wutils.set_wifi_country_code(ad, self.country_code)
+ if hasattr(self, "flagged_features"):
+ self._configure_flagged_features(ad, self.flagged_features)
+
def setup_test(self):
if (hasattr(self, "android_devices")):
wutils.start_all_wlan_logs(self.android_devices)
@@ -1018,3 +1021,18 @@ class WifiBaseTest(BaseTestClass):
asserts.fail(self.result_detail)
return _safe_wrap_test_case
+
+ def _configure_flagged_features(self, ad, flagged_features):
+ for module, features in flagged_features.items():
+ for feature in features:
+ value = flagged_features[module][feature].lower()
+ if value not in ("true", "false"):
+ raise ValueError("Invalid flag value for %s %s." % (module, feature))
+ self.log.info("Setting feature flag %s %s to %s." % (module, feature, value))
+ adb_put_command = "device_config put %s %s %s" % (module, feature, value)
+ adb_get_command = "device_config get %s %s" % (module, feature)
+ ad.adb.shell(adb_put_command)
+ value_from_get = ad.adb.shell(adb_get_command)
+
+ if type(value_from_get) != str or value_from_get.lower() != value:
+ raise RuntimeError("Failed to set flag value to %s (now is %s) for %s %s." % (value, value_from_get, module, feature)) \ No newline at end of file
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py
index 9da35290d..9817e4bf1 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py
@@ -276,7 +276,7 @@ def get_iperf_arg_string(duration,
num_processes=1,
udp_throughput='1000M',
ipv6=False,
- udp_length=1470):
+ udp_length=1448):
"""Function to format iperf client arguments.
This function takes in iperf client parameters and returns a properly
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py
index d91803b30..60b74d549 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py
@@ -72,6 +72,7 @@ class BokehFigure():
title_size='15pt',
axis_label_size='12pt',
legend_label_size='12pt',
+ legend_location = 'top_right',
axis_tick_label_size='12pt',
x_axis_type='auto',
sizing_mode='scale_both',
@@ -91,6 +92,7 @@ class BokehFigure():
'title_size': title_size,
'axis_label_size': axis_label_size,
'legend_label_size': legend_label_size,
+ 'legend_location': legend_location,
'axis_tick_label_size': axis_tick_label_size,
'x_axis_type': x_axis_type,
'sizing_mode': sizing_mode
@@ -305,7 +307,7 @@ class BokehFigure():
axis_label_text_font_size=self.
fig_property['axis_label_size']), 'right')
# plot formatting
- self.plot.legend.location = 'top_right'
+ self.plot.legend.location = self.fig_property['legend_location']
self.plot.legend.click_policy = 'hide'
self.plot.title.text_font_size = self.fig_property['title_size']
self.plot.legend.label_text_font_size = self.fig_property[
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py
index fe2d3e7e3..d944db5ac 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py
@@ -114,6 +114,44 @@ RATE_TABLE = {
]
},
},
+ 'EHT': {
+ 1: {
+ 20: [
+ 8.6, 17.2, 25.8, 34.4, 51.6, 68.8, 77.4, 86.0, 103.2, 114.7,
+ 129.0, 143.4, 154.9, 172.1, 0, 0
+ ],
+ 40: [
+ 17.2, 34.4, 51.6, 68.8, 103.2, 137.6, 154.9, 172.1, 206.5, 229.4,
+ 258.1, 286.8, 309.7, 344.1, 0, 0
+ ],
+ 80: [
+ 36.0, 72.1, 108.1, 144.1, 216.2, 288.2, 324.3, 360.3, 432.4,
+ 480.4, 540.4, 600.4, 648.5, 720.6, 0, 0
+ ],
+ 160: [
+ 72.1, 144.1, 216.2, 288.2, 432.4, 576.5, 648.5, 720.6, 864.7,
+ 960.7, 1080.9, 1201, 1297.1, 1441.2, 0, 0
+ ]
+ },
+ 2: {
+ 20: [
+ 17.2, 34.4, 51.6, 68.8, 103.2, 137.6, 154.8, 172, 206.4, 229.4,
+ 258, 286.8, 309.8, 344.2, 0, 0
+ ],
+ 40: [
+ 34.4, 68.8, 103.2, 137.6, 206.4, 275.2, 309.6, 344, 412.8,
+ 458.8, 516, 573.6, 619.4, 688.2, 0, 0
+ ],
+ 80: [
+ 72, 144.2, 216.2, 288.2, 432.4, 576.4, 648.6, 720.6, 864.8,
+ 960.8, 1080.8, 1200.8, 1297, 1441.2, 0, 0
+ ],
+ 160: [
+ 144, 288.4, 432.4, 576.4, 864.8, 1152.8, 1297.2, 1441.2,
+ 1729.6, 1921.6, 2161.6, 2401.6, 2594.2, 2882.4, 0, 0
+ ]
+ },
+ },
}
@@ -343,7 +381,14 @@ def push_firmware(dut, firmware_files):
def disable_beamforming(dut):
- dut.adb.shell('wl txbf 0')
+ dut.adb.shell('wl down')
+ time.sleep(VERY_SHORT_SLEEP)
+ try:
+ dut.adb.shell('wl txbf 0')
+ dut.adb.shell('wl txbf_bfe_cap 0')
+ except:
+ logging.warning('Could not disable beamforming.')
+ dut.adb.shell('wl up')
def set_nss_capability(dut, nss):
@@ -366,16 +411,27 @@ def set_chain_mask(dut, chain):
return
# Set chain mask if needed
dut.adb.shell('wl down')
- time.sleep(VERY_SHORT_SLEEP)
+ time.sleep(SHORT_SLEEP)
dut.adb.shell('wl txchain 0x{}'.format(chain))
dut.adb.shell('wl rxchain 0x{}'.format(chain))
dut.adb.shell('wl up')
+ try:
+ curr_tx_chain = int(dut.adb.shell('wl txchain'))
+ curr_rx_chain = int(dut.adb.shell('wl rxchain'))
+ except:
+ curr_tx_chain = -1
+ curr_rx_chain = -1
+ if curr_tx_chain != chain or curr_rx_chain != chain:
+ logging.error('Set chain mask failed.')
+
class LinkLayerStats():
LLSTATS_CMD = 'wl dump ampdu; wl counters;'
LL_STATS_CLEAR_CMD = 'wl dump_clear ampdu; wl reset_cnts;'
+ BRCM_PHY_LOG_CLEAR_CMD = 'wl dump phycal; wl dump_clear txbf;'
+ BRCM_PHY_LOG_CMD = 'wl phy_rssi_ant; wl phy_snr_ant; wl nrate; wl dump phycal; wl tvpm; wl dump txbf;'
BW_REGEX = re.compile(r'Chanspec:.+ (?P<bandwidth>[0-9]+)MHz')
MCS_REGEX = re.compile(r'(?P<count>[0-9]+)\((?P<percent>[0-9]+)%\)')
RX_REGEX = re.compile(
@@ -413,6 +469,7 @@ class LinkLayerStats():
try:
llstats_output = self.dut.adb.shell(self.LLSTATS_CMD,
timeout=1)
+
self.dut.adb.shell_nb(self.LL_STATS_CLEAR_CMD)
wl_join = self.dut.adb.shell("wl status")
@@ -420,10 +477,20 @@ class LinkLayerStats():
self.bandwidth = int(
re.search(self.BW_REGEX, wl_join).group('bandwidth'))
except:
+ logging.debug('Failed to get counters and ampdu dumps.')
llstats_output = ''
+ try:
+ phy_log_output = self.dut.adb.shell(self.BRCM_PHY_LOG_CMD,
+ ignore_status=True,
+ timeout=1)
+ self.dut.adb.shell_nb(self.BRCM_PHY_LOG_CLEAR_CMD)
+ except:
+ logging.debug('Failed to get phy log.')
+ phy_log_output = ''
else:
llstats_output = ''
- self._update_stats(llstats_output)
+ phy_log_output = ''
+ self._update_stats(llstats_output, phy_log_output)
def reset_stats(self):
self.llstats_cumulative = self._empty_llstats()
@@ -574,10 +641,11 @@ class LinkLayerStats():
llstats_summary['rx_per'] = 0
return llstats_summary
- def _update_stats(self, llstats_output):
+ def _update_stats(self, llstats_output, phy_log_output):
self.llstats_cumulative = self._empty_llstats()
self.llstats_incremental = self._empty_llstats()
self.llstats_incremental['raw_output'] = llstats_output
+ self.llstats_incremental['phy_log_output'] = phy_log_output
self.llstats_incremental['mcs_stats'] = self._parse_mcs_stats(
llstats_output)
self.llstats_incremental['mpdu_stats'] = self._parse_mpdu_stats(
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py
index da47cb8d5..1f63fdd86 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py
@@ -84,6 +84,10 @@ def create(configs):
'name': 'NetgearRAXE500AP',
'package': 'netgear_raxe500'
},
+ ('Netgear', 'RS700'): {
+ 'name': 'NetgearRS700AP',
+ 'package': 'netgear_rs700'
+ },
('Brcm', 'Reference'): {
'name': 'BrcmRefAP',
'package': 'brcm_ref'
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py
index d1420df33..5c892f137 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py
@@ -187,7 +187,7 @@ class NetgearRAX120AP(NetgearR7500AP):
setting_to_update = {network: {}}
if channel:
if channel not in self.capabilities['channels'][network]:
- self.log.error('Ch{} is not supported on {} interface.'.format(
+ raise RuntimeError('Ch{} is not supported on {} interface.'.format(
channel, network))
setting_to_update[network]['channel'] = channel
@@ -198,7 +198,7 @@ class NetgearRAX120AP(NetgearR7500AP):
bandwidth = bandwidth.replace('bw',
self.capabilities['default_mode'])
if bandwidth not in self.capabilities['modes'][network]:
- self.log.error('{} mode is not supported on {} interface.'.format(
+ raise RuntimeError('{} mode is not supported on {} interface.'.format(
bandwidth, network))
setting_to_update[network]['bandwidth'] = str(bandwidth)
setting_to_update['enable_ax'] = int('HE' in bandwidth)
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py
index 9363bbef2..9928f33d0 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py
@@ -195,7 +195,7 @@ class NetgearRAX200AP(WifiRetailAP):
setting_to_update = {network: {}}
if channel:
if channel not in self.capabilities['channels'][network]:
- self.log.error('Ch{} is not supported on {} interface.'.format(
+ raise RuntimeError('Ch{} is not supported on {} interface.'.format(
channel, network))
setting_to_update[network]['channel'] = channel
@@ -206,7 +206,7 @@ class NetgearRAX200AP(WifiRetailAP):
bandwidth = bandwidth.replace('bw',
self.capabilities['default_mode'])
if bandwidth not in self.capabilities['modes'][network]:
- self.log.error('{} mode is not supported on {} interface.'.format(
+ raise RuntimeError('{} mode is not supported on {} interface.'.format(
bandwidth, network))
setting_to_update[network]['bandwidth'] = str(bandwidth)
setting_to_update['enable_ax'] = int('HE' in bandwidth)
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py
index c885e05a8..5c52b5941 100644
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py
@@ -197,7 +197,7 @@ class NetgearRAXE500AP(WifiRetailAP):
setting_to_update = {network: {}}
if channel:
if channel not in self.capabilities['channels'][network]:
- self.log.error('Ch{} is not supported on {} interface.'.format(
+ raise RuntimeError('Ch{} is not supported on {} interface.'.format(
channel, network))
if isinstance(channel, str) and '6g' in channel:
channel = int(channel[2:])
@@ -210,7 +210,7 @@ class NetgearRAXE500AP(WifiRetailAP):
bandwidth = bandwidth.replace('bw',
self.capabilities['default_mode'])
if bandwidth not in self.capabilities['modes'][network]:
- self.log.error('{} mode is not supported on {} interface.'.format(
+ raise RuntimeError('{} mode is not supported on {} interface.'.format(
bandwidth, network))
setting_to_update[network]['bandwidth'] = str(bandwidth)
setting_to_update['enable_ax'] = int('HE' in bandwidth)
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py
new file mode 100644
index 000000000..8976466c6
--- /dev/null
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py
@@ -0,0 +1,324 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import numpy
+import re
+import time
+from acts_contrib.test_utils.wifi.wifi_retail_ap import WifiRetailAP
+from acts_contrib.test_utils.wifi.wifi_retail_ap import BlockingBrowser
+
+BROWSER_WAIT_SHORT = 1
+BROWSER_WAIT_MED = 3
+BROWSER_WAIT_LONG = 30
+BROWSER_WAIT_EXTRA_LONG = 60
+
+
+class NetgearRS700AP(WifiRetailAP):
+ """Class that implements Netgear RS700 AP.
+
+ Since most of the class' implementation is shared with the R7000, this
+ class inherits from NetgearR7000AP and simply redefines config parameters
+ """
+
+ def __init__(self, ap_settings):
+ super().__init__(ap_settings)
+ self.init_gui_data()
+ # Read and update AP settings
+ self.read_ap_firmware()
+ self.read_ap_settings()
+ self.update_ap_settings(ap_settings)
+
+ def init_gui_data(self):
+ self.config_page = (
+ '{protocol}://{username}:{password}@'
+ '{ip_address}:{port}/WLG_wireless_tri_band.htm').format(
+ protocol=self.ap_settings['protocol'],
+ username=self.ap_settings['admin_username'],
+ password=self.ap_settings['admin_password'],
+ ip_address=self.ap_settings['ip_address'],
+ port=self.ap_settings['port'])
+ self.config_page_nologin = (
+ '{protocol}://{ip_address}:{port}/'
+ 'WLG_wireless_tri_band.htm').format(
+ protocol=self.ap_settings['protocol'],
+ ip_address=self.ap_settings['ip_address'],
+ port=self.ap_settings['port'])
+ self.config_page_advanced = (
+ '{protocol}://{username}:{password}@'
+ '{ip_address}:{port}/WLG_adv_tri_band2.htm').format(
+ protocol=self.ap_settings['protocol'],
+ username=self.ap_settings['admin_username'],
+ password=self.ap_settings['admin_password'],
+ ip_address=self.ap_settings['ip_address'],
+ port=self.ap_settings['port'])
+ self.firmware_page = (
+ '{protocol}://{username}:{password}@'
+ '{ip_address}:{port}/ADVANCED_home2_tri_band.htm').format(
+ protocol=self.ap_settings['protocol'],
+ username=self.ap_settings['admin_username'],
+ password=self.ap_settings['admin_password'],
+ ip_address=self.ap_settings['ip_address'],
+ port=self.ap_settings['port'])
+ self.capabilities = {
+ 'interfaces': ['2G', '5G_1', '6G'],
+ 'channels': {
+ '2G': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
+ '5G_1': [
+ 36, 40, 44, 48, 52, 56, 60, 64, 100, 104, 108, 112, 116,
+ 120, 124, 128, 132, 136, 140, 144, 149, 153, 157, 161, 165
+ ],
+ '6G': ['6g' + str(ch) for ch in numpy.arange(37, 222, 16)]
+ },
+ 'modes': {
+ '2G': ['EHT20', 'EHT40'],
+ '5G_1': ['EHT40', 'EHT80', 'EHT160'],
+ '6G': ['EHT40', 'EHT80', 'EHT160', 'EHT320']
+ },
+ 'default_mode': 'EHT'
+ }
+ for interface in self.capabilities['interfaces']:
+ self.ap_settings[interface] = {}
+
+ self.region_map = {
+ '3': 'Australia',
+ '4': 'Canada',
+ '5': 'Europe',
+ '7': 'Japan',
+ '8': 'Korea',
+ '11': 'North America',
+ '16': 'China',
+ '17': 'India',
+ '21': 'Middle East(Saudi Arabia/United Arab Emirates)',
+ '23': 'Singapore',
+ '25': 'Hong Kong',
+ '26': 'Vietnam'
+ }
+
+ self.bw_mode_text = {
+ '2G': {
+ 'EHT20': 'Up to 0.7 Gbps',
+ 'EHT40': 'Up to 1.4 Gbps',
+ },
+ '5G_1': {
+ 'EHT40': 'Up to 1.4 Gbps',
+ 'EHT80': 'Up to 2.9 Gbps',
+ 'EHT160': 'Up to 5.8 Gbps'
+ },
+ '6G': {
+ 'EHT40': 'Up to 1.4 Gbps',
+ 'EHT80': 'Up to 2.9 Gbps',
+ 'EHT160': 'Up to 5.8 Gbps',
+ 'EHT320': 'Up to 11.5 Gbps',
+ }
+ }
+ self.bw_mode_values = {
+ 'HT20': 'EHT20',
+ 'HT40': 'EHT40',
+ 'HT80': 'EHT80',
+ 'HT160': 'EHT160',
+ 'HT320': 'EHT320'
+ }
+
+ # Config ordering intentional to avoid GUI bugs
+ self.config_page_fields = collections.OrderedDict([
+ ('region', 'WRegion'), (('2G', 'ssid'), 'ssid'),
+ (('5G_1', 'ssid'), 'ssid_an'), (('6G', 'ssid'), 'ssid_an_2'),
+ (('2G', 'channel'), 'w_channel'),
+ (('5G_1', 'channel'), 'w_channel_an'),
+ (('6G', 'channel'), 'w_channel_an_2'),
+ (('2G', 'bandwidth'), 'opmode'),
+ (('5G_1', 'bandwidth'), 'opmode_an'),
+ (('6G', 'bandwidth'), 'opmode_an_2'),
+ (('6G', 'security_type'), 'security_type_an_2'),
+ (('5G_1', 'security_type'), 'security_type_an'),
+ (('2G', 'security_type'), 'security_type'),
+ (('2G', 'password'), 'passphrase'),
+ (('5G_1', 'password'), 'passphrase_an'),
+ (('6G', 'password'), 'passphrase_an_2')
+ ])
+
+ def _set_channel_and_bandwidth(self,
+ network,
+ channel=None,
+ bandwidth=None):
+ """Helper function that sets network bandwidth and channel.
+
+ Args:
+ network: string containing network identifier (2G, 5G_1, 5G_2)
+ channel: desired channel
+ bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
+ """
+
+ setting_to_update = {network: {}}
+ if channel:
+ if channel not in self.capabilities['channels'][network]:
+ raise RuntimeError('Ch{} is not supported on {} interface.'.format(
+ channel, network))
+ if isinstance(channel, str) and '6g' in channel:
+ channel = int(channel[2:])
+ setting_to_update[network]['channel'] = channel
+
+ if bandwidth is None:
+ return setting_to_update
+
+ if 'bw' in bandwidth:
+ bandwidth = bandwidth.replace('bw',
+ self.capabilities['default_mode'])
+ if bandwidth not in self.capabilities['modes'][network]:
+ raise RuntimeError('{} mode is not supported on {} interface.'.format(
+ bandwidth, network))
+ setting_to_update[network]['bandwidth'] = str(bandwidth)
+ return setting_to_update
+
+ def set_bandwidth(self, network, bandwidth):
+ """Function that sets network bandwidth/mode.
+
+ Args:
+ network: string containing network identifier (2G, 5G_1, 5G_2)
+ bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
+ """
+
+ setting_to_update = self._set_channel_and_bandwidth(
+ network, bandwidth=bandwidth)
+ self.update_ap_settings(setting_to_update)
+
+ def set_channel(self, network, channel):
+ """Function that sets network channel.
+
+ Args:
+ network: string containing network identifier (2G, 5G_1, 5G_2)
+ channel: string or int containing channel
+ """
+ setting_to_update = self._set_channel_and_bandwidth(network,
+ channel=channel)
+ self.update_ap_settings(setting_to_update)
+
+ def set_channel_and_bandwidth(self, network, channel, bandwidth):
+ """Function that sets network bandwidth/mode.
+
+ Args:
+ network: string containing network identifier (2G, 5G_1, 5G_2)
+ channel: desired channel
+ bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80.
+ """
+ setting_to_update = self._set_channel_and_bandwidth(
+ network, channel=channel, bandwidth=bandwidth)
+ self.update_ap_settings(setting_to_update)
+
+ def read_ap_firmware(self):
+ """Function to read ap settings."""
+ with BlockingBrowser(self.ap_settings['headless_browser'],
+ 900) as browser:
+
+ # Visit URL
+ browser.visit_persistent(self.firmware_page, BROWSER_WAIT_MED, 10)
+ firmware_regex = re.compile(
+ r'Firmware Version[\s\S]+V(?P<version>[0-9._]+)')
+ #firmware_version = re.search(firmware_regex, browser.html)
+ firmware_version = re.search(firmware_regex,
+ browser.driver.page_source)
+ if firmware_version:
+ self.ap_settings['firmware_version'] = firmware_version.group(
+ 'version')
+ else:
+ self.ap_settings['firmware_version'] = -1
+
+ def read_ap_settings(self):
+ """Function to read ap settings."""
+ with BlockingBrowser(self.ap_settings['headless_browser'],
+ 900) as browser:
+ # Visit URL
+ browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10)
+
+ for field_key, field_name in self.config_page_fields.items():
+ field_value = browser.get_element_value(field_name)
+ if 'bandwidth' in field_key:
+ self.ap_settings[field_key[0]][
+ field_key[1]] = self.bw_mode_values[field_value]
+ elif 'region' in field_key:
+ self.ap_settings['region'] = self.region_map[field_value]
+ elif 'security_type' in field_key:
+ self.ap_settings[field_key[0]][field_key[1]] = field_value
+ elif 'channel' in field_key:
+ self.ap_settings[field_key[0]][field_key[1]] = int(
+ field_value)
+ else:
+ self.ap_settings[field_key[0]][field_key[1]] = field_value
+ return self.ap_settings.copy()
+
+ def configure_ap(self, **config_flags):
+ """Function to configure ap wireless settings."""
+ # Configure radios
+ with BlockingBrowser(self.ap_settings['headless_browser'],
+ 900) as browser:
+ # Visit URL
+ browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10)
+ browser.visit_persistent(self.config_page_nologin,
+ BROWSER_WAIT_MED, 10, self.config_page)
+
+ # Update region, and power/bandwidth for each network
+ if browser.is_element_enabled(self.config_page_fields['region']):
+ browser.set_element_value(self.config_page_fields['region'],
+ self.ap_settings['region'],
+ select_method='text')
+ else:
+ self.log.warning('Cannot change region.')
+ for field_key, field_name in self.config_page_fields.items():
+ if 'bandwidth' in field_key:
+ try:
+ browser.set_element_value(
+ field_name,
+ self.bw_mode_text[field_key[0]][self.ap_settings[
+ field_key[0]][field_key[1]]],
+ select_method='text')
+ except AttributeError:
+ self.log.warning(
+ 'Cannot select bandwidth. Keeping AP default.')
+
+ # Update security settings (passwords updated only if applicable)
+ for field_key, field_name in self.config_page_fields.items():
+ if 'security_type' in field_key:
+ browser.set_element_value(
+ field_name,
+ self.ap_settings[field_key[0]][field_key[1]])
+ if 'WPA' in self.ap_settings[field_key[0]][field_key[1]]:
+ browser.set_element_value(
+ self.config_page_fields[(field_key[0],
+ 'password')],
+ self.ap_settings[field_key[0]]['password'])
+
+ for field_key, field_name in self.config_page_fields.items():
+ if 'ssid' in field_key:
+ browser.set_element_value(
+ field_name,
+ self.ap_settings[field_key[0]][field_key[1]])
+ elif 'channel' in field_key:
+ try:
+ browser.set_element_value(
+ field_name,
+ self.ap_settings[field_key[0]][field_key[1]])
+ except AttributeError:
+ self.log.warning(
+ 'Cannot select channel. Keeping AP default.')
+ browser.accept_alert_if_present(BROWSER_WAIT_SHORT)
+
+ time.sleep(BROWSER_WAIT_SHORT)
+ browser.click_button('Apply')
+ browser.accept_alert_if_present(BROWSER_WAIT_SHORT)
+ time.sleep(BROWSER_WAIT_SHORT)
+ browser.visit_persistent(self.config_page, BROWSER_WAIT_EXTRA_LONG,
+ 10)
diff --git a/acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py b/acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py
new file mode 100644
index 000000000..96b2aa0e0
--- /dev/null
+++ b/acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2022 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import csv
+import itertools
+import numpy
+import json
+import os
+from acts import context
+from acts import base_test
+from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
+from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
+from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
+from CellularLtePlusFr1PeakThroughputTest import CellularFr1SingleCellPeakThroughputTest
+
+from functools import partial
+
+
+class CellularFr1RvrTest(CellularFr1SingleCellPeakThroughputTest):
+ """Class to test single cell FR1 NSA sensitivity"""
+
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.testcase_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_case())
+ self.testclass_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_class())
+ self.publish_testcase_metrics = True
+ self.testclass_params = self.user_params['nr_rvr_test_params']
+ self.tests = self.generate_test_cases(
+ channel_list=['LOW', 'MID', 'HIGH'],
+ nr_ul_mcs=4,
+ lte_dl_mcs_table='QAM256',
+ lte_dl_mcs=4,
+ lte_ul_mcs_table='QAM256',
+ lte_ul_mcs=4,
+ transform_precoding=0)
+
+ def process_testclass_results(self):
+ pass
+
+ def process_testcase_results(self):
+ if self.current_test_name not in self.testclass_results:
+ return
+ testcase_data = self.testclass_results[self.current_test_name]
+ results_file_path = os.path.join(
+ context.get_current_context().get_full_output_path(),
+ '{}.json'.format(self.current_test_name))
+ with open(results_file_path, 'w') as results_file:
+ json.dump(wputils.serialize_dict(testcase_data),
+ results_file,
+ indent=4)
+
+ average_throughput_list = []
+ theoretical_throughput_list = []
+ nr_cell_index = testcase_data['testcase_params']['endc_combo_config']['lte_cell_count']
+ cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][nr_cell_index]
+ for result in testcase_data['results']:
+ average_throughput_list.append(
+ result['throughput_measurements']['nr_tput_result']['total']['DL']['average_tput'])
+ theoretical_throughput_list.append(
+ result['throughput_measurements']['nr_tput_result']['total']['DL']['theoretical_tput'])
+ padding_len = len(cell_power_list) - len(average_throughput_list)
+ average_throughput_list.extend([0] * padding_len)
+ theoretical_throughput_list.extend([0] * padding_len)
+
+ testcase_data['average_throughput_list'] = average_throughput_list
+ testcase_data[
+ 'theoretical_throughput_list'] = theoretical_throughput_list
+ testcase_data['cell_power_list'] = cell_power_list
+
+ plot = BokehFigure(
+ title='Band {} - RvR'.format(testcase_data['testcase_params']['endc_combo_config']['cell_list'][nr_cell_index]['band']),
+ x_label='Cell Power (dBm)',
+ primary_y_label='PHY Rate (Mbps)')
+
+ plot.add_line(
+ testcase_data['cell_power_list'],
+ testcase_data['average_throughput_list'],
+ 'Average Throughput',
+ width=1)
+ plot.add_line(
+ testcase_data['cell_power_list'],
+ testcase_data['theoretical_throughput_list'],
+ 'Average Throughput',
+ width=1,
+ style='dashed')
+ plot.generate_figure()
+ output_file_path = os.path.join(self.log_path, '{}.html'.format(self.current_test_name))
+ BokehFigure.save_figure(plot, output_file_path)
+
+
+ def get_per_cell_power_sweeps(self, testcase_params):
+ nr_cell_index = testcase_params['endc_combo_config']['lte_cell_count']
+ start_atten = self.testclass_params['nr_cell_power_start']
+ # get current cell power start
+ nr_cell_sweep = list(
+ numpy.arange(start_atten,
+ self.testclass_params['nr_cell_power_stop'],
+ self.testclass_params['nr_cell_power_step']))
+ lte_sweep = [self.testclass_params['lte_cell_power']
+ ] * len(nr_cell_sweep)
+ if nr_cell_index == 0:
+ cell_power_sweeps = [nr_cell_sweep]
+ else:
+ cell_power_sweeps = [lte_sweep, nr_cell_sweep]
+ return cell_power_sweeps
+
+ def generate_test_cases(self, channel_list, **kwargs):
+ test_cases = []
+ with open(self.testclass_params['nr_single_cell_configs'],
+ 'r') as csvfile:
+ test_configs = csv.DictReader(csvfile)
+ for test_config, channel in itertools.product(
+ test_configs, channel_list):
+ if int(test_config['skip_test']):
+ continue
+ endc_combo_config = self.generate_endc_combo_config(
+ test_config)
+ test_name = 'test_fr1_{}_{}'.format(
+ test_config['nr_band'], channel.lower())
+ test_params = collections.OrderedDict(
+ endc_combo_config=endc_combo_config,
+ nr_dl_mcs=self.testclass_params['link_adaptation_config'],
+ **kwargs)
+ setattr(self, test_name,
+ partial(self._test_throughput_bler, test_params))
+ test_cases.append(test_name)
+ return test_cases
diff --git a/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py b/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py
index e483e59a2..2e515cb67 100644
--- a/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py
+++ b/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py
@@ -56,8 +56,10 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest):
plots = collections.OrderedDict()
compiled_data = collections.OrderedDict()
for testcase_name, testcase_data in self.testclass_results.items():
+ nr_cell_index = testcase_data['testcase_params'][
+ 'endc_combo_config']['lte_cell_count']
cell_config = testcase_data['testcase_params'][
- 'endc_combo_config']['cell_list'][1]
+ 'endc_combo_config']['cell_list'][nr_cell_index]
test_id = tuple(('band', cell_config['band']))
if test_id not in plots:
# Initialize test id data when not present
@@ -134,15 +136,19 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest):
bler_list = []
average_throughput_list = []
theoretical_throughput_list = []
+ nr_cell_index = testcase_data['testcase_params']['endc_combo_config'][
+ 'lte_cell_count']
cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][
- 1]
+ nr_cell_index]
for result in testcase_data['results']:
- bler_list.append(
- result['nr_bler_result']['total']['DL']['nack_ratio'])
+ bler_list.append(result['throughput_measurements']
+ ['nr_bler_result']['total']['DL']['nack_ratio'])
average_throughput_list.append(
- result['nr_tput_result']['total']['DL']['average_tput'])
+ result['throughput_measurements']['nr_tput_result']['total']
+ ['DL']['average_tput'])
theoretical_throughput_list.append(
- result['nr_tput_result']['total']['DL']['theoretical_tput'])
+ result['throughput_measurements']['nr_tput_result']['total']
+ ['DL']['theoretical_tput'])
padding_len = len(cell_power_list) - len(average_throughput_list)
average_throughput_list.extend([0] * padding_len)
theoretical_throughput_list.extend([0] * padding_len)
@@ -160,8 +166,8 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest):
sensitivity = cell_power_list[sensitivity_idx]
self.log.info('NR Band {} MCS {} Sensitivity = {}dBm'.format(
testcase_data['testcase_params']['endc_combo_config']['cell_list']
- [1]['band'], testcase_data['testcase_params']['nr_dl_mcs'],
- sensitivity))
+ [nr_cell_index]['band'],
+ testcase_data['testcase_params']['nr_dl_mcs'], sensitivity))
testcase_data['bler_list'] = bler_list
testcase_data['average_throughput_list'] = average_throughput_list
@@ -172,13 +178,14 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest):
def get_per_cell_power_sweeps(self, testcase_params):
# get reference test
- current_band = testcase_params['endc_combo_config']['cell_list'][1][
- 'band']
+ nr_cell_index = testcase_params['endc_combo_config']['lte_cell_count']
+ current_band = testcase_params['endc_combo_config']['cell_list'][
+ nr_cell_index]['band']
reference_test = None
reference_sensitivity = None
for testcase_name, testcase_data in self.testclass_results.items():
if testcase_data['testcase_params']['endc_combo_config'][
- 'cell_list'][1]['band'] == current_band:
+ 'cell_list'][nr_cell_index]['band'] == current_band:
reference_test = testcase_name
reference_sensitivity = testcase_data['sensitivity']
if reference_test and reference_sensitivity and not self.retry_flag:
@@ -199,7 +206,10 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest):
self.testclass_params['nr_cell_power_step']))
lte_sweep = [self.testclass_params['lte_cell_power']
] * len(nr_cell_sweep)
- cell_power_sweeps = [lte_sweep, nr_cell_sweep]
+ if nr_cell_index == 0:
+ cell_power_sweeps = [nr_cell_sweep]
+ else:
+ cell_power_sweeps = [lte_sweep, nr_cell_sweep]
return cell_power_sweeps
def generate_test_cases(self, channel_list, dl_mcs_list, **kwargs):
diff --git a/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py b/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py
index b422a3045..65fef9300 100644
--- a/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py
+++ b/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py
@@ -66,60 +66,60 @@ class CellularLtePlusFr1PeakThroughputTest(CellularThroughputBaseTest):
'nr_cell_count']:
metric_map.update({
'nr_min_dl_tput':
- testcase_result['nr_tput_result']['total']['DL']['min_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']['min_tput'],
'nr_max_dl_tput':
- testcase_result['nr_tput_result']['total']['DL']['max_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']['max_tput'],
'nr_avg_dl_tput':
- testcase_result['nr_tput_result']['total']['DL']
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']
['average_tput'],
'nr_theoretical_dl_tput':
- testcase_result['nr_tput_result']['total']['DL']
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']
['theoretical_tput'],
'nr_dl_bler':
- testcase_result['nr_bler_result']['total']['DL']['nack_ratio']
+ testcase_result['throughput_measurements']['nr_bler_result']['total']['DL']['nack_ratio']
* 100,
'nr_min_dl_tput':
- testcase_result['nr_tput_result']['total']['UL']['min_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']['min_tput'],
'nr_max_dl_tput':
- testcase_result['nr_tput_result']['total']['UL']['max_tput'],
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']['max_tput'],
'nr_avg_dl_tput':
- testcase_result['nr_tput_result']['total']['UL']
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']
['average_tput'],
'nr_theoretical_dl_tput':
- testcase_result['nr_tput_result']['total']['UL']
+ testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']
['theoretical_tput'],
'nr_ul_bler':
- testcase_result['nr_bler_result']['total']['UL']['nack_ratio']
+ testcase_result['throughput_measurements']['nr_bler_result']['total']['UL']['nack_ratio']
* 100
})
if testcase_data['testcase_params']['endc_combo_config'][
'lte_cell_count']:
metric_map.update({
'lte_min_dl_tput':
- testcase_result['lte_tput_result']['total']['DL']['min_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']['min_tput'],
'lte_max_dl_tput':
- testcase_result['lte_tput_result']['total']['DL']['max_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']['max_tput'],
'lte_avg_dl_tput':
- testcase_result['lte_tput_result']['total']['DL']
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']
['average_tput'],
'lte_theoretical_dl_tput':
- testcase_result['lte_tput_result']['total']['DL']
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']
['theoretical_tput'],
'lte_dl_bler':
- testcase_result['lte_bler_result']['total']['DL']['nack_ratio']
+ testcase_result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio']
* 100,
'lte_min_dl_tput':
- testcase_result['lte_tput_result']['total']['UL']['min_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']['min_tput'],
'lte_max_dl_tput':
- testcase_result['lte_tput_result']['total']['UL']['max_tput'],
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']['max_tput'],
'lte_avg_dl_tput':
- testcase_result['lte_tput_result']['total']['UL']
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']
['average_tput'],
'lte_theoretical_dl_tput':
- testcase_result['lte_tput_result']['total']['UL']
+ testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']
['theoretical_tput'],
'lte_ul_bler':
- testcase_result['lte_bler_result']['total']['UL']['nack_ratio']
+ testcase_result['throughput_measurements']['lte_bler_result']['total']['UL']['nack_ratio']
* 100
})
if self.publish_testcase_metrics:
@@ -160,68 +160,68 @@ class CellularLtePlusFr1PeakThroughputTest(CellularThroughputBaseTest):
'endc_combo_config']['nr_cell_count']:
row_dict.update({
'NR DL Min. Throughput':
- result['nr_tput_result']['total']['DL']
+ result['throughput_measurements']['nr_tput_result']['total']['DL']
['min_tput'],
'NR DL Max. Throughput':
- result['nr_tput_result']['total']['DL']
+ result['throughput_measurements']['nr_tput_result']['total']['DL']
['max_tput'],
'NR DL Avg. Throughput':
- result['nr_tput_result']['total']['DL']
+ result['throughput_measurements']['nr_tput_result']['total']['DL']
['average_tput'],
'NR DL Theoretical Throughput':
- result['nr_tput_result']['total']['DL']
+ result['throughput_measurements']['nr_tput_result']['total']['DL']
['theoretical_tput'],
'NR UL Min. Throughput':
- result['nr_tput_result']['total']['UL']
+ result['throughput_measurements']['nr_tput_result']['total']['UL']
['min_tput'],
'NR UL Max. Throughput':
- result['nr_tput_result']['total']['UL']
+ result['throughput_measurements']['nr_tput_result']['total']['UL']
['max_tput'],
'NR UL Avg. Throughput':
- result['nr_tput_result']['total']['UL']
+ result['throughput_measurements']['nr_tput_result']['total']['UL']
['average_tput'],
'NR UL Theoretical Throughput':
- result['nr_tput_result']['total']['UL']
+ result['throughput_measurements']['nr_tput_result']['total']['UL']
['theoretical_tput'],
'NR DL BLER (%)':
- result['nr_bler_result']['total']['DL']
+ result['throughput_measurements']['nr_bler_result']['total']['DL']
['nack_ratio'] * 100,
'NR UL BLER (%)':
- result['nr_bler_result']['total']['UL']
+ result['throughput_measurements']['nr_bler_result']['total']['UL']
['nack_ratio'] * 100
})
if testcase_results['testcase_params'][
'endc_combo_config']['lte_cell_count']:
row_dict.update({
'LTE DL Min. Throughput':
- result['lte_tput_result']['total']['DL']
+ result['throughput_measurements']['lte_tput_result']['total']['DL']
['min_tput'],
'LTE DL Max. Throughput':
- result['lte_tput_result']['total']['DL']
+ result['throughput_measurements']['lte_tput_result']['total']['DL']
['max_tput'],
'LTE DL Avg. Throughput':
- result['lte_tput_result']['total']['DL']
+ result['throughput_measurements']['lte_tput_result']['total']['DL']
['average_tput'],
'LTE DL Theoretical Throughput':
- result['lte_tput_result']['total']['DL']
+ result['throughput_measurements']['lte_tput_result']['total']['DL']
['theoretical_tput'],
'LTE UL Min. Throughput':
- result['lte_tput_result']['total']['UL']
+ result['throughput_measurements']['lte_tput_result']['total']['UL']
['min_tput'],
'LTE UL Max. Throughput':
- result['lte_tput_result']['total']['UL']
+ result['throughput_measurements']['lte_tput_result']['total']['UL']
['max_tput'],
'LTE UL Avg. Throughput':
- result['lte_tput_result']['total']['UL']
+ result['throughput_measurements']['lte_tput_result']['total']['UL']
['average_tput'],
'LTE UL Theoretical Throughput':
- result['lte_tput_result']['total']['UL']
+ result['throughput_measurements']['lte_tput_result']['total']['UL']
['theoretical_tput'],
'LTE DL BLER (%)':
- result['lte_bler_result']['total']['DL']
+ result['throughput_measurements']['lte_bler_result']['total']['DL']
['nack_ratio'] * 100,
'LTE UL BLER (%)':
- result['lte_bler_result']['total']['UL']
+ result['throughput_measurements']['lte_bler_result']['total']['UL']
['nack_ratio'] * 100
})
writer.writerow(row_dict)
@@ -316,7 +316,8 @@ class CellularLteFr1EndcPeakThroughputTest(CellularLtePlusFr1PeakThroughputTest
cell_config['cell_type'] = 'NR5G'
nr_cell_count = nr_cell_count + 1
cell_config['cell_number'] = nr_cell_count
- nr_dl_carriers.append(cell_config['cell_number'])
+ nr_dl_carriers.append(cell_config['cell_number']),
+ cell_config['nr_cell_type'] = 'NSA',
cell_config['band'] = 'N' + dl_config_match.group('band')
cell_config['duplex_mode'] = 'FDD' if cell_config[
'band'] in cputils.DUPLEX_MODE_TO_BAND_MAPPING['NR5G'][
@@ -442,6 +443,7 @@ class CellularSingleCellThroughputTest(CellularLtePlusFr1PeakThroughputTest):
1,
'band':
test_config['nr_band'],
+ 'nr_cell_type': test_config['nr_cell_type'],
'duplex_mode':
test_config['nr_duplex_mode'],
'dl_mimo_config':
@@ -489,6 +491,8 @@ class CellularFr1SingleCellPeakThroughputTest(CellularSingleCellThroughputTest
self.tests = self.generate_test_cases(
nr_mcs_pair_list=[(27, 4), (4, 27)],
nr_channel_list=['LOW', 'MID', 'HIGH'],
+ schedule_scenario='FULL_TPUT',
+ schedule_slot_ratio=80,
transform_precoding=0,
lte_dl_mcs=4,
lte_dl_mcs_table='QAM256',
@@ -507,7 +511,7 @@ class CellularFr1SingleCellPeakThroughputTest(CellularSingleCellThroughputTest
continue
endc_combo_config = self.generate_endc_combo_config(
test_config)
- endc_combo_config['cell_list'][1]['channel'] = nr_channel
+ endc_combo_config['cell_list'][endc_combo_config['lte_cell_count']]['channel'] = nr_channel
test_name = 'test_fr1_{}_{}_dl_mcs{}_ul_mcs{}'.format(
test_config['nr_band'], nr_channel.lower(), nr_mcs_pair[0],
nr_mcs_pair[1])
diff --git a/acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py b/acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py
new file mode 100644
index 000000000..d444bca0a
--- /dev/null
+++ b/acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py
@@ -0,0 +1,211 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2022 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import csv
+import itertools
+import numpy
+import json
+import re
+import os
+from acts import context
+from acts import base_test
+from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
+from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
+from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure
+from CellularLtePlusFr1PeakThroughputTest import CellularLteSingleCellPeakThroughputTest
+
+from functools import partial
+
+
+class CellularLteRvrTest(CellularLteSingleCellPeakThroughputTest):
+ """Class to test single cell LTE sensitivity"""
+
+ def __init__(self, controllers):
+ base_test.BaseTestClass.__init__(self, controllers)
+ self.testcase_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_case())
+ self.testclass_metric_logger = (
+ BlackboxMappedMetricLogger.for_test_class())
+ self.publish_testcase_metrics = True
+ self.testclass_params = self.user_params['lte_sensitivity_test_params']
+ self.tests = self.generate_test_cases(lte_dl_mcs_table='QAM256',
+ lte_ul_mcs_table='QAM256',
+ lte_ul_mcs=4,
+ transform_precoding=0)
+
+ def process_testclass_results(self):
+ # Plot individual test id results raw data and compile metrics
+ plots = collections.OrderedDict()
+ compiled_data = collections.OrderedDict()
+ for testcase_name, testcase_data in self.testclass_results.items():
+ cell_config = testcase_data['testcase_params'][
+ 'endc_combo_config']['cell_list'][0]
+ test_id = tuple(('band', cell_config['band']))
+ if test_id not in plots:
+ # Initialize test id data when not present
+ compiled_data[test_id] = {
+ 'mcs': [],
+ 'average_throughput': [],
+ 'theoretical_throughput': [],
+ 'cell_power': [],
+ }
+ plots[test_id] = BokehFigure(
+ title='Band {} ({}) - BLER Curves'.format(
+ cell_config['band'],
+ testcase_data['testcase_params']['lte_dl_mcs_table']),
+ x_label='Cell Power (dBm)',
+ primary_y_label='BLER (Mbps)')
+ test_id_rvr = test_id + tuple('RvR')
+ plots[test_id_rvr] = BokehFigure(
+ title='Band {} ({}) - RvR'.format(
+ cell_config['band'],
+ testcase_data['testcase_params']['lte_dl_mcs_table']),
+ x_label='Cell Power (dBm)',
+ primary_y_label='PHY Rate (Mbps)')
+ # Compile test id data and metrics
+ compiled_data[test_id]['average_throughput'].append(
+ testcase_data['average_throughput_list'])
+ compiled_data[test_id]['cell_power'].append(
+ testcase_data['cell_power_list'])
+ compiled_data[test_id]['mcs'].append(
+ testcase_data['testcase_params']['lte_dl_mcs'])
+ # Add test id to plots
+ plots[test_id].add_line(
+ testcase_data['cell_power_list'],
+ testcase_data['bler_list'],
+ 'MCS {}'.format(
+ testcase_data['testcase_params']['lte_dl_mcs']),
+ width=1)
+ plots[test_id_rvr].add_line(
+ testcase_data['cell_power_list'],
+ testcase_data['average_throughput_list'],
+ 'MCS {}'.format(
+ testcase_data['testcase_params']['lte_dl_mcs']),
+ width=1,
+ style='dashed')
+
+ # Compute average RvRs and compute metrics over orientations
+ for test_id, test_data in compiled_data.items():
+ test_id_rvr = test_id + tuple('RvR')
+ cell_power_interp = sorted(set(sum(test_data['cell_power'], [])))
+ average_throughput_interp = []
+ for mcs, cell_power, throughput in zip(
+ test_data['mcs'], test_data['cell_power'],
+ test_data['average_throughput']):
+ throughput_interp = numpy.interp(cell_power_interp,
+ cell_power[::-1],
+ throughput[::-1])
+ average_throughput_interp.append(throughput_interp)
+ rvr = numpy.max(average_throughput_interp, 0)
+ plots[test_id_rvr].add_line(cell_power_interp, rvr,
+ 'Rate vs. Range')
+
+ figure_list = []
+ for plot_id, plot in plots.items():
+ plot.generate_figure()
+ figure_list.append(plot)
+ output_file_path = os.path.join(self.log_path, 'results.html')
+ BokehFigure.save_figures(figure_list, output_file_path)
+
+ def process_testcase_results(self):
+ if self.current_test_name not in self.testclass_results:
+ return
+ testcase_data = self.testclass_results[self.current_test_name]
+ results_file_path = os.path.join(
+ context.get_current_context().get_full_output_path(),
+ '{}.json'.format(self.current_test_name))
+ with open(results_file_path, 'w') as results_file:
+ json.dump(wputils.serialize_dict(testcase_data),
+ results_file,
+ indent=4)
+
+ bler_list = []
+ average_throughput_list = []
+ theoretical_throughput_list = []
+ cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][
+ 0]
+ for result in testcase_data['results']:
+ bler_list.append(
+ result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio'])
+ average_throughput_list.append(
+ result['throughput_measurements']['lte_tput_result']['total']['DL']['average_tput'])
+ theoretical_throughput_list.append(
+ result['throughput_measurements']['lte_tput_result']['total']['DL']['theoretical_tput'])
+ padding_len = len(cell_power_list) - len(average_throughput_list)
+ average_throughput_list.extend([0] * padding_len)
+ theoretical_throughput_list.extend([0] * padding_len)
+
+ testcase_data['bler_list'] = bler_list
+ testcase_data['average_throughput_list'] = average_throughput_list
+ testcase_data[
+ 'theoretical_throughput_list'] = theoretical_throughput_list
+ testcase_data['cell_power_list'] = cell_power_list
+
+ plot = BokehFigure(
+ title='Band {} - RvR'.format(testcase_data['testcase_params']['endc_combo_config']['cell_list'][0]['band']),
+ x_label='Cell Power (dBm)',
+ primary_y_label='PHY Rate (Mbps)')
+
+ plot.add_line(
+ testcase_data['cell_power_list'],
+ testcase_data['average_throughput_list'],
+ 'Average Throughput',
+ width=1)
+ plot.add_line(
+ testcase_data['cell_power_list'],
+ testcase_data['theoretical_throughput_list'],
+ 'Average Throughput',
+ width=1,
+ style='dashed')
+ plot.generate_figure()
+ output_file_path = os.path.join(self.log_path, '{}.html'.format(self.current_test_name))
+ BokehFigure.save_figure(plot, output_file_path)
+
+ def get_per_cell_power_sweeps(self, testcase_params):
+ # get current cell power start
+ cell_power_sweeps = [
+ list(
+ numpy.arange(self.testclass_params['lte_cell_power_start'],
+ self.testclass_params['lte_cell_power_stop'],
+ self.testclass_params['lte_cell_power_step']))
+ ]
+ return cell_power_sweeps
+
+ def generate_test_cases(self, lte_dl_mcs_table,
+ lte_ul_mcs_table, lte_ul_mcs, **kwargs):
+ test_cases = []
+ with open(self.testclass_params['lte_single_cell_configs'],
+ 'r') as csvfile:
+ test_configs = csv.DictReader(csvfile)
+ for test_config in test_configs:
+ if int(test_config['skip_test']):
+ continue
+ endc_combo_config = self.generate_endc_combo_config(
+ test_config)
+ test_name = 'test_lte_B{}_dl_{}'.format(
+ test_config['lte_band'], lte_dl_mcs_table)
+ test_params = collections.OrderedDict(
+ endc_combo_config=endc_combo_config,
+ lte_dl_mcs_table=lte_dl_mcs_table,
+ lte_dl_mcs='WCQI',
+ lte_ul_mcs_table=lte_ul_mcs_table,
+ lte_ul_mcs=lte_ul_mcs,
+ **kwargs)
+ setattr(self, test_name,
+ partial(self._test_throughput_bler, test_params))
+ test_cases.append(test_name)
+ return test_cases
diff --git a/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py b/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py
index 5e27bca5a..22e436bc1 100644
--- a/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py
+++ b/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py
@@ -141,12 +141,14 @@ class CellularLteSensitivityTest(CellularLteSingleCellPeakThroughputTest):
cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][
0]
for result in testcase_data['results']:
- bler_list.append(
- result['lte_bler_result']['total']['DL']['nack_ratio'])
+ bler_list.append(result['throughput_measurements']
+ ['lte_bler_result']['total']['DL']['nack_ratio'])
average_throughput_list.append(
- result['lte_tput_result']['total']['DL']['average_tput'])
+ result['throughput_measurements']['lte_tput_result']['total']
+ ['DL']['average_tput'])
theoretical_throughput_list.append(
- result['lte_tput_result']['total']['DL']['theoretical_tput'])
+ result['throughput_measurements']['lte_tput_result']['total']
+ ['DL']['theoretical_tput'])
padding_len = len(cell_power_list) - len(average_throughput_list)
average_throughput_list.extend([0] * padding_len)
theoretical_throughput_list.extend([0] * padding_len)
diff --git a/acts_tests/tests/google/gnss/GnssConcurrencyTest.py b/acts_tests/tests/google/gnss/GnssConcurrencyTest.py
index 3cbb2a7ff..f6f78f274 100644
--- a/acts_tests/tests/google/gnss/GnssConcurrencyTest.py
+++ b/acts_tests/tests/google/gnss/GnssConcurrencyTest.py
@@ -35,22 +35,22 @@ CONCURRENCY_TYPE = {
}
GPS_XML_CONFIG = {
- "CS": [
- ' IgnorePosition=\"true\"\n', ' IgnoreEph=\"true\"\n',
- ' IgnoreTime=\"true\"\n', ' AsstIgnoreLto=\"true\"\n',
- ' IgnoreJniTime=\"true\"\n'
- ],
- "WS": [
- ' IgnorePosition=\"true\"\n', ' AsstIgnoreLto=\"true\"\n',
- ' IgnoreJniTime=\"true\"\n'
- ],
- "HS": []
+ "CS": {
+ 'IgnorePosition': 'true', 'IgnoreEph': 'true',
+ 'IgnoreTime': 'true', 'AsstIgnoreLto': 'true',
+ 'IgnoreJniTime': 'true',
+ },
+ "WS": {
+ 'IgnorePosition': 'true', 'AsstIgnoreLto': 'true',
+ 'IgnoreJniTime': 'true',
+ },
+ "HS": {}
}
-ONCHIP_CONFIG = [
- ' EnableOnChipStopNotification=\"1\"\n',
- ' EnableOnChipStopNotification=\"2\"\n'
-]
+ONCHIP_CONFIG = {
+ "enable": {"EnableOnChipStopNotification": "1"},
+ "disable": {"EnableOnChipStopNotification": "2"},
+}
class GnssConcurrencyTest(BaseTestClass):
@@ -251,6 +251,7 @@ class GnssConcurrencyTest(BaseTestClass):
request_type, len(outliers[request_type]))
if failure_log:
+ failure_log += f"The test begins at {begin_time}\n"
raise signals.TestFailure(failure_log)
def run_engine_switching_test(self, freq):
@@ -325,12 +326,10 @@ class GnssConcurrencyTest(BaseTestClass):
Args:
conf_type: a string identify the config type
"""
- search_line_tag = "<gll\n"
- append_line_str = GPS_XML_CONFIG[conf_type]
- gutils.bcm_gps_xml_update_option(self.ad, "add", search_line_tag,
- append_line_str)
+ gutils.bcm_gps_xml_update_option(
+ self.ad, child_tag="gll", items_to_update=GPS_XML_CONFIG[conf_type])
- def update_gps_conf(self, search_line, update_line):
+ def update_gps_conf(self, update_attrib):
""" Update gps.xml content
Args:
@@ -338,7 +337,7 @@ class GnssConcurrencyTest(BaseTestClass):
update_line: update content
"""
gutils.bcm_gps_xml_update_option(
- self.ad, "update", search_line, update_txt=update_line)
+ self.ad, child_tag="gll", items_to_update=update_attrib)
def delete_gps_conf(self, conf_type):
""" Delete gps.xml content
@@ -346,9 +345,8 @@ class GnssConcurrencyTest(BaseTestClass):
Args:
conf_type: a string identify the config type
"""
- search_line_tag = GPS_XML_CONFIG[conf_type]
gutils.bcm_gps_xml_update_option(
- self.ad, "delete", delete_txt=search_line_tag)
+ self.ad, child_tag="gll", items_to_delete=GPS_XML_CONFIG[conf_type].keys())
def preset_mcu_test(self, mode):
""" Preseting mcu test with config and device state
@@ -359,7 +357,7 @@ class GnssConcurrencyTest(BaseTestClass):
self.add_ttff_conf(mode)
gutils.push_lhd_overlay(self.ad)
toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
- self.update_gps_conf(ONCHIP_CONFIG[1], ONCHIP_CONFIG[0])
+ self.update_gps_conf(ONCHIP_CONFIG["enable"])
gutils.clear_aiding_data_by_gtw_gpstool(self.ad)
self.ad.reboot(self.ad)
self.load_chre_nanoapp()
@@ -371,7 +369,7 @@ class GnssConcurrencyTest(BaseTestClass):
mode: a string identify the test type
"""
self.delete_gps_conf(mode)
- self.update_gps_conf(ONCHIP_CONFIG[0], ONCHIP_CONFIG[1])
+ self.update_gps_conf(ONCHIP_CONFIG["disable"])
def get_mcu_ttff(self):
""" Get mcu ttff seconds
diff --git a/acts_tests/tests/google/gnss/GnssFunctionTest.py b/acts_tests/tests/google/gnss/GnssFunctionTest.py
index b22cd4fd5..6f5f74a1c 100644
--- a/acts_tests/tests/google/gnss/GnssFunctionTest.py
+++ b/acts_tests/tests/google/gnss/GnssFunctionTest.py
@@ -14,9 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from collections import defaultdict
+import datetime
import os
import re
+import time
+import functools
import fnmatch
+from statistics import mean
from acts import asserts
from acts import signals
@@ -60,6 +65,8 @@ from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log
+_PPS_KICKIN_WAITING_TIME_IN_SECOND = 30
+
class GnssFunctionTest(BaseTestClass):
""" GNSS Function Tests"""
def setup_class(self):
@@ -71,6 +78,7 @@ class GnssFunctionTest(BaseTestClass):
"supl_hs_criteria",
"standalone_cs_criteria",
"wearable_reboot_hs_criteria",
+ "first_satellite_criteria",
"default_gnss_signal_attenuation",
"weak_gnss_signal_attenuation",
"gnss_init_error_list",
@@ -99,6 +107,7 @@ class GnssFunctionTest(BaseTestClass):
gutils.enable_supl_mode(self.ad)
gutils.enable_vendor_orbit_assistance_data(self.ad)
gutils.disable_ramdump(self.ad)
+ gutils.enable_compact_and_particle_fusion_log(self.ad)
def setup_test(self):
log_current_epoch_time(self.ad, "test_start_time")
@@ -141,6 +150,13 @@ class GnssFunctionTest(BaseTestClass):
set_wifi_and_bt_scanning(self.ad, True)
log_current_epoch_time(self.ad, "test_end_time")
+ def keep_logs(self):
+ # for debug cs is faster than ws issue
+ test_name = self.test_name
+ self.ad.take_bug_report(test_name, self.begin_time)
+ get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
+ get_tcpdump_log(self.ad, test_name, self.begin_time)
+
def on_fail(self, test_name, begin_time):
if self.collect_logs:
self.ad.take_bug_report(test_name, begin_time)
@@ -164,7 +180,7 @@ class GnssFunctionTest(BaseTestClass):
path = self.user_params.get("radio_image")
if isinstance(path, list):
path = path[0]
- if "dev/null" in path:
+ if not path or "dev/null" in path:
self.ad.log.info("Radio image path is not defined in Test flag.")
return False
for path_key in os.listdir(path):
@@ -245,8 +261,19 @@ class GnssFunctionTest(BaseTestClass):
gutils.start_qxdm_and_tcpdump_log(self.ad, self.collect_logs)
self.ad.log.info("Turn airplane mode on")
toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
+ """
+ The change for arguments here is for b/277667310
+ Randomized interval is used for breaking CS GLNS only fix sequence.
+ """
gutils.run_ttff_via_gtw_gpstool(
- self.ad, mode, criteria, self.ttff_test_cycle, self.pixel_lab_location)
+ self.ad,
+ mode,
+ criteria,
+ self.ttff_test_cycle,
+ self.pixel_lab_location,
+ raninterval=True,
+ mininterval=15,
+ maxinterval=20,)
""" Test Cases """
@@ -276,44 +303,88 @@ class GnssFunctionTest(BaseTestClass):
asserts.assert_true(all(overall_test_result),
"SUPL fail after system server restart.")
- def test_cs_ttff_after_gps_service_restart(self):
- """Verify cs ttff after modem silent reboot / GPS daemons restart.
+ def test_recovery_and_location_time_after_gnss_services_restart(self):
+ """Verify gpsd recover time after gpsd being killed.
Steps:
- 1. Trigger modem crash by adb/Restart GPS daemons by killing PID.
- 2. Wait 1 minute for modem to recover.
- 3. TTFF Cold Start for 3 iteration.
- 4. Repeat Step 1. to Step 3. for 5 times.
+ 1. Start GPS tracking
+ 2. Restart GPS daemons by killing PID.
+ 3. Waiting for GPS service to restart
+ 4. Waiting for the first fixed
+ 4. Re-run steps 1~4 for 5 times.
Expected Results:
- All SUPL TTFF Cold Start results should be within supl_cs_criteria.
+ 1. The time GPSd services take to restart must be within 3 seconds.
+ 2. Location fix time must be within supl_hs_criteria
"""
- supl_ssr_test_result_all = []
+ if gutils.check_chipset_vendor_by_qualcomm(self.ad):
+ raise signals.TestSkip("Skip the test due to Qualcomm chipset")
+ test_times = 5
gutils.start_qxdm_and_tcpdump_log(self.ad, self.collect_logs)
- for times in range(1, 6):
- begin_time = get_current_epoch_time()
- if gutils.check_chipset_vendor_by_qualcomm(self.ad):
- test_info = "Modem SSR"
- gutils.gnss_trigger_modem_ssr_by_mds(self.ad)
- else:
- test_info = "restarting GPS daemons"
- gutils.restart_gps_daemons(self.ad)
- if not verify_internet_connection(self.ad.log, self.ad, retries=3,
- expected_state=True):
- raise signals.TestFailure("Fail to connect to LTE network.")
- gutils.process_gnss_by_gtw_gpstool(self.ad, self.standalone_cs_criteria)
- gutils.start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=3)
- ttff_data = gutils.process_ttff_by_gtw_gpstool(self.ad, begin_time,
- self.pixel_lab_location)
- supl_ssr_test_result = gutils.check_ttff_data(
- self.ad, ttff_data, ttff_mode="Cold Start",
- criteria=self.supl_cs_criteria)
- self.ad.log.info("SUPL after %s test %d times -> %s" % (
- test_info, times, supl_ssr_test_result))
- supl_ssr_test_result_all.append(supl_ssr_test_result)
-
- asserts.assert_true(all(supl_ssr_test_result_all),
- "TTFF fails to reach designated criteria")
+ satellite_times = defaultdict(list)
+ location_fix_times = defaultdict(list)
+
+ kill_functions = (
+ gutils.get_gps_process_and_kill_function_by_vendor(self.ad))
+ for time in range(1, test_times+1):
+ self.ad.log.info("Performing test times %d", time)
+ first_fixed_time = process_gnss_by_gtw_gpstool(
+ self.ad,
+ criteria=self.supl_hs_criteria,
+ clear_data=False)
+
+ begin_time = int(first_fixed_time.timestamp() * 1000)
+ self.ad.log.info("Start tracking")
+ gutils.wait_n_mins_for_gnss_tracking(self.ad,
+ begin_time,
+ testtime=0.5,
+ ignore_hal_crash=False)
+
+
+ for num, (process, kill_function) in enumerate(kill_functions.items()):
+ kill_start_time = kill_function()
+ first_gpsd_update_time = (gutils.get_gpsd_update_time(
+ self.ad,
+ kill_start_time))
+ self.ad.log.info("Resume tracking ... ")
+ gutils.wait_n_mins_for_gnss_tracking(self.ad,
+ begin_time,
+ testtime=num+1,
+ ignore_hal_crash=True)
+
+ location_fix_time = (gutils.
+ get_location_fix_time_via_gpstool_log(
+ self.ad, first_gpsd_update_time))
+
+ satellite_times[process].append(first_gpsd_update_time - kill_start_time)
+ location_fix_times[process].append(location_fix_time - first_gpsd_update_time)
+ # gpsd recovery time : Time between gpsd killed to first satellite update.
+ self.ad.log.info("%s recovery time : %d ms",
+ process, (first_gpsd_update_time - kill_start_time))
+ # TTFF Hot Start : Time between first satellite update to first location fix.
+ self.ad.log.info("TTFF Hot Start %d ms",
+ (location_fix_time - first_gpsd_update_time))
+ start_gnss_by_gtw_gpstool(self.ad, state=False)
+
+ for num, process in enumerate(kill_functions):
+ prop_basename = gutils.UPLOAD_TO_SPONGE_PREFIX + f"{process}_recovery_time_"
+ self.ad.log.info(prop_basename + "AVG %d",
+ mean(satellite_times[process]))
+ self.ad.log.info(prop_basename + "MAX %d",
+ max(satellite_times[process]))
+ prop_basename = gutils.UPLOAD_TO_SPONGE_PREFIX + f"{process}_ttff_hs_"
+ self.ad.log.info(prop_basename + "AVG %d",
+ mean(location_fix_times[process]))
+ self.ad.log.info(prop_basename + "MAX %d",
+ max(location_fix_times[process]))
+ asserts.assert_true(mean(satellite_times[process])/1000 <=
+ self.first_satellite_criteria,
+ f"{process} takes more than {self.first_satellite_criteria}"
+ "seconds in average to recover")
+ asserts.assert_true(mean(location_fix_times[process])/1000 <=
+ self.supl_hs_criteria,
+ f"Location fix time is more than {self.supl_hs_criteria}"
+ "seconds in average")
def test_gnss_one_hour_tracking(self):
"""Verify GNSS tracking performance of signal strength and position
@@ -666,9 +737,9 @@ class GnssFunctionTest(BaseTestClass):
validate_gnssstatus=True)
def test_location_update_after_resuming_from_deep_suspend(self):
- """Verify the GPS location reported after resume from suspend mode
+ """Verify the GPS location reported after resume from deep doze mode
1. Enable GPS location report for 1 min to make sure the GPS is working
- 2. Force DUT into deep suspend mode for a while(3 times with 15s interval)
+ 2. Force DUT into deep doze mode for 60s
3. Enable GPS location report for 5 mins
4. Check the report frequency
5. Check the location fix rate
@@ -678,18 +749,18 @@ class GnssFunctionTest(BaseTestClass):
gnss_tracking_via_gtw_gpstool(self.ad, criteria=self.supl_cs_criteria, api_type="gnss",
testtime=gps_enable_minutes)
result = parse_gtw_gpstool_log(self.ad, self.pixel_lab_location, api_type="gnss")
- self.ad.log.debug("Location report details before suspend")
+ self.ad.log.debug("Location report details before deep doze")
self.ad.log.debug(result)
gutils.validate_location_fix_rate(self.ad, result, run_time=gps_enable_minutes,
fix_rate_criteria=0.95)
- gutils.deep_suspend_device(self.ad)
+ gutils.enter_deep_doze_mode(self.ad, lasting_time_in_seconds=60)
gps_enable_minutes = 5
gnss_tracking_via_gtw_gpstool(self.ad, criteria=self.supl_cs_criteria, api_type="gnss",
testtime=gps_enable_minutes)
result = parse_gtw_gpstool_log(self.ad, self.pixel_lab_location, api_type="gnss")
- self.ad.log.debug("Location report details after suspend")
+ self.ad.log.debug("Location report details after deep doze")
self.ad.log.debug(result)
location_report_time = list(result.keys())
@@ -812,3 +883,14 @@ class GnssFunctionTest(BaseTestClass):
gutils.stop_pixel_logger(self.ad)
+ def test_the_diff_of_gps_clock_and_elapsed_realtime_should_be_stable(self):
+ gutils.start_pixel_logger(self.ad)
+ with gutils.full_gnss_measurement(self.ad):
+ first_fixed_time = gnss_tracking_via_gtw_gpstool(
+ self.ad, criteria=self.supl_cs_criteria, api_type="gnss",
+ testtime=5, meas_flag=True)
+ gutils.stop_pixel_logger(self.ad)
+ start_time = first_fixed_time + datetime.timedelta(
+ seconds=time.timezone + _PPS_KICKIN_WAITING_TIME_IN_SECOND)
+ self.ad.log.debug("Start time is %s" % start_time)
+ gutils.validate_diff_of_gps_clock_elapsed_realtime(self.ad, start_time)
diff --git a/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py b/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py
index e94626fd6..744676441 100644
--- a/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py
+++ b/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py
@@ -35,6 +35,7 @@ class GnssVendorFeaturesTest(BaseTestClass):
gutils._init_device(self.ad)
gutils.disable_supl_mode(self.ad)
gutils.enable_vendor_orbit_assistance_data(self.ad)
+ gutils.reboot(self.ad)
def setup_test(self):
gutils.log_current_epoch_time(self.ad, "test_start_time")
diff --git a/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py b/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py
index 44fab02a3..d61930a44 100644
--- a/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py
+++ b/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py
@@ -26,10 +26,8 @@ class PowerTelAirplaneModeTest(PB.PowerCellularPresetLabBaseTest):
# Allow airplane mode to propagate
time.sleep(3)
- # Measure power
- self.collect_power_data()
- # Check if power measurement is within the required values
- self.pass_fail_check(self.avg_current)
+ # Measure power and check against threshold
+ self.collect_power_data_and_validate()
class PowerTelAirplaneMode_Test(PowerTelAirplaneModeTest):
def test_airplane_mode(self):
diff --git a/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py
index 8efbc76e6..05c647281 100644
--- a/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py
+++ b/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py
@@ -24,11 +24,8 @@ class PowerTelIdle_Preset_Test(PB.PowerCellularPresetLabBaseTest):
# Wait for RRC status change to trigger
self.cellular_simulator.wait_until_idle_state(idle_wait_time)
- # Measure power
- self.collect_power_data()
-
- # Check if power measurement is below the required value
- self.pass_fail_check(self.avg_current)
+ # Measure power and check against threshold
+ self.collect_power_data_and_validate()
def test_preset_LTE_idle(self):
self.power_tel_idle_test()
diff --git a/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py
index a18653d92..c674a6ed8 100644
--- a/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py
+++ b/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py
@@ -16,9 +16,8 @@
import time
from acts_contrib.test_utils.power.cellular.ims_api_connector_utils import ImsApiConnector
-import acts_contrib.test_utils.power.cellular.cellular_power_base_test as PWCEL
+from acts_contrib.test_utils.power.cellular.ims_api_connector_utils import ImsAppName
from acts_contrib.test_utils.tel.tel_test_utils import set_phone_silent_mode
-from acts_contrib.test_utils.tel.tel_voice_utils import hangup_call
import acts_contrib.test_utils.power.cellular.cellular_power_preset_base_test as PB
@@ -57,6 +56,8 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest):
IMS_CLIENT = 'client'
IMS_SERVER = 'server'
+ UE_DEFAULT_NUMBER = '001010123456789'
+
def setup_class(self):
""" Executed only once when initializing the class. """
super().setup_class()
@@ -80,23 +81,27 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest):
self.unpack_userparams(api_connector_port=self.IMS_API_CONNECTOR_DEFAULT_PORT,
api_token=self.IMS_CLIENT_DEFAULT_API_TOKEN,
ims_client_ip=self.IMS_CLIENT_DEFAULT_IP,
- ims_client_port=self.IMS_CLIENT_DEFAULT_PORT)
+ ims_client_port=self.IMS_CLIENT_DEFAULT_PORT,
+ ue_number=self.UE_DEFAULT_NUMBER)
self.ims_client = ImsApiConnector(
self.uxm_ip,
self.api_connector_port,
- self.IMS_CLIENT,
- self.api_token,
- self.ims_client_ip,
- self.ims_client_port,
- self.log
+ ImsAppName.CLIENT
+ )
+
+ self.ims_server = ImsApiConnector(
+ self.uxm_ip,
+ self.api_connector_port,
+ ImsAppName.SERVER
)
def setup_test(self):
# Enable NR if it is VoNR test case
self.log.info(f'test name: {self.test_name}')
+ self.ims_server.restart_server()
if 'NR' in self.test_name:
self.log.info('Enable VoNR for UE.')
- self.enable_ims_nr()
+ self.at_util.enable_ims_nr()
super().setup_test()
def power_ims_call_test(self):
@@ -114,7 +119,7 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest):
# Initiate the voice call
self.log.info('Callbox initiates call to UE.')
- self.ims_client.initiate_call('001010123456789')
+ self.ims_client.initiate_call(self.ue_number)
time.sleep(5)
@@ -122,28 +127,29 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest):
self.log.info('UE pick up call.')
self.dut.adb.shell('input keyevent KEYCODE_CALL')
+ # Set mac padding
+ if 'NR' in self.test_name:
+ self.cellular_simulator.modify_dl_ul_mac_padding()
+
# Mute the call
self.dut.droid.telecomCallMute()
# Turn of screen
self.dut.droid.goToSleepNow()
- # Measure power
- self.collect_power_data()
-
- # End the call
- hangup_call(self.log, self.dut)
-
- # Check if power measurement is within the required values
- self.pass_fail_check()
+ # Measure power and check against threshold
+ self.collect_power_data_and_validate()
def teardown_test(self):
super().teardown_test()
- #self.cellular_simulator.deregister_ue_ims()
- self.ims_client.remove_ims_app_link()
+ # End the call
+ self.log.info('Hangup.')
+ self.ims_client.hangup_call()
def teardown_class(self):
super().teardown_class()
+ self.ims_client.tear_down()
+ self.ims_server.tear_down()
self.log.info('Disable IMS.')
self.dut.adb.shell(self.ADB_CMD_DISABLE_IMS)
diff --git a/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py
index 3f023f3d0..ceb2223ff 100644
--- a/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py
+++ b/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py
@@ -23,11 +23,8 @@ class PowerTelPdcchFr2_Preset_Test(PB.PowerCellularPresetLabBaseTest):
Requirements for this test are that mac padding is off and that the
inactivity timer is not enabled. """
- # Measure power
- self.collect_power_data()
-
- # Check if power measurement is within the required values
- self.pass_fail_check(self.avg_current)
+ # Measure power and check against threshold
+ self.collect_power_data_and_validate()
def test_preset_nsa_cdrx_fr2(self):
self.power_pdcch_test()
diff --git a/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py
index 270463853..937b953c7 100644
--- a/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py
+++ b/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py
@@ -22,11 +22,8 @@ class PowerTelPdcch_Preset_Test(PB.PowerCellularPresetLabBaseTest):
Requirements for this test are that mac padding is off and that the
inactivity timer is not enabled. """
- # Measure power
- self.collect_power_data()
-
- # Check if power measurement is within the required values
- self.pass_fail_check(self.avg_current)
+ # Measure power and check against threshold
+ self.collect_power_data_and_validate()
def test_preset_sa_pdcch_fr1(self):
self.power_pdcch_test()
diff --git a/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py
index d05c80154..4242307df 100644
--- a/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py
+++ b/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py
@@ -14,9 +14,9 @@
import os
import time
+from acts import asserts
import acts_contrib.test_utils.power.cellular.cellular_power_preset_base_test as PB
-
class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
# command to enable mobile data
ADB_CMD_ENABLE_MOBILE_DATA = 'svc data enable'
@@ -26,7 +26,7 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
# command to start iperf server on UE
# (require: 1.path to iperf exe 2.hostname/hostIP)
- START_IPERF_CLIENT_UE_CMD = 'nohup > /dev/null 2>&1 sh -c "iperf3 -c {iperf_host_ip} -i1 -p5202 -w8m -t2000 > /dev/null &"'
+ START_IPERF_CLIENT_UE_CMD = 'nohup > /dev/null 2>&1 sh -c "iperf3 -c {iperf_host_ip} -i1 -p5202 -w8m -t2000 -O{second} > /dev/null &"'
# command to start iperf server on host()
START_IPERF_SV_HOST_CMD = '{exe_path}\\iperf3 -s -p5202'
@@ -34,10 +34,10 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
# command to start iperf client on host
# (require: 1.path to iperf exe 2.UE IP)
START_IPERF_CLIENT_HOST_CMD = (
- '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201')
+ '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201 -O{second}')
START_IPERF_CLIENT_HOST_CMD_FR2 = (
- '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201 -P32')
+ '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201 -O{second}')
def __init__(self, controllers):
super().__init__(controllers)
@@ -63,6 +63,13 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
# Call parent method first to setup simulation
super().setup_test()
+ # get tput configs
+ self.unpack_userparams(abnormal_bandwidth_tolerance=0.1,
+ bandwidth_tolerance=0.1,
+ n_second_to_omitted=45)
+ self.expected_downlink_bandwidth = float(self.test_configs[self.test_name]['tput']['downlink'].split()[0])
+ self.expected_uplink_bandwidth = float(self.test_configs[self.test_name]['tput']['uplink'].split()[0])
+
# setup ssh client
self.ssh_iperf_client = self.cellular_simulator.create_ssh_client()
self.ssh_iperf_server = self.cellular_simulator.create_ssh_client()
@@ -73,19 +80,81 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
"""Measure power while data is transferring."""
# Start data traffic
self.start_uplink_process()
- time.sleep(5)
self.start_downlink_process()
- # Measure power
- self.collect_power_data()
+ # Measure power and check against threshold
+ self.collect_power_data_and_validate()
+ def _end_iperfs(self):
+ err_message = []
# Write iperf log
self.ssh_iperf_server.close()
uplink_log_name = self.test_name + '_uplink.txt'
- self._write_iperf_log(uplink_log_name, self.ssh_iperf_server)
+ out, err = self.iperf_out_err[self.ssh_iperf_server]
+ output_content = ''.join(out.readlines())
+ err_content = ''.join(err.readlines())
+ self._write_iperf_log(uplink_log_name, output_content + err_content)
+ if err_content.strip():
+ err_message.append(f'Uplink process fail due to error: {err_content}\n')
+ else:
+ if not self._iperf_log_check(output_content, self.expected_uplink_bandwidth):
+ err_message.append('Bandwidth of uplink process is unstable.')
+
self.ssh_iperf_client.close()
downlink_log_name = self.test_name + '_downlink.txt'
- self._write_iperf_log(downlink_log_name, self.ssh_iperf_client)
+ out, err = self.iperf_out_err[self.ssh_iperf_client]
+ output_content = ''.join(out.readlines())
+ err_content = ''.join(err.readlines())
+ self._write_iperf_log(downlink_log_name, output_content + err_content)
+ if err_content.strip():
+ err_message.append(f'Downlink process fail due to error: {err_content}\n')
+ else:
+ if not self._iperf_log_check(output_content, self.expected_downlink_bandwidth):
+ err_message.append('Bandwidth of downlink process is unstable.')
+
+ if err_message:
+ raise RuntimeError('\n'.join(err_message))
+
+ def teardown_test(self):
+ try:
+ self._end_iperfs()
+ except RuntimeError as re:
+ raise re
+ finally:
+ super().teardown_test()
+
+ def _iperf_log_check(self, file, expected_bandwidth):
+ """Check iperf log and abnormal bandwidth instances.
+
+ Args:
+ file: file object of iperf log to be checked.
+ expected_bandwidth: integer value for expected bandwidth.
+ Returns:
+ True if log is normal, False otherwise.
+ """
+ # example of record line
+ #[ 4] 0.00-1.00 sec 20.2 MBytes 169 Mbits/sec
+ total_abnormal_entries = 0
+ total_record_entries = 0
+ bandwidth_val_idx = 6
+ record_entry_total_cols = 8
+ lines = file.split('\n')
+ acceptable_difference = self.bandwidth_tolerance * expected_bandwidth
+ self.log.debug('Expected bandwidth: %f', expected_bandwidth)
+ self.log.debug('Acceptance difference: %f', acceptable_difference)
+ for line in lines:
+ cols = line.split()
+ self.log.debug(cols)
+ if len(cols) == record_entry_total_cols:
+ total_record_entries += 1
+ bandwidth = float(cols[bandwidth_val_idx])
+ self.log.debug('bandwidth: %f', bandwidth)
+ if abs(bandwidth - expected_bandwidth) > acceptable_difference:
+ total_abnormal_entries += 1
+ if not total_record_entries:
+ raise RuntimeError('No tput data record found.')
+ self.log.debug('Total abnormal entries: %d - Total record: %d', total_abnormal_entries, total_record_entries)
+ return (total_abnormal_entries/total_record_entries) <= self.abnormal_bandwidth_tolerance
def _exec_ssh_cmd(self, ssh_client, cmd):
"""Execute command on given ssh client.
@@ -112,11 +181,13 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
if 'fr2' in self.test_name:
cmd = self.START_IPERF_CLIENT_HOST_CMD_FR2.format(
exe_path=self.iperf_exe_path,
- ue_ip=self.ue_ip)
+ ue_ip=self.ue_ip,
+ second=self.n_second_to_omitted)
else:
cmd = self.START_IPERF_CLIENT_HOST_CMD.format(
exe_path=self.iperf_exe_path,
- ue_ip=self.ue_ip)
+ ue_ip=self.ue_ip,
+ second=self.n_second_to_omitted)
if not cmd:
raise RuntimeError('Cannot format command to start iperf client.')
@@ -134,29 +205,25 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest):
time.sleep(5)
# start UE iperf
adb_cmd = self.START_IPERF_CLIENT_UE_CMD.format(
- iperf_host_ip=self.iperf_host_ip)
+ iperf_host_ip=self.iperf_host_ip,
+ second=self.n_second_to_omitted)
self.cellular_dut.ad.adb.shell(adb_cmd)
self.log.info('cmd sent to UE: ' + adb_cmd)
self.log.info('UE iperf client started')
time.sleep(5)
- def _write_iperf_log(self, file_name, ssh):
+ def _write_iperf_log(self, file_name, content):
""" Writing ssh stdout and stdin to log file.
Args:
- file_name: log file name to write log to.
- ssh: paramiko client object.
+ file_name: Log file name to write log to.
+ content: Content to write to file.
"""
iperf_log_dir = os.path.join(self.root_output_path, 'iperf')
os.makedirs(iperf_log_dir, exist_ok=True)
iperf_log_file_path = os.path.join(iperf_log_dir, file_name)
with open(iperf_log_file_path, 'w') as f:
- out, err = self.iperf_out_err[ssh]
- out_content = ''.join(out.readlines())
- err_content = ''.join(err.readlines())
- f.write(out_content)
- f.write('\nErrors:\n')
- f.write(err_content)
+ f.write(content)
def turn_on_mobile_data(self):
self.dut.adb.shell(self.ADB_CMD_ENABLE_MOBILE_DATA)
diff --git a/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py
index daf9b1a9e..b3cc779ab 100644
--- a/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py
+++ b/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py
@@ -50,7 +50,9 @@ class WifiEnterpriseRoamingTest(WifiBaseTest):
"wifi6_models",
"radius_conf_2g",
"radius_conf_5g")
- self.unpack_userparams(req_params)
+ opt_param = ["domain_suffix_match"]
+ self.unpack_userparams(req_params,
+ opt_param)
if "AccessPoint" in self.user_params:
self.legacy_configure_ap_and_start(
mirror_ap=True,
@@ -81,15 +83,17 @@ class WifiEnterpriseRoamingTest(WifiBaseTest):
Ent.IDENTITY: self.eap_identity,
Ent.PASSWORD: self.eap_password,
Ent.PHASE2: int(EapPhase2.MSCHAPV2),
+ Ent.DOM_SUFFIX_MATCH: self.domain_suffix_match,
WifiEnums.SSID_KEY: self.ent_roaming_ssid
}
self.config_tls = {
Ent.EAP: int(EAP.TLS),
Ent.CA_CERT: self.ca_cert,
- WifiEnums.SSID_KEY: self.ent_roaming_ssid,
Ent.CLIENT_CERT: self.client_cert,
Ent.PRIVATE_KEY_ID: self.client_key,
Ent.IDENTITY: self.eap_identity,
+ Ent.DOM_SUFFIX_MATCH: self.domain_suffix_match,
+ WifiEnums.SSID_KEY: self.ent_roaming_ssid
}
self.config_ttls = {
Ent.EAP: int(EAP.TTLS),
@@ -97,6 +101,7 @@ class WifiEnterpriseRoamingTest(WifiBaseTest):
Ent.IDENTITY: self.eap_identity,
Ent.PASSWORD: self.eap_password,
Ent.PHASE2: int(EapPhase2.MSCHAPV2),
+ Ent.DOM_SUFFIX_MATCH: self.domain_suffix_match,
WifiEnums.SSID_KEY: self.ent_roaming_ssid
}
self.config_sim = {
diff --git a/acts_tests/tests/google/wifi/WifiPingTest.py b/acts_tests/tests/google/wifi/WifiPingTest.py
index ca1ccd7db..a1ea5d907 100644
--- a/acts_tests/tests/google/wifi/WifiPingTest.py
+++ b/acts_tests/tests/google/wifi/WifiPingTest.py
@@ -868,30 +868,13 @@ class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
self.tests = self.generate_test_cases(
ap_power='standard',
channels=[6, 36, 149, '6g37', '6g117', '6g213'],
- modes=['bw20'],
+ modes=['bw20', 'bw80', 'bw160'],
chain_masks=['2x2'],
chamber_mode='orientation',
positions=list(range(0, 360, 10)),
reference_params=['channel', 'mode', 'chain_mask'])
-class WifiOtaPing_45Degree_Test(WifiOtaPingTest):
-
- def __init__(self, controllers):
- WifiOtaPingTest.__init__(self, controllers)
- self.tests = self.generate_test_cases(
- ap_power='standard',
- channels=[
- 1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117',
- '6g213'
- ],
- modes=['bw20'],
- chain_masks=['2x2'],
- chamber_mode='orientation',
- positions=list(range(0, 360, 45)),
- reference_params=['channel', 'mode', 'chain_mask'])
-
-
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
def __init__(self, controllers):
@@ -920,20 +903,6 @@ class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
reference_params=['channel', 'mode', 'chain_mask'])
-class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest):
-
- def __init__(self, controllers):
- WifiOtaPingTest.__init__(self, controllers)
- self.tests = self.generate_test_cases(
- ap_power='low_power',
- channels=[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161],
- modes=['bw20'],
- chain_masks=['2x2'],
- chamber_mode='orientation',
- positions=list(range(0, 360, 45)),
- reference_params=['channel', 'mode', 'chain_mask'])
-
-
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
def __init__(self, controllers):
diff --git a/acts_tests/tests/google/wifi/WifiRssiTest.py b/acts_tests/tests/google/wifi/WifiRssiTest.py
index 452591f85..eb00327b6 100644
--- a/acts_tests/tests/google/wifi/WifiRssiTest.py
+++ b/acts_tests/tests/google/wifi/WifiRssiTest.py
@@ -850,25 +850,6 @@ class WifiRssiTest(base_test.BaseTestClass):
return test_cases
-class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest):
-
- def __init__(self, controllers):
- super().__init__(controllers)
- self.tests = self.generate_test_cases(
- ['test_rssi_stability', 'test_rssi_vs_atten'], [1, 2, 6, 10, 11],
- ['bw20'], ['ActiveTraffic'])
-
-
-class WifiRssi_5GHz_ActiveTraffic_Test(WifiRssiTest):
-
- def __init__(self, controllers):
- super().__init__(controllers)
- self.tests = self.generate_test_cases(
- ['test_rssi_stability', 'test_rssi_vs_atten'],
- [36, 40, 44, 48, 149, 153, 157, 161], ['bw20', 'bw40', 'bw80'],
- ['ActiveTraffic'])
-
-
class WifiRssi_AllChannels_ActiveTraffic_Test(WifiRssiTest):
def __init__(self, controllers):
@@ -885,8 +866,8 @@ class WifiRssi_SampleChannels_NoTraffic_Test(WifiRssiTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases(
- ['test_rssi_stability', 'test_rssi_vs_atten'], [6, 36, 149],
- ['bw20', 'bw40', 'bw80'], ['NoTraffic'])
+ ['test_rssi_stability', 'test_rssi_vs_atten'], [6, 36, 149, '6g37'],
+ ['bw20', 'bw40', 'bw80', 'bw160'], ['NoTraffic'])
class WifiRssiTrackingTest(WifiRssiTest):
@@ -894,8 +875,8 @@ class WifiRssiTrackingTest(WifiRssiTest):
def __init__(self, controllers):
super().__init__(controllers)
self.tests = self.generate_test_cases(['test_rssi_tracking'],
- [6, 36, 149],
- ['bw20', 'bw40', 'bw80'],
+ [6, 36, 149, '6g37'],
+ ['bw20', 'bw40', 'bw80', 'bw160'],
['ActiveTraffic', 'NoTraffic'])
@@ -1109,24 +1090,13 @@ class WifiOtaRssiTest(WifiRssiTest):
test_cases.append(test_name)
return test_cases
-
-class WifiOtaRssi_Accuracy_Test(WifiOtaRssiTest):
-
- def __init__(self, controllers):
- super().__init__(controllers)
- self.tests = self.generate_test_cases(['test_rssi_vs_atten'],
- [6, 36, 149, '6g37'], ['bw20'],
- ['ActiveTraffic'],
- ['orientation'],
- list(range(0, 360, 45)))
-
-
class WifiOtaRssi_StirrerVariation_Test(WifiOtaRssiTest):
def __init__(self, controllers):
WifiRssiTest.__init__(self, controllers)
self.tests = self.generate_test_cases(['test_rssi_variation'],
- [6, 36, 149, '6g37'], ['bw20'],
+ [6, 36, 149, '6g37'],
+ ['bw20', 'bw80', 'bw160'],
['ActiveTraffic'],
['StirrersOn'], [0])
@@ -1136,7 +1106,8 @@ class WifiOtaRssi_TenDegree_Test(WifiOtaRssiTest):
def __init__(self, controllers):
WifiRssiTest.__init__(self, controllers)
self.tests = self.generate_test_cases(['test_rssi_over_orientation'],
- [6, 36, 149, '6g37'], ['bw20'],
+ [6, 36, 149, '6g37'],
+ ['bw20', 'bw80', 'bw160'],
['ActiveTraffic'],
['orientation'],
list(range(0, 360, 10)))
diff --git a/acts_tests/tests/google/wifi/WifiRvrTest.py b/acts_tests/tests/google/wifi/WifiRvrTest.py
index 4a6455218..cb188fd50 100644
--- a/acts_tests/tests/google/wifi/WifiRvrTest.py
+++ b/acts_tests/tests/google/wifi/WifiRvrTest.py
@@ -1097,6 +1097,19 @@ class WifiOtaRvr_SampleChannel_Test(WifiOtaRvrTest):
self.generate_test_cases(['6g37'], ['bw160'],
list(range(0, 360, 45)), ['TCP'], ['DL']))
+class WifiOtaRvr_SampleChannel_UDP_Test(WifiOtaRvrTest):
+
+ def __init__(self, controllers):
+ WifiOtaRvrTest.__init__(self, controllers)
+ self.tests = self.generate_test_cases([6], ['bw20'],
+ list(range(0, 360, 45)), ['UDP'],
+ ['DL'])
+ self.tests.extend(
+ self.generate_test_cases([36, 149], ['bw80', 'bw160'],
+ list(range(0, 360, 45)), ['UDP'], ['DL']))
+ self.tests.extend(
+ self.generate_test_cases(['6g37'], ['bw160'],
+ list(range(0, 360, 45)), ['UDP'], ['DL']))
class WifiOtaRvr_SingleOrientation_Test(WifiOtaRvrTest):
diff --git a/acts_tests/tests/google/wifi/WifiSensitivityTest.py b/acts_tests/tests/google/wifi/WifiSensitivityTest.py
index 535572d6c..58f938d85 100644
--- a/acts_tests/tests/google/wifi/WifiSensitivityTest.py
+++ b/acts_tests/tests/google/wifi/WifiSensitivityTest.py
@@ -248,7 +248,7 @@ class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
for plot_id, plot in plots.items():
plot.generate_figure()
figure_list.append(plot)
- output_file_path = os.path.join(self.log_path, 'results.html')
+ output_file_path = os.path.join(self.log_path, 'PER_curves.html')
BokehFigure.save_figures(figure_list, output_file_path)
def process_testclass_results(self):
diff --git a/acts_tests/tests/google/wifi/WifiSoftApTest.py b/acts_tests/tests/google/wifi/WifiSoftApTest.py
index 3aa815c45..12cf328da 100644
--- a/acts_tests/tests/google/wifi/WifiSoftApTest.py
+++ b/acts_tests/tests/google/wifi/WifiSoftApTest.py
@@ -190,8 +190,6 @@ class WifiSoftApTest(WifiBaseTest):
initial_cell_state = tel_utils.is_sim_ready(self.log, self.dut)
self.dut.log.info("current state: %s", initial_wifi_state)
self.dut.log.info("is sim ready? %s", initial_cell_state)
- if initial_cell_state:
- self.check_cell_data_and_enable()
config = self.create_softap_config()
wutils.start_wifi_tethering(self.dut,
config[wutils.WifiEnums.SSID_KEY],
diff --git a/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py b/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py
index 3b55b6c03..671e78c6f 100644
--- a/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py
+++ b/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py
@@ -109,11 +109,12 @@ class WifiWpa3EnterpriseTest(WifiBaseTest):
logcat_msg = "E WifiKeyStore: Invalid certificate type for Suite-B"
try:
wutils.connect_to_wifi_network(self.dut, config)
- asserts.fail("WPA3 Ent worked with insecure RSA key. Expected to fail.")
except:
logcat_search = self.dut.search_logcat(logcat_msg)
self.log.info("Logcat search results: %s" % logcat_search)
asserts.assert_true(logcat_search, "No valid error msg in logcat")
+ else:
+ asserts.fail("WPA3 Ent worked with insecure RSA key. Expected to fail.")
@test_tracker_info(uuid="897957f3-de25-4f9e-b6fc-9d7798ea1e6f")
def test_connect_to_wpa3_enterprise_expired_rsa_cert(self):
@@ -130,11 +131,12 @@ class WifiWpa3EnterpriseTest(WifiBaseTest):
logcat_msg = "E WifiKeyStore: Invalid certificate type for Suite-B"
try:
wutils.connect_to_wifi_network(self.dut, config)
- asserts.fail("WPA3 Ent worked with expired cert. Expected to fail.")
except:
logcat_search = self.dut.search_logcat(logcat_msg)
self.log.info("Logcat search results: %s" % logcat_search)
asserts.assert_true(logcat_search, "No valid error msg in logcat")
+ else:
+ asserts.fail("WPA3 Ent worked with expired cert. Expected to fail.")
@test_tracker_info(uuid="f7ab30e2-f2b5-488a-8667-e45920fc24d1")
def test_connect_to_wpa3_enterprise_corrupted_rsa_cert(self):
@@ -150,9 +152,10 @@ class WifiWpa3EnterpriseTest(WifiBaseTest):
}
try:
wutils.connect_to_wifi_network(self.dut, config)
- asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.")
except:
asserts.explicit_pass("Connection failed as expected.")
+ else:
+ asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.")
@test_tracker_info(uuid="f934f388-dc0b-4c78-a493-026b798c15ca")
def test_connect_to_wpa3_enterprise_unsigned_rsa_cert(self):
@@ -168,9 +171,10 @@ class WifiWpa3EnterpriseTest(WifiBaseTest):
}
try:
wutils.connect_to_wifi_network(self.dut, config)
- asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.")
except:
asserts.explicit_pass("Connection failed as expected.")
+ else:
+ asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.")
@test_tracker_info(uuid="7082dc90-5eb8-4055-8b48-b555a98a837a")
def test_connect_to_wpa3_enterprise_wrong_domain_rsa_cert(self):
@@ -186,9 +190,10 @@ class WifiWpa3EnterpriseTest(WifiBaseTest):
}
try:
wutils.connect_to_wifi_network(self.dut, config)
- asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.")
except:
asserts.explicit_pass("Connection failed as expected.")
+ else:
+ asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.")
@test_tracker_info(uuid="9ad5fd82-f115-42c3-b8e8-520144485ea1")
def test_network_selection_status_for_wpa3_ent_wrong_domain_rsa_cert(self):
@@ -204,10 +209,11 @@ class WifiWpa3EnterpriseTest(WifiBaseTest):
}
try:
wutils.connect_to_wifi_network(self.dut, config)
- asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.")
except:
asserts.assert_true(
self.dut.droid.wifiIsNetworkTemporaryDisabledForNetwork(config),
"WiFi network is not temporary disabled.")
asserts.explicit_pass(
"Connection failed with correct network selection status.")
+ else:
+ asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.")
diff --git a/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py b/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py
index 69dc42c84..7c061f24c 100644
--- a/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py
+++ b/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py
@@ -293,11 +293,6 @@ class DiscoveryTest(AwareBaseTest):
autils.wait_for_event(s_dut,
aconsts.SESSION_CB_ON_SESSION_TERMINATED)
-
- # verify that there were no other events
- autils.verify_no_more_events(p_dut, timeout=0)
- autils.verify_no_more_events(s_dut, timeout=0)
-
# verify that forbidden callbacks aren't called
autils.validate_forbidden_callbacks(p_dut, {aconsts.CB_EV_MATCH: 0})