diff options
author | Nelson Li <nelsonli@google.com> | 2020-06-11 15:06:38 +0800 |
---|---|---|
committer | Maciej Żenczykowski <maze@google.com> | 2020-07-10 23:56:38 -0700 |
commit | b7769d6474435c70b2be16aeb0ae4e268c840968 (patch) | |
tree | cde2e871628cbe51e466107719cd37e4886dca6a | |
parent | acb2b4b0ebb7a82310208ebc93defa18a27770c3 (diff) | |
download | tests-android-r-beta-3.tar.gz |
net-test: be more compatible with python3android-r-beta-3android-r-beta-2
Conversions are primarily:
print ... -> print(...)
xrange(...) -> range(...)
assertEquals(...) -> assertEqual(...)
except t, e -> except t as e
"\x00" -> b"\x00"
iteritems() -> items()
Plus:
sprinkle in some list()'s around a few things
and a few other minor things
Test: tested on ACK5.4 with 32 and 64-bit x86 uml
which proves this continues to work with the uml image's embedded python2
Bug: 158494111
Signed-off-by: Maciej Żenczykowski <maze@google.com>
Change-Id: I89dffc575293c1768eacdc6f1bce2bee4095c590
30 files changed, 348 insertions, 345 deletions
diff --git a/net/test/bpf_test.py b/net/test/bpf_test.py index ea3e56b..693e1d4 100755 --- a/net/test/bpf_test.py +++ b/net/test/bpf_test.py @@ -49,10 +49,10 @@ def PrintMapInfo(map_fd): try: nextKey = GetNextKey(map_fd, key).value value = LookupMap(map_fd, nextKey) - print repr(nextKey) + " : " + repr(value.value) + print(repr(nextKey) + " : " + repr(value.value)) key = nextKey except: - print "no value" + print("no value") break @@ -67,7 +67,7 @@ def SocketUDPLoopBack(packet_count, version, prog_fd): sock.bind((addr, 0)) addr = sock.getsockname() sockaddr = csocket.Sockaddr(addr) - for i in xrange(packet_count): + for i in range(packet_count): sock.sendto("foo", addr) data, retaddr = csocket.Recvfrom(sock, 4096, 0) assert "foo" == data @@ -170,7 +170,7 @@ class BpfTest(net_test.NetworkTest): self.map_fd = CreateMap(BPF_MAP_TYPE_HASH, KEY_SIZE, VALUE_SIZE, TOTAL_ENTRIES) UpdateMap(self.map_fd, key, value) - self.assertEquals(value, LookupMap(self.map_fd, key).value) + self.assertEqual(value, LookupMap(self.map_fd, key).value) DeleteMap(self.map_fd, key) self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, key) @@ -185,17 +185,17 @@ class BpfTest(net_test.NetworkTest): result = GetNextKey(self.map_fd, key) key = result.value self.assertGreaterEqual(key, 0) - self.assertEquals(value, LookupMap(self.map_fd, key).value) + self.assertEqual(value, LookupMap(self.map_fd, key).value) count += 1 def testIterateMap(self): self.map_fd = CreateMap(BPF_MAP_TYPE_HASH, KEY_SIZE, VALUE_SIZE, TOTAL_ENTRIES) value = 1024 - for key in xrange(0, TOTAL_ENTRIES): + for key in range(0, TOTAL_ENTRIES): UpdateMap(self.map_fd, key, value) - for key in xrange(0, TOTAL_ENTRIES): - self.assertEquals(value, LookupMap(self.map_fd, key).value) + for key in range(0, TOTAL_ENTRIES): + self.assertEqual(value, LookupMap(self.map_fd, key).value) self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, 101) nonexistent_key = -1 self.CheckAllMapEntry(nonexistent_key, TOTAL_ENTRIES, value) @@ -204,7 +204,7 @@ class BpfTest(net_test.NetworkTest): self.map_fd = CreateMap(BPF_MAP_TYPE_HASH, KEY_SIZE, VALUE_SIZE, TOTAL_ENTRIES) value = 1024 - for key in xrange(0, TOTAL_ENTRIES): + for key in range(0, TOTAL_ENTRIES): UpdateMap(self.map_fd, key, value) firstKey = GetFirstKey(self.map_fd) key = firstKey.value @@ -261,7 +261,7 @@ class BpfTest(net_test.NetworkTest): packet_count = 10 SocketUDPLoopBack(packet_count, 4, self.prog_fd) SocketUDPLoopBack(packet_count, 6, self.prog_fd) - self.assertEquals(packet_count * 2, LookupMap(self.map_fd, key).value) + self.assertEqual(packet_count * 2, LookupMap(self.map_fd, key).value) @unittest.skipUnless(HAVE_EBPF_ACCOUNTING, "BPF helper function is not fully supported") @@ -282,7 +282,7 @@ class BpfTest(net_test.NetworkTest): def PacketCountByCookie(version): self.sock = SocketUDPLoopBack(packet_count, version, self.prog_fd) cookie = sock_diag.SockDiag.GetSocketCookie(self.sock) - self.assertEquals(packet_count, LookupMap(self.map_fd, cookie).value) + self.assertEqual(packet_count, LookupMap(self.map_fd, cookie).value) self.sock.close() PacketCountByCookie(4) PacketCountByCookie(6) @@ -307,10 +307,10 @@ class BpfTest(net_test.NetworkTest): with net_test.RunAsUid(uid): self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, uid) SocketUDPLoopBack(packet_count, 4, self.prog_fd) - self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value) + self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value) DeleteMap(self.map_fd, uid); SocketUDPLoopBack(packet_count, 6, self.prog_fd) - self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value) + self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value) @unittest.skipUnless(HAVE_EBPF_ACCOUNTING, "Cgroup BPF is not fully supported") @@ -397,17 +397,17 @@ class BpfCgroupTest(net_test.NetworkTest): with net_test.RunAsUid(uid): self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, uid) SocketUDPLoopBack(packet_count, 4, None) - self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value) + self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value) DeleteMap(self.map_fd, uid) SocketUDPLoopBack(packet_count, 6, None) - self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value) + self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value) BpfProgDetach(self._cg_fd, BPF_CGROUP_INET_INGRESS) def checkSocketCreate(self, family, socktype, success): try: sock = socket.socket(family, socktype, 0) sock.close() - except socket.error, e: + except socket.error as e: if success: self.fail("Failed to create socket family=%d type=%d err=%s" % (family, socktype, os.strerror(e.errno))) diff --git a/net/test/cstruct.py b/net/test/cstruct.py index 5e05263..c675c9e 100644 --- a/net/test/cstruct.py +++ b/net/test/cstruct.py @@ -28,24 +28,24 @@ Example usage: >>> # Create instances from a tuple of values, raw bytes, zero-initialized, or >>> # using keywords. ... n1 = NLMsgHdr((44, 32, 0x2, 0, 491)) ->>> print n1 +>>> print(n1) NLMsgHdr(length=44, type=32, flags=2, seq=0, pid=491) >>> >>> n2 = NLMsgHdr("\x2c\x00\x00\x00\x21\x00\x02\x00" ... "\x00\x00\x00\x00\xfe\x01\x00\x00" + "junk at end") ->>> print n2 +>>> print(n2) NLMsgHdr(length=44, type=33, flags=2, seq=0, pid=510) >>> >>> n3 = netlink.NLMsgHdr() # Zero-initialized ->>> print n3 +>>> print(n3) NLMsgHdr(length=0, type=0, flags=0, seq=0, pid=0) >>> >>> n4 = netlink.NLMsgHdr(length=44, type=33) # Other fields zero-initialized ->>> print n4 +>>> print(n4) NLMsgHdr(length=44, type=33, flags=0, seq=0, pid=0) >>> >>> # Serialize to raw bytes. -... print n1.Pack().encode("hex") +... print(n1.Pack().encode("hex")) 2c0000002000020000000000eb010000 >>> >>> # Parse the beginning of a byte stream as a struct, and return the struct @@ -70,11 +70,14 @@ NLMsgHdr(length=44, type=33, flags=0, seq=0, pid=0) import ctypes import string import struct +import re def CalcSize(fmt): if "A" in fmt: fmt = fmt.replace("A", "s") + # Remove the last digital since it will cause error in python3. + fmt = (re.split('\d+$', fmt)[0]) return struct.calcsize(fmt) def CalcNumElements(fmt): @@ -82,7 +85,7 @@ def CalcNumElements(fmt): fmt = fmt.replace("S", "") numstructs = prevlen - len(fmt) size = CalcSize(fmt) - elements = struct.unpack(fmt, "\x00" * size) + elements = struct.unpack(fmt, b"\x00" * size) return len(elements) + numstructs @@ -118,7 +121,7 @@ def Struct(name, fmt, fieldnames, substructs={}): # where XX is the length of the struct type's packed representation. _format = "" laststructindex = 0 - for i in xrange(len(fmt)): + for i in range(len(fmt)): if fmt[i] == "S": # Nested struct. Record the index in our struct it should go into. index = CalcNumElements(fmt[:i]) @@ -138,17 +141,17 @@ def Struct(name, fmt, fieldnames, substructs={}): offset_list = [0] last_offset = 0 - for i in xrange(len(_format)): + for i in range(len(_format)): offset = CalcSize(_format[:i]) if offset > last_offset: last_offset = offset offset_list.append(offset) # A dictionary that maps field names to their offsets in the struct. - _offsets = dict(zip(_fieldnames, offset_list)) + _offsets = dict(list(zip(_fieldnames, offset_list))) # Check that the number of field names matches the number of fields. - numfields = len(struct.unpack(_format, "\x00" * _length)) + numfields = len(struct.unpack(_format, b"\x00" * _length)) if len(_fieldnames) != numfields: raise ValueError("Invalid cstruct: \"%s\" has %d elements, \"%s\" has %d." % (fmt, numfields, fieldnames, len(_fieldnames))) @@ -182,7 +185,7 @@ def Struct(name, fmt, fieldnames, substructs={}): # Default construct from null bytes. self._Parse("\x00" * len(self)) # If any keywords were supplied, set those fields. - for k, v in kwargs.iteritems(): + for k, v in kwargs.items(): setattr(self, k, v) elif isinstance(tuple_or_bytes, str): # Initializing from a string. diff --git a/net/test/cstruct_test.py b/net/test/cstruct_test.py index 6b27973..af1fc42 100755 --- a/net/test/cstruct_test.py +++ b/net/test/cstruct_test.py @@ -27,16 +27,16 @@ TestStructB = cstruct.Struct("TestStructB", "=BI", "byte1 int2") class CstructTest(unittest.TestCase): def CheckEquals(self, a, b): - self.assertEquals(a, b) - self.assertEquals(b, a) + self.assertEqual(a, b) + self.assertEqual(b, a) assert a == b assert b == a assert not (a != b) # pylint: disable=g-comparison-negation,superfluous-parens assert not (b != a) # pylint: disable=g-comparison-negation,superfluous-parens def CheckNotEquals(self, a, b): - self.assertNotEquals(a, b) - self.assertNotEquals(b, a) + self.assertNotEqual(a, b) + self.assertNotEqual(b, a) assert a != b assert b != a assert not (a == b) # pylint: disable=g-comparison-negation,superfluous-parens @@ -69,9 +69,9 @@ class CstructTest(unittest.TestCase): expectedlen = (len(TestStructA) + 2 + len(TestStructA) + len(TestStructB) + 4 + 1) - self.assertEquals(expectedlen, len(DoubleNested)) + self.assertEqual(expectedlen, len(DoubleNested)) - self.assertEquals(7, d.nest2.nest3.byte1) + self.assertEqual(7, d.nest2.nest3.byte1) d.byte3 = 252 d.nest2.word1 = 33214 @@ -80,17 +80,17 @@ class CstructTest(unittest.TestCase): t = n.nest3 t.int2 = 33627591 - self.assertEquals(33627591, d.nest2.nest3.int2) + self.assertEqual(33627591, d.nest2.nest3.int2) expected = ( "DoubleNested(nest1=TestStructA(byte1=1, int2=2)," " nest2=Nested(word1=33214, nest2=TestStructA(byte1=3, int2=4)," " nest3=TestStructB(byte1=7, int2=33627591), int4=-55), byte3=252)") - self.assertEquals(expected, str(d)) + self.assertEqual(expected, str(d)) expected = ("01" "02000000" "81be" "03" "04000000" "07" "c71d0102" "ffffffc9" "fc").decode("hex") - self.assertEquals(expected, d.Pack()) + self.assertEqual(expected, d.Pack()) unpacked = DoubleNested(expected) self.CheckEquals(unpacked, d) @@ -102,24 +102,24 @@ class CstructTest(unittest.TestCase): t = TestStruct((2, nullstr, 12345, nullstr, 33210)) expected = ("TestStruct(byte1=2, string2=68656c6c6f0000000000000000000000," " int3=12345, ascii4=hello, word5=33210)") - self.assertEquals(expected, str(t)) + self.assertEqual(expected, str(t)) embeddednull = "hello\x00visible123" t = TestStruct((2, embeddednull, 12345, embeddednull, 33210)) expected = ("TestStruct(byte1=2, string2=68656c6c6f0076697369626c65313233," " int3=12345, ascii4=hello\x00visible123, word5=33210)") - self.assertEquals(expected, str(t)) + self.assertEqual(expected, str(t)) def testZeroInitialization(self): TestStruct = cstruct.Struct("TestStruct", "B16si16AH", "byte1 string2 int3 ascii4 word5") t = TestStruct() - self.assertEquals(0, t.byte1) - self.assertEquals("\x00" * 16, t.string2) - self.assertEquals(0, t.int3) - self.assertEquals("\x00" * 16, t.ascii4) - self.assertEquals(0, t.word5) - self.assertEquals("\x00" * len(TestStruct), t.Pack()) + self.assertEqual(0, t.byte1) + self.assertEqual("\x00" * 16, t.string2) + self.assertEqual(0, t.int3) + self.assertEqual("\x00" * 16, t.ascii4) + self.assertEqual(0, t.word5) + self.assertEqual("\x00" * len(TestStruct), t.Pack()) def testKeywordInitialization(self): TestStruct = cstruct.Struct("TestStruct", "=B16sIH", @@ -130,26 +130,26 @@ class CstructTest(unittest.TestCase): # Populate all fields t1 = TestStruct(byte1=1, string2=text, int3=0xFEDCBA98, word4=0x1234) expected = ("01" + text_bytes + "98BADCFE" "3412").decode("hex") - self.assertEquals(expected, t1.Pack()) + self.assertEqual(expected, t1.Pack()) # Partially populated t1 = TestStruct(string2=text, word4=0x1234) expected = ("00" + text_bytes + "00000000" "3412").decode("hex") - self.assertEquals(expected, t1.Pack()) + self.assertEqual(expected, t1.Pack()) def testCstructOffset(self): TestStruct = cstruct.Struct("TestStruct", "B16si16AH", "byte1 string2 int3 ascii4 word5") nullstr = "hello" + (16 - len("hello")) * "\x00" t = TestStruct((2, nullstr, 12345, nullstr, 33210)) - self.assertEquals(0, t.offset("byte1")) - self.assertEquals(1, t.offset("string2")) # sizeof(byte) - self.assertEquals(17, t.offset("int3")) # sizeof(byte) + 16*sizeof(char) + self.assertEqual(0, t.offset("byte1")) + self.assertEqual(1, t.offset("string2")) # sizeof(byte) + self.assertEqual(17, t.offset("int3")) # sizeof(byte) + 16*sizeof(char) # The integer is automatically padded by the struct module # to match native alignment. # offset = sizeof(byte) + 16*sizeof(char) + padding + sizeof(int) - self.assertEquals(24, t.offset("ascii4")) - self.assertEquals(40, t.offset("word5")) + self.assertEqual(24, t.offset("ascii4")) + self.assertEqual(40, t.offset("word5")) self.assertRaises(KeyError, t.offset, "random") # TODO: Add support for nested struct offset diff --git a/net/test/forwarding_test.py b/net/test/forwarding_test.py index 34394cd..b35e19f 100755 --- a/net/test/forwarding_test.py +++ b/net/test/forwarding_test.py @@ -126,8 +126,8 @@ class ForwardingTest(multinetwork_base.MultiNetworkBaseTest): mydst = "%s:%04X" % (net_test.FormatSockStatAddress(remoteaddr), remoteport) state = None sockets = [s for s in sockets if s[0] == mysrc and s[1] == mydst] - self.assertEquals(1, len(sockets)) - self.assertEquals("%02X" % self.TCP_TIME_WAIT, sockets[0][2]) + self.assertEqual(1, len(sockets)) + self.assertEqual("%02X" % self.TCP_TIME_WAIT, sockets[0][2]) # Remove our IP address. try: diff --git a/net/test/genetlink.py b/net/test/genetlink.py index dda3964..6928f07 100755 --- a/net/test/genetlink.py +++ b/net/test/genetlink.py @@ -120,4 +120,4 @@ class GenericNetlinkControl(GenericNetlink): if __name__ == "__main__": g = GenericNetlinkControl() - print g.GetFamily("tcp_metrics") + print(g.GetFamily("tcp_metrics")) diff --git a/net/test/iproute.py b/net/test/iproute.py index 470cbf1..9036246 100644 --- a/net/test/iproute.py +++ b/net/test/iproute.py @@ -408,7 +408,7 @@ class IPRoute(netlink.NetlinkSocket): while True: try: self._SendNlRequest(RTM_DELRULE, rtmsg) - except IOError, e: + except IOError as e: if e.errno == errno.ENOENT: break else: @@ -459,7 +459,7 @@ class IPRoute(netlink.NetlinkSocket): subject = CommandSubject(command) if "ALL" not in self.NL_DEBUG and subject not in self.NL_DEBUG: return - print self.CommandToString(command, data) + print(self.CommandToString(command, data)) def MaybeDebugMessage(self, message): hdr = netlink.NLMsgHdr(message) @@ -467,7 +467,7 @@ class IPRoute(netlink.NetlinkSocket): def PrintMessage(self, message): hdr = netlink.NLMsgHdr(message) - print self.CommandToString(hdr.type, message) + print(self.CommandToString(hdr.type, message)) def DumpRules(self, version): """Returns the IP rules for the specified IP version.""" @@ -774,4 +774,4 @@ if __name__ == "__main__": iproute.DEBUG = True iproute.DumpRules(6) iproute.DumpLinks() - print iproute.GetRoutes("2001:4860:4860::8888", 0, 0, None) + print(iproute.GetRoutes("2001:4860:4860::8888", 0, 0, None)) diff --git a/net/test/leak_test.py b/net/test/leak_test.py index 8a42611..a245817 100755 --- a/net/test/leak_test.py +++ b/net/test/leak_test.py @@ -63,7 +63,7 @@ class ForceSocketBufferOptionTest(net_test.NetworkTest): val = 4097 self.assertGreater(2 * val, minbuf) s.setsockopt(SOL_SOCKET, force_option, val) - self.assertEquals(2 * val, s.getsockopt(SOL_SOCKET, option)) + self.assertEqual(2 * val, s.getsockopt(SOL_SOCKET, option)) # Check that the force option sets at least the minimum value instead # of a negative value on integer overflow. Because the kernel multiplies diff --git a/net/test/multinetwork_base.py b/net/test/multinetwork_base.py index 8dbd360..6b79d4f 100644 --- a/net/test/multinetwork_base.py +++ b/net/test/multinetwork_base.py @@ -366,7 +366,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest): @classmethod def _RestoreSysctls(cls): - for sysctl, value in cls.saved_sysctls.iteritems(): + for sysctl, value in cls.saved_sysctls.items(): try: open(sysctl, "w").write(value) except IOError: @@ -558,7 +558,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest): # MAC address has 1 in the least-significant bit. if include_multicast or not int(ether.dst.split(":")[0], 16) & 0x1: packets.append(ether.payload) - except OSError, e: + except OSError as e: # EAGAIN means there are no more packets waiting. if re.match(e.message, os.strerror(errno.EAGAIN)): # If we didn't see any packets, try again for good luck. @@ -669,7 +669,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest): # repr() can be expensive. Call it only if the test is going to fail and we # want to see the error. if expected_real != actual_real: - self.assertEquals(repr(expected_real), repr(actual_real)) + self.assertEqual(repr(expected_real), repr(actual_real)) def PacketMatches(self, expected, actual): try: @@ -710,7 +710,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest): # expected, but this is good enough for now. try: self.assertPacketMatches(expected, packets[-1]) - except Exception, e: + except Exception as e: raise UnexpectedPacketError( "%s: diff with last packet:\n%s" % (msg, e.message)) diff --git a/net/test/multinetwork_test.py b/net/test/multinetwork_test.py index a0b464a..092736b 100755 --- a/net/test/multinetwork_test.py +++ b/net/test/multinetwork_test.py @@ -134,7 +134,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest): self.ExpectPacketOn(netid, msg, expected) def CheckOutgoingPackets(self, routing_mode): - for _ in xrange(self.ITERATIONS): + for _ in range(self.ITERATIONS): for netid in self.tuns: self.CheckPingPacket(4, netid, routing_mode, self.IPV4_PING) @@ -194,7 +194,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest): # If we're testing connected sockets, connect the socket on the first # netid now. if use_connect: - netid = self.tuns.keys()[0] + netid = list(self.tuns.keys())[0] self.SelectInterface(s, netid, mode) s.connect((dstaddr, 53)) expected.src = self.MyAddress(version, netid) @@ -270,7 +270,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest): self.CheckRemarking(6, True) def testIPv6StickyPktinfo(self): - for _ in xrange(self.ITERATIONS): + for _ in range(self.ITERATIONS): for netid in self.tuns: s = net_test.UDPSocket(AF_INET6) @@ -312,7 +312,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest): self.ExpectPacketOn(netid, msg, expected) def CheckPktinfoRouting(self, version): - for _ in xrange(self.ITERATIONS): + for _ in range(self.ITERATIONS): for netid in self.tuns: family = self.GetProtocolFamily(version) s = net_test.UDPSocket(family) @@ -478,11 +478,11 @@ class TCPAcceptTest(multinetwork_base.InboundMarkingTest): self.InvalidateDstCache(version, netid) if mode == self.MODE_INCOMING_MARK: - self.assertEquals(netid, mark & self.NETID_FWMASK, + self.assertEqual(netid, mark & self.NETID_FWMASK, msg + ": Accepted socket: Expected mark %d, got %d" % ( netid, mark)) elif mode != self.MODE_EXPLICIT_MARK: - self.assertEquals(0, self.GetSocketMark(listensocket)) + self.assertEqual(0, self.GetSocketMark(listensocket)) # Check the FIN was sent on the right interface, and ack it. We don't expect # this to fail because by the time the connection is established things are @@ -670,14 +670,14 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest): table, prefix, plen, router, ifindex, None, None) def testSetAcceptRaRtInfoMinPlen(self): - for plen in xrange(-1, 130): + for plen in range(-1, 130): self.SetAcceptRaRtInfoMinPlen(plen) - self.assertEquals(plen, self.GetAcceptRaRtInfoMinPlen()) + self.assertEqual(plen, self.GetAcceptRaRtInfoMinPlen()) def testSetAcceptRaRtInfoMaxPlen(self): - for plen in xrange(-1, 130): + for plen in range(-1, 130): self.SetAcceptRaRtInfoMaxPlen(plen) - self.assertEquals(plen, self.GetAcceptRaRtInfoMaxPlen()) + self.assertEqual(plen, self.GetAcceptRaRtInfoMaxPlen()) def testZeroRtLifetime(self): PREFIX = "2001:db8:8901:2300::" @@ -700,7 +700,7 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest): RTLIFETIME = 70372 PRF = 0 # sweep from high to low to avoid spurious failures from late arrivals. - for plen in xrange(130, 1, -1): + for plen in range(130, 1, -1): self.SetAcceptRaRtInfoMinPlen(plen) # RIO with plen < min_plen should be ignored self.SendRIO(RTLIFETIME, plen - 1, PREFIX, PRF) @@ -715,7 +715,7 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest): RTLIFETIME = 73078 PRF = 0 # sweep from low to high to avoid spurious failures from late arrivals. - for plen in xrange(-1, 128, 1): + for plen in range(-1, 128, 1): self.SetAcceptRaRtInfoMaxPlen(plen) # RIO with plen > max_plen should be ignored self.SendRIO(RTLIFETIME, plen + 1, PREFIX, PRF) @@ -782,16 +782,16 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest): baseline = self.CountRoutes() self.SetAcceptRaRtInfoMaxPlen(56) # Send many RIOs compared to the expected number on a healthy system. - for i in xrange(0, COUNT): + for i in range(0, COUNT): prefix = "2001:db8:%x:1100::" % i self.SendRIO(RTLIFETIME, PLEN, prefix, PRF) time.sleep(0.1) - self.assertEquals(COUNT + baseline, self.CountRoutes()) - for i in xrange(0, COUNT): + self.assertEqual(COUNT + baseline, self.CountRoutes()) + for i in range(0, COUNT): prefix = "2001:db8:%x:1100::" % i self.DelRA6(prefix, PLEN) # Expect that we can return to baseline config without lingering routes. - self.assertEquals(baseline, self.CountRoutes()) + self.assertEqual(baseline, self.CountRoutes()) class RATest(multinetwork_base.MultiNetworkBaseTest): @@ -874,7 +874,7 @@ class RATest(multinetwork_base.MultiNetworkBaseTest): return len(open("/proc/net/ipv6_route").readlines()) num_routes = GetNumRoutes() - for i in xrange(10, 20): + for i in range(10, 20): try: self.tuns[i] = self.CreateTunInterface(i) self.SendRA(i) @@ -908,23 +908,23 @@ class RATest(multinetwork_base.MultiNetworkBaseTest): csocket.SetSocketTimeout(s.sock, 100) try: data = s._Recv() - except IOError, e: + except IOError as e: self.fail("Should have received an RTM_NEWNDUSEROPT message. " "Please ensure the kernel supports receiving the " "PREF64 RA option. Error: %s" % e) # Check that the message is received correctly. nlmsghdr, data = cstruct.Read(data, netlink.NLMsgHdr) - self.assertEquals(iproute.RTM_NEWNDUSEROPT, nlmsghdr.type) + self.assertEqual(iproute.RTM_NEWNDUSEROPT, nlmsghdr.type) # Check the option contents. ndopthdr, data = cstruct.Read(data, iproute.NdUseroptMsg) - self.assertEquals(AF_INET6, ndopthdr.family) - self.assertEquals(self.ND_ROUTER_ADVERT, ndopthdr.icmp_type) - self.assertEquals(len(opt), ndopthdr.opts_len) + self.assertEqual(AF_INET6, ndopthdr.family) + self.assertEqual(self.ND_ROUTER_ADVERT, ndopthdr.icmp_type) + self.assertEqual(len(opt), ndopthdr.opts_len) actual_opt = self.Pref64Option(data) - self.assertEquals(opt, actual_opt) + self.assertEqual(opt, actual_opt) @@ -986,7 +986,7 @@ class PMTUTest(multinetwork_base.InboundMarkingTest): # Send a packet and receive a packet too big. SendBigPacket(version, s, dstaddr, netid, payload) received = self.ReadAllPacketsOn(netid) - self.assertEquals(1, len(received), + self.assertEqual(1, len(received), "unexpected packets: %s" % received[1:]) _, toobig = packets.ICMPPacketTooBig(version, intermediate, srcaddr, received[0]) @@ -1000,7 +1000,7 @@ class PMTUTest(multinetwork_base.InboundMarkingTest): # If this is a connected socket, make sure the socket MTU was set. # Note that in IPv4 this only started working in Linux 3.6! if use_connect and (version == 6 or net_test.LINUX_VERSION >= (3, 6)): - self.assertEquals(packets.PTB_MTU, self.GetSocketMTU(version, s)) + self.assertEqual(packets.PTB_MTU, self.GetSocketMTU(version, s)) s.close() @@ -1010,16 +1010,16 @@ class PMTUTest(multinetwork_base.InboundMarkingTest): # here we use a mark for simplicity. s2 = self.BuildSocket(version, net_test.UDPSocket, netid, "mark") s2.connect((dstaddr, 1234)) - self.assertEquals(packets.PTB_MTU, self.GetSocketMTU(version, s2)) + self.assertEqual(packets.PTB_MTU, self.GetSocketMTU(version, s2)) # Also check the MTU reported by ip route get, this time using the oif. routes = self.iproute.GetRoutes(dstaddr, self.ifindices[netid], 0, None) self.assertTrue(routes) route = routes[0] rtmsg, attributes = route - self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) + self.assertEqual(iproute.RTN_UNICAST, rtmsg.type) metrics = attributes["RTA_METRICS"] - self.assertEquals(packets.PTB_MTU, metrics["RTAX_MTU"]) + self.assertEqual(packets.PTB_MTU, metrics["RTAX_MTU"]) def testIPv4BasicPMTU(self): """Tests IPv4 path MTU discovery. @@ -1152,11 +1152,11 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest): rules = self.GetRulesAtPriority(version, priority) self.assertTrue(rules) _, attributes = rules[-1] - self.assertEquals(priority, attributes["FRA_PRIORITY"]) + self.assertEqual(priority, attributes["FRA_PRIORITY"]) uidrange = attributes["FRA_UID_RANGE"] - self.assertEquals(start, uidrange.start) - self.assertEquals(end, uidrange.end) - self.assertEquals(table, attributes["FRA_TABLE"]) + self.assertEqual(start, uidrange.start) + self.assertEqual(end, uidrange.end) + self.assertEqual(table, attributes["FRA_TABLE"]) finally: self.iproute.UidRangeRule(version, False, start, end, table, priority) self.assertRaisesErrno( @@ -1223,15 +1223,15 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest): try: routes = self.iproute.GetRoutes(addr, oif, mark, uid) rtmsg, _ = routes[0] - self.assertEquals(iproute.RTN_UNREACHABLE, rtmsg.type) - except IOError, e: + self.assertEqual(iproute.RTN_UNREACHABLE, rtmsg.type) + except IOError as e: if int(e.errno) != int(errno.ENETUNREACH): raise e def ExpectRoute(self, addr, oif, mark, uid): routes = self.iproute.GetRoutes(addr, oif, mark, uid) rtmsg, _ = routes[0] - self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) + self.assertEqual(iproute.RTN_UNICAST, rtmsg.type) def CheckGetRoute(self, version, addr): self.ExpectNoRoute(addr, 0, 0, 0) @@ -1257,7 +1257,7 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest): self.assertRaisesErrno(errno.ENETUNREACH, s.sendto, "foo", (remoteaddr, 53)) def CheckSendSucceeds(): - self.assertEquals(len("foo"), s.sendto("foo", (remoteaddr, 53))) + self.assertEqual(len("foo"), s.sendto("foo", (remoteaddr, 53))) CheckSendFails() self.iproute.UidRangeRule(6, True, uid, uid, table, self.PRIORITY_UID) @@ -1269,7 +1269,7 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest): CheckSendSucceeds() os.fchown(s.fileno(), -1, 12345) CheckSendSucceeds() - os.fchmod(s.fileno(), 0777) + os.fchmod(s.fileno(), 0o777) CheckSendSucceeds() os.fchown(s.fileno(), 0, -1) CheckSendFails() @@ -1306,8 +1306,8 @@ class RulesTest(net_test.NetworkTest): # Check that the rule pointing at table 301 is still around. attributes = [a for _, a in self.iproute.DumpRules(version) if a.get("FRA_PRIORITY", 0) == self.RULE_PRIORITY] - self.assertEquals(1, len(attributes)) - self.assertEquals(301, attributes[0]["FRA_TABLE"]) + self.assertEqual(1, len(attributes)) + self.assertEqual(301, attributes[0]["FRA_TABLE"]) if __name__ == "__main__": diff --git a/net/test/namespace.py b/net/test/namespace.py index 85db654..c7da82f 100644 --- a/net/test/namespace.py +++ b/net/test/namespace.py @@ -112,10 +112,10 @@ def UnShare(flags): def DumpMounts(hdr): - print - print hdr + print() + print(hdr) print open('/proc/mounts', 'r').read(), - print '---' + print('---') # Requires at least kernel configuration options: @@ -130,7 +130,7 @@ def IfPossibleEnterNewNetworkNamespace(): try: UnShare(CLONE_NEWNS | CLONE_NEWUTS | CLONE_NEWNET) except OSError as err: - print 'failed: %s (likely: no privs or lack of kernel support).' % err + print('failed: %s (likely: no privs or lack of kernel support).' % err) return False try: @@ -143,11 +143,11 @@ def IfPossibleEnterNewNetworkNamespace(): SetFileContents('/proc/sys/net/ipv4/ping_group_range', '0 2147483647') net_test.SetInterfaceUp('lo') except: - print 'failed.' + print('failed.') # We've already transitioned into the new netns -- it's too late to recover. raise - print 'succeeded.' + print('succeeded.') return True diff --git a/net/test/neighbour_test.py b/net/test/neighbour_test.py index 2cb5c23..8cea6da 100755 --- a/net/test/neighbour_test.py +++ b/net/test/neighbour_test.py @@ -93,7 +93,7 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): self.sock.bind((0, RTMGRP_NEIGH)) net_test.SetNonBlocking(self.sock) - self.netid = random.choice(self.tuns.keys()) + self.netid = random.choice(list(self.tuns.keys())) self.ifindex = self.ifindices[self.netid] # MultinetworkBaseTest always uses NUD_PERMANENT for router ARP entries. @@ -144,19 +144,19 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK) def assertNeighbourState(self, state, addr): - self.assertEquals(state, self.GetNdEntry(addr)[0].state) + self.assertEqual(state, self.GetNdEntry(addr)[0].state) def assertNeighbourAttr(self, addr, name, value): - self.assertEquals(value, self.GetNdEntry(addr)[1][name]) + self.assertEqual(value, self.GetNdEntry(addr)[1][name]) def ExpectNeighbourNotification(self, addr, state, attrs=None): msg = self.sock.recv(4096) msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg) - self.assertEquals(addr, actual_attrs["NDA_DST"]) - self.assertEquals(state, msg.state) + self.assertEqual(addr, actual_attrs["NDA_DST"]) + self.assertEqual(state, msg.state) if attrs: for name in attrs: - self.assertEquals(attrs[name], actual_attrs[name]) + self.assertEqual(attrs[name], actual_attrs[name]) def ExpectProbe(self, is_unicast, addr): version = csocket.AddressVersion(addr) @@ -225,7 +225,7 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): sleep_ms = min(100, interval - slept) time.sleep(sleep_ms / 1000.0) slept += sleep_ms - print self.GetNdEntry(addr) + print(self.GetNdEntry(addr)) def MonitorSleep(self, intervalseconds, addr): self.MonitorSleepMs(intervalseconds * 1000, addr) @@ -319,7 +319,7 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): self.assertNeighbourState(NUD_REACHABLE, addr) self.ExpectNeighbourNotification(addr, NUD_REACHABLE) - for _ in xrange(5): + for _ in range(5): ForceProbe(router6, routermac) def testIsRouterFlag(self): diff --git a/net/test/net_test.py b/net/test/net_test.py index 458cedf..1c9c1c4 100755 --- a/net/test/net_test.py +++ b/net/test/net_test.py @@ -250,7 +250,7 @@ def CanonicalizeIPv6Address(addr): def FormatProcAddress(unformatted): groups = [] - for i in xrange(0, len(unformatted), 4): + for i in range(0, len(unformatted), 4): groups.append(unformatted[i:i+4]) formatted = ":".join(groups) # Compress the address. @@ -265,7 +265,7 @@ def FormatSockStatAddress(address): family = AF_INET binary = inet_pton(family, address) out = "" - for i in xrange(0, len(binary), 4): + for i in range(0, len(binary), 4): out += "%08X" % struct.unpack("=L", binary[i:i+4]) return out diff --git a/net/test/netlink.py b/net/test/netlink.py index 4e230d4..2c9c757 100644 --- a/net/test/netlink.py +++ b/net/test/netlink.py @@ -67,7 +67,7 @@ class NetlinkSocket(object): def _Debug(self, s): if self.DEBUG: - print s + print(s) def _NlAttr(self, nla_type, data): datalen = len(data) @@ -212,7 +212,7 @@ class NetlinkSocket(object): self._Debug(" %s" % nlmsghdr) if nlmsghdr.type == NLMSG_ERROR or nlmsghdr.type == NLMSG_DONE: - print "done" + print("done") return (None, None), data nlmsg, data = cstruct.Read(data, msgtype) diff --git a/net/test/pf_key.py b/net/test/pf_key.py index 875e01c..47d434b 100755 --- a/net/test/pf_key.py +++ b/net/test/pf_key.py @@ -202,7 +202,7 @@ class PfKey(object): def Recv(self): reply = self.sock.recv(4096) msg = SadbMsg(reply) - # print "RECV:", self.DecodeSadbMsg(msg) + # print("RECV:", self.DecodeSadbMsg(msg)) if msg.errno != 0: raise OSError(msg.errno, os.strerror(msg.errno)) return reply @@ -213,7 +213,7 @@ class PfKey(object): msg.pid = os.getpid() msg.len = (len(SadbMsg) + len(extensions)) / 8 self.sock.send(msg.Pack() + extensions) - # print "SEND:", self.DecodeSadbMsg(msg) + # print("SEND:", self.DecodeSadbMsg(msg)) return self.Recv() def PackPfKeyExtensions(self, extlist): @@ -314,13 +314,13 @@ class PfKey(object): def PrintSaInfos(self, dump): for msg, extensions in dump: - print self.DecodeSadbMsg(msg) + print(self.DecodeSadbMsg(msg)) for exttype, ext, attrs in extensions: exttype = _GetMultiConstantName(exttype, ["SADB_EXT", "SADB_X_EXT"]) if exttype == SADB_EXT_SA: - print " ", exttype, self.DecodeSadbSa(ext), attrs.encode("hex") - print " ", exttype, ext, attrs.encode("hex") - print + print(" ", exttype, self.DecodeSadbSa(ext), attrs.encode("hex")) + print(" ", exttype, ext, attrs.encode("hex")) + print() if __name__ == "__main__": diff --git a/net/test/pf_key_test.py b/net/test/pf_key_test.py index e58947c..317ec7e 100755 --- a/net/test/pf_key_test.py +++ b/net/test/pf_key_test.py @@ -49,26 +49,26 @@ class PfKeyTest(unittest.TestCase): pf_key.SADB_X_AALG_SHA2_256HMAC, ENCRYPTION_KEY) sainfos = self.xfrm.DumpSaInfo() - self.assertEquals(2, len(sainfos)) + self.assertEqual(2, len(sainfos)) state4, attrs4 = [(s, a) for s, a in sainfos if s.family == AF_INET][0] state6, attrs6 = [(s, a) for s, a in sainfos if s.family == AF_INET6][0] pfkey_sainfos = self.pf_key.DumpSaInfo() - self.assertEquals(2, len(pfkey_sainfos)) + self.assertEqual(2, len(pfkey_sainfos)) self.assertTrue(all(msg.satype == pf_key.SDB_TYPE_ESP) for msg, _ in pfkey_sainfos) - self.assertEquals(xfrm.IPPROTO_ESP, state4.id.proto) - self.assertEquals(xfrm.IPPROTO_ESP, state6.id.proto) - self.assertEquals(54321, state4.reqid) - self.assertEquals(12345, state6.reqid) - self.assertEquals(0xdeadbeef, state4.id.spi) - self.assertEquals(0xbeefdead, state6.id.spi) + self.assertEqual(xfrm.IPPROTO_ESP, state4.id.proto) + self.assertEqual(xfrm.IPPROTO_ESP, state6.id.proto) + self.assertEqual(54321, state4.reqid) + self.assertEqual(12345, state6.reqid) + self.assertEqual(0xdeadbeef, state4.id.spi) + self.assertEqual(0xbeefdead, state6.id.spi) - self.assertEquals(xfrm.PaddedAddress("192.0.2.1"), state4.saddr) - self.assertEquals(xfrm.PaddedAddress("192.0.2.2"), state4.id.daddr) - self.assertEquals(xfrm.PaddedAddress("2001:db8::1"), state6.saddr) - self.assertEquals(xfrm.PaddedAddress("2001:db8::2"), state6.id.daddr) + self.assertEqual(xfrm.PaddedAddress("192.0.2.1"), state4.saddr) + self.assertEqual(xfrm.PaddedAddress("192.0.2.2"), state4.id.daddr) + self.assertEqual(xfrm.PaddedAddress("2001:db8::1"), state6.saddr) + self.assertEqual(xfrm.PaddedAddress("2001:db8::2"), state6.id.daddr) # The algorithm names are null-terminated, but after that contain garbage. # Kernel bug? @@ -79,20 +79,20 @@ class PfKeyTest(unittest.TestCase): self.assertTrue(attrs4["XFRMA_ALG_AUTH"].name.startswith(sha256_name)) self.assertTrue(attrs6["XFRMA_ALG_AUTH"].name.startswith(sha256_name)) - self.assertEquals(256, attrs4["XFRMA_ALG_CRYPT"].key_len) - self.assertEquals(256, attrs4["XFRMA_ALG_CRYPT"].key_len) - self.assertEquals(256, attrs6["XFRMA_ALG_AUTH"].key_len) - self.assertEquals(256, attrs6["XFRMA_ALG_AUTH"].key_len) - self.assertEquals(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len) - self.assertEquals(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len) + self.assertEqual(256, attrs4["XFRMA_ALG_CRYPT"].key_len) + self.assertEqual(256, attrs4["XFRMA_ALG_CRYPT"].key_len) + self.assertEqual(256, attrs6["XFRMA_ALG_AUTH"].key_len) + self.assertEqual(256, attrs6["XFRMA_ALG_AUTH"].key_len) + self.assertEqual(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len) + self.assertEqual(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len) - self.assertEquals(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len) - self.assertEquals(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len) + self.assertEqual(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len) + self.assertEqual(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len) self.pf_key.DelSa(src4, dst4, 0xdeadbeef, pf_key.SADB_TYPE_ESP) - self.assertEquals(1, len(self.xfrm.DumpSaInfo())) + self.assertEqual(1, len(self.xfrm.DumpSaInfo())) self.pf_key.DelSa(src6, dst6, 0xbeefdead, pf_key.SADB_TYPE_ESP) - self.assertEquals(0, len(self.xfrm.DumpSaInfo())) + self.assertEqual(0, len(self.xfrm.DumpSaInfo())) if __name__ == "__main__": diff --git a/net/test/ping6_test.py b/net/test/ping6_test.py index dd73e88..d551b5f 100755 --- a/net/test/ping6_test.py +++ b/net/test/ping6_test.py @@ -185,7 +185,7 @@ class PingReplyThread(threading.Thread): packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet try: posix.write(self._tun.fileno(), str(packet)) - except Exception, e: + except Exception as e: if not self._stopped: raise e @@ -194,12 +194,12 @@ class PingReplyThread(threading.Thread): while not self._stopped: try: packet = posix.read(self._tun.fileno(), 4096) - except OSError, e: + except OSError as e: if e.errno == errno.EAGAIN: continue else: break - except ValueError, e: + except ValueError as e: if not self._stopped: raise e @@ -220,9 +220,9 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # tests fail. _INTERVAL = 0.1 _ATTEMPTS = 20 - for i in xrange(0, _ATTEMPTS): + for i in range(0, _ATTEMPTS): for netid in cls.NETIDS: - if all(thread.IsStarted() for thread in cls.reply_threads.values()): + if all(thread.IsStarted() for thread in list(cls.reply_threads.values())): return time.sleep(_INTERVAL) msg = "WARNING: reply threads not all started after %.1f seconds\n" % ( @@ -231,7 +231,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): @classmethod def StopReplyThreads(cls): - for thread in cls.reply_threads.values(): + for thread in list(cls.reply_threads.values()): thread.Stop() @classmethod @@ -295,9 +295,9 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # Check that the flow label is zero and that the scope ID is sane. self.assertEqual(flowlabel, 0) if addr.startswith("fe80::"): - self.assertTrue(scope_id in self.ifindices.values()) + self.assertTrue(scope_id in list(self.ifindices.values())) else: - self.assertEquals(0, scope_id) + self.assertEqual(0, scope_id) # TODO: check the checksum. We can't do this easily now for ICMPv6 because # we don't have the IP addresses so we can't construct the pseudoheader. @@ -356,32 +356,32 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): def testIPv4PingUsingSendto(self): s = net_test.IPv4PingSocket() written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) - self.assertEquals(len(net_test.IPV4_PING), written) + self.assertEqual(len(net_test.IPV4_PING), written) self.assertValidPingResponse(s, net_test.IPV4_PING) def testIPv6PingUsingSendto(self): s = net_test.IPv6PingSocket() written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) - self.assertEquals(len(net_test.IPV6_PING), written) + self.assertEqual(len(net_test.IPV6_PING), written) self.assertValidPingResponse(s, net_test.IPV6_PING) def testIPv4NoCrash(self): # Python 2.x does not provide either read() or recvmsg. s = net_test.IPv4PingSocket() written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55)) - self.assertEquals(len(net_test.IPV4_PING), written) + self.assertEqual(len(net_test.IPV4_PING), written) fd = s.fileno() reply = posix.read(fd, 4096) - self.assertEquals(written, len(reply)) + self.assertEqual(written, len(reply)) def testIPv6NoCrash(self): # Python 2.x does not provide either read() or recvmsg. s = net_test.IPv6PingSocket() written = s.sendto(net_test.IPV6_PING, ("::1", 55)) - self.assertEquals(len(net_test.IPV6_PING), written) + self.assertEqual(len(net_test.IPV6_PING), written) fd = s.fileno() reply = posix.read(fd, 4096) - self.assertEquals(written, len(reply)) + self.assertEqual(written, len(reply)) def testCrossProtocolCrash(self): # Checks that an ICMP error containing a ping packet that matches the ID @@ -458,12 +458,12 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # Bind to unspecified address. s = net_test.IPv4PingSocket() s.bind(("0.0.0.0", 544)) - self.assertEquals(("0.0.0.0", 544), s.getsockname()) + self.assertEqual(("0.0.0.0", 544), s.getsockname()) # Bind to loopback. s = net_test.IPv4PingSocket() s.bind(("127.0.0.1", 99)) - self.assertEquals(("127.0.0.1", 99), s.getsockname()) + self.assertEqual(("127.0.0.1", 99), s.getsockname()) # Binding twice is not allowed. self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22)) @@ -471,10 +471,10 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # But binding two different sockets to the same ID is allowed. s2 = net_test.IPv4PingSocket() s2.bind(("127.0.0.1", 99)) - self.assertEquals(("127.0.0.1", 99), s2.getsockname()) + self.assertEqual(("127.0.0.1", 99), s2.getsockname()) s3 = net_test.IPv4PingSocket() s3.bind(("127.0.0.1", 99)) - self.assertEquals(("127.0.0.1", 99), s3.getsockname()) + self.assertEqual(("127.0.0.1", 99), s3.getsockname()) # If two sockets bind to the same port, the first one to call read() gets # the response. @@ -501,12 +501,12 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # Bind to unspecified address. s = net_test.IPv6PingSocket() s.bind(("::", 769)) - self.assertEquals(("::", 769, 0, 0), s.getsockname()) + self.assertEqual(("::", 769, 0, 0), s.getsockname()) # Bind to loopback. s = net_test.IPv6PingSocket() s.bind(("::1", 99)) - self.assertEquals(("::1", 99, 0, 0), s.getsockname()) + self.assertEqual(("::1", 99, 0, 0), s.getsockname()) # Binding twice is not allowed. self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22)) @@ -514,10 +514,10 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # But binding two different sockets to the same ID is allowed. s2 = net_test.IPv6PingSocket() s2.bind(("::1", 99)) - self.assertEquals(("::1", 99, 0, 0), s2.getsockname()) + self.assertEqual(("::1", 99, 0, 0), s2.getsockname()) s3 = net_test.IPv6PingSocket() s3.bind(("::1", 99)) - self.assertEquals(("::1", 99, 0, 0), s3.getsockname()) + self.assertEqual(("::1", 99, 0, 0), s3.getsockname()) # Binding both IPv4 and IPv6 to the same socket works. s4 = net_test.IPv4PingSocket() @@ -542,7 +542,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): try: s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1) s.bind((net_test.IPV4_ADDR, 651)) - except IOError, e: + except IOError as e: if e.errno == errno.EACCES: pass # We're not root. let it go for now. @@ -557,7 +557,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): try: s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1) s.bind((net_test.IPV6_ADDR, 651)) - except IOError, e: + except IOError as e: if e.errno == errno.EACCES: pass # We're not root. let it go for now. @@ -567,7 +567,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): sockaddr = csocket.Sockaddr(("0.0.0.0", 12996)) sockaddr.family = AF_UNSPEC csocket.Bind(s4, sockaddr) - self.assertEquals(("0.0.0.0", 12996), s4.getsockname()) + self.assertEqual(("0.0.0.0", 12996), s4.getsockname()) # But not if the address is anything else. sockaddr = csocket.Sockaddr(("127.0.0.1", 58234)) @@ -591,7 +591,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # returns "fe80:1%foo", even though it does not understand it. expected = self.lladdr + "%" + self.ifname s.bind((self.lladdr, 4646, 0, self.ifindex)) - self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname()) + self.assertEqual((expected, 4646, 0, self.ifindex), s.getsockname()) # Of course, for the above to work the address actually has to be configured # on the machine. @@ -601,18 +601,18 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # Scope IDs on non-link-local addresses are silently ignored. s = net_test.IPv6PingSocket() s.bind(("::1", 1234, 0, 1)) - self.assertEquals(("::1", 1234, 0, 0), s.getsockname()) + self.assertEqual(("::1", 1234, 0, 0), s.getsockname()) def testBindAffectsIdentifier(self): s = net_test.IPv6PingSocket() s.bind((self.globaladdr, 0xf976)) s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) - self.assertEquals("\xf9\x76", s.recv(32768)[4:6]) + self.assertEqual("\xf9\x76", s.recv(32768)[4:6]) s = net_test.IPv6PingSocket() s.bind((self.globaladdr, 0xace)) s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) - self.assertEquals("\x0a\xce", s.recv(32768)[4:6]) + self.assertEqual("\x0a\xce", s.recv(32768)[4:6]) def testLinkLocalAddress(self): s = net_test.IPv6PingSocket() @@ -751,8 +751,8 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) s.bind(("127.0.0.1", 0xace)) s.connect(("127.0.0.1", 0xbeef)) - self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp"))) - self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) + self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp"))) + self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6"))) @unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") def testIcmp6SocketsNotInIcmp(self): @@ -761,8 +761,8 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): s = net_test.IPv6PingSocket() s.bind(("::1", 0xace)) s.connect(("::1", 0xbeef)) - self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp"))) - self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) + self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp"))) + self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) def testProcNetIcmp(self): s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) @@ -780,7 +780,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # Check the row goes away when the socket is closed. s.close() - self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) + self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6"))) # Try send, bind and connect to check the addresses and the state. s = net_test.IPv6PingSocket() @@ -841,7 +841,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): ident = struct.pack("!H", s.getsockname()[1]) pkt = pkt[:4] + ident + pkt[6:] data = data[:2] + "\x00\x00" + pkt[4:] - self.assertEquals(pkt, data) + self.assertEqual(pkt, data) # Check the address that the packet was sent to. # ... except in 4.1, where it just returns an AF_UNSPEC, like this: @@ -851,7 +851,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): # msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...}, # msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232 if net_test.LINUX_VERSION != (4, 1, 0): - self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr) + self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr) # Check the cmsg data, including the link MTU. mtu = PingReplyThread.LINK_MTU @@ -869,7 +869,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): if net_test.LINUX_VERSION <= (3, 14, 0): msglist[0][2][1].port = cmsg[0][2][1].port - self.assertEquals(msglist, cmsg) + self.assertEqual(msglist, cmsg) if __name__ == "__main__": diff --git a/net/test/resilient_rs_test.py b/net/test/resilient_rs_test.py index 12843c4..27b9f49 100755 --- a/net/test/resilient_rs_test.py +++ b/net/test/resilient_rs_test.py @@ -159,8 +159,8 @@ class ResilientRouterSolicitationTest(multinetwork_base.MultiNetworkBaseTest): # Compute minimum and maximum bounds for RFC3315 S14 exponential backoff. # First retransmit is linear backoff, subsequent retransmits are exponential - min_exp_bound = accumulate(map(lambda i: MIN_LIN * pow(MIN_EXP, i), range(0, len(rsSendTimes)))) - max_exp_bound = accumulate(map(lambda i: MAX_LIN * pow(MAX_EXP, i), range(0, len(rsSendTimes)))) + min_exp_bound = accumulate([MIN_LIN * pow(MIN_EXP, i) for i in range(0, len(rsSendTimes))]) + max_exp_bound = accumulate([MAX_LIN * pow(MAX_EXP, i) for i in range(0, len(rsSendTimes))]) # Assert that each sample falls within the worst case interval. If all samples fit we accept # the exponential backoff hypothesis diff --git a/net/test/sock_diag.py b/net/test/sock_diag.py index 46cc92d..03d5587 100755 --- a/net/test/sock_diag.py +++ b/net/test/sock_diag.py @@ -164,7 +164,7 @@ class SockDiag(netlink.NetlinkSocket): if "ALL" not in self.NL_DEBUG and "SOCK" not in self.NL_DEBUG: return parsed = self._ParseNLMsg(data, InetDiagReqV2) - print "%s %s" % (name, str(parsed)) + print("%s %s" % (name, str(parsed))) @staticmethod def _EmptyInetDiagSockId(): @@ -246,15 +246,15 @@ class SockDiag(netlink.NetlinkSocket): positions.append(positions[-1] + 4) # Why 4? Because the kernel uses 4. assert len(args) == len(instructions) == len(positions) - 2 - # print positions + # print(positions) packed = "" for i, (op, yes, no, arg) in enumerate(instructions): yes = positions[i + yes] - positions[i] no = positions[i + no] - positions[i] instruction = InetDiagBcOp((op, yes, no)).Pack() + args[i] - #print "%3d: %d %3d %3d %s %s" % (positions[i], op, yes, no, - # arg, instruction.encode("hex")) + #print("%3d: %d %3d %3d %s %s" % (positions[i], op, yes, no, + # arg, instruction.encode("hex"))) packed += instruction #print @@ -363,7 +363,7 @@ class SockDiag(netlink.NetlinkSocket): src, sport = s.getsockname()[:2] try: dst, dport = s.getpeername()[:2] - except error, e: + except error as e: if e.errno == errno.ENOTCONN: dport = 0 dst = "::" if family == AF_INET6 else "0.0.0.0" @@ -430,4 +430,4 @@ if __name__ == "__main__": states = 0xffffffff diag_msgs = n.DumpAllInetSockets(IPPROTO_TCP, "", sock_id=sock_id, ext=ext, states=states) - print diag_msgs + print(diag_msgs) diff --git a/net/test/sock_diag_test.py b/net/test/sock_diag_test.py index daa2fa4..39ace4c 100755 --- a/net/test/sock_diag_test.py +++ b/net/test/sock_diag_test.py @@ -105,7 +105,7 @@ class SockDiagBaseTest(multinetwork_base.MultiNetworkBaseTest): def _CreateLotsOfSockets(socktype): # Dict mapping (addr, sport, dport) tuples to socketpairs. socketpairs = {} - for _ in xrange(NUM_SOCKETS): + for _ in range(NUM_SOCKETS): family, addr = random.choice([ (AF_INET, "127.0.0.1"), (AF_INET6, "::1"), @@ -151,7 +151,7 @@ class SockDiagBaseTest(multinetwork_base.MultiNetworkBaseTest): def PackAndCheckBytecode(self, instructions): bytecode = self.sock_diag.PackBytecode(instructions) decoded = self.sock_diag.DecodeBytecode(bytecode) - self.assertEquals(len(instructions), len(decoded)) + self.assertEqual(len(instructions), len(decoded)) self.assertFalse("???" in decoded) return bytecode @@ -192,7 +192,7 @@ class SockDiagBaseTest(multinetwork_base.MultiNetworkBaseTest): self.socketpairs = {} def tearDown(self): - for socketpair in self.socketpairs.values(): + for socketpair in list(self.socketpairs.values()): for s in socketpair: s.close() super(SockDiagBaseTest, self).tearDown() @@ -228,9 +228,9 @@ class SockDiagTest(SockDiagBaseTest): cookies[(addr, sport, dport)] = diag_msg.id.cookie # Did we find all the cookies? - self.assertEquals(2 * NUM_SOCKETS, len(cookies)) + self.assertEqual(2 * NUM_SOCKETS, len(cookies)) - socketpairs = self.socketpairs.values() + socketpairs = list(self.socketpairs.values()) random.shuffle(socketpairs) for socketpair in socketpairs: for sock in socketpair: @@ -284,7 +284,7 @@ class SockDiagTest(SockDiagBaseTest): ) states = 1 << tcp_test.TCP_ESTABLISHED self.assertMultiLineEqual(expected, bytecode.encode("hex")) - self.assertEquals(76, len(bytecode)) + self.assertEqual(76, len(bytecode)) self.socketpairs = self._CreateLotsOfSockets(SOCK_STREAM) filteredsockets = self.sock_diag.DumpAllInetSockets(IPPROTO_TCP, bytecode, states=states) @@ -294,7 +294,7 @@ class SockDiagTest(SockDiagBaseTest): # Pick a few sockets in hash table order, and check that the bytecode we # compiled selects them properly. - for socketpair in self.socketpairs.values()[:20]: + for socketpair in list(self.socketpairs.values())[:20]: for s in socketpair: diag_msg = self.sock_diag.FindSockDiagFromFd(s) instructions = [ @@ -304,12 +304,12 @@ class SockDiagTest(SockDiagBaseTest): (sock_diag.INET_DIAG_BC_D_LE, 1, 2, diag_msg.id.dport), ] bytecode = self.PackAndCheckBytecode(instructions) - self.assertEquals(32, len(bytecode)) + self.assertEqual(32, len(bytecode)) sockets = self.sock_diag.DumpAllInetSockets(IPPROTO_TCP, bytecode) - self.assertEquals(1, len(sockets)) + self.assertEqual(1, len(sockets)) # TODO: why doesn't comparing the cstructs work? - self.assertEquals(diag_msg.Pack(), sockets[0][0].Pack()) + self.assertEqual(diag_msg.Pack(), sockets[0][0].Pack()) def testCrossFamilyBytecode(self): """Checks for a cross-family bug in inet_diag_hostcond matching. @@ -365,7 +365,7 @@ class SockDiagTest(SockDiagBaseTest): 5e1f542 inet_diag: validate port comparison byte code to prevent unsafe reads """ bytecode = sock_diag.InetDiagBcOp((sock_diag.INET_DIAG_BC_D_GE, 4, 8)) - self.assertEquals("???", + self.assertEqual("???", self.sock_diag.DecodeBytecode(bytecode)) self.assertRaisesErrno( EINVAL, @@ -481,7 +481,7 @@ class SockDestroyTest(SockDiagBaseTest): def testClosesSockets(self): self.socketpairs = self._CreateLotsOfSockets(SOCK_STREAM) - for _, socketpair in self.socketpairs.iteritems(): + for _, socketpair in self.socketpairs.items(): # Close one of the sockets. # This will send a RST that will close the other side as well. s = random.choice(socketpair) @@ -521,7 +521,7 @@ class SocketExceptionThread(threading.Thread): def run(self): try: self.operation(self.sock) - except (IOError, AssertionError), e: + except (IOError, AssertionError) as e: self.exception = e @@ -534,7 +534,7 @@ class SockDiagTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): android-3.4: 457a04b inet_diag: fix oops for IPv4 AF_INET6 TCP SYN-RECV state """ - netid = random.choice(self.tuns.keys()) + netid = random.choice(list(self.tuns.keys())) self.IncomingConnection(5, tcp_test.TCP_SYN_RECV, netid) sock_id = self.sock_diag._EmptyInetDiagSockId() sock_id.sport = self.port @@ -599,7 +599,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): def setUp(self): super(SockDestroyTcpTest, self).setUp() - self.netid = random.choice(self.tuns.keys()) + self.netid = random.choice(list(self.tuns.keys())) def CheckRstOnClose(self, sock, req, expect_reset, msg, do_close=True): """Closes the socket and checks whether a RST is sent or not.""" @@ -650,7 +650,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): self.accepted.close() diag_req.states = 1 << tcp_test.TCP_FIN_WAIT1 diag_msg, attrs = self.sock_diag.GetSockInfo(diag_req) - self.assertEquals(tcp_test.TCP_FIN_WAIT1, diag_msg.state) + self.assertEqual(tcp_test.TCP_FIN_WAIT1, diag_msg.state) desc, fin = self.FinPacket() self.ExpectPacketOn(self.netid, "Closing FIN_WAIT1 socket", fin) @@ -660,7 +660,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): # The socket is still there in FIN_WAIT1: SOCK_DESTROY did nothing # because userspace had already closed it. - self.assertEquals(tcp_test.TCP_FIN_WAIT1, diag_msg.state) + self.assertEqual(tcp_test.TCP_FIN_WAIT1, diag_msg.state) # ACK the FIN so we don't trip over retransmits in future tests. finversion = 4 if version == 5 else version @@ -702,7 +702,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): d = self.sock_diag.FindSockDiagFromFd(self.s) parent = self.sock_diag.DiagReqFromDiagMsg(d, IPPROTO_TCP) children = self.FindChildSockets(self.s) - self.assertEquals(1, len(children)) + self.assertEqual(1, len(children)) is_established = (state == tcp_test.TCP_NOT_YET_ACCEPTED) expected_state = tcp_test.TCP_ESTABLISHED if is_established else state @@ -716,7 +716,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): for child in children: if can_close_children: diag_msg, attrs = self.sock_diag.GetSockInfo(child) - self.assertEquals(diag_msg.state, expected_state) + self.assertEqual(diag_msg.state, expected_state) self.assertMarkIs(self.netid, attrs) else: self.assertRaisesErrno(ENOENT, self.sock_diag.GetSockInfo, child) @@ -772,7 +772,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): self.assertRaisesErrno(ECONNABORTED, self.s.send, "foo") self.assertRaisesErrno(EINVAL, self.s.accept) # TODO: this should really return an error such as ENOTCONN... - self.assertEquals("", self.s.recv(4096)) + self.assertEqual("", self.s.recv(4096)) def testReadInterrupted(self): """Tests that read() is interrupted by SOCK_DESTROY.""" @@ -782,8 +782,8 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest): ECONNABORTED) # Writing returns EPIPE, and reading returns EOF. self.assertRaisesErrno(EPIPE, self.accepted.send, "foo") - self.assertEquals("", self.accepted.recv(4096)) - self.assertEquals("", self.accepted.recv(4096)) + self.assertEqual("", self.accepted.recv(4096)) + self.assertEqual("", self.accepted.recv(4096)) def testConnectInterrupted(self): """Tests that connect() is interrupted by SOCK_DESTROY.""" @@ -818,7 +818,7 @@ class PollOnCloseTest(tcp_test.TcpBaseTest, SockDiagBaseTest): def setUp(self): super(PollOnCloseTest, self).setUp() - self.netid = random.choice(self.tuns.keys()) + self.netid = random.choice(list(self.tuns.keys())) POLL_FLAGS = [(select.POLLIN, "IN"), (select.POLLOUT, "OUT"), (select.POLLERR, "ERR"), (select.POLLHUP, "HUP")] @@ -852,8 +852,8 @@ class PollOnCloseTest(tcp_test.TcpBaseTest, SockDiagBaseTest): # Subsequent operations behave as normal. self.assertRaisesErrno(EPIPE, self.accepted.send, "foo") - self.assertEquals("", self.accepted.recv(4096)) - self.assertEquals("", self.accepted.recv(4096)) + self.assertEqual("", self.accepted.recv(4096)) + self.assertEqual("", self.accepted.recv(4096)) def CheckPollDestroy(self, mask, expected, ignoremask): """Interrupts a poll() with SOCK_DESTROY.""" @@ -917,7 +917,7 @@ class SockDestroyUdpTest(SockDiagBaseTest): def testClosesUdpSockets(self): self.socketpairs = self._CreateLotsOfSockets(SOCK_DGRAM) - for _, socketpair in self.socketpairs.iteritems(): + for _, socketpair in self.socketpairs.items(): s1, s2 = socketpair self.assertSocketConnected(s1) @@ -930,12 +930,12 @@ class SockDestroyUdpTest(SockDiagBaseTest): def BindToRandomPort(self, s, addr): ATTEMPTS = 20 - for i in xrange(20): + for i in range(20): port = random.randrange(1024, 65535) try: s.bind((addr, port)) return port - except error, e: + except error as e: if e.errno != EADDRINUSE: raise e raise ValueError("Could not find a free port on %s after %d attempts" % @@ -989,13 +989,13 @@ class SockDestroyUdpTest(SockDiagBaseTest): # Check that reads on connected sockets are interrupted. s.connect((addr, 53)) - self.assertEquals(3, s.send("foo")) + self.assertEqual(3, s.send("foo")) self.CloseDuringBlockingCall(s, lambda sock: sock.recv(4096), ECONNABORTED) # A destroyed socket is no longer connected, but still usable. self.assertRaisesErrno(EDESTADDRREQ, s.send, "foo") - self.assertEquals(3, s.sendto("foo", (addr, 53))) + self.assertEqual(3, s.sendto("foo", (addr, 53))) # Check that reads on unconnected sockets are also interrupted. self.CloseDuringBlockingCall(s, lambda sock: sock.recv(4096), @@ -1049,7 +1049,7 @@ class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest): def assertSamePorts(self, ports, diag_msgs): expected = sorted(ports) actual = sorted([msg[0].id.sport for msg in diag_msgs]) - self.assertEquals(expected, actual) + self.assertEqual(expected, actual) def SockInfoMatchesSocket(self, s, info): try: @@ -1076,7 +1076,7 @@ class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest): self.SocketDescription(s)) for i in infos: - if i not in matches.values(): + if i not in list(matches.values()): self.fail("Too many sockets in dump, first unexpected: %s" % str(i)) def testMarkBytecode(self): @@ -1101,7 +1101,7 @@ class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest): self.assertFoundSockets(infos, [s1, s2]) infos = self.FilterEstablishedSockets(0xfff0000, 0xf0fed00) - self.assertEquals(0, len(infos)) + self.assertEqual(0, len(infos)) with net_test.RunAsUid(12345): self.assertRaisesErrno(EPERM, self.FilterEstablishedSockets, diff --git a/net/test/srcaddr_selection_test.py b/net/test/srcaddr_selection_test.py index e57ce16..1e7a107 100755 --- a/net/test/srcaddr_selection_test.py +++ b/net/test/srcaddr_selection_test.py @@ -91,11 +91,11 @@ class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): def assertAddressHasExpectedAttributes( self, address, expected_ifindex, expected_flags): ifa_msg = self.iproute.GetAddress(address)[0] - self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) - self.assertEquals(64, ifa_msg.prefixlen) - self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) - self.assertEquals(expected_ifindex, ifa_msg.index) - self.assertEquals(expected_flags, ifa_msg.flags & expected_flags) + self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) + self.assertEqual(64, ifa_msg.prefixlen) + self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) + self.assertEqual(expected_ifindex, ifa_msg.index) + self.assertEqual(expected_flags, ifa_msg.flags & expected_flags) def AddressIsTentative(self, address): ifa_msg = self.iproute.GetAddress(address)[0] @@ -122,13 +122,13 @@ class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): self.SendWithSourceAddress, address, netid) def assertAddressSelected(self, address, netid): - self.assertEquals(address, self.GetSourceIP(netid)) + self.assertEqual(address, self.GetSourceIP(netid)) def assertAddressNotSelected(self, address, netid): - self.assertNotEquals(address, self.GetSourceIP(netid)) + self.assertNotEqual(address, self.GetSourceIP(netid)) def WaitForDad(self, address): - for _ in xrange(20): + for _ in range(20): if not self.AddressIsTentative(address): return time.sleep(0.1) @@ -149,7 +149,7 @@ class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest): self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0) # [1] Pick an interface on which to test. - self.test_netid = random.choice(self.tuns.keys()) + self.test_netid = random.choice(list(self.tuns.keys())) self.test_ip = self.MyAddress(6, self.test_netid) self.test_ifindex = self.ifindices[self.test_netid] self.test_ifname = self.GetInterfaceName(self.test_netid) @@ -254,7 +254,7 @@ class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest): self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex) self.assertAddressHasExpectedAttributes( preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT) - self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid)) + self.assertEqual(preferred_ip, self.GetSourceIP(self.test_netid)) # [4] Get another IPv6 address, in optimistic DAD start-up. self.SetDAD(self.test_ifname, 1) # Enable DAD diff --git a/net/test/tcp_fastopen_test.py b/net/test/tcp_fastopen_test.py index 9257a19..eadae79 100755 --- a/net/test/tcp_fastopen_test.py +++ b/net/test/tcp_fastopen_test.py @@ -55,7 +55,7 @@ class TcpFastOpenTest(multinetwork_base.MultiNetworkBaseTest): daddr = self.GetRemoteAddress(version) self.tcp_metrics.DelMetrics(saddr, daddr) with self.assertRaisesErrno(ESRCH): - print self.tcp_metrics.GetMetrics(saddr, daddr) + print(self.tcp_metrics.GetMetrics(saddr, daddr)) def assertNoTcpMetrics(self, version, netid): saddr = self.MyAddress(version, netid) diff --git a/net/test/tcp_metrics.py b/net/test/tcp_metrics.py index 574a755..03f604f 100755 --- a/net/test/tcp_metrics.py +++ b/net/test/tcp_metrics.py @@ -134,4 +134,4 @@ class TcpMetrics(genetlink.GenericNetlink): if __name__ == "__main__": t = TcpMetrics() - print t.DumpMetrics() + print(t.DumpMetrics()) diff --git a/net/test/tcp_nuke_addr_test.py b/net/test/tcp_nuke_addr_test.py index 1f0de76..e5d17b2 100755 --- a/net/test/tcp_nuke_addr_test.py +++ b/net/test/tcp_nuke_addr_test.py @@ -88,8 +88,8 @@ class TcpNukeAddrTest(net_test.NetworkTest): self.assertRaisesErrno(errno.ENOTTY, KillAddrIoctl, addr) data = "foo" try: - self.assertEquals(len(data), s1.send(data)) - self.assertEquals(data, s2.recv(4096)) + self.assertEqual(len(data), s1.send(data)) + self.assertEqual(data, s2.recv(4096)) self.assertSocketsNotClosed(socketpair) finally: s1.close() diff --git a/net/test/tcp_repair_test.py b/net/test/tcp_repair_test.py index ce54aba..e0b156e 100755 --- a/net/test/tcp_repair_test.py +++ b/net/test/tcp_repair_test.py @@ -149,7 +149,7 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest): sock.setsockopt(SOL_TCP, TCP_REPAIR, TCP_REPAIR_OFF) readData = sock.recv(4096) - self.assertEquals(readData, TEST_RECEIVED) + self.assertEqual(readData, TEST_RECEIVED) sock.close() # Test whether tcp read/write sequence number can be fetched correctly @@ -164,20 +164,20 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest): # test write queue sequence number sequence_before = self.GetWriteSequenceNumber(version, sock) expect_sequence = self.last_sent.getlayer("TCP").seq - self.assertEquals(sequence_before & 0xffffffff, expect_sequence) + self.assertEqual(sequence_before & 0xffffffff, expect_sequence) TEST_SEND = net_test.UDP_PAYLOAD self.sendData(netid, version, sock, TEST_SEND) sequence_after = self.GetWriteSequenceNumber(version, sock) - self.assertEquals(sequence_before + len(TEST_SEND), sequence_after) + self.assertEqual(sequence_before + len(TEST_SEND), sequence_after) # test read queue sequence number sequence_before = self.GetReadSequenceNumber(version, sock) expect_sequence = self.last_received.getlayer("TCP").seq + 1 - self.assertEquals(sequence_before & 0xffffffff, expect_sequence) + self.assertEqual(sequence_before & 0xffffffff, expect_sequence) TEST_READ = net_test.UDP_PAYLOAD self.receiveData(netid, version, TEST_READ) sequence_after = self.GetReadSequenceNumber(version, sock) - self.assertEquals(sequence_before + len(TEST_READ), sequence_after) + self.assertEqual(sequence_before + len(TEST_READ), sequence_after) sock.close() def GetWriteSequenceNumber(self, version, sock): @@ -267,12 +267,12 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest): buf = ctypes.c_int() fcntl.ioctl(sock, SIOCINQ, buf) - self.assertEquals(buf.value, 0) + self.assertEqual(buf.value, 0) TEST_RECV_PAYLOAD = net_test.UDP_PAYLOAD self.receiveData(netid, version, TEST_RECV_PAYLOAD) fcntl.ioctl(sock, SIOCINQ, buf) - self.assertEquals(buf.value, len(TEST_RECV_PAYLOAD)) + self.assertEqual(buf.value, len(TEST_RECV_PAYLOAD)) sock.close() def writeQueueIdleTest(self, version): @@ -281,14 +281,14 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest): sock = self.createConnectedSocket(version, netid) buf = ctypes.c_int() fcntl.ioctl(sock, SIOCOUTQ, buf) - self.assertEquals(buf.value, 0) + self.assertEqual(buf.value, 0) # Change to repair mode with SEND_QUEUE, writing some data to the queue. sock.setsockopt(SOL_TCP, TCP_REPAIR, TCP_REPAIR_ON) TEST_SEND_PAYLOAD = net_test.UDP_PAYLOAD sock.setsockopt(SOL_TCP, TCP_REPAIR_QUEUE, TCP_SEND_QUEUE) self.sendData(netid, version, sock, TEST_SEND_PAYLOAD) fcntl.ioctl(sock, SIOCOUTQ, buf) - self.assertEquals(buf.value, len(TEST_SEND_PAYLOAD)) + self.assertEqual(buf.value, len(TEST_SEND_PAYLOAD)) sock.close() # Setup a connected socket again. @@ -297,14 +297,14 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest): # Send out some data and don't receive ACK yet. self.sendData(netid, version, sock, TEST_SEND_PAYLOAD) fcntl.ioctl(sock, SIOCOUTQ, buf) - self.assertEquals(buf.value, len(TEST_SEND_PAYLOAD)) + self.assertEqual(buf.value, len(TEST_SEND_PAYLOAD)) # Receive response ACK. remoteaddr = self.GetRemoteAddress(version) myaddr = self.MyAddress(version, netid) desc_ack, ack = packets.ACK(version, remoteaddr, myaddr, self.last_sent) self.ReceivePacketOn(netid, ack) fcntl.ioctl(sock, SIOCOUTQ, buf) - self.assertEquals(buf.value, 0) + self.assertEqual(buf.value, 0) sock.close() @@ -315,7 +315,7 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest): events = p.poll(500) for fd,event in events: if fd == sock.fileno(): - self.assertEquals(event, expected) + self.assertEqual(event, expected) else: raise AssertionError("unexpected poll fd") @@ -334,7 +334,7 @@ class SocketExceptionThread(threading.Thread): def run(self): try: self.operation(self.sock) - except (IOError, AssertionError), e: + except (IOError, AssertionError) as e: self.exception = e if __name__ == '__main__': diff --git a/net/test/tun_twister.py b/net/test/tun_twister.py index 2ed25c9..f42d789 100644 --- a/net/test/tun_twister.py +++ b/net/test/tun_twister.py @@ -52,8 +52,8 @@ class TunTwister(object): # Set up routing so packets go to my_tun. def ValidatePortNumber(packet): - self.assertEquals(8080, packet.getlayer(scapy.UDP).sport) - self.assertEquals(8080, packet.getlayer(scapy.UDP).dport) + self.assertEqual(8080, packet.getlayer(scapy.UDP).sport) + self.assertEqual(8080, packet.getlayer(scapy.UDP).dport) with TunTwister(tun_fd=my_tun, validator=ValidatePortNumber): sock = socket(AF_INET, SOCK_DGRAM, 0) @@ -61,8 +61,8 @@ class TunTwister(object): sock.settimeout(1.0) sock.sendto("hello", ("1.2.3.4", 8080)) data, addr = sock.recvfrom(1024) - self.assertEquals("hello", data) - self.assertEquals(("1.2.3.4", 8080), addr) + self.assertEqual("hello", data) + self.assertEqual(("1.2.3.4", 8080), addr) """ # Hopefully larger than any packet. diff --git a/net/test/xfrm.py b/net/test/xfrm.py index acdfd4f..d8f5ffe 100755 --- a/net/test/xfrm.py +++ b/net/test/xfrm.py @@ -356,9 +356,9 @@ class Xfrm(netlink.NetlinkSocket): cmdname = self._GetConstantName(command, "XFRM_MSG_") if struct_type: - print "%s %s" % (cmdname, str(self._ParseNLMsg(data, struct_type))) + print("%s %s" % (cmdname, str(self._ParseNLMsg(data, struct_type)))) else: - print "%s" % cmdname + print("%s" % cmdname) def _Decode(self, command, unused_msg, nla_type, nla_data): """Decodes netlink attributes to Python types.""" @@ -710,5 +710,5 @@ class Xfrm(netlink.NetlinkSocket): if __name__ == "__main__": x = Xfrm() - print x.DumpSaInfo() - print x.DumpPolicyInfo() + print(x.DumpSaInfo()) + print(x.DumpPolicyInfo()) diff --git a/net/test/xfrm_algorithm_test.py b/net/test/xfrm_algorithm_test.py index 0176265..c2f836c 100755 --- a/net/test/xfrm_algorithm_test.py +++ b/net/test/xfrm_algorithm_test.py @@ -117,10 +117,10 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): def AssertEncrypted(packet): # This gives a free pass to ICMP and ICMPv6 packets, which show up # nondeterministically in tests. - self.assertEquals(None, + self.assertEqual(None, packet.getlayer(scapy.UDP), "UDP packet sent in the clear") - self.assertEquals(None, + self.assertEqual(None, packet.getlayer(scapy.TCP), "TCP packet sent in the clear") @@ -237,10 +237,10 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): sock.listen(1) server_ready.set() accepted, peer = sock.accept() - self.assertEquals(remote_addr, peer[0]) - self.assertEquals(client_port, peer[1]) + self.assertEqual(remote_addr, peer[0]) + self.assertEqual(client_port, peer[1]) data = accepted.recv(2048) - self.assertEquals("hello request", data) + self.assertEqual("hello request", data) accepted.send("hello response") except Exception as e: server_error = e @@ -251,9 +251,9 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): try: server_ready.set() data, peer = sock.recvfrom(2048) - self.assertEquals(remote_addr, peer[0]) - self.assertEquals(client_port, peer[1]) - self.assertEquals("hello request", data) + self.assertEqual(remote_addr, peer[0]) + self.assertEqual(client_port, peer[1]) + self.assertEqual("hello request", data) sock.sendto("hello response", peer) except Exception as e: server_error = e @@ -283,7 +283,7 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): sock_left.connect((remote_addr, right_port)) sock_left.send("hello request") data = sock_left.recv(2048) - self.assertEquals("hello response", data) + self.assertEqual("hello response", data) sock_left.close() server.join() if server_error: diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py index 1eaa302..03e15c4 100644 --- a/net/test/xfrm_base.py +++ b/net/test/xfrm_base.py @@ -196,7 +196,7 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs): esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header. padlen = util.GetPadLength(4, esplen) # The pad bytes are consecutive integers starting from 0x01. - padding = "".join((chr(i) for i in xrange(1, padlen + 1))) + padding = "".join((chr(i) for i in range(1, padlen + 1))) trailer = padding + struct.pack("BB", padlen, esp_nexthdr) # Assemble the packet. @@ -285,17 +285,17 @@ class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest): scapy.IP/IPv6: the read packet """ packets = self.ReadAllPacketsOn(netid) - self.assertEquals(1, len(packets)) + self.assertEqual(1, len(packets)) packet = packets[0] if length is not None: - self.assertEquals(length, len(packet.payload)) + self.assertEqual(length, len(packet.payload)) if dst_addr is not None: - self.assertEquals(dst_addr, packet.dst) + self.assertEqual(dst_addr, packet.dst) if src_addr is not None: - self.assertEquals(src_addr, packet.src) + self.assertEqual(src_addr, packet.src) # extract the ESP header esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr) - self.assertEquals(xfrm.EspHdr((spi, seq)), esp_hdr) + self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr) return packet diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py index 64be084..439a2d2 100755 --- a/net/test/xfrm_test.py +++ b/net/test/xfrm_test.py @@ -53,14 +53,14 @@ TEST_SPI2 = 0x1235 class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): def assertIsUdpEncapEsp(self, packet, spi, seq, length): - self.assertEquals(IPPROTO_UDP, packet.proto) + self.assertEqual(IPPROTO_UDP, packet.proto) udp_hdr = packet[scapy.UDP] - self.assertEquals(4500, udp_hdr.dport) - self.assertEquals(length, len(udp_hdr)) + self.assertEqual(4500, udp_hdr.dport) + self.assertEqual(length, len(udp_hdr)) esp_hdr, _ = cstruct.Read(str(udp_hdr.payload), xfrm.EspHdr) # FIXME: this file currently swaps SPI byte order manually, so SPI needs to # be double-swapped here. - self.assertEquals(xfrm.EspHdr((spi, seq)), esp_hdr) + self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr) def CreateNewSa(self, localAddr, remoteAddr, spi, reqId, encap_tmpl, null_auth=False): @@ -93,12 +93,12 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): self.xfrm.DeleteSaInfo(TEST_ADDR1, TEST_SPI, IPPROTO_ESP) def testFlush(self): - self.assertEquals(0, len(self.xfrm.DumpSaInfo())) + self.assertEqual(0, len(self.xfrm.DumpSaInfo())) self.CreateNewSa("::", "2000::", TEST_SPI, 1234, None) self.CreateNewSa("0.0.0.0", "192.0.2.1", TEST_SPI, 4321, None) - self.assertEquals(2, len(self.xfrm.DumpSaInfo())) + self.assertEqual(2, len(self.xfrm.DumpSaInfo())) self.xfrm.FlushSaInfo() - self.assertEquals(0, len(self.xfrm.DumpSaInfo())) + self.assertEqual(0, len(self.xfrm.DumpSaInfo())) def _TestSocketPolicy(self, version): # Open a UDP socket and connect it. @@ -141,7 +141,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): try: self.xfrm.DeleteSaInfo(self.GetRemoteAddress(xfrm_version), TEST_SPI, IPPROTO_ESP) except IOError as e: - self.assertEquals(ESRCH, e.errno, "Unexpected error when deleting ACQ SA") + self.assertEqual(ESRCH, e.errno, "Unexpected error when deleting ACQ SA") # Adding a matching SA causes the packet to go out encrypted. The SA's # SPI must match the one in our template, and the destination address must @@ -171,11 +171,11 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): self.SelectInterface(s2, netid, "mark") s2.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53)) pkts = self.ReadAllPacketsOn(netid) - self.assertEquals(1, len(pkts)) + self.assertEqual(1, len(pkts)) packet = pkts[0] protocol = packet.nh if version == 6 else packet.proto - self.assertEquals(IPPROTO_UDP, protocol) + self.assertEqual(IPPROTO_UDP, protocol) # Deleting the SA causes the first socket to return errors again. self.xfrm.DeleteSaInfo(self.GetRemoteAddress(xfrm_version), TEST_SPI, @@ -268,7 +268,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): # Expect to see an UDP encapsulated packet. pkts = self.ReadAllPacketsOn(netid) - self.assertEquals(1, len(pkts)) + self.assertEqual(1, len(pkts)) packet = pkts[0] auth_algo = ( @@ -314,13 +314,13 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): # integrity-verified portion of the packet. if not null_auth and in_spi != out_spi: self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096) - self.assertEquals(start_integrity_failures + 1, + self.assertEqual(start_integrity_failures + 1, sainfo.stats.integrity_failed) else: data, src = twisted_socket.recvfrom(4096) - self.assertEquals(net_test.UDP_PAYLOAD, data) - self.assertEquals((remoteaddr, srcport), src) - self.assertEquals(start_integrity_failures, sainfo.stats.integrity_failed) + self.assertEqual(net_test.UDP_PAYLOAD, data) + self.assertEqual((remoteaddr, srcport), src) + self.assertEqual(start_integrity_failures, sainfo.stats.integrity_failed) # Check that unencrypted packets on twisted_socket are not received. unencrypted = ( @@ -400,13 +400,13 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): def testAllocSpecificSpi(self): spi = 0xABCD new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) - self.assertEquals(spi, new_sa.id.spi) + self.assertEqual(spi, new_sa.id.spi) def testAllocSpecificSpiUnavailable(self): """Attempt to allocate the same SPI twice.""" spi = 0xABCD new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) - self.assertEquals(spi, new_sa.id.spi) + self.assertEqual(spi, new_sa.id.spi) with self.assertRaisesErrno(ENOENT): new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi) @@ -427,7 +427,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): # Allocating range_size + 1 SPIs is guaranteed to fail. Due to the way # kernel picks random SPIs, this has a high probability of failing before # reaching that limit. - for i in xrange(range_size + 1): + for i in range(range_size + 1): new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, start, end) spi = new_sa.id.spi self.assertNotIn(spi, spis) @@ -514,21 +514,21 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): self.ReceivePacketOn(netid, input_pkt) msg, addr = sock.recvfrom(1024) - self.assertEquals("input hello", msg) - self.assertEquals((remote_addr, remote_port), addr[:2]) + self.assertEqual("input hello", msg) + self.assertEqual((remote_addr, remote_port), addr[:2]) # Send and capture a packet. sock.sendto("output hello", (remote_addr, remote_port)) packets = self.ReadAllPacketsOn(netid) - self.assertEquals(1, len(packets)) + self.assertEqual(1, len(packets)) output_pkt = packets[0] output_pkt, esp_hdr = xfrm_base.DecryptPacketWithNull(output_pkt) - self.assertEquals(output_pkt[scapy.UDP].len, len("output_hello") + 8) - self.assertEquals(remote_addr, output_pkt.dst) - self.assertEquals(remote_port, output_pkt[scapy.UDP].dport) + self.assertEqual(output_pkt[scapy.UDP].len, len("output_hello") + 8) + self.assertEqual(remote_addr, output_pkt.dst) + self.assertEqual(remote_port, output_pkt[scapy.UDP].dport) # length of the payload plus the UDP header - self.assertEquals("output hello", str(output_pkt[scapy.UDP].payload)) - self.assertEquals(0xABCD, esp_hdr.spi) + self.assertEqual("output hello", str(output_pkt[scapy.UDP].payload)) + self.assertEqual(0xABCD, esp_hdr.spi) def testNullEncryptionTunnelMode(self): """Verify null encryption in tunnel mode. @@ -577,21 +577,21 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): self.ReceivePacketOn(netid, input_pkt) msg, addr = sock.recvfrom(1024) - self.assertEquals("input hello", msg) - self.assertEquals((remote_addr, remote_port), addr[:2]) + self.assertEqual("input hello", msg) + self.assertEqual((remote_addr, remote_port), addr[:2]) # Send and capture a packet. sock.sendto("output hello", (remote_addr, remote_port)) packets = self.ReadAllPacketsOn(netid) - self.assertEquals(1, len(packets)) + self.assertEqual(1, len(packets)) output_pkt = packets[0] output_pkt, esp_hdr = xfrm_base.DecryptPacketWithNull(output_pkt) # length of the payload plus the UDP header - self.assertEquals(output_pkt[scapy.UDP].len, len("output_hello") + 8) - self.assertEquals(remote_addr, output_pkt.dst) - self.assertEquals(remote_port, output_pkt[scapy.UDP].dport) - self.assertEquals("output hello", str(output_pkt[scapy.UDP].payload)) - self.assertEquals(0xABCD, esp_hdr.spi) + self.assertEqual(output_pkt[scapy.UDP].len, len("output_hello") + 8) + self.assertEqual(remote_addr, output_pkt.dst) + self.assertEqual(remote_port, output_pkt[scapy.UDP].dport) + self.assertEqual("output hello", str(output_pkt[scapy.UDP].payload)) + self.assertEqual(0xABCD, esp_hdr.spi) def testNullEncryptionTransportMode(self): """Verify null encryption in transport mode. @@ -638,9 +638,9 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): def _CheckTemplateMatch(tmpl): """Dump the SPD and match a single template on a single policy.""" dump = self.xfrm.DumpPolicyInfo() - self.assertEquals(1, len(dump)) + self.assertEqual(1, len(dump)) _, attributes = dump[0] - self.assertEquals(attributes['XFRMA_TMPL'], tmpl) + self.assertEqual(attributes['XFRMA_TMPL'], tmpl) # Create a new policy using update. self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark, None) @@ -763,9 +763,9 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest): xfrm.XFRM_MODE_TUNNEL, 100, xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1, None, None, None, mark) dump = self.xfrm.DumpSaInfo() - self.assertEquals(1, len(dump)) + self.assertEqual(1, len(dump)) sainfo, attributes = dump[0] - self.assertEquals(mark, attributes["XFRMA_OUTPUT_MARK"]) + self.assertEqual(mark, attributes["XFRMA_OUTPUT_MARK"]) def testInvalidAlgorithms(self): key = "af442892cdcd0ef650e9c299f9a8436a".decode("hex") @@ -795,9 +795,9 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest): xfrm_base._ALGO_HMAC_SHA1, None, None, mark, 0, is_update=True) dump = self.xfrm.DumpSaInfo() - self.assertEquals(1, len(dump)) # check that update updated + self.assertEqual(1, len(dump)) # check that update updated sainfo, attributes = dump[0] - self.assertEquals(mark, attributes["XFRMA_MARK"]) + self.assertEqual(mark, attributes["XFRMA_MARK"]) self.xfrm.DeleteSaInfo(net_test.GetWildcardAddress(version), spi, IPPROTO_ESP, mark) @@ -836,7 +836,7 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest): s.sendto(net_test.UDP_PAYLOAD, (remote, 53)) # Check to make sure XfrmOutNoStates is incremented by exactly 1 - self.assertEquals(outNoStateCount + 1, + self.assertEqual(outNoStateCount + 1, self.getXfrmStat(XFRM_STATS_OUT_NO_STATES)) length = xfrm_base.GetEspPacketLength(xfrm.XFRM_MODE_TUNNEL, @@ -854,7 +854,7 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest): xfrm_base._ALGO_HMAC_SHA1, None, None, mark, 0, is_update=False) except IOError as e: - self.assertEquals(EEXIST, e.errno, "SA exists") + self.assertEqual(EEXIST, e.errno, "SA exists") self.xfrm.AddSaInfo(local, remote, TEST_SPI, xfrm.XFRM_MODE_TUNNEL, 0, @@ -893,9 +893,9 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest): dump = self.xfrm.DumpSaInfo() - self.assertEquals(1, len(dump)) # check that update updated + self.assertEqual(1, len(dump)) # check that update updated sainfo, attributes = dump[0] - self.assertEquals(reroute_netid, attributes["XFRMA_OUTPUT_MARK"]) + self.assertEqual(reroute_netid, attributes["XFRMA_OUTPUT_MARK"]) self.xfrm.DeleteSaInfo(remote, TEST_SPI, IPPROTO_ESP, mark) self.xfrm.DeletePolicyInfo(sel, xfrm.XFRM_POLICY_OUT, mark) diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index eb1a46e..f175c09 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -169,8 +169,8 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): # Verify that the packet data and src are correct data, src = read_sock.recvfrom(4096) - self.assertEquals(net_test.UDP_PAYLOAD, data) - self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2]) + self.assertEqual(net_test.UDP_PAYLOAD, data) + self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) def _TestTunnel(self, inner_version, outer_version, func, direction, test_output_mark_unset): @@ -233,13 +233,13 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, ikey, okey): - self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey) - self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey) + self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey) + self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey) family = AF_INET if version == 4 else AF_INET6 - self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), + self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), local_addr) - self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), + self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), remote_addr) def testAddVti(self): @@ -275,7 +275,7 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) # Validate that the netlink interface matches the ioctl interface. - self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) + self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) self.iproute.DeleteLink(_TEST_XFRM_IFNAME) with self.assertRaises(IOError): self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) @@ -439,7 +439,7 @@ class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): net_test.SetInterfaceUp(_TEST_XFRM_IFNAME) # Validate that the netlink interface matches the ioctl interface. - self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) + self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) self.iproute.DeleteLink(_TEST_XFRM_IFNAME) with self.assertRaises(IOError): self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) @@ -545,7 +545,7 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): def tearDownClass(cls): # The sysctls are restored by MultinetworkBaseTest.tearDownClass. cls.SetInboundMarks(False) - for tunnel in cls.tunnelsV4.values() + cls.tunnelsV6.values(): + for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()): cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) cls._SetupTunnelNetwork(tunnel, False) tunnel.Teardown() @@ -553,7 +553,7 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): def randomTunnel(self, outer_version): version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6 - return random.choice(version_dict.values()) + return random.choice(list(version_dict.values())) def setUp(self): multinetwork_base.MultiNetworkBaseTest.setUp(self) @@ -636,13 +636,13 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): def assertReceivedPacket(self, tunnel, sa_info): tunnel.rx += 1 - self.assertEquals((tunnel.rx, tunnel.tx), + self.assertEqual((tunnel.rx, tunnel.tx), self.iproute.GetRxTxPackets(tunnel.iface)) sa_info.seq_num += 1 def assertSentPacket(self, tunnel, sa_info): tunnel.tx += 1 - self.assertEquals((tunnel.rx, tunnel.tx), + self.assertEqual((tunnel.rx, tunnel.tx), self.iproute.GetRxTxPackets(tunnel.iface)) sa_info.seq_num += 1 @@ -664,8 +664,8 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): # Verify that the packet data and src are correct data, src = read_sock.recvfrom(4096) self.assertReceivedPacket(tunnel, sa_info) - self.assertEquals(net_test.UDP_PAYLOAD, data) - self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2]) + self.assertEqual(net_test.UDP_PAYLOAD, data) + self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) def _CheckTunnelOutput(self, tunnel, inner_version, local_inner, remote_inner, sa_info=None): @@ -701,12 +701,12 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): # Check outer header manually (Avoids having to overwrite outer header's # id, flags or flow label) self.assertSentPacket(tunnel, sa_info) - self.assertEquals(expected.src, pkt.src) - self.assertEquals(expected.dst, pkt.dst) - self.assertEquals(len(expected), len(pkt)) + self.assertEqual(expected.src, pkt.src) + self.assertEqual(expected.dst, pkt.dst) + self.assertEqual(len(expected), len(pkt)) # Check everything else - self.assertEquals(str(expected.payload), str(pkt.payload)) + self.assertEqual(str(expected.payload), str(pkt.payload)) def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner, remote_inner): @@ -728,8 +728,8 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt)) # Check src/dst - self.assertEquals(tunnel.local, pkt.src) - self.assertEquals(tunnel.remote, pkt.dst) + self.assertEqual(tunnel.local, pkt.src) + self.assertEqual(tunnel.remote, pkt.dst) # Check that the interface statistics recorded the outbound packet self.assertSentPacket(tunnel, tunnel.out_sa) @@ -748,8 +748,8 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): # Verify that the packet data and src are correct data, src = read_sock.recvfrom(4096) - self.assertEquals(net_test.UDP_PAYLOAD, data) - self.assertEquals((local_inner, src_port), src[:2]) + self.assertEqual(net_test.UDP_PAYLOAD, data) + self.assertEqual((local_inner, src_port), src[:2]) # Check that the interface statistics recorded the inbound packet self.assertReceivedPacket(tunnel, tunnel.in_sa) @@ -784,10 +784,10 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): # Check that the packet too big reduced the MTU. routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None) - self.assertEquals(1, len(routes)) + self.assertEqual(1, len(routes)) rtmsg, attributes = routes[0] - self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) - self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) + self.assertEqual(iproute.RTN_UNICAST, rtmsg.type) + self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) # Clear PMTU information so that future tests don't have to worry about it. self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid) |