summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNelson Li <nelsonli@google.com>2020-06-11 15:06:38 +0800
committerMaciej Żenczykowski <maze@google.com>2020-07-10 23:56:38 -0700
commitb7769d6474435c70b2be16aeb0ae4e268c840968 (patch)
treecde2e871628cbe51e466107719cd37e4886dca6a
parentacb2b4b0ebb7a82310208ebc93defa18a27770c3 (diff)
downloadtests-android-r-beta-3.tar.gz
net-test: be more compatible with python3android-r-beta-3android-r-beta-2
Conversions are primarily: print ... -> print(...) xrange(...) -> range(...) assertEquals(...) -> assertEqual(...) except t, e -> except t as e "\x00" -> b"\x00" iteritems() -> items() Plus: sprinkle in some list()'s around a few things and a few other minor things Test: tested on ACK5.4 with 32 and 64-bit x86 uml which proves this continues to work with the uml image's embedded python2 Bug: 158494111 Signed-off-by: Maciej Żenczykowski <maze@google.com> Change-Id: I89dffc575293c1768eacdc6f1bce2bee4095c590
-rwxr-xr-xnet/test/bpf_test.py32
-rw-r--r--net/test/cstruct.py25
-rwxr-xr-xnet/test/cstruct_test.py48
-rwxr-xr-xnet/test/forwarding_test.py4
-rwxr-xr-xnet/test/genetlink.py2
-rw-r--r--net/test/iproute.py8
-rwxr-xr-xnet/test/leak_test.py2
-rw-r--r--net/test/multinetwork_base.py8
-rwxr-xr-xnet/test/multinetwork_test.py78
-rw-r--r--net/test/namespace.py12
-rwxr-xr-xnet/test/neighbour_test.py16
-rwxr-xr-xnet/test/net_test.py4
-rw-r--r--net/test/netlink.py4
-rwxr-xr-xnet/test/pf_key.py12
-rwxr-xr-xnet/test/pf_key_test.py44
-rwxr-xr-xnet/test/ping6_test.py74
-rwxr-xr-xnet/test/resilient_rs_test.py4
-rwxr-xr-xnet/test/sock_diag.py12
-rwxr-xr-xnet/test/sock_diag_test.py66
-rwxr-xr-xnet/test/srcaddr_selection_test.py20
-rwxr-xr-xnet/test/tcp_fastopen_test.py2
-rwxr-xr-xnet/test/tcp_metrics.py2
-rwxr-xr-xnet/test/tcp_nuke_addr_test.py4
-rwxr-xr-xnet/test/tcp_repair_test.py26
-rw-r--r--net/test/tun_twister.py8
-rwxr-xr-xnet/test/xfrm.py8
-rwxr-xr-xnet/test/xfrm_algorithm_test.py18
-rw-r--r--net/test/xfrm_base.py12
-rwxr-xr-xnet/test/xfrm_test.py88
-rwxr-xr-xnet/test/xfrm_tunnel_test.py50
30 files changed, 348 insertions, 345 deletions
diff --git a/net/test/bpf_test.py b/net/test/bpf_test.py
index ea3e56b..693e1d4 100755
--- a/net/test/bpf_test.py
+++ b/net/test/bpf_test.py
@@ -49,10 +49,10 @@ def PrintMapInfo(map_fd):
try:
nextKey = GetNextKey(map_fd, key).value
value = LookupMap(map_fd, nextKey)
- print repr(nextKey) + " : " + repr(value.value)
+ print(repr(nextKey) + " : " + repr(value.value))
key = nextKey
except:
- print "no value"
+ print("no value")
break
@@ -67,7 +67,7 @@ def SocketUDPLoopBack(packet_count, version, prog_fd):
sock.bind((addr, 0))
addr = sock.getsockname()
sockaddr = csocket.Sockaddr(addr)
- for i in xrange(packet_count):
+ for i in range(packet_count):
sock.sendto("foo", addr)
data, retaddr = csocket.Recvfrom(sock, 4096, 0)
assert "foo" == data
@@ -170,7 +170,7 @@ class BpfTest(net_test.NetworkTest):
self.map_fd = CreateMap(BPF_MAP_TYPE_HASH, KEY_SIZE, VALUE_SIZE,
TOTAL_ENTRIES)
UpdateMap(self.map_fd, key, value)
- self.assertEquals(value, LookupMap(self.map_fd, key).value)
+ self.assertEqual(value, LookupMap(self.map_fd, key).value)
DeleteMap(self.map_fd, key)
self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, key)
@@ -185,17 +185,17 @@ class BpfTest(net_test.NetworkTest):
result = GetNextKey(self.map_fd, key)
key = result.value
self.assertGreaterEqual(key, 0)
- self.assertEquals(value, LookupMap(self.map_fd, key).value)
+ self.assertEqual(value, LookupMap(self.map_fd, key).value)
count += 1
def testIterateMap(self):
self.map_fd = CreateMap(BPF_MAP_TYPE_HASH, KEY_SIZE, VALUE_SIZE,
TOTAL_ENTRIES)
value = 1024
- for key in xrange(0, TOTAL_ENTRIES):
+ for key in range(0, TOTAL_ENTRIES):
UpdateMap(self.map_fd, key, value)
- for key in xrange(0, TOTAL_ENTRIES):
- self.assertEquals(value, LookupMap(self.map_fd, key).value)
+ for key in range(0, TOTAL_ENTRIES):
+ self.assertEqual(value, LookupMap(self.map_fd, key).value)
self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, 101)
nonexistent_key = -1
self.CheckAllMapEntry(nonexistent_key, TOTAL_ENTRIES, value)
@@ -204,7 +204,7 @@ class BpfTest(net_test.NetworkTest):
self.map_fd = CreateMap(BPF_MAP_TYPE_HASH, KEY_SIZE, VALUE_SIZE,
TOTAL_ENTRIES)
value = 1024
- for key in xrange(0, TOTAL_ENTRIES):
+ for key in range(0, TOTAL_ENTRIES):
UpdateMap(self.map_fd, key, value)
firstKey = GetFirstKey(self.map_fd)
key = firstKey.value
@@ -261,7 +261,7 @@ class BpfTest(net_test.NetworkTest):
packet_count = 10
SocketUDPLoopBack(packet_count, 4, self.prog_fd)
SocketUDPLoopBack(packet_count, 6, self.prog_fd)
- self.assertEquals(packet_count * 2, LookupMap(self.map_fd, key).value)
+ self.assertEqual(packet_count * 2, LookupMap(self.map_fd, key).value)
@unittest.skipUnless(HAVE_EBPF_ACCOUNTING,
"BPF helper function is not fully supported")
@@ -282,7 +282,7 @@ class BpfTest(net_test.NetworkTest):
def PacketCountByCookie(version):
self.sock = SocketUDPLoopBack(packet_count, version, self.prog_fd)
cookie = sock_diag.SockDiag.GetSocketCookie(self.sock)
- self.assertEquals(packet_count, LookupMap(self.map_fd, cookie).value)
+ self.assertEqual(packet_count, LookupMap(self.map_fd, cookie).value)
self.sock.close()
PacketCountByCookie(4)
PacketCountByCookie(6)
@@ -307,10 +307,10 @@ class BpfTest(net_test.NetworkTest):
with net_test.RunAsUid(uid):
self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, uid)
SocketUDPLoopBack(packet_count, 4, self.prog_fd)
- self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value)
+ self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value)
DeleteMap(self.map_fd, uid);
SocketUDPLoopBack(packet_count, 6, self.prog_fd)
- self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value)
+ self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value)
@unittest.skipUnless(HAVE_EBPF_ACCOUNTING,
"Cgroup BPF is not fully supported")
@@ -397,17 +397,17 @@ class BpfCgroupTest(net_test.NetworkTest):
with net_test.RunAsUid(uid):
self.assertRaisesErrno(errno.ENOENT, LookupMap, self.map_fd, uid)
SocketUDPLoopBack(packet_count, 4, None)
- self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value)
+ self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value)
DeleteMap(self.map_fd, uid)
SocketUDPLoopBack(packet_count, 6, None)
- self.assertEquals(packet_count, LookupMap(self.map_fd, uid).value)
+ self.assertEqual(packet_count, LookupMap(self.map_fd, uid).value)
BpfProgDetach(self._cg_fd, BPF_CGROUP_INET_INGRESS)
def checkSocketCreate(self, family, socktype, success):
try:
sock = socket.socket(family, socktype, 0)
sock.close()
- except socket.error, e:
+ except socket.error as e:
if success:
self.fail("Failed to create socket family=%d type=%d err=%s" %
(family, socktype, os.strerror(e.errno)))
diff --git a/net/test/cstruct.py b/net/test/cstruct.py
index 5e05263..c675c9e 100644
--- a/net/test/cstruct.py
+++ b/net/test/cstruct.py
@@ -28,24 +28,24 @@ Example usage:
>>> # Create instances from a tuple of values, raw bytes, zero-initialized, or
>>> # using keywords.
... n1 = NLMsgHdr((44, 32, 0x2, 0, 491))
->>> print n1
+>>> print(n1)
NLMsgHdr(length=44, type=32, flags=2, seq=0, pid=491)
>>>
>>> n2 = NLMsgHdr("\x2c\x00\x00\x00\x21\x00\x02\x00"
... "\x00\x00\x00\x00\xfe\x01\x00\x00" + "junk at end")
->>> print n2
+>>> print(n2)
NLMsgHdr(length=44, type=33, flags=2, seq=0, pid=510)
>>>
>>> n3 = netlink.NLMsgHdr() # Zero-initialized
->>> print n3
+>>> print(n3)
NLMsgHdr(length=0, type=0, flags=0, seq=0, pid=0)
>>>
>>> n4 = netlink.NLMsgHdr(length=44, type=33) # Other fields zero-initialized
->>> print n4
+>>> print(n4)
NLMsgHdr(length=44, type=33, flags=0, seq=0, pid=0)
>>>
>>> # Serialize to raw bytes.
-... print n1.Pack().encode("hex")
+... print(n1.Pack().encode("hex"))
2c0000002000020000000000eb010000
>>>
>>> # Parse the beginning of a byte stream as a struct, and return the struct
@@ -70,11 +70,14 @@ NLMsgHdr(length=44, type=33, flags=0, seq=0, pid=0)
import ctypes
import string
import struct
+import re
def CalcSize(fmt):
if "A" in fmt:
fmt = fmt.replace("A", "s")
+ # Remove the last digital since it will cause error in python3.
+ fmt = (re.split('\d+$', fmt)[0])
return struct.calcsize(fmt)
def CalcNumElements(fmt):
@@ -82,7 +85,7 @@ def CalcNumElements(fmt):
fmt = fmt.replace("S", "")
numstructs = prevlen - len(fmt)
size = CalcSize(fmt)
- elements = struct.unpack(fmt, "\x00" * size)
+ elements = struct.unpack(fmt, b"\x00" * size)
return len(elements) + numstructs
@@ -118,7 +121,7 @@ def Struct(name, fmt, fieldnames, substructs={}):
# where XX is the length of the struct type's packed representation.
_format = ""
laststructindex = 0
- for i in xrange(len(fmt)):
+ for i in range(len(fmt)):
if fmt[i] == "S":
# Nested struct. Record the index in our struct it should go into.
index = CalcNumElements(fmt[:i])
@@ -138,17 +141,17 @@ def Struct(name, fmt, fieldnames, substructs={}):
offset_list = [0]
last_offset = 0
- for i in xrange(len(_format)):
+ for i in range(len(_format)):
offset = CalcSize(_format[:i])
if offset > last_offset:
last_offset = offset
offset_list.append(offset)
# A dictionary that maps field names to their offsets in the struct.
- _offsets = dict(zip(_fieldnames, offset_list))
+ _offsets = dict(list(zip(_fieldnames, offset_list)))
# Check that the number of field names matches the number of fields.
- numfields = len(struct.unpack(_format, "\x00" * _length))
+ numfields = len(struct.unpack(_format, b"\x00" * _length))
if len(_fieldnames) != numfields:
raise ValueError("Invalid cstruct: \"%s\" has %d elements, \"%s\" has %d."
% (fmt, numfields, fieldnames, len(_fieldnames)))
@@ -182,7 +185,7 @@ def Struct(name, fmt, fieldnames, substructs={}):
# Default construct from null bytes.
self._Parse("\x00" * len(self))
# If any keywords were supplied, set those fields.
- for k, v in kwargs.iteritems():
+ for k, v in kwargs.items():
setattr(self, k, v)
elif isinstance(tuple_or_bytes, str):
# Initializing from a string.
diff --git a/net/test/cstruct_test.py b/net/test/cstruct_test.py
index 6b27973..af1fc42 100755
--- a/net/test/cstruct_test.py
+++ b/net/test/cstruct_test.py
@@ -27,16 +27,16 @@ TestStructB = cstruct.Struct("TestStructB", "=BI", "byte1 int2")
class CstructTest(unittest.TestCase):
def CheckEquals(self, a, b):
- self.assertEquals(a, b)
- self.assertEquals(b, a)
+ self.assertEqual(a, b)
+ self.assertEqual(b, a)
assert a == b
assert b == a
assert not (a != b) # pylint: disable=g-comparison-negation,superfluous-parens
assert not (b != a) # pylint: disable=g-comparison-negation,superfluous-parens
def CheckNotEquals(self, a, b):
- self.assertNotEquals(a, b)
- self.assertNotEquals(b, a)
+ self.assertNotEqual(a, b)
+ self.assertNotEqual(b, a)
assert a != b
assert b != a
assert not (a == b) # pylint: disable=g-comparison-negation,superfluous-parens
@@ -69,9 +69,9 @@ class CstructTest(unittest.TestCase):
expectedlen = (len(TestStructA) +
2 + len(TestStructA) + len(TestStructB) + 4 +
1)
- self.assertEquals(expectedlen, len(DoubleNested))
+ self.assertEqual(expectedlen, len(DoubleNested))
- self.assertEquals(7, d.nest2.nest3.byte1)
+ self.assertEqual(7, d.nest2.nest3.byte1)
d.byte3 = 252
d.nest2.word1 = 33214
@@ -80,17 +80,17 @@ class CstructTest(unittest.TestCase):
t = n.nest3
t.int2 = 33627591
- self.assertEquals(33627591, d.nest2.nest3.int2)
+ self.assertEqual(33627591, d.nest2.nest3.int2)
expected = (
"DoubleNested(nest1=TestStructA(byte1=1, int2=2),"
" nest2=Nested(word1=33214, nest2=TestStructA(byte1=3, int2=4),"
" nest3=TestStructB(byte1=7, int2=33627591), int4=-55), byte3=252)")
- self.assertEquals(expected, str(d))
+ self.assertEqual(expected, str(d))
expected = ("01" "02000000"
"81be" "03" "04000000"
"07" "c71d0102" "ffffffc9" "fc").decode("hex")
- self.assertEquals(expected, d.Pack())
+ self.assertEqual(expected, d.Pack())
unpacked = DoubleNested(expected)
self.CheckEquals(unpacked, d)
@@ -102,24 +102,24 @@ class CstructTest(unittest.TestCase):
t = TestStruct((2, nullstr, 12345, nullstr, 33210))
expected = ("TestStruct(byte1=2, string2=68656c6c6f0000000000000000000000,"
" int3=12345, ascii4=hello, word5=33210)")
- self.assertEquals(expected, str(t))
+ self.assertEqual(expected, str(t))
embeddednull = "hello\x00visible123"
t = TestStruct((2, embeddednull, 12345, embeddednull, 33210))
expected = ("TestStruct(byte1=2, string2=68656c6c6f0076697369626c65313233,"
" int3=12345, ascii4=hello\x00visible123, word5=33210)")
- self.assertEquals(expected, str(t))
+ self.assertEqual(expected, str(t))
def testZeroInitialization(self):
TestStruct = cstruct.Struct("TestStruct", "B16si16AH",
"byte1 string2 int3 ascii4 word5")
t = TestStruct()
- self.assertEquals(0, t.byte1)
- self.assertEquals("\x00" * 16, t.string2)
- self.assertEquals(0, t.int3)
- self.assertEquals("\x00" * 16, t.ascii4)
- self.assertEquals(0, t.word5)
- self.assertEquals("\x00" * len(TestStruct), t.Pack())
+ self.assertEqual(0, t.byte1)
+ self.assertEqual("\x00" * 16, t.string2)
+ self.assertEqual(0, t.int3)
+ self.assertEqual("\x00" * 16, t.ascii4)
+ self.assertEqual(0, t.word5)
+ self.assertEqual("\x00" * len(TestStruct), t.Pack())
def testKeywordInitialization(self):
TestStruct = cstruct.Struct("TestStruct", "=B16sIH",
@@ -130,26 +130,26 @@ class CstructTest(unittest.TestCase):
# Populate all fields
t1 = TestStruct(byte1=1, string2=text, int3=0xFEDCBA98, word4=0x1234)
expected = ("01" + text_bytes + "98BADCFE" "3412").decode("hex")
- self.assertEquals(expected, t1.Pack())
+ self.assertEqual(expected, t1.Pack())
# Partially populated
t1 = TestStruct(string2=text, word4=0x1234)
expected = ("00" + text_bytes + "00000000" "3412").decode("hex")
- self.assertEquals(expected, t1.Pack())
+ self.assertEqual(expected, t1.Pack())
def testCstructOffset(self):
TestStruct = cstruct.Struct("TestStruct", "B16si16AH",
"byte1 string2 int3 ascii4 word5")
nullstr = "hello" + (16 - len("hello")) * "\x00"
t = TestStruct((2, nullstr, 12345, nullstr, 33210))
- self.assertEquals(0, t.offset("byte1"))
- self.assertEquals(1, t.offset("string2")) # sizeof(byte)
- self.assertEquals(17, t.offset("int3")) # sizeof(byte) + 16*sizeof(char)
+ self.assertEqual(0, t.offset("byte1"))
+ self.assertEqual(1, t.offset("string2")) # sizeof(byte)
+ self.assertEqual(17, t.offset("int3")) # sizeof(byte) + 16*sizeof(char)
# The integer is automatically padded by the struct module
# to match native alignment.
# offset = sizeof(byte) + 16*sizeof(char) + padding + sizeof(int)
- self.assertEquals(24, t.offset("ascii4"))
- self.assertEquals(40, t.offset("word5"))
+ self.assertEqual(24, t.offset("ascii4"))
+ self.assertEqual(40, t.offset("word5"))
self.assertRaises(KeyError, t.offset, "random")
# TODO: Add support for nested struct offset
diff --git a/net/test/forwarding_test.py b/net/test/forwarding_test.py
index 34394cd..b35e19f 100755
--- a/net/test/forwarding_test.py
+++ b/net/test/forwarding_test.py
@@ -126,8 +126,8 @@ class ForwardingTest(multinetwork_base.MultiNetworkBaseTest):
mydst = "%s:%04X" % (net_test.FormatSockStatAddress(remoteaddr), remoteport)
state = None
sockets = [s for s in sockets if s[0] == mysrc and s[1] == mydst]
- self.assertEquals(1, len(sockets))
- self.assertEquals("%02X" % self.TCP_TIME_WAIT, sockets[0][2])
+ self.assertEqual(1, len(sockets))
+ self.assertEqual("%02X" % self.TCP_TIME_WAIT, sockets[0][2])
# Remove our IP address.
try:
diff --git a/net/test/genetlink.py b/net/test/genetlink.py
index dda3964..6928f07 100755
--- a/net/test/genetlink.py
+++ b/net/test/genetlink.py
@@ -120,4 +120,4 @@ class GenericNetlinkControl(GenericNetlink):
if __name__ == "__main__":
g = GenericNetlinkControl()
- print g.GetFamily("tcp_metrics")
+ print(g.GetFamily("tcp_metrics"))
diff --git a/net/test/iproute.py b/net/test/iproute.py
index 470cbf1..9036246 100644
--- a/net/test/iproute.py
+++ b/net/test/iproute.py
@@ -408,7 +408,7 @@ class IPRoute(netlink.NetlinkSocket):
while True:
try:
self._SendNlRequest(RTM_DELRULE, rtmsg)
- except IOError, e:
+ except IOError as e:
if e.errno == errno.ENOENT:
break
else:
@@ -459,7 +459,7 @@ class IPRoute(netlink.NetlinkSocket):
subject = CommandSubject(command)
if "ALL" not in self.NL_DEBUG and subject not in self.NL_DEBUG:
return
- print self.CommandToString(command, data)
+ print(self.CommandToString(command, data))
def MaybeDebugMessage(self, message):
hdr = netlink.NLMsgHdr(message)
@@ -467,7 +467,7 @@ class IPRoute(netlink.NetlinkSocket):
def PrintMessage(self, message):
hdr = netlink.NLMsgHdr(message)
- print self.CommandToString(hdr.type, message)
+ print(self.CommandToString(hdr.type, message))
def DumpRules(self, version):
"""Returns the IP rules for the specified IP version."""
@@ -774,4 +774,4 @@ if __name__ == "__main__":
iproute.DEBUG = True
iproute.DumpRules(6)
iproute.DumpLinks()
- print iproute.GetRoutes("2001:4860:4860::8888", 0, 0, None)
+ print(iproute.GetRoutes("2001:4860:4860::8888", 0, 0, None))
diff --git a/net/test/leak_test.py b/net/test/leak_test.py
index 8a42611..a245817 100755
--- a/net/test/leak_test.py
+++ b/net/test/leak_test.py
@@ -63,7 +63,7 @@ class ForceSocketBufferOptionTest(net_test.NetworkTest):
val = 4097
self.assertGreater(2 * val, minbuf)
s.setsockopt(SOL_SOCKET, force_option, val)
- self.assertEquals(2 * val, s.getsockopt(SOL_SOCKET, option))
+ self.assertEqual(2 * val, s.getsockopt(SOL_SOCKET, option))
# Check that the force option sets at least the minimum value instead
# of a negative value on integer overflow. Because the kernel multiplies
diff --git a/net/test/multinetwork_base.py b/net/test/multinetwork_base.py
index 8dbd360..6b79d4f 100644
--- a/net/test/multinetwork_base.py
+++ b/net/test/multinetwork_base.py
@@ -366,7 +366,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest):
@classmethod
def _RestoreSysctls(cls):
- for sysctl, value in cls.saved_sysctls.iteritems():
+ for sysctl, value in cls.saved_sysctls.items():
try:
open(sysctl, "w").write(value)
except IOError:
@@ -558,7 +558,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest):
# MAC address has 1 in the least-significant bit.
if include_multicast or not int(ether.dst.split(":")[0], 16) & 0x1:
packets.append(ether.payload)
- except OSError, e:
+ except OSError as e:
# EAGAIN means there are no more packets waiting.
if re.match(e.message, os.strerror(errno.EAGAIN)):
# If we didn't see any packets, try again for good luck.
@@ -669,7 +669,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest):
# repr() can be expensive. Call it only if the test is going to fail and we
# want to see the error.
if expected_real != actual_real:
- self.assertEquals(repr(expected_real), repr(actual_real))
+ self.assertEqual(repr(expected_real), repr(actual_real))
def PacketMatches(self, expected, actual):
try:
@@ -710,7 +710,7 @@ class MultiNetworkBaseTest(net_test.NetworkTest):
# expected, but this is good enough for now.
try:
self.assertPacketMatches(expected, packets[-1])
- except Exception, e:
+ except Exception as e:
raise UnexpectedPacketError(
"%s: diff with last packet:\n%s" % (msg, e.message))
diff --git a/net/test/multinetwork_test.py b/net/test/multinetwork_test.py
index a0b464a..092736b 100755
--- a/net/test/multinetwork_test.py
+++ b/net/test/multinetwork_test.py
@@ -134,7 +134,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest):
self.ExpectPacketOn(netid, msg, expected)
def CheckOutgoingPackets(self, routing_mode):
- for _ in xrange(self.ITERATIONS):
+ for _ in range(self.ITERATIONS):
for netid in self.tuns:
self.CheckPingPacket(4, netid, routing_mode, self.IPV4_PING)
@@ -194,7 +194,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest):
# If we're testing connected sockets, connect the socket on the first
# netid now.
if use_connect:
- netid = self.tuns.keys()[0]
+ netid = list(self.tuns.keys())[0]
self.SelectInterface(s, netid, mode)
s.connect((dstaddr, 53))
expected.src = self.MyAddress(version, netid)
@@ -270,7 +270,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest):
self.CheckRemarking(6, True)
def testIPv6StickyPktinfo(self):
- for _ in xrange(self.ITERATIONS):
+ for _ in range(self.ITERATIONS):
for netid in self.tuns:
s = net_test.UDPSocket(AF_INET6)
@@ -312,7 +312,7 @@ class OutgoingTest(multinetwork_base.MultiNetworkBaseTest):
self.ExpectPacketOn(netid, msg, expected)
def CheckPktinfoRouting(self, version):
- for _ in xrange(self.ITERATIONS):
+ for _ in range(self.ITERATIONS):
for netid in self.tuns:
family = self.GetProtocolFamily(version)
s = net_test.UDPSocket(family)
@@ -478,11 +478,11 @@ class TCPAcceptTest(multinetwork_base.InboundMarkingTest):
self.InvalidateDstCache(version, netid)
if mode == self.MODE_INCOMING_MARK:
- self.assertEquals(netid, mark & self.NETID_FWMASK,
+ self.assertEqual(netid, mark & self.NETID_FWMASK,
msg + ": Accepted socket: Expected mark %d, got %d" % (
netid, mark))
elif mode != self.MODE_EXPLICIT_MARK:
- self.assertEquals(0, self.GetSocketMark(listensocket))
+ self.assertEqual(0, self.GetSocketMark(listensocket))
# Check the FIN was sent on the right interface, and ack it. We don't expect
# this to fail because by the time the connection is established things are
@@ -670,14 +670,14 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest):
table, prefix, plen, router, ifindex, None, None)
def testSetAcceptRaRtInfoMinPlen(self):
- for plen in xrange(-1, 130):
+ for plen in range(-1, 130):
self.SetAcceptRaRtInfoMinPlen(plen)
- self.assertEquals(plen, self.GetAcceptRaRtInfoMinPlen())
+ self.assertEqual(plen, self.GetAcceptRaRtInfoMinPlen())
def testSetAcceptRaRtInfoMaxPlen(self):
- for plen in xrange(-1, 130):
+ for plen in range(-1, 130):
self.SetAcceptRaRtInfoMaxPlen(plen)
- self.assertEquals(plen, self.GetAcceptRaRtInfoMaxPlen())
+ self.assertEqual(plen, self.GetAcceptRaRtInfoMaxPlen())
def testZeroRtLifetime(self):
PREFIX = "2001:db8:8901:2300::"
@@ -700,7 +700,7 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest):
RTLIFETIME = 70372
PRF = 0
# sweep from high to low to avoid spurious failures from late arrivals.
- for plen in xrange(130, 1, -1):
+ for plen in range(130, 1, -1):
self.SetAcceptRaRtInfoMinPlen(plen)
# RIO with plen < min_plen should be ignored
self.SendRIO(RTLIFETIME, plen - 1, PREFIX, PRF)
@@ -715,7 +715,7 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest):
RTLIFETIME = 73078
PRF = 0
# sweep from low to high to avoid spurious failures from late arrivals.
- for plen in xrange(-1, 128, 1):
+ for plen in range(-1, 128, 1):
self.SetAcceptRaRtInfoMaxPlen(plen)
# RIO with plen > max_plen should be ignored
self.SendRIO(RTLIFETIME, plen + 1, PREFIX, PRF)
@@ -782,16 +782,16 @@ class RIOTest(multinetwork_base.MultiNetworkBaseTest):
baseline = self.CountRoutes()
self.SetAcceptRaRtInfoMaxPlen(56)
# Send many RIOs compared to the expected number on a healthy system.
- for i in xrange(0, COUNT):
+ for i in range(0, COUNT):
prefix = "2001:db8:%x:1100::" % i
self.SendRIO(RTLIFETIME, PLEN, prefix, PRF)
time.sleep(0.1)
- self.assertEquals(COUNT + baseline, self.CountRoutes())
- for i in xrange(0, COUNT):
+ self.assertEqual(COUNT + baseline, self.CountRoutes())
+ for i in range(0, COUNT):
prefix = "2001:db8:%x:1100::" % i
self.DelRA6(prefix, PLEN)
# Expect that we can return to baseline config without lingering routes.
- self.assertEquals(baseline, self.CountRoutes())
+ self.assertEqual(baseline, self.CountRoutes())
class RATest(multinetwork_base.MultiNetworkBaseTest):
@@ -874,7 +874,7 @@ class RATest(multinetwork_base.MultiNetworkBaseTest):
return len(open("/proc/net/ipv6_route").readlines())
num_routes = GetNumRoutes()
- for i in xrange(10, 20):
+ for i in range(10, 20):
try:
self.tuns[i] = self.CreateTunInterface(i)
self.SendRA(i)
@@ -908,23 +908,23 @@ class RATest(multinetwork_base.MultiNetworkBaseTest):
csocket.SetSocketTimeout(s.sock, 100)
try:
data = s._Recv()
- except IOError, e:
+ except IOError as e:
self.fail("Should have received an RTM_NEWNDUSEROPT message. "
"Please ensure the kernel supports receiving the "
"PREF64 RA option. Error: %s" % e)
# Check that the message is received correctly.
nlmsghdr, data = cstruct.Read(data, netlink.NLMsgHdr)
- self.assertEquals(iproute.RTM_NEWNDUSEROPT, nlmsghdr.type)
+ self.assertEqual(iproute.RTM_NEWNDUSEROPT, nlmsghdr.type)
# Check the option contents.
ndopthdr, data = cstruct.Read(data, iproute.NdUseroptMsg)
- self.assertEquals(AF_INET6, ndopthdr.family)
- self.assertEquals(self.ND_ROUTER_ADVERT, ndopthdr.icmp_type)
- self.assertEquals(len(opt), ndopthdr.opts_len)
+ self.assertEqual(AF_INET6, ndopthdr.family)
+ self.assertEqual(self.ND_ROUTER_ADVERT, ndopthdr.icmp_type)
+ self.assertEqual(len(opt), ndopthdr.opts_len)
actual_opt = self.Pref64Option(data)
- self.assertEquals(opt, actual_opt)
+ self.assertEqual(opt, actual_opt)
@@ -986,7 +986,7 @@ class PMTUTest(multinetwork_base.InboundMarkingTest):
# Send a packet and receive a packet too big.
SendBigPacket(version, s, dstaddr, netid, payload)
received = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(received),
+ self.assertEqual(1, len(received),
"unexpected packets: %s" % received[1:])
_, toobig = packets.ICMPPacketTooBig(version, intermediate, srcaddr,
received[0])
@@ -1000,7 +1000,7 @@ class PMTUTest(multinetwork_base.InboundMarkingTest):
# If this is a connected socket, make sure the socket MTU was set.
# Note that in IPv4 this only started working in Linux 3.6!
if use_connect and (version == 6 or net_test.LINUX_VERSION >= (3, 6)):
- self.assertEquals(packets.PTB_MTU, self.GetSocketMTU(version, s))
+ self.assertEqual(packets.PTB_MTU, self.GetSocketMTU(version, s))
s.close()
@@ -1010,16 +1010,16 @@ class PMTUTest(multinetwork_base.InboundMarkingTest):
# here we use a mark for simplicity.
s2 = self.BuildSocket(version, net_test.UDPSocket, netid, "mark")
s2.connect((dstaddr, 1234))
- self.assertEquals(packets.PTB_MTU, self.GetSocketMTU(version, s2))
+ self.assertEqual(packets.PTB_MTU, self.GetSocketMTU(version, s2))
# Also check the MTU reported by ip route get, this time using the oif.
routes = self.iproute.GetRoutes(dstaddr, self.ifindices[netid], 0, None)
self.assertTrue(routes)
route = routes[0]
rtmsg, attributes = route
- self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
+ self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
metrics = attributes["RTA_METRICS"]
- self.assertEquals(packets.PTB_MTU, metrics["RTAX_MTU"])
+ self.assertEqual(packets.PTB_MTU, metrics["RTAX_MTU"])
def testIPv4BasicPMTU(self):
"""Tests IPv4 path MTU discovery.
@@ -1152,11 +1152,11 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest):
rules = self.GetRulesAtPriority(version, priority)
self.assertTrue(rules)
_, attributes = rules[-1]
- self.assertEquals(priority, attributes["FRA_PRIORITY"])
+ self.assertEqual(priority, attributes["FRA_PRIORITY"])
uidrange = attributes["FRA_UID_RANGE"]
- self.assertEquals(start, uidrange.start)
- self.assertEquals(end, uidrange.end)
- self.assertEquals(table, attributes["FRA_TABLE"])
+ self.assertEqual(start, uidrange.start)
+ self.assertEqual(end, uidrange.end)
+ self.assertEqual(table, attributes["FRA_TABLE"])
finally:
self.iproute.UidRangeRule(version, False, start, end, table, priority)
self.assertRaisesErrno(
@@ -1223,15 +1223,15 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest):
try:
routes = self.iproute.GetRoutes(addr, oif, mark, uid)
rtmsg, _ = routes[0]
- self.assertEquals(iproute.RTN_UNREACHABLE, rtmsg.type)
- except IOError, e:
+ self.assertEqual(iproute.RTN_UNREACHABLE, rtmsg.type)
+ except IOError as e:
if int(e.errno) != int(errno.ENETUNREACH):
raise e
def ExpectRoute(self, addr, oif, mark, uid):
routes = self.iproute.GetRoutes(addr, oif, mark, uid)
rtmsg, _ = routes[0]
- self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
+ self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
def CheckGetRoute(self, version, addr):
self.ExpectNoRoute(addr, 0, 0, 0)
@@ -1257,7 +1257,7 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest):
self.assertRaisesErrno(errno.ENETUNREACH,
s.sendto, "foo", (remoteaddr, 53))
def CheckSendSucceeds():
- self.assertEquals(len("foo"), s.sendto("foo", (remoteaddr, 53)))
+ self.assertEqual(len("foo"), s.sendto("foo", (remoteaddr, 53)))
CheckSendFails()
self.iproute.UidRangeRule(6, True, uid, uid, table, self.PRIORITY_UID)
@@ -1269,7 +1269,7 @@ class UidRoutingTest(multinetwork_base.MultiNetworkBaseTest):
CheckSendSucceeds()
os.fchown(s.fileno(), -1, 12345)
CheckSendSucceeds()
- os.fchmod(s.fileno(), 0777)
+ os.fchmod(s.fileno(), 0o777)
CheckSendSucceeds()
os.fchown(s.fileno(), 0, -1)
CheckSendFails()
@@ -1306,8 +1306,8 @@ class RulesTest(net_test.NetworkTest):
# Check that the rule pointing at table 301 is still around.
attributes = [a for _, a in self.iproute.DumpRules(version)
if a.get("FRA_PRIORITY", 0) == self.RULE_PRIORITY]
- self.assertEquals(1, len(attributes))
- self.assertEquals(301, attributes[0]["FRA_TABLE"])
+ self.assertEqual(1, len(attributes))
+ self.assertEqual(301, attributes[0]["FRA_TABLE"])
if __name__ == "__main__":
diff --git a/net/test/namespace.py b/net/test/namespace.py
index 85db654..c7da82f 100644
--- a/net/test/namespace.py
+++ b/net/test/namespace.py
@@ -112,10 +112,10 @@ def UnShare(flags):
def DumpMounts(hdr):
- print
- print hdr
+ print()
+ print(hdr)
print open('/proc/mounts', 'r').read(),
- print '---'
+ print('---')
# Requires at least kernel configuration options:
@@ -130,7 +130,7 @@ def IfPossibleEnterNewNetworkNamespace():
try:
UnShare(CLONE_NEWNS | CLONE_NEWUTS | CLONE_NEWNET)
except OSError as err:
- print 'failed: %s (likely: no privs or lack of kernel support).' % err
+ print('failed: %s (likely: no privs or lack of kernel support).' % err)
return False
try:
@@ -143,11 +143,11 @@ def IfPossibleEnterNewNetworkNamespace():
SetFileContents('/proc/sys/net/ipv4/ping_group_range', '0 2147483647')
net_test.SetInterfaceUp('lo')
except:
- print 'failed.'
+ print('failed.')
# We've already transitioned into the new netns -- it's too late to recover.
raise
- print 'succeeded.'
+ print('succeeded.')
return True
diff --git a/net/test/neighbour_test.py b/net/test/neighbour_test.py
index 2cb5c23..8cea6da 100755
--- a/net/test/neighbour_test.py
+++ b/net/test/neighbour_test.py
@@ -93,7 +93,7 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
self.sock.bind((0, RTMGRP_NEIGH))
net_test.SetNonBlocking(self.sock)
- self.netid = random.choice(self.tuns.keys())
+ self.netid = random.choice(list(self.tuns.keys()))
self.ifindex = self.ifindices[self.netid]
# MultinetworkBaseTest always uses NUD_PERMANENT for router ARP entries.
@@ -144,19 +144,19 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)
def assertNeighbourState(self, state, addr):
- self.assertEquals(state, self.GetNdEntry(addr)[0].state)
+ self.assertEqual(state, self.GetNdEntry(addr)[0].state)
def assertNeighbourAttr(self, addr, name, value):
- self.assertEquals(value, self.GetNdEntry(addr)[1][name])
+ self.assertEqual(value, self.GetNdEntry(addr)[1][name])
def ExpectNeighbourNotification(self, addr, state, attrs=None):
msg = self.sock.recv(4096)
msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
- self.assertEquals(addr, actual_attrs["NDA_DST"])
- self.assertEquals(state, msg.state)
+ self.assertEqual(addr, actual_attrs["NDA_DST"])
+ self.assertEqual(state, msg.state)
if attrs:
for name in attrs:
- self.assertEquals(attrs[name], actual_attrs[name])
+ self.assertEqual(attrs[name], actual_attrs[name])
def ExpectProbe(self, is_unicast, addr):
version = csocket.AddressVersion(addr)
@@ -225,7 +225,7 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
sleep_ms = min(100, interval - slept)
time.sleep(sleep_ms / 1000.0)
slept += sleep_ms
- print self.GetNdEntry(addr)
+ print(self.GetNdEntry(addr))
def MonitorSleep(self, intervalseconds, addr):
self.MonitorSleepMs(intervalseconds * 1000, addr)
@@ -319,7 +319,7 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
self.assertNeighbourState(NUD_REACHABLE, addr)
self.ExpectNeighbourNotification(addr, NUD_REACHABLE)
- for _ in xrange(5):
+ for _ in range(5):
ForceProbe(router6, routermac)
def testIsRouterFlag(self):
diff --git a/net/test/net_test.py b/net/test/net_test.py
index 458cedf..1c9c1c4 100755
--- a/net/test/net_test.py
+++ b/net/test/net_test.py
@@ -250,7 +250,7 @@ def CanonicalizeIPv6Address(addr):
def FormatProcAddress(unformatted):
groups = []
- for i in xrange(0, len(unformatted), 4):
+ for i in range(0, len(unformatted), 4):
groups.append(unformatted[i:i+4])
formatted = ":".join(groups)
# Compress the address.
@@ -265,7 +265,7 @@ def FormatSockStatAddress(address):
family = AF_INET
binary = inet_pton(family, address)
out = ""
- for i in xrange(0, len(binary), 4):
+ for i in range(0, len(binary), 4):
out += "%08X" % struct.unpack("=L", binary[i:i+4])
return out
diff --git a/net/test/netlink.py b/net/test/netlink.py
index 4e230d4..2c9c757 100644
--- a/net/test/netlink.py
+++ b/net/test/netlink.py
@@ -67,7 +67,7 @@ class NetlinkSocket(object):
def _Debug(self, s):
if self.DEBUG:
- print s
+ print(s)
def _NlAttr(self, nla_type, data):
datalen = len(data)
@@ -212,7 +212,7 @@ class NetlinkSocket(object):
self._Debug(" %s" % nlmsghdr)
if nlmsghdr.type == NLMSG_ERROR or nlmsghdr.type == NLMSG_DONE:
- print "done"
+ print("done")
return (None, None), data
nlmsg, data = cstruct.Read(data, msgtype)
diff --git a/net/test/pf_key.py b/net/test/pf_key.py
index 875e01c..47d434b 100755
--- a/net/test/pf_key.py
+++ b/net/test/pf_key.py
@@ -202,7 +202,7 @@ class PfKey(object):
def Recv(self):
reply = self.sock.recv(4096)
msg = SadbMsg(reply)
- # print "RECV:", self.DecodeSadbMsg(msg)
+ # print("RECV:", self.DecodeSadbMsg(msg))
if msg.errno != 0:
raise OSError(msg.errno, os.strerror(msg.errno))
return reply
@@ -213,7 +213,7 @@ class PfKey(object):
msg.pid = os.getpid()
msg.len = (len(SadbMsg) + len(extensions)) / 8
self.sock.send(msg.Pack() + extensions)
- # print "SEND:", self.DecodeSadbMsg(msg)
+ # print("SEND:", self.DecodeSadbMsg(msg))
return self.Recv()
def PackPfKeyExtensions(self, extlist):
@@ -314,13 +314,13 @@ class PfKey(object):
def PrintSaInfos(self, dump):
for msg, extensions in dump:
- print self.DecodeSadbMsg(msg)
+ print(self.DecodeSadbMsg(msg))
for exttype, ext, attrs in extensions:
exttype = _GetMultiConstantName(exttype, ["SADB_EXT", "SADB_X_EXT"])
if exttype == SADB_EXT_SA:
- print " ", exttype, self.DecodeSadbSa(ext), attrs.encode("hex")
- print " ", exttype, ext, attrs.encode("hex")
- print
+ print(" ", exttype, self.DecodeSadbSa(ext), attrs.encode("hex"))
+ print(" ", exttype, ext, attrs.encode("hex"))
+ print()
if __name__ == "__main__":
diff --git a/net/test/pf_key_test.py b/net/test/pf_key_test.py
index e58947c..317ec7e 100755
--- a/net/test/pf_key_test.py
+++ b/net/test/pf_key_test.py
@@ -49,26 +49,26 @@ class PfKeyTest(unittest.TestCase):
pf_key.SADB_X_AALG_SHA2_256HMAC, ENCRYPTION_KEY)
sainfos = self.xfrm.DumpSaInfo()
- self.assertEquals(2, len(sainfos))
+ self.assertEqual(2, len(sainfos))
state4, attrs4 = [(s, a) for s, a in sainfos if s.family == AF_INET][0]
state6, attrs6 = [(s, a) for s, a in sainfos if s.family == AF_INET6][0]
pfkey_sainfos = self.pf_key.DumpSaInfo()
- self.assertEquals(2, len(pfkey_sainfos))
+ self.assertEqual(2, len(pfkey_sainfos))
self.assertTrue(all(msg.satype == pf_key.SDB_TYPE_ESP)
for msg, _ in pfkey_sainfos)
- self.assertEquals(xfrm.IPPROTO_ESP, state4.id.proto)
- self.assertEquals(xfrm.IPPROTO_ESP, state6.id.proto)
- self.assertEquals(54321, state4.reqid)
- self.assertEquals(12345, state6.reqid)
- self.assertEquals(0xdeadbeef, state4.id.spi)
- self.assertEquals(0xbeefdead, state6.id.spi)
+ self.assertEqual(xfrm.IPPROTO_ESP, state4.id.proto)
+ self.assertEqual(xfrm.IPPROTO_ESP, state6.id.proto)
+ self.assertEqual(54321, state4.reqid)
+ self.assertEqual(12345, state6.reqid)
+ self.assertEqual(0xdeadbeef, state4.id.spi)
+ self.assertEqual(0xbeefdead, state6.id.spi)
- self.assertEquals(xfrm.PaddedAddress("192.0.2.1"), state4.saddr)
- self.assertEquals(xfrm.PaddedAddress("192.0.2.2"), state4.id.daddr)
- self.assertEquals(xfrm.PaddedAddress("2001:db8::1"), state6.saddr)
- self.assertEquals(xfrm.PaddedAddress("2001:db8::2"), state6.id.daddr)
+ self.assertEqual(xfrm.PaddedAddress("192.0.2.1"), state4.saddr)
+ self.assertEqual(xfrm.PaddedAddress("192.0.2.2"), state4.id.daddr)
+ self.assertEqual(xfrm.PaddedAddress("2001:db8::1"), state6.saddr)
+ self.assertEqual(xfrm.PaddedAddress("2001:db8::2"), state6.id.daddr)
# The algorithm names are null-terminated, but after that contain garbage.
# Kernel bug?
@@ -79,20 +79,20 @@ class PfKeyTest(unittest.TestCase):
self.assertTrue(attrs4["XFRMA_ALG_AUTH"].name.startswith(sha256_name))
self.assertTrue(attrs6["XFRMA_ALG_AUTH"].name.startswith(sha256_name))
- self.assertEquals(256, attrs4["XFRMA_ALG_CRYPT"].key_len)
- self.assertEquals(256, attrs4["XFRMA_ALG_CRYPT"].key_len)
- self.assertEquals(256, attrs6["XFRMA_ALG_AUTH"].key_len)
- self.assertEquals(256, attrs6["XFRMA_ALG_AUTH"].key_len)
- self.assertEquals(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len)
- self.assertEquals(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len)
+ self.assertEqual(256, attrs4["XFRMA_ALG_CRYPT"].key_len)
+ self.assertEqual(256, attrs4["XFRMA_ALG_CRYPT"].key_len)
+ self.assertEqual(256, attrs6["XFRMA_ALG_AUTH"].key_len)
+ self.assertEqual(256, attrs6["XFRMA_ALG_AUTH"].key_len)
+ self.assertEqual(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len)
+ self.assertEqual(256, attrs6["XFRMA_ALG_AUTH_TRUNC"].key_len)
- self.assertEquals(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len)
- self.assertEquals(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len)
+ self.assertEqual(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len)
+ self.assertEqual(128, attrs4["XFRMA_ALG_AUTH_TRUNC"].trunc_len)
self.pf_key.DelSa(src4, dst4, 0xdeadbeef, pf_key.SADB_TYPE_ESP)
- self.assertEquals(1, len(self.xfrm.DumpSaInfo()))
+ self.assertEqual(1, len(self.xfrm.DumpSaInfo()))
self.pf_key.DelSa(src6, dst6, 0xbeefdead, pf_key.SADB_TYPE_ESP)
- self.assertEquals(0, len(self.xfrm.DumpSaInfo()))
+ self.assertEqual(0, len(self.xfrm.DumpSaInfo()))
if __name__ == "__main__":
diff --git a/net/test/ping6_test.py b/net/test/ping6_test.py
index dd73e88..d551b5f 100755
--- a/net/test/ping6_test.py
+++ b/net/test/ping6_test.py
@@ -185,7 +185,7 @@ class PingReplyThread(threading.Thread):
packet = scapy.Ether(src=self._routermac, dst=self._mymac) / packet
try:
posix.write(self._tun.fileno(), str(packet))
- except Exception, e:
+ except Exception as e:
if not self._stopped:
raise e
@@ -194,12 +194,12 @@ class PingReplyThread(threading.Thread):
while not self._stopped:
try:
packet = posix.read(self._tun.fileno(), 4096)
- except OSError, e:
+ except OSError as e:
if e.errno == errno.EAGAIN:
continue
else:
break
- except ValueError, e:
+ except ValueError as e:
if not self._stopped:
raise e
@@ -220,9 +220,9 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# tests fail.
_INTERVAL = 0.1
_ATTEMPTS = 20
- for i in xrange(0, _ATTEMPTS):
+ for i in range(0, _ATTEMPTS):
for netid in cls.NETIDS:
- if all(thread.IsStarted() for thread in cls.reply_threads.values()):
+ if all(thread.IsStarted() for thread in list(cls.reply_threads.values())):
return
time.sleep(_INTERVAL)
msg = "WARNING: reply threads not all started after %.1f seconds\n" % (
@@ -231,7 +231,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
@classmethod
def StopReplyThreads(cls):
- for thread in cls.reply_threads.values():
+ for thread in list(cls.reply_threads.values()):
thread.Stop()
@classmethod
@@ -295,9 +295,9 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# Check that the flow label is zero and that the scope ID is sane.
self.assertEqual(flowlabel, 0)
if addr.startswith("fe80::"):
- self.assertTrue(scope_id in self.ifindices.values())
+ self.assertTrue(scope_id in list(self.ifindices.values()))
else:
- self.assertEquals(0, scope_id)
+ self.assertEqual(0, scope_id)
# TODO: check the checksum. We can't do this easily now for ICMPv6 because
# we don't have the IP addresses so we can't construct the pseudoheader.
@@ -356,32 +356,32 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
def testIPv4PingUsingSendto(self):
s = net_test.IPv4PingSocket()
written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
- self.assertEquals(len(net_test.IPV4_PING), written)
+ self.assertEqual(len(net_test.IPV4_PING), written)
self.assertValidPingResponse(s, net_test.IPV4_PING)
def testIPv6PingUsingSendto(self):
s = net_test.IPv6PingSocket()
written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
- self.assertEquals(len(net_test.IPV6_PING), written)
+ self.assertEqual(len(net_test.IPV6_PING), written)
self.assertValidPingResponse(s, net_test.IPV6_PING)
def testIPv4NoCrash(self):
# Python 2.x does not provide either read() or recvmsg.
s = net_test.IPv4PingSocket()
written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
- self.assertEquals(len(net_test.IPV4_PING), written)
+ self.assertEqual(len(net_test.IPV4_PING), written)
fd = s.fileno()
reply = posix.read(fd, 4096)
- self.assertEquals(written, len(reply))
+ self.assertEqual(written, len(reply))
def testIPv6NoCrash(self):
# Python 2.x does not provide either read() or recvmsg.
s = net_test.IPv6PingSocket()
written = s.sendto(net_test.IPV6_PING, ("::1", 55))
- self.assertEquals(len(net_test.IPV6_PING), written)
+ self.assertEqual(len(net_test.IPV6_PING), written)
fd = s.fileno()
reply = posix.read(fd, 4096)
- self.assertEquals(written, len(reply))
+ self.assertEqual(written, len(reply))
def testCrossProtocolCrash(self):
# Checks that an ICMP error containing a ping packet that matches the ID
@@ -458,12 +458,12 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# Bind to unspecified address.
s = net_test.IPv4PingSocket()
s.bind(("0.0.0.0", 544))
- self.assertEquals(("0.0.0.0", 544), s.getsockname())
+ self.assertEqual(("0.0.0.0", 544), s.getsockname())
# Bind to loopback.
s = net_test.IPv4PingSocket()
s.bind(("127.0.0.1", 99))
- self.assertEquals(("127.0.0.1", 99), s.getsockname())
+ self.assertEqual(("127.0.0.1", 99), s.getsockname())
# Binding twice is not allowed.
self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
@@ -471,10 +471,10 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# But binding two different sockets to the same ID is allowed.
s2 = net_test.IPv4PingSocket()
s2.bind(("127.0.0.1", 99))
- self.assertEquals(("127.0.0.1", 99), s2.getsockname())
+ self.assertEqual(("127.0.0.1", 99), s2.getsockname())
s3 = net_test.IPv4PingSocket()
s3.bind(("127.0.0.1", 99))
- self.assertEquals(("127.0.0.1", 99), s3.getsockname())
+ self.assertEqual(("127.0.0.1", 99), s3.getsockname())
# If two sockets bind to the same port, the first one to call read() gets
# the response.
@@ -501,12 +501,12 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# Bind to unspecified address.
s = net_test.IPv6PingSocket()
s.bind(("::", 769))
- self.assertEquals(("::", 769, 0, 0), s.getsockname())
+ self.assertEqual(("::", 769, 0, 0), s.getsockname())
# Bind to loopback.
s = net_test.IPv6PingSocket()
s.bind(("::1", 99))
- self.assertEquals(("::1", 99, 0, 0), s.getsockname())
+ self.assertEqual(("::1", 99, 0, 0), s.getsockname())
# Binding twice is not allowed.
self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
@@ -514,10 +514,10 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# But binding two different sockets to the same ID is allowed.
s2 = net_test.IPv6PingSocket()
s2.bind(("::1", 99))
- self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
+ self.assertEqual(("::1", 99, 0, 0), s2.getsockname())
s3 = net_test.IPv6PingSocket()
s3.bind(("::1", 99))
- self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
+ self.assertEqual(("::1", 99, 0, 0), s3.getsockname())
# Binding both IPv4 and IPv6 to the same socket works.
s4 = net_test.IPv4PingSocket()
@@ -542,7 +542,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
try:
s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
s.bind((net_test.IPV4_ADDR, 651))
- except IOError, e:
+ except IOError as e:
if e.errno == errno.EACCES:
pass # We're not root. let it go for now.
@@ -557,7 +557,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
try:
s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
s.bind((net_test.IPV6_ADDR, 651))
- except IOError, e:
+ except IOError as e:
if e.errno == errno.EACCES:
pass # We're not root. let it go for now.
@@ -567,7 +567,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
sockaddr.family = AF_UNSPEC
csocket.Bind(s4, sockaddr)
- self.assertEquals(("0.0.0.0", 12996), s4.getsockname())
+ self.assertEqual(("0.0.0.0", 12996), s4.getsockname())
# But not if the address is anything else.
sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
@@ -591,7 +591,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# returns "fe80:1%foo", even though it does not understand it.
expected = self.lladdr + "%" + self.ifname
s.bind((self.lladdr, 4646, 0, self.ifindex))
- self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
+ self.assertEqual((expected, 4646, 0, self.ifindex), s.getsockname())
# Of course, for the above to work the address actually has to be configured
# on the machine.
@@ -601,18 +601,18 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# Scope IDs on non-link-local addresses are silently ignored.
s = net_test.IPv6PingSocket()
s.bind(("::1", 1234, 0, 1))
- self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
+ self.assertEqual(("::1", 1234, 0, 0), s.getsockname())
def testBindAffectsIdentifier(self):
s = net_test.IPv6PingSocket()
s.bind((self.globaladdr, 0xf976))
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
- self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
+ self.assertEqual("\xf9\x76", s.recv(32768)[4:6])
s = net_test.IPv6PingSocket()
s.bind((self.globaladdr, 0xace))
s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
- self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
+ self.assertEqual("\x0a\xce", s.recv(32768)[4:6])
def testLinkLocalAddress(self):
s = net_test.IPv6PingSocket()
@@ -751,8 +751,8 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
s.bind(("127.0.0.1", 0xace))
s.connect(("127.0.0.1", 0xbeef))
- self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
- self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
+ self.assertEqual(numrows + 1, len(self.ReadProcNetSocket("icmp")))
+ self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
@unittest.skipUnless(HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
def testIcmp6SocketsNotInIcmp(self):
@@ -761,8 +761,8 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
s = net_test.IPv6PingSocket()
s.bind(("::1", 0xace))
s.connect(("::1", 0xbeef))
- self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
- self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
+ self.assertEqual(numrows, len(self.ReadProcNetSocket("icmp")))
+ self.assertEqual(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
def testProcNetIcmp(self):
s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
@@ -780,7 +780,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# Check the row goes away when the socket is closed.
s.close()
- self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
+ self.assertEqual(numrows6, len(self.ReadProcNetSocket("icmp6")))
# Try send, bind and connect to check the addresses and the state.
s = net_test.IPv6PingSocket()
@@ -841,7 +841,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
ident = struct.pack("!H", s.getsockname()[1])
pkt = pkt[:4] + ident + pkt[6:]
data = data[:2] + "\x00\x00" + pkt[4:]
- self.assertEquals(pkt, data)
+ self.assertEqual(pkt, data)
# Check the address that the packet was sent to.
# ... except in 4.1, where it just returns an AF_UNSPEC, like this:
@@ -851,7 +851,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
# msg_controllen=64, {cmsg_len=60, cmsg_level=SOL_IPV6, cmsg_type=, ...},
# msg_flags=MSG_ERRQUEUE}, MSG_ERRQUEUE) = 1232
if net_test.LINUX_VERSION != (4, 1, 0):
- self.assertEquals(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
+ self.assertEqual(csocket.Sockaddr(("2001:4860:4860::8888", 0)), addr)
# Check the cmsg data, including the link MTU.
mtu = PingReplyThread.LINK_MTU
@@ -869,7 +869,7 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
if net_test.LINUX_VERSION <= (3, 14, 0):
msglist[0][2][1].port = cmsg[0][2][1].port
- self.assertEquals(msglist, cmsg)
+ self.assertEqual(msglist, cmsg)
if __name__ == "__main__":
diff --git a/net/test/resilient_rs_test.py b/net/test/resilient_rs_test.py
index 12843c4..27b9f49 100755
--- a/net/test/resilient_rs_test.py
+++ b/net/test/resilient_rs_test.py
@@ -159,8 +159,8 @@ class ResilientRouterSolicitationTest(multinetwork_base.MultiNetworkBaseTest):
# Compute minimum and maximum bounds for RFC3315 S14 exponential backoff.
# First retransmit is linear backoff, subsequent retransmits are exponential
- min_exp_bound = accumulate(map(lambda i: MIN_LIN * pow(MIN_EXP, i), range(0, len(rsSendTimes))))
- max_exp_bound = accumulate(map(lambda i: MAX_LIN * pow(MAX_EXP, i), range(0, len(rsSendTimes))))
+ min_exp_bound = accumulate([MIN_LIN * pow(MIN_EXP, i) for i in range(0, len(rsSendTimes))])
+ max_exp_bound = accumulate([MAX_LIN * pow(MAX_EXP, i) for i in range(0, len(rsSendTimes))])
# Assert that each sample falls within the worst case interval. If all samples fit we accept
# the exponential backoff hypothesis
diff --git a/net/test/sock_diag.py b/net/test/sock_diag.py
index 46cc92d..03d5587 100755
--- a/net/test/sock_diag.py
+++ b/net/test/sock_diag.py
@@ -164,7 +164,7 @@ class SockDiag(netlink.NetlinkSocket):
if "ALL" not in self.NL_DEBUG and "SOCK" not in self.NL_DEBUG:
return
parsed = self._ParseNLMsg(data, InetDiagReqV2)
- print "%s %s" % (name, str(parsed))
+ print("%s %s" % (name, str(parsed)))
@staticmethod
def _EmptyInetDiagSockId():
@@ -246,15 +246,15 @@ class SockDiag(netlink.NetlinkSocket):
positions.append(positions[-1] + 4) # Why 4? Because the kernel uses 4.
assert len(args) == len(instructions) == len(positions) - 2
- # print positions
+ # print(positions)
packed = ""
for i, (op, yes, no, arg) in enumerate(instructions):
yes = positions[i + yes] - positions[i]
no = positions[i + no] - positions[i]
instruction = InetDiagBcOp((op, yes, no)).Pack() + args[i]
- #print "%3d: %d %3d %3d %s %s" % (positions[i], op, yes, no,
- # arg, instruction.encode("hex"))
+ #print("%3d: %d %3d %3d %s %s" % (positions[i], op, yes, no,
+ # arg, instruction.encode("hex")))
packed += instruction
#print
@@ -363,7 +363,7 @@ class SockDiag(netlink.NetlinkSocket):
src, sport = s.getsockname()[:2]
try:
dst, dport = s.getpeername()[:2]
- except error, e:
+ except error as e:
if e.errno == errno.ENOTCONN:
dport = 0
dst = "::" if family == AF_INET6 else "0.0.0.0"
@@ -430,4 +430,4 @@ if __name__ == "__main__":
states = 0xffffffff
diag_msgs = n.DumpAllInetSockets(IPPROTO_TCP, "",
sock_id=sock_id, ext=ext, states=states)
- print diag_msgs
+ print(diag_msgs)
diff --git a/net/test/sock_diag_test.py b/net/test/sock_diag_test.py
index daa2fa4..39ace4c 100755
--- a/net/test/sock_diag_test.py
+++ b/net/test/sock_diag_test.py
@@ -105,7 +105,7 @@ class SockDiagBaseTest(multinetwork_base.MultiNetworkBaseTest):
def _CreateLotsOfSockets(socktype):
# Dict mapping (addr, sport, dport) tuples to socketpairs.
socketpairs = {}
- for _ in xrange(NUM_SOCKETS):
+ for _ in range(NUM_SOCKETS):
family, addr = random.choice([
(AF_INET, "127.0.0.1"),
(AF_INET6, "::1"),
@@ -151,7 +151,7 @@ class SockDiagBaseTest(multinetwork_base.MultiNetworkBaseTest):
def PackAndCheckBytecode(self, instructions):
bytecode = self.sock_diag.PackBytecode(instructions)
decoded = self.sock_diag.DecodeBytecode(bytecode)
- self.assertEquals(len(instructions), len(decoded))
+ self.assertEqual(len(instructions), len(decoded))
self.assertFalse("???" in decoded)
return bytecode
@@ -192,7 +192,7 @@ class SockDiagBaseTest(multinetwork_base.MultiNetworkBaseTest):
self.socketpairs = {}
def tearDown(self):
- for socketpair in self.socketpairs.values():
+ for socketpair in list(self.socketpairs.values()):
for s in socketpair:
s.close()
super(SockDiagBaseTest, self).tearDown()
@@ -228,9 +228,9 @@ class SockDiagTest(SockDiagBaseTest):
cookies[(addr, sport, dport)] = diag_msg.id.cookie
# Did we find all the cookies?
- self.assertEquals(2 * NUM_SOCKETS, len(cookies))
+ self.assertEqual(2 * NUM_SOCKETS, len(cookies))
- socketpairs = self.socketpairs.values()
+ socketpairs = list(self.socketpairs.values())
random.shuffle(socketpairs)
for socketpair in socketpairs:
for sock in socketpair:
@@ -284,7 +284,7 @@ class SockDiagTest(SockDiagBaseTest):
)
states = 1 << tcp_test.TCP_ESTABLISHED
self.assertMultiLineEqual(expected, bytecode.encode("hex"))
- self.assertEquals(76, len(bytecode))
+ self.assertEqual(76, len(bytecode))
self.socketpairs = self._CreateLotsOfSockets(SOCK_STREAM)
filteredsockets = self.sock_diag.DumpAllInetSockets(IPPROTO_TCP, bytecode,
states=states)
@@ -294,7 +294,7 @@ class SockDiagTest(SockDiagBaseTest):
# Pick a few sockets in hash table order, and check that the bytecode we
# compiled selects them properly.
- for socketpair in self.socketpairs.values()[:20]:
+ for socketpair in list(self.socketpairs.values())[:20]:
for s in socketpair:
diag_msg = self.sock_diag.FindSockDiagFromFd(s)
instructions = [
@@ -304,12 +304,12 @@ class SockDiagTest(SockDiagBaseTest):
(sock_diag.INET_DIAG_BC_D_LE, 1, 2, diag_msg.id.dport),
]
bytecode = self.PackAndCheckBytecode(instructions)
- self.assertEquals(32, len(bytecode))
+ self.assertEqual(32, len(bytecode))
sockets = self.sock_diag.DumpAllInetSockets(IPPROTO_TCP, bytecode)
- self.assertEquals(1, len(sockets))
+ self.assertEqual(1, len(sockets))
# TODO: why doesn't comparing the cstructs work?
- self.assertEquals(diag_msg.Pack(), sockets[0][0].Pack())
+ self.assertEqual(diag_msg.Pack(), sockets[0][0].Pack())
def testCrossFamilyBytecode(self):
"""Checks for a cross-family bug in inet_diag_hostcond matching.
@@ -365,7 +365,7 @@ class SockDiagTest(SockDiagBaseTest):
5e1f542 inet_diag: validate port comparison byte code to prevent unsafe reads
"""
bytecode = sock_diag.InetDiagBcOp((sock_diag.INET_DIAG_BC_D_GE, 4, 8))
- self.assertEquals("???",
+ self.assertEqual("???",
self.sock_diag.DecodeBytecode(bytecode))
self.assertRaisesErrno(
EINVAL,
@@ -481,7 +481,7 @@ class SockDestroyTest(SockDiagBaseTest):
def testClosesSockets(self):
self.socketpairs = self._CreateLotsOfSockets(SOCK_STREAM)
- for _, socketpair in self.socketpairs.iteritems():
+ for _, socketpair in self.socketpairs.items():
# Close one of the sockets.
# This will send a RST that will close the other side as well.
s = random.choice(socketpair)
@@ -521,7 +521,7 @@ class SocketExceptionThread(threading.Thread):
def run(self):
try:
self.operation(self.sock)
- except (IOError, AssertionError), e:
+ except (IOError, AssertionError) as e:
self.exception = e
@@ -534,7 +534,7 @@ class SockDiagTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
android-3.4:
457a04b inet_diag: fix oops for IPv4 AF_INET6 TCP SYN-RECV state
"""
- netid = random.choice(self.tuns.keys())
+ netid = random.choice(list(self.tuns.keys()))
self.IncomingConnection(5, tcp_test.TCP_SYN_RECV, netid)
sock_id = self.sock_diag._EmptyInetDiagSockId()
sock_id.sport = self.port
@@ -599,7 +599,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
def setUp(self):
super(SockDestroyTcpTest, self).setUp()
- self.netid = random.choice(self.tuns.keys())
+ self.netid = random.choice(list(self.tuns.keys()))
def CheckRstOnClose(self, sock, req, expect_reset, msg, do_close=True):
"""Closes the socket and checks whether a RST is sent or not."""
@@ -650,7 +650,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
self.accepted.close()
diag_req.states = 1 << tcp_test.TCP_FIN_WAIT1
diag_msg, attrs = self.sock_diag.GetSockInfo(diag_req)
- self.assertEquals(tcp_test.TCP_FIN_WAIT1, diag_msg.state)
+ self.assertEqual(tcp_test.TCP_FIN_WAIT1, diag_msg.state)
desc, fin = self.FinPacket()
self.ExpectPacketOn(self.netid, "Closing FIN_WAIT1 socket", fin)
@@ -660,7 +660,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
# The socket is still there in FIN_WAIT1: SOCK_DESTROY did nothing
# because userspace had already closed it.
- self.assertEquals(tcp_test.TCP_FIN_WAIT1, diag_msg.state)
+ self.assertEqual(tcp_test.TCP_FIN_WAIT1, diag_msg.state)
# ACK the FIN so we don't trip over retransmits in future tests.
finversion = 4 if version == 5 else version
@@ -702,7 +702,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
d = self.sock_diag.FindSockDiagFromFd(self.s)
parent = self.sock_diag.DiagReqFromDiagMsg(d, IPPROTO_TCP)
children = self.FindChildSockets(self.s)
- self.assertEquals(1, len(children))
+ self.assertEqual(1, len(children))
is_established = (state == tcp_test.TCP_NOT_YET_ACCEPTED)
expected_state = tcp_test.TCP_ESTABLISHED if is_established else state
@@ -716,7 +716,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
for child in children:
if can_close_children:
diag_msg, attrs = self.sock_diag.GetSockInfo(child)
- self.assertEquals(diag_msg.state, expected_state)
+ self.assertEqual(diag_msg.state, expected_state)
self.assertMarkIs(self.netid, attrs)
else:
self.assertRaisesErrno(ENOENT, self.sock_diag.GetSockInfo, child)
@@ -772,7 +772,7 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
self.assertRaisesErrno(ECONNABORTED, self.s.send, "foo")
self.assertRaisesErrno(EINVAL, self.s.accept)
# TODO: this should really return an error such as ENOTCONN...
- self.assertEquals("", self.s.recv(4096))
+ self.assertEqual("", self.s.recv(4096))
def testReadInterrupted(self):
"""Tests that read() is interrupted by SOCK_DESTROY."""
@@ -782,8 +782,8 @@ class SockDestroyTcpTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
ECONNABORTED)
# Writing returns EPIPE, and reading returns EOF.
self.assertRaisesErrno(EPIPE, self.accepted.send, "foo")
- self.assertEquals("", self.accepted.recv(4096))
- self.assertEquals("", self.accepted.recv(4096))
+ self.assertEqual("", self.accepted.recv(4096))
+ self.assertEqual("", self.accepted.recv(4096))
def testConnectInterrupted(self):
"""Tests that connect() is interrupted by SOCK_DESTROY."""
@@ -818,7 +818,7 @@ class PollOnCloseTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
def setUp(self):
super(PollOnCloseTest, self).setUp()
- self.netid = random.choice(self.tuns.keys())
+ self.netid = random.choice(list(self.tuns.keys()))
POLL_FLAGS = [(select.POLLIN, "IN"), (select.POLLOUT, "OUT"),
(select.POLLERR, "ERR"), (select.POLLHUP, "HUP")]
@@ -852,8 +852,8 @@ class PollOnCloseTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
# Subsequent operations behave as normal.
self.assertRaisesErrno(EPIPE, self.accepted.send, "foo")
- self.assertEquals("", self.accepted.recv(4096))
- self.assertEquals("", self.accepted.recv(4096))
+ self.assertEqual("", self.accepted.recv(4096))
+ self.assertEqual("", self.accepted.recv(4096))
def CheckPollDestroy(self, mask, expected, ignoremask):
"""Interrupts a poll() with SOCK_DESTROY."""
@@ -917,7 +917,7 @@ class SockDestroyUdpTest(SockDiagBaseTest):
def testClosesUdpSockets(self):
self.socketpairs = self._CreateLotsOfSockets(SOCK_DGRAM)
- for _, socketpair in self.socketpairs.iteritems():
+ for _, socketpair in self.socketpairs.items():
s1, s2 = socketpair
self.assertSocketConnected(s1)
@@ -930,12 +930,12 @@ class SockDestroyUdpTest(SockDiagBaseTest):
def BindToRandomPort(self, s, addr):
ATTEMPTS = 20
- for i in xrange(20):
+ for i in range(20):
port = random.randrange(1024, 65535)
try:
s.bind((addr, port))
return port
- except error, e:
+ except error as e:
if e.errno != EADDRINUSE:
raise e
raise ValueError("Could not find a free port on %s after %d attempts" %
@@ -989,13 +989,13 @@ class SockDestroyUdpTest(SockDiagBaseTest):
# Check that reads on connected sockets are interrupted.
s.connect((addr, 53))
- self.assertEquals(3, s.send("foo"))
+ self.assertEqual(3, s.send("foo"))
self.CloseDuringBlockingCall(s, lambda sock: sock.recv(4096),
ECONNABORTED)
# A destroyed socket is no longer connected, but still usable.
self.assertRaisesErrno(EDESTADDRREQ, s.send, "foo")
- self.assertEquals(3, s.sendto("foo", (addr, 53)))
+ self.assertEqual(3, s.sendto("foo", (addr, 53)))
# Check that reads on unconnected sockets are also interrupted.
self.CloseDuringBlockingCall(s, lambda sock: sock.recv(4096),
@@ -1049,7 +1049,7 @@ class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
def assertSamePorts(self, ports, diag_msgs):
expected = sorted(ports)
actual = sorted([msg[0].id.sport for msg in diag_msgs])
- self.assertEquals(expected, actual)
+ self.assertEqual(expected, actual)
def SockInfoMatchesSocket(self, s, info):
try:
@@ -1076,7 +1076,7 @@ class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
self.SocketDescription(s))
for i in infos:
- if i not in matches.values():
+ if i not in list(matches.values()):
self.fail("Too many sockets in dump, first unexpected: %s" % str(i))
def testMarkBytecode(self):
@@ -1101,7 +1101,7 @@ class SockDiagMarkTest(tcp_test.TcpBaseTest, SockDiagBaseTest):
self.assertFoundSockets(infos, [s1, s2])
infos = self.FilterEstablishedSockets(0xfff0000, 0xf0fed00)
- self.assertEquals(0, len(infos))
+ self.assertEqual(0, len(infos))
with net_test.RunAsUid(12345):
self.assertRaisesErrno(EPERM, self.FilterEstablishedSockets,
diff --git a/net/test/srcaddr_selection_test.py b/net/test/srcaddr_selection_test.py
index e57ce16..1e7a107 100755
--- a/net/test/srcaddr_selection_test.py
+++ b/net/test/srcaddr_selection_test.py
@@ -91,11 +91,11 @@ class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
def assertAddressHasExpectedAttributes(
self, address, expected_ifindex, expected_flags):
ifa_msg = self.iproute.GetAddress(address)[0]
- self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
- self.assertEquals(64, ifa_msg.prefixlen)
- self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
- self.assertEquals(expected_ifindex, ifa_msg.index)
- self.assertEquals(expected_flags, ifa_msg.flags & expected_flags)
+ self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
+ self.assertEqual(64, ifa_msg.prefixlen)
+ self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
+ self.assertEqual(expected_ifindex, ifa_msg.index)
+ self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)
def AddressIsTentative(self, address):
ifa_msg = self.iproute.GetAddress(address)[0]
@@ -122,13 +122,13 @@ class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
self.SendWithSourceAddress, address, netid)
def assertAddressSelected(self, address, netid):
- self.assertEquals(address, self.GetSourceIP(netid))
+ self.assertEqual(address, self.GetSourceIP(netid))
def assertAddressNotSelected(self, address, netid):
- self.assertNotEquals(address, self.GetSourceIP(netid))
+ self.assertNotEqual(address, self.GetSourceIP(netid))
def WaitForDad(self, address):
- for _ in xrange(20):
+ for _ in range(20):
if not self.AddressIsTentative(address):
return
time.sleep(0.1)
@@ -149,7 +149,7 @@ class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 0)
# [1] Pick an interface on which to test.
- self.test_netid = random.choice(self.tuns.keys())
+ self.test_netid = random.choice(list(self.tuns.keys()))
self.test_ip = self.MyAddress(6, self.test_netid)
self.test_ifindex = self.ifindices[self.test_netid]
self.test_ifname = self.GetInterfaceName(self.test_netid)
@@ -254,7 +254,7 @@ class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex)
self.assertAddressHasExpectedAttributes(
preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT)
- self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid))
+ self.assertEqual(preferred_ip, self.GetSourceIP(self.test_netid))
# [4] Get another IPv6 address, in optimistic DAD start-up.
self.SetDAD(self.test_ifname, 1) # Enable DAD
diff --git a/net/test/tcp_fastopen_test.py b/net/test/tcp_fastopen_test.py
index 9257a19..eadae79 100755
--- a/net/test/tcp_fastopen_test.py
+++ b/net/test/tcp_fastopen_test.py
@@ -55,7 +55,7 @@ class TcpFastOpenTest(multinetwork_base.MultiNetworkBaseTest):
daddr = self.GetRemoteAddress(version)
self.tcp_metrics.DelMetrics(saddr, daddr)
with self.assertRaisesErrno(ESRCH):
- print self.tcp_metrics.GetMetrics(saddr, daddr)
+ print(self.tcp_metrics.GetMetrics(saddr, daddr))
def assertNoTcpMetrics(self, version, netid):
saddr = self.MyAddress(version, netid)
diff --git a/net/test/tcp_metrics.py b/net/test/tcp_metrics.py
index 574a755..03f604f 100755
--- a/net/test/tcp_metrics.py
+++ b/net/test/tcp_metrics.py
@@ -134,4 +134,4 @@ class TcpMetrics(genetlink.GenericNetlink):
if __name__ == "__main__":
t = TcpMetrics()
- print t.DumpMetrics()
+ print(t.DumpMetrics())
diff --git a/net/test/tcp_nuke_addr_test.py b/net/test/tcp_nuke_addr_test.py
index 1f0de76..e5d17b2 100755
--- a/net/test/tcp_nuke_addr_test.py
+++ b/net/test/tcp_nuke_addr_test.py
@@ -88,8 +88,8 @@ class TcpNukeAddrTest(net_test.NetworkTest):
self.assertRaisesErrno(errno.ENOTTY, KillAddrIoctl, addr)
data = "foo"
try:
- self.assertEquals(len(data), s1.send(data))
- self.assertEquals(data, s2.recv(4096))
+ self.assertEqual(len(data), s1.send(data))
+ self.assertEqual(data, s2.recv(4096))
self.assertSocketsNotClosed(socketpair)
finally:
s1.close()
diff --git a/net/test/tcp_repair_test.py b/net/test/tcp_repair_test.py
index ce54aba..e0b156e 100755
--- a/net/test/tcp_repair_test.py
+++ b/net/test/tcp_repair_test.py
@@ -149,7 +149,7 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest):
sock.setsockopt(SOL_TCP, TCP_REPAIR, TCP_REPAIR_OFF)
readData = sock.recv(4096)
- self.assertEquals(readData, TEST_RECEIVED)
+ self.assertEqual(readData, TEST_RECEIVED)
sock.close()
# Test whether tcp read/write sequence number can be fetched correctly
@@ -164,20 +164,20 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest):
# test write queue sequence number
sequence_before = self.GetWriteSequenceNumber(version, sock)
expect_sequence = self.last_sent.getlayer("TCP").seq
- self.assertEquals(sequence_before & 0xffffffff, expect_sequence)
+ self.assertEqual(sequence_before & 0xffffffff, expect_sequence)
TEST_SEND = net_test.UDP_PAYLOAD
self.sendData(netid, version, sock, TEST_SEND)
sequence_after = self.GetWriteSequenceNumber(version, sock)
- self.assertEquals(sequence_before + len(TEST_SEND), sequence_after)
+ self.assertEqual(sequence_before + len(TEST_SEND), sequence_after)
# test read queue sequence number
sequence_before = self.GetReadSequenceNumber(version, sock)
expect_sequence = self.last_received.getlayer("TCP").seq + 1
- self.assertEquals(sequence_before & 0xffffffff, expect_sequence)
+ self.assertEqual(sequence_before & 0xffffffff, expect_sequence)
TEST_READ = net_test.UDP_PAYLOAD
self.receiveData(netid, version, TEST_READ)
sequence_after = self.GetReadSequenceNumber(version, sock)
- self.assertEquals(sequence_before + len(TEST_READ), sequence_after)
+ self.assertEqual(sequence_before + len(TEST_READ), sequence_after)
sock.close()
def GetWriteSequenceNumber(self, version, sock):
@@ -267,12 +267,12 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest):
buf = ctypes.c_int()
fcntl.ioctl(sock, SIOCINQ, buf)
- self.assertEquals(buf.value, 0)
+ self.assertEqual(buf.value, 0)
TEST_RECV_PAYLOAD = net_test.UDP_PAYLOAD
self.receiveData(netid, version, TEST_RECV_PAYLOAD)
fcntl.ioctl(sock, SIOCINQ, buf)
- self.assertEquals(buf.value, len(TEST_RECV_PAYLOAD))
+ self.assertEqual(buf.value, len(TEST_RECV_PAYLOAD))
sock.close()
def writeQueueIdleTest(self, version):
@@ -281,14 +281,14 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest):
sock = self.createConnectedSocket(version, netid)
buf = ctypes.c_int()
fcntl.ioctl(sock, SIOCOUTQ, buf)
- self.assertEquals(buf.value, 0)
+ self.assertEqual(buf.value, 0)
# Change to repair mode with SEND_QUEUE, writing some data to the queue.
sock.setsockopt(SOL_TCP, TCP_REPAIR, TCP_REPAIR_ON)
TEST_SEND_PAYLOAD = net_test.UDP_PAYLOAD
sock.setsockopt(SOL_TCP, TCP_REPAIR_QUEUE, TCP_SEND_QUEUE)
self.sendData(netid, version, sock, TEST_SEND_PAYLOAD)
fcntl.ioctl(sock, SIOCOUTQ, buf)
- self.assertEquals(buf.value, len(TEST_SEND_PAYLOAD))
+ self.assertEqual(buf.value, len(TEST_SEND_PAYLOAD))
sock.close()
# Setup a connected socket again.
@@ -297,14 +297,14 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest):
# Send out some data and don't receive ACK yet.
self.sendData(netid, version, sock, TEST_SEND_PAYLOAD)
fcntl.ioctl(sock, SIOCOUTQ, buf)
- self.assertEquals(buf.value, len(TEST_SEND_PAYLOAD))
+ self.assertEqual(buf.value, len(TEST_SEND_PAYLOAD))
# Receive response ACK.
remoteaddr = self.GetRemoteAddress(version)
myaddr = self.MyAddress(version, netid)
desc_ack, ack = packets.ACK(version, remoteaddr, myaddr, self.last_sent)
self.ReceivePacketOn(netid, ack)
fcntl.ioctl(sock, SIOCOUTQ, buf)
- self.assertEquals(buf.value, 0)
+ self.assertEqual(buf.value, 0)
sock.close()
@@ -315,7 +315,7 @@ class TcpRepairTest(multinetwork_base.MultiNetworkBaseTest):
events = p.poll(500)
for fd,event in events:
if fd == sock.fileno():
- self.assertEquals(event, expected)
+ self.assertEqual(event, expected)
else:
raise AssertionError("unexpected poll fd")
@@ -334,7 +334,7 @@ class SocketExceptionThread(threading.Thread):
def run(self):
try:
self.operation(self.sock)
- except (IOError, AssertionError), e:
+ except (IOError, AssertionError) as e:
self.exception = e
if __name__ == '__main__':
diff --git a/net/test/tun_twister.py b/net/test/tun_twister.py
index 2ed25c9..f42d789 100644
--- a/net/test/tun_twister.py
+++ b/net/test/tun_twister.py
@@ -52,8 +52,8 @@ class TunTwister(object):
# Set up routing so packets go to my_tun.
def ValidatePortNumber(packet):
- self.assertEquals(8080, packet.getlayer(scapy.UDP).sport)
- self.assertEquals(8080, packet.getlayer(scapy.UDP).dport)
+ self.assertEqual(8080, packet.getlayer(scapy.UDP).sport)
+ self.assertEqual(8080, packet.getlayer(scapy.UDP).dport)
with TunTwister(tun_fd=my_tun, validator=ValidatePortNumber):
sock = socket(AF_INET, SOCK_DGRAM, 0)
@@ -61,8 +61,8 @@ class TunTwister(object):
sock.settimeout(1.0)
sock.sendto("hello", ("1.2.3.4", 8080))
data, addr = sock.recvfrom(1024)
- self.assertEquals("hello", data)
- self.assertEquals(("1.2.3.4", 8080), addr)
+ self.assertEqual("hello", data)
+ self.assertEqual(("1.2.3.4", 8080), addr)
"""
# Hopefully larger than any packet.
diff --git a/net/test/xfrm.py b/net/test/xfrm.py
index acdfd4f..d8f5ffe 100755
--- a/net/test/xfrm.py
+++ b/net/test/xfrm.py
@@ -356,9 +356,9 @@ class Xfrm(netlink.NetlinkSocket):
cmdname = self._GetConstantName(command, "XFRM_MSG_")
if struct_type:
- print "%s %s" % (cmdname, str(self._ParseNLMsg(data, struct_type)))
+ print("%s %s" % (cmdname, str(self._ParseNLMsg(data, struct_type))))
else:
- print "%s" % cmdname
+ print("%s" % cmdname)
def _Decode(self, command, unused_msg, nla_type, nla_data):
"""Decodes netlink attributes to Python types."""
@@ -710,5 +710,5 @@ class Xfrm(netlink.NetlinkSocket):
if __name__ == "__main__":
x = Xfrm()
- print x.DumpSaInfo()
- print x.DumpPolicyInfo()
+ print(x.DumpSaInfo())
+ print(x.DumpPolicyInfo())
diff --git a/net/test/xfrm_algorithm_test.py b/net/test/xfrm_algorithm_test.py
index 0176265..c2f836c 100755
--- a/net/test/xfrm_algorithm_test.py
+++ b/net/test/xfrm_algorithm_test.py
@@ -117,10 +117,10 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
def AssertEncrypted(packet):
# This gives a free pass to ICMP and ICMPv6 packets, which show up
# nondeterministically in tests.
- self.assertEquals(None,
+ self.assertEqual(None,
packet.getlayer(scapy.UDP),
"UDP packet sent in the clear")
- self.assertEquals(None,
+ self.assertEqual(None,
packet.getlayer(scapy.TCP),
"TCP packet sent in the clear")
@@ -237,10 +237,10 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
sock.listen(1)
server_ready.set()
accepted, peer = sock.accept()
- self.assertEquals(remote_addr, peer[0])
- self.assertEquals(client_port, peer[1])
+ self.assertEqual(remote_addr, peer[0])
+ self.assertEqual(client_port, peer[1])
data = accepted.recv(2048)
- self.assertEquals("hello request", data)
+ self.assertEqual("hello request", data)
accepted.send("hello response")
except Exception as e:
server_error = e
@@ -251,9 +251,9 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
try:
server_ready.set()
data, peer = sock.recvfrom(2048)
- self.assertEquals(remote_addr, peer[0])
- self.assertEquals(client_port, peer[1])
- self.assertEquals("hello request", data)
+ self.assertEqual(remote_addr, peer[0])
+ self.assertEqual(client_port, peer[1])
+ self.assertEqual("hello request", data)
sock.sendto("hello response", peer)
except Exception as e:
server_error = e
@@ -283,7 +283,7 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
sock_left.connect((remote_addr, right_port))
sock_left.send("hello request")
data = sock_left.recv(2048)
- self.assertEquals("hello response", data)
+ self.assertEqual("hello response", data)
sock_left.close()
server.join()
if server_error:
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 1eaa302..03e15c4 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -196,7 +196,7 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header.
padlen = util.GetPadLength(4, esplen)
# The pad bytes are consecutive integers starting from 0x01.
- padding = "".join((chr(i) for i in xrange(1, padlen + 1)))
+ padding = "".join((chr(i) for i in range(1, padlen + 1)))
trailer = padding + struct.pack("BB", padlen, esp_nexthdr)
# Assemble the packet.
@@ -285,17 +285,17 @@ class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
scapy.IP/IPv6: the read packet
"""
packets = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(packets))
+ self.assertEqual(1, len(packets))
packet = packets[0]
if length is not None:
- self.assertEquals(length, len(packet.payload))
+ self.assertEqual(length, len(packet.payload))
if dst_addr is not None:
- self.assertEquals(dst_addr, packet.dst)
+ self.assertEqual(dst_addr, packet.dst)
if src_addr is not None:
- self.assertEquals(src_addr, packet.src)
+ self.assertEqual(src_addr, packet.src)
# extract the ESP header
esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr)
- self.assertEquals(xfrm.EspHdr((spi, seq)), esp_hdr)
+ self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr)
return packet
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 64be084..439a2d2 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -53,14 +53,14 @@ TEST_SPI2 = 0x1235
class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
def assertIsUdpEncapEsp(self, packet, spi, seq, length):
- self.assertEquals(IPPROTO_UDP, packet.proto)
+ self.assertEqual(IPPROTO_UDP, packet.proto)
udp_hdr = packet[scapy.UDP]
- self.assertEquals(4500, udp_hdr.dport)
- self.assertEquals(length, len(udp_hdr))
+ self.assertEqual(4500, udp_hdr.dport)
+ self.assertEqual(length, len(udp_hdr))
esp_hdr, _ = cstruct.Read(str(udp_hdr.payload), xfrm.EspHdr)
# FIXME: this file currently swaps SPI byte order manually, so SPI needs to
# be double-swapped here.
- self.assertEquals(xfrm.EspHdr((spi, seq)), esp_hdr)
+ self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr)
def CreateNewSa(self, localAddr, remoteAddr, spi, reqId, encap_tmpl,
null_auth=False):
@@ -93,12 +93,12 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
self.xfrm.DeleteSaInfo(TEST_ADDR1, TEST_SPI, IPPROTO_ESP)
def testFlush(self):
- self.assertEquals(0, len(self.xfrm.DumpSaInfo()))
+ self.assertEqual(0, len(self.xfrm.DumpSaInfo()))
self.CreateNewSa("::", "2000::", TEST_SPI, 1234, None)
self.CreateNewSa("0.0.0.0", "192.0.2.1", TEST_SPI, 4321, None)
- self.assertEquals(2, len(self.xfrm.DumpSaInfo()))
+ self.assertEqual(2, len(self.xfrm.DumpSaInfo()))
self.xfrm.FlushSaInfo()
- self.assertEquals(0, len(self.xfrm.DumpSaInfo()))
+ self.assertEqual(0, len(self.xfrm.DumpSaInfo()))
def _TestSocketPolicy(self, version):
# Open a UDP socket and connect it.
@@ -141,7 +141,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
try:
self.xfrm.DeleteSaInfo(self.GetRemoteAddress(xfrm_version), TEST_SPI, IPPROTO_ESP)
except IOError as e:
- self.assertEquals(ESRCH, e.errno, "Unexpected error when deleting ACQ SA")
+ self.assertEqual(ESRCH, e.errno, "Unexpected error when deleting ACQ SA")
# Adding a matching SA causes the packet to go out encrypted. The SA's
# SPI must match the one in our template, and the destination address must
@@ -171,11 +171,11 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
self.SelectInterface(s2, netid, "mark")
s2.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
pkts = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(pkts))
+ self.assertEqual(1, len(pkts))
packet = pkts[0]
protocol = packet.nh if version == 6 else packet.proto
- self.assertEquals(IPPROTO_UDP, protocol)
+ self.assertEqual(IPPROTO_UDP, protocol)
# Deleting the SA causes the first socket to return errors again.
self.xfrm.DeleteSaInfo(self.GetRemoteAddress(xfrm_version), TEST_SPI,
@@ -268,7 +268,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
# Expect to see an UDP encapsulated packet.
pkts = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(pkts))
+ self.assertEqual(1, len(pkts))
packet = pkts[0]
auth_algo = (
@@ -314,13 +314,13 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
# integrity-verified portion of the packet.
if not null_auth and in_spi != out_spi:
self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
- self.assertEquals(start_integrity_failures + 1,
+ self.assertEqual(start_integrity_failures + 1,
sainfo.stats.integrity_failed)
else:
data, src = twisted_socket.recvfrom(4096)
- self.assertEquals(net_test.UDP_PAYLOAD, data)
- self.assertEquals((remoteaddr, srcport), src)
- self.assertEquals(start_integrity_failures, sainfo.stats.integrity_failed)
+ self.assertEqual(net_test.UDP_PAYLOAD, data)
+ self.assertEqual((remoteaddr, srcport), src)
+ self.assertEqual(start_integrity_failures, sainfo.stats.integrity_failed)
# Check that unencrypted packets on twisted_socket are not received.
unencrypted = (
@@ -400,13 +400,13 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
def testAllocSpecificSpi(self):
spi = 0xABCD
new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)
- self.assertEquals(spi, new_sa.id.spi)
+ self.assertEqual(spi, new_sa.id.spi)
def testAllocSpecificSpiUnavailable(self):
"""Attempt to allocate the same SPI twice."""
spi = 0xABCD
new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)
- self.assertEquals(spi, new_sa.id.spi)
+ self.assertEqual(spi, new_sa.id.spi)
with self.assertRaisesErrno(ENOENT):
new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, spi, spi)
@@ -427,7 +427,7 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
# Allocating range_size + 1 SPIs is guaranteed to fail. Due to the way
# kernel picks random SPIs, this has a high probability of failing before
# reaching that limit.
- for i in xrange(range_size + 1):
+ for i in range(range_size + 1):
new_sa = self.xfrm.AllocSpi("::", IPPROTO_ESP, start, end)
spi = new_sa.id.spi
self.assertNotIn(spi, spis)
@@ -514,21 +514,21 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
self.ReceivePacketOn(netid, input_pkt)
msg, addr = sock.recvfrom(1024)
- self.assertEquals("input hello", msg)
- self.assertEquals((remote_addr, remote_port), addr[:2])
+ self.assertEqual("input hello", msg)
+ self.assertEqual((remote_addr, remote_port), addr[:2])
# Send and capture a packet.
sock.sendto("output hello", (remote_addr, remote_port))
packets = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(packets))
+ self.assertEqual(1, len(packets))
output_pkt = packets[0]
output_pkt, esp_hdr = xfrm_base.DecryptPacketWithNull(output_pkt)
- self.assertEquals(output_pkt[scapy.UDP].len, len("output_hello") + 8)
- self.assertEquals(remote_addr, output_pkt.dst)
- self.assertEquals(remote_port, output_pkt[scapy.UDP].dport)
+ self.assertEqual(output_pkt[scapy.UDP].len, len("output_hello") + 8)
+ self.assertEqual(remote_addr, output_pkt.dst)
+ self.assertEqual(remote_port, output_pkt[scapy.UDP].dport)
# length of the payload plus the UDP header
- self.assertEquals("output hello", str(output_pkt[scapy.UDP].payload))
- self.assertEquals(0xABCD, esp_hdr.spi)
+ self.assertEqual("output hello", str(output_pkt[scapy.UDP].payload))
+ self.assertEqual(0xABCD, esp_hdr.spi)
def testNullEncryptionTunnelMode(self):
"""Verify null encryption in tunnel mode.
@@ -577,21 +577,21 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
self.ReceivePacketOn(netid, input_pkt)
msg, addr = sock.recvfrom(1024)
- self.assertEquals("input hello", msg)
- self.assertEquals((remote_addr, remote_port), addr[:2])
+ self.assertEqual("input hello", msg)
+ self.assertEqual((remote_addr, remote_port), addr[:2])
# Send and capture a packet.
sock.sendto("output hello", (remote_addr, remote_port))
packets = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(packets))
+ self.assertEqual(1, len(packets))
output_pkt = packets[0]
output_pkt, esp_hdr = xfrm_base.DecryptPacketWithNull(output_pkt)
# length of the payload plus the UDP header
- self.assertEquals(output_pkt[scapy.UDP].len, len("output_hello") + 8)
- self.assertEquals(remote_addr, output_pkt.dst)
- self.assertEquals(remote_port, output_pkt[scapy.UDP].dport)
- self.assertEquals("output hello", str(output_pkt[scapy.UDP].payload))
- self.assertEquals(0xABCD, esp_hdr.spi)
+ self.assertEqual(output_pkt[scapy.UDP].len, len("output_hello") + 8)
+ self.assertEqual(remote_addr, output_pkt.dst)
+ self.assertEqual(remote_port, output_pkt[scapy.UDP].dport)
+ self.assertEqual("output hello", str(output_pkt[scapy.UDP].payload))
+ self.assertEqual(0xABCD, esp_hdr.spi)
def testNullEncryptionTransportMode(self):
"""Verify null encryption in transport mode.
@@ -638,9 +638,9 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
def _CheckTemplateMatch(tmpl):
"""Dump the SPD and match a single template on a single policy."""
dump = self.xfrm.DumpPolicyInfo()
- self.assertEquals(1, len(dump))
+ self.assertEqual(1, len(dump))
_, attributes = dump[0]
- self.assertEquals(attributes['XFRMA_TMPL'], tmpl)
+ self.assertEqual(attributes['XFRMA_TMPL'], tmpl)
# Create a new policy using update.
self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark, None)
@@ -763,9 +763,9 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest):
xfrm.XFRM_MODE_TUNNEL, 100, xfrm_base._ALGO_CBC_AES_256,
xfrm_base._ALGO_HMAC_SHA1, None, None, None, mark)
dump = self.xfrm.DumpSaInfo()
- self.assertEquals(1, len(dump))
+ self.assertEqual(1, len(dump))
sainfo, attributes = dump[0]
- self.assertEquals(mark, attributes["XFRMA_OUTPUT_MARK"])
+ self.assertEqual(mark, attributes["XFRMA_OUTPUT_MARK"])
def testInvalidAlgorithms(self):
key = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")
@@ -795,9 +795,9 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest):
xfrm_base._ALGO_HMAC_SHA1,
None, None, mark, 0, is_update=True)
dump = self.xfrm.DumpSaInfo()
- self.assertEquals(1, len(dump)) # check that update updated
+ self.assertEqual(1, len(dump)) # check that update updated
sainfo, attributes = dump[0]
- self.assertEquals(mark, attributes["XFRMA_MARK"])
+ self.assertEqual(mark, attributes["XFRMA_MARK"])
self.xfrm.DeleteSaInfo(net_test.GetWildcardAddress(version),
spi, IPPROTO_ESP, mark)
@@ -836,7 +836,7 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest):
s.sendto(net_test.UDP_PAYLOAD, (remote, 53))
# Check to make sure XfrmOutNoStates is incremented by exactly 1
- self.assertEquals(outNoStateCount + 1,
+ self.assertEqual(outNoStateCount + 1,
self.getXfrmStat(XFRM_STATS_OUT_NO_STATES))
length = xfrm_base.GetEspPacketLength(xfrm.XFRM_MODE_TUNNEL,
@@ -854,7 +854,7 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest):
xfrm_base._ALGO_HMAC_SHA1,
None, None, mark, 0, is_update=False)
except IOError as e:
- self.assertEquals(EEXIST, e.errno, "SA exists")
+ self.assertEqual(EEXIST, e.errno, "SA exists")
self.xfrm.AddSaInfo(local,
remote,
TEST_SPI, xfrm.XFRM_MODE_TUNNEL, 0,
@@ -893,9 +893,9 @@ class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest):
dump = self.xfrm.DumpSaInfo()
- self.assertEquals(1, len(dump)) # check that update updated
+ self.assertEqual(1, len(dump)) # check that update updated
sainfo, attributes = dump[0]
- self.assertEquals(reroute_netid, attributes["XFRMA_OUTPUT_MARK"])
+ self.assertEqual(reroute_netid, attributes["XFRMA_OUTPUT_MARK"])
self.xfrm.DeleteSaInfo(remote, TEST_SPI, IPPROTO_ESP, mark)
self.xfrm.DeletePolicyInfo(sel, xfrm.XFRM_POLICY_OUT, mark)
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index eb1a46e..f175c09 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -169,8 +169,8 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
# Verify that the packet data and src are correct
data, src = read_sock.recvfrom(4096)
- self.assertEquals(net_test.UDP_PAYLOAD, data)
- self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2])
+ self.assertEqual(net_test.UDP_PAYLOAD, data)
+ self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
def _TestTunnel(self, inner_version, outer_version, func, direction,
test_output_mark_unset):
@@ -233,13 +233,13 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
ikey, okey):
- self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey)
- self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey)
+ self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey)
+ self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey)
family = AF_INET if version == 4 else AF_INET6
- self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]),
+ self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]),
local_addr)
- self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]),
+ self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]),
remote_addr)
def testAddVti(self):
@@ -275,7 +275,7 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
# Validate that the netlink interface matches the ioctl interface.
- self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
+ self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
with self.assertRaises(IOError):
self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
@@ -439,7 +439,7 @@ class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
net_test.SetInterfaceUp(_TEST_XFRM_IFNAME)
# Validate that the netlink interface matches the ioctl interface.
- self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
+ self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
with self.assertRaises(IOError):
self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
@@ -545,7 +545,7 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
def tearDownClass(cls):
# The sysctls are restored by MultinetworkBaseTest.tearDownClass.
cls.SetInboundMarks(False)
- for tunnel in cls.tunnelsV4.values() + cls.tunnelsV6.values():
+ for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()):
cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
cls._SetupTunnelNetwork(tunnel, False)
tunnel.Teardown()
@@ -553,7 +553,7 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
def randomTunnel(self, outer_version):
version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
- return random.choice(version_dict.values())
+ return random.choice(list(version_dict.values()))
def setUp(self):
multinetwork_base.MultiNetworkBaseTest.setUp(self)
@@ -636,13 +636,13 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
def assertReceivedPacket(self, tunnel, sa_info):
tunnel.rx += 1
- self.assertEquals((tunnel.rx, tunnel.tx),
+ self.assertEqual((tunnel.rx, tunnel.tx),
self.iproute.GetRxTxPackets(tunnel.iface))
sa_info.seq_num += 1
def assertSentPacket(self, tunnel, sa_info):
tunnel.tx += 1
- self.assertEquals((tunnel.rx, tunnel.tx),
+ self.assertEqual((tunnel.rx, tunnel.tx),
self.iproute.GetRxTxPackets(tunnel.iface))
sa_info.seq_num += 1
@@ -664,8 +664,8 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
# Verify that the packet data and src are correct
data, src = read_sock.recvfrom(4096)
self.assertReceivedPacket(tunnel, sa_info)
- self.assertEquals(net_test.UDP_PAYLOAD, data)
- self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2])
+ self.assertEqual(net_test.UDP_PAYLOAD, data)
+ self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2])
def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
remote_inner, sa_info=None):
@@ -701,12 +701,12 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
# Check outer header manually (Avoids having to overwrite outer header's
# id, flags or flow label)
self.assertSentPacket(tunnel, sa_info)
- self.assertEquals(expected.src, pkt.src)
- self.assertEquals(expected.dst, pkt.dst)
- self.assertEquals(len(expected), len(pkt))
+ self.assertEqual(expected.src, pkt.src)
+ self.assertEqual(expected.dst, pkt.dst)
+ self.assertEqual(len(expected), len(pkt))
# Check everything else
- self.assertEquals(str(expected.payload), str(pkt.payload))
+ self.assertEqual(str(expected.payload), str(pkt.payload))
def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
remote_inner):
@@ -728,8 +728,8 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt))
# Check src/dst
- self.assertEquals(tunnel.local, pkt.src)
- self.assertEquals(tunnel.remote, pkt.dst)
+ self.assertEqual(tunnel.local, pkt.src)
+ self.assertEqual(tunnel.remote, pkt.dst)
# Check that the interface statistics recorded the outbound packet
self.assertSentPacket(tunnel, tunnel.out_sa)
@@ -748,8 +748,8 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
# Verify that the packet data and src are correct
data, src = read_sock.recvfrom(4096)
- self.assertEquals(net_test.UDP_PAYLOAD, data)
- self.assertEquals((local_inner, src_port), src[:2])
+ self.assertEqual(net_test.UDP_PAYLOAD, data)
+ self.assertEqual((local_inner, src_port), src[:2])
# Check that the interface statistics recorded the inbound packet
self.assertReceivedPacket(tunnel, tunnel.in_sa)
@@ -784,10 +784,10 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
# Check that the packet too big reduced the MTU.
routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
- self.assertEquals(1, len(routes))
+ self.assertEqual(1, len(routes))
rtmsg, attributes = routes[0]
- self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
- self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
+ self.assertEqual(iproute.RTN_UNICAST, rtmsg.type)
+ self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
# Clear PMTU information so that future tests don't have to worry about it.
self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)