diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2019-07-08 23:24:51 +0000 |
---|---|---|
committer | android-build-team Robot <android-build-team-robot@google.com> | 2019-07-08 23:24:51 +0000 |
commit | 776c2fe2038c881d5c2e7f829a1bd0c4b4807294 (patch) | |
tree | 06d8c097803925948575a36f2bf67f9773b2ddc0 | |
parent | ef1b71e21270dcb334e577dab3a9e51321daff5f (diff) | |
parent | e90514f71460097fa3c1058cb73d78238c121433 (diff) | |
download | tests-pie-platform-release.tar.gz |
Snap for 5622519 from e90514f71460097fa3c1058cb73d78238c121433 to pi-platform-releasepie-platform-release
Change-Id: Ia179f195a05965e129c2b7e0452aed0519a4a62c
-rwxr-xr-x | net/test/bpf.py | 4 | ||||
-rw-r--r-- | net/test/iproute.py | 26 | ||||
-rwxr-xr-x | net/test/parameterization_test.py | 83 | ||||
-rwxr-xr-x | net/test/run_net_test.sh | 2 | ||||
-rw-r--r-- | net/test/util.py | 57 | ||||
-rwxr-xr-x | net/test/xfrm.py | 74 | ||||
-rwxr-xr-x | net/test/xfrm_algorithm_test.py | 109 | ||||
-rwxr-xr-x | net/test/xfrm_test.py | 6 | ||||
-rwxr-xr-x | net/test/xfrm_tunnel_test.py | 710 |
9 files changed, 796 insertions, 275 deletions
diff --git a/net/test/bpf.py b/net/test/bpf.py index 5e90daa..9dc3c46 100755 --- a/net/test/bpf.py +++ b/net/test/bpf.py @@ -28,7 +28,9 @@ __NR_bpf = { "aarch64": 280, "armv7l": 386, "armv8l": 386, - "x86_64": 321}[os.uname()[4]] + "i686" : 357, + "x86_64": 321, +}[os.uname()[4]] LOG_LEVEL = 1 LOG_SIZE = 65536 diff --git a/net/test/iproute.py b/net/test/iproute.py index a3310a2..8376eb6 100644 --- a/net/test/iproute.py +++ b/net/test/iproute.py @@ -209,13 +209,17 @@ IFLA_PAD = 42 IFLA_XDP = 43 IFLA_EVENT = 44 -# linux/include/uapi/if_link.h +# include/uapi/linux/if_link.h IFLA_INFO_UNSPEC = 0 IFLA_INFO_KIND = 1 IFLA_INFO_DATA = 2 IFLA_INFO_XSTATS = 3 -# linux/if_tunnel.h +IFLA_XFRM_UNSPEC = 0 +IFLA_XFRM_LINK = 1 +IFLA_XFRM_IF_ID = 2 + +# include/uapi/linux/if_tunnel.h IFLA_VTI_UNSPEC = 0 IFLA_VTI_LINK = 1 IFLA_VTI_IKEY = 2 @@ -682,7 +686,7 @@ class IPRoute(netlink.NetlinkSocket): attrs = self._ParseAttributes(RTM_NEWLINK, IfinfoMsg, attrs) return attrs["IFLA_STATS64"] - def GetVtiInfoData(self, dev_name): + def GetIfinfoData(self, dev_name): """Returns an IFLA_INFO_DATA dict object for the specified interface.""" _, attrs = self.GetIfinfo(dev_name) attrs = self._ParseAttributes(RTM_NEWLINK, IfinfoMsg, attrs) @@ -745,6 +749,22 @@ class IPRoute(netlink.NetlinkSocket): flags |= netlink.NLM_F_EXCL return self._SendNlRequest(RTM_NEWLINK, ifinfo, flags) + def CreateXfrmInterface(self, dev_name, xfrm_if_id, underlying_ifindex): + """Creates an XFRM interface with the specified parameters.""" + # The netlink attribute structure is essentially identical to the one + # for VTI above (q.v). + ifdata = self._NlAttrU32(IFLA_XFRM_LINK, underlying_ifindex) + ifdata += self._NlAttrU32(IFLA_XFRM_IF_ID, xfrm_if_id) + + linkinfo = self._NlAttrStr(IFLA_INFO_KIND, "xfrm") + linkinfo += self._NlAttr(IFLA_INFO_DATA, ifdata) + + msg = IfinfoMsg().Pack() + msg += self._NlAttrStr(IFLA_IFNAME, dev_name) + msg += self._NlAttr(IFLA_LINKINFO, linkinfo) + + return self._SendNlRequest(RTM_NEWLINK, msg) + if __name__ == "__main__": iproute = IPRoute() diff --git a/net/test/parameterization_test.py b/net/test/parameterization_test.py new file mode 100755 index 0000000..8f9e130 --- /dev/null +++ b/net/test/parameterization_test.py @@ -0,0 +1,83 @@ +#!/usr/bin/python +# +# Copyright 2018 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import unittest + +import net_test +import util + + +def InjectTests(): + ParmeterizationTest.InjectTests() + + +# This test class ensures that the Parameterized Test generator in utils.py +# works properly. It injects test methods into itself, and ensures that they +# are generated as expected, and that the TestClosures being run are properly +# defined, and running different parameterized tests each time. +class ParmeterizationTest(net_test.NetworkTest): + tests_run_list = [] + + @staticmethod + def NameGenerator(a, b, c): + return str(a) + "_" + str(b) + "_" + str(c) + + @classmethod + def InjectTests(cls): + PARAMS_A = (1, 2) + PARAMS_B = (3, 4) + PARAMS_C = (5, 6) + + param_list = itertools.product(PARAMS_A, PARAMS_B, PARAMS_C) + util.InjectParameterizedTest(cls, param_list, cls.NameGenerator) + + def ParamTestDummyFunc(self, a, b, c): + self.tests_run_list.append( + "testDummyFunc_" + ParmeterizationTest.NameGenerator(a, b, c)) + + def testParameterization(self): + expected = [ + "testDummyFunc_1_3_5", + "testDummyFunc_1_3_6", + "testDummyFunc_1_4_5", + "testDummyFunc_1_4_6", + "testDummyFunc_2_3_5", + "testDummyFunc_2_3_6", + "testDummyFunc_2_4_5", + "testDummyFunc_2_4_6", + ] + + actual = [name for name in dir(self) if name.startswith("testDummyFunc")] + + # Check that name and contents are equal + self.assertEqual(len(expected), len(actual)) + self.assertEqual(sorted(expected), sorted(actual)) + + # Start a clean list, and run all the tests. + self.tests_run_list = list() + for test_name in expected: + test_method = getattr(self, test_name) + test_method() + + # Make sure all tests have been run with the correct parameters + for test_name in expected: + self.assertTrue(test_name in self.tests_run_list) + + +if __name__ == "__main__": + ParmeterizationTest.InjectTests() + unittest.main() diff --git a/net/test/run_net_test.sh b/net/test/run_net_test.sh index 8c256b4..c7c18d3 100755 --- a/net/test/run_net_test.sh +++ b/net/test/run_net_test.sh @@ -30,7 +30,7 @@ OPTIONS="$OPTIONS TRANSPORT INET_XFRM_MODE_TUNNEL INET6_AH INET6_ESP" OPTIONS="$OPTIONS INET6_XFRM_MODE_TRANSPORT INET6_XFRM_MODE_TUNNEL" OPTIONS="$OPTIONS CRYPTO_SHA256 CRYPTO_SHA512 CRYPTO_AES_X86_64 CRYPTO_NULL" OPTIONS="$OPTIONS CRYPTO_GCM CRYPTO_ECHAINIV NET_IPVTI IPV6_VTI" -OPTIONS="$OPTIONS SOCK_CGROUP_DATA CGROUP_BPF" +OPTIONS="$OPTIONS SOCK_CGROUP_DATA CGROUP_BPF CONFIG_XFRM_INTERFACE" # For 4.14 kernels, where UBD and HOSTFS are not set OPTIONS="$OPTIONS CONFIG_BLK_DEV_UBD CONFIG_HOSTFS" diff --git a/net/test/util.py b/net/test/util.py index bed3e1d..cbcd2d0 100644 --- a/net/test/util.py +++ b/net/test/util.py @@ -13,4 +13,59 @@ # limitations under the License. def GetPadLength(block_size, length): - return (block_size - (length % block_size)) % block_size
\ No newline at end of file + return (block_size - (length % block_size)) % block_size + + +def InjectParameterizedTest(cls, param_list, name_generator): + """Injects parameterized tests into the provided class + + This method searches for all tests that start with the name "ParamTest", + and injects a test method for each set of parameters in param_list. Names + are generated via the use of the name_generator. + + Args: + cls: the class for which to inject all parameterized tests + param_list: a list of tuples, where each tuple is a combination of + of parameters to test (i.e. representing a single test case) + name_generator: A function that takes a combination of parameters and + returns a string that identifies the test case. + """ + param_test_names = [name for name in dir(cls) if name.startswith("ParamTest")] + + # Force param_list to an actual list; otherwise itertools.Product will hit + # the end, resulting in only the first ParamTest* method actually being + # parameterized + param_list = list(param_list) + + # Parameterize each test method starting with "ParamTest" + for test_name in param_test_names: + func = getattr(cls, test_name) + + for params in param_list: + # Give the test method a readable, debuggable name. + param_string = name_generator(*params) + new_name = "%s_%s" % (func.__name__.replace("ParamTest", "test"), + param_string) + new_name = new_name.replace("(", "-").replace(")", "") # remove parens + + # Inject the test method + setattr(cls, new_name, _GetTestClosure(func, params)) + + +def _GetTestClosure(func, params): + """ Creates a no-argument test method for the given function and parameters. + + This is required to be separate from the InjectParameterizedTest method, due + to some interesting scoping issues with internal function declarations. If + left in InjectParameterizedTest, all the tests end up using the same + instance of TestClosure + + Args: + func: the function for which this test closure should run + params: the parameters for the run of this test function + """ + + def TestClosure(self): + func(self, *params) + + return TestClosure diff --git a/net/test/xfrm.py b/net/test/xfrm.py index 1bd10da..6a03c13 100755 --- a/net/test/xfrm.py +++ b/net/test/xfrm.py @@ -85,6 +85,8 @@ XFRMA_ADDRESS_FILTER = 26 XFRMA_PAD = 27 XFRMA_OFFLOAD_DEV = 28 XFRMA_OUTPUT_MARK = 29 +XFRMA_INPUT_MARK = 30 +XFRMA_IF_ID = 31 # Other netlink constants. See include/uapi/linux/xfrm.h. @@ -369,21 +371,25 @@ class Xfrm(netlink.NetlinkSocket): data = struct.unpack("=I", nla_data)[0] elif name == "XFRMA_TMPL": data = cstruct.Read(nla_data, XfrmUserTmpl)[0] + elif name == "XFRMA_IF_ID": + data = struct.unpack("=I", nla_data)[0] else: data = nla_data return name, data - def _UpdatePolicyInfo(self, msg, policy, tmpl, mark): + def _UpdatePolicyInfo(self, msg, policy, tmpl, mark, xfrm_if_id): """Send a policy to the Security Policy Database""" nlattrs = [] if tmpl is not None: nlattrs.append((XFRMA_TMPL, tmpl)) if mark is not None: nlattrs.append((XFRMA_MARK, mark)) + if xfrm_if_id is not None: + nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id))) self.SendXfrmNlRequest(msg, policy, nlattrs) - def AddPolicyInfo(self, policy, tmpl, mark): + def AddPolicyInfo(self, policy, tmpl, mark, xfrm_if_id=None): """Add a new policy to the Security Policy Database If the policy exists, then return an error (EEXIST). @@ -392,10 +398,11 @@ class Xfrm(netlink.NetlinkSocket): policy: an unpacked XfrmUserpolicyInfo tmpl: an unpacked XfrmUserTmpl mark: an unpacked XfrmMark + xfrm_if_id: the XFRM interface ID as an integer, or None """ - self._UpdatePolicyInfo(XFRM_MSG_NEWPOLICY, policy, tmpl, mark) + self._UpdatePolicyInfo(XFRM_MSG_NEWPOLICY, policy, tmpl, mark, xfrm_if_id) - def UpdatePolicyInfo(self, policy, tmpl, mark): + def UpdatePolicyInfo(self, policy, tmpl, mark, xfrm_if_id): """Update an existing policy in the Security Policy Database If the policy does not exist, then create it; otherwise, update the @@ -405,10 +412,11 @@ class Xfrm(netlink.NetlinkSocket): policy: an unpacked XfrmUserpolicyInfo tmpl: an unpacked XfrmUserTmpl to update mark: an unpacked XfrmMark to match the existing policy or None + xfrm_if_id: an XFRM interface ID or None """ - self._UpdatePolicyInfo(XFRM_MSG_UPDPOLICY, policy, tmpl, mark) + self._UpdatePolicyInfo(XFRM_MSG_UPDPOLICY, policy, tmpl, mark, xfrm_if_id) - def DeletePolicyInfo(self, selector, direction, mark): + def DeletePolicyInfo(self, selector, direction, mark, xfrm_if_id=None): """Delete a policy from the Security Policy Database Args: @@ -419,6 +427,8 @@ class Xfrm(netlink.NetlinkSocket): nlattrs = [] if mark is not None: nlattrs.append((XFRMA_MARK, mark)) + if xfrm_if_id is not None: + nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id))) self.SendXfrmNlRequest(XFRM_MSG_DELPOLICY, XfrmUserpolicyId(sel=selector, dir=direction), nlattrs) @@ -440,11 +450,35 @@ class Xfrm(netlink.NetlinkSocket): if nlattrs is None: nlattrs = [] for attr_type, attr_msg in nlattrs: - msg += self._NlAttr(attr_type, attr_msg.Pack()) + # TODO: find a better way to deal with the fact that many XFRM messages + # use nlattrs that aren't cstructs. + # + # This code allows callers to pass in either something that has a Pack() + # method or a packed netlink attr, but not other types of attributes. + # Alternatives include: + # + # 1. Require callers to marshal netlink attributes themselves and call + # _SendNlRequest directly. Delete this method. + # 2. Rename this function to _SendXfrmNlRequestCstructOnly (or other name + # that makes it clear that this only takes cstructs). Switch callers + # that need non-cstruct elements to calling _SendNlRequest directly. + # 3. Make this function somehow automatically detect what to do for + # all types of XFRM attributes today and in the future. This may be + # feasible because all XFRM attributes today occupy the same number + # space, but what about nested attributes? It is unlikley feasible via + # things like "if isinstance(attr_msg, str): ...", because that would + # not be able to determine the right size or byte order for non-struct + # types such as int. + # 4. Define fictitious cstructs which have no correspondence to actual + # kernel structs such as the following to represent a raw integer. + # XfrmAttrOutputMark = cstruct.Struct("=I", mark) + if hasattr(attr_msg, "Pack"): + attr_msg = attr_msg.Pack() + msg += self._NlAttr(attr_type, attr_msg) return self._SendNlRequest(msg_type, msg, flags) def AddSaInfo(self, src, dst, spi, mode, reqid, encryption, auth_trunc, aead, - encap, mark, output_mark, is_update=False): + encap, mark, output_mark, is_update=False, xfrm_if_id=None): """Adds an IPsec security association. Args: @@ -463,6 +497,7 @@ class Xfrm(netlink.NetlinkSocket): output_mark: An integer, the output mark. 0 means unset. is_update: If true, update an existing SA otherwise create a new SA. For compatibility reasons, this value defaults to False. + xfrm_if_id: The XFRM interface ID, or None. """ proto = IPPROTO_ESP xfrm_id = XfrmId((PaddedAddress(dst), spi, proto)) @@ -488,6 +523,8 @@ class Xfrm(netlink.NetlinkSocket): nlattrs += self._NlAttr(XFRMA_ENCAP, encap.Pack()) if output_mark is not None: nlattrs += self._NlAttrU32(XFRMA_OUTPUT_MARK, output_mark) + if xfrm_if_id is not None: + nlattrs += self._NlAttrU32(XFRMA_IF_ID, xfrm_if_id) # The kernel ignores these on input, so make them empty. cur = XfrmLifetimeCur() @@ -519,7 +556,7 @@ class Xfrm(netlink.NetlinkSocket): nl_msg_type = XFRM_MSG_UPDSA if is_update else XFRM_MSG_NEWSA self._SendNlRequest(nl_msg_type, msg, flags) - def DeleteSaInfo(self, dst, spi, proto, mark=None): + def DeleteSaInfo(self, dst, spi, proto, mark=None, xfrm_if_id=None): """Delete an SA from the SAD Args: @@ -530,12 +567,13 @@ class Xfrm(netlink.NetlinkSocket): mark: A mark match specifier, such as returned by ExactMatchMark(), or None for an SA without a Mark attribute. """ - # TODO: deletes take a mark as well. family = AF_INET6 if ":" in dst else AF_INET usersa_id = XfrmUsersaId((PaddedAddress(dst), spi, family, proto)) nlattrs = [] if mark is not None: nlattrs.append((XFRMA_MARK, mark)) + if xfrm_if_id is not None: + nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id))) self.SendXfrmNlRequest(XFRM_MSG_DELSA, usersa_id, nlattrs) def AllocSpi(self, dst, proto, min_spi, max_spi): @@ -592,7 +630,7 @@ class Xfrm(netlink.NetlinkSocket): self._SendNlRequest(XFRM_MSG_FLUSHSA, usersa_flush.Pack(), flags) def CreateTunnel(self, direction, selector, src, dst, spi, encryption, - auth_trunc, mark, output_mark): + auth_trunc, mark, output_mark, xfrm_if_id): """Create an XFRM Tunnel Consisting of a Policy and an SA. Create a unidirectional XFRM tunnel, which entails one Policy and one @@ -614,11 +652,12 @@ class Xfrm(netlink.NetlinkSocket): unspecified. output_mark: The mark used to select the underlying network for packets outbound from xfrm. None means unspecified. + xfrm_if_id: The ID of the XFRM interface to use or None. """ outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst)) self.AddSaInfo(src, dst, spi, XFRM_MODE_TUNNEL, 0, encryption, auth_trunc, - None, None, mark, output_mark) + None, None, mark, output_mark, xfrm_if_id=xfrm_if_id) if selector is None: selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)] @@ -628,16 +667,19 @@ class Xfrm(netlink.NetlinkSocket): for selector in selectors: policy = UserPolicy(direction, selector) tmpl = UserTemplate(outer_family, spi, 0, (src, dst)) - self.AddPolicyInfo(policy, tmpl, mark) + self.AddPolicyInfo(policy, tmpl, mark, xfrm_if_id=xfrm_if_id) + + def DeleteTunnel(self, direction, selector, dst, spi, mark, xfrm_if_id): + if mark is not None: + mark = ExactMatchMark(mark) - def DeleteTunnel(self, direction, selector, dst, spi, mark): - self.DeleteSaInfo(dst, spi, IPPROTO_ESP, ExactMatchMark(mark)) + self.DeleteSaInfo(dst, spi, IPPROTO_ESP, mark, xfrm_if_id) if selector is None: selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)] else: selectors = [selector] for selector in selectors: - self.DeletePolicyInfo(selector, direction, ExactMatchMark(mark)) + self.DeletePolicyInfo(selector, direction, mark, xfrm_if_id) if __name__ == "__main__": diff --git a/net/test/xfrm_algorithm_test.py b/net/test/xfrm_algorithm_test.py index 6adc461..0176265 100755 --- a/net/test/xfrm_algorithm_test.py +++ b/net/test/xfrm_algorithm_test.py @@ -27,6 +27,7 @@ import unittest import multinetwork_base import net_test from tun_twister import TapTwister +import util import xfrm import xfrm_base @@ -72,49 +73,26 @@ AEAD_ALGOS = [ ] def InjectTests(): - XfrmAlgorithmTest.InjectTests() + XfrmAlgorithmTest.InjectTests() + class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): @classmethod def InjectTests(cls): - """Inject parameterized test cases into this class. - - Because a library for parameterized testing is not availble in - net_test.rootfs.20150203, this does a minimal parameterization. - - This finds methods named like "ParamTestFoo" and replaces them with several - "testFoo(*)" methods taking different parameter dicts. A set of test - parameters is generated from every combination of encryption, - authentication, IP version, and TCP/UDP. - - The benefit of this approach is that an individually failing tests have a - clearly separated stack trace, and one failed test doesn't prevent the rest - from running. - """ - param_test_names = [ - name for name in dir(cls) if name.startswith("ParamTest") - ] VERSIONS = (4, 6) TYPES = (SOCK_DGRAM, SOCK_STREAM) # Tests all combinations of auth & crypt. Mutually exclusive with aead. - for crypt, auth, version, proto, name in itertools.product( - CRYPT_ALGOS, AUTH_ALGOS, VERSIONS, TYPES, param_test_names): - XfrmAlgorithmTest.InjectSingleTest(name, version, proto, crypt=crypt, auth=auth) + param_list = itertools.product(VERSIONS, TYPES, AUTH_ALGOS, CRYPT_ALGOS, + [None]) + util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator) # Tests all combinations of aead. Mutually exclusive with auth/crypt. - for aead, version, proto, name in itertools.product( - AEAD_ALGOS, VERSIONS, TYPES, param_test_names): - XfrmAlgorithmTest.InjectSingleTest(name, version, proto, aead=aead) - - @classmethod - def InjectSingleTest(cls, name, version, proto, crypt=None, auth=None, aead=None): - func = getattr(cls, name) - - def TestClosure(self): - func(self, {"crypt": crypt, "auth": auth, "aead": aead, - "version": version, "proto": proto}) + param_list = itertools.product(VERSIONS, TYPES, [None], [None], AEAD_ALGOS) + util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator) + @staticmethod + def TestNameGenerator(version, proto, auth, crypt, aead): # Produce a unique and readable name for each test. e.g. # testSocketPolicySimple_cbc-aes_256_hmac-sha512_512_256_IPv6_UDP param_string = "" @@ -131,12 +109,9 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): param_string += "%s_%s" % ("IPv4" if version == 4 else "IPv6", "UDP" if proto == SOCK_DGRAM else "TCP") - new_name = "%s_%s" % (func.__name__.replace("ParamTest", "test"), - param_string) - new_name = new_name.replace("(", "-").replace(")", "") # remove parens - setattr(cls, new_name, TestClosure) + return param_string - def ParamTestSocketPolicySimple(self, params): + def ParamTestSocketPolicySimple(self, version, proto, auth, crypt, aead): """Test two-way traffic using transport mode and socket policies.""" def AssertEncrypted(packet): @@ -153,37 +128,21 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): # other using transport mode ESP. Because of TapTwister, both sockets # perceive each other as owning "remote_addr". netid = self.RandomNetid() - family = net_test.GetAddressFamily(params["version"]) - local_addr = self.MyAddress(params["version"], netid) - remote_addr = self.GetRemoteSocketAddress(params["version"]) - crypt_left = (xfrm.XfrmAlgo(( - params["crypt"].name, - params["crypt"].key_len)), - os.urandom(params["crypt"].key_len / 8)) if params["crypt"] else None - crypt_right = (xfrm.XfrmAlgo(( - params["crypt"].name, - params["crypt"].key_len)), - os.urandom(params["crypt"].key_len / 8)) if params["crypt"] else None - auth_left = (xfrm.XfrmAlgoAuth(( - params["auth"].name, - params["auth"].key_len, - params["auth"].trunc_len)), - os.urandom(params["auth"].key_len / 8)) if params["auth"] else None - auth_right = (xfrm.XfrmAlgoAuth(( - params["auth"].name, - params["auth"].key_len, - params["auth"].trunc_len)), - os.urandom(params["auth"].key_len / 8)) if params["auth"] else None - aead_left = (xfrm.XfrmAlgoAead(( - params["aead"].name, - params["aead"].key_len, - params["aead"].icv_len)), - os.urandom(params["aead"].key_len / 8)) if params["aead"] else None - aead_right = (xfrm.XfrmAlgoAead(( - params["aead"].name, - params["aead"].key_len, - params["aead"].icv_len)), - os.urandom(params["aead"].key_len / 8)) if params["aead"] else None + family = net_test.GetAddressFamily(version) + local_addr = self.MyAddress(version, netid) + remote_addr = self.GetRemoteSocketAddress(version) + auth_left = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)), + os.urandom(auth.key_len / 8)) if auth else None + auth_right = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)), + os.urandom(auth.key_len / 8)) if auth else None + crypt_left = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)), + os.urandom(crypt.key_len / 8)) if crypt else None + crypt_right = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)), + os.urandom(crypt.key_len / 8)) if crypt else None + aead_left = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)), + os.urandom(aead.key_len / 8)) if aead else None + aead_right = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)), + os.urandom(aead.key_len / 8)) if aead else None spi_left = 0xbeefface spi_right = 0xcafed00d req_ids = [100, 200, 300, 400] # Used to match templates and SAs. @@ -242,20 +201,20 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): output_mark=None) # Make two sockets. - sock_left = socket(family, params["proto"], 0) + sock_left = socket(family, proto, 0) sock_left.settimeout(2.0) sock_left.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.SelectInterface(sock_left, netid, "mark") - sock_right = socket(family, params["proto"], 0) + sock_right = socket(family, proto, 0) sock_right.settimeout(2.0) sock_right.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) self.SelectInterface(sock_right, netid, "mark") # For UDP, set SO_LINGER to 0, to prevent TCP sockets from hanging around # in a TIME_WAIT state. - if params["proto"] == SOCK_STREAM: - net_test.DisableFinWait(sock_left) - net_test.DisableFinWait(sock_right) + if proto == SOCK_STREAM: + net_test.DisableFinWait(sock_left) + net_test.DisableFinWait(sock_right) # Apply the left outbound socket policy. xfrm_base.ApplySocketPolicy(sock_left, family, xfrm.XFRM_POLICY_OUT, @@ -302,14 +261,14 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest): sock.close() # Server and client need to know each other's port numbers in advance. - wildcard_addr = net_test.GetWildcardAddress(params["version"]) + wildcard_addr = net_test.GetWildcardAddress(version) sock_left.bind((wildcard_addr, 0)) sock_right.bind((wildcard_addr, 0)) left_port = sock_left.getsockname()[1] right_port = sock_right.getsockname()[1] # Start the appropriate server type on sock_right. - target = TcpServer if params["proto"] == SOCK_STREAM else UdpServer + target = TcpServer if proto == SOCK_STREAM else UdpServer server = threading.Thread( target=target, args=(sock_right, left_port), diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py index 24f9edc..3a3d9b0 100755 --- a/net/test/xfrm_test.py +++ b/net/test/xfrm_test.py @@ -640,14 +640,14 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest): self.assertEquals(attributes['XFRMA_TMPL'], tmpl) # Create a new policy using update. - self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark) + self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark, None) # NEWPOLICY will not update the existing policy. This checks both that # UPDPOLICY created a policy and that NEWPOLICY will not perform updates. _CheckTemplateMatch(tmpl1) with self.assertRaisesErrno(EEXIST): - self.xfrm.AddPolicyInfo(policy, tmpl2, mark) + self.xfrm.AddPolicyInfo(policy, tmpl2, mark, None) # Update the policy using UPDPOLICY. - self.xfrm.UpdatePolicyInfo(policy, tmpl2, mark) + self.xfrm.UpdatePolicyInfo(policy, tmpl2, mark, None) # There should only be one policy after update, and it should have the # updated template. _CheckTemplateMatch(tmpl2) diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py index 77cc8b3..5f243cc 100755 --- a/net/test/xfrm_tunnel_test.py +++ b/net/test/xfrm_tunnel_test.py @@ -19,32 +19,59 @@ from errno import * # pylint: disable=wildcard-import from socket import * # pylint: disable=wildcard-import import random +import itertools import struct import unittest +from scapy import all as scapy from tun_twister import TunTwister import csocket import iproute import multinetwork_base import net_test import packets +import util import xfrm import xfrm_base -# Parameters to Set up VTI as a special network -_BASE_VTI_NETID = {4: 40, 6: 60} +_LOOPBACK_IFINDEX = 1 +_TEST_XFRM_IFNAME = "ipsec42" +_TEST_XFRM_IF_ID = 42 + +# Does the kernel support xfrmi interfaces? +def HaveXfrmInterfaces(): + try: + i = iproute.IPRoute() + i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, + _LOOPBACK_IFINDEX) + i.DeleteLink(_TEST_XFRM_IFNAME) + try: + i.GetIfIndex(_TEST_XFRM_IFNAME) + assert "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME + except IOError: + pass + return True + except IOError: + return False + +HAVE_XFRM_INTERFACES = HaveXfrmInterfaces() + +# Parameters to setup tunnels as special networks +_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService +_BASE_TUNNEL_NETID = {4: 40, 6: 60} _BASE_VTI_OKEY = 2000000100 _BASE_VTI_IKEY = 2000000200 -_VTI_NETID = 50 -_VTI_IFNAME = "test_vti" - _TEST_OUT_SPI = 0x1234 _TEST_IN_SPI = _TEST_OUT_SPI _TEST_OKEY = 2000000100 _TEST_IKEY = 2000000200 +_TEST_REMOTE_PORT = 1234 + +_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6} + def _GetLocalInnerAddress(version): return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version] @@ -58,61 +85,147 @@ def _GetRemoteOuterAddress(version): return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] +def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer, + src_port, dst_inner, dst_outer, + dst_port, spi, seq_num, ip_hdr_options={}): + ip_hdr_options.update({'src': src_inner, 'dst': dst_inner}) + + # Build and receive an ESP packet destined for the inner socket + IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] + input_pkt = ( + IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) / + net_test.UDP_PAYLOAD) + input_pkt = IpType(str(input_pkt)) # Compute length, checksum. + input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num, + (src_outer, dst_outer)) + + return input_pkt + + +def _CreateReceiveSock(version, port=0): + # Create a socket to receive packets. + read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) + read_sock.bind((net_test.GetWildcardAddress(version), port)) + # The second parameter of the tuple is the port number regardless of AF. + local_port = read_sock.getsockname()[1] + # Guard against the eventuality of the receive failing. + net_test.SetNonBlocking(read_sock.fileno()) + + return read_sock, local_port + + +def _SendPacket(testInstance, netid, version, remote, remote_port): + # Send a packet out via the tunnel-backed network, bound for the port number + # of the input socket. + write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) + testInstance.SelectInterface(write_sock, netid, "mark") + write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port)) + local_port = write_sock.getsockname()[1] + + return local_port + + +def InjectTests(): + InjectParameterizedTests(XfrmTunnelTest) + InjectParameterizedTests(XfrmInterfaceTest) + InjectParameterizedTests(XfrmVtiTest) + + +def InjectParameterizedTests(cls): + VERSIONS = (4, 6) + param_list = itertools.product(VERSIONS, VERSIONS) + + def NameGenerator(*args): + return "IPv%d_in_IPv%d" % tuple(args) + + util.InjectParameterizedTest(cls, param_list, NameGenerator) + + class XfrmTunnelTest(xfrm_base.XfrmLazyTest): - def _CheckTunnelOutput(self, inner_version, outer_version): - """Test a bi-directional XFRM Tunnel with explicit selectors""" + def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid, + netid, local_inner, remote_inner, local_outer, + remote_outer, write_sock): + + write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) + self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, + local_outer, remote_outer) + + def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid, + netid, local_inner, remote_inner, local_outer, + remote_outer, read_sock): + + # The second parameter of the tuple is the port number regardless of AF. + local_port = read_sock.getsockname()[1] + + # Build and receive an ESP packet destined for the inner socket + input_pkt = _GetNullAuthCryptTunnelModePkt( + inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT, + local_inner, local_outer, local_port, _TEST_IN_SPI, 1) + self.ReceivePacketOn(underlying_netid, input_pkt) + + # Verify that the packet data and src are correct + data, src = read_sock.recvfrom(4096) + self.assertEquals(net_test.UDP_PAYLOAD, data) + self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2]) + + def _TestTunnel(self, inner_version, outer_version, func, direction): + """Test a unidirectional XFRM Tunnel with explicit selectors""" # Select the underlying netid, which represents the external # interface from/to which to route ESP packets. - underlying_netid = self.RandomNetid() + u_netid = self.RandomNetid() # Select a random netid that will originate traffic locally and - # which represents the logical tunnel network. - netid = self.RandomNetid(exclude=underlying_netid) + # which represents the netid on which the plaintext is sent + netid = self.RandomNetid(exclude=u_netid) local_inner = self.MyAddress(inner_version, netid) remote_inner = _GetRemoteInnerAddress(inner_version) - local_outer = self.MyAddress(outer_version, underlying_netid) + local_outer = self.MyAddress(outer_version, u_netid) remote_outer = _GetRemoteOuterAddress(outer_version) + # Create input/ouput SPs, SAs and sockets to simulate a more realistic + # environment. + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, + xfrm.SrcDstSelector(remote_inner, local_inner), + remote_outer, local_outer, _TEST_IN_SPI, + xfrm_base._ALGO_CRYPT_NULL, + xfrm_base._ALGO_AUTH_NULL, None, None, None) + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner), local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, - None, underlying_netid) + xfrm_base._ALGO_HMAC_SHA1, None, u_netid, None) write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - # Select an interface, which provides the source address of the inner - # packet. self.SelectInterface(write_sock, netid, "mark") - write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) - self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, - local_outer, remote_outer) - - # TODO: Add support for the input path. + read_sock, _ = _CreateReceiveSock(inner_version) - def testIpv4InIpv4TunnelOutput(self): - self._CheckTunnelOutput(4, 4) + sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock + func(inner_version, outer_version, u_netid, netid, local_inner, + remote_inner, local_outer, remote_outer, sock) - def testIpv4InIpv6TunnelOutput(self): - self._CheckTunnelOutput(4, 6) + def ParamTestTunnelInput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, + xfrm.XFRM_POLICY_IN) - def testIpv6InIpv4TunnelOutput(self): - self._CheckTunnelOutput(6, 4) - - def testIpv6InIpv6TunnelOutput(self): - self._CheckTunnelOutput(6, 6) + def ParamTestTunnelOutput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, + xfrm.XFRM_POLICY_OUT) @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): - def verifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, ikey, okey): + def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, + ikey, okey): self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey) self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey) family = AF_INET if version == 4 else AF_INET6 - self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), local_addr) - self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), remote_addr) + self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), + local_addr) + self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), + remote_addr) def testAddVti(self): """Test the creation of a Virtual Tunnel Interface.""" @@ -120,37 +233,37 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): netid = self.RandomNetid() local_addr = self.MyAddress(version, netid) self.iproute.CreateVirtualTunnelInterface( - dev_name=_VTI_IFNAME, + dev_name=_TEST_XFRM_IFNAME, local_addr=local_addr, remote_addr=_GetRemoteOuterAddress(version), o_key=_TEST_OKEY, i_key=_TEST_IKEY) - self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME), - version, local_addr, _GetRemoteOuterAddress(version), - _TEST_IKEY, _TEST_OKEY) + self._VerifyVtiInfoData( + self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, + _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY) new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} - new_okey = _TEST_OKEY + _VTI_NETID - new_ikey = _TEST_IKEY + _VTI_NETID + new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID + new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID self.iproute.CreateVirtualTunnelInterface( - dev_name=_VTI_IFNAME, + dev_name=_TEST_XFRM_IFNAME, local_addr=local_addr, remote_addr=new_remote_addr[version], o_key=new_okey, i_key=new_ikey, is_update=True) - self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME), - version, local_addr, new_remote_addr[version], - new_ikey, new_okey) + self._VerifyVtiInfoData( + self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, + new_remote_addr[version], new_ikey, new_okey) - if_index = self.iproute.GetIfIndex(_VTI_IFNAME) + if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) # Validate that the netlink interface matches the ioctl interface. - self.assertEquals(net_test.GetInterfaceIndex(_VTI_IFNAME), if_index) - self.iproute.DeleteLink(_VTI_IFNAME) + self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) + self.iproute.DeleteLink(_TEST_XFRM_IFNAME) with self.assertRaises(IOError): - self.iproute.GetIfIndex(_VTI_IFNAME) + self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) def _QuietDeleteLink(self, ifname): try: @@ -161,100 +274,204 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): def tearDown(self): super(XfrmAddDeleteVtiTest, self).tearDown() - self._QuietDeleteLink(_VTI_IFNAME) + self._QuietDeleteLink(_TEST_XFRM_IFNAME) + + +class SaInfo(object): + + def __init__(self, spi): + self.spi = spi + self.seq_num = 1 -class VtiInterface(object): +class IpSecBaseInterface(object): - def __init__(self, iface, netid, underlying_netid, local, remote): + def __init__(self, iface, netid, underlying_netid, local, remote, version): self.iface = iface self.netid = netid self.underlying_netid = underlying_netid self.local, self.remote = local, remote + + # XFRM interfaces technically do not have a version. This keeps track of + # the IP version of the local and remote addresses. + self.version = version self.rx = self.tx = 0 - self.ikey = _TEST_IKEY + netid - self.okey = _TEST_OKEY + netid - self.out_spi = self.in_spi = random.randint(0, 0x7fffffff) + self.addrs = {} self.iproute = iproute.IPRoute() self.xfrm = xfrm.Xfrm() - self.SetupInterface() - self.SetupXfrm() - self.addrs = {} - def Teardown(self): self.TeardownXfrm() self.TeardownInterface() - def SetupInterface(self): - self.iproute.CreateVirtualTunnelInterface( - self.iface, self.local, self.remote, self.ikey, self.okey) - def TeardownInterface(self): self.iproute.DeleteLink(self.iface) - def SetupXfrm(self): + def SetupXfrm(self, use_null_crypt): + rand_spi = random.randint(0, 0x7fffffff) + self.in_sa = SaInfo(rand_spi) + self.out_sa = SaInfo(rand_spi) + + # Select algorithms: + if use_null_crypt: + auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL + else: + auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256 + + self._SetupXfrmByType(auth, crypt) + + def TeardownXfrm(self): + raise NotImplementedError("Subclasses should implement this") + + def _SetupXfrmByType(self, auth_algo, crypt_algo): + raise NotImplementedError("Subclasses should implement this") + + +class VtiInterface(IpSecBaseInterface): + + def __init__(self, iface, netid, underlying_netid, _, local, remote, version): + super(VtiInterface, self).__init__(iface, netid, underlying_netid, local, + remote, version) + + self.ikey = _TEST_IKEY + netid + self.okey = _TEST_OKEY + netid + + self.SetupInterface() + self.SetupXfrm(False) + + def SetupInterface(self): + return self.iproute.CreateVirtualTunnelInterface( + self.iface, self.local, self.remote, self.ikey, self.okey) + + def _SetupXfrmByType(self, auth_algo, crypt_algo): # For the VTI, the selectors are wildcard since packets will only # be selected if they have the appropriate mark, hence the inner # addresses are wildcard. self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, - self.out_spi, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, + self.out_sa.spi, crypt_algo, auth_algo, xfrm.ExactMatchMark(self.okey), - self.underlying_netid) + self.underlying_netid, None) self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, - self.in_spi, xfrm_base._ALGO_CBC_AES_256, - xfrm_base._ALGO_HMAC_SHA1, - xfrm.ExactMatchMark(self.ikey), None) + self.in_sa.spi, crypt_algo, auth_algo, + xfrm.ExactMatchMark(self.ikey), None, None) def TeardownXfrm(self): self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, - self.out_spi, self.okey) + self.out_sa.spi, self.okey, None) self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, - self.in_spi, self.ikey) + self.in_sa.spi, self.ikey, None) + + +@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") +class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): + """Test the creation of an XFRM Interface.""" + + def testAddXfrmInterface(self): + self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, + _LOOPBACK_IFINDEX) + if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) + net_test.SetInterfaceUp(_TEST_XFRM_IFNAME) + + # Validate that the netlink interface matches the ioctl interface. + self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) + self.iproute.DeleteLink(_TEST_XFRM_IFNAME) + with self.assertRaises(IOError): + self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) + + +class XfrmInterface(IpSecBaseInterface): + + def __init__(self, iface, netid, underlying_netid, ifindex, local, remote, + version): + super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local, + remote, version) + + self.ifindex = ifindex + self.xfrm_if_id = netid + + self.SetupInterface() + self.SetupXfrm(False) + + def SetupInterface(self): + """Create an XFRM interface.""" + return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex) + + def _SetupXfrmByType(self, auth_algo, crypt_algo): + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, + self.out_sa.spi, crypt_algo, auth_algo, None, + self.underlying_netid, self.xfrm_if_id) + self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, + self.in_sa.spi, crypt_algo, auth_algo, None, None, + self.xfrm_if_id) + + def TeardownXfrm(self): + self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, + self.out_sa.spi, None, self.xfrm_if_id) + self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, + self.in_sa.spi, None, self.xfrm_if_id) -@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") -class XfrmVtiTest(xfrm_base.XfrmBaseTest): + +class XfrmTunnelBase(xfrm_base.XfrmBaseTest): @classmethod def setUpClass(cls): xfrm_base.XfrmBaseTest.setUpClass() - # VTI interfaces use marks extensively, so configure realistic packet + # Tunnel interfaces use marks extensively, so configure realistic packet # marking rules to make the test representative, make PMTUD work, etc. cls.SetInboundMarks(True) cls.SetMarkReflectSysctls(1) - cls.vtis = {} + # Group by tunnel version to ensure that we test at least one IPv4 and one + # IPv6 tunnel + cls.tunnelsV4 = {} + cls.tunnelsV6 = {} for i, underlying_netid in enumerate(cls.tuns): for version in 4, 6: - netid = _BASE_VTI_NETID[version] + i + netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i iface = "ipsec%s" % netid local = cls.MyAddress(version, underlying_netid) if version == 4: - remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR + remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2) else: - remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR - vti = VtiInterface(iface, netid, underlying_netid, local, remote) + remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2) + + ifindex = cls.ifindices[underlying_netid] + tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex, + local, remote, version) cls._SetInboundMarking(netid, iface, True) - cls._SetupVtiNetwork(vti, True) - cls.vtis[netid] = vti + cls._SetupTunnelNetwork(tunnel, True) + + # On slower platforms, the test does not complete before the delay probe time fires. + # This causes the test to fail because of the unexpected NUD packet. b/123202162 + if version == 6: + cls.SetSysctl("/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" + % cls.GetInterfaceName(underlying_netid) , 10) + if version == 4: + cls.tunnelsV4[netid] = tunnel + else: + cls.tunnelsV6[netid] = tunnel @classmethod def tearDownClass(cls): # The sysctls are restored by MultinetworkBaseTest.tearDownClass. cls.SetInboundMarks(False) - for vti in cls.vtis.values(): - cls._SetInboundMarking(vti.netid, vti.iface, False) - cls._SetupVtiNetwork(vti, False) - vti.Teardown() + for tunnel in cls.tunnelsV4.values() + cls.tunnelsV6.values(): + cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) + cls._SetupTunnelNetwork(tunnel, False) + tunnel.Teardown() xfrm_base.XfrmBaseTest.tearDownClass() + def randomTunnel(self, outer_version): + version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6 + return random.choice(version_dict.values()) + def setUp(self): multinetwork_base.MultiNetworkBaseTest.setUp(self) self.iproute = iproute.IPRoute() + self.xfrm = xfrm.Xfrm() def tearDown(self): multinetwork_base.MultiNetworkBaseTest.tearDown(self) @@ -275,16 +492,23 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): net_test.AddressLengthBits(version), ifindex) @classmethod - def _SetupVtiNetwork(cls, vti, is_add): - """Setup rules and routes for a VTI Network. + def _GetLocalAddress(cls, version, netid): + if version == 4: + return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET) + else: + return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1" + + @classmethod + def _SetupTunnelNetwork(cls, tunnel, is_add): + """Setup rules and routes for a tunnel Network. Takes an interface and depending on the boolean value of is_add, either adds or removes the rules - and routes for a VTI to behave like an Android - Network for purposes of testing. + and routes for a tunnel interface to behave like an + Android Network for purposes of testing. Args: - vti: A VtiInterface, the VTI to set up. + tunnel: A VtiInterface or XfrmInterface, the tunnel to set up. is_add: Boolean that causes this method to perform setup if True or teardown if False """ @@ -292,32 +516,30 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): # Disable router solicitations to avoid occasional spurious packets # arriving on the underlying network; there are two possible behaviors # when that occurred: either only the RA packet is read, and when it - # is echoed back to the VTI, it causes the test to fail by not receiving - # the UDP_PAYLOAD; or, two packets may arrive on the underlying - # network which fails the assertion that only one ESP packet is received. + # is echoed back to the tunnel, it causes the test to fail by not + # receiving # the UDP_PAYLOAD; or, two packets may arrive on the + # underlying # network which fails the assertion that only one ESP packet + # is received. cls.SetSysctl( - "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0) - net_test.SetInterfaceUp(vti.iface) + "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0) + net_test.SetInterfaceUp(tunnel.iface) for version in [4, 6]: - ifindex = net_test.GetInterfaceIndex(vti.iface) - table = vti.netid + ifindex = net_test.GetInterfaceIndex(tunnel.iface) + table = tunnel.netid # Set up routing rules. - start, end = cls.UidRangeForNetid(vti.netid) + start, end = cls.UidRangeForNetid(tunnel.netid) cls.iproute.UidRangeRule(version, is_add, start, end, table, cls.PRIORITY_UID) - cls.iproute.OifRule(version, is_add, vti.iface, table, cls.PRIORITY_OIF) - cls.iproute.FwmarkRule(version, is_add, vti.netid, cls.NETID_FWMASK, + cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF) + cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK, table, cls.PRIORITY_FWMARK) # Configure IP addresses. - if version == 4: - addr = cls._MyIPv4Address(vti.netid) - else: - addr = cls.OnlinkPrefix(6, vti.netid) + "1" + addr = cls._GetLocalAddress(version, tunnel.netid) prefixlen = net_test.AddressLengthBits(version) - vti.addrs[version] = addr + tunnel.addrs[version] = addr if is_add: cls.iproute.AddAddress(addr, prefixlen, ifindex) cls.iproute.AddRoute(version, table, "default", 0, None, ifindex) @@ -325,100 +547,238 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest): cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) cls.iproute.DelAddress(addr, prefixlen, ifindex) - def assertReceivedPacket(self, vti): - vti.rx += 1 - self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) - - def assertSentPacket(self, vti): - vti.tx += 1 - self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface)) - - # TODO: Should we completely re-write this using null encryption and null - # authentication? We could then assemble and disassemble packets for each - # direction individually. This approach would improve debuggability, avoid the - # complexity of the twister, and allow the test to more-closely validate - # deployable configurations. - def _CheckVtiInputOutput(self, vti, inner_version): - local_outer = vti.local - remote_outer = vti.remote - - # Create a socket to receive packets. - read_sock = socket( - net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - read_sock.bind((net_test.GetWildcardAddress(inner_version), 0)) - # The second parameter of the tuple is the port number regardless of AF. - port = read_sock.getsockname()[1] - # Guard against the eventuality of the receive failing. - csocket.SetSocketTimeout(read_sock, 100) - - # Send a packet out via the vti-backed network, bound for the port number - # of the input socket. - write_sock = socket( - net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) - self.SelectInterface(write_sock, vti.netid, "mark") - write_sock.sendto(net_test.UDP_PAYLOAD, - (_GetRemoteInnerAddress(inner_version), port)) + def assertReceivedPacket(self, tunnel, sa_info): + tunnel.rx += 1 + self.assertEquals((tunnel.rx, tunnel.tx), + self.iproute.GetRxTxPackets(tunnel.iface)) + sa_info.seq_num += 1 + + def assertSentPacket(self, tunnel, sa_info): + tunnel.tx += 1 + self.assertEquals((tunnel.rx, tunnel.tx), + self.iproute.GetRxTxPackets(tunnel.iface)) + sa_info.seq_num += 1 + + def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner, + sa_info=None): + """Test null-crypt input path over an IPsec interface.""" + if sa_info is None: + sa_info = tunnel.in_sa + read_sock, local_port = _CreateReceiveSock(inner_version) + + input_pkt = _GetNullAuthCryptTunnelModePkt( + inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT, + local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num) + self.ReceivePacketOn(tunnel.underlying_netid, input_pkt) + + # Verify that the packet data and src are correct + self.assertReceivedPacket(tunnel, sa_info) + data, src = read_sock.recvfrom(4096) + self.assertEquals(net_test.UDP_PAYLOAD, data) + self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2]) + + def _CheckTunnelOutput(self, tunnel, inner_version, local_inner, + remote_inner, sa_info=None): + """Test null-crypt output path over an IPsec interface.""" + if sa_info is None: + sa_info = tunnel.out_sa + local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, + _TEST_REMOTE_PORT) # Read a tunneled IP packet on the underlying (outbound) network # verifying that it is an ESP packet. - self.assertSentPacket(vti) - pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, - local_outer, remote_outer) - - # Perform an address switcheroo so that the inner address of the remote - # end of the tunnel is now the address on the local VTI interface; this - # way, the twisted inner packet finds a destination via the VTI once - # decrypted. - remote = _GetRemoteInnerAddress(inner_version) - local = vti.addrs[inner_version] - self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local) + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, + sa_info.seq_num, None, tunnel.local, + tunnel.remote) + + # Get and update the IP headers on the inner payload so that we can do a simple + # comparison of byte data. Unfortunately, due to the scapy version this runs on, + # we cannot parse past the ESP header to the inner IP header, and thus have to + # workaround in this manner + if inner_version == 4: + ip_hdr_options = { + 'id': scapy.IP(str(pkt.payload)[8:]).id, + 'flags': scapy.IP(str(pkt.payload)[8:]).flags + } + else: + ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl} + + expected = _GetNullAuthCryptTunnelModePkt( + inner_version, local_inner, tunnel.local, local_port, remote_inner, + tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num, + ip_hdr_options) + + # Check outer header manually (Avoids having to overwrite outer header's + # id, flags or flow label) + self.assertSentPacket(tunnel, sa_info) + self.assertEquals(expected.src, pkt.src) + self.assertEquals(expected.dst, pkt.dst) + self.assertEquals(len(expected), len(pkt)) + + # Check everything else + self.assertEquals(str(expected.payload), str(pkt.payload)) + + def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner, + remote_inner): + """Test both input and output paths over an encrypted IPsec interface. + + This tests specifically makes sure that the both encryption and decryption + work together, as opposed to the _CheckTunnel(Input|Output) where the + input and output paths are tested separately, and using null encryption. + """ + src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, + _TEST_REMOTE_PORT) + + # Make sure it appeared on the underlying interface + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi, + tunnel.out_sa.seq_num, None, tunnel.local, + tunnel.remote) + + # Check that packet is not sent in plaintext + self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt)) + + # Check src/dst + self.assertEquals(tunnel.local, pkt.src) + self.assertEquals(tunnel.remote, pkt.dst) + + # Check that the interface statistics recorded the outbound packet + self.assertSentPacket(tunnel, tunnel.out_sa) + try: - # Swap the packet's IP headers and write it back to the - # underlying network. + # Swap the interface addresses to pretend we are the remote + self._SwapInterfaceAddress( + tunnel.iface, new_addr=remote_inner, old_addr=local_inner) + + # Swap the packet's IP headers and write it back to the underlying + # network. pkt = TunTwister.TwistPacket(pkt) - self.ReceivePacketOn(vti.underlying_netid, pkt) - # Receive the decrypted packet on the dest port number. - read_packet = read_sock.recv(4096) - self.assertEquals(read_packet, net_test.UDP_PAYLOAD) - self.assertReceivedPacket(vti) - finally: - # Unwind the switcheroo - self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote) + read_sock, local_port = _CreateReceiveSock(inner_version, + _TEST_REMOTE_PORT) + self.ReceivePacketOn(tunnel.underlying_netid, pkt) + # Verify that the packet data and src are correct + data, src = read_sock.recvfrom(4096) + self.assertEquals(net_test.UDP_PAYLOAD, data) + self.assertEquals((local_inner, src_port), src[:2]) + + # Check that the interface statistics recorded the inbound packet + self.assertReceivedPacket(tunnel, tunnel.in_sa) + finally: + # Swap the interface addresses to pretend we are the remote + self._SwapInterfaceAddress( + tunnel.iface, new_addr=local_inner, old_addr=remote_inner) + + def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner, + sa_info=None): + """Test ICMP error path over an IPsec interface.""" + if sa_info is None: + sa_info = tunnel.out_sa # Now attempt to provoke an ICMP error. # TODO: deduplicate with multinetwork_test.py. - version = net_test.GetAddressVersion(vti.remote) dst_prefix, intermediate = { 4: ("172.19.", "172.16.9.12"), 6: ("2001:db8::", "2001:db8::1") - }[version] + }[tunnel.version] - write_sock.sendto(net_test.UDP_PAYLOAD, - (_GetRemoteInnerAddress(inner_version), port)) - self.assertSentPacket(vti) - pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None, - local_outer, remote_outer) - myaddr = self.MyAddress(version, vti.underlying_netid) - _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt) - self.ReceivePacketOn(vti.underlying_netid, toobig) + local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, + _TEST_REMOTE_PORT) + pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, + sa_info.seq_num, None, tunnel.local, + tunnel.remote) + self.assertSentPacket(tunnel, sa_info) + + myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid) + _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr, + pkt) + self.ReceivePacketOn(tunnel.underlying_netid, toobig) # Check that the packet too big reduced the MTU. - routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None) + routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None) self.assertEquals(1, len(routes)) rtmsg, attributes = routes[0] self.assertEquals(iproute.RTN_UNICAST, rtmsg.type) self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) # Clear PMTU information so that future tests don't have to worry about it. - self.InvalidateDstCache(version, vti.underlying_netid) + self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid) + + def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner, + remote_inner): + """Test combined encryption path with ICMP errors over an IPsec tunnel""" + self._CheckTunnelEncryption(tunnel, inner_version, local_inner, + remote_inner) + self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner) + self._CheckTunnelEncryption(tunnel, inner_version, local_inner, + remote_inner) + + def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): + """Bootstrap method to setup and run tests for the given parameters.""" + tunnel = self.randomTunnel(outer_version) + + try: + tunnel.TeardownXfrm() + tunnel.SetupXfrm(use_null_crypt) + + local_inner = tunnel.addrs[inner_version] + remote_inner = _GetRemoteInnerAddress(inner_version) + + # Run twice to ensure sequence numbers are tested + for i in range(2): + func(tunnel, inner_version, local_inner, remote_inner) + finally: + if use_null_crypt: + tunnel.TeardownXfrm() + tunnel.SetupXfrm(False) + + +@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") +class XfrmVtiTest(XfrmTunnelBase): + + INTERFACE_CLASS = VtiInterface + + def ParamTestVtiInput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) + + def ParamTestVtiOutput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, + True) + + def ParamTestVtiInOutEncrypted(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, + False) + + def ParamTestVtiIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) + + def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, + self._CheckTunnelEncryptionWithIcmp, False) + + +@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") +class XfrmInterfaceTest(XfrmTunnelBase): + + INTERFACE_CLASS = XfrmInterface + + def ParamTestXfrmIntfInput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) + + def ParamTestXfrmIntfOutput(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, + True) + + def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, + False) + + def ParamTestXfrmIntfIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) - def testVtiInputOutput(self): - """Test packet input and output over a Virtual Tunnel Interface.""" - for i in xrange(3 * len(self.vtis.values())): - vti = random.choice(self.vtis.values()) - self._CheckVtiInputOutput(vti, 4) - self._CheckVtiInputOutput(vti, 6) + def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): + self._TestTunnel(inner_version, outer_version, + self._CheckTunnelEncryptionWithIcmp, False) if __name__ == "__main__": + InjectTests() unittest.main() |