summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenzo Colitti <lorenzo@google.com>2018-01-13 17:26:51 +0900
committerLorenzo Colitti <lorenzo@google.com>2018-02-21 22:24:33 +0900
commit31c8de4f1409e5c2fe86c99e1477cfa41a0f9887 (patch)
tree69161338d0aa53bd4099bd47a2c926a08c36a24c
parent3a38134704bd62482ea371a015aa71e6a3e073d2 (diff)
downloadtests-31c8de4f1409e5c2fe86c99e1477cfa41a0f9887.tar.gz
Add a new VtiInterface class and use it in XfrmVtiTest.
This class encapsulates both the knowledge needed to create a VTI interface and the state that it holds. This will be useful in future changes to store multiple VTIs in the same test. Test: xfrm tests pass on android-4.9 Change-Id: I5b047a41e384b94b7cf50546026b32081f5d00bd
-rwxr-xr-xnet/test/xfrm_tunnel_test.py177
1 files changed, 102 insertions, 75 deletions
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index e1fabb2..c66f74c 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -18,6 +18,7 @@
from errno import * # pylint: disable=wildcard-import
from socket import * # pylint: disable=wildcard-import
+import random
import struct
import unittest
@@ -37,8 +38,8 @@ _VTI_IFNAME = "test_vti"
_TEST_OUT_SPI = 0x1234
_TEST_IN_SPI = _TEST_OUT_SPI
-_TEST_OKEY = _TEST_OUT_SPI + _VTI_NETID
-_TEST_IKEY = _TEST_IN_SPI + _VTI_NETID
+_TEST_OKEY = 2000000100
+_TEST_IKEY = 2000000200
def _GetLocalInnerAddress(version):
@@ -99,6 +100,51 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
self._CheckTunnelOutput(6, 6)
+class VtiInterface(object):
+
+ def __init__(self, iface, netid, underlying_netid, local, remote):
+ self.iface = iface
+ self.netid = netid
+ self.underlying_netid = underlying_netid
+ self.local, self.remote = local, remote
+ self.rx = self.tx = 0
+ self.ikey = _TEST_IKEY + netid
+ self.okey = _TEST_OKEY + netid
+ self.out_spi = self.in_spi = random.randint(0, 0x7fffffff)
+
+ self.iproute = iproute.IPRoute()
+ self.xfrm = xfrm.Xfrm()
+
+ self.SetupInterface()
+ self.SetupXfrm()
+ self.addrs = {}
+
+ def Teardown(self):
+ self.TeardownInterface()
+
+ def SetupInterface(self):
+ self.iproute.CreateVirtualTunnelInterface(
+ self.iface, self.local, self.remote, self.ikey, self.okey)
+
+ def TeardownInterface(self):
+ self.iproute.DeleteLink(self.iface)
+
+ def SetupXfrm(self):
+ # For the VTI, the selectors are wildcard since packets will only
+ # be selected if they have the appropriate mark, hence the inner
+ # addresses are wildcard.
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
+ self.out_spi, xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ xfrm.ExactMatchMark(self.okey),
+ self.underlying_netid)
+
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
+ self.in_spi, xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ xfrm.ExactMatchMark(self.ikey), None)
+
+
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
class XfrmVtiTest(xfrm_base.XfrmLazyTest):
@@ -171,7 +217,7 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest):
with self.assertRaises(IOError):
self.iproute.GetIfIndex(_VTI_IFNAME)
- def _SetupVtiNetwork(self, netid, ifname, is_add):
+ def _SetupVtiNetwork(self, vti, is_add):
"""Setup rules and routes for a VTI Network.
Takes an interface and depending on the boolean
@@ -180,15 +226,11 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest):
Network for purposes of testing.
Args:
- ifname: The name of a linux interface
+ vti: A VtiInterface, the VTI to set up.
is_add: Boolean that causes this method to perform setup if True or
teardown if False
"""
if is_add:
- # Bring up the interface so that we can start adding addresses
- # and routes.
- net_test.SetInterfaceUp(ifname)
-
# Disable router solicitations to avoid occasional spurious packets
# arriving on the underlying network; there are two possible behaviors
# when that occurred: either only the RA packet is read, and when it
@@ -196,30 +238,42 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest):
# the UDP_PAYLOAD; or, two packets may arrive on the underlying
# network which fails the assertion that only one ESP packet is received.
self.SetSysctl(
- "/proc/sys/net/ipv6/conf/%s/router_solicitations" % ifname, 0)
+ "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0)
+ net_test.SetInterfaceUp(vti.iface)
+
for version in [4, 6]:
- ifindex = net_test.GetInterfaceIndex(ifname)
- table = netid
+ ifindex = net_test.GetInterfaceIndex(vti.iface)
+ table = vti.netid
# Set up routing rules.
- start, end = self.UidRangeForNetid(netid)
+ start, end = self.UidRangeForNetid(vti.netid)
self.iproute.UidRangeRule(version, is_add, start, end, table,
self.PRIORITY_UID)
- self.iproute.OifRule(version, is_add, ifname, table, self.PRIORITY_OIF)
- self.iproute.FwmarkRule(version, is_add, netid, self.NETID_FWMASK, table,
- self.PRIORITY_FWMARK)
+ self.iproute.OifRule(version, is_add, vti.iface, table, self.PRIORITY_OIF)
+ self.iproute.FwmarkRule(version, is_add, vti.netid, self.NETID_FWMASK,
+ table, self.PRIORITY_FWMARK)
+
+ # Configure IP addresses.
+ if version == 4:
+ addr = self._MyIPv4Address(vti.netid)
+ else:
+ addr = self.OnlinkPrefix(6, vti.netid) + "1"
+ prefixlen = net_test.AddressLengthBits(version)
+ vti.addrs[version] = addr
if is_add:
- self.iproute.AddAddress(
- _GetLocalInnerAddress(version),
- net_test.AddressLengthBits(version), ifindex)
+ self.iproute.AddAddress(addr, prefixlen, ifindex)
self.iproute.AddRoute(version, table, "default", 0, None, ifindex)
else:
self.iproute.DelRoute(version, table, "default", 0, None, ifindex)
- self.iproute.DelAddress(
- _GetLocalInnerAddress(version),
- net_test.AddressLengthBits(version), ifindex)
- if not is_add:
- net_test.SetInterfaceDown(ifname)
+ self.iproute.DelAddress(addr, prefixlen, ifindex)
+
+ def assertReceivedPacket(self, vti):
+ vti.rx += 1
+ self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
+
+ def assertSentPacket(self, vti):
+ vti.tx += 1
+ self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
# TODO: Should we completely re-write this using null encryption and null
# authentication? We could then assemble and disassemble packets for each
@@ -229,34 +283,13 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest):
def _CreateVti(self, netid, vti_netid, ifname, outer_version):
local_outer = self.MyAddress(outer_version, netid)
remote_outer = _GetRemoteOuterAddress(outer_version)
- self.iproute.CreateVirtualTunnelInterface(
- dev_name=ifname,
- local_addr=local_outer,
- remote_addr=remote_outer,
- i_key=_TEST_IKEY,
- o_key=_TEST_OKEY)
+ vti = VtiInterface(ifname, vti_netid, netid, local_outer, remote_outer)
+ self._SetupVtiNetwork(vti, True)
+ return vti
- self._SetupVtiNetwork(vti_netid, ifname, True)
-
- # For the VTI, the selectors are wildcard since packets will only
- # be selected if they have the appropriate mark, hence the inner
- # addresses are wildcard.
- self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer,
- remote_outer, _TEST_OUT_SPI,
- xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(_TEST_OKEY), netid)
-
- self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer,
- local_outer, _TEST_IN_SPI,
- xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(_TEST_IKEY), None)
-
- def _CheckVtiInputOutput(self, netid, vti_netid, iface, outer_version,
- inner_version, rx, tx):
- local_outer = self.MyAddress(outer_version, netid)
- remote_outer = _GetRemoteOuterAddress(outer_version)
+ def _CheckVtiInputOutput(self, vti, inner_version):
+ local_outer = vti.local
+ remote_outer = vti.remote
# Create a socket to receive packets.
read_sock = socket(
@@ -271,40 +304,39 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest):
# of the input socket.
write_sock = socket(
net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- self.SelectInterface(write_sock, vti_netid, "mark")
+ self.SelectInterface(write_sock, vti.netid, "mark")
write_sock.sendto(net_test.UDP_PAYLOAD,
(_GetRemoteInnerAddress(inner_version), port))
# Read a tunneled IP packet on the underlying (outbound) network
# verifying that it is an ESP packet.
- pkt = self._ExpectEspPacketOn(netid, _TEST_OUT_SPI, tx + 1, None,
+ self.assertSentPacket(vti)
+ pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
local_outer, remote_outer)
- self.assertEquals((rx, tx + 1), self.iproute.GetRxTxPackets(iface))
-
# Perform an address switcheroo so that the inner address of the remote
# end of the tunnel is now the address on the local VTI interface; this
# way, the twisted inner packet finds a destination via the VTI once
# decrypted.
remote = _GetRemoteInnerAddress(inner_version)
- local = _GetLocalInnerAddress(inner_version)
- self._SwapInterfaceAddress(iface, new_addr=remote, old_addr=local)
+ local = vti.addrs[inner_version]
+ self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local)
try:
# Swap the packet's IP headers and write it back to the
# underlying network.
pkt = TunTwister.TwistPacket(pkt)
- self.ReceivePacketOn(netid, pkt)
+ self.ReceivePacketOn(vti.underlying_netid, pkt)
+ self.assertReceivedPacket(vti)
# Receive the decrypted packet on the dest port number.
read_packet = read_sock.recv(4096)
self.assertEquals(read_packet, net_test.UDP_PAYLOAD)
- self.assertEquals((rx + 1, tx + 1), self.iproute.GetRxTxPackets(iface))
finally:
# Unwind the switcheroo
- self._SwapInterfaceAddress(iface, new_addr=local, old_addr=remote)
+ self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote)
# Now attempt to provoke an ICMP error.
# TODO: deduplicate with multinetwork_test.py.
- version = outer_version
+ version = net_test.GetAddressVersion(vti.remote)
dst_prefix, intermediate = {
4: ("172.19.", "172.16.9.12"),
6: ("2001:db8::", "2001:db8::1")
@@ -312,39 +344,34 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest):
write_sock.sendto(net_test.UDP_PAYLOAD,
(_GetRemoteInnerAddress(inner_version), port))
- pkt = self._ExpectEspPacketOn(netid, _TEST_OUT_SPI, tx + 2, None,
+ self.assertSentPacket(vti)
+ pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
local_outer, remote_outer)
- myaddr = self.MyAddress(version, netid)
+ myaddr = self.MyAddress(version, vti.underlying_netid)
_, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt)
- self.ReceivePacketOn(netid, toobig)
+ self.ReceivePacketOn(vti.underlying_netid, toobig)
# Check that the packet too big reduced the MTU.
- routes = self.iproute.GetRoutes(remote_outer, 0, netid, None)
+ routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None)
self.assertEquals(1, len(routes))
rtmsg, attributes = routes[0]
self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
# Clear PMTU information so that future tests don't have to worry about it.
- self.InvalidateDstCache(version, netid)
-
- self.assertEquals((rx + 1, tx + 2),
- self.iproute.GetRxTxPackets(iface))
- return rx + 1, tx + 2
+ self.InvalidateDstCache(version, vti.underlying_netid)
def _TestVti(self, outer_version):
"""Test packet input and output over a Virtual Tunnel Interface."""
netid = self.RandomNetid()
- self._CreateVti(netid, _VTI_NETID, _VTI_IFNAME, outer_version)
+ vti = self._CreateVti(netid, _VTI_NETID, _VTI_IFNAME, outer_version)
try:
- rx, tx = self._CheckVtiInputOutput(netid, _VTI_NETID, _VTI_IFNAME,
- outer_version, 4, 0, 0)
- self._CheckVtiInputOutput(netid, _VTI_NETID, _VTI_IFNAME, outer_version,
- 4, rx, tx)
+ self._CheckVtiInputOutput(vti, 4)
+ self._CheckVtiInputOutput(vti, 6)
finally:
- self._SetupVtiNetwork(_VTI_NETID, _VTI_IFNAME, False)
+ self._SetupVtiNetwork(vti, False)
def testIpv4Vti(self):
self._TestVti(4)