diff options
author | Lorenzo Colitti <lorenzo@google.com> | 2018-01-13 17:26:51 +0900 |
---|---|---|
committer | Lorenzo Colitti <lorenzo@google.com> | 2018-02-21 22:24:33 +0900 |
commit | 31c8de4f1409e5c2fe86c99e1477cfa41a0f9887 (patch) | |
tree | 69161338d0aa53bd4099bd47a2c926a08c36a24c | |
parent | 3a38134704bd62482ea371a015aa71e6a3e073d2 (diff) | |
download | tests-31c8de4f1409e5c2fe86c99e1477cfa41a0f9887.tar.gz |
Add a new VtiInterface class and use it in XfrmVtiTest.
This class encapsulates both the knowledge needed to create a
VTI interface and the state that it holds. This will be useful
in future changes to store multiple VTIs in the same test.
Test: xfrm tests pass on android-4.9
Change-Id: I5b047a41e384b94b7cf50546026b32081f5d00bd
-rwxr-xr-x | net/test/xfrm_tunnel_test.py | 177 |
1 files changed, 102 insertions, 75 deletions
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index e1fabb2..c66f74c 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -18,6 +18,7 @@ from errno import * # pylint: disable=wildcard-import from socket import * # pylint: disable=wildcard-import +import random import struct import unittest @@ -37,8 +38,8 @@ _VTI_IFNAME = "test_vti" _TEST_OUT_SPI = 0x1234 _TEST_IN_SPI = _TEST_OUT_SPI -_TEST_OKEY = _TEST_OUT_SPI + _VTI_NETID -_TEST_IKEY = _TEST_IN_SPI + _VTI_NETID +_TEST_OKEY = 2000000100 +_TEST_IKEY = 2000000200 def _GetLocalInnerAddress(version): @@ -99,6 +100,51 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): self._CheckTunnelOutput(6, 6) +class VtiInterface(object): + + def __init__(self, iface, netid, underlying_netid, local, remote): + self.iface = iface + self.netid = netid + self.underlying_netid = underlying_netid + self.local, self.remote = local, remote + self.rx = self.tx = 0 + self.ikey = _TEST_IKEY + netid + self.okey = _TEST_OKEY + netid + self.out_spi = self.in_spi = random.randint(0, 0x7fffffff) + + self.iproute = iproute.IPRoute() + self.xfrm = xfrm.Xfrm() + + self.SetupInterface() + self.SetupXfrm() + self.addrs = {} + + def Teardown(self): + self.TeardownInterface() + + def SetupInterface(self): + self.iproute.CreateVirtualTunnelInterface( + self.iface, self.local, self.remote, self.ikey, self.okey) + + def TeardownInterface(self): + self.iproute.DeleteLink(self.iface) + + def SetupXfrm(self): + # For the VTI, the selectors are wildcard since packets will only + # be selected if they have the appropriate mark, hence the inner + # addresses are wildcard. + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, + self.out_spi, xfrm_base._ALGO_CBC_AES_256, + xfrm_base._ALGO_HMAC_SHA1, + xfrm.ExactMatchMark(self.okey), + self.underlying_netid) + + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, + self.in_spi, xfrm_base._ALGO_CBC_AES_256, + xfrm_base._ALGO_HMAC_SHA1, + xfrm.ExactMatchMark(self.ikey), None) + + @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") class XfrmVtiTest(xfrm_base.XfrmLazyTest): @@ -171,7 +217,7 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest): with self.assertRaises(IOError): self.iproute.GetIfIndex(_VTI_IFNAME) - def _SetupVtiNetwork(self, netid, ifname, is_add): + def _SetupVtiNetwork(self, vti, is_add): """Setup rules and routes for a VTI Network. Takes an interface and depending on the boolean @@ -180,15 +226,11 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest): Network for purposes of testing. Args: - ifname: The name of a linux interface + vti: A VtiInterface, the VTI to set up. is_add: Boolean that causes this method to perform setup if True or teardown if False """ if is_add: - # Bring up the interface so that we can start adding addresses - # and routes. - net_test.SetInterfaceUp(ifname) - # Disable router solicitations to avoid occasional spurious packets # arriving on the underlying network; there are two possible behaviors # when that occurred: either only the RA packet is read, and when it @@ -196,30 +238,42 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest): # the UDP_PAYLOAD; or, two packets may arrive on the underlying # network which fails the assertion that only one ESP packet is received. self.SetSysctl( - "/proc/sys/net/ipv6/conf/%s/router_solicitations" % ifname, 0) + "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0) + net_test.SetInterfaceUp(vti.iface) + for version in [4, 6]: - ifindex = net_test.GetInterfaceIndex(ifname) - table = netid + ifindex = net_test.GetInterfaceIndex(vti.iface) + table = vti.netid # Set up routing rules. - start, end = self.UidRangeForNetid(netid) + start, end = self.UidRangeForNetid(vti.netid) self.iproute.UidRangeRule(version, is_add, start, end, table, self.PRIORITY_UID) - self.iproute.OifRule(version, is_add, ifname, table, self.PRIORITY_OIF) - self.iproute.FwmarkRule(version, is_add, netid, self.NETID_FWMASK, table, - self.PRIORITY_FWMARK) + self.iproute.OifRule(version, is_add, vti.iface, table, self.PRIORITY_OIF) + self.iproute.FwmarkRule(version, is_add, vti.netid, self.NETID_FWMASK, + table, self.PRIORITY_FWMARK) + + # Configure IP addresses. + if version == 4: + addr = self._MyIPv4Address(vti.netid) + else: + addr = self.OnlinkPrefix(6, vti.netid) + "1" + prefixlen = net_test.AddressLengthBits(version) + vti.addrs[version] = addr if is_add: - self.iproute.AddAddress( - _GetLocalInnerAddress(version), - net_test.AddressLengthBits(version), ifindex) + self.iproute.AddAddress(addr, prefixlen, ifindex) self.iproute.AddRoute(version, table, "default", 0, None, ifindex) else: self.iproute.DelRoute(version, table, "default", 0, None, ifindex) - self.iproute.DelAddress( - _GetLocalInnerAddress(version), - net_test.AddressLengthBits(version), ifindex) - if not is_add: - net_test.SetInterfaceDown(ifname) + self.iproute.DelAddress(addr, prefixlen, ifindex) + + def assertReceivedPacket(self, vti): + vti.rx += 1 + self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) + + def assertSentPacket(self, vti): + vti.tx += 1 + self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) # TODO: Should we completely re-write this using null encryption and null # authentication? We could then assemble and disassemble packets for each @@ -229,34 +283,13 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest): def _CreateVti(self, netid, vti_netid, ifname, outer_version): local_outer = self.MyAddress(outer_version, netid) remote_outer = _GetRemoteOuterAddress(outer_version) - self.iproute.CreateVirtualTunnelInterface( - dev_name=ifname, - local_addr=local_outer, - remote_addr=remote_outer, - i_key=_TEST_IKEY, - o_key=_TEST_OKEY) + vti = VtiInterface(ifname, vti_netid, netid, local_outer, remote_outer) + self._SetupVtiNetwork(vti, True) + return vti - self._SetupVtiNetwork(vti_netid, ifname, True) - - # For the VTI, the selectors are wildcard since packets will only - # be selected if they have the appropriate mark, hence the inner - # addresses are wildcard. - self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer, - remote_outer, _TEST_OUT_SPI, - xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, - xfrm.ExactMatchMark(_TEST_OKEY), netid) - - self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer, - local_outer, _TEST_IN_SPI, - xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, - xfrm.ExactMatchMark(_TEST_IKEY), None) - - def _CheckVtiInputOutput(self, netid, vti_netid, iface, outer_version, - inner_version, rx, tx): - local_outer = self.MyAddress(outer_version, netid) - remote_outer = _GetRemoteOuterAddress(outer_version) + def _CheckVtiInputOutput(self, vti, inner_version): + local_outer = vti.local + remote_outer = vti.remote # Create a socket to receive packets. read_sock = socket( @@ -271,40 +304,39 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest): # of the input socket. write_sock = socket( net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - self.SelectInterface(write_sock, vti_netid, "mark") + self.SelectInterface(write_sock, vti.netid, "mark") write_sock.sendto(net_test.UDP_PAYLOAD, (_GetRemoteInnerAddress(inner_version), port)) # Read a tunneled IP packet on the underlying (outbound) network # verifying that it is an ESP packet. - pkt = self._ExpectEspPacketOn(netid, _TEST_OUT_SPI, tx + 1, None, + self.assertSentPacket(vti) + pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, local_outer, remote_outer) - self.assertEquals((rx, tx + 1), self.iproute.GetRxTxPackets(iface)) - # Perform an address switcheroo so that the inner address of the remote # end of the tunnel is now the address on the local VTI interface; this # way, the twisted inner packet finds a destination via the VTI once # decrypted. remote = _GetRemoteInnerAddress(inner_version) - local = _GetLocalInnerAddress(inner_version) - self._SwapInterfaceAddress(iface, new_addr=remote, old_addr=local) + local = vti.addrs[inner_version] + self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local) try: # Swap the packet's IP headers and write it back to the # underlying network. pkt = TunTwister.TwistPacket(pkt) - self.ReceivePacketOn(netid, pkt) + self.ReceivePacketOn(vti.underlying_netid, pkt) + self.assertReceivedPacket(vti) # Receive the decrypted packet on the dest port number. read_packet = read_sock.recv(4096) self.assertEquals(read_packet, net_test.UDP_PAYLOAD) - self.assertEquals((rx + 1, tx + 1), self.iproute.GetRxTxPackets(iface)) finally: # Unwind the switcheroo - self._SwapInterfaceAddress(iface, new_addr=local, old_addr=remote) + self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote) # Now attempt to provoke an ICMP error. # TODO: deduplicate with multinetwork_test.py. - version = outer_version + version = net_test.GetAddressVersion(vti.remote) dst_prefix, intermediate = { 4: ("172.19.", "172.16.9.12"), 6: ("2001:db8::", "2001:db8::1") @@ -312,39 +344,34 @@ class XfrmVtiTest(xfrm_base.XfrmLazyTest): write_sock.sendto(net_test.UDP_PAYLOAD, (_GetRemoteInnerAddress(inner_version), port)) - pkt = self._ExpectEspPacketOn(netid, _TEST_OUT_SPI, tx + 2, None, + self.assertSentPacket(vti) + pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, local_outer, remote_outer) - myaddr = self.MyAddress(version, netid) + myaddr = self.MyAddress(version, vti.underlying_netid) _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt) - self.ReceivePacketOn(netid, toobig) + self.ReceivePacketOn(vti.underlying_netid, toobig) # Check that the packet too big reduced the MTU. - routes = self.iproute.GetRoutes(remote_outer, 0, netid, None) + routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None) self.assertEquals(1, len(routes)) rtmsg, attributes = routes[0] self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) # Clear PMTU information so that future tests don't have to worry about it. - self.InvalidateDstCache(version, netid) - - self.assertEquals((rx + 1, tx + 2), - self.iproute.GetRxTxPackets(iface)) - return rx + 1, tx + 2 + self.InvalidateDstCache(version, vti.underlying_netid) def _TestVti(self, outer_version): """Test packet input and output over a Virtual Tunnel Interface.""" netid = self.RandomNetid() - self._CreateVti(netid, _VTI_NETID, _VTI_IFNAME, outer_version) + vti = self._CreateVti(netid, _VTI_NETID, _VTI_IFNAME, outer_version) try: - rx, tx = self._CheckVtiInputOutput(netid, _VTI_NETID, _VTI_IFNAME, - outer_version, 4, 0, 0) - self._CheckVtiInputOutput(netid, _VTI_NETID, _VTI_IFNAME, outer_version, - 4, rx, tx) + self._CheckVtiInputOutput(vti, 4) + self._CheckVtiInputOutput(vti, 6) finally: - self._SetupVtiNetwork(_VTI_NETID, _VTI_IFNAME, False) + self._SetupVtiNetwork(vti, False) def testIpv4Vti(self): self._TestVti(4) |