diff options
author | Xin Li <delphij@google.com> | 2024-01-23 22:22:13 -0800 |
---|---|---|
committer | Xin Li <delphij@google.com> | 2024-01-23 22:22:13 -0800 |
commit | 44138172a9ab8021ac7a5b683a06c3b85a42d547 (patch) | |
tree | 67fad8b817c1885f6499f680018859619740e81d | |
parent | f294e9ffc0c7dabd663593a1bb8b8f3d2ad0fbbb (diff) | |
parent | c7df9f42dccf5304ef000cca649c82c5fe84ce66 (diff) | |
download | connectivity-44138172a9ab8021ac7a5b683a06c3b85a42d547.tar.gz |
Merge Android 24Q1 Release (ab/11220357)temp_319669529
Bug: 319669529
Merged-In: I327b552cb0ff2a5157f298ec36e2ffe068307a70
Change-Id: Ib54a280b9f4bae3b8f946a1a2d46b896e2628c53
45 files changed, 2939 insertions, 941 deletions
diff --git a/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py b/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py index d536b0921..35e715e92 100644 --- a/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py +++ b/acts/framework/acts/controllers/cellular_lib/PresetSimulation.py @@ -13,8 +13,6 @@ # limitations under the License. from acts.controllers.cellular_lib.BaseSimulation import BaseSimulation -from acts.controllers.cellular_lib import BaseCellularDut - class PresetSimulation(BaseSimulation): """5G preset simulation. @@ -27,12 +25,6 @@ class PresetSimulation(BaseSimulation): KEY_CELL_INFO = "cell_info" KEY_SCPI_FILE_NAME = "scpi_file" - NETWORK_BIT_MASK = { - 'nr_lte': '11000001000000000000' - } - ADB_CMD_LOCK_NETWORK = 'cmd phone set-allowed-network-types-for-users -s 0 {network_bit_mask}' - NR_LTE_BIT_MASK_KEY = 'nr_lte' - def __init__(self, simulator, log, @@ -61,9 +53,6 @@ class PresetSimulation(BaseSimulation): self.dut.set_apn('Keysight', 'Keysight') self.num_carriers = None - # Enable roaming on the phone - self.dut.toggle_data_roaming(True) - def setup_simulator(self): """Do initial configuration in the simulator. """ self.log.info('This simulation does not require initial setup.') @@ -110,11 +99,9 @@ class PresetSimulation(BaseSimulation): RuntimeError: attaching fail due to unable to connect dut and cells. """ - try: - self.simulator.wait_until_attached(self.dut, self.attach_timeout, - self.attach_retries) - except Exception as exc: - raise RuntimeError('Could not attach to base station.') from exc + self.simulator.wait_until_attached(self.dut, + self.attach_timeout, + self.attach_retries) def calibrated_downlink_rx_power(self, bts_config, rsrp): """Convert RSRP to total signal power from the basestation. diff --git a/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py b/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py index 5ef7eac89..ddca8d581 100644 --- a/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py +++ b/acts/framework/acts/controllers/uxm_lib/uxm_cellular_simulator.py @@ -91,8 +91,8 @@ class UXMCellularSimulator(AbstractCellularSimulator): """A cellular simulator for UXM callbox.""" # Keys to obtain data from cell_info dictionary. - KEY_CELL_NUMBER = "cell_number" - KEY_CELL_TYPE = "cell_type" + _KEY_CELL_NUMBER = "cell_number" + _KEY_CELL_TYPE = "cell_type" # UXM socket port UXM_SOCKET_PORT = 5125 @@ -102,6 +102,8 @@ class UXMCellularSimulator(AbstractCellularSimulator): SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n' SCPI_CHECK_CONNECTION_CMD = '*IDN?\n' SCPI_DEREGISTER_UE_IMS = 'SYSTem:IMS:SERVer:UE:DERegister' + _SCPI_CHANGE_DL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:DL:TDOMain:APOLicy ONMac' + _SCPI_CHANGE_UL_TDOMAIN = 'BSE:CONFig:NR5G:CELL1:SCHeduling:BWP0:FC0:SC0:UL:NUL:TDOMain:APOLicy ONSRbsr' # require: path to SCPI file SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n' # require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1) @@ -399,7 +401,7 @@ class UXMCellularSimulator(AbstractCellularSimulator): It is required to create a dedicated bearer setup with EPS bearer ID 10. """ - cell_number = self.cells[0][self.KEY_CELL_NUMBER] + cell_number = self.cells[0][self._KEY_CELL_NUMBER] self._socket_send_SCPI_command( self.SCPI_CREATE_DEDICATED_BEARER.format(cell_number)) @@ -433,6 +435,20 @@ class UXMCellularSimulator(AbstractCellularSimulator): f' cell type: {cell_type}\n' + f' cell number: {cell_number}\n') + def get_all_cell_status(self): + """Gets status of all cells. + + Returns: + List of tuples which has values (cell_type, cell_number, cell_status) + """ + res = [] + for cell in self.cells: + cell_type = cell[self._KEY_CELL_TYPE] + cell_number = cell[self._KEY_CELL_NUMBER] + cell_status = self.get_cell_status(cell_type, cell_number) + res.append((cell_type, cell_number, cell_status)) + return res + def get_cell_status(self, cell_type, cell_number): """Get status of cell. @@ -546,23 +562,6 @@ class UXMCellularSimulator(AbstractCellularSimulator): """ self.import_configuration(path) - def dut_rockbottom(self, dut): - """Set the dut to rockbottom state. - - Args: - dut: a CellularAndroid controller. - """ - # The rockbottom script might include a device reboot, so it is - # necessary to stop SL4A during its execution. - dut.ad.stop_services() - self.log.info('Executing rockbottom script for ' + dut.ad.model) - os.chmod(self.rockbottom_script, 0o777) - os.system('{} {}'.format(self.rockbottom_script, dut.ad.serial)) - # Make sure the DUT is in root mode after coming back - dut.ad.root_adb() - # Restart SL4A - dut.ad.start_services() - def set_sim_type(self, is_3gpp_sim): sim_type = 'KEYSight' if is_3gpp_sim: @@ -610,7 +609,7 @@ class UXMCellularSimulator(AbstractCellularSimulator): interval = 10 # waits for device to camp - for index in range(1, attach_retries): + for index in range(1, attach_retries+1): count = 0 # airplane mode off dut.toggle_airplane_mode(False) @@ -636,12 +635,7 @@ class UXMCellularSimulator(AbstractCellularSimulator): # reboot device if (index % 2) == 0: dut.ad.reboot() - if self.rockbottom_script: - self.dut_rockbottom(dut) - else: - self.log.warning( - f'Rockbottom script was not executed after reboot.' - ) + # toggle APM and cell on/off elif (index % 1) == 0: # Toggle APM on @@ -676,11 +670,11 @@ class UXMCellularSimulator(AbstractCellularSimulator): to connect to 1 basestation. """ # get cell info - first_cell_type = self.cells[0][self.KEY_CELL_TYPE] - first_cell_number = self.cells[0][self.KEY_CELL_NUMBER] + first_cell_type = self.cells[0][self._KEY_CELL_TYPE] + first_cell_number = self.cells[0][self._KEY_CELL_NUMBER] if len(self.cells) == 2: - second_cell_type = self.cells[1][self.KEY_CELL_TYPE] - second_cell_number = self.cells[1][self.KEY_CELL_NUMBER] + second_cell_type = self.cells[1][self._KEY_CELL_TYPE] + second_cell_number = self.cells[1][self._KEY_CELL_NUMBER] # connect to 1st cell self.wait_until_attached_one_cell(first_cell_type, @@ -694,7 +688,7 @@ class UXMCellularSimulator(AbstractCellularSimulator): second_cell_number, ) - for _ in range(1, attach_retries): + for _ in range(1, attach_retries+1): self.log.info('Try to aggregate to NR.') self._socket_send_SCPI_command( 'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None') @@ -722,6 +716,12 @@ class UXMCellularSimulator(AbstractCellularSimulator): raise RuntimeError(f'Fail to aggregate to NR from LTE.') + def modify_dl_ul_mac_padding(self): + """Disables dl/ul mac padding packets.""" + self.log.info('modifying dl ul mac padding') + self._socket_send_SCPI_command(self._SCPI_CHANGE_DL_TDOMAIN) + self._socket_send_SCPI_command(self._SCPI_CHANGE_UL_TDOMAIN) + def set_lte_rrc_state_change_timer(self, enabled, time=10): """Configures the LTE RRC state change timer. @@ -950,8 +950,8 @@ class UXMCellularSimulator(AbstractCellularSimulator): CellularSimulatorError exception. Default is 120 seconds. """ # turn on RRC release - cell_type = self.cells[0][self.KEY_CELL_TYPE] - cell_number = self.cells[0][self.KEY_CELL_NUMBER] + cell_type = self.cells[0][self._KEY_CELL_TYPE] + cell_number = self.cells[0][self._KEY_CELL_NUMBER] # choose cmd base on cell type cmd = None @@ -981,8 +981,8 @@ class UXMCellularSimulator(AbstractCellularSimulator): def detach(self): """ Turns off all the base stations so the DUT loose connection.""" for cell in self.cells: - cell_type = cell[self.KEY_CELL_TYPE] - cell_number = cell[self.KEY_CELL_NUMBER] + cell_type = cell[self._KEY_CELL_TYPE] + cell_number = cell[self._KEY_CELL_NUMBER] self.turn_cell_off(cell_type, cell_number) time.sleep(5) diff --git a/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py b/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py index 8e4d10ab6..f4ba74f41 100644 --- a/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py +++ b/acts_tests/acts_contrib/test_utils/cellular/keysight_5g_testapp.py @@ -15,6 +15,7 @@ # limitations under the License. import collections +import itertools import pyvisa import time from acts import logger @@ -38,6 +39,12 @@ class Keysight5GTestApp(object): def __init__(self, config): self.config = config + self.test_app_settings = { + 'lte_cell_count': 0, + 'nr_cell_count': 0, + 'lte_cell_configs': [], + 'nr_cell_configs': [] + } self.log = logger.create_tagged_trace_logger("{}{}".format( self.config['brand'], self.config['model'])) self.resource_manager = pyvisa.ResourceManager(self.VISA_LOCATION) @@ -57,6 +64,9 @@ class Keysight5GTestApp(object): else: self.log.info("Test App ID: {}".format(inst_id)) + self.test_app_settings['lte_cell_count'] = self.get_cell_count('LTE') + self.test_app_settings['nr_cell_count'] = self.get_cell_count('NR5G') + def destroy(self): self.test_app.close() @@ -194,6 +204,18 @@ class Keysight5GTestApp(object): self.select_cell(cell_type, cell) self.send_cmd('DISPlay:{} {},{}'.format(cell_type, tab, subtab)) + def get_cell_count(self, cell_type): + """Function to get cell count + + Args: + cell_type: LTE or NR5G cell + Returns: + cell_count: number of cells of cell_type supported. + """ + cell_count = int( + self.send_cmd('BSE:CONFig:{}:CELL:COUNt?'.format(cell_type), 1)) + return cell_count + def get_cell_state(self, cell_type, cell): """Function to get cell on/off state. @@ -249,17 +271,23 @@ class Keysight5GTestApp(object): self.send_cmd('BSE:CONFig:{}:{}:ACTive:STATe {}'.format( cell_type, Keysight5GTestApp._format_cells(cell), state)) - def set_cell_type(self, cell_type, cell, sa_or_nsa): + def turn_all_cells_off(self): + for cell in range(self.test_app_settings['lte_cell_count']): + self.set_cell_state('LTE', cell + 1, 0) + for cell in range(self.test_app_settings['nr_cell_count']): + self.set_cell_state('NR5G', cell + 1, 0) + + def set_nr_cell_type(self, cell_type, cell, nr_cell_type): """Function to set cell duplex mode Args: cell_type: LTE or NR5G cell cell: cell/carrier number - sa_or_nsa: SA or NSA + nr_cell_type: SA or NSA """ self.assert_cell_off(cell_type, cell) - self.send_cmd('BSE: CONFig:NR5G:{}:TYPE {}'.format( - Keysight5GTestApp._format_cells(cell), sa_or_nsa)) + self.send_cmd('BSE:CONFig:{}:{}:TYPE {}'.format( + cell_type, Keysight5GTestApp._format_cells(cell), nr_cell_type)) def set_cell_duplex_mode(self, cell_type, cell, duplex_mode): """Function to set cell duplex mode @@ -297,7 +325,8 @@ class Keysight5GTestApp(object): if cell_type == 'NR5G' and isinstance( channel, str) and channel.lower() in ['low', 'mid', 'high']: self.send_cmd('BSE:CONFig:{}:{}:TESTChanLoc {}'.format( - cell_type, Keysight5GTestApp._format_cells(cell), channel.upper())) + cell_type, Keysight5GTestApp._format_cells(cell), + channel.upper())) elif arfcn == 1: self.send_cmd('BSE:CONFig:{}:{}:DL:CHANnel {}'.format( cell_type, Keysight5GTestApp._format_cells(cell), channel)) @@ -308,7 +337,8 @@ class Keysight5GTestApp(object): def toggle_contiguous_nr_channels(self, force_contiguous): self.assert_cell_off('NR5G', 1) - self.log.warning('Forcing contiguous NR channels overrides channel config.') + self.log.warning( + 'Forcing contiguous NR channels overrides channel config.') self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0') if force_contiguous: self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1') @@ -415,6 +445,40 @@ class Keysight5GTestApp(object): cell_type, Keysight5GTestApp._format_cells(cell), power)) self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type)) + def set_cell_ul_power_control(self, cell_type, cell, mode, target_power=0): + """Function configure UL power control + + Args: + cell_type: LTE or NR5G cell + cell: cell/carrier number + mode: UL power control mode. One of [TARget | MANual | UP | DOWN | DISabled] + target_power: target power for PUSCH + """ + self.send_cmd('BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:MODE {}'.format( + cell_type, Keysight5GTestApp._format_cells(cell), mode)) + if cell_type == 'NR5G' and mode == 'TARget': + self.send_cmd( + 'BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:TARGet:POWer {}'.format( + cell_type, Keysight5GTestApp._format_cells(cell), + target_power)) + elif cell_type == 'LTE' and mode == 'TARget': + self.send_cmd( + 'BSE:CONFig:{}:{}:UL:CLPControl:TARGet:POWer:PUSCH {}'.format( + cell_type, Keysight5GTestApp._format_cells(cell), + target_power)) + + def set_cell_input_power(self, cell_type, cell, power): + """Function to set cell input power + + Args: + cell_type: LTE or NR5G cell + cell: cell/carrier number + power: expected input power + """ + self.send_cmd('BSE:CONFIG:{}:{}:MANual:POWer {}'.format( + cell_type, Keysight5GTestApp._format_cells(cell), power)) + self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type)) + def set_cell_duplex_mode(self, cell_type, cell, duplex_mode): """Function to set cell power @@ -459,6 +523,18 @@ class Keysight5GTestApp(object): Keysight5GTestApp._format_cells(cell), scenario)) self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL') + def set_nr_schedule_slot_ratio(self, cell, slot_ratio): + """Function to set NR schedule to one of predefince quick configs. + + Args: + cell: cell number to address. schedule will apply to all cells + slot_ratio: downlink slot ratio + """ + self.assert_cell_off('NR5G', cell) + self.send_cmd('BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:RATIo {}'.format( + Keysight5GTestApp._format_cells(cell), slot_ratio)) + self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL') + def set_nr_cell_mcs(self, cell, dl_mcs, ul_mcs): """Function to set NR cell DL & UL MCS @@ -468,13 +544,58 @@ class Keysight5GTestApp(object): ul_mcs: mcs index to use on UL """ self.assert_cell_off('NR5G', cell) - self.send_cmd( - 'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"' - .format(dl_mcs)) + frame_config_count = 5 + slot_config_count = 8 + if isinstance(dl_mcs, dict): + self.configure_nr_link_adaptation(cell, link_config=dl_mcs) + else: + for frame, slot in itertools.product(range(frame_config_count), + range(slot_config_count)): + self.send_cmd( + 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy FIXed' + .format(Keysight5GTestApp._format_cells(cell), frame, + slot)) + self.send_cmd( + 'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"' + .format(dl_mcs)) self.send_cmd( 'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "UL:IMCS", "{}"' .format(ul_mcs)) + def configure_nr_link_adaptation(self, cell, link_config): + frame_config_count = 5 + slot_config_count = 8 + for frame, slot in itertools.product(range(frame_config_count), + range(slot_config_count)): + self.send_cmd( + 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy {}' + .format(Keysight5GTestApp._format_cells(cell), frame, slot, + link_config['link_policy'])) + self.send_cmd( + 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:IMCS {}'. + format(Keysight5GTestApp._format_cells(cell), frame, slot, + link_config['initial_mcs'])) + self.send_cmd( + 'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:MAXimum:IMCS {}' + .format(Keysight5GTestApp._format_cells(cell), frame, slot, + link_config['maximum_mcs'])) + self.send_cmd( + 'BSE:CONFig:NR5G:{}:MAC:LADaptation:NTX:BEValuation {}'.format( + Keysight5GTestApp._format_cells(cell), + link_config.get('adaptation_interval', 10000))) + self.send_cmd( + 'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:COUNt {}'.format( + Keysight5GTestApp._format_cells(cell), + link_config.get('target_nack_count', 1000))) + self.send_cmd( + 'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:MARGin {}'.format( + Keysight5GTestApp._format_cells(cell), + link_config.get('target_nack_margin', 100))) + self.send_cmd( + 'BSE:CONFig:NR5G:{}:MAC:DL:LADaptation:MCS:INCRement {}'.format( + Keysight5GTestApp._format_cells(cell), + link_config.get('mcs_step', 1))) + def set_lte_cell_mcs( self, cell, @@ -500,9 +621,16 @@ class Keysight5GTestApp(object): self.send_cmd( 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "DL:MCS:TABle", "{}"' .format(dl_mcs_table_formatted)) - self.send_cmd( - 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"' - .format(dl_mcs)) + self.configure_lte_periodic_csi_reporting(cell, 1) + if dl_mcs == 'WCQI': + self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE WCQI'.format( + Keysight5GTestApp._format_cells(cell))) + else: + self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE EXPLicit'.format( + Keysight5GTestApp._format_cells(cell))) + self.send_cmd( + 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"' + .format(dl_mcs)) self.send_cmd( 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MCS:TABle", "{}"' .format(ul_mcs_table)) @@ -510,6 +638,12 @@ class Keysight5GTestApp(object): 'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL", "UL:IMCS", "{}"' .format(ul_mcs)) + def configure_lte_periodic_csi_reporting(self, cell, enable): + """Function to enable/disable LTE CSI reporting.""" + + self.send_cmd('BSE:CONFig:LTE:{}:PHY:CSI:PERiodic:STATe {}'.format( + Keysight5GTestApp._format_cells(cell), enable)) + def set_lte_control_region_size(self, cell, num_symbols): self.assert_cell_off('LTE', cell) self.send_cmd('BSE:CONFig:LTE:{}:PHY:PCFich:CFI {}'.format( diff --git a/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py b/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py index 57e66ff72..fd2d79490 100644 --- a/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py +++ b/acts_tests/acts_contrib/test_utils/cellular/performance/CellularThroughputBaseTest.py @@ -29,6 +29,7 @@ from acts import base_test from acts import utils from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger from acts.controllers.utils_lib import ssh +from acts.controllers.android_lib.tel import tel_utils from acts.controllers import iperf_server as ipf from acts_contrib.test_utils.cellular.keysight_5g_testapp import Keysight5GTestApp from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils @@ -87,18 +88,20 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): self.user_params['retry_tests'] = [self.__class__.__name__] # Turn Airplane mode on - asserts.assert_true(utils.force_airplane_mode(self.dut, True), - 'Can not turn on airplane mode.') + #asserts.assert_true(utils.force_airplane_mode(self.dut, True), + # 'Can not turn on airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, True) def teardown_class(self): self.log.info('Turning airplane mode on') try: - asserts.assert_true(utils.force_airplane_mode(self.dut, True), - 'Can not turn on airplane mode.') + #asserts.assert_true(utils.force_airplane_mode(self.dut, True), + # 'Can not turn on airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, True) except: self.log.warning('Cannot perform teardown operations on DUT.') try: - self.keysight_test_app.set_cell_state('LTE', 1, 0) + self.keysight_test_app.turn_all_cells_off() self.keysight_test_app.destroy() except: self.log.warning('Cannot perform teardown operations on tester.') @@ -112,11 +115,11 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): def teardown_test(self): self.retry_flag = False self.log.info('Turing airplane mode on') - asserts.assert_true(utils.force_airplane_mode(self.dut, True), - 'Can not turn on airplane mode.') - if self.keysight_test_app.get_cell_state('LTE', 'CELL1'): - self.log.info('Turning LTE off.') - self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0) + #asserts.assert_true(utils.force_airplane_mode(self.dut, True), + # 'Can not turn on airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, True) + self.log.info('Turning all cells off.') + self.keysight_test_app.turn_all_cells_off() log_path = os.path.join( context.get_current_context().get_full_output_path(), 'pixel_logs') os.makedirs(self.log_path, exist_ok=True) @@ -133,8 +136,9 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): and sets a retry_flag to enable further tweaking the test logic on second attempts. """ - asserts.assert_true(utils.force_airplane_mode(self.dut, True), - 'Can not turn on airplane mode.') + #asserts.assert_true(utils.force_airplane_mode(self.dut, True), + # 'Can not turn on airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, True) if self.keysight_test_app.get_cell_state('LTE', 'CELL1'): self.log.info('Turning LTE off.') self.keysight_test_app.set_cell_state('LTE', 'CELL1', 0) @@ -318,6 +322,10 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): # Configure all cells for cell_idx, cell in enumerate( testcase_params['endc_combo_config']['cell_list']): + if cell['cell_type'] == 'NR5G': + self.keysight_test_app.set_nr_cell_type( + cell['cell_type'], cell['cell_number'], + cell['nr_cell_type']) self.keysight_test_app.set_cell_duplex_mode( cell['cell_type'], cell['cell_number'], cell['duplex_mode']) self.keysight_test_app.set_cell_band(cell['cell_type'], @@ -326,6 +334,14 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): self.keysight_test_app.set_cell_dl_power( cell['cell_type'], cell['cell_number'], testcase_params['cell_power_sweep'][cell_idx][0], 1) + self.keysight_test_app.set_cell_input_power( + cell['cell_type'], cell['cell_number'], + self.testclass_params['input_power'][cell['cell_type']]) + self.keysight_test_app.set_cell_ul_power_control( + cell['cell_type'], cell['cell_number'], + self.testclass_params['ul_power_control_mode'], + self.testclass_params.get('ul_power_control_target',0) + ) if cell['cell_type'] == 'NR5G': self.keysight_test_app.set_nr_subcarrier_spacing( cell['cell_number'], cell['subcarrier_spacing']) @@ -361,10 +377,15 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): self.testclass_params['lte_ul_mac_padding']) if testcase_params['endc_combo_config']['nr_cell_count']: + if 'schedule_scenario' in testcase_params: self.keysight_test_app.set_nr_cell_schedule_scenario( 'CELL1', testcase_params['schedule_scenario']) + if testcase_params['schedule_scenario'] == 'FULL_TPUT': + self.keysight_test_app.set_nr_schedule_slot_ratio( + 'CELL1', + testcase_params['schedule_slot_ratio']) self.keysight_test_app.set_nr_ul_dft_precoding( 'CELL1', testcase_params['transform_precoding']) self.keysight_test_app.set_nr_cell_mcs( @@ -375,47 +396,80 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): self.keysight_test_app.set_ul_carriers( testcase_params['endc_combo_config']['nr_ul_carriers']) - # Turn on LTE cells - for cell in testcase_params['endc_combo_config']['cell_list']: - if cell['cell_type'] == 'LTE' and not self.keysight_test_app.get_cell_state( - cell['cell_type'], cell['cell_number']): - self.log.info('Turning LTE Cell {} on.'.format( - cell['cell_number'])) - self.keysight_test_app.set_cell_state(cell['cell_type'], - cell['cell_number'], 1) - - # Activate LTE aggregation - if testcase_params['endc_combo_config']['lte_scc_list']: - self.keysight_test_app.apply_lte_carrier_agg( - testcase_params['endc_combo_config']['lte_scc_list']) - - self.log.info('Waiting for LTE connections') - # Turn airplane mode off - num_apm_toggles = 5 - for idx in range(num_apm_toggles): - self.log.info('Turning off airplane mode') - asserts.assert_true(utils.force_airplane_mode(self.dut, False), - 'Can not turn off airplane mode.') - if self.keysight_test_app.wait_for_cell_status( - 'LTE', 'CELL1', 'CONN', 180): - break - elif idx < num_apm_toggles - 1: - self.log.info('Turning on airplane mode') - asserts.assert_true(utils.force_airplane_mode(self.dut, True), - 'Can not turn on airplane mode.') - time.sleep(MEDIUM_SLEEP) - else: - asserts.fail('DUT did not connect to LTE.') - - if testcase_params['endc_combo_config']['nr_cell_count']: - self.keysight_test_app.apply_carrier_agg() - self.log.info('Waiting for 5G connection') - connected = self.keysight_test_app.wait_for_cell_status( - 'NR5G', testcase_params['endc_combo_config']['nr_cell_count'], - ['ACT', 'CONN'], 60) - if not connected: - asserts.fail('DUT did not connect to NR.') - time.sleep(SHORT_SLEEP) + if testcase_params['endc_combo_config']['lte_cell_count']: + # Connect flow for LTE and LTE+FR1 ENDC + # Turn on LTE cells + for cell in testcase_params['endc_combo_config']['cell_list']: + if cell['cell_type'] == 'LTE' and not self.keysight_test_app.get_cell_state( + cell['cell_type'], cell['cell_number']): + self.log.info('Turning LTE Cell {} on.'.format( + cell['cell_number'])) + self.keysight_test_app.set_cell_state(cell['cell_type'], + cell['cell_number'], 1) + # Activate LTE aggregation if applicable + if testcase_params['endc_combo_config']['lte_scc_list']: + self.keysight_test_app.apply_lte_carrier_agg( + testcase_params['endc_combo_config']['lte_scc_list']) + self.log.info('Waiting for LTE connections') + # Turn airplane mode off + num_apm_toggles = 10 + for idx in range(num_apm_toggles): + self.log.info('Turning off airplane mode') + #asserts.assert_true(utils.force_airplane_mode(self.dut, False), + # 'Can not turn off airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, False) + if self.keysight_test_app.wait_for_cell_status( + 'LTE', 'CELL1', 'CONN', 10*(idx+1)): + self.log.info('Connected! Waiting for {} seconds.'.format(LONG_SLEEP)) + time.sleep(LONG_SLEEP) + break + elif idx < num_apm_toggles - 1: + self.log.info('Turning on airplane mode') + # asserts.assert_true(utils.force_airplane_mode(self.dut, True), + # 'Can not turn on airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, True) + time.sleep(MEDIUM_SLEEP) + else: + asserts.fail('DUT did not connect to LTE.') + + if testcase_params['endc_combo_config']['nr_cell_count']: + self.keysight_test_app.apply_carrier_agg() + self.log.info('Waiting for 5G connection') + connected = self.keysight_test_app.wait_for_cell_status( + 'NR5G', testcase_params['endc_combo_config']['nr_cell_count'], + ['ACT', 'CONN'], 60) + if not connected: + asserts.fail('DUT did not connect to NR.') + time.sleep(SHORT_SLEEP) + elif testcase_params['endc_combo_config']['nr_cell_count']: + # Connect flow for NR FR1 Standalone + # Turn on NR cells + for cell in testcase_params['endc_combo_config']['cell_list']: + if cell['cell_type'] == 'NR5G' and not self.keysight_test_app.get_cell_state( + cell['cell_type'], cell['cell_number']): + self.log.info('Turning NR Cell {} on.'.format( + cell['cell_number'])) + self.keysight_test_app.set_cell_state(cell['cell_type'], + cell['cell_number'], 1) + num_apm_toggles = 10 + for idx in range(num_apm_toggles): + self.log.info('Turning off airplane mode now.') + #asserts.assert_true(utils.force_airplane_mode(self.dut, False), + # 'Can not turn off airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, False) + if self.keysight_test_app.wait_for_cell_status( + 'NR5G', 'CELL1', 'CONN', 10*(idx+1)): + self.log.info('Connected! Waiting for {} seconds.'.format(LONG_SLEEP)) + time.sleep(LONG_SLEEP) + break + elif idx < num_apm_toggles - 1: + self.log.info('Turning on airplane mode now.') + # asserts.assert_true(utils.force_airplane_mode(self.dut, True), + # 'Can not turn on airplane mode.') + tel_utils.toggle_airplane_mode(self.log, self.dut, True) + time.sleep(MEDIUM_SLEEP) + else: + asserts.fail('DUT did not connect to NR.') def _test_throughput_bler(self, testcase_params): """Test function to run cellular throughput and BLER measurements. @@ -449,27 +503,34 @@ class CellularThroughputBaseTest(base_test.BaseTestClass): for power_idx in range(len(testcase_params['cell_power_sweep'][0])): result = collections.OrderedDict() # Set DL cell power - result['cell_power'] = [] for cell_idx, cell in enumerate( testcase_params['endc_combo_config']['cell_list']): + cell_power_array = [] current_cell_power = testcase_params['cell_power_sweep'][ cell_idx][power_idx] - result['cell_power'].append(current_cell_power) + cell_power_array.append(current_cell_power) self.keysight_test_app.set_cell_dl_power( cell['cell_type'], cell['cell_number'], current_cell_power, 1) - + result['cell_power'] = cell_power_array # Start BLER and throughput measurements - result = self.run_single_throughput_measurement(testcase_params) - testcase_results['results'].append(result) + current_throughput = self.run_single_throughput_measurement(testcase_params) + lte_rx_meas = cputils.get_rx_measurements(self.dut, 'LTE') + nr_rx_meas = cputils.get_rx_measurements(self.dut, 'NR5G') + result['throughput_measurements'] = current_throughput + result['lte_rx_measurements'] = lte_rx_meas + result['nr_rx_measurements'] = nr_rx_meas - self.print_throughput_result(result) + self.print_throughput_result(current_throughput) + self.log.info('LTE Rx Measurements: {}'.format(lte_rx_meas)) + self.log.info('NR Rx Measurements: {}'.format(nr_rx_meas)) - if (('lte_bler_result' in result - and result['lte_bler_result']['total']['DL']['nack_ratio'] * + testcase_results['results'].append(result) + if (('lte_bler_result' in result['throughput_measurements'] + and result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio'] * 100 > 99) or - ('nr_bler_result' in result - and result['nr_bler_result']['total']['DL']['nack_ratio'] * + ('nr_bler_result' in result['throughput_measurements'] + and result['throughput_measurements']['nr_bler_result']['total']['DL']['nack_ratio'] * 100 > 99)): stop_counter = stop_counter + 1 else: diff --git a/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py b/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py index 4aaff91ef..2a725902c 100644 --- a/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py +++ b/acts_tests/acts_contrib/test_utils/cellular/performance/cellular_performance_test_utils.py @@ -17,6 +17,7 @@ import collections import logging import os +import re import time PCC_PRESET_MAPPING = { @@ -228,3 +229,23 @@ def log_system_power_metrics(ad, verbose=1): battery_meaurements['bat_{}'.format(par)] = ad.adb.shell( 'cat /sys/class/power_supply/maxfg/{}'.format(par)) logging.debug('Battery readings: {}'.format(battery_meaurements)) + + +def send_at_command(ad, at_command): + at_cmd_output = ad.adb.shell('am instrument -w -e request {} -e response wait ' + '"com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"'.format(at_command)) + return at_cmd_output + +def get_rx_measurements(ad, cell_type): + cell_type_int = 7 if cell_type == 'LTE' else 8 + rx_meas = send_at_command(ad, 'AT+GOOGGETRXMEAS\={}?'.format(cell_type_int)) + rsrp_regex = r"RSRP\[\d+\]\s+(-?\d+)" + rsrp_values = [float(x) for x in re.findall(rsrp_regex, rx_meas)] + rsrq_regex = r"RSRQ\[\d+\]\s+(-?\d+)" + rsrq_values = [float(x) for x in re.findall(rsrq_regex, rx_meas)] + rssi_regex = r"RSSI\[\d+\]\s+(-?\d+)" + rssi_values = [float(x) for x in re.findall(rssi_regex, rx_meas)] + sinr_regex = r"SINR\[\d+\]\s+(-?\d+)" + sinr_values = [float(x) for x in re.findall(sinr_regex, rx_meas)] + return {'rsrp': rsrp_values, 'rsrq': rsrq_values, 'rssi': rssi_values, 'sinr': sinr_values} + diff --git a/acts_tests/acts_contrib/test_utils/gnss/device_doze.py b/acts_tests/acts_contrib/test_utils/gnss/device_doze.py new file mode 100644 index 000000000..558037b27 --- /dev/null +++ b/acts_tests/acts_contrib/test_utils/gnss/device_doze.py @@ -0,0 +1,67 @@ +"""A module to trigger device into doze mode.""" +import enum +from retry import retry + + +_UNPLUG_POWER = "dumpsys battery unplug" +_RESET_POWER = "dumpsys battery reset" +_GET_DOZE_STATUS = "dumpsys deviceidle get {status}" +_FORCE_IDLE = "dumpsys deviceidle force-idle {status}" +_LEAVE_IDLE = "dumpsys deviceidle disable" + + +class DozeState(enum.Enum): + """Doze state.""" + INACTIVE = "INACTIVE" + ACTIVE = "ACTIVE" + IDLE = "IDLE" + + +class DozeType(enum.Enum): + DEEP = "deep" + LIGHT = "light" + + +def _check_doze_status( + dut, + doze_type: DozeType, + doze_state: DozeState): + command = _GET_DOZE_STATUS.format(status=doze_type.value) + doze_status = dut.adb.shell(command).strip() + dut.log.info("%s doze status is %s" % (doze_type.value, doze_status)) + if doze_status != doze_state.value: + raise ValueError("Unexpected doze status.") + + +@retry(exceptions=ValueError, tries=3, delay=1) +def enter_doze_mode( + dut, + doze_type: DozeType): + """Sets device into doze mode according to given doze type. + Args: + dut: The device under test. + doze_type: The desired doze type. + Raises: + ValueError: When fail to sets device into doze mode. + """ + dut.adb.shell(_UNPLUG_POWER) + dut.adb.shell( + _FORCE_IDLE.format(status=doze_type.value), + ) + _check_doze_status(dut, doze_type, DozeState.IDLE) + + +@retry(exceptions=ValueError, tries=3, delay=1) +def leave_doze_mode( + dut, + doze_type: DozeType): + """Sets device out of doze mode. + Args: + dut: The device under test. + doze_type: The desired doze type. + Raises: + ValueError: When fail to sets device out of doze mode. + """ + dut.adb.shell(_RESET_POWER) + dut.adb.shell(_LEAVE_IDLE) + _check_doze_status(dut, doze_type, DozeState.ACTIVE) diff --git a/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py b/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py index a9f957947..d2930a680 100644 --- a/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py +++ b/acts_tests/acts_contrib/test_utils/gnss/gnss_measurement.py @@ -1,3 +1,4 @@ +from datetime import datetime import os import pathlib import re @@ -6,6 +7,13 @@ import tempfile from collections import defaultdict +_EVENT_TIME_FORMAT = "%Y/%m/%d %H:%M:%S.%f" +_EVENT_TIME_PATTERN = re.compile(r"(\d+(?:\/\d+){2}\s\d{2}(?::\d{2}){2}\.\d+)\sRead:") +_GNSS_CLOCK_START_LOG_PATTERN = re.compile(r"^GnssClock:") +_GNSS_CLOCK_TIME_NANOS_PATTERN = re.compile(f"^\s+TimeNanos\s*=\s*([-]?\d*)") +_GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN = re.compile(f"^\s+FullBiasNanos\s*=\s*([-]?\d*)") +_GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN = re.compile(f"^\s+elapsedRealtimeNanos\s*=\s*([-]?\d*)") + class AdrInfo: """Represent one ADR value An ADR value is a decimal number range from 0 - 31 @@ -120,6 +128,34 @@ class AdrStatistic: self.total_count += adr_info.count +class GnssClockSubEvent: + time_nanos: int + full_bias_nanos: int + elapsed_real_time_nanos: int + + def __init__(self, event_time): + self.event_time = event_time + + def parse(self, line): + if _GNSS_CLOCK_TIME_NANOS_PATTERN.search(line): + self.time_nanos = int(_GNSS_CLOCK_TIME_NANOS_PATTERN.search(line).group(1)) + elif _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line): + self.full_bias_nanos = int( + _GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN.search(line).group(1)) + elif _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line): + self.elapsed_real_time_nanos = int( + _GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN.search(line).group(1)) + + def __repr__(self) -> str: + return (f"event time: {self.event_time}, " + f"timenanos: {self.time_nanos}, " + f"full_bias: {self.full_bias_nanos}, " + f"elapsed_realtime: {self.elapsed_real_time_nanos}") + + @property + def gps_elapsed_realtime_diff(self): + return self.time_nanos - self.full_bias_nanos - self.elapsed_real_time_nanos + class GnssMeasurement: """Represent the content of measurement file generated by gps tool""" @@ -206,3 +242,21 @@ class GnssMeasurement: adr_info = AdrInfo(key, value) adr_static.add_adr_info(adr_info) return adr_static + + def get_gnss_clock_info(self): + sub_events = [] + event_time = None + with tempfile.TemporaryDirectory() as folder: + local_measurement_file = os.path.join(folder, "measurement_file") + self.ad.pull_files(self._get_latest_measurement_file_path(), local_measurement_file) + with open(local_measurement_file) as f: + for line in f: + if re.search(_EVENT_TIME_PATTERN, line): + event_time = re.search(_EVENT_TIME_PATTERN, line).group(1) + event_time = datetime.strptime(event_time, _EVENT_TIME_FORMAT) + elif re.search(_GNSS_CLOCK_START_LOG_PATTERN, line): + sub_events.append(GnssClockSubEvent(event_time)) + elif line.startswith(" "): + sub_events[-1].parse(line) + return sub_events + diff --git a/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py b/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py index ec9ba616e..50202fce0 100644 --- a/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py +++ b/acts_tests/acts_contrib/test_utils/gnss/gnss_test_utils.py @@ -23,9 +23,10 @@ import fnmatch import posixpath import subprocess import tempfile +import functools from retry import retry from collections import namedtuple -from datetime import datetime +from datetime import datetime, timedelta from xml.etree import ElementTree from contextlib import contextmanager from statistics import median @@ -43,6 +44,7 @@ from acts_contrib.test_utils.gnss.gnss_measurement import GnssMeasurement from acts_contrib.test_utils.wifi import wifi_test_utils as wutils from acts_contrib.test_utils.tel import tel_logging_utils as tlutils from acts_contrib.test_utils.tel import tel_test_utils as tutils +from acts_contrib.test_utils.gnss import device_doze from acts_contrib.test_utils.gnss import gnssstatus_utils from acts_contrib.test_utils.gnss import gnss_constant from acts_contrib.test_utils.gnss import supl @@ -115,7 +117,8 @@ XTRA_SERVER_2="http://" XTRA_SERVER_3="http://" """ _BRCM_DUTY_CYCLE_PATTERN = re.compile(r".*PGLOR,\d+,STA.*") - +_WEARABLE_QCOM_VENDOR_REGEX = re.compile(r"init.svc.qcom") +_GPS_ELAPSED_REALTIME_DIFF_TOLERANCE = 500_000 class GnssTestUtilsError(Exception): pass @@ -266,11 +269,11 @@ def enable_vendor_orbit_assistance_data(ad): ad: An AndroidDevice object. """ ad.root_adb() - if is_device_wearable(ad): - lto_mode_wearable(ad, True) - elif check_chipset_vendor_by_qualcomm(ad): + if check_chipset_vendor_by_qualcomm(ad): disable_xtra_throttle(ad) reboot(ad) + elif is_device_wearable(ad): + lto_mode_wearable(ad, True) else: lto_mode(ad, True) @@ -285,10 +288,10 @@ def disable_vendor_orbit_assistance_data(ad): ad: An AndroidDevice object. """ ad.root_adb() - if is_device_wearable(ad): - lto_mode_wearable(ad, False) - elif check_chipset_vendor_by_qualcomm(ad): + if check_chipset_vendor_by_qualcomm(ad): disable_qualcomm_orbit_assistance_data(ad) + elif is_device_wearable(ad): + lto_mode_wearable(ad, False) else: lto_mode(ad, False) @@ -625,6 +628,9 @@ def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60): Args: ad: An AndroidDevice object. dwelltime: Waiting time for modem reset. Default is 60 seconds. + + Returns: + ssr_crash_time: The epoch time SSR is crashed """ mds_check = ad.adb.shell("pm path com.google.mdstest") if not mds_check: @@ -634,6 +640,7 @@ def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60): '"com.google.mdstest/com.google.mdstest.instrument' '.ModemCommandInstrumentation"') ad.log.info("Triggering modem SSR crash by MDS") + ssr_crash_time = get_current_epoch_time() output = ad.adb.shell(cmd, ignore_status=True) ad.log.debug(output) time.sleep(dwelltime) @@ -643,6 +650,7 @@ def gnss_trigger_modem_ssr_by_mds(ad, dwelltime=60): else: raise signals.TestError( "Failed to trigger modem SSR crash by MDS. \n%s" % output) + return ssr_crash_time def check_xtra_download(ad, begin_time): @@ -1027,8 +1035,11 @@ def gnss_tracking_via_gtw_gpstool(ad, set to False. freq: An integer to set location update frequency. Default set to 0. is_screen_off: whether to turn off during tracking + + Returns: + The datetime obj of first fixed """ - process_gnss_by_gtw_gpstool( + first_fixed_time = process_gnss_by_gtw_gpstool( ad, criteria=criteria, api_type=api_type, meas_flag=meas_flag, freq=freq, bg_display=is_screen_off) ad.log.info("Start %s tracking test for %d minutes" % (api_type.upper(), @@ -1039,6 +1050,8 @@ def gnss_tracking_via_gtw_gpstool(ad, ad.log.info("Successfully tested for %d minutes" % testtime) start_gnss_by_gtw_gpstool(ad, state=False, api_type=api_type) + return first_fixed_time + def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss", ignore_hal_crash=False): @@ -1056,7 +1069,9 @@ def wait_n_mins_for_gnss_tracking(ad, begin_time, testtime, api_type="gnss", # add sleep here to avoid too many request and cause device not responding time.sleep(1) -def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location): +def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location, + raninterval: bool = False, mininterval: int = 10, + maxinterval: int = 40): """Run GNSS TTFF test with selected mode and parse the results. Args: @@ -1069,7 +1084,12 @@ def run_ttff_via_gtw_gpstool(ad, mode, criteria, test_cycle, true_location): # Before running TTFF, we will run tracking and try to get first fixed. # But the TTFF before TTFF doesn't apply to any criteria, so we set a maximum value. process_gnss_by_gtw_gpstool(ad, criteria=FIRST_FIXED_MAX_WAITING_TIME) - ttff_start_time = start_ttff_by_gtw_gpstool(ad, mode, test_cycle) + ttff_start_time = start_ttff_by_gtw_gpstool(ad, + mode, + iteration=test_cycle, + raninterval=raninterval, + mininterval=mininterval, + maxinterval=maxinterval) ttff_data = process_ttff_by_gtw_gpstool(ad, ttff_start_time, true_location) result = check_ttff_data(ad, ttff_data, gnss_constant.TTFF_MODE.get(mode), criteria) asserts.assert_true( @@ -1207,7 +1227,7 @@ def verify_gps_time_should_be_close_to_device_time(ad, tracking_result): tracking_result: The result we get from GNSS tracking. """ ad.log.info("Validating GPS/Device time difference") - max_time_diff_in_seconds = 2.0 + max_time_diff_in_seconds = 3.0 if is_device_wearable() else 2.0 exceed_report = [] for report in tracking_result.values(): time_diff_in_seconds = abs((report.report_time - report.device_time).total_seconds()) @@ -1973,10 +1993,13 @@ def check_chipset_vendor_by_qualcomm(ad): Returns: True if it's by Qualcomm. False irf not. """ - ad.root_adb() - soc = str(ad.adb.shell("getprop gsm.version.ril-impl")) - ad.log.debug("SOC = %s" % soc) - return "Qualcomm" in soc + if is_device_wearable(ad): + props = str(ad.adb.shell("getprop")) + return True if _WEARABLE_QCOM_VENDOR_REGEX.search(props) else False + else: + soc = str(ad.adb.shell("getprop gsm.version.ril-impl")) + ad.log.debug("SOC = %s" % soc) + return "Qualcomm" in soc def delete_lto_file(ad): @@ -2248,30 +2271,114 @@ def get_process_pid(ad, process_name): return pid -def restart_gps_daemons(ad): +def restart_gps_daemons(ad, service): """Restart GPS daemons by killing services of gpsd, lhd and scd. Args: ad: An AndroidDevice object. + + Returns: + kill_start_time: The time GPSd being killed. """ - gps_daemons_list = ["gpsd", "lhd", "scd"] + kill_start_time = 0 ad.root_adb() - for service in gps_daemons_list: - time.sleep(3) - ad.log.info("Kill GPS daemon \"%s\"" % service) - service_pid = get_process_pid(ad, service) - ad.log.debug("%s PID: %s" % (service, service_pid)) - ad.adb.shell(f"kill -9 {service_pid}") - # Wait 3 seconds for daemons and services to start. - time.sleep(3) + ad.log.info("Kill GPS daemon \"%s\"" % service) + service_pid = get_process_pid(ad, service) + ad.log.debug("%s PID: %s" % (service, service_pid)) + ad.adb.shell(f"kill -9 {service_pid}") + kill_start_time = get_current_epoch_time() + new_pid, recover_time = get_new_pid_process_time(ad, service_pid, service, 20) + + ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % ( + service, service_pid, new_pid)) + ad.log.info("\t- \"%s\" process recovered time: %d ms" % (service, recover_time)) + return kill_start_time + - new_pid = get_process_pid(ad, service) - ad.log.debug("%s new PID: %s" % (service, new_pid)) - if not new_pid or service_pid == new_pid: - raise signals.TestError("Unable to restart \"%s\"" % service) +def get_new_pid_process_time(ad, origin_pid, process_name, timeout): + """Get the new process PID and the time it took for restarting + + Args: + ad: An AndroidDevice object. + origin_pid: The original pid of specified process + process_name: Name of process + timeout: Timeout of checking + + Returns: + 1. How long takes for restarting the specified process. + 2. New PID + """ + begin_time = get_current_epoch_time() + pid = None + while not pid and get_current_epoch_time() - begin_time < timeout * 1000: + pid = get_process_pid(ad, process_name) + if pid and origin_pid != pid: + ad.log.debug("%s new PID: %s" % (process_name, pid)) + return pid, get_current_epoch_time() - begin_time + raise ValueError("Unable to restart \"%s\"" % process_name) - ad.log.info("GPS daemon \"%s\" restarts successfully. PID from %s to %s" % ( - service, service_pid, new_pid)) + +def get_gpsd_update_time(ad, begin_time, dwelltime=30): + """Get the UTC time of first GPSd status update shows up after begin_time + + Args: + ad: An AndroidDevice object. + begin_time: The start time of the log. + dwelltime: Waiting time for gnss status update. Default is 30 seconds. + + Returns: + The datetime object which indicates when is first GPSd status update + """ + ad.log.info("Checking GNSS status after %s", + datetime.fromtimestamp( begin_time / 1000)) + time.sleep(dwelltime) + + gnss_status = ad.search_logcat("Gnss status update", + begin_time=begin_time) + if not gnss_status: + raise ValueError("No \"GNSS status update\" found in logs.") + ad.log.info("GNSS status update found.") + return int(gnss_status[0]["datetime_obj"].timestamp() * 1000) + + +def get_location_fix_time_via_gpstool_log(ad, begin_time): + """Get the UTC time of location update with given + device time from the log output by GPSTool. + + Args: + ad: An AndroidDevice object. + begin_time: The start time of the log. + + Returns: + The datetime object which indicates when is the + first location fix time shows up + """ + location_fix_time = ad.search_logcat("GPSService: Time", + begin_time=begin_time) + if not location_fix_time: + raise ValueError("No \"Location fix time\" found in logs.") + return int(location_fix_time[0]["datetime_obj"].timestamp() * 1000) + + +def get_gps_process_and_kill_function_by_vendor(ad): + """Get process to be killed by vendor and + return the kill function accordingly + + Args: + ad: An AndroidDevice object. + + Returns: + killed_processes: What processes to be killed + functions: The methods for killing each process + """ + if check_chipset_vendor_by_qualcomm(ad): + ad.log.info("Triggered modem SSR") + return {"ssr": functools.partial(gnss_trigger_modem_ssr_by_mds, ad=ad)} + else: + ad.log.info("Triggered restarting GPS daemons") + return {"gpsd": functools.partial(restart_gps_daemons, ad=ad, service="gpsd"), + "scd": functools.partial(restart_gps_daemons, ad=ad, service="scd"), + "lhd": functools.partial(restart_gps_daemons, ad=ad, service="lhd"),} def is_device_wearable(ad): @@ -2585,7 +2692,8 @@ def is_wearable_btwifi(ad): """ package = ad.adb.getprop("ro.product.product.name") ad.log.debug("[ro.product.product.name]: [%s]" % package) - return "btwifi" in package + # temp solution. Will check with dev team if there is a command to check. + return "btwifi" in package or ad.model == 'aurora' def compare_watch_phone_location(ad,watch_file, phone_file): @@ -2681,103 +2789,73 @@ def delete_bcm_nvmem_sto_file(ad): ad.log.info("Delete BCM's NVMEM ephemeris files.\n%s" % status) -def bcm_gps_xml_update_option(ad, - option, - search_line=None, - append_txt=None, - update_txt=None, - delete_txt=None, - gps_xml_path=BCM_GPS_XML_PATH): - """Append parameter setting in gps.xml for BCM solution +def bcm_gps_xml_update_option( + ad, child_tag, items_to_update={}, items_to_delete=[], gps_xml_path=BCM_GPS_XML_PATH): + """Updates gps.xml attributes. + + The process will go through update first then delete. Args: - option: A str to identify the operation (add/update/delete). - ad: An AndroidDevice object. - search_line: Pattern matching of target - line for appending new line data. - append_txt: New lines that will be appended after the search_line. - update_txt: New line to update the original file. - delete_txt: lines to delete from the original file. - gps_xml_path: gps.xml file location of DUT + ad: Device under test. + child_tag: (str) Which child node should be updated. + items_to_update: (dict) The attributes to be updated. + items_to_delete: (list) The attributes to be deleted. + gps_xml_path: (str) The gps.xml file path. Default is BCM_GPS_XML_PATH. """ remount_device(ad) - #Update gps.xml - tmp_log_path = tempfile.mkdtemp() - ad.pull_files(gps_xml_path, tmp_log_path) - gps_xml_tmp_path = os.path.join(tmp_log_path, "gps.xml") - gps_xml_file = open(gps_xml_tmp_path, "r") - lines = gps_xml_file.readlines() - gps_xml_file.close() - fout = open(gps_xml_tmp_path, "w") - if option == "add": - for line in lines: - if line.strip() in append_txt: - ad.log.info("{} is already in the file. Skip".format(append_txt)) - continue - fout.write(line) - if search_line in line: - for add_txt in append_txt: - fout.write(add_txt) - ad.log.info("Add new line: '{}' in gps.xml.".format(add_txt)) - elif option == "update": - for line in lines: - if search_line in line: - ad.log.info(line) - fout.write(update_txt) - ad.log.info("Update line: '{}' in gps.xml.".format(update_txt)) - continue - fout.write(line) - elif option == "delete": - for line in lines: - if delete_txt in line: - ad.log.info("Delete line: '{}' in gps.xml.".format(line.strip())) - continue - fout.write(line) - fout.close() + # to prevent adding nso into xml file + ElementTree.register_namespace("", "http://www.glpals.com/") + with tempfile.TemporaryDirectory() as temp_dir: + local_xml = os.path.join(temp_dir, "gps.xml.ori") + modified_xml = os.path.join(temp_dir, "gps.xml") + ad.pull_files(gps_xml_path, local_xml) + xml_data = ElementTree.parse(local_xml) + root_data = xml_data.getroot() + child_node = None + + for node in root_data: + if node.tag.endswith(child_tag): + child_node = node + break - # Update gps.xml with gps_new.xml - ad.push_system_file(gps_xml_tmp_path, gps_xml_path) + if child_node is None: + raise LookupError(f"Couldn't find node with {child_tag}") - # remove temp folder - shutil.rmtree(tmp_log_path, ignore_errors=True) + for key, value in items_to_update.items(): + child_node.attrib[key] = value + + for key in items_to_delete: + if key in child_node.attrib: + child_node.attrib.pop(key) + + xml_data.write(modified_xml, xml_declaration=True, encoding="utf-8", method="xml") + ad.push_system_file(modified_xml, gps_xml_path) + ad.log.info("Finish modify gps.xml") def bcm_gps_ignore_warmstandby(ad): """ remove warmstandby setting in BCM gps.xml to reset tracking filter Args: ad: An AndroidDevice object. """ - search_line_tag = '<gll\n' - delete_line_str = 'WarmStandbyTimeout1Seconds' + search_line_tag = 'gll' + delete_line_str = ['WarmStandbyTimeout1Seconds', 'WarmStandbyTimeout2Seconds'] bcm_gps_xml_update_option(ad, - "delete", - search_line_tag, - append_txt=None, - update_txt=None, - delete_txt=delete_line_str) - - search_line_tag = '<gll\n' - delete_line_str = 'WarmStandbyTimeout2Seconds' - bcm_gps_xml_update_option(ad, - "delete", - search_line_tag, - append_txt=None, - update_txt=None, - delete_txt=delete_line_str) + child_tag=search_line_tag, + items_to_delete=delete_line_str) def bcm_gps_ignore_rom_alm(ad): """ Update BCM gps.xml with ignoreRomAlm="True" Args: ad: An AndroidDevice object. """ - search_line_tag = '<hal\n' - append_line_str = [' IgnoreJniTime=\"true\"\n'] - bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str) + search_line_tag = 'hal' + append_line_str = {"IgnoreJniTime":"true", + "AutoColdStartSignal":"SIMULATED"} + bcm_gps_xml_update_option(ad, child_tag=search_line_tag, items_to_update=append_line_str) - search_line_tag = '<gll\n' - append_line_str = [' IgnoreRomAlm=\"true\"\n', - ' AutoColdStartSignal=\"SIMULATED\"\n', - ' IgnoreJniTime=\"true\"\n'] - bcm_gps_xml_update_option(ad, "add", search_line_tag, append_line_str) + search_line_tag = "gll" + append_line_str = {"IgnoreRomAlm":"true"} + bcm_gps_xml_update_option(ad, child_tag=search_line_tag, items_to_update=append_line_str) def check_inject_time(ad): @@ -2866,49 +2944,6 @@ def disable_ramdump(ad): ad.start_adb_logcat() -def deep_suspend_device(ad): - """Force DUT to enter deep suspend mode - - When DUT is connected to PCs, it won't enter deep suspend mode - by pressing power button. - - To force DUT enter deep suspend mode, we need to send the - following command to DUT "echo mem >/sys/power/state" - - To make sure the DUT stays in deep suspend mode for a while, - it will send the suspend command 3 times with 15s interval - - Args: - ad: An AndroidDevice object. - """ - ad.log.info("Ready to go to deep suspend mode") - begin_time = get_device_time(ad) - ad.droid.goToSleepNow() - ensure_power_manager_is_dozing(ad, begin_time) - ad.stop_services() - try: - command = "echo deep > /sys/power/mem_sleep && echo mem > /sys/power/state" - for i in range(1, 4): - ad.log.debug(f"Send deep suspend command round {i}") - ad.adb.shell(command, ignore_status=True) - # sleep here to ensure the device stays enough time in deep suspend mode - time.sleep(15) - if not _is_device_enter_deep_suspend(ad): - raise signals.TestFailure("Device didn't enter deep suspend mode") - ad.log.info("Wake device up now") - except Exception: - # when exception happen, it's very likely the device is rebooting - # to ensure the test can go on, wait for the device is ready - ad.log.warn("Device may be rebooting, wait for it") - ad.wait_for_boot_completion() - ad.root_adb() - raise - finally: - tutils.bring_up_sl4a(ad) - ad.start_adb_logcat() - ad.droid.wakeUpNow() - - def get_device_time(ad): """Get current datetime from device @@ -2942,27 +2977,23 @@ def ensure_power_manager_is_dozing(ad, begin_time): else: ad.log.warn("Power manager didn't enter dozing") - - -def _is_device_enter_deep_suspend(ad): - """Check device has been enter deep suspend mode - - If device has entered deep suspend mode, we should be able to find keyword - "suspend entry (deep)" +def enter_deep_doze_mode(ad, lasting_time_in_seconds: int): + """Puts the device into deep doze mode. Args: - ad: An AndroidDevice object. - - Returns: - bool: True / False -> has / has not entered deep suspend + ad: The device under test. + lasting_time_in_seconds: How long does the doze mode last. """ - cmd = f"adb -s {ad.serial} logcat -d|grep \"suspend entry (deep)\"" - process = subprocess.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, shell=True) - result, _ = process.communicate() - ad.log.debug(f"suspend result = {result}") + target_time = datetime.now() + timedelta(seconds=lasting_time_in_seconds) - return bool(result) + try: + ad.log.info("Enter deep doze mode for %d seconds" % lasting_time_in_seconds) + device_doze.enter_doze_mode(ad, device_doze.DozeType.DEEP) + while datetime.now() < target_time: + time.sleep(1) + finally: + ad.log.info("Leave deep doze mode") + device_doze.leave_doze_mode(ad, device_doze.DozeType.DEEP) def check_location_report_interval(ad, location_reported_time_src, total_seconds, tolerance): @@ -3026,6 +3057,7 @@ def set_screen_status(ad, off=True): def full_gnss_measurement(ad): """Context manager function to enable full gnss measurement""" try: + ad.adb.shell("settings put global development_settings_enabled 1") ad.adb.shell("settings put global enable_gnss_raw_meas_full_tracking 1") yield ad finally: @@ -3268,4 +3300,39 @@ def log_current_epoch_time(ad, sponge_key): sponge_key: The key name of the sponge property. """ current_epoch_time = get_current_epoch_time() // 1000 - ad.log.info(f"TestResult {sponge_key} {current_epoch_time}")
\ No newline at end of file + ad.log.info(f"TestResult {sponge_key} {current_epoch_time}") + + +def validate_diff_of_gps_clock_elapsed_realtime(ad, start_time): + """Validates the diff of gps clock and elapsed realtime should be stable. + + Args: + ad: The device under test. + start_time: When should the validation start. For BRCM devices, the PPS feature takes some + time after first fixed to start working. Therefore we should ignore some data. + """ + last_gps_elapsed_realtime_diff = 0 + variation_diff = {} + + for clock in GnssMeasurement(ad).get_gnss_clock_info(): + if clock.event_time < start_time: + continue + + if not bool(last_gps_elapsed_realtime_diff): + last_gps_elapsed_realtime_diff = clock.gps_elapsed_realtime_diff + continue + + current_gps_elapsed_realtime_diff = clock.gps_elapsed_realtime_diff + variation_diff[clock.event_time] = abs( + current_gps_elapsed_realtime_diff - last_gps_elapsed_realtime_diff) + last_gps_elapsed_realtime_diff = current_gps_elapsed_realtime_diff + + over_criteria_data = [ + (event_time, diff) for (event_time, diff) in variation_diff.items() if ( + diff > _GPS_ELAPSED_REALTIME_DIFF_TOLERANCE) + ] + + asserts.assert_true( + [] == over_criteria_data, + msg=f"Following data are over criteria: {over_criteria_data}", + ) diff --git a/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py b/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py index 43f792073..51b96f232 100644 --- a/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py +++ b/acts_tests/acts_contrib/test_utils/gnss/testtracker_util.py @@ -1,7 +1,7 @@ TEST_NAME_BY_TESTTRACKER_UUID = { # GnssFunctionTest "test_cs_first_fixed_system_server_restart": "8169c19d-ba2a-4fef-969b-87f793f4e699", - "test_cs_ttff_after_gps_service_restart": "247110d9-1c9e-429e-8e73-f16dd4a1ac74", + "test_recovery_and_location_time_after_gnss_services_restart": "247110d9-1c9e-429e-8e73-f16dd4a1ac74", "test_gnss_one_hour_tracking": "b3d20ecb-3727-48ed-8a03-19694cc726c1", "test_duty_cycle_function": "0bbfb818-da93-41d7-8d83-15bc53d8d2cf", "test_gnss_init_error": "c661780d-4864-4292-9988-88f64448fb78", @@ -24,6 +24,7 @@ TEST_NAME_BY_TESTTRACKER_UUID = { "test_measure_adr_rate_after_10_mins_tracking": "7ebf3b52-229a-4eaf-bbff-7c527e4a1d7c", "test_hal_crashing_should_resume_tracking": "0aee4450-edce-4e1a-8744-70d8c01937b0", "test_power_save_mode_should_apply_latest_measurement_setting": "59a14da2-40df-4106-a190-dcbcd2e877e0", + "test_the_diff_of_gps_clock_and_elapsed_realtime_should_be_stable": "ea6ba987-216c-4cd0-89bc-eace8a691210", # GnssConcurrencyTest "test_gnss_concurrency_location_1_chre_1": "cbd9ff54-4405-44a4-ac20-de33278406d1", "test_gnss_concurrency_location_1_chre_8": "ab56cb47-384e-4269-b2d8-6e80ce066de2", diff --git a/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py b/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py index 7af1e127e..bb59513b1 100644 --- a/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py +++ b/acts_tests/acts_contrib/test_utils/power/PowerBaseTest.py @@ -155,7 +155,8 @@ class PowerBaseTest(base_test.BaseTestClass): pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT, mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT, ap_dtim_period=None, - bits_root_rail_csv_export=False) + bits_root_rail_csv_export=False, + is_odpm_supported=False) # Setup the must have controllers, phone and monsoon self.dut = self.android_devices[0] @@ -165,7 +166,11 @@ class PowerBaseTest(base_test.BaseTestClass): # Make odpm path for P21 or later platform = self.dut.adb.shell(GET_PROPERTY_HARDWARE_PLATFORM) self.log.info('The hardware platform is {}'.format(platform)) - if platform.startswith('gs'): + if ( + platform.startswith('gs') + or platform.startswith('z') + or self.is_odpm_supported + ): self.odpm_folder = os.path.join(self.log_path, 'odpm') os.makedirs(self.odpm_folder, exist_ok=True) self.log.info('For P21 or later, create odpm folder {}'.format( diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py b/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py index 15043d4f2..720affc85 100644 --- a/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py +++ b/acts_tests/acts_contrib/test_utils/power/cellular/cellular_power_preset_base_test.py @@ -1,10 +1,13 @@ +import json import os from typing import Optional, List import time -from acts import context + +from acts import asserts from acts import signals -from acts.controllers.cellular_lib import AndroidCellularDut import acts_contrib.test_utils.power.cellular.cellular_power_base_test as PWCEL +from acts_contrib.test_utils.tel import tel_test_utils as telutils +from acts_contrib.test_utils.power.cellular import modem_logs # TODO: b/261639867 class AtUtil(): @@ -14,6 +17,11 @@ class AtUtil(): dut: AndroidDevice controller object. """ ADB_CMD_DISABLE_TXAS = 'am instrument -w -e request at+googtxas=2 -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + ADB_CMD_GET_TXAS = 'am instrument -w -e request at+googtxas? -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + ADB_MODEM_STATUS = 'cat /sys/bus/platform/devices/cpif/modem_state' + ADB_CMD_SET_NV = ('am instrument -w ' + '-e request \'at+googsetnv=\"{nv_name}\",{nv_index},\"{nv_value}\"\' ' + '-e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"') def __init__(self, dut, log) -> None: self.dut = dut @@ -21,33 +29,43 @@ class AtUtil(): # TODO: to be remove when b/261639867 complete, # and we are using parent method. - def send(self, cmd: str,) -> Optional[str]: - res = str(self.dut.adb.shell(cmd)) - self.log.info(f'cmd sent: {cmd}') - self.log.info(f'response: {res}') - if 'SUCCESS' in res: - self.log.info('Command executed.') - else: - self.log.error('Fail to executed command.') + def send(self, cmd: str, retries: int=5) -> Optional[str]: + for _ in range(30): + modem_status = self.dut.adb.shell(self.ADB_MODEM_STATUS) + self.log.debug(f'Modem status: {modem_status}') + if modem_status == 'ONLINE': + break + time.sleep(1) + + wait_for_device_ready_time = 2 + for i in range(retries): + res = self.dut.adb.shell(cmd) + self.log.info(f'cmd sent: {cmd}') + self.log.debug(f'response: {res}') + if 'SUCCESS' in res and 'OK' in res: + return res + else: + self.log.warning('Fail to execute cmd, retry to send again.') + time.sleep(wait_for_device_ready_time) + self.log.error(f'Fail to execute cmd: {cmd}') return res - def lock_LTE(self): + def lock_band(self): + """Lock lte and nr bands. + + LTE bands to be locked include B1, B2, B4. + NR bands to belocked include n71, n78, n260. + """ adb_enable_band_lock_lte = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",00,\"01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' - adb_set_band_lock_mode_lte = r'am instrument -w -e request at+GOOGSETNV=\"NASU.SIPC.NetworkMode.ManualMode\",0,\"0D\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' - adb_set_band_lock_bitmap_0 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",0,\"09,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + adb_set_band_lock_bitmap_0 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",0,\"0B,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' adb_set_band_lock_bitmap_1 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",1,\"00,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' adb_set_band_lock_bitmap_2 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",2,\"00,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' adb_set_band_lock_bitmap_3 = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\",3,\"00,00,00,00,00,00,00,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' - # enable lte self.send(adb_enable_band_lock_lte) time.sleep(2) - # OD is for NR/LTE in 4412 menu - self.send(adb_set_band_lock_mode_lte) - time.sleep(2) - - # lock to B1 and B4 + # lock to B1, B2 and B4 self.send(adb_set_band_lock_bitmap_0) time.sleep(2) self.send(adb_set_band_lock_bitmap_1) @@ -57,28 +75,119 @@ class AtUtil(): self.send(adb_set_band_lock_bitmap_3) time.sleep(2) - def clear_lock_band(self): - adb_set_band_lock_mode_auto = r'am instrument -w -e request at+GOOGSETNV=\"NASU.SIPC.NetworkMode.ManualMode\",0,\"03\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' - adb_disable_band_lock_lte = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",0,\"00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' - - # band lock mode auto - self.send(adb_set_band_lock_mode_auto) + adb_enable_band_lock_nr = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_BASED_BAND_LIST_SUPPORT\",00,\"01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + self.send(adb_enable_band_lock_nr) + time.sleep(2) + adb_add_band_list_n71 = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",00,\"47,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + self.send(adb_add_band_list_n71) time.sleep(2) + adb_add_band_list_n78 = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",01,\"4E,00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + self.send(adb_add_band_list_n78) + time.sleep(2) + adb_add_band_list_n260 = r'am instrument -w -e request at+GOOGSETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\",02,\"04,01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + self.send(adb_add_band_list_n260) + time.sleep(2) + + def disable_lock_band_lte(self): + adb_disable_band_lock_lte = r'am instrument -w -e request at+GOOGSETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\",0,\"01\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' # disable band lock lte self.send(adb_disable_band_lock_lte) time.sleep(2) def disable_txas(self): + res = self.send(self.ADB_CMD_GET_TXAS) + if '+GOOGGETTXAS:2' in res: + self.log.info('TXAS is in default.') + return res cmd = self.ADB_CMD_DISABLE_TXAS response = self.send(cmd) return 'OK' in response + def get_band_lock_info(self): + cmd = r'am instrument -w -e request at+GOOGGETNV=\"!SAEL3.Manual.Enabled.RFBands.BitMap\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + res = self.send(cmd) + cmd = r'am instrument -w -e request at+GOOGGETNV=\"!SAEL3.Manual.Band.Select\ Enb\/\ Dis\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + res += self.send(cmd) + cmd = r'am instrument -w -e request at+GOOGGETNV=\"!NRRRC.SIM_BASED_BAND_LIST_SUPPORT\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + res += self.send(cmd) + cmd = r'am instrument -w -e request at+GOOGGETNV=\"!NRRRC.SIM_OPERATOR_BAND_LIST\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + res += self.send(cmd) + return res + + def set_nv(self, nv_name, index, value): + cmd = self.ADB_CMD_SET_NV.format( + nv_name=nv_name, + nv_index=index, + nv_value=value + ) + res = self.send(cmd) + return res + + def get_sim_slot_mapping(self): + cmd = r'am instrument -w -e request at+slotmap? -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + return self.send(cmd) + + def set_single_psim(self): + cmd = r'am instrument -w -e request at+slotmap=1 -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + return self.send(cmd) + + def disable_dsp(self): + cmd = r'am instrument -w -e request at+googsetnv=\"NASU\.LCPU\.LOG\.SWITCH\",0,\"00\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + return self.send(cmd) + + def get_dsp_status(self): + cmd = r'am instrument -w -e request at+googgetnv=\"NASU\.LCPU\.LOG\.SWITCH\" -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' + return self.send(cmd) + + def enable_ims_nr(self): + # set !NRCAPA.Gen.VoiceOverNr + self.set_nv( + nv_name = '!NRCAPA.Gen.VoiceOverNr', + index = '0', + value = '01' + ) + # set PSS.AIMS.Enable.NRSACONTROL + self.set_nv( + nv_name = 'PSS.AIMS.Enable.NRSACONTROL', + index = '0', + value = '00' + ) + # set DS.PSS.AIMS.Enable.NRSACONTROL + self.set_nv( + nv_name = 'DS.PSS.AIMS.Enable.NRSACONTROL', + index = '0', + value = '00' + ) + if self.dut.model == 'oriole': + # For P21, NR.CONFIG.MODE/DS.NR.CONFIG.MODE + self.set_nv( + nv_name = 'NR.CONFIG.MODE', + index = '0', + value = '11' + ) + # set DS.NR.CONFIG.MODE + self.set_nv( + nv_name = 'DS.NR.CONFIG.MODE', + index = '0', + value = '11' + ) + else: + # For P22, NASU.NR.CONFIG.MODE to 11 + self.set_nv( + nv_name = 'NASU.NR.CONFIG.MODE', + index = '0', + value = '11' + ) + class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): # Key for ODPM report ODPM_ENERGY_TABLE_NAME = 'PowerStats HAL 2.0 energy meter' ODPM_MODEM_CHANNEL_NAME = '[VSYS_PWR_MODEM]:Modem' + # Pass fail threshold lower bound + THRESHOLD_TOLERANCE_LOWER_BOUND_DEFAULT = 0.3 + # Key for custom_property in Sponge CUSTOM_PROP_KEY_BUILD_ID = 'build_id' CUSTOM_PROP_KEY_INCR_BUILD_ID = 'incremental_build_id' @@ -94,6 +203,7 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): CUSTOM_PROP_KEY_MODEM_KIBBLE_PCIE_POWER = 'modem_kibble_pcie_power' CUSTOM_PROP_KEY_RFFE_POWER = 'rffe_power' CUSTOM_PROP_KEY_MMWAVE_POWER = 'mmwave_power' + CUSTOM_PROP_KEY_CURRENT_REFERENCE_TARGET = 'reference_target' # kibble report KIBBLE_SYSTEM_RECORD_NAME = '- name: default_device.C10_EVT_1_1.Monsoon:mA' MODEM_PCIE_RAIL_NAME_LIST = [ @@ -102,27 +212,41 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): 'PP0850_L8C_PCIE' ] - MODEM_RFFE_RAIL_NAME_LIST = [ - 'PP1200_L31C_RFFE', - 'VSYS_PWR_RFFE', - 'PP2800_L33C_RFFE' - ] + MODEM_RFFE_RAIL_NAME = 'VSYS_PWR_RFFE' MODEM_POWER_RAIL_NAME = 'VSYS_PWR_MODEM' + MODEM_POWER_RAIL_WO_PCIE_NAME = 'VSYS_PWR_MODEM_W_O_PCIE' + + WEARABLE_POWER_RAIL = 'LTE_DC' + + WEARABLE_SOC_MODEM_RAIL = 'SOC_MODEM_USBHS' + MODEM_MMWAVE_RAIL_NAME = 'VSYS_PWR_MMWAVE' - MONSOON_RAIL_NAME = 'Monsoon' + MONSOON_RAIL_NAME = 'Monsoon:mW' # params key MONSOON_VOLTAGE_KEY = 'mon_voltage' MDSTEST_APP_APK_NAME = 'mdstest.apk' - ADB_CMD_INSTALL = 'install {apk_path}' - ADB_CMD_DISABLE_TXAS = 'am instrument -w -e request at+googtxas=2 -e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"' - ADB_CMD_SET_NV = ('am instrument -w ' - '-e request at+googsetnv=\"{nv_name}\",{nv_index},\"{nv_value}\" ' - '-e response wait "com.google.mdstest/com.google.mdstest.instrument.ModemATCommandInstrumentation"') + + ADB_CMD_ENABLE_ALWAYS_ON_LOGGING = ( + 'am broadcast -n com.android.pixellogger/.receiver.AlwaysOnLoggingReceiver ' + '-a com.android.pixellogger.service.logging.LoggingService.ACTION_CONFIGURE_ALWAYS_ON_LOGGING ' + '-e intent_key_enable "true" ' + '-e intent_key_config "Lassen\ default" ' + '--ei intent_key_max_log_size_mb 100 ' + '--ei intent_key_max_number_of_files 20' + ) + ADB_CMD_DISABLE_ALWAYS_ON_LOGGING = ( + 'am start-foreground-service -a ' + 'com.android.pixellogger.service.logging.LoggingService.ACTION_STOP_LOGGING') + + ADB_CMD_TOGGLE_MODEM_LOG = 'setprop persist.vendor.sys.modem.logging.enable {state}' + + _ADB_GET_ACTIVE_NETWORK = ('dumpsys connectivity | ' + 'grep \'Active default network\'') def __init__(self, controllers): super().__init__(controllers) @@ -133,6 +257,9 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): self.mmwave_power = 0 self.modem_power = 0 self.monsoon_power = 0 + self.kibble_error_range = 2 + self.system_power = 0 + self.odpm_power = 0 def setup_class(self): super().setup_class() @@ -144,21 +271,85 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): self.at_util = AtUtil(self.cellular_dut.ad, self.log) # preset UE. + self.log.info(f'Bug report mode: {self.bug_report}') + self.toggle_modem_log(False) self.log.info('Installing mdstest app.') self.install_apk() - self.log.info('Disable antenna switch.') - is_txas_disabled = self.at_util.disable_txas() - self.log.info('Disable txas: ' + str(is_txas_disabled)) + self.unpack_userparams(is_mdstest_supported=True) + self.log.info(f'Supports mdstest: {self.is_mdstest_supported}') + if self.is_mdstest_supported: + # UE preset + self.log.info('Disable antenna switch.') + self.at_util.disable_txas() + time.sleep(10) + + # set device to be data centric + nv_result = self.at_util.set_nv( + nv_name = '!SAEL3.SAE_UE_OPERATION_MODE', + index = '0', + value = '03' + ) + self.log.info(nv_result) + + self.at_util.lock_band() + self.log.info('Band lock info: \n%s',self.at_util.get_band_lock_info()) - # get sim type + self.at_util.set_single_psim() + + self.unpack_userparams(is_wifi_only_device=False) + + # extract log only flag + self.unpack_userparams(collect_log_only=False) + # get sdim type self.unpack_userparams(has_3gpp_sim=True) + # extract time to take log after test + self.unpack_userparams(post_test_log_duration=30) + + # toggle on/off APM for all devices + self.log.info('Toggle APM on/off for all devices.') + for ad in self.android_devices: + telutils.toggle_airplane_mode_by_adb(self.log, ad, False) + time.sleep(2) + telutils.toggle_airplane_mode_by_adb(self.log, ad, True) + time.sleep(2) + + # clear modem logs + modem_logs.clear_modem_logging(self.cellular_dut.ad) + + def collect_power_data_and_validate(self): + cells_status_before = sorted(self.cellular_simulator.get_all_cell_status()) + self.log.info('UXM cell status before collect power: %s', cells_status_before) + + super().collect_power_data() + cells_status_after = sorted(self.cellular_simulator.get_all_cell_status()) + self.log.info('UXM cell status after collect power: %s', cells_status_after) + + # power measurement results + odpm_power_results = self.get_odpm_values() + self.odpm_power = odpm_power_results.get( + self.ODPM_MODEM_CHANNEL_NAME.lower(), 0) + if hasattr(self, 'bitses'): + self.parse_power_rails_csv() + + asserts.assert_true(cells_status_before == cells_status_after, + 'Cell status before {} and after {} the test run are not the same.'.format( + cells_status_before, cells_status_after + )) + self.threshold_check() def setup_test(self): - self.cellular_simulator.set_sim_type(self.has_3gpp_sim) try: - if 'LTE' in self.test_name: - self.at_util.lock_LTE() + if self.collect_log_only: + self.log.info('Collect log only mode on.') + # set log mask + modem_logs.set_modem_log_profle(self.cellular_dut.ad, modem_logs.ModemLogProfile.LASSEN_TCP_DSP) + # start log + modem_logs.start_modem_logging(self.cellular_dut.ad) + modem_log_dir = os.path.join(self.root_output_path, 'modem_log') + os.makedirs(modem_log_dir, exist_ok=True) + self.modem_log_path = os.path.join(modem_log_dir, self.test_name) + os.makedirs(self.modem_log_path, exist_ok=True) super().setup_test() except BrokenPipeError: self.log.info('TA crashed test need retry.') @@ -166,16 +357,37 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): self.cellular_simulator.recovery_ta() self.cellular_simulator.socket_connect() raise signals.TestFailure('TA crashed mid test, retry needed.') - # except: - # # self.log.info('Waiting for device to on.') - # # self.dut.adb.wait_for_device() - # # self.cellular_dut = AndroidCellularDut.AndroidCellularDut( - # # self.android_devices[0], self.log) - # # self.dut.root_adb() - # # # Restart SL4A - # # self.dut.start_services() - # # self.need_retry = True - # raise signals.TestError('Device reboot mid test, retry needed.') + + def toggle_modem_log(self, new_state: bool, timeout: int=30): + new_state = str(new_state).lower() + current_state = self.cellular_dut.ad.adb.shell('getprop persist.vendor.sys.modem.logging.enable') + cmd = self.ADB_CMD_TOGGLE_MODEM_LOG.format(state=new_state) + if new_state != current_state: + self.cellular_dut.ad.adb.shell(cmd) + for _ in range(timeout): + self.log.debug(f'Wait for modem logging status to be {new_state}.') + time.sleep(1) + current_state = self.cellular_dut.ad.adb.shell('getprop persist.vendor.sys.modem.logging.enable') + if new_state == current_state: + self.log.info(f'Always-on modem logging status is {new_state}.') + return + raise RuntimeError(f'Fail to set modem logging to {new_state}.') + + def collect_modem_log(self, out_path, duration: int=30): + # set log mask + modem_logs.set_modem_log_profle(self.cellular_dut.ad, modem_logs.ModemLogProfile.LASSEN_TCP_DSP) + + # start log + modem_logs.start_modem_logging(self.cellular_dut.ad) + time.sleep(duration) + # stop log + modem_logs.stop_modem_logging(self.cellular_dut.ad) + try: + # pull log + modem_logs.pull_logs(self.cellular_dut.ad, out_path) + finally: + # clear log + modem_logs.clear_modem_logging(self.cellular_dut.ad) def install_apk(self): sleep_time = 3 @@ -189,55 +401,6 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): else: self.log.warning('fail to install mdstest.') - def set_nv(self, nv_name, index, value): - cmd = self.ADB_CMD_SET_NV.format( - nv_name=nv_name, - nv_index=index, - nv_value=value - ) - response = str(self.cellular_dut.ad.adb.shell(cmd)) - self.log.info(response) - - def enable_ims_nr(self): - # set !NRCAPA.Gen.VoiceOverNr - self.set_nv( - nv_name = '!NRCAPA.Gen.VoiceOverNr', - index = '0', - value = '01' - ) - # set PSS.AIMS.Enable.NRSACONTROL - self.set_nv( - nv_name = 'PSS.AIMS.Enable.NRSACONTROL', - index = '0', - value = '00' - ) - # set DS.PSS.AIMS.Enable.NRSACONTROL - self.set_nv( - nv_name = 'DS.PSS.AIMS.Enable.NRSACONTROL', - index = '0', - value = '00' - ) - if self.cellular_dut.ad.model == 'oriole': - # For P21, NR.CONFIG.MODE/DS.NR.CONFIG.MODE - self.set_nv( - nv_name = 'NR.CONFIG.MODE', - index = '0', - value = '11' - ) - # set DS.NR.CONFIG.MODE - self.set_nv( - nv_name = 'DS.NR.CONFIG.MODE', - index = '0', - value = '11' - ) - else: - # For P22, NASU.NR.CONFIG.MODE to 11 - self.set_nv( - nv_name = 'NASU.NR.CONFIG.MODE', - index = '0', - value = '11' - ) - def get_odpm_values(self): """Get power measure from ODPM. @@ -289,6 +452,7 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): # example result of line.strip().split() # ['[VSYS_PWR_DISPLAY]:Display', '1039108.42', 'mWs', '(', '344.69)'] channel, _, _, _, delta_str = line.strip().split() + channel = channel.lower() delta = float(delta_str[:-2].strip()) # calculate OPDM power @@ -308,36 +472,44 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): def parse_power_rails_csv(self): kibble_dir = os.path.join(self.root_output_path, 'Kibble') - kibble_csv_path = None + kibble_json_path = None if os.path.exists(kibble_dir): for f in os.listdir(kibble_dir): - if self.test_name in f and '.csv' in f: - kibble_csv_path = os.path.join(kibble_dir, f) - self.log.info('Kibble csv file path: ' + kibble_csv_path) + if self.test_name in f and '.json' in f: + kibble_json_path = os.path.join(kibble_dir, f) + self.log.info('Kibble json file path: ' + kibble_json_path) break self.log.info('Parsing power rails from csv.') - if kibble_csv_path: - with open(kibble_csv_path, 'r') as f: - for line in f: - # railname,val,mA,val,mV,val,mW - railname, _, _, _, _, power, _ = line.split(',') + if kibble_json_path: + with open(kibble_json_path, 'r') as f: + rails_data_json = json.load(f) + if rails_data_json: + for record in rails_data_json: + unit = record['unit'] + if unit != 'mW': + continue + railname = record['name'] + power = record['avg'] # parse pcie power if self._is_any_substring(railname, self.MODEM_PCIE_RAIL_NAME_LIST): - self.log.info(railname + ': ' + power) - self.pcie_power += float(power) + self.log.info('%s: %f',railname, power) + self.pcie_power += power elif self.MODEM_POWER_RAIL_NAME in railname: - self.log.info(railname + ': ' + power) - self.modem_power = float(power) - elif self._is_any_substring(railname, self.MODEM_RFFE_RAIL_NAME_LIST): - self.log.info(railname + ': ' + power) - self.rffe_power = float(power) + self.log.info('%s: %f',railname, power) + self.modem_power = power + elif self.MODEM_RFFE_RAIL_NAME in railname: + self.log.info('%s: %f',railname, power) + self.rffe_power = power elif self.MODEM_MMWAVE_RAIL_NAME in railname: - self.log.info(railname + ': ' + power) - self.mmwave_power = float(power) - elif self.MONSOON_RAIL_NAME == railname: - self.log.info(railname + ': ' + power) - self.monsoon_power = float(power) + self.log.info('%s: %f',railname, power) + self.mmwave_power = power + elif self.MONSOON_RAIL_NAME in railname: + self.log.info('%s: %f',railname, power) + self.monsoon_power = power + elif self.WEARABLE_POWER_RAIL in railname or self.WEARABLE_SOC_MODEM_RAIL in railname: + self.log.info('%s: %f',railname, power) + self.modem_power += power if self.modem_power: self.power_results[self.test_name] = self.modem_power @@ -366,29 +538,28 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): 'ro.boot.hardware.revision' ) - # power measurement results - odpm_power_results = self.get_odpm_values() - odpm_power = odpm_power_results.get(self.ODPM_MODEM_CHANNEL_NAME, 0) - system_power = 0 - # if kibbles are using, get power from kibble modem_kibble_power_wo_pcie = 0 if hasattr(self, 'bitses'): - self.parse_power_rails_csv() modem_kibble_power_wo_pcie = self.modem_power - self.pcie_power - system_power = self.monsoon_power + self.system_power = self.monsoon_power else: - system_power = self.power_results.get(self.test_name, 0) + self.system_power = self.power_results.get(self.test_name, 0) + + # record reference target, if it exists + self.reference_target = '' + if self.threshold and self.test_name in self.threshold: + self.reference_target = self.threshold[self.test_name] self.record_data({ 'Test Name': self.test_name, 'sponge_properties': { - self.CUSTOM_PROP_KEY_SYSTEM_POWER: system_power, + self.CUSTOM_PROP_KEY_SYSTEM_POWER: self.system_power, self.CUSTOM_PROP_KEY_BUILD_ID: build_id, self.CUSTOM_PROP_KEY_INCR_BUILD_ID: incr_build_id, self.CUSTOM_PROP_KEY_MODEM_BASEBAND: modem_base_band, self.CUSTOM_PROP_KEY_BUILD_TYPE: build_type, - self.CUSTOM_PROP_KEY_MODEM_ODPM_POWER: odpm_power, + self.CUSTOM_PROP_KEY_MODEM_ODPM_POWER: self.odpm_power, self.CUSTOM_PROP_KEY_DEVICE_NAME: device_name, self.CUSTOM_PROP_KEY_DEVICE_BUILD_PHASE: device_build_phase, self.CUSTOM_PROP_KEY_MODEM_KIBBLE_POWER: self.modem_power, @@ -396,19 +567,112 @@ class PowerCellularPresetLabBaseTest(PWCEL.PowerCellularLabBaseTest): self.CUSTOM_PROP_KEY_MODEM_KIBBLE_WO_PCIE_POWER: modem_kibble_power_wo_pcie, self.CUSTOM_PROP_KEY_MODEM_KIBBLE_PCIE_POWER: self.pcie_power, self.CUSTOM_PROP_KEY_RFFE_POWER: self.rffe_power, - self.CUSTOM_PROP_KEY_MMWAVE_POWER: self.mmwave_power + self.CUSTOM_PROP_KEY_MMWAVE_POWER: self.mmwave_power, + self.CUSTOM_PROP_KEY_CURRENT_REFERENCE_TARGET: self.reference_target }, }) + def threshold_check(self): + """Check the test result and decide if it passed or failed. + + The threshold is provided in the config file. In this class, result is + current in mA. + """ + + if not self.threshold or self.test_name not in self.threshold: + self.log.error("No threshold is provided for the test '{}' in " + "the configuration file.".format(self.test_name)) + return + + if not hasattr(self, 'bitses'): + self.log.error("No bitses attribute found, threshold cannot be" + "checked against system power.") + return + + average_current = self.modem_power + if ('modem_rail' in self.threshold.keys() and self.threshold['modem_rail'] == self.MODEM_POWER_RAIL_WO_PCIE_NAME): + average_current = average_current - self.pcie_power + current_threshold = self.threshold[self.test_name] + + acceptable_upper_difference = max( + self.threshold[self.test_name] * self.pass_fail_tolerance, + self.kibble_error_range + ) + self.log.info('acceptable upper difference' + str(acceptable_upper_difference)) + + self.unpack_userparams(pass_fail_tolerance_lower_bound=self.THRESHOLD_TOLERANCE_LOWER_BOUND_DEFAULT) + acceptable_lower_difference = max( + self.threshold[self.test_name] * self.pass_fail_tolerance_lower_bound, + self.kibble_error_range) + self.log.info('acceptable lower diff ' + str(acceptable_lower_difference)) + + if average_current: + asserts.assert_true( + average_current < current_threshold + acceptable_upper_difference, + 'Measured average current in [{}]: {:.2f}mW, which is ' + 'out of the acceptable upper range {:.2f}+{:.2f}mW'.format( + self.test_name, average_current, current_threshold, + acceptable_upper_difference)) + + asserts.assert_true( + average_current > current_threshold - acceptable_lower_difference, + 'Measured average current in [{}]: {:.2f}mW, which is ' + 'out of the acceptable lower range {:.2f}-{:.2f}mW'.format( + self.test_name, average_current, current_threshold, + acceptable_lower_difference)) + + asserts.explicit_pass( + 'Measured average current in [{}]: {:.2f}mW, which is ' + 'within the acceptable range of {:.2f}-{:.2f} and {:.2f}+{:.2f}'.format( + self.test_name, average_current, current_threshold, + acceptable_lower_difference, current_threshold, acceptable_upper_difference)) + else: + asserts.fail( + 'Something happened, measurement is not complete, test failed') + + def _get_device_network(self) -> str: + """Get active network on device. + + Returns: + Information of active network in string. + """ + return self.dut.adb.shell( + self._ADB_GET_ACTIVE_NETWORK) + def teardown_test(self): + if self.collect_log_only: + try: + # stop log + modem_logs.stop_modem_logging(self.cellular_dut.ad) + # pull log + modem_logs.pull_logs(self.cellular_dut.ad, self.modem_log_path) + finally: + # clear log + modem_logs.clear_modem_logging(self.cellular_dut.ad) + else: + if self.is_mdstest_supported: + try: + self.collect_modem_log(self.modem_log_path, self.post_test_log_duration) + except RuntimeError: + self.log.warning('Fail to collect log before test end.') + self.log.info('===>Before test end info.<====') + cells_status = self.cellular_simulator.get_all_cell_status() + self.log.info('UXM cell status: %s', cells_status) + active_network = self._get_device_network() + self.log.info('Device network: %s', active_network) super().teardown_test() # restore device to ready state for next test - self.log.info('Enable mobile data.') - self.dut.adb.shell('svc data enable') + if not self.is_wifi_only_device: + self.log.info('Enable mobile data.') + self.cellular_dut.ad.adb.shell('svc data enable') self.cellular_simulator.detach() self.cellular_dut.toggle_airplane_mode(True) + if self.is_mdstest_supported: + self.at_util.disable_dsp() + self.log.info('Band lock info: \n%s', self.at_util.get_band_lock_info()) + self.log.info('Sim slot map: \n%s', self.at_util.get_sim_slot_mapping()) + self.log.info('DSP status: \n%s', self.at_util.get_dsp_status) + # processing result self.sponge_upload() - if 'LTE' in self.test_name: - self.at_util.clear_lock_band()
\ No newline at end of file diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py b/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py index cfdc67a1f..ecdb61869 100644 --- a/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py +++ b/acts_tests/acts_contrib/test_utils/power/cellular/ims_api_connector_utils.py @@ -1,228 +1,370 @@ -# TODO(hmtuan): add type annotation. -import requests +"""Abstraction for interacting with Keysight IMS emulator. + +Keysight provided an API Connector application which is a HTTP server +running on the same host as Keysight IMS server app and client app. +It allows IMS simulator/app to be controlled via HTTP request. +""" +import enum +import json +import logging import time +from typing import List, Optional, Any +import uuid +import requests + +from acts_contrib.test_utils.power.cellular.ssh_library import SshLibrary + +_LOG = logging.getLogger(__name__) + + +class ImsAppName(enum.Enum): + """IMS app name predefined by Keysight.""" + + CLIENT = 'client' + SERVER = 'server' + + +class ImsApiConnector: + """A wrapper class for Keysight Ims API Connector. + + This class provides high-level interface to control Keysigt IMS app. + Each instance of this class conresponding to one IMS app process. + + Attributes: + log: An logger object. + api_connector_ip: An IP of host where API Connector reside. + api_connector_port: A port number of API Connector server. + ims_app: An ImsAppName enum to specify which emulator/app to control. + _api_token: An arbitrary and unique string to identify the link between API + connector and ims app. + _ims_app_ip: An IP of IMS emulator/app, usually the value of localhost. + _ims_app_port: Listening port of IMS emulator/app. + ssh: An ssh connection object that can be used to check app status, close + and reopen apps. + """ + + _BASE_URL_FORMAT = 'http://{addr}:{port}/ims/api/{app}s/{api_token}' + + _SSH_USERNAME = 'User' + _APP_BOOT_TIME = 30 + + _IMS_CLIENT_IDLE_STATUS = 'Idle' + _IMS_CLIENT_APP = 'Keysight.ImsSip.Client.exe' + _IMS_CLIENT_APP_LOC = ( + r'C:\Program Files (x86)\Keysight\C8700201A\IMS-SIP Client\\' + ) + _IMS_SERVER_APP = 'Keysight.ImsSip.Server.exe' + _IMS_SERVER_APP_LOC = ( + r'C:\Program Files (x86)\Keysight\C8700201A\IMS-SIP Server\\' + ) + _IMS_API_APP = "Keysight.ImsSip.ApiConnector.exe" + _IMS_API_APP_LOC = ( + r"C:\Program Files (x86)\Keysight\C8700201A\IMS-SIP API Connector\\" + ) + _IMS_APP_DEFAULT_PORT_MAPPING = { + ImsAppName.CLIENT: 8250, + ImsAppName.SERVER: 8240, + } + + _IMS_APP_DEFAULT_IP = '127.0.0.1' + + def __init__( + self, + api_connector_ip: str, + api_connector_port: int, + ims_app: ImsAppName, + ): + self.log = _LOG + + # create ssh connection to host pc + self.ssh = SshLibrary(api_connector_ip, self._SSH_USERNAME) + + # api connector info + self.api_connector_ip = api_connector_ip + self.api_connector_port = api_connector_port + + # ims app info + self._api_token = str(uuid.uuid4())[0:7] # api token can't contain '-' + self.log.debug('API token: %s', self._api_token) + self.ims_app = ims_app.value + self._ims_app_ip = self._IMS_APP_DEFAULT_IP + self._ims_app_port = self._IMS_APP_DEFAULT_PORT_MAPPING[ims_app] + + # construct base url + self._base_url = self._BASE_URL_FORMAT.format( + addr=self.api_connector_ip, + port=self.api_connector_port, + app=self.ims_app, + api_token=self._api_token, + ) + + # start server and client if they are not started + self._start_apps_if_down() + # create IMS-Client API link + is_app_linked = self.create_ims_app_link() + + if not is_app_linked: + raise RuntimeError('Fail to create link to IMS app.') + + def log_response_info(self, r: requests.Response): + self.log.debug('HTTP request sent:') + self.log.debug('-> method: %s', str(r.request.method)) + self.log.debug('-> url: %s', str(r.url)) + self.log.debug('-> status_code: %s', str(r.status_code)) + + def create_ims_app_link(self) -> bool: + """Creates link between Keysight API Connector to ims app. + + Returns: + True if API connector server linked/connected with ims app, + False otherwise. + """ + self.log.info('Creating ims_%s link: ', self.ims_app) + self.log.info( + '%s:%s:%s', self._api_token, self._ims_app_ip, self._ims_app_port + ) + + request_data = { + 'targetIpAddress': self._ims_app_ip, + 'targetWcfPort': self._ims_app_port, + } + + r = requests.post(url=self._base_url, json=request_data) + self.log_response_info(r) + + return r.status_code == requests.codes.created + + def _remove_ims_app_link(self) -> bool: + """Removes link between Keysight API Connector to ims app. + + Returns: + True if successfully disconnected/unlinked, + False otherwise. + """ + self.log.info('Remove ims_%s link: %s', self.ims_app, self._api_token) + + r = requests.delete(url=self._base_url) + self.log_response_info(r) + + return r.status_code == requests.codes.ok + + def get_ims_app_property(self, property_name: str) -> Optional[str]: + """Gets property value of IMS app. + + Args: + property_name: Name of property to get value. + + Returns: + Value of property which is inquired. + """ + self.log.info('Getting ims app property: %s', property_name) + + request_url = self._base_url + '/get_property' + request_params = {'propertyName': property_name} + r = requests.get(url=request_url, params=request_params) + self.log_response_info(r) + + try: + res_json = r.json() + except json.JSONDecodeError: + res_json = {'propertyValue': None} + prop_value = res_json['propertyValue'] + + return prop_value + + def set_ims_app_property( + self, property_name: str, property_value: Optional[Any] + ) -> bool: + """Sets property value of IMS app. + + Args: + property_name: Name of property to set value. + property_value: Value to be set. -class ImsApiConnector(): - """A wrapper class for Keysight Ims API Connector. - - Keysight provided an API connector application - which is a HTTP server running on the same host - as Keysight IMS server simulator and client simulator. - It allows IMS simulator/app to be controlled via HTTP request. - - Attributes: - api_connector_ip: ip of http server. - api_connector_port: port of http server. - ims_app: type of ims app (client/server). - api_token: an arbitrary and unique token-string - to identify the link between API connector - and ims app. - log: logger object. + Returns: + True if success, False otherwise. """ + self.log.info( + 'Setting ims property: %s = %s', property_name, str(property_value) + ) + + request_url = self._base_url + '/set_property' + data = {'propertyName': property_name, 'propertyValue': property_value} + r = requests.post(url=request_url, json=data) + self.log_response_info(r) + + return r.status_code == requests.codes.ok + + def ims_api_call_method( + self, method_name: str, method_args: List = [] + ) -> Optional[str]: + """Call Keysight API to control simulator. + + API Connector allows us to call Keysight Simulators' API without using C#. + To invoke an API, we are sending post request to API Connector (http + server). + + Args: + method_name: A name of method from Keysight API in string. + method_args: A python-array contains arguments for the called API. + + Returns: + A string value parse from response. + + Raises: + HTTPError: Response status code is different than requests.codes.ok. + """ + self.log.info('Calling Keysight simulator API: %s', method_name) + + if not isinstance(method_args, list): + method_args = [method_args] + request_url = self._base_url + '/call_method' + request_data = {'methodName': method_name, 'arguments': method_args} + r = requests.post(url=request_url, json=request_data) + + ret_val = None + + if r.status_code == requests.codes.ok: + return_value_key = 'returnValue' + if ('Content-Type' in r.headers) and r.headers[ + 'Content-Type' + ] == 'application/json': + response_body = r.json() + if response_body: + ret_val = response_body.get(return_value_key, None) + else: + raise requests.HTTPError(r.status_code, r.text) + + self.log_response_info(r) + + return ret_val + + def _is_line_idle(self, call_line_number) -> bool: + is_line_idle_prop = self.get_ims_app_property( + f'IVoip.CallLineParams({call_line_number}).SessionState' + ) + return is_line_idle_prop == self._IMS_CLIENT_IDLE_STATUS + + def _is_ims_client_app_registered(self) -> bool: + is_registered_prop = self.get_ims_app_property( + 'IComponentControl.IsRegistered' + ) + self.log.info('Registered: %s', str(is_registered_prop)) + return is_registered_prop == 'True' + + def restart_server(self) -> bool: + """Restarts the ims server application. + + Returns: + A boolean representing if the application was successfully started + """ + self.create_ims_app_link() + self.log.info('Stopping and starting server') + self.ims_api_call_method('IServer.StopListeners()') + result = self.ims_api_call_method('IServer.Start()') + self.log.info(result) + return result == 'True' + + def reregister_client(self): + """Re-registers the ims client with the server. + + Attempts to unregister, then register the client. If this fails, Attempts + to restart both the client and server apps. + """ + self.ims_api_call_method('ISipConnection.Unregister()') + self.ims_api_call_method('ISipConnection.Register()') + + # failed to re-register client, so try restarting server and client + if not self._is_ims_client_app_registered(): + self._restart_client_server_app() + self.ims_api_call_method('ISipConnection.Register()') + + def _restart_client_server_app(self): + """Restarts the client and server app.""" + self.ssh.close_app(self._IMS_CLIENT_APP) + self.ssh.close_app(self._IMS_SERVER_APP) + self.ssh.start_app(self._IMS_CLIENT_APP, self._IMS_CLIENT_APP_LOC) + self.ssh.start_app(self._IMS_SERVER_APP, self._IMS_SERVER_APP_LOC) + time.sleep(self._APP_BOOT_TIME) + self.create_ims_app_link() + + def _start_apps_if_down(self): + """Starts the client, server, api connector app if they are down.""" + started = False + + if not self.ssh.check_app_running(self._IMS_API_APP): + self.log.info('api connector was not running, starting now') + self.ssh.start_app(self._IMS_API_APP, self._IMS_API_APP_LOC) + started = True + + if not self.ssh.check_app_running(self._IMS_CLIENT_APP): + self.log.info('client was not running, starting now') + self.ssh.start_app(self._IMS_CLIENT_APP, self._IMS_CLIENT_APP_LOC) + started = True + + if not self.ssh.check_app_running(self._IMS_SERVER_APP): + self.log.info('server was not running, starting now') + self.ssh.start_app(self._IMS_SERVER_APP, self._IMS_SERVER_APP_LOC) + started = True + + if started: + time.sleep(self._APP_BOOT_TIME) + + def initiate_call(self, callee_number: str, call_line_idx: int = 0): + """Dials to callee_number. + + Args: + callee_number: A string value of number to be dialed to. + call_line_idx: An inteter index for call line. + + Raises: + RuntimeError: If ims client is cannot be registered. + RuntimeError: If ims client is not idle when it attempts to dial. + RuntimeError: If ims client is still in idle after starting dial. + """ + sleep_time = 5 + + self.log.info('checking if server/client/api connector are registered and running') + self._start_apps_if_down() + + # Reregister client to server + self.reregister_client() + + # clear logs + self.ims_api_call_method('ILogs.ClearResults()') + + # switch to call-line #1 (idx = 0) + self.log.info('Switching to call-line #1.') + self.set_ims_app_property('IVoip.SelectedCallLine', call_line_idx) + + # check whether the call-line #1 is ready for dialling + is_line1_idle = self._is_line_idle(call_line_idx) + if not is_line1_idle: + raise RuntimeError('Call-line not is not in indle state.') + + # entering callee number for call-line #1 + self.log.info('Enter callee number: %s.', callee_number) + self.set_ims_app_property( + 'IVoip.CallLineParams(0).CallLocation', callee_number + ) + + # dial entered callee number + self.log.info('Dialing call.') + self.ims_api_call_method('IVoip.Dial()') + + time.sleep(sleep_time) + + # check if dial success (not idle) + if self._is_line_idle(call_line_idx): + raise RuntimeError('Fail to dial.') + + def hangup_call(self): + self.ims_api_call_method('IVoip.HangUp()') + # get logs + self.log.info("Call Logs: ") + call_logs = self.get_ims_app_property('ILogs.Results') + self.log.info(call_logs) - def __init__(self, api_connector_ip, - api_connector_port, ims_app, - api_token, ims_app_ip, - ims_app_port, log): - # api connector info - self.api_connector_ip = api_connector_ip - self.api_connector_port = api_connector_port - - # ims app info - self.ims_app = ims_app - self.api_token = api_token - self.ims_app_ip = ims_app_ip - self.ims_app_port = ims_app_port - - self.log = log - # construct base url - self.base_url = 'http://{addr}:{port}/ims/api/{app}s/{api_token}'.format( - addr = self.api_connector_ip, - port = self.api_connector_port, - app = self.ims_app, - api_token = self.api_token - ) - - def get_base_url(self): - return self.base_url - - def create_ims_app_link(self): - """Create link between Keysight API Connector to ims app.""" - self.log.info('Create ims app link: token:ip:port') - self.log.info('Creating ims_{app} link: {token}:{target_ip}:{target_port}'.format( - app = self.ims_app, - token = self.api_token, - target_ip = self.ims_app_ip, - target_port= self.ims_app_port) - ) - - request_data = { - "targetIpAddress": self.ims_app_ip, - "targetWcfPort": self.ims_app_port - } - self.log.debug(f'Payload to create ims app link: {request_data}') - r = requests.post(url = self.get_base_url(), json = request_data) - - self.log.info('HTTP request sent:') - self.log.info('-> method: ' + str(r.request.method)) - self.log.info('-> url: ' + str(r.url)) - self.log.info('-> status_code: ' + str(r.status_code)) - - return (r.status_code == 201) - - def remove_ims_app_link(self): - """Remove link between Keysight API Connector to ims app.""" - self.log.info('Remove ims_{app} link: {token}'.format( - app = self.ims_app, - token = self.api_token) - ) - - r = requests.delete(url = self.get_base_url()) - - self.log.info('-> method: ' + str(r.request.method)) - self.log.info('-> url: ' + str(r.url)) - self.log.info('-> status_code: ' + str(r.status_code)) - - return (r.status_code == 200) - - def get_ims_app_property(self, property_name): - """Get property value of IMS app. - - Attributes: - property_name: name of property to get value. - """ - self.log.info('Getting ims app property: ' + property_name) - - request_url = self.get_base_url() + '/get_property' - request_params = {"propertyName": property_name} - r = requests.get(url = request_url, params = request_params) - - self.log.info('-> method: ' + str(r.request.method)) - self.log.info('-> url: ' + str(r.url)) - self.log.info('-> status_code: ' + str(r.status_code)) - - try: - res_json = r.json() - except: - res_json = {'propertyValue': None } - prop_value = res_json['propertyValue'] - - return prop_value - - def set_ims_app_property(self, property_name, property_value): - """Set property value of IMS app. - - Attributes: - property_name: name of property to set value. - property_value: value to be set. - """ - self.log.info('Setting ims property: ' + property_name + ' = ' + str(property_value)) - - request_url = self.get_base_url() + '/set_property' - data = { - 'propertyName': property_name, - 'propertyValue': property_value - } - r = requests.post(url = request_url, json = data) - - self.log.info('-> method: ' + str(r.request.method)) - self.log.info('-> url: ' + str(r.url)) - self.log.info('-> status_code: ' + str(r.status_code)) - - return (r.status_code == 200) - - def ims_api_call_method(self, method_name, method_args=None): - """ - Attributes: - method_name: a name of method from Keysight API in string. - method_args: a python-array contains - arguments for the called API method. - Returns: - a tuple of (STATUS_BOOL, FUNC_RET_VAL), - if STATUS_BOOL is false, FUNC_RET_VAL is questionable/undefined, - if STATUS_BOOL is true, FUNC_RET_VAL will be the API function return value - or FUNC_RET_VAL is None if the called method return nothing. - """ - self.log.info('Calling ims method: ' + method_name) - - if (method_args == None): - method_args = [] - elif (type(method_args) != list): - method_args = [method_args] - data = { - 'methodName': method_name, - 'arguments': method_args - } - request_url = self.get_base_url() + '/call_method' - r = requests.post(url = request_url, json = data) - - ret_val = None - - if ( ('Content-Type' in r.headers.keys()) and r.headers['Content-Type'] == 'application/json'): - # TODO(hmtuan): try json.loads() instead - response_body = r.json() - if ((response_body != None) and ('returnValue' in response_body.keys())) : - ret_val = response_body['returnValue'] - - self.log.info('-> method: ' + str(r.request.method)) - self.log.info('-> url: ' + str(r.url)) - self.log.info('-> status_code: ' + str(r.status_code)) - self.log.info('-> ret_val: ' + str(ret_val)) - - return (r.status_code == 200), ret_val - - def _is_line_idle(self, call_line_number): - is_line_idle_prop = self.get_ims_app_property( - f'IVoip.CallLineParams({call_line_number}).SessionState') - return is_line_idle_prop == 'Idle' - - def _is_ims_client_app_registered(self): - is_registered_prop = self.get_ims_app_property('IComponentControl.IsRegistered') - return is_registered_prop == 'True' - - def initiate_call(self, callee_number, call_line_idx=0): - """Dial to callee_number. - - Attributes: - callee_number: number to be dialed to. - """ - # create IMS-Client API link - ret_val = self.create_ims_app_link() - - if not ret_val: - raise RuntimeError('Fail to create link to IMS app.') - - # check if IMS-Client is registered, and if not, request client to perform Registration - self.log.info('Ensuring client registered.') - is_registered = self._is_ims_client_app_registered() - if not is_registered: - self.log.info('Client not currently registered - registering.') - self.ims_api_call_method('ISipConnection.Register()') - - is_registered = self._is_ims_client_app_registered() - if not is_registered: - raise RuntimeError('Failed to register IMS-client to IMS-server.') - - # switch to call-line #1 (idx = 0) - self.log.info('Switching to call-line #1.') - self.set_ims_app_property('IVoip.SelectedCallLine', call_line_idx) - - # check whether the call-line #1 is ready for dialling - is_line1_idle = self._is_line_idle(call_line_idx) - if not is_line1_idle: - raise RuntimeError('Call-line not is not in indle state.') - - # entering callee number for call-line #1 - self.log.info(f'Enter callee number: {callee_number}.') - self.set_ims_app_property('IVoip.CallLineParams(0).CallLocation', callee_number) - - # dial entered callee number - self.log.info('Dialling call.') - self.ims_api_call_method('IVoip.Dial()') - - time.sleep(5) - - # check if dial success (not idle) - is_line1_idle = self._is_line_idle(call_line_idx) - if is_line1_idle: - raise RuntimeError('Fail to dial.') + def tear_down(self): + self._remove_ims_app_link() + self.ssh.close_ssh_connection() diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py b/acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py new file mode 100644 index 000000000..eb21bd09c --- /dev/null +++ b/acts_tests/acts_contrib/test_utils/power/cellular/modem_logs.py @@ -0,0 +1,178 @@ +"""Functions to interact with modem log. + +Different modem logging profile can be found here: +cs/vendor/google/apps/PixelLogger/log_profile/GFT_Call_Performance.xml +""" +import enum +import logging +import time + +from mobly.controllers import android_device # type: ignore + +_LOG = logging.getLogger(__name__) + + +class ModemLogAction(enum.Enum): + """All possible valid PILOT logging actions.""" + + START = 'ACTION_START_LOGGING' + STOP = 'ACTION_STOP_LOGGING' + CLEAR = 'ACTION_CLEAR_LOG' + + +class ModemLogProfile(enum.Enum): + """All possible modem logging profiles.""" + + LASSEN_AUDIO_TCP_DSP = 'Call_Performance.xml' + LASSEN_TCP_DSP = 'Data_Performance.xml' + + +_MODEM_PILOT_ENABLE_PROP_NAME = 'vendor.pixellogger.pilot.logging_enable' + +_MODEM_LOG_PATH = '/sdcard/Android/data/com.android.pixellogger/files/logs' + +_ADB_SET_LOG_PROFILE_TEMPLATE = ( + 'am broadcast ' + '-a com.android.pixellogger.experiment.ACTION_LOAD_PROFILE ' + '-n com.android.pixellogger/.receiver.ExperimentLoggingReceiver ' + '--es name "{log_profile_name}"' +) + +_ADB_LOG_ACTION = ( + 'am broadcast ' + '-a com.android.pixellogger.experiment.{log_action} ' + '-n com.android.pixellogger/.receiver.ExperimentLoggingReceiver' +) + +_MODEM_LOGGING_PROFILE_PROP_NAME = ( + 'persist.vendor.pixellogger.pilot.profile_name' +) + + +def start_modem_logging( + dut: android_device.AndroidDevice, + timeout: int = 20, + polling_interval: int = 1, +) -> bool: + """Starts modem PILOT logging. + + Args: + dut: A mobly AndroidDevice controller object. + timeout: Seconds to try to confirm logging before giving up. + polling_interval: Seconds to wait between confirmation attempts. + + Raises: + RuntimeError: If unable to enable PILOT modem logging within timeout. + """ + dut.adb.root() + cmd = _ADB_LOG_ACTION.format(log_action=ModemLogAction.START.value) + dut.adb.shell(cmd) + end_time = time.time() + timeout + while time.time() < end_time: + time.sleep(polling_interval) + res = dut.adb.getprop(_MODEM_PILOT_ENABLE_PROP_NAME).strip() + _LOG.debug('PILOT modem logging enable: %s', res) + if res == 'true': + return + raise RuntimeError('Fail to start modem logging in PILOT mode.') + + +def stop_modem_logging( + dut: android_device.AndroidDevice, + timeout: int = 20, + polling_interval: int = 1, +) -> bool: + """Stops modem PILOT logging. + + Args: + dut: A mobly AndroidDevice controller object. + timeout: An integer of time in second to wait for modem log to stop. + polling_interval: Interval in second to check if modem logging stopped. + + Raises: + RuntimeError: If unable to disable PILOT modem logging within timeout. + """ + dut.adb.root() + cmd = _ADB_LOG_ACTION.format(log_action=ModemLogAction.STOP.value) + dut.adb.shell(cmd) + end_time = time.time() + timeout + while time.time() < end_time: + time.sleep(polling_interval) + res = dut.adb.getprop(_MODEM_PILOT_ENABLE_PROP_NAME).strip() + _LOG.debug('PILOT modem logging enable: %s', res) + if res == 'false' or not res: + return + raise RuntimeError('Fail to stop modem logging in PILOT mode.') + + +def clear_modem_logging(dut: android_device.AndroidDevice) -> None: + """Stops modem PILOT logging. + + Args: + dut: A mobly AndroidDevice controller object. + """ + dut.adb.root() + cmd = _ADB_LOG_ACTION.format(log_action=ModemLogAction.CLEAR.value) + dut.adb.shell(cmd) + _LOG.debug('Cleared modem logs.') + + +def set_modem_log_profle( + dut: android_device.AndroidDevice, + profile: ModemLogProfile, + timeout: int = 10, + polling_interval: int = 1, +) -> bool: + """Set modem log profile. + + Args: + dut: An mobly AndroidDevice controller object. + profile: An ModemLogProfile enum represent modem logging profile. + timeout: Time waiting for modem log profile to be set. + polling_interval: Interval in second to check if log profile change. + + Returns: + True if successfully set modem log profile within timeout. Fail otherwise. + """ + dut.adb.root() + cmd = _ADB_SET_LOG_PROFILE_TEMPLATE.format(log_profile_name=profile.value) + dut.adb.shell(cmd) + end_time = time.time() + timeout + while time.time() < end_time: + time.sleep(polling_interval) + if profile.value in get_modem_log_profile(dut): + return True + return False + + +def get_modem_log_profile(dut: android_device.AndroidDevice) -> str: + """Get modem log profile. + + Args: + dut: An mobly AndroidDevice controller object. + + Returns: + String value of modem logging profile name. + + Raises: + RuntimeError: If get empty response from adb shell. + """ + dut.adb.root() + res = dut.adb.getprop(_MODEM_LOGGING_PROFILE_PROP_NAME) + if not res: + raise RuntimeError('Fail to get modem logging profile from device.') + return res + + +def pull_logs(dut: android_device.AndroidDevice, out_path: str, pull_timeout = 300) -> None: + """Pulls logs on device. + + Args: + dut: An mobly AndroidDevice controller object. + out_path: A path to extract logs to. + pull_timeout: Seconds to wait for pulling complete. + """ + dut.adb.root() + dut.adb.pull( + "%s %s" % (_MODEM_LOG_PATH, out_path), timeout=pull_timeout) + _LOG.debug('Modem logs exported to %s', out_path)
\ No newline at end of file diff --git a/acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py b/acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py new file mode 100644 index 000000000..8bc932cfb --- /dev/null +++ b/acts_tests/acts_contrib/test_utils/power/cellular/ssh_library.py @@ -0,0 +1,131 @@ +"""Ssh module for checking status, starting and closing apps.""" +import logging +import re +import paramiko # type: ignore + +_LOG = logging.getLogger(__name__) + + +class SshLibrary: + """Library for creating a ssh connection, closing and opening apps.""" + + _PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}' + + _SSH_START_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"' + _SSH_CHECK_APP_RUNNING_CMD_FORMAT = ( + 'tasklist /fi "ImageName eq {regex_app_name}"' + ) + _SSH_KILL_PROCESS_BY_NAME = 'taskkill /IM {process_name} /F' + + def __init__(self, hostname: str, username: str): + self.log = _LOG + self.ssh = self.create_ssh_socket(hostname, username) + + def create_ssh_socket( + self, hostname: str, username: str + ) -> paramiko.SSHClient: + """Creates ssh session to host. + + Args: + hostname: IP address of the host machine. + username: Username of the host ims account. + + Returns: + An SSHClient object connected the hostname. + """ + + self.log.info('Creating ssh session to hostname:%s ', hostname) + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.load_system_host_keys() + ssh.connect(hostname=hostname, username=username) + self.log.info('SSH client to hostname:%s is connected', hostname) + return ssh + + def run_command_paramiko(self, command: str) -> tuple[str, str, str]: + """Runs a command using Paramiko and return stdout code. + + Args: + command: Command to run on the connected host. + + Returns: + A tuple containing the command result output, error information, and exit + status. + """ + + self.log.info('Running command: command:%s', command) + stdin, stdout, stderr = self.ssh.exec_command(command, timeout=10) + stdin.close() + err = ''.join(stderr.readlines()) + out = ''.join(stdout.readlines()) + + # psexec return process ID as part of the exit code + exit_status = stderr.channel.recv_exit_status() + if err: + self.log.error(str(err)) + else: + self.log.info(str(out)) + return out, err, str(exit_status) + + def close_ssh_connection(self): + """Closes ssh connection.""" + + self.log.info('Closing ssh connection') + self.ssh.close() + + def close_app(self, app: str) -> str: + """Closes any app whose name passed as an argument. + + Args: + app: Application name. + + Returns: + Resulting output of closing the application. + """ + + command = self._SSH_KILL_PROCESS_BY_NAME.format(process_name=app) + result, _, _ = self.run_command_paramiko(command) + return result + + def start_app(self, app: str, location: str) -> str: + """Starts any app whose name passed as an argument. + + Args: + app: Application name. + location: Directory location of the application. + + Returns: + Resulting output of starting the application. + + Raises: + RuntimeError: + Application failed to start. + """ + + command = self._SSH_START_APP_CMD_FORMAT.format(exe_path=location + app) + results, err, exit_status = self.run_command_paramiko(command) + + id_in_err = re.search( + self._PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status), + err[-1], + ) + if id_in_err: + raise RuntimeError('Fail to start app: ' + results + err) + + return results + + def check_app_running(self, app: str) -> bool: + """Checks if the given app is running. + + Args: + app: Application name. + + Returns: + A boolean representing if the application is running or not. + """ + is_running_cmd1 = self._SSH_CHECK_APP_RUNNING_CMD_FORMAT.format( + regex_app_name=app + ) + + result, _, _ = self.run_command_paramiko(is_running_cmd1) + return 'PID' in result diff --git a/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py b/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py index b9bbf093d..d2133ad2d 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py +++ b/acts_tests/acts_contrib/test_utils/wifi/WifiBaseTest.py @@ -100,6 +100,9 @@ class WifiBaseTest(BaseTestClass): self.country_code = WifiEnums.CountryCode.US wutils.set_wifi_country_code(ad, self.country_code) + if hasattr(self, "flagged_features"): + self._configure_flagged_features(ad, self.flagged_features) + def setup_test(self): if (hasattr(self, "android_devices")): wutils.start_all_wlan_logs(self.android_devices) @@ -1018,3 +1021,18 @@ class WifiBaseTest(BaseTestClass): asserts.fail(self.result_detail) return _safe_wrap_test_case + + def _configure_flagged_features(self, ad, flagged_features): + for module, features in flagged_features.items(): + for feature in features: + value = flagged_features[module][feature].lower() + if value not in ("true", "false"): + raise ValueError("Invalid flag value for %s %s." % (module, feature)) + self.log.info("Setting feature flag %s %s to %s." % (module, feature, value)) + adb_put_command = "device_config put %s %s %s" % (module, feature, value) + adb_get_command = "device_config get %s %s" % (module, feature) + ad.adb.shell(adb_put_command) + value_from_get = ad.adb.shell(adb_get_command) + + if type(value_from_get) != str or value_from_get.lower() != value: + raise RuntimeError("Failed to set flag value to %s (now is %s) for %s %s." % (value, value_from_get, module, feature))
\ No newline at end of file diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py index 9da35290d..9817e4bf1 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/__init__.py @@ -276,7 +276,7 @@ def get_iperf_arg_string(duration, num_processes=1, udp_throughput='1000M', ipv6=False, - udp_length=1470): + udp_length=1448): """Function to format iperf client arguments. This function takes in iperf client parameters and returns a properly diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py index d91803b30..60b74d549 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/bokeh_figure.py @@ -72,6 +72,7 @@ class BokehFigure(): title_size='15pt', axis_label_size='12pt', legend_label_size='12pt', + legend_location = 'top_right', axis_tick_label_size='12pt', x_axis_type='auto', sizing_mode='scale_both', @@ -91,6 +92,7 @@ class BokehFigure(): 'title_size': title_size, 'axis_label_size': axis_label_size, 'legend_label_size': legend_label_size, + 'legend_location': legend_location, 'axis_tick_label_size': axis_tick_label_size, 'x_axis_type': x_axis_type, 'sizing_mode': sizing_mode @@ -305,7 +307,7 @@ class BokehFigure(): axis_label_text_font_size=self. fig_property['axis_label_size']), 'right') # plot formatting - self.plot.legend.location = 'top_right' + self.plot.legend.location = self.fig_property['legend_location'] self.plot.legend.click_policy = 'hide' self.plot.title.text_font_size = self.fig_property['title_size'] self.plot.legend.label_text_font_size = self.fig_property[ diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py index fe2d3e7e3..d944db5ac 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_performance_test_utils/brcm_utils.py @@ -114,6 +114,44 @@ RATE_TABLE = { ] }, }, + 'EHT': { + 1: { + 20: [ + 8.6, 17.2, 25.8, 34.4, 51.6, 68.8, 77.4, 86.0, 103.2, 114.7, + 129.0, 143.4, 154.9, 172.1, 0, 0 + ], + 40: [ + 17.2, 34.4, 51.6, 68.8, 103.2, 137.6, 154.9, 172.1, 206.5, 229.4, + 258.1, 286.8, 309.7, 344.1, 0, 0 + ], + 80: [ + 36.0, 72.1, 108.1, 144.1, 216.2, 288.2, 324.3, 360.3, 432.4, + 480.4, 540.4, 600.4, 648.5, 720.6, 0, 0 + ], + 160: [ + 72.1, 144.1, 216.2, 288.2, 432.4, 576.5, 648.5, 720.6, 864.7, + 960.7, 1080.9, 1201, 1297.1, 1441.2, 0, 0 + ] + }, + 2: { + 20: [ + 17.2, 34.4, 51.6, 68.8, 103.2, 137.6, 154.8, 172, 206.4, 229.4, + 258, 286.8, 309.8, 344.2, 0, 0 + ], + 40: [ + 34.4, 68.8, 103.2, 137.6, 206.4, 275.2, 309.6, 344, 412.8, + 458.8, 516, 573.6, 619.4, 688.2, 0, 0 + ], + 80: [ + 72, 144.2, 216.2, 288.2, 432.4, 576.4, 648.6, 720.6, 864.8, + 960.8, 1080.8, 1200.8, 1297, 1441.2, 0, 0 + ], + 160: [ + 144, 288.4, 432.4, 576.4, 864.8, 1152.8, 1297.2, 1441.2, + 1729.6, 1921.6, 2161.6, 2401.6, 2594.2, 2882.4, 0, 0 + ] + }, + }, } @@ -343,7 +381,14 @@ def push_firmware(dut, firmware_files): def disable_beamforming(dut): - dut.adb.shell('wl txbf 0') + dut.adb.shell('wl down') + time.sleep(VERY_SHORT_SLEEP) + try: + dut.adb.shell('wl txbf 0') + dut.adb.shell('wl txbf_bfe_cap 0') + except: + logging.warning('Could not disable beamforming.') + dut.adb.shell('wl up') def set_nss_capability(dut, nss): @@ -366,16 +411,27 @@ def set_chain_mask(dut, chain): return # Set chain mask if needed dut.adb.shell('wl down') - time.sleep(VERY_SHORT_SLEEP) + time.sleep(SHORT_SLEEP) dut.adb.shell('wl txchain 0x{}'.format(chain)) dut.adb.shell('wl rxchain 0x{}'.format(chain)) dut.adb.shell('wl up') + try: + curr_tx_chain = int(dut.adb.shell('wl txchain')) + curr_rx_chain = int(dut.adb.shell('wl rxchain')) + except: + curr_tx_chain = -1 + curr_rx_chain = -1 + if curr_tx_chain != chain or curr_rx_chain != chain: + logging.error('Set chain mask failed.') + class LinkLayerStats(): LLSTATS_CMD = 'wl dump ampdu; wl counters;' LL_STATS_CLEAR_CMD = 'wl dump_clear ampdu; wl reset_cnts;' + BRCM_PHY_LOG_CLEAR_CMD = 'wl dump phycal; wl dump_clear txbf;' + BRCM_PHY_LOG_CMD = 'wl phy_rssi_ant; wl phy_snr_ant; wl nrate; wl dump phycal; wl tvpm; wl dump txbf;' BW_REGEX = re.compile(r'Chanspec:.+ (?P<bandwidth>[0-9]+)MHz') MCS_REGEX = re.compile(r'(?P<count>[0-9]+)\((?P<percent>[0-9]+)%\)') RX_REGEX = re.compile( @@ -413,6 +469,7 @@ class LinkLayerStats(): try: llstats_output = self.dut.adb.shell(self.LLSTATS_CMD, timeout=1) + self.dut.adb.shell_nb(self.LL_STATS_CLEAR_CMD) wl_join = self.dut.adb.shell("wl status") @@ -420,10 +477,20 @@ class LinkLayerStats(): self.bandwidth = int( re.search(self.BW_REGEX, wl_join).group('bandwidth')) except: + logging.debug('Failed to get counters and ampdu dumps.') llstats_output = '' + try: + phy_log_output = self.dut.adb.shell(self.BRCM_PHY_LOG_CMD, + ignore_status=True, + timeout=1) + self.dut.adb.shell_nb(self.BRCM_PHY_LOG_CLEAR_CMD) + except: + logging.debug('Failed to get phy log.') + phy_log_output = '' else: llstats_output = '' - self._update_stats(llstats_output) + phy_log_output = '' + self._update_stats(llstats_output, phy_log_output) def reset_stats(self): self.llstats_cumulative = self._empty_llstats() @@ -574,10 +641,11 @@ class LinkLayerStats(): llstats_summary['rx_per'] = 0 return llstats_summary - def _update_stats(self, llstats_output): + def _update_stats(self, llstats_output, phy_log_output): self.llstats_cumulative = self._empty_llstats() self.llstats_incremental = self._empty_llstats() self.llstats_incremental['raw_output'] = llstats_output + self.llstats_incremental['phy_log_output'] = phy_log_output self.llstats_incremental['mcs_stats'] = self._parse_mcs_stats( llstats_output) self.llstats_incremental['mpdu_stats'] = self._parse_mpdu_stats( diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py index da47cb8d5..1f63fdd86 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/__init__.py @@ -84,6 +84,10 @@ def create(configs): 'name': 'NetgearRAXE500AP', 'package': 'netgear_raxe500' }, + ('Netgear', 'RS700'): { + 'name': 'NetgearRS700AP', + 'package': 'netgear_rs700' + }, ('Brcm', 'Reference'): { 'name': 'BrcmRefAP', 'package': 'brcm_ref' diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py index d1420df33..5c892f137 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax120.py @@ -187,7 +187,7 @@ class NetgearRAX120AP(NetgearR7500AP): setting_to_update = {network: {}} if channel: if channel not in self.capabilities['channels'][network]: - self.log.error('Ch{} is not supported on {} interface.'.format( + raise RuntimeError('Ch{} is not supported on {} interface.'.format( channel, network)) setting_to_update[network]['channel'] = channel @@ -198,7 +198,7 @@ class NetgearRAX120AP(NetgearR7500AP): bandwidth = bandwidth.replace('bw', self.capabilities['default_mode']) if bandwidth not in self.capabilities['modes'][network]: - self.log.error('{} mode is not supported on {} interface.'.format( + raise RuntimeError('{} mode is not supported on {} interface.'.format( bandwidth, network)) setting_to_update[network]['bandwidth'] = str(bandwidth) setting_to_update['enable_ax'] = int('HE' in bandwidth) diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py index 9363bbef2..9928f33d0 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rax200.py @@ -195,7 +195,7 @@ class NetgearRAX200AP(WifiRetailAP): setting_to_update = {network: {}} if channel: if channel not in self.capabilities['channels'][network]: - self.log.error('Ch{} is not supported on {} interface.'.format( + raise RuntimeError('Ch{} is not supported on {} interface.'.format( channel, network)) setting_to_update[network]['channel'] = channel @@ -206,7 +206,7 @@ class NetgearRAX200AP(WifiRetailAP): bandwidth = bandwidth.replace('bw', self.capabilities['default_mode']) if bandwidth not in self.capabilities['modes'][network]: - self.log.error('{} mode is not supported on {} interface.'.format( + raise RuntimeError('{} mode is not supported on {} interface.'.format( bandwidth, network)) setting_to_update[network]['bandwidth'] = str(bandwidth) setting_to_update['enable_ax'] = int('HE' in bandwidth) diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py index c885e05a8..5c52b5941 100644 --- a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_raxe500.py @@ -197,7 +197,7 @@ class NetgearRAXE500AP(WifiRetailAP): setting_to_update = {network: {}} if channel: if channel not in self.capabilities['channels'][network]: - self.log.error('Ch{} is not supported on {} interface.'.format( + raise RuntimeError('Ch{} is not supported on {} interface.'.format( channel, network)) if isinstance(channel, str) and '6g' in channel: channel = int(channel[2:]) @@ -210,7 +210,7 @@ class NetgearRAXE500AP(WifiRetailAP): bandwidth = bandwidth.replace('bw', self.capabilities['default_mode']) if bandwidth not in self.capabilities['modes'][network]: - self.log.error('{} mode is not supported on {} interface.'.format( + raise RuntimeError('{} mode is not supported on {} interface.'.format( bandwidth, network)) setting_to_update[network]['bandwidth'] = str(bandwidth) setting_to_update['enable_ax'] = int('HE' in bandwidth) diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py new file mode 100644 index 000000000..8976466c6 --- /dev/null +++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_retail_ap/netgear_rs700.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python3 +# +# Copyright 2020 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import numpy +import re +import time +from acts_contrib.test_utils.wifi.wifi_retail_ap import WifiRetailAP +from acts_contrib.test_utils.wifi.wifi_retail_ap import BlockingBrowser + +BROWSER_WAIT_SHORT = 1 +BROWSER_WAIT_MED = 3 +BROWSER_WAIT_LONG = 30 +BROWSER_WAIT_EXTRA_LONG = 60 + + +class NetgearRS700AP(WifiRetailAP): + """Class that implements Netgear RS700 AP. + + Since most of the class' implementation is shared with the R7000, this + class inherits from NetgearR7000AP and simply redefines config parameters + """ + + def __init__(self, ap_settings): + super().__init__(ap_settings) + self.init_gui_data() + # Read and update AP settings + self.read_ap_firmware() + self.read_ap_settings() + self.update_ap_settings(ap_settings) + + def init_gui_data(self): + self.config_page = ( + '{protocol}://{username}:{password}@' + '{ip_address}:{port}/WLG_wireless_tri_band.htm').format( + protocol=self.ap_settings['protocol'], + username=self.ap_settings['admin_username'], + password=self.ap_settings['admin_password'], + ip_address=self.ap_settings['ip_address'], + port=self.ap_settings['port']) + self.config_page_nologin = ( + '{protocol}://{ip_address}:{port}/' + 'WLG_wireless_tri_band.htm').format( + protocol=self.ap_settings['protocol'], + ip_address=self.ap_settings['ip_address'], + port=self.ap_settings['port']) + self.config_page_advanced = ( + '{protocol}://{username}:{password}@' + '{ip_address}:{port}/WLG_adv_tri_band2.htm').format( + protocol=self.ap_settings['protocol'], + username=self.ap_settings['admin_username'], + password=self.ap_settings['admin_password'], + ip_address=self.ap_settings['ip_address'], + port=self.ap_settings['port']) + self.firmware_page = ( + '{protocol}://{username}:{password}@' + '{ip_address}:{port}/ADVANCED_home2_tri_band.htm').format( + protocol=self.ap_settings['protocol'], + username=self.ap_settings['admin_username'], + password=self.ap_settings['admin_password'], + ip_address=self.ap_settings['ip_address'], + port=self.ap_settings['port']) + self.capabilities = { + 'interfaces': ['2G', '5G_1', '6G'], + 'channels': { + '2G': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + '5G_1': [ + 36, 40, 44, 48, 52, 56, 60, 64, 100, 104, 108, 112, 116, + 120, 124, 128, 132, 136, 140, 144, 149, 153, 157, 161, 165 + ], + '6G': ['6g' + str(ch) for ch in numpy.arange(37, 222, 16)] + }, + 'modes': { + '2G': ['EHT20', 'EHT40'], + '5G_1': ['EHT40', 'EHT80', 'EHT160'], + '6G': ['EHT40', 'EHT80', 'EHT160', 'EHT320'] + }, + 'default_mode': 'EHT' + } + for interface in self.capabilities['interfaces']: + self.ap_settings[interface] = {} + + self.region_map = { + '3': 'Australia', + '4': 'Canada', + '5': 'Europe', + '7': 'Japan', + '8': 'Korea', + '11': 'North America', + '16': 'China', + '17': 'India', + '21': 'Middle East(Saudi Arabia/United Arab Emirates)', + '23': 'Singapore', + '25': 'Hong Kong', + '26': 'Vietnam' + } + + self.bw_mode_text = { + '2G': { + 'EHT20': 'Up to 0.7 Gbps', + 'EHT40': 'Up to 1.4 Gbps', + }, + '5G_1': { + 'EHT40': 'Up to 1.4 Gbps', + 'EHT80': 'Up to 2.9 Gbps', + 'EHT160': 'Up to 5.8 Gbps' + }, + '6G': { + 'EHT40': 'Up to 1.4 Gbps', + 'EHT80': 'Up to 2.9 Gbps', + 'EHT160': 'Up to 5.8 Gbps', + 'EHT320': 'Up to 11.5 Gbps', + } + } + self.bw_mode_values = { + 'HT20': 'EHT20', + 'HT40': 'EHT40', + 'HT80': 'EHT80', + 'HT160': 'EHT160', + 'HT320': 'EHT320' + } + + # Config ordering intentional to avoid GUI bugs + self.config_page_fields = collections.OrderedDict([ + ('region', 'WRegion'), (('2G', 'ssid'), 'ssid'), + (('5G_1', 'ssid'), 'ssid_an'), (('6G', 'ssid'), 'ssid_an_2'), + (('2G', 'channel'), 'w_channel'), + (('5G_1', 'channel'), 'w_channel_an'), + (('6G', 'channel'), 'w_channel_an_2'), + (('2G', 'bandwidth'), 'opmode'), + (('5G_1', 'bandwidth'), 'opmode_an'), + (('6G', 'bandwidth'), 'opmode_an_2'), + (('6G', 'security_type'), 'security_type_an_2'), + (('5G_1', 'security_type'), 'security_type_an'), + (('2G', 'security_type'), 'security_type'), + (('2G', 'password'), 'passphrase'), + (('5G_1', 'password'), 'passphrase_an'), + (('6G', 'password'), 'passphrase_an_2') + ]) + + def _set_channel_and_bandwidth(self, + network, + channel=None, + bandwidth=None): + """Helper function that sets network bandwidth and channel. + + Args: + network: string containing network identifier (2G, 5G_1, 5G_2) + channel: desired channel + bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80. + """ + + setting_to_update = {network: {}} + if channel: + if channel not in self.capabilities['channels'][network]: + raise RuntimeError('Ch{} is not supported on {} interface.'.format( + channel, network)) + if isinstance(channel, str) and '6g' in channel: + channel = int(channel[2:]) + setting_to_update[network]['channel'] = channel + + if bandwidth is None: + return setting_to_update + + if 'bw' in bandwidth: + bandwidth = bandwidth.replace('bw', + self.capabilities['default_mode']) + if bandwidth not in self.capabilities['modes'][network]: + raise RuntimeError('{} mode is not supported on {} interface.'.format( + bandwidth, network)) + setting_to_update[network]['bandwidth'] = str(bandwidth) + return setting_to_update + + def set_bandwidth(self, network, bandwidth): + """Function that sets network bandwidth/mode. + + Args: + network: string containing network identifier (2G, 5G_1, 5G_2) + bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80. + """ + + setting_to_update = self._set_channel_and_bandwidth( + network, bandwidth=bandwidth) + self.update_ap_settings(setting_to_update) + + def set_channel(self, network, channel): + """Function that sets network channel. + + Args: + network: string containing network identifier (2G, 5G_1, 5G_2) + channel: string or int containing channel + """ + setting_to_update = self._set_channel_and_bandwidth(network, + channel=channel) + self.update_ap_settings(setting_to_update) + + def set_channel_and_bandwidth(self, network, channel, bandwidth): + """Function that sets network bandwidth/mode. + + Args: + network: string containing network identifier (2G, 5G_1, 5G_2) + channel: desired channel + bandwidth: string containing mode, e.g. 11g, VHT20, VHT40, VHT80. + """ + setting_to_update = self._set_channel_and_bandwidth( + network, channel=channel, bandwidth=bandwidth) + self.update_ap_settings(setting_to_update) + + def read_ap_firmware(self): + """Function to read ap settings.""" + with BlockingBrowser(self.ap_settings['headless_browser'], + 900) as browser: + + # Visit URL + browser.visit_persistent(self.firmware_page, BROWSER_WAIT_MED, 10) + firmware_regex = re.compile( + r'Firmware Version[\s\S]+V(?P<version>[0-9._]+)') + #firmware_version = re.search(firmware_regex, browser.html) + firmware_version = re.search(firmware_regex, + browser.driver.page_source) + if firmware_version: + self.ap_settings['firmware_version'] = firmware_version.group( + 'version') + else: + self.ap_settings['firmware_version'] = -1 + + def read_ap_settings(self): + """Function to read ap settings.""" + with BlockingBrowser(self.ap_settings['headless_browser'], + 900) as browser: + # Visit URL + browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) + + for field_key, field_name in self.config_page_fields.items(): + field_value = browser.get_element_value(field_name) + if 'bandwidth' in field_key: + self.ap_settings[field_key[0]][ + field_key[1]] = self.bw_mode_values[field_value] + elif 'region' in field_key: + self.ap_settings['region'] = self.region_map[field_value] + elif 'security_type' in field_key: + self.ap_settings[field_key[0]][field_key[1]] = field_value + elif 'channel' in field_key: + self.ap_settings[field_key[0]][field_key[1]] = int( + field_value) + else: + self.ap_settings[field_key[0]][field_key[1]] = field_value + return self.ap_settings.copy() + + def configure_ap(self, **config_flags): + """Function to configure ap wireless settings.""" + # Configure radios + with BlockingBrowser(self.ap_settings['headless_browser'], + 900) as browser: + # Visit URL + browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) + browser.visit_persistent(self.config_page_nologin, + BROWSER_WAIT_MED, 10, self.config_page) + + # Update region, and power/bandwidth for each network + if browser.is_element_enabled(self.config_page_fields['region']): + browser.set_element_value(self.config_page_fields['region'], + self.ap_settings['region'], + select_method='text') + else: + self.log.warning('Cannot change region.') + for field_key, field_name in self.config_page_fields.items(): + if 'bandwidth' in field_key: + try: + browser.set_element_value( + field_name, + self.bw_mode_text[field_key[0]][self.ap_settings[ + field_key[0]][field_key[1]]], + select_method='text') + except AttributeError: + self.log.warning( + 'Cannot select bandwidth. Keeping AP default.') + + # Update security settings (passwords updated only if applicable) + for field_key, field_name in self.config_page_fields.items(): + if 'security_type' in field_key: + browser.set_element_value( + field_name, + self.ap_settings[field_key[0]][field_key[1]]) + if 'WPA' in self.ap_settings[field_key[0]][field_key[1]]: + browser.set_element_value( + self.config_page_fields[(field_key[0], + 'password')], + self.ap_settings[field_key[0]]['password']) + + for field_key, field_name in self.config_page_fields.items(): + if 'ssid' in field_key: + browser.set_element_value( + field_name, + self.ap_settings[field_key[0]][field_key[1]]) + elif 'channel' in field_key: + try: + browser.set_element_value( + field_name, + self.ap_settings[field_key[0]][field_key[1]]) + except AttributeError: + self.log.warning( + 'Cannot select channel. Keeping AP default.') + browser.accept_alert_if_present(BROWSER_WAIT_SHORT) + + time.sleep(BROWSER_WAIT_SHORT) + browser.click_button('Apply') + browser.accept_alert_if_present(BROWSER_WAIT_SHORT) + time.sleep(BROWSER_WAIT_SHORT) + browser.visit_persistent(self.config_page, BROWSER_WAIT_EXTRA_LONG, + 10) diff --git a/acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py b/acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py new file mode 100644 index 000000000..96b2aa0e0 --- /dev/null +++ b/acts_tests/tests/google/cellular/performance/CellularFr1RvRTest.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2022 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import csv +import itertools +import numpy +import json +import os +from acts import context +from acts import base_test +from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger +from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils +from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure +from CellularLtePlusFr1PeakThroughputTest import CellularFr1SingleCellPeakThroughputTest + +from functools import partial + + +class CellularFr1RvrTest(CellularFr1SingleCellPeakThroughputTest): + """Class to test single cell FR1 NSA sensitivity""" + + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.testcase_metric_logger = ( + BlackboxMappedMetricLogger.for_test_case()) + self.testclass_metric_logger = ( + BlackboxMappedMetricLogger.for_test_class()) + self.publish_testcase_metrics = True + self.testclass_params = self.user_params['nr_rvr_test_params'] + self.tests = self.generate_test_cases( + channel_list=['LOW', 'MID', 'HIGH'], + nr_ul_mcs=4, + lte_dl_mcs_table='QAM256', + lte_dl_mcs=4, + lte_ul_mcs_table='QAM256', + lte_ul_mcs=4, + transform_precoding=0) + + def process_testclass_results(self): + pass + + def process_testcase_results(self): + if self.current_test_name not in self.testclass_results: + return + testcase_data = self.testclass_results[self.current_test_name] + results_file_path = os.path.join( + context.get_current_context().get_full_output_path(), + '{}.json'.format(self.current_test_name)) + with open(results_file_path, 'w') as results_file: + json.dump(wputils.serialize_dict(testcase_data), + results_file, + indent=4) + + average_throughput_list = [] + theoretical_throughput_list = [] + nr_cell_index = testcase_data['testcase_params']['endc_combo_config']['lte_cell_count'] + cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][nr_cell_index] + for result in testcase_data['results']: + average_throughput_list.append( + result['throughput_measurements']['nr_tput_result']['total']['DL']['average_tput']) + theoretical_throughput_list.append( + result['throughput_measurements']['nr_tput_result']['total']['DL']['theoretical_tput']) + padding_len = len(cell_power_list) - len(average_throughput_list) + average_throughput_list.extend([0] * padding_len) + theoretical_throughput_list.extend([0] * padding_len) + + testcase_data['average_throughput_list'] = average_throughput_list + testcase_data[ + 'theoretical_throughput_list'] = theoretical_throughput_list + testcase_data['cell_power_list'] = cell_power_list + + plot = BokehFigure( + title='Band {} - RvR'.format(testcase_data['testcase_params']['endc_combo_config']['cell_list'][nr_cell_index]['band']), + x_label='Cell Power (dBm)', + primary_y_label='PHY Rate (Mbps)') + + plot.add_line( + testcase_data['cell_power_list'], + testcase_data['average_throughput_list'], + 'Average Throughput', + width=1) + plot.add_line( + testcase_data['cell_power_list'], + testcase_data['theoretical_throughput_list'], + 'Average Throughput', + width=1, + style='dashed') + plot.generate_figure() + output_file_path = os.path.join(self.log_path, '{}.html'.format(self.current_test_name)) + BokehFigure.save_figure(plot, output_file_path) + + + def get_per_cell_power_sweeps(self, testcase_params): + nr_cell_index = testcase_params['endc_combo_config']['lte_cell_count'] + start_atten = self.testclass_params['nr_cell_power_start'] + # get current cell power start + nr_cell_sweep = list( + numpy.arange(start_atten, + self.testclass_params['nr_cell_power_stop'], + self.testclass_params['nr_cell_power_step'])) + lte_sweep = [self.testclass_params['lte_cell_power'] + ] * len(nr_cell_sweep) + if nr_cell_index == 0: + cell_power_sweeps = [nr_cell_sweep] + else: + cell_power_sweeps = [lte_sweep, nr_cell_sweep] + return cell_power_sweeps + + def generate_test_cases(self, channel_list, **kwargs): + test_cases = [] + with open(self.testclass_params['nr_single_cell_configs'], + 'r') as csvfile: + test_configs = csv.DictReader(csvfile) + for test_config, channel in itertools.product( + test_configs, channel_list): + if int(test_config['skip_test']): + continue + endc_combo_config = self.generate_endc_combo_config( + test_config) + test_name = 'test_fr1_{}_{}'.format( + test_config['nr_band'], channel.lower()) + test_params = collections.OrderedDict( + endc_combo_config=endc_combo_config, + nr_dl_mcs=self.testclass_params['link_adaptation_config'], + **kwargs) + setattr(self, test_name, + partial(self._test_throughput_bler, test_params)) + test_cases.append(test_name) + return test_cases diff --git a/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py b/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py index e483e59a2..2e515cb67 100644 --- a/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py +++ b/acts_tests/tests/google/cellular/performance/CellularFr1SensitivityTest.py @@ -56,8 +56,10 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest): plots = collections.OrderedDict() compiled_data = collections.OrderedDict() for testcase_name, testcase_data in self.testclass_results.items(): + nr_cell_index = testcase_data['testcase_params'][ + 'endc_combo_config']['lte_cell_count'] cell_config = testcase_data['testcase_params'][ - 'endc_combo_config']['cell_list'][1] + 'endc_combo_config']['cell_list'][nr_cell_index] test_id = tuple(('band', cell_config['band'])) if test_id not in plots: # Initialize test id data when not present @@ -134,15 +136,19 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest): bler_list = [] average_throughput_list = [] theoretical_throughput_list = [] + nr_cell_index = testcase_data['testcase_params']['endc_combo_config'][ + 'lte_cell_count'] cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][ - 1] + nr_cell_index] for result in testcase_data['results']: - bler_list.append( - result['nr_bler_result']['total']['DL']['nack_ratio']) + bler_list.append(result['throughput_measurements'] + ['nr_bler_result']['total']['DL']['nack_ratio']) average_throughput_list.append( - result['nr_tput_result']['total']['DL']['average_tput']) + result['throughput_measurements']['nr_tput_result']['total'] + ['DL']['average_tput']) theoretical_throughput_list.append( - result['nr_tput_result']['total']['DL']['theoretical_tput']) + result['throughput_measurements']['nr_tput_result']['total'] + ['DL']['theoretical_tput']) padding_len = len(cell_power_list) - len(average_throughput_list) average_throughput_list.extend([0] * padding_len) theoretical_throughput_list.extend([0] * padding_len) @@ -160,8 +166,8 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest): sensitivity = cell_power_list[sensitivity_idx] self.log.info('NR Band {} MCS {} Sensitivity = {}dBm'.format( testcase_data['testcase_params']['endc_combo_config']['cell_list'] - [1]['band'], testcase_data['testcase_params']['nr_dl_mcs'], - sensitivity)) + [nr_cell_index]['band'], + testcase_data['testcase_params']['nr_dl_mcs'], sensitivity)) testcase_data['bler_list'] = bler_list testcase_data['average_throughput_list'] = average_throughput_list @@ -172,13 +178,14 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest): def get_per_cell_power_sweeps(self, testcase_params): # get reference test - current_band = testcase_params['endc_combo_config']['cell_list'][1][ - 'band'] + nr_cell_index = testcase_params['endc_combo_config']['lte_cell_count'] + current_band = testcase_params['endc_combo_config']['cell_list'][ + nr_cell_index]['band'] reference_test = None reference_sensitivity = None for testcase_name, testcase_data in self.testclass_results.items(): if testcase_data['testcase_params']['endc_combo_config'][ - 'cell_list'][1]['band'] == current_band: + 'cell_list'][nr_cell_index]['band'] == current_band: reference_test = testcase_name reference_sensitivity = testcase_data['sensitivity'] if reference_test and reference_sensitivity and not self.retry_flag: @@ -199,7 +206,10 @@ class CellularFr1SensitivityTest(CellularFr1SingleCellPeakThroughputTest): self.testclass_params['nr_cell_power_step'])) lte_sweep = [self.testclass_params['lte_cell_power'] ] * len(nr_cell_sweep) - cell_power_sweeps = [lte_sweep, nr_cell_sweep] + if nr_cell_index == 0: + cell_power_sweeps = [nr_cell_sweep] + else: + cell_power_sweeps = [lte_sweep, nr_cell_sweep] return cell_power_sweeps def generate_test_cases(self, channel_list, dl_mcs_list, **kwargs): diff --git a/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py b/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py index b422a3045..65fef9300 100644 --- a/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py +++ b/acts_tests/tests/google/cellular/performance/CellularLtePlusFr1PeakThroughputTest.py @@ -66,60 +66,60 @@ class CellularLtePlusFr1PeakThroughputTest(CellularThroughputBaseTest): 'nr_cell_count']: metric_map.update({ 'nr_min_dl_tput': - testcase_result['nr_tput_result']['total']['DL']['min_tput'], + testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']['min_tput'], 'nr_max_dl_tput': - testcase_result['nr_tput_result']['total']['DL']['max_tput'], + testcase_result['throughput_measurements']['nr_tput_result']['total']['DL']['max_tput'], 'nr_avg_dl_tput': - testcase_result['nr_tput_result']['total']['DL'] + testcase_result['throughput_measurements']['nr_tput_result']['total']['DL'] ['average_tput'], 'nr_theoretical_dl_tput': - testcase_result['nr_tput_result']['total']['DL'] + testcase_result['throughput_measurements']['nr_tput_result']['total']['DL'] ['theoretical_tput'], 'nr_dl_bler': - testcase_result['nr_bler_result']['total']['DL']['nack_ratio'] + testcase_result['throughput_measurements']['nr_bler_result']['total']['DL']['nack_ratio'] * 100, 'nr_min_dl_tput': - testcase_result['nr_tput_result']['total']['UL']['min_tput'], + testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']['min_tput'], 'nr_max_dl_tput': - testcase_result['nr_tput_result']['total']['UL']['max_tput'], + testcase_result['throughput_measurements']['nr_tput_result']['total']['UL']['max_tput'], 'nr_avg_dl_tput': - testcase_result['nr_tput_result']['total']['UL'] + testcase_result['throughput_measurements']['nr_tput_result']['total']['UL'] ['average_tput'], 'nr_theoretical_dl_tput': - testcase_result['nr_tput_result']['total']['UL'] + testcase_result['throughput_measurements']['nr_tput_result']['total']['UL'] ['theoretical_tput'], 'nr_ul_bler': - testcase_result['nr_bler_result']['total']['UL']['nack_ratio'] + testcase_result['throughput_measurements']['nr_bler_result']['total']['UL']['nack_ratio'] * 100 }) if testcase_data['testcase_params']['endc_combo_config'][ 'lte_cell_count']: metric_map.update({ 'lte_min_dl_tput': - testcase_result['lte_tput_result']['total']['DL']['min_tput'], + testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']['min_tput'], 'lte_max_dl_tput': - testcase_result['lte_tput_result']['total']['DL']['max_tput'], + testcase_result['throughput_measurements']['lte_tput_result']['total']['DL']['max_tput'], 'lte_avg_dl_tput': - testcase_result['lte_tput_result']['total']['DL'] + testcase_result['throughput_measurements']['lte_tput_result']['total']['DL'] ['average_tput'], 'lte_theoretical_dl_tput': - testcase_result['lte_tput_result']['total']['DL'] + testcase_result['throughput_measurements']['lte_tput_result']['total']['DL'] ['theoretical_tput'], 'lte_dl_bler': - testcase_result['lte_bler_result']['total']['DL']['nack_ratio'] + testcase_result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio'] * 100, 'lte_min_dl_tput': - testcase_result['lte_tput_result']['total']['UL']['min_tput'], + testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']['min_tput'], 'lte_max_dl_tput': - testcase_result['lte_tput_result']['total']['UL']['max_tput'], + testcase_result['throughput_measurements']['lte_tput_result']['total']['UL']['max_tput'], 'lte_avg_dl_tput': - testcase_result['lte_tput_result']['total']['UL'] + testcase_result['throughput_measurements']['lte_tput_result']['total']['UL'] ['average_tput'], 'lte_theoretical_dl_tput': - testcase_result['lte_tput_result']['total']['UL'] + testcase_result['throughput_measurements']['lte_tput_result']['total']['UL'] ['theoretical_tput'], 'lte_ul_bler': - testcase_result['lte_bler_result']['total']['UL']['nack_ratio'] + testcase_result['throughput_measurements']['lte_bler_result']['total']['UL']['nack_ratio'] * 100 }) if self.publish_testcase_metrics: @@ -160,68 +160,68 @@ class CellularLtePlusFr1PeakThroughputTest(CellularThroughputBaseTest): 'endc_combo_config']['nr_cell_count']: row_dict.update({ 'NR DL Min. Throughput': - result['nr_tput_result']['total']['DL'] + result['throughput_measurements']['nr_tput_result']['total']['DL'] ['min_tput'], 'NR DL Max. Throughput': - result['nr_tput_result']['total']['DL'] + result['throughput_measurements']['nr_tput_result']['total']['DL'] ['max_tput'], 'NR DL Avg. Throughput': - result['nr_tput_result']['total']['DL'] + result['throughput_measurements']['nr_tput_result']['total']['DL'] ['average_tput'], 'NR DL Theoretical Throughput': - result['nr_tput_result']['total']['DL'] + result['throughput_measurements']['nr_tput_result']['total']['DL'] ['theoretical_tput'], 'NR UL Min. Throughput': - result['nr_tput_result']['total']['UL'] + result['throughput_measurements']['nr_tput_result']['total']['UL'] ['min_tput'], 'NR UL Max. Throughput': - result['nr_tput_result']['total']['UL'] + result['throughput_measurements']['nr_tput_result']['total']['UL'] ['max_tput'], 'NR UL Avg. Throughput': - result['nr_tput_result']['total']['UL'] + result['throughput_measurements']['nr_tput_result']['total']['UL'] ['average_tput'], 'NR UL Theoretical Throughput': - result['nr_tput_result']['total']['UL'] + result['throughput_measurements']['nr_tput_result']['total']['UL'] ['theoretical_tput'], 'NR DL BLER (%)': - result['nr_bler_result']['total']['DL'] + result['throughput_measurements']['nr_bler_result']['total']['DL'] ['nack_ratio'] * 100, 'NR UL BLER (%)': - result['nr_bler_result']['total']['UL'] + result['throughput_measurements']['nr_bler_result']['total']['UL'] ['nack_ratio'] * 100 }) if testcase_results['testcase_params'][ 'endc_combo_config']['lte_cell_count']: row_dict.update({ 'LTE DL Min. Throughput': - result['lte_tput_result']['total']['DL'] + result['throughput_measurements']['lte_tput_result']['total']['DL'] ['min_tput'], 'LTE DL Max. Throughput': - result['lte_tput_result']['total']['DL'] + result['throughput_measurements']['lte_tput_result']['total']['DL'] ['max_tput'], 'LTE DL Avg. Throughput': - result['lte_tput_result']['total']['DL'] + result['throughput_measurements']['lte_tput_result']['total']['DL'] ['average_tput'], 'LTE DL Theoretical Throughput': - result['lte_tput_result']['total']['DL'] + result['throughput_measurements']['lte_tput_result']['total']['DL'] ['theoretical_tput'], 'LTE UL Min. Throughput': - result['lte_tput_result']['total']['UL'] + result['throughput_measurements']['lte_tput_result']['total']['UL'] ['min_tput'], 'LTE UL Max. Throughput': - result['lte_tput_result']['total']['UL'] + result['throughput_measurements']['lte_tput_result']['total']['UL'] ['max_tput'], 'LTE UL Avg. Throughput': - result['lte_tput_result']['total']['UL'] + result['throughput_measurements']['lte_tput_result']['total']['UL'] ['average_tput'], 'LTE UL Theoretical Throughput': - result['lte_tput_result']['total']['UL'] + result['throughput_measurements']['lte_tput_result']['total']['UL'] ['theoretical_tput'], 'LTE DL BLER (%)': - result['lte_bler_result']['total']['DL'] + result['throughput_measurements']['lte_bler_result']['total']['DL'] ['nack_ratio'] * 100, 'LTE UL BLER (%)': - result['lte_bler_result']['total']['UL'] + result['throughput_measurements']['lte_bler_result']['total']['UL'] ['nack_ratio'] * 100 }) writer.writerow(row_dict) @@ -316,7 +316,8 @@ class CellularLteFr1EndcPeakThroughputTest(CellularLtePlusFr1PeakThroughputTest cell_config['cell_type'] = 'NR5G' nr_cell_count = nr_cell_count + 1 cell_config['cell_number'] = nr_cell_count - nr_dl_carriers.append(cell_config['cell_number']) + nr_dl_carriers.append(cell_config['cell_number']), + cell_config['nr_cell_type'] = 'NSA', cell_config['band'] = 'N' + dl_config_match.group('band') cell_config['duplex_mode'] = 'FDD' if cell_config[ 'band'] in cputils.DUPLEX_MODE_TO_BAND_MAPPING['NR5G'][ @@ -442,6 +443,7 @@ class CellularSingleCellThroughputTest(CellularLtePlusFr1PeakThroughputTest): 1, 'band': test_config['nr_band'], + 'nr_cell_type': test_config['nr_cell_type'], 'duplex_mode': test_config['nr_duplex_mode'], 'dl_mimo_config': @@ -489,6 +491,8 @@ class CellularFr1SingleCellPeakThroughputTest(CellularSingleCellThroughputTest self.tests = self.generate_test_cases( nr_mcs_pair_list=[(27, 4), (4, 27)], nr_channel_list=['LOW', 'MID', 'HIGH'], + schedule_scenario='FULL_TPUT', + schedule_slot_ratio=80, transform_precoding=0, lte_dl_mcs=4, lte_dl_mcs_table='QAM256', @@ -507,7 +511,7 @@ class CellularFr1SingleCellPeakThroughputTest(CellularSingleCellThroughputTest continue endc_combo_config = self.generate_endc_combo_config( test_config) - endc_combo_config['cell_list'][1]['channel'] = nr_channel + endc_combo_config['cell_list'][endc_combo_config['lte_cell_count']]['channel'] = nr_channel test_name = 'test_fr1_{}_{}_dl_mcs{}_ul_mcs{}'.format( test_config['nr_band'], nr_channel.lower(), nr_mcs_pair[0], nr_mcs_pair[1]) diff --git a/acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py b/acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py new file mode 100644 index 000000000..d444bca0a --- /dev/null +++ b/acts_tests/tests/google/cellular/performance/CellularLteRvrTest.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2022 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import csv +import itertools +import numpy +import json +import re +import os +from acts import context +from acts import base_test +from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger +from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils +from acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure import BokehFigure +from CellularLtePlusFr1PeakThroughputTest import CellularLteSingleCellPeakThroughputTest + +from functools import partial + + +class CellularLteRvrTest(CellularLteSingleCellPeakThroughputTest): + """Class to test single cell LTE sensitivity""" + + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.testcase_metric_logger = ( + BlackboxMappedMetricLogger.for_test_case()) + self.testclass_metric_logger = ( + BlackboxMappedMetricLogger.for_test_class()) + self.publish_testcase_metrics = True + self.testclass_params = self.user_params['lte_sensitivity_test_params'] + self.tests = self.generate_test_cases(lte_dl_mcs_table='QAM256', + lte_ul_mcs_table='QAM256', + lte_ul_mcs=4, + transform_precoding=0) + + def process_testclass_results(self): + # Plot individual test id results raw data and compile metrics + plots = collections.OrderedDict() + compiled_data = collections.OrderedDict() + for testcase_name, testcase_data in self.testclass_results.items(): + cell_config = testcase_data['testcase_params'][ + 'endc_combo_config']['cell_list'][0] + test_id = tuple(('band', cell_config['band'])) + if test_id not in plots: + # Initialize test id data when not present + compiled_data[test_id] = { + 'mcs': [], + 'average_throughput': [], + 'theoretical_throughput': [], + 'cell_power': [], + } + plots[test_id] = BokehFigure( + title='Band {} ({}) - BLER Curves'.format( + cell_config['band'], + testcase_data['testcase_params']['lte_dl_mcs_table']), + x_label='Cell Power (dBm)', + primary_y_label='BLER (Mbps)') + test_id_rvr = test_id + tuple('RvR') + plots[test_id_rvr] = BokehFigure( + title='Band {} ({}) - RvR'.format( + cell_config['band'], + testcase_data['testcase_params']['lte_dl_mcs_table']), + x_label='Cell Power (dBm)', + primary_y_label='PHY Rate (Mbps)') + # Compile test id data and metrics + compiled_data[test_id]['average_throughput'].append( + testcase_data['average_throughput_list']) + compiled_data[test_id]['cell_power'].append( + testcase_data['cell_power_list']) + compiled_data[test_id]['mcs'].append( + testcase_data['testcase_params']['lte_dl_mcs']) + # Add test id to plots + plots[test_id].add_line( + testcase_data['cell_power_list'], + testcase_data['bler_list'], + 'MCS {}'.format( + testcase_data['testcase_params']['lte_dl_mcs']), + width=1) + plots[test_id_rvr].add_line( + testcase_data['cell_power_list'], + testcase_data['average_throughput_list'], + 'MCS {}'.format( + testcase_data['testcase_params']['lte_dl_mcs']), + width=1, + style='dashed') + + # Compute average RvRs and compute metrics over orientations + for test_id, test_data in compiled_data.items(): + test_id_rvr = test_id + tuple('RvR') + cell_power_interp = sorted(set(sum(test_data['cell_power'], []))) + average_throughput_interp = [] + for mcs, cell_power, throughput in zip( + test_data['mcs'], test_data['cell_power'], + test_data['average_throughput']): + throughput_interp = numpy.interp(cell_power_interp, + cell_power[::-1], + throughput[::-1]) + average_throughput_interp.append(throughput_interp) + rvr = numpy.max(average_throughput_interp, 0) + plots[test_id_rvr].add_line(cell_power_interp, rvr, + 'Rate vs. Range') + + figure_list = [] + for plot_id, plot in plots.items(): + plot.generate_figure() + figure_list.append(plot) + output_file_path = os.path.join(self.log_path, 'results.html') + BokehFigure.save_figures(figure_list, output_file_path) + + def process_testcase_results(self): + if self.current_test_name not in self.testclass_results: + return + testcase_data = self.testclass_results[self.current_test_name] + results_file_path = os.path.join( + context.get_current_context().get_full_output_path(), + '{}.json'.format(self.current_test_name)) + with open(results_file_path, 'w') as results_file: + json.dump(wputils.serialize_dict(testcase_data), + results_file, + indent=4) + + bler_list = [] + average_throughput_list = [] + theoretical_throughput_list = [] + cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][ + 0] + for result in testcase_data['results']: + bler_list.append( + result['throughput_measurements']['lte_bler_result']['total']['DL']['nack_ratio']) + average_throughput_list.append( + result['throughput_measurements']['lte_tput_result']['total']['DL']['average_tput']) + theoretical_throughput_list.append( + result['throughput_measurements']['lte_tput_result']['total']['DL']['theoretical_tput']) + padding_len = len(cell_power_list) - len(average_throughput_list) + average_throughput_list.extend([0] * padding_len) + theoretical_throughput_list.extend([0] * padding_len) + + testcase_data['bler_list'] = bler_list + testcase_data['average_throughput_list'] = average_throughput_list + testcase_data[ + 'theoretical_throughput_list'] = theoretical_throughput_list + testcase_data['cell_power_list'] = cell_power_list + + plot = BokehFigure( + title='Band {} - RvR'.format(testcase_data['testcase_params']['endc_combo_config']['cell_list'][0]['band']), + x_label='Cell Power (dBm)', + primary_y_label='PHY Rate (Mbps)') + + plot.add_line( + testcase_data['cell_power_list'], + testcase_data['average_throughput_list'], + 'Average Throughput', + width=1) + plot.add_line( + testcase_data['cell_power_list'], + testcase_data['theoretical_throughput_list'], + 'Average Throughput', + width=1, + style='dashed') + plot.generate_figure() + output_file_path = os.path.join(self.log_path, '{}.html'.format(self.current_test_name)) + BokehFigure.save_figure(plot, output_file_path) + + def get_per_cell_power_sweeps(self, testcase_params): + # get current cell power start + cell_power_sweeps = [ + list( + numpy.arange(self.testclass_params['lte_cell_power_start'], + self.testclass_params['lte_cell_power_stop'], + self.testclass_params['lte_cell_power_step'])) + ] + return cell_power_sweeps + + def generate_test_cases(self, lte_dl_mcs_table, + lte_ul_mcs_table, lte_ul_mcs, **kwargs): + test_cases = [] + with open(self.testclass_params['lte_single_cell_configs'], + 'r') as csvfile: + test_configs = csv.DictReader(csvfile) + for test_config in test_configs: + if int(test_config['skip_test']): + continue + endc_combo_config = self.generate_endc_combo_config( + test_config) + test_name = 'test_lte_B{}_dl_{}'.format( + test_config['lte_band'], lte_dl_mcs_table) + test_params = collections.OrderedDict( + endc_combo_config=endc_combo_config, + lte_dl_mcs_table=lte_dl_mcs_table, + lte_dl_mcs='WCQI', + lte_ul_mcs_table=lte_ul_mcs_table, + lte_ul_mcs=lte_ul_mcs, + **kwargs) + setattr(self, test_name, + partial(self._test_throughput_bler, test_params)) + test_cases.append(test_name) + return test_cases diff --git a/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py b/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py index 5e27bca5a..22e436bc1 100644 --- a/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py +++ b/acts_tests/tests/google/cellular/performance/CellularLteSensitivityTest.py @@ -141,12 +141,14 @@ class CellularLteSensitivityTest(CellularLteSingleCellPeakThroughputTest): cell_power_list = testcase_data['testcase_params']['cell_power_sweep'][ 0] for result in testcase_data['results']: - bler_list.append( - result['lte_bler_result']['total']['DL']['nack_ratio']) + bler_list.append(result['throughput_measurements'] + ['lte_bler_result']['total']['DL']['nack_ratio']) average_throughput_list.append( - result['lte_tput_result']['total']['DL']['average_tput']) + result['throughput_measurements']['lte_tput_result']['total'] + ['DL']['average_tput']) theoretical_throughput_list.append( - result['lte_tput_result']['total']['DL']['theoretical_tput']) + result['throughput_measurements']['lte_tput_result']['total'] + ['DL']['theoretical_tput']) padding_len = len(cell_power_list) - len(average_throughput_list) average_throughput_list.extend([0] * padding_len) theoretical_throughput_list.extend([0] * padding_len) diff --git a/acts_tests/tests/google/gnss/GnssConcurrencyTest.py b/acts_tests/tests/google/gnss/GnssConcurrencyTest.py index 3cbb2a7ff..f6f78f274 100644 --- a/acts_tests/tests/google/gnss/GnssConcurrencyTest.py +++ b/acts_tests/tests/google/gnss/GnssConcurrencyTest.py @@ -35,22 +35,22 @@ CONCURRENCY_TYPE = { } GPS_XML_CONFIG = { - "CS": [ - ' IgnorePosition=\"true\"\n', ' IgnoreEph=\"true\"\n', - ' IgnoreTime=\"true\"\n', ' AsstIgnoreLto=\"true\"\n', - ' IgnoreJniTime=\"true\"\n' - ], - "WS": [ - ' IgnorePosition=\"true\"\n', ' AsstIgnoreLto=\"true\"\n', - ' IgnoreJniTime=\"true\"\n' - ], - "HS": [] + "CS": { + 'IgnorePosition': 'true', 'IgnoreEph': 'true', + 'IgnoreTime': 'true', 'AsstIgnoreLto': 'true', + 'IgnoreJniTime': 'true', + }, + "WS": { + 'IgnorePosition': 'true', 'AsstIgnoreLto': 'true', + 'IgnoreJniTime': 'true', + }, + "HS": {} } -ONCHIP_CONFIG = [ - ' EnableOnChipStopNotification=\"1\"\n', - ' EnableOnChipStopNotification=\"2\"\n' -] +ONCHIP_CONFIG = { + "enable": {"EnableOnChipStopNotification": "1"}, + "disable": {"EnableOnChipStopNotification": "2"}, +} class GnssConcurrencyTest(BaseTestClass): @@ -251,6 +251,7 @@ class GnssConcurrencyTest(BaseTestClass): request_type, len(outliers[request_type])) if failure_log: + failure_log += f"The test begins at {begin_time}\n" raise signals.TestFailure(failure_log) def run_engine_switching_test(self, freq): @@ -325,12 +326,10 @@ class GnssConcurrencyTest(BaseTestClass): Args: conf_type: a string identify the config type """ - search_line_tag = "<gll\n" - append_line_str = GPS_XML_CONFIG[conf_type] - gutils.bcm_gps_xml_update_option(self.ad, "add", search_line_tag, - append_line_str) + gutils.bcm_gps_xml_update_option( + self.ad, child_tag="gll", items_to_update=GPS_XML_CONFIG[conf_type]) - def update_gps_conf(self, search_line, update_line): + def update_gps_conf(self, update_attrib): """ Update gps.xml content Args: @@ -338,7 +337,7 @@ class GnssConcurrencyTest(BaseTestClass): update_line: update content """ gutils.bcm_gps_xml_update_option( - self.ad, "update", search_line, update_txt=update_line) + self.ad, child_tag="gll", items_to_update=update_attrib) def delete_gps_conf(self, conf_type): """ Delete gps.xml content @@ -346,9 +345,8 @@ class GnssConcurrencyTest(BaseTestClass): Args: conf_type: a string identify the config type """ - search_line_tag = GPS_XML_CONFIG[conf_type] gutils.bcm_gps_xml_update_option( - self.ad, "delete", delete_txt=search_line_tag) + self.ad, child_tag="gll", items_to_delete=GPS_XML_CONFIG[conf_type].keys()) def preset_mcu_test(self, mode): """ Preseting mcu test with config and device state @@ -359,7 +357,7 @@ class GnssConcurrencyTest(BaseTestClass): self.add_ttff_conf(mode) gutils.push_lhd_overlay(self.ad) toggle_airplane_mode(self.ad.log, self.ad, new_state=True) - self.update_gps_conf(ONCHIP_CONFIG[1], ONCHIP_CONFIG[0]) + self.update_gps_conf(ONCHIP_CONFIG["enable"]) gutils.clear_aiding_data_by_gtw_gpstool(self.ad) self.ad.reboot(self.ad) self.load_chre_nanoapp() @@ -371,7 +369,7 @@ class GnssConcurrencyTest(BaseTestClass): mode: a string identify the test type """ self.delete_gps_conf(mode) - self.update_gps_conf(ONCHIP_CONFIG[0], ONCHIP_CONFIG[1]) + self.update_gps_conf(ONCHIP_CONFIG["disable"]) def get_mcu_ttff(self): """ Get mcu ttff seconds diff --git a/acts_tests/tests/google/gnss/GnssFunctionTest.py b/acts_tests/tests/google/gnss/GnssFunctionTest.py index b22cd4fd5..6f5f74a1c 100644 --- a/acts_tests/tests/google/gnss/GnssFunctionTest.py +++ b/acts_tests/tests/google/gnss/GnssFunctionTest.py @@ -14,9 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import defaultdict +import datetime import os import re +import time +import functools import fnmatch +from statistics import mean from acts import asserts from acts import signals @@ -60,6 +65,8 @@ from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log +_PPS_KICKIN_WAITING_TIME_IN_SECOND = 30 + class GnssFunctionTest(BaseTestClass): """ GNSS Function Tests""" def setup_class(self): @@ -71,6 +78,7 @@ class GnssFunctionTest(BaseTestClass): "supl_hs_criteria", "standalone_cs_criteria", "wearable_reboot_hs_criteria", + "first_satellite_criteria", "default_gnss_signal_attenuation", "weak_gnss_signal_attenuation", "gnss_init_error_list", @@ -99,6 +107,7 @@ class GnssFunctionTest(BaseTestClass): gutils.enable_supl_mode(self.ad) gutils.enable_vendor_orbit_assistance_data(self.ad) gutils.disable_ramdump(self.ad) + gutils.enable_compact_and_particle_fusion_log(self.ad) def setup_test(self): log_current_epoch_time(self.ad, "test_start_time") @@ -141,6 +150,13 @@ class GnssFunctionTest(BaseTestClass): set_wifi_and_bt_scanning(self.ad, True) log_current_epoch_time(self.ad, "test_end_time") + def keep_logs(self): + # for debug cs is faster than ws issue + test_name = self.test_name + self.ad.take_bug_report(test_name, self.begin_time) + get_gnss_qxdm_log(self.ad, self.qdsp6m_path) + get_tcpdump_log(self.ad, test_name, self.begin_time) + def on_fail(self, test_name, begin_time): if self.collect_logs: self.ad.take_bug_report(test_name, begin_time) @@ -164,7 +180,7 @@ class GnssFunctionTest(BaseTestClass): path = self.user_params.get("radio_image") if isinstance(path, list): path = path[0] - if "dev/null" in path: + if not path or "dev/null" in path: self.ad.log.info("Radio image path is not defined in Test flag.") return False for path_key in os.listdir(path): @@ -245,8 +261,19 @@ class GnssFunctionTest(BaseTestClass): gutils.start_qxdm_and_tcpdump_log(self.ad, self.collect_logs) self.ad.log.info("Turn airplane mode on") toggle_airplane_mode(self.ad.log, self.ad, new_state=True) + """ + The change for arguments here is for b/277667310 + Randomized interval is used for breaking CS GLNS only fix sequence. + """ gutils.run_ttff_via_gtw_gpstool( - self.ad, mode, criteria, self.ttff_test_cycle, self.pixel_lab_location) + self.ad, + mode, + criteria, + self.ttff_test_cycle, + self.pixel_lab_location, + raninterval=True, + mininterval=15, + maxinterval=20,) """ Test Cases """ @@ -276,44 +303,88 @@ class GnssFunctionTest(BaseTestClass): asserts.assert_true(all(overall_test_result), "SUPL fail after system server restart.") - def test_cs_ttff_after_gps_service_restart(self): - """Verify cs ttff after modem silent reboot / GPS daemons restart. + def test_recovery_and_location_time_after_gnss_services_restart(self): + """Verify gpsd recover time after gpsd being killed. Steps: - 1. Trigger modem crash by adb/Restart GPS daemons by killing PID. - 2. Wait 1 minute for modem to recover. - 3. TTFF Cold Start for 3 iteration. - 4. Repeat Step 1. to Step 3. for 5 times. + 1. Start GPS tracking + 2. Restart GPS daemons by killing PID. + 3. Waiting for GPS service to restart + 4. Waiting for the first fixed + 4. Re-run steps 1~4 for 5 times. Expected Results: - All SUPL TTFF Cold Start results should be within supl_cs_criteria. + 1. The time GPSd services take to restart must be within 3 seconds. + 2. Location fix time must be within supl_hs_criteria """ - supl_ssr_test_result_all = [] + if gutils.check_chipset_vendor_by_qualcomm(self.ad): + raise signals.TestSkip("Skip the test due to Qualcomm chipset") + test_times = 5 gutils.start_qxdm_and_tcpdump_log(self.ad, self.collect_logs) - for times in range(1, 6): - begin_time = get_current_epoch_time() - if gutils.check_chipset_vendor_by_qualcomm(self.ad): - test_info = "Modem SSR" - gutils.gnss_trigger_modem_ssr_by_mds(self.ad) - else: - test_info = "restarting GPS daemons" - gutils.restart_gps_daemons(self.ad) - if not verify_internet_connection(self.ad.log, self.ad, retries=3, - expected_state=True): - raise signals.TestFailure("Fail to connect to LTE network.") - gutils.process_gnss_by_gtw_gpstool(self.ad, self.standalone_cs_criteria) - gutils.start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=3) - ttff_data = gutils.process_ttff_by_gtw_gpstool(self.ad, begin_time, - self.pixel_lab_location) - supl_ssr_test_result = gutils.check_ttff_data( - self.ad, ttff_data, ttff_mode="Cold Start", - criteria=self.supl_cs_criteria) - self.ad.log.info("SUPL after %s test %d times -> %s" % ( - test_info, times, supl_ssr_test_result)) - supl_ssr_test_result_all.append(supl_ssr_test_result) - - asserts.assert_true(all(supl_ssr_test_result_all), - "TTFF fails to reach designated criteria") + satellite_times = defaultdict(list) + location_fix_times = defaultdict(list) + + kill_functions = ( + gutils.get_gps_process_and_kill_function_by_vendor(self.ad)) + for time in range(1, test_times+1): + self.ad.log.info("Performing test times %d", time) + first_fixed_time = process_gnss_by_gtw_gpstool( + self.ad, + criteria=self.supl_hs_criteria, + clear_data=False) + + begin_time = int(first_fixed_time.timestamp() * 1000) + self.ad.log.info("Start tracking") + gutils.wait_n_mins_for_gnss_tracking(self.ad, + begin_time, + testtime=0.5, + ignore_hal_crash=False) + + + for num, (process, kill_function) in enumerate(kill_functions.items()): + kill_start_time = kill_function() + first_gpsd_update_time = (gutils.get_gpsd_update_time( + self.ad, + kill_start_time)) + self.ad.log.info("Resume tracking ... ") + gutils.wait_n_mins_for_gnss_tracking(self.ad, + begin_time, + testtime=num+1, + ignore_hal_crash=True) + + location_fix_time = (gutils. + get_location_fix_time_via_gpstool_log( + self.ad, first_gpsd_update_time)) + + satellite_times[process].append(first_gpsd_update_time - kill_start_time) + location_fix_times[process].append(location_fix_time - first_gpsd_update_time) + # gpsd recovery time : Time between gpsd killed to first satellite update. + self.ad.log.info("%s recovery time : %d ms", + process, (first_gpsd_update_time - kill_start_time)) + # TTFF Hot Start : Time between first satellite update to first location fix. + self.ad.log.info("TTFF Hot Start %d ms", + (location_fix_time - first_gpsd_update_time)) + start_gnss_by_gtw_gpstool(self.ad, state=False) + + for num, process in enumerate(kill_functions): + prop_basename = gutils.UPLOAD_TO_SPONGE_PREFIX + f"{process}_recovery_time_" + self.ad.log.info(prop_basename + "AVG %d", + mean(satellite_times[process])) + self.ad.log.info(prop_basename + "MAX %d", + max(satellite_times[process])) + prop_basename = gutils.UPLOAD_TO_SPONGE_PREFIX + f"{process}_ttff_hs_" + self.ad.log.info(prop_basename + "AVG %d", + mean(location_fix_times[process])) + self.ad.log.info(prop_basename + "MAX %d", + max(location_fix_times[process])) + asserts.assert_true(mean(satellite_times[process])/1000 <= + self.first_satellite_criteria, + f"{process} takes more than {self.first_satellite_criteria}" + "seconds in average to recover") + asserts.assert_true(mean(location_fix_times[process])/1000 <= + self.supl_hs_criteria, + f"Location fix time is more than {self.supl_hs_criteria}" + "seconds in average") def test_gnss_one_hour_tracking(self): """Verify GNSS tracking performance of signal strength and position @@ -666,9 +737,9 @@ class GnssFunctionTest(BaseTestClass): validate_gnssstatus=True) def test_location_update_after_resuming_from_deep_suspend(self): - """Verify the GPS location reported after resume from suspend mode + """Verify the GPS location reported after resume from deep doze mode 1. Enable GPS location report for 1 min to make sure the GPS is working - 2. Force DUT into deep suspend mode for a while(3 times with 15s interval) + 2. Force DUT into deep doze mode for 60s 3. Enable GPS location report for 5 mins 4. Check the report frequency 5. Check the location fix rate @@ -678,18 +749,18 @@ class GnssFunctionTest(BaseTestClass): gnss_tracking_via_gtw_gpstool(self.ad, criteria=self.supl_cs_criteria, api_type="gnss", testtime=gps_enable_minutes) result = parse_gtw_gpstool_log(self.ad, self.pixel_lab_location, api_type="gnss") - self.ad.log.debug("Location report details before suspend") + self.ad.log.debug("Location report details before deep doze") self.ad.log.debug(result) gutils.validate_location_fix_rate(self.ad, result, run_time=gps_enable_minutes, fix_rate_criteria=0.95) - gutils.deep_suspend_device(self.ad) + gutils.enter_deep_doze_mode(self.ad, lasting_time_in_seconds=60) gps_enable_minutes = 5 gnss_tracking_via_gtw_gpstool(self.ad, criteria=self.supl_cs_criteria, api_type="gnss", testtime=gps_enable_minutes) result = parse_gtw_gpstool_log(self.ad, self.pixel_lab_location, api_type="gnss") - self.ad.log.debug("Location report details after suspend") + self.ad.log.debug("Location report details after deep doze") self.ad.log.debug(result) location_report_time = list(result.keys()) @@ -812,3 +883,14 @@ class GnssFunctionTest(BaseTestClass): gutils.stop_pixel_logger(self.ad) + def test_the_diff_of_gps_clock_and_elapsed_realtime_should_be_stable(self): + gutils.start_pixel_logger(self.ad) + with gutils.full_gnss_measurement(self.ad): + first_fixed_time = gnss_tracking_via_gtw_gpstool( + self.ad, criteria=self.supl_cs_criteria, api_type="gnss", + testtime=5, meas_flag=True) + gutils.stop_pixel_logger(self.ad) + start_time = first_fixed_time + datetime.timedelta( + seconds=time.timezone + _PPS_KICKIN_WAITING_TIME_IN_SECOND) + self.ad.log.debug("Start time is %s" % start_time) + gutils.validate_diff_of_gps_clock_elapsed_realtime(self.ad, start_time) diff --git a/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py b/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py index e94626fd6..744676441 100644 --- a/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py +++ b/acts_tests/tests/google/gnss/GnssVendorFeaturesTest.py @@ -35,6 +35,7 @@ class GnssVendorFeaturesTest(BaseTestClass): gutils._init_device(self.ad) gutils.disable_supl_mode(self.ad) gutils.enable_vendor_orbit_assistance_data(self.ad) + gutils.reboot(self.ad) def setup_test(self): gutils.log_current_epoch_time(self.ad, "test_start_time") diff --git a/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py b/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py index 44fab02a3..d61930a44 100644 --- a/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py +++ b/acts_tests/tests/google/power/tel/PowerTelAirplaneMode_Test.py @@ -26,10 +26,8 @@ class PowerTelAirplaneModeTest(PB.PowerCellularPresetLabBaseTest): # Allow airplane mode to propagate time.sleep(3) - # Measure power - self.collect_power_data() - # Check if power measurement is within the required values - self.pass_fail_check(self.avg_current) + # Measure power and check against threshold + self.collect_power_data_and_validate() class PowerTelAirplaneMode_Test(PowerTelAirplaneModeTest): def test_airplane_mode(self): diff --git a/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py index 8efbc76e6..05c647281 100644 --- a/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py +++ b/acts_tests/tests/google/power/tel/PowerTelIdle_Preset_Test.py @@ -24,11 +24,8 @@ class PowerTelIdle_Preset_Test(PB.PowerCellularPresetLabBaseTest): # Wait for RRC status change to trigger self.cellular_simulator.wait_until_idle_state(idle_wait_time) - # Measure power - self.collect_power_data() - - # Check if power measurement is below the required value - self.pass_fail_check(self.avg_current) + # Measure power and check against threshold + self.collect_power_data_and_validate() def test_preset_LTE_idle(self): self.power_tel_idle_test() diff --git a/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py index a18653d92..c674a6ed8 100644 --- a/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py +++ b/acts_tests/tests/google/power/tel/PowerTelIms_Preset_Test.py @@ -16,9 +16,8 @@ import time from acts_contrib.test_utils.power.cellular.ims_api_connector_utils import ImsApiConnector -import acts_contrib.test_utils.power.cellular.cellular_power_base_test as PWCEL +from acts_contrib.test_utils.power.cellular.ims_api_connector_utils import ImsAppName from acts_contrib.test_utils.tel.tel_test_utils import set_phone_silent_mode -from acts_contrib.test_utils.tel.tel_voice_utils import hangup_call import acts_contrib.test_utils.power.cellular.cellular_power_preset_base_test as PB @@ -57,6 +56,8 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest): IMS_CLIENT = 'client' IMS_SERVER = 'server' + UE_DEFAULT_NUMBER = '001010123456789' + def setup_class(self): """ Executed only once when initializing the class. """ super().setup_class() @@ -80,23 +81,27 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest): self.unpack_userparams(api_connector_port=self.IMS_API_CONNECTOR_DEFAULT_PORT, api_token=self.IMS_CLIENT_DEFAULT_API_TOKEN, ims_client_ip=self.IMS_CLIENT_DEFAULT_IP, - ims_client_port=self.IMS_CLIENT_DEFAULT_PORT) + ims_client_port=self.IMS_CLIENT_DEFAULT_PORT, + ue_number=self.UE_DEFAULT_NUMBER) self.ims_client = ImsApiConnector( self.uxm_ip, self.api_connector_port, - self.IMS_CLIENT, - self.api_token, - self.ims_client_ip, - self.ims_client_port, - self.log + ImsAppName.CLIENT + ) + + self.ims_server = ImsApiConnector( + self.uxm_ip, + self.api_connector_port, + ImsAppName.SERVER ) def setup_test(self): # Enable NR if it is VoNR test case self.log.info(f'test name: {self.test_name}') + self.ims_server.restart_server() if 'NR' in self.test_name: self.log.info('Enable VoNR for UE.') - self.enable_ims_nr() + self.at_util.enable_ims_nr() super().setup_test() def power_ims_call_test(self): @@ -114,7 +119,7 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest): # Initiate the voice call self.log.info('Callbox initiates call to UE.') - self.ims_client.initiate_call('001010123456789') + self.ims_client.initiate_call(self.ue_number) time.sleep(5) @@ -122,28 +127,29 @@ class PowerTelImsPresetTest(PB.PowerCellularPresetLabBaseTest): self.log.info('UE pick up call.') self.dut.adb.shell('input keyevent KEYCODE_CALL') + # Set mac padding + if 'NR' in self.test_name: + self.cellular_simulator.modify_dl_ul_mac_padding() + # Mute the call self.dut.droid.telecomCallMute() # Turn of screen self.dut.droid.goToSleepNow() - # Measure power - self.collect_power_data() - - # End the call - hangup_call(self.log, self.dut) - - # Check if power measurement is within the required values - self.pass_fail_check() + # Measure power and check against threshold + self.collect_power_data_and_validate() def teardown_test(self): super().teardown_test() - #self.cellular_simulator.deregister_ue_ims() - self.ims_client.remove_ims_app_link() + # End the call + self.log.info('Hangup.') + self.ims_client.hangup_call() def teardown_class(self): super().teardown_class() + self.ims_client.tear_down() + self.ims_server.tear_down() self.log.info('Disable IMS.') self.dut.adb.shell(self.ADB_CMD_DISABLE_IMS) diff --git a/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py index 3f023f3d0..ceb2223ff 100644 --- a/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py +++ b/acts_tests/tests/google/power/tel/PowerTelPdcchFr2_Preset_Test.py @@ -23,11 +23,8 @@ class PowerTelPdcchFr2_Preset_Test(PB.PowerCellularPresetLabBaseTest): Requirements for this test are that mac padding is off and that the inactivity timer is not enabled. """ - # Measure power - self.collect_power_data() - - # Check if power measurement is within the required values - self.pass_fail_check(self.avg_current) + # Measure power and check against threshold + self.collect_power_data_and_validate() def test_preset_nsa_cdrx_fr2(self): self.power_pdcch_test() diff --git a/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py index 270463853..937b953c7 100644 --- a/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py +++ b/acts_tests/tests/google/power/tel/PowerTelPdcch_Preset_Test.py @@ -22,11 +22,8 @@ class PowerTelPdcch_Preset_Test(PB.PowerCellularPresetLabBaseTest): Requirements for this test are that mac padding is off and that the inactivity timer is not enabled. """ - # Measure power - self.collect_power_data() - - # Check if power measurement is within the required values - self.pass_fail_check(self.avg_current) + # Measure power and check against threshold + self.collect_power_data_and_validate() def test_preset_sa_pdcch_fr1(self): self.power_pdcch_test() diff --git a/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py b/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py index d05c80154..4242307df 100644 --- a/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py +++ b/acts_tests/tests/google/power/tel/PowerTelTraffic_Preset_Test.py @@ -14,9 +14,9 @@ import os import time +from acts import asserts import acts_contrib.test_utils.power.cellular.cellular_power_preset_base_test as PB - class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): # command to enable mobile data ADB_CMD_ENABLE_MOBILE_DATA = 'svc data enable' @@ -26,7 +26,7 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): # command to start iperf server on UE # (require: 1.path to iperf exe 2.hostname/hostIP) - START_IPERF_CLIENT_UE_CMD = 'nohup > /dev/null 2>&1 sh -c "iperf3 -c {iperf_host_ip} -i1 -p5202 -w8m -t2000 > /dev/null &"' + START_IPERF_CLIENT_UE_CMD = 'nohup > /dev/null 2>&1 sh -c "iperf3 -c {iperf_host_ip} -i1 -p5202 -w8m -t2000 -O{second} > /dev/null &"' # command to start iperf server on host() START_IPERF_SV_HOST_CMD = '{exe_path}\\iperf3 -s -p5202' @@ -34,10 +34,10 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): # command to start iperf client on host # (require: 1.path to iperf exe 2.UE IP) START_IPERF_CLIENT_HOST_CMD = ( - '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201') + '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201 -O{second}') START_IPERF_CLIENT_HOST_CMD_FR2 = ( - '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201 -P32') + '{exe_path}\\iperf3 -c {ue_ip} -w16M -t1000 -p5201 -O{second}') def __init__(self, controllers): super().__init__(controllers) @@ -63,6 +63,13 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): # Call parent method first to setup simulation super().setup_test() + # get tput configs + self.unpack_userparams(abnormal_bandwidth_tolerance=0.1, + bandwidth_tolerance=0.1, + n_second_to_omitted=45) + self.expected_downlink_bandwidth = float(self.test_configs[self.test_name]['tput']['downlink'].split()[0]) + self.expected_uplink_bandwidth = float(self.test_configs[self.test_name]['tput']['uplink'].split()[0]) + # setup ssh client self.ssh_iperf_client = self.cellular_simulator.create_ssh_client() self.ssh_iperf_server = self.cellular_simulator.create_ssh_client() @@ -73,19 +80,81 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): """Measure power while data is transferring.""" # Start data traffic self.start_uplink_process() - time.sleep(5) self.start_downlink_process() - # Measure power - self.collect_power_data() + # Measure power and check against threshold + self.collect_power_data_and_validate() + def _end_iperfs(self): + err_message = [] # Write iperf log self.ssh_iperf_server.close() uplink_log_name = self.test_name + '_uplink.txt' - self._write_iperf_log(uplink_log_name, self.ssh_iperf_server) + out, err = self.iperf_out_err[self.ssh_iperf_server] + output_content = ''.join(out.readlines()) + err_content = ''.join(err.readlines()) + self._write_iperf_log(uplink_log_name, output_content + err_content) + if err_content.strip(): + err_message.append(f'Uplink process fail due to error: {err_content}\n') + else: + if not self._iperf_log_check(output_content, self.expected_uplink_bandwidth): + err_message.append('Bandwidth of uplink process is unstable.') + self.ssh_iperf_client.close() downlink_log_name = self.test_name + '_downlink.txt' - self._write_iperf_log(downlink_log_name, self.ssh_iperf_client) + out, err = self.iperf_out_err[self.ssh_iperf_client] + output_content = ''.join(out.readlines()) + err_content = ''.join(err.readlines()) + self._write_iperf_log(downlink_log_name, output_content + err_content) + if err_content.strip(): + err_message.append(f'Downlink process fail due to error: {err_content}\n') + else: + if not self._iperf_log_check(output_content, self.expected_downlink_bandwidth): + err_message.append('Bandwidth of downlink process is unstable.') + + if err_message: + raise RuntimeError('\n'.join(err_message)) + + def teardown_test(self): + try: + self._end_iperfs() + except RuntimeError as re: + raise re + finally: + super().teardown_test() + + def _iperf_log_check(self, file, expected_bandwidth): + """Check iperf log and abnormal bandwidth instances. + + Args: + file: file object of iperf log to be checked. + expected_bandwidth: integer value for expected bandwidth. + Returns: + True if log is normal, False otherwise. + """ + # example of record line + #[ 4] 0.00-1.00 sec 20.2 MBytes 169 Mbits/sec + total_abnormal_entries = 0 + total_record_entries = 0 + bandwidth_val_idx = 6 + record_entry_total_cols = 8 + lines = file.split('\n') + acceptable_difference = self.bandwidth_tolerance * expected_bandwidth + self.log.debug('Expected bandwidth: %f', expected_bandwidth) + self.log.debug('Acceptance difference: %f', acceptable_difference) + for line in lines: + cols = line.split() + self.log.debug(cols) + if len(cols) == record_entry_total_cols: + total_record_entries += 1 + bandwidth = float(cols[bandwidth_val_idx]) + self.log.debug('bandwidth: %f', bandwidth) + if abs(bandwidth - expected_bandwidth) > acceptable_difference: + total_abnormal_entries += 1 + if not total_record_entries: + raise RuntimeError('No tput data record found.') + self.log.debug('Total abnormal entries: %d - Total record: %d', total_abnormal_entries, total_record_entries) + return (total_abnormal_entries/total_record_entries) <= self.abnormal_bandwidth_tolerance def _exec_ssh_cmd(self, ssh_client, cmd): """Execute command on given ssh client. @@ -112,11 +181,13 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): if 'fr2' in self.test_name: cmd = self.START_IPERF_CLIENT_HOST_CMD_FR2.format( exe_path=self.iperf_exe_path, - ue_ip=self.ue_ip) + ue_ip=self.ue_ip, + second=self.n_second_to_omitted) else: cmd = self.START_IPERF_CLIENT_HOST_CMD.format( exe_path=self.iperf_exe_path, - ue_ip=self.ue_ip) + ue_ip=self.ue_ip, + second=self.n_second_to_omitted) if not cmd: raise RuntimeError('Cannot format command to start iperf client.') @@ -134,29 +205,25 @@ class PowerTelTrafficPresetTest(PB.PowerCellularPresetLabBaseTest): time.sleep(5) # start UE iperf adb_cmd = self.START_IPERF_CLIENT_UE_CMD.format( - iperf_host_ip=self.iperf_host_ip) + iperf_host_ip=self.iperf_host_ip, + second=self.n_second_to_omitted) self.cellular_dut.ad.adb.shell(adb_cmd) self.log.info('cmd sent to UE: ' + adb_cmd) self.log.info('UE iperf client started') time.sleep(5) - def _write_iperf_log(self, file_name, ssh): + def _write_iperf_log(self, file_name, content): """ Writing ssh stdout and stdin to log file. Args: - file_name: log file name to write log to. - ssh: paramiko client object. + file_name: Log file name to write log to. + content: Content to write to file. """ iperf_log_dir = os.path.join(self.root_output_path, 'iperf') os.makedirs(iperf_log_dir, exist_ok=True) iperf_log_file_path = os.path.join(iperf_log_dir, file_name) with open(iperf_log_file_path, 'w') as f: - out, err = self.iperf_out_err[ssh] - out_content = ''.join(out.readlines()) - err_content = ''.join(err.readlines()) - f.write(out_content) - f.write('\nErrors:\n') - f.write(err_content) + f.write(content) def turn_on_mobile_data(self): self.dut.adb.shell(self.ADB_CMD_ENABLE_MOBILE_DATA) diff --git a/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py index daf9b1a9e..b3cc779ab 100644 --- a/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py +++ b/acts_tests/tests/google/wifi/WifiEnterpriseRoamingTest.py @@ -50,7 +50,9 @@ class WifiEnterpriseRoamingTest(WifiBaseTest): "wifi6_models", "radius_conf_2g", "radius_conf_5g") - self.unpack_userparams(req_params) + opt_param = ["domain_suffix_match"] + self.unpack_userparams(req_params, + opt_param) if "AccessPoint" in self.user_params: self.legacy_configure_ap_and_start( mirror_ap=True, @@ -81,15 +83,17 @@ class WifiEnterpriseRoamingTest(WifiBaseTest): Ent.IDENTITY: self.eap_identity, Ent.PASSWORD: self.eap_password, Ent.PHASE2: int(EapPhase2.MSCHAPV2), + Ent.DOM_SUFFIX_MATCH: self.domain_suffix_match, WifiEnums.SSID_KEY: self.ent_roaming_ssid } self.config_tls = { Ent.EAP: int(EAP.TLS), Ent.CA_CERT: self.ca_cert, - WifiEnums.SSID_KEY: self.ent_roaming_ssid, Ent.CLIENT_CERT: self.client_cert, Ent.PRIVATE_KEY_ID: self.client_key, Ent.IDENTITY: self.eap_identity, + Ent.DOM_SUFFIX_MATCH: self.domain_suffix_match, + WifiEnums.SSID_KEY: self.ent_roaming_ssid } self.config_ttls = { Ent.EAP: int(EAP.TTLS), @@ -97,6 +101,7 @@ class WifiEnterpriseRoamingTest(WifiBaseTest): Ent.IDENTITY: self.eap_identity, Ent.PASSWORD: self.eap_password, Ent.PHASE2: int(EapPhase2.MSCHAPV2), + Ent.DOM_SUFFIX_MATCH: self.domain_suffix_match, WifiEnums.SSID_KEY: self.ent_roaming_ssid } self.config_sim = { diff --git a/acts_tests/tests/google/wifi/WifiPingTest.py b/acts_tests/tests/google/wifi/WifiPingTest.py index ca1ccd7db..a1ea5d907 100644 --- a/acts_tests/tests/google/wifi/WifiPingTest.py +++ b/acts_tests/tests/google/wifi/WifiPingTest.py @@ -868,30 +868,13 @@ class WifiOtaPing_TenDegree_Test(WifiOtaPingTest): self.tests = self.generate_test_cases( ap_power='standard', channels=[6, 36, 149, '6g37', '6g117', '6g213'], - modes=['bw20'], + modes=['bw20', 'bw80', 'bw160'], chain_masks=['2x2'], chamber_mode='orientation', positions=list(range(0, 360, 10)), reference_params=['channel', 'mode', 'chain_mask']) -class WifiOtaPing_45Degree_Test(WifiOtaPingTest): - - def __init__(self, controllers): - WifiOtaPingTest.__init__(self, controllers) - self.tests = self.generate_test_cases( - ap_power='standard', - channels=[ - 1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161, '6g37', '6g117', - '6g213' - ], - modes=['bw20'], - chain_masks=['2x2'], - chamber_mode='orientation', - positions=list(range(0, 360, 45)), - reference_params=['channel', 'mode', 'chain_mask']) - - class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest): def __init__(self, controllers): @@ -920,20 +903,6 @@ class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest): reference_params=['channel', 'mode', 'chain_mask']) -class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest): - - def __init__(self, controllers): - WifiOtaPingTest.__init__(self, controllers) - self.tests = self.generate_test_cases( - ap_power='low_power', - channels=[1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], - modes=['bw20'], - chain_masks=['2x2'], - chamber_mode='orientation', - positions=list(range(0, 360, 45)), - reference_params=['channel', 'mode', 'chain_mask']) - - class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest): def __init__(self, controllers): diff --git a/acts_tests/tests/google/wifi/WifiRssiTest.py b/acts_tests/tests/google/wifi/WifiRssiTest.py index 452591f85..eb00327b6 100644 --- a/acts_tests/tests/google/wifi/WifiRssiTest.py +++ b/acts_tests/tests/google/wifi/WifiRssiTest.py @@ -850,25 +850,6 @@ class WifiRssiTest(base_test.BaseTestClass): return test_cases -class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest): - - def __init__(self, controllers): - super().__init__(controllers) - self.tests = self.generate_test_cases( - ['test_rssi_stability', 'test_rssi_vs_atten'], [1, 2, 6, 10, 11], - ['bw20'], ['ActiveTraffic']) - - -class WifiRssi_5GHz_ActiveTraffic_Test(WifiRssiTest): - - def __init__(self, controllers): - super().__init__(controllers) - self.tests = self.generate_test_cases( - ['test_rssi_stability', 'test_rssi_vs_atten'], - [36, 40, 44, 48, 149, 153, 157, 161], ['bw20', 'bw40', 'bw80'], - ['ActiveTraffic']) - - class WifiRssi_AllChannels_ActiveTraffic_Test(WifiRssiTest): def __init__(self, controllers): @@ -885,8 +866,8 @@ class WifiRssi_SampleChannels_NoTraffic_Test(WifiRssiTest): def __init__(self, controllers): super().__init__(controllers) self.tests = self.generate_test_cases( - ['test_rssi_stability', 'test_rssi_vs_atten'], [6, 36, 149], - ['bw20', 'bw40', 'bw80'], ['NoTraffic']) + ['test_rssi_stability', 'test_rssi_vs_atten'], [6, 36, 149, '6g37'], + ['bw20', 'bw40', 'bw80', 'bw160'], ['NoTraffic']) class WifiRssiTrackingTest(WifiRssiTest): @@ -894,8 +875,8 @@ class WifiRssiTrackingTest(WifiRssiTest): def __init__(self, controllers): super().__init__(controllers) self.tests = self.generate_test_cases(['test_rssi_tracking'], - [6, 36, 149], - ['bw20', 'bw40', 'bw80'], + [6, 36, 149, '6g37'], + ['bw20', 'bw40', 'bw80', 'bw160'], ['ActiveTraffic', 'NoTraffic']) @@ -1109,24 +1090,13 @@ class WifiOtaRssiTest(WifiRssiTest): test_cases.append(test_name) return test_cases - -class WifiOtaRssi_Accuracy_Test(WifiOtaRssiTest): - - def __init__(self, controllers): - super().__init__(controllers) - self.tests = self.generate_test_cases(['test_rssi_vs_atten'], - [6, 36, 149, '6g37'], ['bw20'], - ['ActiveTraffic'], - ['orientation'], - list(range(0, 360, 45))) - - class WifiOtaRssi_StirrerVariation_Test(WifiOtaRssiTest): def __init__(self, controllers): WifiRssiTest.__init__(self, controllers) self.tests = self.generate_test_cases(['test_rssi_variation'], - [6, 36, 149, '6g37'], ['bw20'], + [6, 36, 149, '6g37'], + ['bw20', 'bw80', 'bw160'], ['ActiveTraffic'], ['StirrersOn'], [0]) @@ -1136,7 +1106,8 @@ class WifiOtaRssi_TenDegree_Test(WifiOtaRssiTest): def __init__(self, controllers): WifiRssiTest.__init__(self, controllers) self.tests = self.generate_test_cases(['test_rssi_over_orientation'], - [6, 36, 149, '6g37'], ['bw20'], + [6, 36, 149, '6g37'], + ['bw20', 'bw80', 'bw160'], ['ActiveTraffic'], ['orientation'], list(range(0, 360, 10))) diff --git a/acts_tests/tests/google/wifi/WifiRvrTest.py b/acts_tests/tests/google/wifi/WifiRvrTest.py index 4a6455218..cb188fd50 100644 --- a/acts_tests/tests/google/wifi/WifiRvrTest.py +++ b/acts_tests/tests/google/wifi/WifiRvrTest.py @@ -1097,6 +1097,19 @@ class WifiOtaRvr_SampleChannel_Test(WifiOtaRvrTest): self.generate_test_cases(['6g37'], ['bw160'], list(range(0, 360, 45)), ['TCP'], ['DL'])) +class WifiOtaRvr_SampleChannel_UDP_Test(WifiOtaRvrTest): + + def __init__(self, controllers): + WifiOtaRvrTest.__init__(self, controllers) + self.tests = self.generate_test_cases([6], ['bw20'], + list(range(0, 360, 45)), ['UDP'], + ['DL']) + self.tests.extend( + self.generate_test_cases([36, 149], ['bw80', 'bw160'], + list(range(0, 360, 45)), ['UDP'], ['DL'])) + self.tests.extend( + self.generate_test_cases(['6g37'], ['bw160'], + list(range(0, 360, 45)), ['UDP'], ['DL'])) class WifiOtaRvr_SingleOrientation_Test(WifiOtaRvrTest): diff --git a/acts_tests/tests/google/wifi/WifiSensitivityTest.py b/acts_tests/tests/google/wifi/WifiSensitivityTest.py index 535572d6c..58f938d85 100644 --- a/acts_tests/tests/google/wifi/WifiSensitivityTest.py +++ b/acts_tests/tests/google/wifi/WifiSensitivityTest.py @@ -248,7 +248,7 @@ class WifiSensitivityTest(WifiRvrTest, WifiPingTest): for plot_id, plot in plots.items(): plot.generate_figure() figure_list.append(plot) - output_file_path = os.path.join(self.log_path, 'results.html') + output_file_path = os.path.join(self.log_path, 'PER_curves.html') BokehFigure.save_figures(figure_list, output_file_path) def process_testclass_results(self): diff --git a/acts_tests/tests/google/wifi/WifiSoftApTest.py b/acts_tests/tests/google/wifi/WifiSoftApTest.py index 3aa815c45..12cf328da 100644 --- a/acts_tests/tests/google/wifi/WifiSoftApTest.py +++ b/acts_tests/tests/google/wifi/WifiSoftApTest.py @@ -190,8 +190,6 @@ class WifiSoftApTest(WifiBaseTest): initial_cell_state = tel_utils.is_sim_ready(self.log, self.dut) self.dut.log.info("current state: %s", initial_wifi_state) self.dut.log.info("is sim ready? %s", initial_cell_state) - if initial_cell_state: - self.check_cell_data_and_enable() config = self.create_softap_config() wutils.start_wifi_tethering(self.dut, config[wutils.WifiEnums.SSID_KEY], diff --git a/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py b/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py index 3b55b6c03..671e78c6f 100644 --- a/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py +++ b/acts_tests/tests/google/wifi/WifiWpa3EnterpriseTest.py @@ -109,11 +109,12 @@ class WifiWpa3EnterpriseTest(WifiBaseTest): logcat_msg = "E WifiKeyStore: Invalid certificate type for Suite-B" try: wutils.connect_to_wifi_network(self.dut, config) - asserts.fail("WPA3 Ent worked with insecure RSA key. Expected to fail.") except: logcat_search = self.dut.search_logcat(logcat_msg) self.log.info("Logcat search results: %s" % logcat_search) asserts.assert_true(logcat_search, "No valid error msg in logcat") + else: + asserts.fail("WPA3 Ent worked with insecure RSA key. Expected to fail.") @test_tracker_info(uuid="897957f3-de25-4f9e-b6fc-9d7798ea1e6f") def test_connect_to_wpa3_enterprise_expired_rsa_cert(self): @@ -130,11 +131,12 @@ class WifiWpa3EnterpriseTest(WifiBaseTest): logcat_msg = "E WifiKeyStore: Invalid certificate type for Suite-B" try: wutils.connect_to_wifi_network(self.dut, config) - asserts.fail("WPA3 Ent worked with expired cert. Expected to fail.") except: logcat_search = self.dut.search_logcat(logcat_msg) self.log.info("Logcat search results: %s" % logcat_search) asserts.assert_true(logcat_search, "No valid error msg in logcat") + else: + asserts.fail("WPA3 Ent worked with expired cert. Expected to fail.") @test_tracker_info(uuid="f7ab30e2-f2b5-488a-8667-e45920fc24d1") def test_connect_to_wpa3_enterprise_corrupted_rsa_cert(self): @@ -150,9 +152,10 @@ class WifiWpa3EnterpriseTest(WifiBaseTest): } try: wutils.connect_to_wifi_network(self.dut, config) - asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.") except: asserts.explicit_pass("Connection failed as expected.") + else: + asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.") @test_tracker_info(uuid="f934f388-dc0b-4c78-a493-026b798c15ca") def test_connect_to_wpa3_enterprise_unsigned_rsa_cert(self): @@ -168,9 +171,10 @@ class WifiWpa3EnterpriseTest(WifiBaseTest): } try: wutils.connect_to_wifi_network(self.dut, config) - asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.") except: asserts.explicit_pass("Connection failed as expected.") + else: + asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.") @test_tracker_info(uuid="7082dc90-5eb8-4055-8b48-b555a98a837a") def test_connect_to_wpa3_enterprise_wrong_domain_rsa_cert(self): @@ -186,9 +190,10 @@ class WifiWpa3EnterpriseTest(WifiBaseTest): } try: wutils.connect_to_wifi_network(self.dut, config) - asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.") except: asserts.explicit_pass("Connection failed as expected.") + else: + asserts.fail("WPA3 Ent worked with unsigned cert. Expected to fail.") @test_tracker_info(uuid="9ad5fd82-f115-42c3-b8e8-520144485ea1") def test_network_selection_status_for_wpa3_ent_wrong_domain_rsa_cert(self): @@ -204,10 +209,11 @@ class WifiWpa3EnterpriseTest(WifiBaseTest): } try: wutils.connect_to_wifi_network(self.dut, config) - asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.") except: asserts.assert_true( self.dut.droid.wifiIsNetworkTemporaryDisabledForNetwork(config), "WiFi network is not temporary disabled.") asserts.explicit_pass( "Connection failed with correct network selection status.") + else: + asserts.fail("WPA3 Ent worked with corrupted cert. Expected to fail.") diff --git a/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py b/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py index 69dc42c84..7c061f24c 100644 --- a/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py +++ b/acts_tests/tests/google/wifi/aware/functional/DiscoveryTest.py @@ -293,11 +293,6 @@ class DiscoveryTest(AwareBaseTest): autils.wait_for_event(s_dut, aconsts.SESSION_CB_ON_SESSION_TERMINATED) - - # verify that there were no other events - autils.verify_no_more_events(p_dut, timeout=0) - autils.verify_no_more_events(s_dut, timeout=0) - # verify that forbidden callbacks aren't called autils.validate_forbidden_callbacks(p_dut, {aconsts.CB_EV_MATCH: 0}) |