summaryrefslogtreecommitdiff
path: root/acts_tests/acts_contrib/test_utils/wifi/wifi_test_utils.py
blob: 5af197101deec243461da56ea709858ce7aa195e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
#!/usr/bin/env python3
#
#   Copyright 2016 Google, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import ipaddress
import logging
import os
import re
import shutil
import random
import subprocess
import time

from retry import retry
from typing import Optional, Union

from collections import namedtuple
from enum import IntEnum
from queue import Empty

from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.controllers import attenuator
from acts.controllers.ap_lib import hostapd_security
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts_contrib.test_utils.net import connectivity_const as cconsts
from acts_contrib.test_utils.tel import tel_defines
from acts_contrib.test_utils.wifi import wifi_constants
from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils

# Default timeout that gives the system time to settle down after a reboot
# or after toggling WiFi or Airplane mode.
DEFAULT_TIMEOUT = 10
# Seconds to wait for events that should arrive quickly, e.g. onSuccess after
# starting a background scan, or the confirmation of a wifi state change.
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30

# Default retry counts for scan and connect operations.
DEFAULT_SCAN_TRIES = 3
DEFAULT_CONNECT_TRIES = 3
# Interval between WiFi scan retries, in seconds.
WIFI_SCAN_RETRY_INTERVAL_SEC = 5

# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458

DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"

# Attenuation presets keyed by roaming scenario.
# NOTE(review): presumably one value per attenuator port (two per AP) --
# confirm against the test bed layout.
ROAMING_ATTN = {
    "AP1_on_AP2_off": [0, 0, 95, 95],
    "AP1_off_AP2_on": [95, 95, 0, 0],
    "default": [0, 0, 0, 0],
}


class WifiEnums():
    """Constants shared by the WiFi test utilities.

    Groups the dictionary keys used in WiFi / SoftAp configurations, the EAP,
    P2P and RTT enumerations, the WifiScanner band identifiers and the
    frequency <-> channel lookup tables.
    """

    SSID_KEY = "SSID"  # Used for Wifi & SoftAp
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"  # Used for Wifi & SoftAp
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"  # Used for Wifi & SoftAp
    frequency_key = "frequency"
    HIDDEN_KEY = "hiddenSSID"  # Used for Wifi & SoftAp
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_SUGGESTION_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"  # Used for Wifi & SoftAp

    # Used for SoftAp
    AP_BAND_KEY = "apBand"
    AP_CHANNEL_KEY = "apChannel"
    AP_BANDS_KEY = "apBands"
    AP_CHANNEL_FREQUENCYS_KEY = "apChannelFrequencies"
    AP_MAC_RANDOMIZATION_SETTING_KEY = "MacRandomizationSetting"
    AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY = "BridgedModeOpportunisticShutdownEnabled"
    AP_IEEE80211AX_ENABLED_KEY = "Ieee80211axEnabled"
    AP_MAXCLIENTS_KEY = "MaxNumberOfClients"
    AP_SHUTDOWNTIMEOUT_KEY = "ShutdownTimeoutMillis"
    AP_SHUTDOWNTIMEOUTENABLE_KEY = "AutoShutdownEnabled"
    AP_CLIENTCONTROL_KEY = "ClientControlByUserEnabled"
    AP_ALLOWEDLIST_KEY = "AllowedClientList"
    AP_BLOCKEDLIST_KEY = "BlockedClientList"

    # SoftAp band identifiers. The combined values are the sums of the
    # single-band values (2G = 1, 5G = 2, 6G = 4).
    WIFI_CONFIG_SOFTAP_BAND_2G = 1
    WIFI_CONFIG_SOFTAP_BAND_5G = 2
    WIFI_CONFIG_SOFTAP_BAND_2G_5G = 3
    WIFI_CONFIG_SOFTAP_BAND_6G = 4
    WIFI_CONFIG_SOFTAP_BAND_2G_6G = 5
    WIFI_CONFIG_SOFTAP_BAND_5G_6G = 6
    WIFI_CONFIG_SOFTAP_BAND_ANY = 7

    # DO NOT USE these for new test cases! Replaced by the
    # WIFI_CONFIG_SOFTAP_BAND_* constants above.
    WIFI_CONFIG_APBAND_2G = WIFI_CONFIG_SOFTAP_BAND_2G
    WIFI_CONFIG_APBAND_5G = WIFI_CONFIG_SOFTAP_BAND_5G
    WIFI_CONFIG_APBAND_AUTO = WIFI_CONFIG_SOFTAP_BAND_2G_5G

    WIFI_CONFIG_APBAND_2G_OLD = 0
    WIFI_CONFIG_APBAND_5G_OLD = 1
    WIFI_CONFIG_APBAND_AUTO_OLD = -1

    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class SoftApSecurityType():
        OPEN = "NONE"
        WPA2 = "WPA2_PSK"
        WPA3_SAE_TRANSITION = "WPA3_SAE_TRANSITION"
        WPA3_SAE = "WPA3_SAE"

    class CountryCode():
        AUSTRALIA = "AU"
        CHINA = "CN"
        GERMANY = "DE"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP
    # EAP types
    class Eap(IntEnum):
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    # EAP Phase2 types
    class EapPhase2(IntEnum):
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        # Enterprise Config Macros
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
        OCSP = "ocsp"

    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    # The preamble and bandwidth values below are single-bit flags.
    class RttPreamble(IntEnum):
        PREAMBLE_LEGACY = 0x01
        PREAMBLE_HT = 0x02
        PREAMBLE_VHT = 0x04

    class RttBW(IntEnum):
        BW_5_SUPPORT = 0x01
        BW_10_SUPPORT = 0x02
        BW_20_SUPPORT = 0x04
        BW_40_SUPPORT = 0x08
        BW_80_SUPPORT = 0x10
        BW_160_SUPPORT = 0x20

    class Rtt(IntEnum):
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    class RttParam:
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    # Margin of error per RTT bandwidth.
    # NOTE(review): the unit is not stated here -- confirm with the RTT tests.
    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5
    }

    # Macros as specified in the WifiScanner code.
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = 6  # 5 GHz band with DFS channels
    WIFI_BAND_BOTH = 3  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = 7  # both bands with DFS channels

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    DFS_5G_FREQUENCIES = [
        5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620, 5640,
        5660, 5680, 5700, 5720
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    # Frequencies covered by each WifiScanner band identifier.
    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
    }

    # TODO: add all of the band mapping.
    softap_band_frequencies = {
        WIFI_CONFIG_SOFTAP_BAND_2G: ALL_2G_FREQUENCIES,
        WIFI_CONFIG_SOFTAP_BAND_5G: ALL_5G_FREQUENCIES
    }

    # All Wifi frequencies to channels lookup.
    # Every frequency listed in ALL_2G_FREQUENCIES and ALL_5G_FREQUENCIES has
    # an entry here, including 5720 MHz (channel 144).
    freq_to_channel = {
        2412: 1,
        2417: 2,
        2422: 3,
        2427: 4,
        2432: 5,
        2437: 6,
        2442: 7,
        2447: 8,
        2452: 9,
        2457: 10,
        2462: 11,
        2467: 12,
        2472: 13,
        2484: 14,
        4915: 183,
        4920: 184,
        4925: 185,
        4935: 187,
        4940: 188,
        4945: 189,
        4960: 192,
        4980: 196,
        5035: 7,
        5040: 8,
        5045: 9,
        5055: 11,
        5060: 12,
        5080: 16,
        5170: 34,
        5180: 36,
        5190: 38,
        5200: 40,
        5210: 42,
        5220: 44,
        5230: 46,
        5240: 48,
        5260: 52,
        5280: 56,
        5300: 60,
        5320: 64,
        5500: 100,
        5520: 104,
        5540: 108,
        5560: 112,
        5580: 116,
        5600: 120,
        5620: 124,
        5640: 128,
        5660: 132,
        5680: 136,
        5700: 140,
        5720: 144,
        5745: 149,
        5765: 153,
        5785: 157,
        5795: 159,
        5805: 161,
        5825: 165,
        5845: 169,
        5865: 173,
        5885: 177
    }

    # All Wifi channels to frequencies lookup.
    channel_2G_to_freq = {
        1: 2412,
        2: 2417,
        3: 2422,
        4: 2427,
        5: 2432,
        6: 2437,
        7: 2442,
        8: 2447,
        9: 2452,
        10: 2457,
        11: 2462,
        12: 2467,
        13: 2472,
        14: 2484
    }

    channel_5G_to_freq = {
        183: 4915,
        184: 4920,
        185: 4925,
        187: 4935,
        188: 4940,
        189: 4945,
        192: 4960,
        196: 4980,
        7: 5035,
        8: 5040,
        9: 5045,
        11: 5055,
        12: 5060,
        16: 5080,
        34: 5170,
        36: 5180,
        38: 5190,
        40: 5200,
        42: 5210,
        44: 5220,
        46: 5230,
        48: 5240,
        50: 5250,
        52: 5260,
        56: 5280,
        60: 5300,
        64: 5320,
        100: 5500,
        104: 5520,
        108: 5540,
        112: 5560,
        116: 5580,
        120: 5600,
        124: 5620,
        128: 5640,
        132: 5660,
        136: 5680,
        140: 5700,
        144: 5720,
        149: 5745,
        151: 5755,
        153: 5765,
        155: 5775,
        157: 5785,
        159: 5795,
        161: 5805,
        165: 5825,
        169: 5845,
        173: 5865,
        177: 5885
    }

    # 6 GHz channels 1, 5, 9, ..., 233 mapped to 5955, 5975, ..., 7115 MHz.
    channel_6G_to_freq = {4 * x + 1: 5955 + 20 * x for x in range(59)}

    # Channel -> frequency tables keyed by band name.
    channel_to_freq = {
        '2G': channel_2G_to_freq,
        '5G': channel_5G_to_freq,
        '6G': channel_6G_to_freq
    }


class WifiChannelBase:
    """Base class holding per-region WiFi frequency lists.

    All lists are empty here; subclasses are expected to fill them in.
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the list of frequencies that belong to a scan band.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* identifiers handled below.

        Returns:
            The list of frequencies configured on this object for the band.

        Raises:
            KeyError: If band is not one of the handled identifiers.
        """
        freqs_2g = self.ALL_2G_FREQUENCIES
        freqs_5g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        freqs_5g_dfs = self.DFS_5G_FREQUENCIES
        freqs_5g_all = self.ALL_5G_FREQUENCIES

        lookup = {
            WifiEnums.WIFI_BAND_24_GHZ: freqs_2g,
            WifiEnums.WIFI_BAND_5_GHZ: freqs_5g_no_dfs,
            WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY: freqs_5g_dfs,
            WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS: freqs_5g_all,
            WifiEnums.WIFI_BAND_BOTH: freqs_2g + freqs_5g_no_dfs,
            WifiEnums.WIFI_BAND_BOTH_WITH_DFS: freqs_5g_all + freqs_2g,
        }
        return lookup[band]


class WifiChannelUS(WifiChannelBase):
    """WiFi frequency lists for the US."""
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520, 5560,
        5700, 5745, 5805
    ]

    def __init__(self, model=None, support_addition_channel=None):
        """Builds the per-instance frequency lists.

        Args:
            model: Device model name, checked against
                   support_addition_channel.
            support_addition_channel: Optional collection of model names that
                                      additionally get 2467 and 2472 MHz in
                                      their 2G frequency list. None (the
                                      default) means no model does.
        """
        # The default is None rather than a list literal so that no mutable
        # object is shared between calls.
        if (support_addition_channel is not None
                and model in support_addition_channel):
            self.ALL_2G_FREQUENCIES = [
                2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457,
                2462, 2467, 2472
            ]
        # NOTE(review): 5845, 5865 and 5885 are listed under DFS here --
        # confirm that this grouping is intended.
        self.DFS_5G_FREQUENCIES = [
            5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620,
            5640, 5660, 5680, 5700, 5720, 5845, 5865, 5885
        ]
        # Recomputed on the instance because the class-level value inherited
        # from the base class was built from the base's empty lists.
        self.ALL_5G_FREQUENCIES = (self.DFS_5G_FREQUENCIES +
                                   self.NONE_DFS_5G_FREQUENCIES)


class WifiReferenceNetworks:
    """Sorts reference_networks entries by band and by auth type.

    Each entry of reference_networks maps a band key to a network config.
    Configs under the "2g" key are filed as 2G networks, configs under any
    other key are filed as 5G networks. A config that contains a "password"
    key counts as secure, otherwise as open.
    """
    def __init__(self, obj):
        self.reference_networks = obj
        self.WIFI_2G = "2g"
        self.WIFI_5G = "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Distributes every network config into one of the four lists."""
        for entry in self.reference_networks:
            for band in entry:
                is_2g = band == self.WIFI_2G
                config = entry[band]
                if "password" in config:
                    bucket = (self.secure_networks_2g
                              if is_2g else self.secure_networks_5g)
                else:
                    bucket = (self.open_networks_2g
                              if is_2g else self.open_networks_5g)
                bucket.append(config)

    def return_2g_secure_networks(self):
        """Returns the password protected 2G networks."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the password protected 5G networks."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the 2G networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the 5G networks without a password."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns all password protected networks, 2G first."""
        return self.secure_networks_2g + self.secure_networks_5g

    def return_open_networks(self):
        """Returns all networks without a password, 2G first."""
        return self.open_networks_2g + self.open_networks_5g


def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
    """Wrapper function that handles the bahevior of assert_on_fail.

    When assert_on_fail is True, let all test signals through, which can
    terminate test cases directly. When assert_on_fail is False, the wrapper
    raises no test signals and reports operation status by returning True or
    False.

    Args:
        func: The function to wrap. This function reports operation status by
              raising test signals.
        assert_on_fail: A boolean that specifies if the output of the wrapper
                        is test signal based or return value based.
        args: Positional args for func.
        kwargs: Name args for func.

    Returns:
        If assert_on_fail is True, returns True/False to signal operation
        status, otherwise return nothing.
    """
    try:
        func(*args, **kwargs)
        if not assert_on_fail:
            return True
    except signals.TestSignal:
        if assert_on_fail:
            raise
        return False


def assert_network_in_list(target, network_list):
    """Asserts that at least one network in a list matches the target.

    Matching is delegated to match_networks; the assertion fails when it
    returns no matches.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    matches = match_networks(target, network_list)
    failure_msg = "Target network %s, does not exist in network list %s" % (
        target, network_list)
    asserts.assert_true(matches, failure_msg)


def match_networks(target_params, networks):
    """Selects the WiFi networks that contain every target key-value pair.

    A network matches when, for each key in target_params, the network has
    that key and the stored value equals the target value.

    Args:
        target_params: A dict with 1 or more key-value pairs representing a Wi-Fi network.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
                       An empty/falsy value fails the assertion below.
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A new list with the matching networks, in their original order.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _is_match(candidate):
        # Every requested pair must be present with an equal value.
        return all(key in candidate and candidate[key] == expected
                   for key, expected in target_params.items())

    return [candidate for candidate in networks if _is_match(candidate)]


def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits for the device to reach the specified wifi state.

    Thin wrapper that runs _wait_for_wifi_state through
    _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the device transitions
        to the specified state, False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_wifi_state, assert_on_fail, ad, state=state)
    return outcome


def _wait_for_wifi_state(ad, state):
    """Toggles the state of wifi.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    if state == ad.droid.wifiCheckState():
        # Check if the state is already achieved, so we don't wait for the
        # state change event by mistake.
        return
    ad.droid.wifiStartTrackingStateChange()
    fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
        state, ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             lambda x: x["data"]["enabled"] == state,
                             SHORT_TIMEOUT)
    except Empty:
        asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets (or flips) the wifi state of a device.

    Thin wrapper that runs _wifi_toggle_state through
    _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
    return outcome


def _wifi_toggle_state(ad, new_state=None):
    """Toggles the state of wifi.

    TestFailure signals are raised when something goes wrong.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    if new_state is None:
        new_state = not ad.droid.wifiCheckState()
    elif new_state == ad.droid.wifiCheckState():
        # Check if the new_state is already achieved, so we don't wait for the
        # state change event by mistake.
        return
    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    ad.ed.clear_all_events()
    # Setting wifi state.
    ad.droid.wifiToggleState(new_state)
    time.sleep(2)
    fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
                                                           ad.serial)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             lambda x: x["data"]["enabled"] == new_state,
                             SHORT_TIMEOUT)
    except Empty:
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    Each distinct network id is forgotten once. A missing confirmation event
    is only logged as a warning; the final check on the configured network
    list decides whether the reset succeeded. This function does not change
    the Wi-Fi on/off state itself.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal (through asserts.assert_true) if the device
        still reports configured networks after the removal pass. The message
        lists the networks that remain.
    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    removed_ids = set()
    for network in networks:
        network_id = network['networkId']
        if network_id in removed_ids:
            # The same id can be listed more than once; forget it only once.
            continue
        ad.droid.wifiForgetNetwork(network_id)
        removed_ids.add(network_id)
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            logging.warning("Could not confirm the removal of network %s.",
                            network)
    # Check again to see if there's any network left, and report exactly the
    # ones that survived rather than the full original list.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)



def toggle_airplane_mode_on_and_off(ad):
    """Turn Airplane mode ON, wait, then turn it OFF again.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal (through asserts.assert_true) if either
        utils.force_airplane_mode call returns a falsy result.
    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(utils.force_airplane_mode(ad, True),
                        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    # This step disables airplane mode, so a failure must be reported as
    # "turn off" (the message previously said "turn on").
    asserts.assert_true(utils.force_airplane_mode(ad, False),
                        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)


def toggle_wifi_off_and_on(ad):
    """Turn WiFi OFF and then back ON, pausing after each step.

    Args:
        ad: An AndroidDevice object.

    Raises:
        Test failure signals from wifi_toggle_state (called with its default
        assert_on_fail=True) if either toggle fails.
    """
    steps = (
        ("Toggling wifi OFF.", False),
        ("Toggling wifi ON.", True),
    )
    for log_line, target_state in steps:
        ad.log.debug(log_line)
        wifi_toggle_state(ad, target_state)
        time.sleep(DEFAULT_TIMEOUT)


def wifi_forget_network(ad, net_ssid):
    """Remove a configured Wifi network from an android device.

    Only the first configured network whose SSID contains net_ssid (substring
    test) is forgotten. Does nothing when no networks are configured or none
    matches.

    Args:
        ad: android_device object for forget network.
        net_ssid: ssid of network to be forget

    Raises:
        A test failure signal (through asserts.fail) if the forget
        confirmation event does not arrive in time.
    """
    configured = ad.droid.wifiGetConfiguredNetworks()
    if not configured:
        return
    for network in configured:
        if net_ssid not in network[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % network)
        # Stop after the first matching network.
        break


def wifi_test_device_init(ad, country_code=WifiEnums.CountryCode.US):
    """Initializes an android device for wifi testing.

    Steps, in the order they are executed:
    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan (verified).
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging (verified).
    5. Ask wpa_supplicant to log at EXCESSIVE level (not verified).
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set the WiFi country code to country_code.
    9. Turn off ambient display.

    Args:
        ad: An AndroidDevice object.
        country_code: Country code to apply; defaults to
                      WifiEnums.CountryCode.US.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    scan_always_available = ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(not scan_always_available,
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    verbose_level = ad.droid.wifiGetVerboseLoggingLevel()
    asserts.assert_equal(verbose_level, 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    wpa_log_cmd = ("wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
                   "wlan0 log_level EXCESSIVE")
    wpa_log_output = ad.adb.shell(wpa_log_cmd, ignore_status=True)
    ad.log.info("wpa_supplicant log change status: %s", wpa_log_output)

    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    set_wifi_country_code(ad, country_code)
    utils.set_ambient_display(ad, False)


def set_wifi_country_code(ad, country_code):
    """Sets the wifi country code on the device.

    The code is first forced through the `cmd wifi force-country-code` shell
    command. If that command raises, the failure is logged and the same
    country_code is applied through the SL4A wifiSetCountryCode call instead
    (the fallback previously always applied US, ignoring the request).

    Args:
        ad: An AndroidDevice object.
        country_code: 2 letter ISO country code

    Raises:
        An RpcException if unable to set the country code.
    """
    try:
        ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
    except Exception as error:
        ad.log.warning(
            "Forcing country code %s through adb failed (%s); falling back "
            "to the SL4A API.", country_code, error)
        ad.droid.wifiSetCountryCode(country_code)


def start_wifi_connection_scan(ad):
    """Starts a wifi connection scan and waits up to 60s for its results.

    Previously queued events are cleared before the scan is started.

    Args:
        ad: An AndroidDevice object.

    Raises:
        A test failure signal (through asserts.fail) if the results-available
        event is not received in time.
    """
    results_event = "WifiManagerScanResultsAvailable"
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        ad.ed.pop_event(results_event, 60)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")


def start_wifi_connection_scan_and_return_status(ad):
    """
    Starts a wifi connection scan and waits up to 60s for either results or
    a scan failure to be reported.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if at least one results-available event was received
        False: if only failure events were received
    Raises:
        A test failure signal (through asserts.fail) if neither kind of event
        is received in time.
    """
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        scan_events = ad.ed.pop_events(
            "WifiManagerScan(ResultsAvailable|Failure)", 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # Several events may match; a single success is enough.
    for scan_event in scan_events:
        event_name = scan_event["name"]
        if event_name == "WifiManagerScanResultsAvailable":
            return True
        if event_name == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
    return False


def start_wifi_connection_scan_and_check_for_network(ad,
                                                     network_ssid,
                                                     max_tries=3,
                                                     found=True):
    """
    Start connectivity scans & check whether the presence of |network_ssid|
    in the scan results matches |found|. At most |max_tries| connectivity
    scans are performed.

    When found is True, the first scan that shows the SSID ends the search.
    When found is False, the SSID is only declared absent if the last scan
    attempt succeeds and does not show it.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
        found: True if expected a given SSID to be found; False otherwise.
    Returns:
        True: if the observed presence of network_ssid matches |found|.
        False: otherwise (including when scans keep failing).
    """
    start_time = time.time()
    for attempt in range(max_tries):
        is_last_attempt = (attempt + 1) == max_tries
        if not start_wifi_connection_scan_and_return_status(ad):
            if is_last_attempt:
                break
            # wait for a while when a WiFi scan is failed, e.g. because of device busy.
            time.sleep(WIFI_SCAN_RETRY_INTERVAL_SEC)
            continue
        scan_results = ad.droid.wifiGetScanResults()
        matches = match_networks({WifiEnums.SSID_KEY: network_ssid},
                                 scan_results)
        if not (found == (len(matches) > 0)):
            # Observation disagrees with the expectation; scan again.
            continue
        if found:
            ad.log.debug("%s network found in %s seconds." %
                          (network_ssid, (time.time() - start_time)))
            return True
        # Expecting absence: keep scanning until the last try to make sure
        # the ssid is really a no show.
        if is_last_attempt:
            ad.log.debug("%s network not found in %d tries in %s seconds." %
                         (network_ssid, max_tries, (time.time() - start_time)))
            return True
    return False


def start_wifi_connection_scan_and_ensure_network_found(
        ad, network_ssid, max_tries=3):
    """
    Start connectivity scans & assert that |network_ssid| shows up in the
    scan results within |max_tries| scans.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    assert_msg = "".join(("Failed to find ", network_ssid, " in scan results",
                          " after ", str(max_tries), " tries"))
    network_seen = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries, True)
    asserts.assert_true(network_seen, assert_msg)


def start_wifi_connection_scan_and_ensure_network_not_found(
        ad, network_ssid, max_tries=3):
    """
    Start connectivity scans & assert that |network_ssid| does not show up
    in the scan results, using up to |max_tries| scans.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    assert_msg = "".join(("Found ", network_ssid, " in scan results",
                          " after ", str(max_tries), " tries"))
    network_absent = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries, False)
    asserts.assert_true(network_absent, assert_msg)


def start_wifi_background_scan(ad, scan_setting):
    """Starts wifi background scan and waits for its success event.

    Args:
        ad: android_device object to initiate connection on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        If scan was started successfully, event data of success event is returned.
    """
    scan_index = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    # The success event name embeds the index returned by the start call.
    success_event_name = "WifiScannerScan{}onSuccess".format(scan_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']


def start_wifi_tethering(ad, ssid, password, band=None, hidden=None,
                         security=None):
    """Starts wifi tethering on an android_device.

    Builds a soft AP configuration from the arguments (optional fields are
    only included when truthy), saves it, starts tethering and waits for
    confirmation that tethering is active.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
        hidden: boolean to indicate if the AP needs to be hidden or not.
        security: security type of softap.

    Returns:
        No return value. Error checks in this function will raise test failure signals
    """
    config = {WifiEnums.SSID_KEY: ssid}
    optional_fields = (
        (WifiEnums.PWD_KEY, password),
        (WifiEnums.AP_BAND_KEY, band),
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.SECURITY, security),
    )
    for field_key, field_value in optional_fields:
        if field_value:
            config[field_key] = field_value

    config_saved = ad.droid.wifiSetWifiApConfiguration(config)
    asserts.assert_true(config_saved, "Failed to update WifiAp Configuration")

    def _tether_active(event):
        return event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged", _tether_active, 30)
        ad.log.debug("Tethering started successfully.")
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering starting")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def save_wifi_soft_ap_config(ad,
                             wifi_config,
                             band=None,
                             hidden=None,
                             security=None,
                             password=None,
                             channel=None,
                             max_clients=None,
                             shutdown_timeout_enable=None,
                             shutdown_timeout_millis=None,
                             client_control_enable=None,
                             allowedList=None,
                             blockedList=None,
                             bands=None,
                             channel_frequencys=None,
                             mac_randomization_setting=None,
                             bridged_opportunistic_shutdown_enabled=None,
                             ieee80211ax_enabled=None):
    """ Save a soft ap configuration and verify it was applied.

    The optional arguments are merged into wifi_config (which is modified in
    place), the result is pushed to the device, and the configuration read
    back from the device is compared against it setting by setting.

    Args:
        ad: android_device to set soft ap configuration.
        wifi_config: a soft ap configuration object, at least include SSID.
        band: specifies the band for the soft ap.
        hidden: specifies the soft ap need to broadcast its SSID or not.
        security: specifies the security type for the soft ap. Only applied
                together with a password.
        password: specifies the password for the soft ap. Only applied
                together with a security type.
        channel: specifies the channel for the soft ap. Only used together
                with band.
        max_clients: specifies the maximum connected client number.
        shutdown_timeout_enable: specifies the auto shut down enable or not.
        shutdown_timeout_millis: specifies the shut down timeout value.
        client_control_enable: specifies the client control enable or not.
        allowedList: specifies allowed clients list.
        blockedList: specifies blocked clients list.
        bands: specifies the band list for the soft ap. Ignored when
                channel_frequencys is given.
        channel_frequencys: specifies the channel frequency list for soft ap.
                Takes precedence over bands and band.
        mac_randomization_setting: specifies the mac randomization setting.
        bridged_opportunistic_shutdown_enabled: specifies the opportunistic
                shutdown enable or not.
        ieee80211ax_enabled: specifies the ieee80211ax enable or not.

    Raises:
        A test failure signal (through asserts.assert_true) if the
        configuration cannot be set or a read-back value does not match.
    """
    if security and password:
        wifi_config[WifiEnums.SECURITY] = security
        wifi_config[WifiEnums.PWD_KEY] = password

    # Settings that are copied into the config whenever they are provided.
    optional_settings = (
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.AP_MAXCLIENTS_KEY, max_clients),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY, shutdown_timeout_enable),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY, shutdown_timeout_millis),
        (WifiEnums.AP_CLIENTCONTROL_KEY, client_control_enable),
        (WifiEnums.AP_ALLOWEDLIST_KEY, allowedList),
        (WifiEnums.AP_BLOCKEDLIST_KEY, blockedList),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         mac_randomization_setting),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         bridged_opportunistic_shutdown_enabled),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY, ieee80211ax_enabled),
    )
    for key, value in optional_settings:
        if value is not None:
            wifi_config[key] = value

    # Band selection: explicit frequencies win over a band list, which wins
    # over a single band (optionally paired with a channel).
    if channel_frequencys is not None:
        wifi_config[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] = channel_frequencys
    elif bands is not None:
        wifi_config[WifiEnums.AP_BANDS_KEY] = bands
    elif band is not None:
        wifi_config[WifiEnums.AP_BAND_KEY] = band
        if channel is not None:
            wifi_config[WifiEnums.AP_CHANNEL_KEY] = channel

    # A channel value of 0 is dropped from the config before it is sent.
    if WifiEnums.AP_CHANNEL_KEY in wifi_config and wifi_config[
            WifiEnums.AP_CHANNEL_KEY] == 0:
        del wifi_config[WifiEnums.AP_CHANNEL_KEY]

    # Open networks carry neither a security type nor a password. The
    # password may legitimately be absent already, so remove it tolerantly
    # instead of raising KeyError.
    if WifiEnums.SECURITY in wifi_config and wifi_config[
            WifiEnums.SECURITY] == WifiEnums.SoftApSecurityType.OPEN:
        del wifi_config[WifiEnums.SECURITY]
        wifi_config.pop(WifiEnums.PWD_KEY, None)

    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match")

    # Each setting present in the requested config must be echoed back by
    # the device; checked in this order, with these failure messages.
    verified_settings = (
        (WifiEnums.SECURITY, "Hotspot Security doesn't match"),
        (WifiEnums.PWD_KEY, "Hotspot Password doesn't match"),
        (WifiEnums.HIDDEN_KEY, "Hotspot hidden setting doesn't match"),
        (WifiEnums.AP_CHANNEL_KEY, "Hotspot Channel doesn't match"),
        (WifiEnums.AP_MAXCLIENTS_KEY, "Hotspot Max Clients doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY,
         "Hotspot ShutDown feature flag doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY,
         "Hotspot ShutDown timeout setting doesn't match"),
        (WifiEnums.AP_CLIENTCONTROL_KEY,
         "Hotspot Client control flag doesn't match"),
        (WifiEnums.AP_ALLOWEDLIST_KEY, "Hotspot Allowed List doesn't match"),
        (WifiEnums.AP_BLOCKEDLIST_KEY, "Hotspot Blocked List doesn't match"),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         "Hotspot Mac randomization setting doesn't match"),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         "Hotspot bridged shutdown enable setting doesn't match"),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY,
         "Hotspot 80211 AX enable setting doesn't match"),
        (WifiEnums.AP_CHANNEL_FREQUENCYS_KEY,
         "Hotspot channels setting doesn't match"),
    )
    for key, mismatch_msg in verified_settings:
        if key in wifi_config:
            asserts.assert_true(wifi_ap[key] == wifi_config[key],
                                mismatch_msg)

def start_wifi_tethering_saved_config(ad):
    """ Turn on wifi hotspot with a config that is already saved

    Starts wifi tethering and waits for the tethering-started event followed
    by a tether state change reporting an active tether.

    Args:
        ad: android_device to start wifi tethering on.

    Raises:
        A test failure signal (through asserts.fail) if the confirmation
        events are not received in time.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event means "no confirmation". A bare except here
        # would also trap KeyboardInterrupt/SystemExit and unrelated errors.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def stop_wifi_tethering(ad):
    """Stops wifi tethering on an android_device.

    Waits for the AP-disabled event and then for a tether state change that
    reports no active tether.

    Args:
        ad: android_device to stop wifi tethering on.

    Raises:
        A test failure signal (through asserts.fail) if the confirmation
        events are not received in time.
    """

    def _tether_inactive(event):
        return not event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event("TetherStateChanged", _tether_inactive, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Toggle wifi state and then wait for Android device to reconnect to
    the provided wifi network.

    This expects the device to be already connected to the provided network.
    The work is done by _toggle_wifi_and_wait_for_reconnection, run through
    _assert_on_fail_handler.

    Logic steps are
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _toggle_wifi_and_wait_for_reconnection,
        assert_on_fail,
        ad,
        network,
        num_of_tries=num_of_tries)
    return outcome


def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=3):
    """Toggle wifi state and then wait for Android device to reconnect to
    the provided wifi network.

    This expects the device to be already connected to the provided network.

    Logic steps are
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to wait (30s
                      each) for the connected event before declaring failure.
                      Default is 3.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]
    # First ensure that we're already connected to the provided network.
    verify_wifi_connection_info(ad, {WifiEnums.SSID_KEY: expected_ssid})
    # Now toggle wifi state and wait for the connection event.
    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_event = None
        for _ in range(num_of_tries):
            try:
                connect_event = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
                                                30)
            except Empty:
                # Timed out on this attempt; try again if attempts remain.
                continue
            break
        no_connection_msg = "Failed to connect to Wi-Fi network %s on %s" % (
            network, ad.serial)
        asserts.assert_true(connect_event, no_connection_msg)
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connect_event)
        actual_ssid = connect_event['data'][WifiEnums.SSID_KEY]
        wrong_network_msg = ("Connected to the wrong network on %s."
                             "Expected %s, but got %s." %
                             (ad.serial, expected_ssid, actual_ssid))
        asserts.assert_equal(actual_ssid, expected_ssid, wrong_network_msg)
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wait_for_connect(ad,
                     expected_ssid=None,
                     expected_id=None,
                     tries=2,
                     assert_on_fail=True):
    """Wait for a connect event.

    Thin wrapper that runs _wait_for_connect through
    _assert_on_fail_handler. With assert_on_fail=True this will directly
    fail a test if anything goes wrong.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_connect, assert_on_fail, ad, expected_ssid, expected_id,
        tries)
    return outcome


def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify it is for the expected network.

    State tracking is started before waiting and always stopped on exit.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to. The SSID of the
                       connect event is only verified when this is set.
        expected_id: Network Id of the network to connect to. The network id
                     of the connect event is verified whenever this is not
                     None (0 is a valid network id).
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: If no connect event arrives or the event is for
            a different network than expected.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 id=expected_id,
                                                 tries=tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network %s" % expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        # Compare against None instead of relying on truthiness: a network
        # id of 0 is valid and must still be verified.
        if expected_id is not None:
            asserts.assert_equal(actual_id, expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        # The detailed reason is logged; callers get a uniform failure signal.
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()


def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    When neither ssid nor id is given, the first connect event is returned.
    Otherwise events are popped until one matches the requested network id
    or SSID, or until the tries are used up. Each try waits up to 30 seconds
    for an event.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.

    Returns:
        None if no connect event was received at all. Otherwise the last
        popped event; when the tries run out without a match this may be an
        event for a different network, so callers must still verify it.
        The event is a dict with details of the connection data, which looks
        like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }

    """
    conn_result = None
    # If ssid and network id are both None, any connect event is accepted.
    accept_any = id is None and ssid is None

    for _ in range(tries):
        try:
            conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
        except Empty:
            continue
        if accept_any:
            break
        event_data = conn_result['data']
        # Use an explicit None check for the id: network id 0 is valid and
        # would never match under a truthiness test.
        if id is not None and event_data[WifiEnums.NETID_KEY] == id:
            break
        if ssid and event_data[WifiEnums.SSID_KEY] == ssid:
            break

    return conn_result


def wait_for_disconnect(ad, timeout=10):
    """Block until the device reports a Wi-Fi disconnect.

    State tracking is enabled for the duration of the wait and is always
    stopped again before returning or raising.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    Raises:
        signals.TestFailure: If no disconnect event arrives within timeout.
    """
    disconnect_event_name = "WifiNetworkDisconnected"
    droid = ad.droid
    try:
        droid.wifiStartTrackingStateChange()
        # Only the arrival of the event matters; its payload is not used.
        ad.ed.pop_event(disconnect_event_name, timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        droid.wifiStopTrackingStateChange()


def ensure_no_disconnect(ad, duration=10):
    """Verify that the device stays connected for the given duration.

    Waits for a disconnect event; the absence of one (the wait timing out)
    is the passing outcome. State tracking is always stopped on exit.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    Raises:
        signals.TestFailure: If a disconnect event arrives within duration.
    """
    droid = ad.droid
    try:
        droid.wifiStartTrackingStateChange()
        ad.ed.pop_event("WifiNetworkDisconnected", duration)
    except Empty:
        # No disconnect was seen during the whole window: success.
        return
    else:
        raise signals.TestFailure("Device disconnected from the network")
    finally:
        droid.wifiStopTrackingStateChange()


def connect_to_wifi_network(ad, network, assert_on_fail=True,
                            check_connectivity=True, hidden=False,
                            num_of_scan_tries=DEFAULT_SCAN_TRIES,
                            num_of_connect_tries=DEFAULT_CONNECT_TRIES):
    """Scan for a network and then connect to it (open and psk networks).

    For a hidden network the scan step ensures the SSID is NOT found in the
    scan results; otherwise it ensures the SSID is found. The connection
    itself is delegated to wifi_connect.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect.
        hidden: Is the Wifi network hidden.
        num_of_scan_tries: The number of times to try scan
                           interface before declaring failure.
        num_of_connect_tries: The number of times to try
                              connect wifi before declaring failure.
    """
    scan_and_check = (
        start_wifi_connection_scan_and_ensure_network_not_found
        if hidden else start_wifi_connection_scan_and_ensure_network_found)
    scan_and_check(ad,
                   network[WifiEnums.SSID_KEY],
                   max_tries=num_of_scan_tries)
    wifi_connect(ad,
                 network,
                 num_of_tries=num_of_connect_tries,
                 assert_on_fail=assert_on_fail,
                 check_connectivity=check_connectivity)


def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect by network id, then check the SSID the device reports.

    Scans to make sure the network is visible, connects through
    wifi_connect_by_id, and compares the SSID from the current connection
    info with the expected one.

    Args:
        ad: Android device object.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if connect using network id was successful;
             False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)
    reported_ssid = ad.droid.wifiGetConnectionInfo()[WifiEnums.SSID_KEY]
    ad.log.debug("Expected SSID = %s Connected SSID = %s" %
                 (network_ssid, reported_ssid))
    return reported_ssid == network_ssid


def wifi_connect(ad,
                 network,
                 num_of_tries=1,
                 assert_on_fail=True,
                 check_connectivity=True):
    """Connect an Android device to a wifi network.

    Runs _wifi_connect through _assert_on_fail_handler: the connection is
    initiated, the "connected" event is awaited, and the connected ssid is
    confirmed to be the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.
        check_connectivity: Passed through to _wifi_connect; when True the
                            connection is also validated after connecting.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    connect_options = {
        "num_of_tries": num_of_tries,
        "check_connectivity": check_connectivity,
    }
    return _assert_on_fail_handler(_wifi_connect, assert_on_fail, ad, network,
                                   **connect_options)


def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network and verify the result.

    Requests a connection using the network config, waits for the
    connect-by-config success event and then for the "connected" event, and
    confirms that the connected ssid is the requested one. State tracking is
    always stopped on exit.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, also run validate_connection once
                            connected and fail if it reports no connectivity.
    """
    ssid_key = WifiEnums.SSID_KEY
    asserts.assert_true(
        ssid_key in network,
        "Key '%s' must be present in network definition." % ssid_key)
    droid = ad.droid
    droid.wifiStartTrackingStateChange()
    target_ssid = network[ssid_key]
    droid.wifiConnectByConfig(network)
    ad.log.info("Starting connection process to %s", target_ssid)
    try:
        # Only the arrival of the success event matters, not its payload.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
        result = _wait_for_connect_event(ad,
                                         ssid=target_ssid,
                                         tries=num_of_tries)
        asserts.assert_true(
            result, "Failed to connect to Wi-Fi network %s on %s" %
            (network, ad.serial))
        ad.log.debug("Wi-Fi connection result: %s.", result)
        connected_ssid = result['data'][ssid_key]
        asserts.assert_equal(
            connected_ssid, target_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi network %s.", connected_ssid)

        if check_connectivity and not validate_connection(
                ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure(
                "Failed to connect to internet on %s" % target_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s on %s" %
                     (network, ad.serial))
    except Exception as error:
        # The detailed reason is logged; callers get a uniform failure signal.
        ad.log.error("Failed to connect to %s with error %s", target_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        droid.wifiStopTrackingStateChange()


def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Start connection to the wifi network, with the given network Id, wait for
    the "connected" event, then verify the connected network is the one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Propagate the handler's result so the documented return value actually
    # reaches the caller, as wifi_connect and wait_for_connect already do.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)


def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Clears pending events, requests a connection to the given network id,
    waits for the connect-by-netid success event and then for the
    "connected" event, verifies the connected network id, and finally
    validates connectivity. State tracking is always stopped on exit.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
    """
    droid = ad.droid
    droid.wifiStartTrackingStateChange()
    # Clear all previous events.
    ad.ed.clear_all_events()
    droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # Only the arrival of the success event matters, not its payload.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        result = _wait_for_connect_event(ad,
                                         id=network_id,
                                         tries=num_of_tries)
        asserts.assert_true(
            result, "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", result)
        connected_id = result['data'][WifiEnums.NETID_KEY]
        asserts.assert_equal(
            connected_id, network_id, "Connected to the wrong network on %s."
            "Expected network id = %d, but got %d." %
            (ad.serial, network_id, connected_id))
        connected_ssid = result['data'][WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    connected_ssid, network_id)

        if not validate_connection(ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      connected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as error:
        # The detailed reason is logged; callers get a uniform failure signal.
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, error)
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id)
    finally:
        droid.wifiStopTrackingStateChange()


def wifi_connect_using_network_request(ad,
                                       network,
                                       network_specifier,
                                       num_of_tries=3):
    """Connect an Android device to a wifi network using network request.

    Sends a network request built from the given specifier, pauses so the
    platform can start processing it, then drives the rest of the flow via
    _wait_for_wifi_connect_after_network_request: wait for the "onMatch"
    event, check that its scan results contain the network, simulate the
    user selecting it, and wait for the "onAvailable" network callback.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
    Returns:
        key: Key corresponding to network request.
    """
    request_key = ad.droid.connectivityRequestWifiNetwork(
        network_specifier, 0)
    ad.log.info("Sent network request %s with %s " %
                (request_key, network_specifier))
    # Need a delay here because UI interaction should only start once wifi
    # starts processing the request.
    time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
    _wait_for_wifi_connect_after_network_request(ad, network, request_key,
                                                 num_of_tries)
    return request_key


def wait_for_wifi_connect_after_network_request(ad,
                                                network,
                                                key,
                                                num_of_tries=3,
                                                assert_on_fail=True):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Propagate the handler's result so the documented return value actually
    # reaches the caller, as wifi_connect and wait_for_connect already do.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, key, num_of_tries)


def _wait_for_wifi_connect_after_network_request(ad,
                                                 network,
                                                 key,
                                                 num_of_tries=3):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of "onMatch" events to
                      inspect before declaring failure.

    Raises:
        signals.TestFailure: If the network is never matched, the connection
            callbacks do not arrive, or the device ends up on a different
            network.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # Honour num_of_tries: each iteration consumes one onMatch event and
        # stops as soon as the target network shows up in its results.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: network[WifiEnums.SSID_KEY]},
                matched_scan_results)
            ad.log.debug("Network request on match %s", matched_network)
            if matched_network:
                break

        asserts.assert_true(matched_network,
                            "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK, 60,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_AVAILABLE))
        on_capabilities_changed = autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK, 10,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT,
             cconsts.NETWORK_CB_CAPABILITIES_CHANGED))
        connected_network = None
        # WifiInfo is attached to TransportInfo only in S.
        if ad.droid.isSdkAtLeastS():
            connected_network = (
                on_capabilities_changed["data"][
                    cconsts.NETWORK_CB_KEY_TRANSPORT_INFO]
            )
        else:
            connected_network = ad.droid.wifiGetConnectionInfo()
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(
            connected_network[WifiEnums.SSID_KEY], expected_ssid,
            "Connected to the wrong network."
            "Expected %s, but got %s." % (network, connected_network))
    except Empty:
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        # The detailed reason is logged; callers get a uniform failure signal.
        ad.log.error("Failed to connect to %s with error %s" %
                     (expected_ssid, error))
        raise signals.TestFailure(
            "Failed to connect to %s network" % network) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_passpoint_connect(ad,
                           passpoint_network,
                           num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a Passpoint wifi network.

    Runs _wifi_passpoint_connect through _assert_on_fail_handler: wait for
    the "connected" event, then confirm the connected ssid is the one
    requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        The result of _assert_on_fail_handler. As with the sibling connect
        wrappers, a value is only meaningful when assert_on_fail is False,
        where it indicates whether the connection succeeded.
    """
    # Propagate the handler's result so the documented return value actually
    # reaches the caller, as wifi_connect and wait_for_connect already do.
    return _assert_on_fail_handler(_wifi_passpoint_connect,
                                   assert_on_fail,
                                   ad,
                                   passpoint_network,
                                   num_of_tries=num_of_tries)


def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Wait for the device to connect to a Passpoint network and verify it.

    Waits for the "connected" event, confirms the connected ssid is the one
    requested, and then validates connectivity. State tracking is always
    stopped on exit.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: If the connect event does not arrive, the
            device is on a different network, or connectivity validation
            fails.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        # Pass by keyword: the third positional parameter of
        # _wait_for_connect_event is the network id, not the try count.
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result, "Failed to connect to WiFi passpoint network %s on"
            " %s" % (expected_ssid, ad.serial))
        ad.log.info("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", actual_ssid)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        # The detailed reason is logged; callers get a uniform failure signal.
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                     expected_ssid, error)
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                  expected_ssid) from error

    finally:
        ad.droid.wifiStopTrackingStateChange()


def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by the given FQDN.

    Args:
        ad: android_device object to remove the configuration from.
        fqdn: FQDN of the Passpoint configuration to remove.

    Returns:
        True if the removal call completed, False if it raised (the error
        is logged).
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as removal_error:
        ad.log.error(
            "Failed to remove passpoint configuration with FQDN=%s "
            "and error=%s", fqdn, removal_error)
        return False
    return True


def start_wifi_single_scan(ad, scan_setting):
    """Start a single-shot wifi scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        If scan was started successfully, event data of success event is returned.
    """
    scan_id = ad.droid.wifiScannerStartScan(scan_setting)
    # The success event name embeds the id returned by the start call.
    success_event_name = "WifiScannerScan%sonSuccess" % scan_id
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']


def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection to network changes for given number of counts

    Pops up to check_connection_count connect events (waiting up to 120
    seconds for each) and checks whether any of them is for network_ssid.
    State tracking is always stopped before returning, including on the
    success path and when waiting for an event raises.

    Args:
        ad: android_device object to track connections on.
        network_ssid: network ssid to which connection would be tracked
        check_connection_count: Integer for maximum number network connection
                                check.
    Returns:
        True if connection to given network happen, else return False.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        while check_connection_count > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY] ==
                    network_ssid):
                return True
            check_connection_count -= 1
        return False
    finally:
        # Stop tracking on every exit path so it is not left running.
        ad.droid.wifiStopTrackingStateChange()


def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out the channels a scan covers and how long it should take.

    The channels come from the band (converted through wifi_chs) when only
    "band" is present in the scan setting, or from "channels" when only that
    key is present. If both or neither are present, no channels are used.

    Args:
        wifi_chs: Object of channels supported
        scan_setting: scan setting used for start scan
        stime_channel: scan time per channel

    Returns:
        scan_time: time required for completing a scan
        scan_channels: channel used for scanning
    """
    dfs_passive_scan_time = 132  # passive scan time on DFS
    has_band = "band" in scan_setting
    has_channels = "channels" in scan_setting

    if has_band and not has_channels:
        channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif has_channels and not has_band:
        channels = scan_setting["channels"]
    else:
        channels = []

    total_time = len(channels) * stime_channel
    for freq in channels:
        if freq in WifiEnums.DFS_5G_FREQUENCIES:
            total_time += dfs_passive_scan_time
    return total_time, channels


def start_wifi_track_bssid(ad, track_setting):
    """Start BSSID tracking and wait for its success event.

    Args:
      ad: android_device object.
      track_setting: Setting for which the bssid tracking should be started.
          Its "bssidInfos" and "apLostThreshold" entries are passed to the
          tracking call.

    Returns:
      If tracking started successfully, event data of success event is returned.
    """
    bssid_infos = track_setting["bssidInfos"]
    ap_lost_threshold = track_setting["apLostThreshold"]
    tracker_id = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, ap_lost_threshold)
    # The success event name embeds the id returned by the start call.
    success_event_name = "WifiScannerBssid{}onSuccess".format(tracker_id)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']


def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file into an unencrypted PKCS#8 DER file.

    The conversion is done by running the openssl command line tool, to get
    the key into the format required by Android.

    The input file must have the extension "pem". The output file must
    have the extension "der".

    Args:
        in_file: The original key file.
        out_file: The full path to the converted key file, including
        filename.
    """
    asserts.assert_true(in_file.endswith(".pem"), "Input file has to be .pem.")
    asserts.assert_true(out_file.endswith(".der"),
                        "Output file has to be .der.")
    openssl_cmd = (
        "openssl pkcs8 -inform PEM -in %s -outform DER -out %s -nocrypt"
        " -topk8" % (in_file, out_file))
    utils.exe_cmd(openssl_cmd)


def validate_connection(ad,
                        ping_addr=DEFAULT_PING_ADDR,
                        wait_time=15,
                        ping_gateway=True):
    """Validate internet connection by pinging the address provided.

    Waits up to wait_time seconds for the device to report a connected
    network (and, on Android versions above 10, a default IPv4 gateway),
    then sends an http ping to ping_addr. If that fails on Android versions
    above 10 and ping_gateway is True, the default gateway is pinged instead.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.
        wait_time: maximum number of seconds to wait for the connection to
                   come up before validating it.
        ping_gateway: If True, fall back to pinging the default gateway when
                      the http ping fails (Android versions above 10 only).

    Returns:
        The http ping result if the http ping succeeded. Otherwise False,
        or, when the gateway fallback ran, True/False depending on whether
        the gateway ping had less than 100% packet loss.
    """
    android_version = int(
        ad.adb.shell("getprop ro.vendor.build.version.release"))
    needs_gateway = android_version > 10
    # wait_time to allow for DHCP to complete.
    for _ in range(wait_time):
        if ad.droid.connectivityNetworkIsConnected():
            if (not needs_gateway
                    or ad.droid.connectivityGetIPv4DefaultGateway()):
                break
        time.sleep(1)
    ping = False
    try:
        ping = ad.droid.httpPing(ping_addr)
        ad.log.info("Http ping result: %s.", ping)
    except Exception as error:
        # Best effort: a failing http ping is treated as "no connectivity",
        # but the reason is logged and interrupts are no longer swallowed.
        ad.log.warning("Http ping to %s raised an error: %s", ping_addr,
                       error)
    if needs_gateway and not ping and ping_gateway:
        ad.log.info("Http ping failed. Pinging default gateway")
        gw = ad.droid.connectivityGetIPv4DefaultGateway()
        result = ad.adb.shell("ping -c 6 {}".format(gw))
        ad.log.info("Default gateway ping result: %s" % result)
        ping = "100% packet loss" not in result
    return ping


# TODO(angli): This can only verify that an actual value is exactly the same.
# It would be nice to be able to verify that an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection info against expected values.

    Every key in expected_con except "password" must be present in the
    current connection info with an equal value. "BSSID" and
    "supplicant_state" are compared case-insensitively.

    Args:
        ad: android_device object to read the connection info from.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: If an expected field is missing or its value
            differs.
    """
    lowercase_fields = ("BSSID", "supplicant_state")
    current_con = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", current_con)
    for field, wanted in expected_con.items():
        # Do not verify authentication related fields.
        if field == "password":
            continue
        if field not in current_con:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." %
                (field, current_con))
        found = current_con[field]
        if field in lowercase_fields:
            found = found.lower()
            wanted = wanted.lower()
        if found != wanted:
            raise signals.TestFailure(
                "Expected %s to be %s, actual %s is %s." %
                (field, wanted, field, found))


def check_autoconnect_to_open_network(
        ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle wifi and wait for the device to finish connecting to an AP.

    If the wifi state check already passes, nothing is done. Otherwise wifi
    is toggled and the supplicant state is polled until it reads
    "completed" or the timeout expires.

    Args:
        ad: android_device object.
        conn_timeout: timeout value in sec to wait for UE to connect to a
                      WiFi AP.

    Returns:
        True if UE connects to WiFi AP (supplicant_state = completed)
        False if UE fails to complete connection within conn_timeout.
    """
    if ad.droid.wifiCheckState():
        return True
    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo(
        )['supplicant_state']
        # The deadline is checked before the state, so a state read after
        # the deadline counts as a failure even if it says "completed".
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True


def expand_enterprise_config_by_phase2(config):
    """Expands one enterprise config into one config per phase2 auth type.

    PEAP configs are expanded to GTC and MSCHAPV2 only; any other EAP method
    is expanded to every member of WifiEnums.EapPhase2. When the config
    carries an FQDN entry, the GTC variant is left out.

    Args:
        config: A dict representing enterprise config.

    Returns:
        A list of enterprise configs. Each one is a shallow copy of config
        with its phase2 entry set; config itself is not modified.
    """
    ent = WifiEnums.Enterprise
    phase2 = WifiEnums.EapPhase2
    if config[ent.EAP] == WifiEnums.Eap.PEAP:
        # Skip unsupported phase2 types for PEAP.
        candidates = [phase2.GTC, phase2.MSCHAPV2]
    else:
        candidates = phase2
    return [
        {**config, ent.PHASE2: candidate.value}
        for candidate in candidates
        # Skip a special case for passpoint TTLS.
        if not (ent.FQDN in config and candidate == phase2.GTC)
    ]


def generate_eap_test_name(config, ad=None):
    """Builds a test case name from an EAP configuration.

    The name is "test_connect-<EAP>" optionally followed by "-<PHASE2>".
    <EAP> is the name of the WifiEnums.Eap member matching the config, unless
    the SSID contains "peap0" or "peap1" (case-insensitive), in which case it
    is "PEAP0" or "PEAP1".

    Args:
        config: A dict representing an EAP credential.
        ad object: Redundant but required as the same param is passed
                   to test_func in run_generated_tests

    Returns:
        A string representing the name of a generated EAP test case.
    """
    ent = WifiEnums.Enterprise
    method = next(
        (eap.name for eap in WifiEnums.Eap if eap.value == config[ent.EAP]),
        "")
    ssid = config[WifiEnums.SSID_KEY].lower()
    # An SSID marker overrides the enum name; "peap1" wins if both appear.
    for marker in ("peap0", "peap1"):
        if marker in ssid:
            method = marker.upper()
    parts = ["test_connect-" + method]
    if ent.PHASE2 in config:
        phase2_name = next(
            (phase2.name for phase2 in WifiEnums.EapPhase2
             if phase2.value == config[ent.PHASE2]),
            None)
        if phase2_name is not None:
            parts.append(phase2_name)
    return "-".join(parts)


def group_attenuators(attenuators):
    """Splits a list of attenuators into one attenuator group per AP.

    Legacy Wi-Fi setups have two attenuators, each connected to a separate
    AP. Newer setups have four attenuators, one per AP channel, so two of
    them belong to the same AP. Grouping lets a script attenuate "one AP" in
    either setup by driving a single group.

    With two attenuators, each one gets its own group. With four, the first
    two go to group "AP0" and the last two to group "AP1".

    Args:
        attenuators: A list of attenuator objects, either two or four in length.

    Returns:
        A two-element list: the "AP0" group followed by the "AP1" group.

    Raises:
        The test is failed through asserts.fail if the attenuator list does
        not have two or four objects.
    """
    ap0_group = attenuator.AttenuatorGroup("AP0")
    ap1_group = attenuator.AttenuatorGroup("AP1")
    count = len(attenuators)
    if count not in (2, 4):
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % count)
    # Legacy testbeds have one attenuator per AP, newer ones have two.
    per_ap = count // 2
    for member in attenuators[:per_ap]:
        ap0_group.add(member)
    for member in attenuators[per_ap:]:
        ap1_group.add(member)
    return [ap0_group, ap1_group]


def set_attns(attenuator, attn_val_name, roaming_attn=ROAMING_ATTN):
    """Applies a named set of attenuation values to attenuators 0 through 3.

    Args:
        attenuator: Indexable collection of attenuator objects; exactly the
            entries at indices 0-3 are set.
        attn_val_name: Name of the attenuation value pair to use.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        Whatever the attenuator access or set_atten call raised; the failure
        is logged first and then re-raised unchanged.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for channel in range(4):
            attenuator[channel].set_atten(
                roaming_attn[attn_val_name][channel])
    except:
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise


def set_attns_steps(attenuators,
                    atten_val_name,
                    roaming_attn=ROAMING_ATTN,
                    steps=10,
                    wait_time=12):
    """Ramps attenuators linearly from their current values to a target set.

    The current value of every attenuator is read once up front. Each step
    then moves every attenuator to the rounded value lying the same fraction
    of the way between its start and its target, and sleeps wait_time
    (including after the last step).

    Args:
        attenuators: The list of attenuator objects that you want to change
                     their attenuation value.
        atten_val_name: Name of the attenuation value pair to use.
        roaming_attn: Dictionary specifying the attenuation params.
        steps: Number of attenuator changes to reach the target value.
        wait_time: Sleep time for each change of attenuator.
    """
    targets = roaming_attn[atten_val_name]
    logging.info("Set attenuation values to %s in %d step(s)", targets, steps)
    initial = [unit.get_atten() for unit in attenuators]
    for step_number in range(1, steps + 1):
        fraction = step_number / steps
        for index, unit in enumerate(attenuators):
            delta = (targets[index] - initial[index]) * fraction
            unit.set_atten(round(initial[index] + delta))
        time.sleep(wait_time)


def trigger_roaming_and_validate(dut,
                                 attenuator,
                                 attn_val_name,
                                 expected_con,
                                 roaming_attn=ROAMING_ATTN):
    """Steps the attenuators to trigger roaming, then validates the result.

    After the attenuation change, the DUT's connection info must match the
    SSID and BSSID of expected_con, and validate_connection(dut) must
    succeed.

    Args:
        dut: The device expected to roam.
        attenuator: The attenuator object.
        attn_val_name: Name of the attenuation value pair to use.
        expected_con: The network information of the expected network; its
            SSID entry and its "bssid" entry are read.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        signals.TestFailure: If validate_connection reports failure after
            the connection info matched.
    """
    target_ssid = expected_con[WifiEnums.SSID_KEY]
    target_bssid = expected_con["bssid"]
    set_attns_steps(attenuator, attn_val_name, roaming_attn)

    verify_wifi_connection_info(dut, {
        WifiEnums.SSID_KEY: target_ssid,
        WifiEnums.BSSID_KEY: target_bssid,
    })
    logging.info("Roamed to %s successfully", target_bssid)
    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              target_bssid)


def create_softap_config():
    """Creates a softap config with a random SSID and a random password.

    The SSID is "softap_" followed by 8 random characters; the password is
    8 random characters. Both are written to the log.

    Returns:
        A dict holding the SSID and the password under the WifiEnums keys.
    """
    ssid = "softap_" + utils.rand_ascii_str(8)
    password = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", ssid, password)
    return {WifiEnums.SSID_KEY: ssid, WifiEnums.PWD_KEY: password}


def start_softap_and_verify(ad, band):
    """Brings up a softap on ad.dut and verifies it is running and visible.

    The softap info callback must report frequency 0 and bandwidth 0 before
    tethering starts, and positive values for both once it is up. The new
    network must also show up in a scan from ad.dut_client.

    Args:
        ad: Object exposing `dut` (device hosting the softap) and
            `dut_client` (device used for the scan).
        band: The band to use for softAP.

    Returns: dict, the softAP config.

    """
    softap_dut = ad.dut
    # Register before start the test.
    callback_id = softap_dut.droid.registerSoftApCallback()
    # Check softap info value is default
    idle_frequency, idle_bandwidth = get_current_softap_info(
        softap_dut, callback_id, True)
    asserts.assert_true(idle_frequency == 0, "Softap frequency is not reset")
    asserts.assert_true(idle_bandwidth == 0, "Softap bandwdith is not reset")

    config = create_softap_config()
    ssid = config[WifiEnums.SSID_KEY]
    start_wifi_tethering(softap_dut,
                         ssid,
                         config[WifiEnums.PWD_KEY],
                         band=band)
    asserts.assert_true(softap_dut.droid.wifiIsApEnabled(),
                        "SoftAp is not reported as running")
    start_wifi_connection_scan_and_ensure_network_found(ad.dut_client, ssid)

    # The callback must now deliver valid (positive) softap info.
    live_frequency, live_bandwidth = get_current_softap_info(
        softap_dut, callback_id, True)
    asserts.assert_true(live_frequency > 0, "Softap frequency is not valid")
    asserts.assert_true(live_bandwidth > 0, "Softap bandwdith is not valid")
    # Unregister callback
    softap_dut.droid.unregisterSoftApCallback(callback_id)

    return config


def wait_for_expected_number_of_softap_clients(ad, callbackId,
                                               expected_num_of_softap_clients):
    """Waits for a client-count event and checks it against the expectation.

    Pops one "number of clients changed" event for the given callback and
    asserts that both the reported count and the number of reported MAC
    addresses equal the expected number, and that every MAC is well formed.

    Args:
        ad: Device object providing the event dispatcher `ed`.
        callbackId: Id of the callback associated with registering.
        expected_num_of_softap_clients: expected number of softap clients.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    payload = ad.ed.pop_event(event_name, SHORT_TIMEOUT)['data']
    reported_count = payload[
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    reported_macs = payload[wifi_constants.SOFTAP_CLIENTS_MACS_CALLBACK_KEY]
    asserts.assert_equal(
        reported_count, expected_num_of_softap_clients,
        "The number of softap clients doesn't match the expected number")
    asserts.assert_equal(
        len(reported_macs), expected_num_of_softap_clients,
        "The number of mac addresses doesn't match the expected number")
    for mac in reported_macs:
        asserts.assert_true(checkMacAddress(mac),
                            "An invalid mac address was returned")


def checkMacAddress(input):
    """Validate whether a string is a valid mac address or not.

    Accepts six pairs of hex digits (case-insensitive) separated
    consistently by ':' or '-', or not separated at all. The whole string
    must be the address; in particular a trailing newline is rejected.

    Args:
        input: The string to validate.

    Returns: True/False, returns true for a valid mac address and false otherwise.
    """
    # The captured separator (possibly empty) must repeat between all pairs.
    mac_pattern = r"[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\1[0-9a-f]{2}){4}"
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "aa:bb:cc:dd:ee:ff\n" through.
    return re.fullmatch(mac_pattern, input.lower()) is not None


def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
    """Waits for a softap state-change event and checks the reported state.

    Args:
        ad: Device object providing the event dispatcher `ed`.
        callbackId: Id of the callback associated with registering.
        expected_softap_state: The expected softap state.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_STATE_CHANGED)
    state_event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
    reported_state = state_event['data'][
        wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY]
    asserts.assert_equal(reported_state, expected_softap_state,
                         "Softap state doesn't match with expected state")


def get_current_number_of_softap_clients(ad, callbackId):
    """Drains all queued softap client-count events and returns the latest.

    Args:
        ad: Device object providing the event dispatcher `ed`.
        callbackId: Id of the callback associated with registering.

    Returns:
        The client count carried by the last queued event, or None when no
        matching callback event is in the queue.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    latest_count = None
    # Every queued event is read; only the value of the last one is kept.
    for queued in ad.ed.pop_all(event_name):
        latest_count = queued['data'][
            wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    return latest_count


def get_current_softap_info(ad, callbackId, need_to_wait):
    """Drains softap info-changed events and returns the latest values.

    Args:
        ad: Device object providing the event dispatcher `ed` and `log`.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the info callback event before pop all.

    Returns:
        A (frequency, bandwidth) tuple taken from the most recent event, or
        (0, 0) if no event was seen.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_INFO_CHANGED)
    ad.log.debug("softap info dump from eventStr %s", event_name)

    def _read(info_event):
        # Extracts (frequency, bandwidth) from one info-changed event.
        payload = info_event['data']
        return (payload[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY],
                payload[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY])

    frequency, bandwidth = 0, 0
    if need_to_wait:
        frequency, bandwidth = _read(
            ad.ed.pop_event(event_name, SHORT_TIMEOUT))
        ad.log.info("softap info updated, frequency is %s, bandwidth is %s",
                    frequency, bandwidth)

    # Anything still queued is newer than what was read above.
    for queued in ad.ed.pop_all(event_name):
        frequency, bandwidth = _read(queued)
    ad.log.info("softap info, frequency is %s, bandwidth is %s", frequency,
                bandwidth)
    return frequency, bandwidth

def get_current_softap_infos(ad, callbackId, need_to_wait):
    """Drains softap info-list-changed events and returns the latest list.

    Each entry of the returned list is logged with its frequency, bandwidth,
    wifi standard and bssid.

    Args:
        ad: Device object providing the event dispatcher `ed` and `log`.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the info callback event before pop all.

    Returns:
        The info list carried by the most recent event, or an empty list
        when need_to_wait is false and no event is queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_INFOLIST_CHANGED
    ad.log.debug("softap info dump from eventStr %s", eventStr)

    # Default to an empty list so that "no wait, nothing queued" returns
    # cleanly instead of failing on an unbound local.
    infos = []
    if need_to_wait:
        infos = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    # Anything still queued is newer than what was read above.
    for event in ad.ed.pop_all(eventStr):
        infos = event['data']

    for info in infos:
        ad.log.info(
            "softap info, freq:%s, bw:%s, wifistandard:%s, bssid:%s",
            info[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY],
            info[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY],
            info[wifi_constants.SOFTAP_INFO_WIFISTANDARD_CALLBACK_KEY],
            info[wifi_constants.SOFTAP_INFO_BSSID_CALLBACK_KEY])

    return infos

def get_current_softap_capability(ad, callbackId, need_to_wait):
    """Drains softap capability-changed events and returns the latest one.

    Args:
        ad: Device object providing the event dispatcher `ed` and `log`.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait for the capability callback event before pop all.

    Returns:
        The data of the most recent capability event, or None when
        need_to_wait is false and no event is queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
            callbackId) + wifi_constants.SOFTAP_CAPABILITY_CHANGED
    ad.log.debug("softap capability dump from eventStr %s", eventStr)

    # Default to None so that "no wait, nothing queued" returns cleanly
    # instead of failing on an unbound local.
    capability = None
    if need_to_wait:
        capability = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)['data']

    # Anything still queued is newer than what was read above.
    for event in ad.ed.pop_all(eventStr):
        capability = event['data']

    return capability

def get_ssrdumps(ad):
    """Pulls the files found in the ssrdump dir, then empties that dir.

    Files are pulled into an "SSRDUMPS_<serial>" folder under the device log
    path. The on-device files are deleted whether or not anything was pulled.

    Args:
        ad: android device object.
    """
    dump_files = ad.get_file_names("/data/vendor/ssrdump/")
    if dump_files:
        ad.log.info("Pulling ssrdumps %s", dump_files)
        destination = os.path.join(ad.device_log_path,
                                   "SSRDUMPS_%s" % ad.serial)
        os.makedirs(destination, exist_ok=True)
        ad.pull_files(dump_files, destination)
    # Always clear the directory so later pulls only see new dumps.
    ad.adb.shell("find /data/vendor/ssrdump/ -type f -delete",
                 ignore_status=True)


def start_pcap(pcap, wifi_band, test_name):
    """Starts packet capture in monitor mode on the requested band(s).

    Captures are written below a 'PacketCapture' folder inside the current
    context's full output path; the folder is created if needed.

    Args:
        pcap: packet capture object
        wifi_band: '2g' or '5g' or 'dual'
        test_name: test name to be used for pcap file name

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, log directory joined with test_name) as the
        value
    """
    output_root = context.get_current_context().get_full_output_path()
    log_dir = os.path.join(output_root, 'PacketCapture')
    os.makedirs(log_dir, exist_ok=True)
    # 'dual' expands to both bands; anything else is used as a single band.
    bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, log_dir, test_name),
               os.path.join(log_dir, test_name))
        for band in bands
    }


def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since, the pcap logs in monitor mode can be very large, we will
    delete them if they are not required. 'test_status' if True, will delete
    the pcap files. If False, we will keep them.

    Deletion removes the whole directory that contains the capture of the
    last entry in procs. With an empty procs nothing is stopped or deleted.

    Args:
        pcap: packet capture object
        procs: dictionary returned by start_pcap
        test_status: status of the test case
    """
    # Stays None when procs is empty, so the cleanup below can be skipped
    # instead of failing on an unbound name.
    fname = None
    for proc, fname in procs.values():
        pcap.stop_packet_capture(proc)

    if test_status and fname is not None:
        shutil.rmtree(os.path.dirname(fname))


def verify_mac_not_found_in_pcap(ad, mac, packets):
    """Verify that a mac address is not found in the captured packets.

    Every packet summary is logged at debug level. The test is failed on the
    first packet whose summary contains the mac.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    for pkt in packets:
        summary = pkt.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac in summary:
            # show() prints the packet and returns None; dump=True makes it
            # return the text so the failure message actually contains it.
            asserts.fail("Device %s caught Factory MAC: %s in packet sniffer. "
                         "Packet = %s" %
                         (ad.serial, mac, pkt.show(dump=True)))


def verify_mac_is_found_in_pcap(ad, mac, packets):
    """Verifies that some captured packet's summary contains the mac address.

    Scanning stops at the first matching packet. If none matches, the test
    is failed.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    if any(mac in candidate.summary() for candidate in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer."
                 "for device %s" % (mac, ad.serial))

def start_all_wlan_logs(ads):
    """Calls start_wlan_logs on every device, in order.

    Args:
        ads: iterable of android device objects.
    """
    for device in ads:
        start_wlan_logs(device)

def start_wlan_logs(ad):
    """Starts Pixel Logger to record extra wifi logs.

    Does nothing besides logging when the Pixel Logger package is not listed
    on the device. Otherwise old wlan log files are deleted and logging is
    started with the "cnss_diag" logger if /vendor/bin/cnss_diag exists, or
    with the "wlan_logs" logger if it does not.

    Args:
        ad: android device object.
    """
    if not ad.adb.shell("pm list package | grep com.android.pixellogger"):
        ad.log.info("Device doesn't have Pixel Logger")
        return

    # Drop logs left over from earlier sessions.
    ad.adb.shell(
            "find /sdcard/Android/data/com.android.pixellogger/files/logs"
            "/wlan_logs/ -type f -delete",
            ignore_status=True)
    if ad.file_exists("/vendor/bin/cnss_diag"):
        logger_name = "cnss_diag"
    else:
        logger_name = "wlan_logs"
    ad.adb.shell("am startservice -a com.android.pixellogger.service"
                 ".logging.LoggingService.ACTION_START_LOGGING "
                 "-e intent_logger " + logger_name, ignore_status=True)

def stop_all_wlan_logs(ads):
    """Stops wlan logging on every device, then waits for the log archives.

    After stop_wlan_logs has been called on each device, the function logs a
    notice through the last device's logger and sleeps 30 seconds. With no
    devices there is nothing to wait for and it returns immediately.

    Args:
        ads: iterable of android device objects.
    """
    last_ad = None
    for ad in ads:
        stop_wlan_logs(ad)
        last_ad = ad
    if last_ad is None:
        # Nothing was stopped; referencing the loop variable here would fail
        # and the 30s wait would be pointless.
        return
    last_ad.log.info("Wait 30s for the createion of zip file for wlan logs")
    time.sleep(30)

def stop_wlan_logs(ad):
    """Asks Pixel Logger to stop logging, if the package is on the device.

    Args:
        ad: android device object.
    """
    has_pixel_logger = ad.adb.shell(
        "pm list package | grep com.android.pixellogger")
    if has_pixel_logger:
        ad.adb.shell(
            "am startservice -a com.android.pixellogger.service.logging"
            ".LoggingService.ACTION_STOP_LOGGING",
            ignore_status=True)

def get_wlan_logs(ad):
    """Pulls the files found in the Pixel Logger wlan_logs folder.

    Files are pulled into a "WLAN_LOGS_<serial>" folder under the device log
    path. Nothing happens when the folder holds no files.

    Args:
        ad: android device object.
    """
    remote_logs = ad.get_file_names(
        "/sdcard/Android/data/com.android.pixellogger/files/logs/wlan_logs")
    if not remote_logs:
        return
    ad.log.info("Pulling Pixel Logger logs %s", remote_logs)
    destination = os.path.join(ad.device_log_path,
                               "WLAN_LOGS_%s" % ad.serial)
    os.makedirs(destination, exist_ok=True)
    ad.pull_files(remote_logs, destination)

# Result of a single link probe, as built by send_link_probe():
#   is_success: True when the command output contains 'succeeded'.
#   stdout: raw output of the 'cmd wifi send-link-probe' shell command.
#   elapsed_time: first integer token of the output on success, else None.
#   failure_reason: first integer token of the output on failure, else None.
LinkProbeResult = namedtuple(
    'LinkProbeResult',
    ('is_success', 'stdout', 'elapsed_time', 'failure_reason'))


def send_link_probe(ad):
    """Sends one link probe to the currently connected AP and parses the
    command output.

    The test is failed if the output mentions 'Error' or 'Exception', or if
    it reports neither success nor failure.

    Args:
         ad: android device object
    Returns:
        LinkProbeResult namedtuple. On success elapsed_time holds the first
        all-digit token of the output; on failure failure_reason does.
    """
    output = ad.adb.shell('cmd wifi send-link-probe')
    saw_error = 'Error' in output or 'Exception' in output
    asserts.assert_false(saw_error,
                         'Exception while sending link probe: ' + output)

    def _first_number():
        # First whitespace-separated token made only of digits, or None.
        for token in output.split():
            if token.isdigit():
                return int(token)
        return None

    succeeded = 'succeeded' in output
    elapsed_time = None
    failure_reason = None
    if succeeded:
        elapsed_time = _first_number()
    elif 'failed with reason' in output:
        failure_reason = _first_number()
    else:
        asserts.fail('Unexpected link probe result: ' + output)

    return LinkProbeResult(is_success=succeeded,
                           stdout=output,
                           elapsed_time=elapsed_time,
                           failure_reason=failure_reason)


def send_link_probes(ad, num_probes, delay_sec):
    """Sends several link probes to the currently connected AP, one after
    another, sleeping delay_sec after each probe.

    Args:
         ad: android device object
         num_probes: number of probes to perform
         delay_sec: delay time between probes, in seconds
    Returns:
        List[LinkProbeResult] one LinkProbeResults for each probe
    """
    logging.info('Sending link probes')
    probe_results = []
    for _ in range(num_probes):
        # send_link_probe() fails the test itself when the adb shell output
        # reports an exception.
        outcome = send_link_probe(ad)
        logging.info('link probe results: ' + str(outcome))
        probe_results.append(outcome)
        time.sleep(delay_sec)

    return probe_results


def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Restarts an AP with the given network settings.

    test.access_points[index] is closed, then after a 5 second pause `ap` is
    started with a 'whirlwind' preset. The network is WPA-secured when
    `network` has a "password" entry and open otherwise.

        Args:
            test: the calling test class object.
            index: int, index of the AP.
            ap: access_point object of the AP.
            network: dict with information of the network, including ssid,
                     password and bssid.
            bandwidth: the operation bandwidth for the AP, default 80MHz.
            channel: the channel number for the AP.
        """
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    time.sleep(5)

    # Configure AP as required.
    if "password" in network:
        security = hostapd_security.Security(security_mode="wpa",
                                             password=network["password"])
    else:
        security = hostapd_security.Security(security_mode=None, password=None)
    ap.start_ap(
        hostapd_ap_preset.create_ap_preset(channel=channel,
                                           ssid=ssid,
                                           security=security,
                                           bss_settings=[],
                                           vht_bandwidth=bandwidth,
                                           profile_name='whirlwind',
                                           iface_wlan_2g=ap.wlan_2g,
                                           iface_wlan_5g=ap.wlan_5g))
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))


def turn_ap_off(test, AP):
    """Stops every running hostapd (wlan0, then wlan1) on one Access Point.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based: AP n is
            test.access_points[n - 1]).
    """
    for iface, done_msg in (('wlan0', 'Turned WLAN0 AP%d off'),
                            ('wlan1', 'Turned WLAN1 AP%d off')):
        daemon = test.access_points[AP - 1]._aps[iface].hostapd
        if daemon.is_alive():
            daemon.stop()
            logging.debug(done_msg % AP)


def turn_ap_on(test, AP):
    """Starts every stopped hostapd (wlan0, then wlan1) on one Access Point,
    reusing the config each hostapd object already holds.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based: AP n is
            test.access_points[n - 1]).
    """
    for iface, done_msg in (('wlan0', 'Turned WLAN0 AP%d on'),
                            ('wlan1', 'Turned WLAN1 AP%d on')):
        daemon = test.access_points[AP - 1]._aps[iface].hostapd
        if not daemon.is_alive():
            daemon.start(daemon.config)
            logging.debug(done_msg % AP)


def turn_location_off_and_scan_toggle_off(ad):
    """Turns off the location service and the always-available wifi scan.

    The test is failed if the scanner still reports itself as always
    available afterwards.

    Args:
        ad: android device object.
    """
    utils.set_location_service(ad, False)
    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    still_available = ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(not still_available,
                        "Failed to turn off location service's scan.")


def set_softap_channel(dut, ap_iface='wlan1', cs_count=10, channel=2462):
    """Switches the SoftAP channel through `hostapd_cli chan_switch`.

    Args:
        dut: android device object
        ap_iface: interface of SoftAP mode.
        cs_count: how many beacon frames before switch channel, default = 10
        channel: a wifi channel; passed as the last argument of chan_switch.

    Returns:
        The shell output ('OK') when the switch was accepted. Any other
        output fails the test.
    """
    command = 'hostapd_cli -i {} chan_switch {} {}'.format(
        ap_iface, cs_count, channel)
    dut.log.info('adb shell {}'.format(command))
    reply = dut.adb.shell(command)
    if reply == 'OK':
        dut.log.info('switch hotspot channel to {}'.format(channel))
        return reply

    asserts.fail("Failed to switch hotspot channel")

def get_wlan0_link(dut):
    """Returns the wlan0 status reported by wpa_cli as a dict.

    The command output is parsed as key=value pairs. The test is failed if
    no "ssid" key is present.

    Args:
        dut: android device object.

    Returns:
        Dict of the key/value pairs found in the status output.
    """
    status_cmd = 'wpa_cli -iwlan0 -g@android:wpa_wlan0 IFNAME=wlan0 status'
    raw_status = dut.adb.shell(status_cmd)
    link_info = {
        key: value
        for key, value in re.findall(r'(\S+)=(".*?"|\S+)', raw_status)
    }
    asserts.assert_true("ssid" in link_info,
                        "Client doesn't connect to any network")
    return link_info

def verify_11ax_wifi_connection(ad, wifi6_supported_models, wifi6_ap):
    """Asserts the connection standard is 11ax when AP and device support it.

    Nothing is checked unless wifi6_ap is truthy and the device model is in
    wifi6_supported_models.

    Args:
      ad: android device object
      wifi6_supported_models: device models supporting 11ax.
      wifi6_ap: if the AP supports 11ax.
    """
    if not wifi6_ap or ad.model not in wifi6_supported_models:
        return
    logging.info("Verifying 11ax. Model: %s" % ad.model)
    connected_standard = ad.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        connected_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT did not connect to 11ax.")

def verify_11ax_softap(dut, dut_client, wifi6_supported_models):
    """Asserts the client's connection standard is 11ax when both devices
    support it.

    Nothing is checked unless the models of both dut and dut_client are in
    wifi6_supported_models.

    Args:
      dut: Softap device.
      dut_client: Client connecting to softap.
      wifi6_supported_models: List of device models supporting 11ax.
    """
    if (dut.model not in wifi6_supported_models
            or dut_client.model not in wifi6_supported_models):
        return
    logging.info(
        "Verifying 11ax softap. DUT model: %s, DUT Client model: %s",
        dut.model, dut_client.model)
    client_standard = dut_client.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        client_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT failed to start SoftAp in 11ax.")

def check_available_channels_in_bands_2_5(dut, country_code):
    """Checks whether the DUT can enable BridgedAp on 2.4GHz plus 5GHz.
    #TODO: Find a way to make this function flexible by taking an argument.

    Check points:
        1. The DUT reports bridged AP concurrency support.
        2. After switching the DUT to country_code and toggling Wi-Fi on and
           off, the softap capability lists supported channels for both the
           2.4GHz and the 5GHz band.

    Args:
        dut: android device object.
        country_code: country code, e.g., 'US', 'JP'.
    Returns:
        True: If DUT is capable of enable BridgedAp.
        False: If DUT is not capable of enable BridgedAp.
    """
    # Check point #1
    if not dut.droid.wifiIsBridgedApConcurrencySupported():
        logging.error("DUT %s doesn't support bridged AP.", dut.model)
        return False

    # Check point #2
    set_wifi_country_code(dut, country_code)
    dut.log.info("DUT current country code : {}".format(
        dut.droid.wifiGetCountryCode()))
    # Wi-Fi ON and OFF to make sure the country code takes effect.
    for enabled in (True, False):
        wifi_toggle_state(dut, enabled)

    # Register SoftAp Callback and get SoftAp capability.
    callback_id = dut.droid.registerSoftApCallback()
    capability = get_current_softap_capability(dut, callback_id, True)
    dut.droid.unregisterSoftApCallback(callback_id)

    channels_2g = capability[
        wifi_constants.SOFTAP_CAPABILITY_24GHZ_SUPPORTED_CHANNEL_LIST]
    if channels_2g and capability[
            wifi_constants.SOFTAP_CAPABILITY_5GHZ_SUPPORTED_CHANNEL_LIST]:
        return True

    logging.error("DUT in %s doesn't support dual SAP bands (2G and 5G).", country_code)
    return False


@retry(tries=5, delay=2)
def validate_ping_between_two_clients(dut1, dut2):
    """Makes two DUTs ping each other over their wlan0 IPv4 addresses.

    The test is failed if either device has no wlan0 IPv4 address or if a
    ping in either direction fails. The function is wrapped by
    retry(tries=5, delay=2).

    Args:
        dut1: An AndroidDevice object.
        dut2: An AndroidDevice object.
    """

    def _wlan0_ipv4(dut):
        # First wlan0 IPv4 address of the device, or "" if it has none.
        try:
            return dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
        except IndexError:
            dut.log.info(
                "{} has no Wi-Fi connection, cannot get IPv4 address."
                .format(dut.serial))
            return ""

    # Get DUTs' IPv4 addresses.
    dut1_ip = _wlan0_ipv4(dut1)
    dut2_ip = _wlan0_ipv4(dut2)
    # Test fail if not able to obtain two DUT's IPv4 addresses.
    asserts.assert_true(dut1_ip and dut2_ip,
                        "Ping failed because no DUT's IPv4 address")

    dut1.log.info("{} IPv4 addresses : {}".format(dut1.serial, dut1_ip))
    dut2.log.info("{} IPv4 addresses : {}".format(dut2.serial, dut2_ip))

    # Two clients ping each other: dut1 -> dut2 first, then dut2 -> dut1.
    directions = ((dut1, dut1_ip, dut2_ip), (dut2, dut2_ip, dut1_ip))
    for source, source_ip, target_ip in directions:
        source.log.info("{} ping {}".format(source_ip, target_ip))
        asserts.assert_true(
            utils.adb_shell_ping(source, count=10, dest_ip=target_ip,
                                 timeout=20),
            "%s ping %s failed" % (source.serial, target_ip))

def get_wear_wifimediator_disable_status(ad):
    """Reads the cw_disable_wifimediator global setting and logs the result.

    Args:
        ad: Android Device

    Returns:
        True if the setting reads "1" (WearWifiMediator is disabled), False
        for any other value.
    """
    setting = ad.adb.shell("settings get global cw_disable_wifimediator")
    disabled = setting == "1"
    ad.log.info("WearWifiMediator is DISABLED"
                if disabled else "WearWifiMediator is ENABLED")
    return disabled

def disable_wear_wifimediator(ad, state):
    """Disables or re-enables WearWifiMediator and verifies the new setting.

    Writes 1 (disable) or 0 (enable) to the cw_disable_wifimediator global
    setting, then reads it back and fails the test if it did not take.

    Args:
        ad: Android Device
        state: True to disable, False otherwise.
    """
    if not state:
        ad.log.info("Enabling WearWifiMediator.....")
        ad.adb.shell("settings put global cw_disable_wifimediator 0")
        asserts.assert_false(get_wear_wifimediator_disable_status(ad),
                             "WearWifiMediator should be enabled")
        return

    ad.log.info("Disabling WearWifiMediator.....")
    ad.adb.shell("settings put global cw_disable_wifimediator 1")
    asserts.assert_true(get_wear_wifimediator_disable_status(ad),
                        "WearWifiMediator should be disabled")

def list_scan_results(ad, wait_time=15):
    """Triggers a Wi-Fi scan on the device and logs the networks it found.

    The scan is started with "cmd wifi start-scan"; after sleeping for
    wait_time seconds the results are read with "cmd wifi list-scan-results"
    and written to the device log. Nothing is returned.

    Args:
        ad (AndroidDevice): The Android device on which the scan is performed.
        wait_time (int, optional):
          The time in seconds to wait for the scan to complete before
          fetching results. Default is 15 seconds.
    """
    device_log = ad.log
    run_shell = ad.adb.shell

    device_log.info("Start scan for available Wi-Fi networks...")
    run_shell("cmd wifi start-scan")

    # Give the scan time to finish before asking for the results.
    device_log.info("Wait %ss for scan to complete.", wait_time)
    time.sleep(wait_time)

    networks = run_shell("cmd wifi list-scan-results")
    device_log.info("Available Wi-Fi networks: \n" + networks + "\n")

def kill_iperf3_server_by_port(port: str):
    """Kill every iperf3 process whose command line references the given port.

    The process table is read with ``ps aux``. A process is selected only
    when its COMMAND column contains "iperf3" and the port appears in that
    column as a standalone number. Matching on the command alone (rather
    than on the whole row) prevents a PID, memory size or a longer port
    number that merely contains the same digits from selecting an unrelated
    iperf3 process. Each selected process is sent SIGTERM via ``kill -15``.

    Failures to run ``ps`` or ``kill`` are logged and otherwise ignored; this
    function is best-effort and returns nothing.

    Args:
        port: The port number on which the iperf3 server is running. It is
            converted with str(), so an int is accepted as well.
    """
    # The port must not be preceded or followed by another digit, so that
    # e.g. 5201 does not match 15201 or 52010.
    port_pattern = re.compile(r"(?<!\d)" + re.escape(str(port)) + r"(?!\d)")

    try:
        ps_output = subprocess.check_output(["ps", "aux"],
                                            universal_newlines=True)
    except (subprocess.CalledProcessError, OSError):
        logging.info("Error executing shell command with subprocess.")
        return

    for line in ps_output.splitlines():
        # `ps aux` prints 10 fixed columns followed by the full command line;
        # limit the split so the command (with its spaces) stays intact.
        columns = line.split(None, 10)
        if len(columns) < 11:
            continue
        pid, command = columns[1], columns[10]
        if "iperf3" not in command or not port_pattern.search(command):
            continue
        try:
            subprocess.run(["kill", "-15", pid])
        except OSError:
            logging.info("Error executing shell command with subprocess.")
            continue
        logging.warning(
            "iperf3 server on port %s already in use, kill it with PID %s",
            port, pid)

def get_host_public_ipv4_address() -> Optional[str]:
  """Retrieves the host's public IPv4 address using the ifconfig command.

  The output of ``ifconfig`` is scanned for IPv4 addresses following the
  word "inet". Both the current layout ("inet 1.2.3.4") and the older
  net-tools layout ("inet addr:1.2.3.4") are recognised. Addresses that
  Python's ``ipaddress`` module classifies as private (this includes the
  RFC 1918 ranges 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16 as well as
  loopback) are skipped, and the first remaining address is returned.

  Returns:
    str: The first non-private IPv4 address, in the order ifconfig lists
      them.
    None: If no such address is found, or if ifconfig cannot be executed.

  Raises:
    Nothing. Failures to run ifconfig and malformed addresses are logged at
    info level and are not propagated to the caller.
  """
  try:
    # Run ifconfig command and get its output
    output = subprocess.check_output(["ifconfig"], universal_newlines=True)
  except subprocess.CalledProcessError:
    logging.info("Error executing ifconfig command.")
    return None
  except FileNotFoundError:
    logging.info("ifconfig command not found.")
    return None
  except Exception as e:
    # Deliberately broad: this helper promises never to raise.
    logging.info("An unexpected error occurred: %s", e)
    return None

  # IPv4 addresses that come after the word 'inet'. The optional "addr:"
  # covers the older net-tools output format. "inet6" lines never match
  # because "inet" must be followed directly by a space.
  pattern = r'inet (?:addr:)?(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'

  # Return the first public IP address found
  for ip_str in re.findall(pattern, output):
    try:
      ip = ipaddress.ip_address(ip_str)
    except ValueError:
      # The regex admits out-of-range octets such as 999.1.1.1.
      logging.info("Invalid IP address format: %s", ip_str)
      continue
    if not ip.is_private:
      return ip_str

  # Return None if no public IP is found
  return None

def get_iperf_server_port():
  """Gets a port number within the Dynamic port range (49152-65535).

  This function first determines which ports are currently in use according
  to ``netstat -tuln``, and then selects a random port from the dynamic range
  that is not among them. The port is only observed to be unused at the time
  of the check; it is not reserved for the caller.

  Returns:
    int: A port number that netstat did not report as in use.

  Raises:
    RuntimeError: If no available port is found in the Dynamic Ports range.
  """

  def get_used_ports():
    """Retrieve the set of ports that are currently in use on the system.

    This function uses the 'netstat' command to determine which ports are
    currently in use, and then parses the output to extract every number
    that directly follows a colon. That may also pick up digit groups from
    IPv6 addresses, which only makes the result more conservative.

    Returns:
        set[int]: Ports currently in use. Empty if netstat could not be run
          or its output could not be parsed (the error is logged).
    """
    try:
      # Get the output from the `netstat` command.
      output = subprocess.check_output(['netstat', '-tuln']).decode('utf-8')

      # A set gives constant-time membership tests for the scan below.
      return {int(port) for port in re.findall(r'(?<=:)\d+', output)}
    except Exception as e:
      # Best effort: without netstat every port is treated as free.
      logging.error(f"Error: {e}")
      return set()

  used_ports = get_used_ports()

  # Randomly shuffle the Dynamic Ports range and take the first free port.
  dynamic_ports_range = list(range(49152, 65536))
  random.shuffle(dynamic_ports_range)

  for port in dynamic_ports_range:
    if port not in used_ports:
      return port

  raise RuntimeError("No available port found in the Dynamic Ports range!")