From 1e8d6a3baf39d7e3149463c8d8b8eec6860690c9 Mon Sep 17 00:00:00 2001 From: Pierre Lee Date: Mon, 18 Feb 2019 20:53:09 +0800 Subject: Test experimental xfrm interfaces if supported. This test adds code to transparently test VTI and/or the new xfrmi interface type depending on what is supported. (This is CP from 550701 on master branch) Bug: 123202162 Test: VtsKernelNetTest can pass Change-Id: I95cfcab44e3969012205e5fcf8ef5d384c9890fa Merged-In: If52921bbb60ad2fa05313a3ce1156b37a50476af --- net/test/iproute.py | 26 ++++- net/test/run_net_test.sh | 2 +- net/test/xfrm.py | 74 +++++++++--- net/test/xfrm_test.py | 6 +- net/test/xfrm_tunnel_test.py | 262 +++++++++++++++++++++++++++++++------------ 5 files changed, 275 insertions(+), 95 deletions(-) diff --git a/net/test/iproute.py b/net/test/iproute.py index a3310a2..8376eb6 100644 --- a/net/test/iproute.py +++ b/net/test/iproute.py @@ -209,13 +209,17 @@ IFLA_PAD = 42 IFLA_XDP = 43 IFLA_EVENT = 44 -# linux/include/uapi/if_link.h +# include/uapi/linux/if_link.h IFLA_INFO_UNSPEC = 0 IFLA_INFO_KIND = 1 IFLA_INFO_DATA = 2 IFLA_INFO_XSTATS = 3 -# linux/if_tunnel.h +IFLA_XFRM_UNSPEC = 0 +IFLA_XFRM_LINK = 1 +IFLA_XFRM_IF_ID = 2 + +# include/uapi/linux/if_tunnel.h IFLA_VTI_UNSPEC = 0 IFLA_VTI_LINK = 1 IFLA_VTI_IKEY = 2 @@ -682,7 +686,7 @@ class IPRoute(netlink.NetlinkSocket): attrs = self._ParseAttributes(RTM_NEWLINK, IfinfoMsg, attrs) return attrs["IFLA_STATS64"] - def GetVtiInfoData(self, dev_name): + def GetIfinfoData(self, dev_name): """Returns an IFLA_INFO_DATA dict object for the specified interface.""" _, attrs = self.GetIfinfo(dev_name) attrs = self._ParseAttributes(RTM_NEWLINK, IfinfoMsg, attrs) @@ -745,6 +749,22 @@ class IPRoute(netlink.NetlinkSocket): flags |= netlink.NLM_F_EXCL return self._SendNlRequest(RTM_NEWLINK, ifinfo, flags) + def CreateXfrmInterface(self, dev_name, xfrm_if_id, underlying_ifindex): + """Creates an XFRM interface with the specified parameters.""" + # The netlink attribute structure is essentially identical to the one + # for VTI above (q.v). + ifdata = self._NlAttrU32(IFLA_XFRM_LINK, underlying_ifindex) + ifdata += self._NlAttrU32(IFLA_XFRM_IF_ID, xfrm_if_id) + + linkinfo = self._NlAttrStr(IFLA_INFO_KIND, "xfrm") + linkinfo += self._NlAttr(IFLA_INFO_DATA, ifdata) + + msg = IfinfoMsg().Pack() + msg += self._NlAttrStr(IFLA_IFNAME, dev_name) + msg += self._NlAttr(IFLA_LINKINFO, linkinfo) + + return self._SendNlRequest(RTM_NEWLINK, msg) + if __name__ == "__main__": iproute = IPRoute() diff --git a/net/test/run_net_test.sh b/net/test/run_net_test.sh index 8c256b4..c7c18d3 100755 --- a/net/test/run_net_test.sh +++ b/net/test/run_net_test.sh @@ -30,7 +30,7 @@ OPTIONS="$OPTIONS TRANSPORT INET_XFRM_MODE_TUNNEL INET6_AH INET6_ESP" OPTIONS="$OPTIONS INET6_XFRM_MODE_TRANSPORT INET6_XFRM_MODE_TUNNEL" OPTIONS="$OPTIONS CRYPTO_SHA256 CRYPTO_SHA512 CRYPTO_AES_X86_64 CRYPTO_NULL" OPTIONS="$OPTIONS CRYPTO_GCM CRYPTO_ECHAINIV NET_IPVTI IPV6_VTI" -OPTIONS="$OPTIONS SOCK_CGROUP_DATA CGROUP_BPF" +OPTIONS="$OPTIONS SOCK_CGROUP_DATA CGROUP_BPF CONFIG_XFRM_INTERFACE" # For 4.14 kernels, where UBD and HOSTFS are not set OPTIONS="$OPTIONS CONFIG_BLK_DEV_UBD CONFIG_HOSTFS" diff --git a/net/test/xfrm.py b/net/test/xfrm.py index 1bd10da..6a03c13 100755 --- a/net/test/xfrm.py +++ b/net/test/xfrm.py @@ -85,6 +85,8 @@ XFRMA_ADDRESS_FILTER = 26 XFRMA_PAD = 27 XFRMA_OFFLOAD_DEV = 28 XFRMA_OUTPUT_MARK = 29 +XFRMA_INPUT_MARK = 30 +XFRMA_IF_ID = 31 # Other netlink constants. See include/uapi/linux/xfrm.h. @@ -369,21 +371,25 @@ class Xfrm(netlink.NetlinkSocket): data = struct.unpack("=I", nla_data)[0] elif name == "XFRMA_TMPL": data = cstruct.Read(nla_data, XfrmUserTmpl)[0] + elif name == "XFRMA_IF_ID": + data = struct.unpack("=I", nla_data)[0] else: data = nla_data return name, data - def _UpdatePolicyInfo(self, msg, policy, tmpl, mark): + def _UpdatePolicyInfo(self, msg, policy, tmpl, mark, xfrm_if_id): """Send a policy to the Security Policy Database""" nlattrs = [] if tmpl is not None: nlattrs.append((XFRMA_TMPL, tmpl)) if mark is not None: nlattrs.append((XFRMA_MARK, mark)) + if xfrm_if_id is not None: + nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id))) self.SendXfrmNlRequest(msg, policy, nlattrs) - def AddPolicyInfo(self, policy, tmpl, mark): + def AddPolicyInfo(self, policy, tmpl, mark, xfrm_if_id=None): """Add a new policy to the Security Policy Database If the policy exists, then return an error (EEXIST). @@ -392,10 +398,11 @@ class Xfrm(netlink.NetlinkSocket): policy: an unpacked XfrmUserpolicyInfo tmpl: an unpacked XfrmUserTmpl mark: an unpacked XfrmMark + xfrm_if_id: the XFRM interface ID as an integer, or None """ - self._UpdatePolicyInfo(XFRM_MSG_NEWPOLICY, policy, tmpl, mark) + self._UpdatePolicyInfo(XFRM_MSG_NEWPOLICY, policy, tmpl, mark, xfrm_if_id) - def UpdatePolicyInfo(self, policy, tmpl, mark): + def UpdatePolicyInfo(self, policy, tmpl, mark, xfrm_if_id): """Update an existing policy in the Security Policy Database If the policy does not exist, then create it; otherwise, update the @@ -405,10 +412,11 @@ class Xfrm(netlink.NetlinkSocket): policy: an unpacked XfrmUserpolicyInfo tmpl: an unpacked XfrmUserTmpl to update mark: an unpacked XfrmMark to match the existing policy or None + xfrm_if_id: an XFRM interface ID or None """ - self._UpdatePolicyInfo(XFRM_MSG_UPDPOLICY, policy, tmpl, mark) + self._UpdatePolicyInfo(XFRM_MSG_UPDPOLICY, policy, tmpl, mark, xfrm_if_id) - def DeletePolicyInfo(self, selector, direction, mark): + def DeletePolicyInfo(self, selector, direction, mark, xfrm_if_id=None): """Delete a policy from the Security Policy Database Args: @@ -419,6 +427,8 @@ class Xfrm(netlink.NetlinkSocket): nlattrs = [] if mark is not None: nlattrs.append((XFRMA_MARK, mark)) + if xfrm_if_id is not None: + nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id))) self.SendXfrmNlRequest(XFRM_MSG_DELPOLICY, XfrmUserpolicyId(sel=selector, dir=direction), nlattrs) @@ -440,11 +450,35 @@ class Xfrm(netlink.NetlinkSocket): if nlattrs is None: nlattrs = [] for attr_type, attr_msg in nlattrs: - msg += self._NlAttr(attr_type, attr_msg.Pack()) + # TODO: find a better way to deal with the fact that many XFRM messages + # use nlattrs that aren't cstructs. + # + # This code allows callers to pass in either something that has a Pack() + # method or a packed netlink attr, but not other types of attributes. + # Alternatives include: + # + # 1. Require callers to marshal netlink attributes themselves and call + # _SendNlRequest directly. Delete this method. + # 2. Rename this function to _SendXfrmNlRequestCstructOnly (or other name + # that makes it clear that this only takes cstructs). Switch callers + # that need non-cstruct elements to calling _SendNlRequest directly. + # 3. Make this function somehow automatically detect what to do for + # all types of XFRM attributes today and in the future. This may be + # feasible because all XFRM attributes today occupy the same number + # space, but what about nested attributes? It is unlikley feasible via + # things like "if isinstance(attr_msg, str): ...", because that would + # not be able to determine the right size or byte order for non-struct + # types such as int. + # 4. Define fictitious cstructs which have no correspondence to actual + # kernel structs such as the following to represent a raw integer. + # XfrmAttrOutputMark = cstruct.Struct("=I", mark) + if hasattr(attr_msg, "Pack"): + attr_msg = attr_msg.Pack() + msg += self._NlAttr(attr_type, attr_msg) return self._SendNlRequest(msg_type, msg, flags) def AddSaInfo(self, src, dst, spi, mode, reqid, encryption, auth_trunc, aead, - encap, mark, output_mark, is_update=False): + encap, mark, output_mark, is_update=False, xfrm_if_id=None): """Adds an IPsec security association. Args: @@ -463,6 +497,7 @@ class Xfrm(netlink.NetlinkSocket): output_mark: An integer, the output mark. 0 means unset. is_update: If true, update an existing SA otherwise create a new SA. For compatibility reasons, this value defaults to False. + xfrm_if_id: The XFRM interface ID, or None. """ proto = IPPROTO_ESP xfrm_id = XfrmId((PaddedAddress(dst), spi, proto)) @@ -488,6 +523,8 @@ class Xfrm(netlink.NetlinkSocket): nlattrs += self._NlAttr(XFRMA_ENCAP, encap.Pack()) if output_mark is not None: nlattrs += self._NlAttrU32(XFRMA_OUTPUT_MARK, output_mark) + if xfrm_if_id is not None: + nlattrs += self._NlAttrU32(XFRMA_IF_ID, xfrm_if_id) # The kernel ignores these on input, so make them empty. cur = XfrmLifetimeCur() @@ -519,7 +556,7 @@ class Xfrm(netlink.NetlinkSocket): nl_msg_type = XFRM_MSG_UPDSA if is_update else XFRM_MSG_NEWSA self._SendNlRequest(nl_msg_type, msg, flags) - def DeleteSaInfo(self, dst, spi, proto, mark=None): + def DeleteSaInfo(self, dst, spi, proto, mark=None, xfrm_if_id=None): """Delete an SA from the SAD Args: @@ -530,12 +567,13 @@ class Xfrm(netlink.NetlinkSocket): mark: A mark match specifier, such as returned by ExactMatchMark(), or None for an SA without a Mark attribute. """ - # TODO: deletes take a mark as well. family = AF_INET6 if ":" in dst else AF_INET usersa_id = XfrmUsersaId((PaddedAddress(dst), spi, family, proto)) nlattrs = [] if mark is not None: nlattrs.append((XFRMA_MARK, mark)) + if xfrm_if_id is not None: + nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id))) self.SendXfrmNlRequest(XFRM_MSG_DELSA, usersa_id, nlattrs) def AllocSpi(self, dst, proto, min_spi, max_spi): @@ -592,7 +630,7 @@ class Xfrm(netlink.NetlinkSocket): self._SendNlRequest(XFRM_MSG_FLUSHSA, usersa_flush.Pack(), flags) def CreateTunnel(self, direction, selector, src, dst, spi, encryption, - auth_trunc, mark, output_mark): + auth_trunc, mark, output_mark, xfrm_if_id): """Create an XFRM Tunnel Consisting of a Policy and an SA. Create a unidirectional XFRM tunnel, which entails one Policy and one @@ -614,11 +652,12 @@ class Xfrm(netlink.NetlinkSocket): unspecified. output_mark: The mark used to select the underlying network for packets outbound from xfrm. None means unspecified. + xfrm_if_id: The ID of the XFRM interface to use or None. """ outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst)) self.AddSaInfo(src, dst, spi, XFRM_MODE_TUNNEL, 0, encryption, auth_trunc, - None, None, mark, output_mark) + None, None, mark, output_mark, xfrm_if_id=xfrm_if_id) if selector is None: selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)] @@ -628,16 +667,19 @@ class Xfrm(netlink.NetlinkSocket): for selector in selectors: policy = UserPolicy(direction, selector) tmpl = UserTemplate(outer_family, spi, 0, (src, dst)) - self.AddPolicyInfo(policy, tmpl, mark) + self.AddPolicyInfo(policy, tmpl, mark, xfrm_if_id=xfrm_if_id) + + def DeleteTunnel(self, direction, selector, dst, spi, mark, xfrm_if_id): + if mark is not None: + mark = ExactMatchMark(mark) - def DeleteTunnel(self, direction, selector, dst, spi, mark): - self.DeleteSaInfo(dst, spi, IPPROTO_ESP, ExactMatchMark(mark)) + self.DeleteSaInfo(dst, spi, IPPROTO_ESP, mark, xfrm_if_id) if selector is None: selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)] else: selectors = [selector] for selector in selectors: - self.DeletePolicyInfo(selector, direction, ExactMatchMark(mark)) + self.DeletePolicyInfo(selector, direction, mark, xfrm_if_id) if __name__ == "__main__": diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py index 24f9edc..3a3d9b0 100755 --- a/net/test/xfrm_test.py +++ b/net/test/xfrm_test.py @@ -640,14 +640,14 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): self.assertEquals(attributes['XFRMA_TMPL'], tmpl) # Create a new policy using update. - self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark) + self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark, None) # NEWPOLICY will not update the existing policy. This checks both that # UPDPOLICY created a policy and that NEWPOLICY will not perform updates. _CheckTemplateMatch(tmpl1) with self.assertRaisesErrno(EEXIST): - self.xfrm.AddPolicyInfo(policy, tmpl2, mark) + self.xfrm.AddPolicyInfo(policy, tmpl2, mark, None) # Update the policy using UPDPOLICY. - self.xfrm.UpdatePolicyInfo(policy, tmpl2, mark) + self.xfrm.UpdatePolicyInfo(policy, tmpl2, mark, None) # There should only be one policy after update, and it should have the # updated template. _CheckTemplateMatch(tmpl2) diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index 77cc8b3..761c5b0 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -31,6 +31,28 @@ import packets import xfrm import xfrm_base +_LOOPBACK_IFINDEX = 1 +_TEST_XFRM_IFNAME = "ipsec42" +_TEST_XFRM_IF_ID = 42 + +# Does the kernel support xfrmi interfaces? +def HaveXfrmInterfaces(): + try: + i = iproute.IPRoute() + i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, + _LOOPBACK_IFINDEX) + i.DeleteLink(_TEST_XFRM_IFNAME) + try: + i.GetIfIndex(_TEST_XFRM_IFNAME) + assert "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME + except IOError: + pass + return True + except IOError: + return False + +HAVE_XFRM_INTERFACES = HaveXfrmInterfaces() + # Parameters to Set up VTI as a special network _BASE_VTI_NETID = {4: 40, 6: 60} _BASE_VTI_OKEY = 2000000100 @@ -79,7 +101,7 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1, - None, underlying_netid) + None, underlying_netid, None) write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) # Select an interface, which provides the source address of the inner @@ -106,13 +128,16 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): - def verifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, ikey, okey): + def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, + ikey, okey): self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey) self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey) family = AF_INET if version == 4 else AF_INET6 - self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), local_addr) - self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), remote_addr) + self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), + local_addr) + self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), + remote_addr) def testAddVti(self): """Test the creation of a Virtual Tunnel Interface.""" @@ -125,8 +150,9 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): remote_addr=_GetRemoteOuterAddress(version), o_key=_TEST_OKEY, i_key=_TEST_IKEY) - self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME), - version, local_addr, _GetRemoteOuterAddress(version), + self._VerifyVtiInfoData(self.iproute.GetIfinfoData(_VTI_IFNAME), + version, local_addr, + _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY) new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} @@ -140,7 +166,7 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): i_key=new_ikey, is_update=True) - self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME), + self._VerifyVtiInfoData(self.iproute.GetIfinfoData(_VTI_IFNAME), version, local_addr, new_remote_addr[version], new_ikey, new_okey) @@ -166,7 +192,7 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): class VtiInterface(object): - def __init__(self, iface, netid, underlying_netid, local, remote): + def __init__(self, iface, netid, underlying_netid, _, local, remote): self.iface = iface self.netid = netid self.underlying_netid = underlying_netid @@ -188,7 +214,7 @@ class VtiInterface(object): self.TeardownInterface() def SetupInterface(self): - self.iproute.CreateVirtualTunnelInterface( + return self.iproute.CreateVirtualTunnelInterface( self.iface, self.local, self.remote, self.ikey, self.okey) def TeardownInterface(self): @@ -202,22 +228,88 @@ class VtiInterface(object): self.out_spi, xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1, xfrm.ExactMatchMark(self.okey), - self.underlying_netid) + self.underlying_netid, None) self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, self.in_spi, xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1, - xfrm.ExactMatchMark(self.ikey), None) + xfrm.ExactMatchMark(self.ikey), None, None) def TeardownXfrm(self): self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, - self.out_spi, self.okey) + self.out_spi, self.okey, None) self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, - self.in_spi, self.ikey) + self.in_spi, self.ikey, None) -@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") -class XfrmVtiTest(xfrm_base.XfrmBaseTest): +@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") +class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): + """Test the creation of an XFRM Interface.""" + + def testAddXfrmInterface(self): + self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, + _LOOPBACK_IFINDEX) + if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) + net_test.SetInterfaceUp(_TEST_XFRM_IFNAME) + + # Validate that the netlink interface matches the ioctl interface. + self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) + self.iproute.DeleteLink(_TEST_XFRM_IFNAME) + with self.assertRaises(IOError): + self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) + + +class XfrmInterface(object): + + def __init__(self, iface, netid, underlying_netid, ifindex, local, remote): + self.iface = iface + self.netid = netid + self.underlying_netid = underlying_netid + self.ifindex = ifindex + self.local, self.remote = local, remote + self.rx = self.tx = 0 + self.xfrm_if_id = netid + self.out_spi = self.in_spi = random.randint(0, 0x7fffffff) + self.xfrm_if_id = self.netid + + self.iproute = iproute.IPRoute() + self.xfrm = xfrm.Xfrm() + + self.SetupInterface() + self.SetupXfrm() + self.addrs = {} + + def Teardown(self): + self.TeardownXfrm() + self.TeardownInterface() + + def SetupInterface(self): + """Create an XFRM interface.""" + return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex) + + def TeardownInterface(self): + self.iproute.DeleteLink(self.iface) + + def SetupXfrm(self): + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, + self.out_spi, xfrm_base._ALGO_CBC_AES_256, + xfrm_base._ALGO_HMAC_SHA1, None, + self.underlying_netid, self.xfrm_if_id) + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, + self.in_spi, xfrm_base._ALGO_CBC_AES_256, + xfrm_base._ALGO_HMAC_SHA1, + None, None, self.xfrm_if_id) + + + def TeardownXfrm(self): + self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, + self.out_spi, None, self.xfrm_if_id) + self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, + self.in_spi, None, self.xfrm_if_id) + + + +class XfrmTunnelBase(xfrm_base.XfrmBaseTest): @classmethod def setUpClass(cls): @@ -227,7 +319,7 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): cls.SetInboundMarks(True) cls.SetMarkReflectSysctls(1) - cls.vtis = {} + cls.tunnels = {} for i, underlying_netid in enumerate(cls.tuns): for version in 4, 6: netid = _BASE_VTI_NETID[version] + i @@ -237,19 +329,23 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR else: remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR - vti = VtiInterface(iface, netid, underlying_netid, local, remote) + ifindex = cls.ifindices[underlying_netid] + + tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex, + local, remote) + cls._SetInboundMarking(netid, iface, True) - cls._SetupVtiNetwork(vti, True) - cls.vtis[netid] = vti + cls._SetupTunnelNetwork(tunnel, True) + cls.tunnels[netid] = tunnel @classmethod def tearDownClass(cls): # The sysctls are restored by MultinetworkBaseTest.tearDownClass. cls.SetInboundMarks(False) - for vti in cls.vtis.values(): - cls._SetInboundMarking(vti.netid, vti.iface, False) - cls._SetupVtiNetwork(vti, False) - vti.Teardown() + for tunnel in cls.tunnels.values(): + cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) + cls._SetupTunnelNetwork(tunnel, False) + tunnel.Teardown() xfrm_base.XfrmBaseTest.tearDownClass() def setUp(self): @@ -275,16 +371,16 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): net_test.AddressLengthBits(version), ifindex) @classmethod - def _SetupVtiNetwork(cls, vti, is_add): - """Setup rules and routes for a VTI Network. + def _SetupTunnelNetwork(cls, tunnel, is_add): + """Setup rules and routes for a tunnel Network. Takes an interface and depending on the boolean value of is_add, either adds or removes the rules - and routes for a VTI to behave like an Android - Network for purposes of testing. + and routes for a tunnel interface to behave like an + Android Network for purposes of testing. Args: - vti: A VtiInterface, the VTI to set up. + tunnel: A VtiInterface or XfrmInterface, the tunnel to set up. is_add: Boolean that causes this method to perform setup if True or teardown if False """ @@ -292,32 +388,33 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): # Disable router solicitations to avoid occasional spurious packets # arriving on the underlying network; there are two possible behaviors # when that occurred: either only the RA packet is read, and when it - # is echoed back to the VTI, it causes the test to fail by not receiving - # the UDP_PAYLOAD; or, two packets may arrive on the underlying - # network which fails the assertion that only one ESP packet is received. + # is echoed back to the tunnel, it causes the test to fail by not + # receiving # the UDP_PAYLOAD; or, two packets may arrive on the + # underlying # network which fails the assertion that only one ESP packet + # is received. cls.SetSysctl( - "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0) - net_test.SetInterfaceUp(vti.iface) + "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0) + net_test.SetInterfaceUp(tunnel.iface) for version in [4, 6]: - ifindex = net_test.GetInterfaceIndex(vti.iface) - table = vti.netid + ifindex = net_test.GetInterfaceIndex(tunnel.iface) + table = tunnel.netid # Set up routing rules. - start, end = cls.UidRangeForNetid(vti.netid) + start, end = cls.UidRangeForNetid(tunnel.netid) cls.iproute.UidRangeRule(version, is_add, start, end, table, cls.PRIORITY_UID) - cls.iproute.OifRule(version, is_add, vti.iface, table, cls.PRIORITY_OIF) - cls.iproute.FwmarkRule(version, is_add, vti.netid, cls.NETID_FWMASK, + cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF) + cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK, table, cls.PRIORITY_FWMARK) # Configure IP addresses. if version == 4: - addr = cls._MyIPv4Address(vti.netid) + addr = cls._MyIPv4Address(tunnel.netid) else: - addr = cls.OnlinkPrefix(6, vti.netid) + "1" + addr = cls.OnlinkPrefix(6, tunnel.netid) + "1" prefixlen = net_test.AddressLengthBits(version) - vti.addrs[version] = addr + tunnel.addrs[version] = addr if is_add: cls.iproute.AddAddress(addr, prefixlen, ifindex) cls.iproute.AddRoute(version, table, "default", 0, None, ifindex) @@ -325,22 +422,24 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) cls.iproute.DelAddress(addr, prefixlen, ifindex) - def assertReceivedPacket(self, vti): - vti.rx += 1 - self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) + def assertReceivedPacket(self, tunnel): + tunnel.rx += 1 + self.assertEquals((tunnel.rx, tunnel.tx), + self.iproute.GetRxTxPackets(tunnel.iface)) - def assertSentPacket(self, vti): - vti.tx += 1 - self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) + def assertSentPacket(self, tunnel): + tunnel.tx += 1 + self.assertEquals((tunnel.rx, tunnel.tx), + self.iproute.GetRxTxPackets(tunnel.iface)) # TODO: Should we completely re-write this using null encryption and null # authentication? We could then assemble and disassemble packets for each # direction individually. This approach would improve debuggability, avoid the # complexity of the twister, and allow the test to more-closely validate # deployable configurations. - def _CheckVtiInputOutput(self, vti, inner_version): - local_outer = vti.local - remote_outer = vti.remote + def _CheckTunnelInputOutput(self, tunnel, inner_version): + local_outer = tunnel.local + remote_outer = tunnel.remote # Create a socket to receive packets. read_sock = socket( @@ -351,43 +450,44 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): # Guard against the eventuality of the receive failing. csocket.SetSocketTimeout(read_sock, 100) - # Send a packet out via the vti-backed network, bound for the port number + # Send a packet out via the tunnel-backed network, bound for the port number # of the input socket. write_sock = socket( net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - self.SelectInterface(write_sock, vti.netid, "mark") + self.SelectInterface(write_sock, tunnel.netid, "mark") write_sock.sendto(net_test.UDP_PAYLOAD, (_GetRemoteInnerAddress(inner_version), port)) # Read a tunneled IP packet on the underlying (outbound) network # verifying that it is an ESP packet. - self.assertSentPacket(vti) - pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, + self.assertSentPacket(tunnel) + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_spi, tunnel.tx, None, local_outer, remote_outer) # Perform an address switcheroo so that the inner address of the remote - # end of the tunnel is now the address on the local VTI interface; this - # way, the twisted inner packet finds a destination via the VTI once + # end of the tunnel is now the address on the local tunnel interface; this + # way, the twisted inner packet finds a destination via the tunnel once # decrypted. remote = _GetRemoteInnerAddress(inner_version) - local = vti.addrs[inner_version] - self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local) + local = tunnel.addrs[inner_version] + self._SwapInterfaceAddress(tunnel.iface, new_addr=remote, old_addr=local) try: # Swap the packet's IP headers and write it back to the # underlying network. pkt = TunTwister.TwistPacket(pkt) - self.ReceivePacketOn(vti.underlying_netid, pkt) + self.ReceivePacketOn(tunnel.underlying_netid, pkt) + self.assertReceivedPacket(tunnel) # Receive the decrypted packet on the dest port number. read_packet = read_sock.recv(4096) self.assertEquals(read_packet, net_test.UDP_PAYLOAD) self.assertReceivedPacket(vti) finally: # Unwind the switcheroo - self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote) + self._SwapInterfaceAddress(tunnel.iface, new_addr=local, old_addr=remote) # Now attempt to provoke an ICMP error. # TODO: deduplicate with multinetwork_test.py. - version = net_test.GetAddressVersion(vti.remote) + version = net_test.GetAddressVersion(tunnel.remote) dst_prefix, intermediate = { 4: ("172.19.", "172.16.9.12"), 6: ("2001:db8::", "2001:db8::1") @@ -395,29 +495,47 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): write_sock.sendto(net_test.UDP_PAYLOAD, (_GetRemoteInnerAddress(inner_version), port)) - self.assertSentPacket(vti) - pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, + self.assertSentPacket(tunnel) + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_spi, tunnel.tx, None, local_outer, remote_outer) - myaddr = self.MyAddress(version, vti.underlying_netid) + myaddr = self.MyAddress(version, tunnel.underlying_netid) _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt) - self.ReceivePacketOn(vti.underlying_netid, toobig) + self.ReceivePacketOn(tunnel.underlying_netid, toobig) # Check that the packet too big reduced the MTU. - routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None) + routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None) self.assertEquals(1, len(routes)) rtmsg, attributes = routes[0] self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) # Clear PMTU information so that future tests don't have to worry about it. - self.InvalidateDstCache(version, vti.underlying_netid) + self.InvalidateDstCache(version, tunnel.underlying_netid) - def testVtiInputOutput(self): + def CheckTunnelInputOutput(self): """Test packet input and output over a Virtual Tunnel Interface.""" - for i in xrange(3 * len(self.vtis.values())): - vti = random.choice(self.vtis.values()) - self._CheckVtiInputOutput(vti, 4) - self._CheckVtiInputOutput(vti, 6) + for i in xrange(3 * len(self.tunnels.values())): + tunnel = random.choice(self.tunnels.values()) + self._CheckTunnelInputOutput(tunnel, 4) + self._CheckTunnelInputOutput(tunnel, 6) + + +@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") +class XfrmVtiTest(XfrmTunnelBase): + + INTERFACE_CLASS = VtiInterface + + def testVtiInputOutput(self): + self.CheckTunnelInputOutput() + + +@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") +class XfrmInterfaceTest(XfrmTunnelBase): + + INTERFACE_CLASS = XfrmInterface + + def testXfrmiInputOutput(self): + self.CheckTunnelInputOutput() if __name__ == "__main__": -- cgit v1.2.3 From b2d08ed88873b2085fb7edba87cadc5720e41698 Mon Sep 17 00:00:00 2001 From: Pierre Lee Date: Mon, 18 Feb 2019 20:54:29 +0800 Subject: Add tunnel input tests to net_tests This commit adds tunnel (non-VTI) input tests to xfrm_tunnel_test using null encryption. This is CP from 585815 on master branch Bug: 123202162 Test: VtsKernelNetTest can pass Change-Id: Ied437a50cdb29b189d74aaa4c3604d4aa2aec8e4 Merged-In: I829e6b12e126bf40915544cdfa94271b0ad1b1b5 --- net/test/xfrm_tunnel_test.py | 115 +++++++++++++++++++++++++++++++------------ 1 file changed, 84 insertions(+), 31 deletions(-) diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index 761c5b0..b8da743 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -19,15 +19,18 @@ from errno import * # pylint: disable=wildcard-import from socket import * # pylint: disable=wildcard-import import random +import itertools import struct import unittest +from scapy import all as scapy from tun_twister import TunTwister import csocket import iproute import multinetwork_base import net_test import packets +import util import xfrm import xfrm_base @@ -80,50 +83,99 @@ def _GetRemoteOuterAddress(version): return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] +def InjectTests(): + InjectParameterizedTests(XfrmTunnelTest) + + +def InjectParameterizedTests(cls): + VERSIONS = (4, 6) + param_list = itertools.product(VERSIONS, VERSIONS) + + def NameGenerator(*args): + return "IPv%d_in_IPv%d" % tuple(args) + + util.InjectParameterizedTest(cls, param_list, NameGenerator) + + class XfrmTunnelTest(xfrm_base.XfrmLazyTest): - def _CheckTunnelOutput(self, inner_version, outer_version): - """Test a bi-directional XFRM Tunnel with explicit selectors""" + def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid, + netid, local_inner, remote_inner, local_outer, + remote_outer, write_sock): + + write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) + self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, + local_outer, remote_outer) + + def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid, + netid, local_inner, remote_inner, local_outer, + remote_outer, read_sock): + + # The second parameter of the tuple is the port number regardless of AF. + local_port = read_sock.getsockname()[1] + + # Build and receive an ESP packet destined for the inner socket + IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] + input_pkt = ( + IpType(src=remote_inner, dst=local_inner) / scapy.UDP( + sport=1234, dport=local_port) / net_test.UDP_PAYLOAD) + input_pkt = IpType(str(input_pkt)) # Compute length, checksum. + input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, _TEST_IN_SPI, 1, + (remote_outer, local_outer)) + self.ReceivePacketOn(underlying_netid, input_pkt) + + # Verify that the packet data and src are correct + data, src = read_sock.recvfrom(4096) + self.assertEquals(net_test.UDP_PAYLOAD, data) + self.assertEquals(remote_inner, src[0]) + self.assertEquals(1234, src[1]) + + def _TestTunnel(self, inner_version, outer_version, func, direction): + """Test a unidirectional XFRM Tunnel with explicit selectors""" # Select the underlying netid, which represents the external # interface from/to which to route ESP packets. - underlying_netid = self.RandomNetid() + u_netid = self.RandomNetid() # Select a random netid that will originate traffic locally and - # which represents the logical tunnel network. - netid = self.RandomNetid(exclude=underlying_netid) + # which represents the netid on which the plaintext is sent + netid = self.RandomNetid(exclude=u_netid) local_inner = self.MyAddress(inner_version, netid) remote_inner = _GetRemoteInnerAddress(inner_version) - local_outer = self.MyAddress(outer_version, underlying_netid) + local_outer = self.MyAddress(outer_version, u_netid) remote_outer = _GetRemoteOuterAddress(outer_version) + # Create input/ouput SPs, SAs and sockets to simulate a more realistic + # environment. + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, + xfrm.SrcDstSelector(remote_inner, local_inner), + remote_outer, local_outer, _TEST_IN_SPI, + xfrm_base._ALGO_CRYPT_NULL, + xfrm_base._ALGO_AUTH_NULL, None, None, None) + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner), local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, - None, underlying_netid, None) + xfrm_base._ALGO_HMAC_SHA1, None, u_netid, None) write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - # Select an interface, which provides the source address of the inner - # packet. self.SelectInterface(write_sock, netid, "mark") - write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) - self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, - local_outer, remote_outer) - - # TODO: Add support for the input path. - - def testIpv4InIpv4TunnelOutput(self): - self._CheckTunnelOutput(4, 4) + read_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) + read_sock.bind((net_test.GetWildcardAddress(inner_version), 0)) + # Guard against the eventuality of the receive failing. + net_test.SetNonBlocking(read_sock.fileno()) - def testIpv4InIpv6TunnelOutput(self): - self._CheckTunnelOutput(4, 6) + sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock + func(inner_version, outer_version, u_netid, netid, local_inner, + remote_inner, local_outer, remote_outer, sock) - def testIpv6InIpv4TunnelOutput(self): - self._CheckTunnelOutput(6, 4) + def ParamTestTunnelInput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, + xfrm.XFRM_POLICY_IN) - def testIpv6InIpv6TunnelOutput(self): - self._CheckTunnelOutput(6, 6) + def ParamTestTunnelOutput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, + xfrm.XFRM_POLICY_OUT) @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") @@ -448,7 +500,7 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): # The second parameter of the tuple is the port number regardless of AF. port = read_sock.getsockname()[1] # Guard against the eventuality of the receive failing. - csocket.SetSocketTimeout(read_sock, 100) + net_test.SetNonBlocking(read_sock.fileno()) # Send a packet out via the tunnel-backed network, bound for the port number # of the input socket. @@ -523,20 +575,21 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") class XfrmVtiTest(XfrmTunnelBase): - INTERFACE_CLASS = VtiInterface + INTERFACE_CLASS = VtiInterface - def testVtiInputOutput(self): - self.CheckTunnelInputOutput() + def testVtiInputOutput(self): + self.CheckTunnelInputOutput() @unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") class XfrmInterfaceTest(XfrmTunnelBase): - INTERFACE_CLASS = XfrmInterface + INTERFACE_CLASS = XfrmInterface - def testXfrmiInputOutput(self): - self.CheckTunnelInputOutput() + def testXfrmiInputOutput(self): + self.CheckTunnelInputOutput() if __name__ == "__main__": + InjectTests() unittest.main() -- cgit v1.2.3 From f005941af83f8c84d88193e88f810e11d2f599e9 Mon Sep 17 00:00:00 2001 From: Pierre Lee Date: Tue, 19 Feb 2019 11:03:43 +0800 Subject: Refactor VTI tests to support null encryption -Split input and output into separate tests -Refactor to inject null encrypted packets while still allowing encrypted packets to be mirrored -Add a check to ensure plaintext packets are not sent. Bug: 123202162 Test: VtsKernelNetTest can pass Change-Id: I7f7dc6ba7d16aca44b73ac0ee456f7ad59e38bca Merged-In: I6cad55f65d32fae9ea17fb44ea796bdcd4aba3a3 --- net/test/xfrm_tunnel_test.py | 516 +++++++++++++++++++++++++++++-------------- 1 file changed, 350 insertions(+), 166 deletions(-) diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index b8da743..652a0c2 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -56,20 +56,22 @@ def HaveXfrmInterfaces(): HAVE_XFRM_INTERFACES = HaveXfrmInterfaces() -# Parameters to Set up VTI as a special network -_BASE_VTI_NETID = {4: 40, 6: 60} +# Parameters to setup tunnels as special networks +_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService +_BASE_TUNNEL_NETID = {4: 40, 6: 60} _BASE_VTI_OKEY = 2000000100 _BASE_VTI_IKEY = 2000000200 -_VTI_NETID = 50 -_VTI_IFNAME = "test_vti" - _TEST_OUT_SPI = 0x1234 _TEST_IN_SPI = _TEST_OUT_SPI _TEST_OKEY = 2000000100 _TEST_IKEY = 2000000200 +_TEST_REMOTE_PORT = 1234 + +_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6} + def _GetLocalInnerAddress(version): return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version] @@ -83,8 +85,50 @@ def _GetRemoteOuterAddress(version): return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] +def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer, + src_port, dst_inner, dst_outer, + dst_port, spi, seq_num, ip_hdr_options={}): + ip_hdr_options.update({'src': src_inner, 'dst': dst_inner}) + + # Build and receive an ESP packet destined for the inner socket + IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] + input_pkt = ( + IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) / + net_test.UDP_PAYLOAD) + input_pkt = IpType(str(input_pkt)) # Compute length, checksum. + input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num, + (src_outer, dst_outer)) + + return input_pkt + + +def _CreateReceiveSock(version, port=0): + # Create a socket to receive packets. + read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) + read_sock.bind((net_test.GetWildcardAddress(version), port)) + # The second parameter of the tuple is the port number regardless of AF. + local_port = read_sock.getsockname()[1] + # Guard against the eventuality of the receive failing. + net_test.SetNonBlocking(read_sock.fileno()) + + return read_sock, local_port + + +def _SendPacket(testInstance, netid, version, remote, remote_port): + # Send a packet out via the tunnel-backed network, bound for the port number + # of the input socket. + write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) + testInstance.SelectInterface(write_sock, netid, "mark") + write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port)) + local_port = write_sock.getsockname()[1] + + return local_port + + def InjectTests(): InjectParameterizedTests(XfrmTunnelTest) + InjectParameterizedTests(XfrmInterfaceTest) + InjectParameterizedTests(XfrmVtiTest) def InjectParameterizedTests(cls): @@ -115,20 +159,15 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): local_port = read_sock.getsockname()[1] # Build and receive an ESP packet destined for the inner socket - IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] - input_pkt = ( - IpType(src=remote_inner, dst=local_inner) / scapy.UDP( - sport=1234, dport=local_port) / net_test.UDP_PAYLOAD) - input_pkt = IpType(str(input_pkt)) # Compute length, checksum. - input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, _TEST_IN_SPI, 1, - (remote_outer, local_outer)) + input_pkt = _GetNullAuthCryptTunnelModePkt( + inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT, + local_inner, local_outer, local_port, _TEST_IN_SPI, 1) self.ReceivePacketOn(underlying_netid, input_pkt) # Verify that the packet data and src are correct data, src = read_sock.recvfrom(4096) self.assertEquals(net_test.UDP_PAYLOAD, data) - self.assertEquals(remote_inner, src[0]) - self.assertEquals(1234, src[1]) + self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2]) def _TestTunnel(self, inner_version, outer_version, func, direction): """Test a unidirectional XFRM Tunnel with explicit selectors""" @@ -160,10 +199,7 @@ class XfrmTunnelTest(xfrm_base.XfrmLazyTest): write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) self.SelectInterface(write_sock, netid, "mark") - read_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - read_sock.bind((net_test.GetWildcardAddress(inner_version), 0)) - # Guard against the eventuality of the receive failing. - net_test.SetNonBlocking(read_sock.fileno()) + read_sock, _ = _CreateReceiveSock(inner_version) sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock func(inner_version, outer_version, u_netid, netid, local_inner, @@ -197,38 +233,37 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): netid = self.RandomNetid() local_addr = self.MyAddress(version, netid) self.iproute.CreateVirtualTunnelInterface( - dev_name=_VTI_IFNAME, + dev_name=_TEST_XFRM_IFNAME, local_addr=local_addr, remote_addr=_GetRemoteOuterAddress(version), o_key=_TEST_OKEY, i_key=_TEST_IKEY) - self._VerifyVtiInfoData(self.iproute.GetIfinfoData(_VTI_IFNAME), - version, local_addr, - _GetRemoteOuterAddress(version), - _TEST_IKEY, _TEST_OKEY) + self._VerifyVtiInfoData( + self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, + _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY) new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} - new_okey = _TEST_OKEY + _VTI_NETID - new_ikey = _TEST_IKEY + _VTI_NETID + new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID + new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID self.iproute.CreateVirtualTunnelInterface( - dev_name=_VTI_IFNAME, + dev_name=_TEST_XFRM_IFNAME, local_addr=local_addr, remote_addr=new_remote_addr[version], o_key=new_okey, i_key=new_ikey, is_update=True) - self._VerifyVtiInfoData(self.iproute.GetIfinfoData(_VTI_IFNAME), - version, local_addr, new_remote_addr[version], - new_ikey, new_okey) + self._VerifyVtiInfoData( + self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, + new_remote_addr[version], new_ikey, new_okey) - if_index = self.iproute.GetIfIndex(_VTI_IFNAME) + if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) # Validate that the netlink interface matches the ioctl interface. - self.assertEquals(net_test.GetInterfaceIndex(_VTI_IFNAME), if_index) - self.iproute.DeleteLink(_VTI_IFNAME) + self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) + self.iproute.DeleteLink(_TEST_XFRM_IFNAME) with self.assertRaises(IOError): - self.iproute.GetIfIndex(_VTI_IFNAME) + self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) def _QuietDeleteLink(self, ifname): try: @@ -239,59 +274,94 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): def tearDown(self): super(XfrmAddDeleteVtiTest, self).tearDown() - self._QuietDeleteLink(_VTI_IFNAME) + self._QuietDeleteLink(_TEST_XFRM_IFNAME) + +class SaInfo(object): -class VtiInterface(object): + def __init__(self, spi): + self.spi = spi + self.seq_num = 1 - def __init__(self, iface, netid, underlying_netid, _, local, remote): + +class IpSecBaseInterface(object): + + def __init__(self, iface, netid, underlying_netid, local, remote, version): self.iface = iface self.netid = netid self.underlying_netid = underlying_netid self.local, self.remote = local, remote + + # XFRM interfaces technically do not have a version. This keeps track of + # the IP version of the local and remote addresses. + self.version = version self.rx = self.tx = 0 - self.ikey = _TEST_IKEY + netid - self.okey = _TEST_OKEY + netid - self.out_spi = self.in_spi = random.randint(0, 0x7fffffff) + self.addrs = {} self.iproute = iproute.IPRoute() self.xfrm = xfrm.Xfrm() - self.SetupInterface() - self.SetupXfrm() - self.addrs = {} - def Teardown(self): self.TeardownXfrm() self.TeardownInterface() + def TeardownInterface(self): + self.iproute.DeleteLink(self.iface) + + def SetupXfrm(self, use_null_crypt): + rand_spi = random.randint(0, 0x7fffffff) + self.in_sa = SaInfo(rand_spi) + self.out_sa = SaInfo(rand_spi) + + # Select algorithms: + if use_null_crypt: + auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL + else: + auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256 + + self._SetupXfrmByType(auth, crypt) + + def TeardownXfrm(self): + raise NotImplementedError("Subclasses should implement this") + + def _SetupXfrmByType(self, auth_algo, crypt_algo): + raise NotImplementedError("Subclasses should implement this") + + +class VtiInterface(IpSecBaseInterface): + + def __init__(self, iface, netid, underlying_netid, _, local, remote, version): + super(VtiInterface, self).__init__(iface, netid, underlying_netid, local, + remote, version) + + self.ikey = _TEST_IKEY + netid + self.okey = _TEST_OKEY + netid + + self.SetupInterface() + self.SetupXfrm(False) + def SetupInterface(self): return self.iproute.CreateVirtualTunnelInterface( self.iface, self.local, self.remote, self.ikey, self.okey) - def TeardownInterface(self): - self.iproute.DeleteLink(self.iface) - - def SetupXfrm(self): + def _SetupXfrmByType(self, auth_algo, crypt_algo): # For the VTI, the selectors are wildcard since packets will only # be selected if they have the appropriate mark, hence the inner # addresses are wildcard. self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, - self.out_spi, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, + self.out_sa.spi, crypt_algo, auth_algo, xfrm.ExactMatchMark(self.okey), self.underlying_netid, None) self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, - self.in_spi, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, + self.in_sa.spi, crypt_algo, auth_algo, xfrm.ExactMatchMark(self.ikey), None, None) def TeardownXfrm(self): self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, - self.out_spi, self.okey, None) + self.out_sa.spi, self.okey, None) self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, - self.in_spi, self.ikey, None) + self.in_sa.spi, self.ikey, None) @unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") @@ -311,53 +381,36 @@ class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) -class XfrmInterface(object): +class XfrmInterface(IpSecBaseInterface): + + def __init__(self, iface, netid, underlying_netid, ifindex, local, remote, + version): + super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local, + remote, version) - def __init__(self, iface, netid, underlying_netid, ifindex, local, remote): - self.iface = iface - self.netid = netid - self.underlying_netid = underlying_netid self.ifindex = ifindex - self.local, self.remote = local, remote - self.rx = self.tx = 0 self.xfrm_if_id = netid - self.out_spi = self.in_spi = random.randint(0, 0x7fffffff) - self.xfrm_if_id = self.netid - - self.iproute = iproute.IPRoute() - self.xfrm = xfrm.Xfrm() self.SetupInterface() - self.SetupXfrm() - self.addrs = {} - - def Teardown(self): - self.TeardownXfrm() - self.TeardownInterface() + self.SetupXfrm(False) def SetupInterface(self): """Create an XFRM interface.""" return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex) - def TeardownInterface(self): - self.iproute.DeleteLink(self.iface) - - def SetupXfrm(self): + def _SetupXfrmByType(self, auth_algo, crypt_algo): self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, - self.out_spi, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, None, + self.out_sa.spi, crypt_algo, auth_algo, None, self.underlying_netid, self.xfrm_if_id) self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, - self.in_spi, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, - None, None, self.xfrm_if_id) - + self.in_sa.spi, crypt_algo, auth_algo, None, None, + self.xfrm_if_id) def TeardownXfrm(self): self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, - self.out_spi, None, self.xfrm_if_id) + self.out_sa.spi, None, self.xfrm_if_id) self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, - self.in_spi, None, self.xfrm_if_id) + self.in_sa.spi, None, self.xfrm_if_id) @@ -366,43 +419,54 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): @classmethod def setUpClass(cls): xfrm_base.XfrmBaseTest.setUpClass() - # VTI interfaces use marks extensively, so configure realistic packet + # Tunnel interfaces use marks extensively, so configure realistic packet # marking rules to make the test representative, make PMTUD work, etc. cls.SetInboundMarks(True) cls.SetMarkReflectSysctls(1) - cls.tunnels = {} + # Group by tunnel version to ensure that we test at least one IPv4 and one + # IPv6 tunnel + cls.tunnelsV4 = {} + cls.tunnelsV6 = {} for i, underlying_netid in enumerate(cls.tuns): for version in 4, 6: - netid = _BASE_VTI_NETID[version] + i + netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i iface = "ipsec%s" % netid local = cls.MyAddress(version, underlying_netid) if version == 4: - remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR + remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2) else: - remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR - ifindex = cls.ifindices[underlying_netid] + remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2) + ifindex = cls.ifindices[underlying_netid] tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex, - local, remote) - + local, remote, version) cls._SetInboundMarking(netid, iface, True) cls._SetupTunnelNetwork(tunnel, True) - cls.tunnels[netid] = tunnel + + if version == 4: + cls.tunnelsV4[netid] = tunnel + else: + cls.tunnelsV6[netid] = tunnel @classmethod def tearDownClass(cls): # The sysctls are restored by MultinetworkBaseTest.tearDownClass. cls.SetInboundMarks(False) - for tunnel in cls.tunnels.values(): + for tunnel in cls.tunnelsV4.values() + cls.tunnelsV6.values(): cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) cls._SetupTunnelNetwork(tunnel, False) tunnel.Teardown() xfrm_base.XfrmBaseTest.tearDownClass() + def randomTunnel(self, outer_version): + version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6 + return random.choice(version_dict.values()) + def setUp(self): multinetwork_base.MultiNetworkBaseTest.setUp(self) self.iproute = iproute.IPRoute() + self.xfrm = xfrm.Xfrm() def tearDown(self): multinetwork_base.MultiNetworkBaseTest.tearDown(self) @@ -422,6 +486,13 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): self.iproute.DelAddress(old_addr, net_test.AddressLengthBits(version), ifindex) + @classmethod + def _GetLocalAddress(cls, version, netid): + if version == 4: + return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET) + else: + return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1" + @classmethod def _SetupTunnelNetwork(cls, tunnel, is_add): """Setup rules and routes for a tunnel Network. @@ -461,10 +532,7 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): table, cls.PRIORITY_FWMARK) # Configure IP addresses. - if version == 4: - addr = cls._MyIPv4Address(tunnel.netid) - else: - addr = cls.OnlinkPrefix(6, tunnel.netid) + "1" + addr = cls._GetLocalAddress(version, tunnel.netid) prefixlen = net_test.AddressLengthBits(version) tunnel.addrs[version] = addr if is_add: @@ -474,84 +542,149 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) cls.iproute.DelAddress(addr, prefixlen, ifindex) - def assertReceivedPacket(self, tunnel): + def assertReceivedPacket(self, tunnel, sa_info): tunnel.rx += 1 self.assertEquals((tunnel.rx, tunnel.tx), self.iproute.GetRxTxPackets(tunnel.iface)) + sa_info.seq_num += 1 - def assertSentPacket(self, tunnel): + def assertSentPacket(self, tunnel, sa_info): tunnel.tx += 1 self.assertEquals((tunnel.rx, tunnel.tx), self.iproute.GetRxTxPackets(tunnel.iface)) + sa_info.seq_num += 1 - # TODO: Should we completely re-write this using null encryption and null - # authentication? We could then assemble and disassemble packets for each - # direction individually. This approach would improve debuggability, avoid the - # complexity of the twister, and allow the test to more-closely validate - # deployable configurations. - def _CheckTunnelInputOutput(self, tunnel, inner_version): - local_outer = tunnel.local - remote_outer = tunnel.remote - - # Create a socket to receive packets. - read_sock = socket( - net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - read_sock.bind((net_test.GetWildcardAddress(inner_version), 0)) - # The second parameter of the tuple is the port number regardless of AF. - port = read_sock.getsockname()[1] - # Guard against the eventuality of the receive failing. - net_test.SetNonBlocking(read_sock.fileno()) - - # Send a packet out via the tunnel-backed network, bound for the port number - # of the input socket. - write_sock = socket( - net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - self.SelectInterface(write_sock, tunnel.netid, "mark") - write_sock.sendto(net_test.UDP_PAYLOAD, - (_GetRemoteInnerAddress(inner_version), port)) + def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner, + sa_info=None): + """Test null-crypt input path over an IPsec interface.""" + if sa_info is None: + sa_info = tunnel.in_sa + read_sock, local_port = _CreateReceiveSock(inner_version) + + input_pkt = _GetNullAuthCryptTunnelModePkt( + inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT, + local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num) + self.ReceivePacketOn(tunnel.underlying_netid, input_pkt) + + # Verify that the packet data and src are correct + self.assertReceivedPacket(tunnel, sa_info) + data, src = read_sock.recvfrom(4096) + self.assertEquals(net_test.UDP_PAYLOAD, data) + self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2]) + + def _CheckTunnelOutput(self, tunnel, inner_version, local_inner, + remote_inner, sa_info=None): + """Test null-crypt output path over an IPsec interface.""" + if sa_info is None: + sa_info = tunnel.out_sa + local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, + _TEST_REMOTE_PORT) # Read a tunneled IP packet on the underlying (outbound) network # verifying that it is an ESP packet. - self.assertSentPacket(tunnel) - pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_spi, tunnel.tx, None, - local_outer, remote_outer) - - # Perform an address switcheroo so that the inner address of the remote - # end of the tunnel is now the address on the local tunnel interface; this - # way, the twisted inner packet finds a destination via the tunnel once - # decrypted. - remote = _GetRemoteInnerAddress(inner_version) - local = tunnel.addrs[inner_version] - self._SwapInterfaceAddress(tunnel.iface, new_addr=remote, old_addr=local) + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, + sa_info.seq_num, None, tunnel.local, + tunnel.remote) + + # Get and update the IP headers on the inner payload so that we can do a simple + # comparison of byte data. Unfortunately, due to the scapy version this runs on, + # we cannot parse past the ESP header to the inner IP header, and thus have to + # workaround in this manner + if inner_version == 4: + ip_hdr_options = { + 'id': scapy.IP(str(pkt.payload)[8:]).id, + 'flags': scapy.IP(str(pkt.payload)[8:]).flags + } + else: + ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl} + + expected = _GetNullAuthCryptTunnelModePkt( + inner_version, local_inner, tunnel.local, local_port, remote_inner, + tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num, + ip_hdr_options) + + # Check outer header manually (Avoids having to overwrite outer header's + # id, flags or flow label) + self.assertSentPacket(tunnel, sa_info) + self.assertEquals(expected.src, pkt.src) + self.assertEquals(expected.dst, pkt.dst) + self.assertEquals(len(expected), len(pkt)) + + # Check everything else + self.assertEquals(str(expected.payload), str(pkt.payload)) + + def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner, + remote_inner): + """Test both input and output paths over an encrypted IPsec interface. + + This tests specifically makes sure that the both encryption and decryption + work together, as opposed to the _CheckTunnel(Input|Output) where the + input and output paths are tested separately, and using null encryption. + """ + src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, + _TEST_REMOTE_PORT) + + # Make sure it appeared on the underlying interface + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi, + tunnel.out_sa.seq_num, None, tunnel.local, + tunnel.remote) + + # Check that packet is not sent in plaintext + self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt)) + + # Check src/dst + self.assertEquals(tunnel.local, pkt.src) + self.assertEquals(tunnel.remote, pkt.dst) + + # Check that the interface statistics recorded the outbound packet + self.assertSentPacket(tunnel, tunnel.out_sa) + try: - # Swap the packet's IP headers and write it back to the - # underlying network. + # Swap the interface addresses to pretend we are the remote + self._SwapInterfaceAddress( + tunnel.iface, new_addr=remote_inner, old_addr=local_inner) + + # Swap the packet's IP headers and write it back to the underlying + # network. pkt = TunTwister.TwistPacket(pkt) + read_sock, local_port = _CreateReceiveSock(inner_version, + _TEST_REMOTE_PORT) self.ReceivePacketOn(tunnel.underlying_netid, pkt) - self.assertReceivedPacket(tunnel) - # Receive the decrypted packet on the dest port number. - read_packet = read_sock.recv(4096) - self.assertEquals(read_packet, net_test.UDP_PAYLOAD) - self.assertReceivedPacket(vti) - finally: - # Unwind the switcheroo - self._SwapInterfaceAddress(tunnel.iface, new_addr=local, old_addr=remote) + # Verify that the packet data and src are correct + data, src = read_sock.recvfrom(4096) + self.assertEquals(net_test.UDP_PAYLOAD, data) + self.assertEquals((local_inner, src_port), src[:2]) + + # Check that the interface statistics recorded the inbound packet + self.assertReceivedPacket(tunnel, tunnel.in_sa) + finally: + # Swap the interface addresses to pretend we are the remote + self._SwapInterfaceAddress( + tunnel.iface, new_addr=local_inner, old_addr=remote_inner) + + def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner, + sa_info=None): + """Test ICMP error path over an IPsec interface.""" + if sa_info is None: + sa_info = tunnel.out_sa # Now attempt to provoke an ICMP error. # TODO: deduplicate with multinetwork_test.py. - version = net_test.GetAddressVersion(tunnel.remote) dst_prefix, intermediate = { 4: ("172.19.", "172.16.9.12"), 6: ("2001:db8::", "2001:db8::1") - }[version] - - write_sock.sendto(net_test.UDP_PAYLOAD, - (_GetRemoteInnerAddress(inner_version), port)) - self.assertSentPacket(tunnel) - pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_spi, tunnel.tx, None, - local_outer, remote_outer) - myaddr = self.MyAddress(version, tunnel.underlying_netid) - _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt) + }[tunnel.version] + + local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, + _TEST_REMOTE_PORT) + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, + sa_info.seq_num, None, tunnel.local, + tunnel.remote) + self.assertSentPacket(tunnel, sa_info) + + myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid) + _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr, + pkt) self.ReceivePacketOn(tunnel.underlying_netid, toobig) # Check that the packet too big reduced the MTU. @@ -562,14 +695,35 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) # Clear PMTU information so that future tests don't have to worry about it. - self.InvalidateDstCache(version, tunnel.underlying_netid) + self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid) + + def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner, + remote_inner): + """Test combined encryption path with ICMP errors over an IPsec tunnel""" + self._CheckTunnelEncryption(tunnel, inner_version, local_inner, + remote_inner) + self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner) + self._CheckTunnelEncryption(tunnel, inner_version, local_inner, + remote_inner) + + def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): + """Bootstrap method to setup and run tests for the given parameters.""" + tunnel = self.randomTunnel(outer_version) + + try: + tunnel.TeardownXfrm() + tunnel.SetupXfrm(use_null_crypt) + + local_inner = tunnel.addrs[inner_version] + remote_inner = _GetRemoteInnerAddress(inner_version) - def CheckTunnelInputOutput(self): - """Test packet input and output over a Virtual Tunnel Interface.""" - for i in xrange(3 * len(self.tunnels.values())): - tunnel = random.choice(self.tunnels.values()) - self._CheckTunnelInputOutput(tunnel, 4) - self._CheckTunnelInputOutput(tunnel, 6) + # Run twice to ensure sequence numbers are tested + for i in range(2): + func(tunnel, inner_version, local_inner, remote_inner) + finally: + if use_null_crypt: + tunnel.TeardownXfrm() + tunnel.SetupXfrm(False) @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") @@ -577,8 +731,23 @@ class XfrmVtiTest(XfrmTunnelBase): INTERFACE_CLASS = VtiInterface - def testVtiInputOutput(self): - self.CheckTunnelInputOutput() + def ParamTestVtiInput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) + + def ParamTestVtiOutput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, + True) + + def ParamTestVtiInOutEncrypted(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, + False) + + def ParamTestVtiIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) + + def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, + self._CheckTunnelEncryptionWithIcmp, False) @unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") @@ -586,8 +755,23 @@ class XfrmInterfaceTest(XfrmTunnelBase): INTERFACE_CLASS = XfrmInterface - def testXfrmiInputOutput(self): - self.CheckTunnelInputOutput() + def ParamTestXfrmIntfInput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) + + def ParamTestXfrmIntfOutput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, + True) + + def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, + False) + + def ParamTestXfrmIntfIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) + + def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, + self._CheckTunnelEncryptionWithIcmp, False) if __name__ == "__main__": -- cgit v1.2.3 From 6b0afa1c38b4be581fd5c390fb4977ff9e1ef169 Mon Sep 17 00:00:00 2001 From: Alistair Strachan Date: Thu, 10 May 2018 11:59:26 -0700 Subject: Add __NR_bpf constant for i686. This adds support for running the net tests on 32-bit x86. Bug: 79532682 Bug: 117749037 Test: run vts -m VtsKernelNetTest Test result after change: PASSED: 2, FAILED: 0, MODULES: 2 of 2 Signed-off-by: Alistair Strachan Change-Id: I5a8ade3a2292734001dd6e85f9c6b5cefc8de9c1 Merged-in: I5a8ade3a2292734001dd6e85f9c6b5cefc8de9c1 (cherry picked from commit 7993a85525c0ce459f424a2152ea24b972c6acdc) --- net/test/bpf.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/net/test/bpf.py b/net/test/bpf.py index 5e90daa..9dc3c46 100755 --- a/net/test/bpf.py +++ b/net/test/bpf.py @@ -28,7 +28,9 @@ __NR_bpf = { "aarch64": 280, "armv7l": 386, "armv8l": 386, - "x86_64": 321}[os.uname()[4]] + "i686" : 357, + "x86_64": 321, +}[os.uname()[4]] LOG_LEVEL = 1 LOG_SIZE = 65536 -- cgit v1.2.3 From 30172dd99b881717b91ee4d8e2f47d8b50fd4387 Mon Sep 17 00:00:00 2001 From: Pierre Lee Date: Mon, 18 Feb 2019 16:55:54 +0800 Subject: Extend delay probe time testVtiRekey_IPv6_in_IPv6 and testVtiRekey_IPv4_in_IPv6 failed on a poor performance platform. They failed because one redundant NS packet is sent to probe neighbor host's mac address. During the test, RA is received, neighbor table's state became NUD_STALE. After one UDP packet is sent, state is changed to NUD_DELAY. There is 5s interval before state is changed to NUD_PROBE. If it is NUD_PROBE, NS is sent certainly. Some poor platform cannot send successfully all UDP packets in 5s, so NS is necessary to probe, since it is unexpected. For such platform, 10s is enough to finish to send all, so extend delay probe time Bug: 123202162 Test: VtsKernelNetTest can pass Change-Id: I98047d08e89896958de60f66e9b286564b52046f --- net/test/xfrm_tunnel_test.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index 652a0c2..5f243cc 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -444,6 +444,11 @@ class XfrmTunnelBase(xfrm_base.XfrmBaseTest): cls._SetInboundMarking(netid, iface, True) cls._SetupTunnelNetwork(tunnel, True) + # On slower platforms, the test does not complete before the delay probe time fires. + # This causes the test to fail because of the unexpected NUD packet. b/123202162 + if version == 6: + cls.SetSysctl("/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" + % cls.GetInterfaceName(underlying_netid) , 10) if version == 4: cls.tunnelsV4[netid] = tunnel else: -- cgit v1.2.3 From f1a12c86506fd5faff559aef0aa2fcef81f2553e Mon Sep 17 00:00:00 2001 From: Benedict Wong Date: Tue, 3 Jul 2018 15:27:26 -0700 Subject: Refactor parameterization logic in net tests This change refactors the logic to create parameterized tests in the kernel unit tests. Logic for name generation is left to classes, while common code for test injection is moved to the utility class Bug: 66467511 Test: Ran net tests Change-Id: I7eba57c616145246637beefac3aca16f9e2e899e (cherry picked from commit ad7a31a77695b60bdcd223df568a2b921acc41b0) --- net/test/parameterization_test.py | 83 +++++++++++++++++++++++++++++ net/test/util.py | 57 +++++++++++++++++++- net/test/xfrm_algorithm_test.py | 109 ++++++++++++-------------------------- 3 files changed, 173 insertions(+), 76 deletions(-) create mode 100755 net/test/parameterization_test.py diff --git a/net/test/parameterization_test.py b/net/test/parameterization_test.py new file mode 100755 index 0000000..8f9e130 --- /dev/null +++ b/net/test/parameterization_test.py @@ -0,0 +1,83 @@ +#!/usr/bin/python +# +# Copyright 2018 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import unittest + +import net_test +import util + + +def InjectTests(): + ParmeterizationTest.InjectTests() + + +# This test class ensures that the Parameterized Test generator in utils.py +# works properly. It injects test methods into itself, and ensures that they +# are generated as expected, and that the TestClosures being run are properly +# defined, and running different parameterized tests each time. +class ParmeterizationTest(net_test.NetworkTest): + tests_run_list = [] + + @staticmethod + def NameGenerator(a, b, c): + return str(a) + "_" + str(b) + "_" + str(c) + + @classmethod + def InjectTests(cls): + PARAMS_A = (1, 2) + PARAMS_B = (3, 4) + PARAMS_C = (5, 6) + + param_list = itertools.product(PARAMS_A, PARAMS_B, PARAMS_C) + util.InjectParameterizedTest(cls, param_list, cls.NameGenerator) + + def ParamTestDummyFunc(self, a, b, c): + self.tests_run_list.append( + "testDummyFunc_" + ParmeterizationTest.NameGenerator(a, b, c)) + + def testParameterization(self): + expected = [ + "testDummyFunc_1_3_5", + "testDummyFunc_1_3_6", + "testDummyFunc_1_4_5", + "testDummyFunc_1_4_6", + "testDummyFunc_2_3_5", + "testDummyFunc_2_3_6", + "testDummyFunc_2_4_5", + "testDummyFunc_2_4_6", + ] + + actual = [name for name in dir(self) if name.startswith("testDummyFunc")] + + # Check that name and contents are equal + self.assertEqual(len(expected), len(actual)) + self.assertEqual(sorted(expected), sorted(actual)) + + # Start a clean list, and run all the tests. + self.tests_run_list = list() + for test_name in expected: + test_method = getattr(self, test_name) + test_method() + + # Make sure all tests have been run with the correct parameters + for test_name in expected: + self.assertTrue(test_name in self.tests_run_list) + + +if __name__ == "__main__": + ParmeterizationTest.InjectTests() + unittest.main() diff --git a/net/test/util.py b/net/test/util.py index bed3e1d..cbcd2d0 100644 --- a/net/test/util.py +++ b/net/test/util.py @@ -13,4 +13,59 @@ # limitations under the License. def GetPadLength(block_size, length): - return (block_size - (length % block_size)) % block_size \ No newline at end of file + return (block_size - (length % block_size)) % block_size + + +def InjectParameterizedTest(cls, param_list, name_generator): + """Injects parameterized tests into the provided class + + This method searches for all tests that start with the name "ParamTest", + and injects a test method for each set of parameters in param_list. Names + are generated via the use of the name_generator. + + Args: + cls: the class for which to inject all parameterized tests + param_list: a list of tuples, where each tuple is a combination of + of parameters to test (i.e. representing a single test case) + name_generator: A function that takes a combination of parameters and + returns a string that identifies the test case. + """ + param_test_names = [name for name in dir(cls) if name.startswith("ParamTest")] + + # Force param_list to an actual list; otherwise itertools.Product will hit + # the end, resulting in only the first ParamTest* method actually being + # parameterized + param_list = list(param_list) + + # Parameterize each test method starting with "ParamTest" + for test_name in param_test_names: + func = getattr(cls, test_name) + + for params in param_list: + # Give the test method a readable, debuggable name. + param_string = name_generator(*params) + new_name = "%s_%s" % (func.__name__.replace("ParamTest", "test"), + param_string) + new_name = new_name.replace("(", "-").replace(")", "") # remove parens + + # Inject the test method + setattr(cls, new_name, _GetTestClosure(func, params)) + + +def _GetTestClosure(func, params): + """ Creates a no-argument test method for the given function and parameters. + + This is required to be separate from the InjectParameterizedTest method, due + to some interesting scoping issues with internal function declarations. If + left in InjectParameterizedTest, all the tests end up using the same + instance of TestClosure + + Args: + func: the function for which this test closure should run + params: the parameters for the run of this test function + """ + + def TestClosure(self): + func(self, *params) + + return TestClosure diff --git a/net/test/xfrm_algorithm_test.py b/net/test/xfrm_algorithm_test.py index 6adc461..0176265 100755 --- a/net/test/xfrm_algorithm_test.py +++ b/net/test/xfrm_algorithm_test.py @@ -27,6 +27,7 @@ import unittest import multinetwork_base import net_test from tun_twister import TapTwister +import util import xfrm import xfrm_base @@ -72,49 +73,26 @@ AEAD_ALGOS = [ ] def InjectTests(): - XfrmAlgorithmTest.InjectTests() + XfrmAlgorithmTest.InjectTests() + class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): @classmethod def InjectTests(cls): - """Inject parameterized test cases into this class. - - Because a library for parameterized testing is not availble in - net_test.rootfs.20150203, this does a minimal parameterization. - - This finds methods named like "ParamTestFoo" and replaces them with several - "testFoo(*)" methods taking different parameter dicts. A set of test - parameters is generated from every combination of encryption, - authentication, IP version, and TCP/UDP. - - The benefit of this approach is that an individually failing tests have a - clearly separated stack trace, and one failed test doesn't prevent the rest - from running. - """ - param_test_names = [ - name for name in dir(cls) if name.startswith("ParamTest") - ] VERSIONS = (4, 6) TYPES = (SOCK_DGRAM, SOCK_STREAM) # Tests all combinations of auth & crypt. Mutually exclusive with aead. - for crypt, auth, version, proto, name in itertools.product( - CRYPT_ALGOS, AUTH_ALGOS, VERSIONS, TYPES, param_test_names): - XfrmAlgorithmTest.InjectSingleTest(name, version, proto, crypt=crypt, auth=auth) + param_list = itertools.product(VERSIONS, TYPES, AUTH_ALGOS, CRYPT_ALGOS, + [None]) + util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator) # Tests all combinations of aead. Mutually exclusive with auth/crypt. - for aead, version, proto, name in itertools.product( - AEAD_ALGOS, VERSIONS, TYPES, param_test_names): - XfrmAlgorithmTest.InjectSingleTest(name, version, proto, aead=aead) - - @classmethod - def InjectSingleTest(cls, name, version, proto, crypt=None, auth=None, aead=None): - func = getattr(cls, name) - - def TestClosure(self): - func(self, {"crypt": crypt, "auth": auth, "aead": aead, - "version": version, "proto": proto}) + param_list = itertools.product(VERSIONS, TYPES, [None], [None], AEAD_ALGOS) + util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator) + @staticmethod + def TestNameGenerator(version, proto, auth, crypt, aead): # Produce a unique and readable name for each test. e.g. # testSocketPolicySimple_cbc-aes_256_hmac-sha512_512_256_IPv6_UDP param_string = "" @@ -131,12 +109,9 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): param_string += "%s_%s" % ("IPv4" if version == 4 else "IPv6", "UDP" if proto == SOCK_DGRAM else "TCP") - new_name = "%s_%s" % (func.__name__.replace("ParamTest", "test"), - param_string) - new_name = new_name.replace("(", "-").replace(")", "") # remove parens - setattr(cls, new_name, TestClosure) + return param_string - def ParamTestSocketPolicySimple(self, params): + def ParamTestSocketPolicySimple(self, version, proto, auth, crypt, aead): """Test two-way traffic using transport mode and socket policies.""" def AssertEncrypted(packet): @@ -153,37 +128,21 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): # other using transport mode ESP. Because of TapTwister, both sockets # perceive each other as owning "remote_addr". netid = self.RandomNetid() - family = net_test.GetAddressFamily(params["version"]) - local_addr = self.MyAddress(params["version"], netid) - remote_addr = self.GetRemoteSocketAddress(params["version"]) - crypt_left = (xfrm.XfrmAlgo(( - params["crypt"].name, - params["crypt"].key_len)), - os.urandom(params["crypt"].key_len / 8)) if params["crypt"] else None - crypt_right = (xfrm.XfrmAlgo(( - params["crypt"].name, - params["crypt"].key_len)), - os.urandom(params["crypt"].key_len / 8)) if params["crypt"] else None - auth_left = (xfrm.XfrmAlgoAuth(( - params["auth"].name, - params["auth"].key_len, - params["auth"].trunc_len)), - os.urandom(params["auth"].key_len / 8)) if params["auth"] else None - auth_right = (xfrm.XfrmAlgoAuth(( - params["auth"].name, - params["auth"].key_len, - params["auth"].trunc_len)), - os.urandom(params["auth"].key_len / 8)) if params["auth"] else None - aead_left = (xfrm.XfrmAlgoAead(( - params["aead"].name, - params["aead"].key_len, - params["aead"].icv_len)), - os.urandom(params["aead"].key_len / 8)) if params["aead"] else None - aead_right = (xfrm.XfrmAlgoAead(( - params["aead"].name, - params["aead"].key_len, - params["aead"].icv_len)), - os.urandom(params["aead"].key_len / 8)) if params["aead"] else None + family = net_test.GetAddressFamily(version) + local_addr = self.MyAddress(version, netid) + remote_addr = self.GetRemoteSocketAddress(version) + auth_left = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)), + os.urandom(auth.key_len / 8)) if auth else None + auth_right = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)), + os.urandom(auth.key_len / 8)) if auth else None + crypt_left = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)), + os.urandom(crypt.key_len / 8)) if crypt else None + crypt_right = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)), + os.urandom(crypt.key_len / 8)) if crypt else None + aead_left = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)), + os.urandom(aead.key_len / 8)) if aead else None + aead_right = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)), + os.urandom(aead.key_len / 8)) if aead else None spi_left = 0xbeefface spi_right = 0xcafed00d req_ids = [100, 200, 300, 400] # Used to match templates and SAs. @@ -242,20 +201,20 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): output_mark=None) # Make two sockets. - sock_left = socket(family, params["proto"], 0) + sock_left = socket(family, proto, 0) sock_left.settimeout(2.0) sock_left.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.SelectInterface(sock_left, netid, "mark") - sock_right = socket(family, params["proto"], 0) + sock_right = socket(family, proto, 0) sock_right.settimeout(2.0) sock_right.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.SelectInterface(sock_right, netid, "mark") # For UDP, set SO_LINGER to 0, to prevent TCP sockets from hanging around # in a TIME_WAIT state. - if params["proto"] == SOCK_STREAM: - net_test.DisableFinWait(sock_left) - net_test.DisableFinWait(sock_right) + if proto == SOCK_STREAM: + net_test.DisableFinWait(sock_left) + net_test.DisableFinWait(sock_right) # Apply the left outbound socket policy. xfrm_base.ApplySocketPolicy(sock_left, family, xfrm.XFRM_POLICY_OUT, @@ -302,14 +261,14 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): sock.close() # Server and client need to know each other's port numbers in advance. - wildcard_addr = net_test.GetWildcardAddress(params["version"]) + wildcard_addr = net_test.GetWildcardAddress(version) sock_left.bind((wildcard_addr, 0)) sock_right.bind((wildcard_addr, 0)) left_port = sock_left.getsockname()[1] right_port = sock_right.getsockname()[1] # Start the appropriate server type on sock_right. - target = TcpServer if params["proto"] == SOCK_STREAM else UdpServer + target = TcpServer if proto == SOCK_STREAM else UdpServer server = threading.Thread( target=target, args=(sock_right, left_port), -- cgit v1.2.3