summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorandroid-build-team Robot <android-build-team-robot@google.com>2019-07-08 23:24:51 +0000
committerandroid-build-team Robot <android-build-team-robot@google.com>2019-07-08 23:24:51 +0000
commit776c2fe2038c881d5c2e7f829a1bd0c4b4807294 (patch)
tree06d8c097803925948575a36f2bf67f9773b2ddc0
parentef1b71e21270dcb334e577dab3a9e51321daff5f (diff)
parente90514f71460097fa3c1058cb73d78238c121433 (diff)
downloadtests-pie-platform-release.tar.gz
Snap for 5622519 from e90514f71460097fa3c1058cb73d78238c121433 to pi-platform-releasepie-platform-release
Change-Id: Ia179f195a05965e129c2b7e0452aed0519a4a62c
-rwxr-xr-xnet/test/bpf.py4
-rw-r--r--net/test/iproute.py26
-rwxr-xr-xnet/test/parameterization_test.py83
-rwxr-xr-xnet/test/run_net_test.sh2
-rw-r--r--net/test/util.py57
-rwxr-xr-xnet/test/xfrm.py74
-rwxr-xr-xnet/test/xfrm_algorithm_test.py109
-rwxr-xr-xnet/test/xfrm_test.py6
-rwxr-xr-xnet/test/xfrm_tunnel_test.py710
9 files changed, 796 insertions, 275 deletions
diff --git a/net/test/bpf.py b/net/test/bpf.py
index 5e90daa..9dc3c46 100755
--- a/net/test/bpf.py
+++ b/net/test/bpf.py
@@ -28,7 +28,9 @@ __NR_bpf = {
"aarch64": 280,
"armv7l": 386,
"armv8l": 386,
- "x86_64": 321}[os.uname()[4]]
+ "i686" : 357,
+ "x86_64": 321,
+}[os.uname()[4]]
LOG_LEVEL = 1
LOG_SIZE = 65536
diff --git a/net/test/iproute.py b/net/test/iproute.py
index a3310a2..8376eb6 100644
--- a/net/test/iproute.py
+++ b/net/test/iproute.py
@@ -209,13 +209,17 @@ IFLA_PAD = 42
IFLA_XDP = 43
IFLA_EVENT = 44
-# linux/include/uapi/if_link.h
+# include/uapi/linux/if_link.h
IFLA_INFO_UNSPEC = 0
IFLA_INFO_KIND = 1
IFLA_INFO_DATA = 2
IFLA_INFO_XSTATS = 3
-# linux/if_tunnel.h
+IFLA_XFRM_UNSPEC = 0
+IFLA_XFRM_LINK = 1
+IFLA_XFRM_IF_ID = 2
+
+# include/uapi/linux/if_tunnel.h
IFLA_VTI_UNSPEC = 0
IFLA_VTI_LINK = 1
IFLA_VTI_IKEY = 2
@@ -682,7 +686,7 @@ class IPRoute(netlink.NetlinkSocket):
attrs = self._ParseAttributes(RTM_NEWLINK, IfinfoMsg, attrs)
return attrs["IFLA_STATS64"]
- def GetVtiInfoData(self, dev_name):
+ def GetIfinfoData(self, dev_name):
"""Returns an IFLA_INFO_DATA dict object for the specified interface."""
_, attrs = self.GetIfinfo(dev_name)
attrs = self._ParseAttributes(RTM_NEWLINK, IfinfoMsg, attrs)
@@ -745,6 +749,22 @@ class IPRoute(netlink.NetlinkSocket):
flags |= netlink.NLM_F_EXCL
return self._SendNlRequest(RTM_NEWLINK, ifinfo, flags)
+ def CreateXfrmInterface(self, dev_name, xfrm_if_id, underlying_ifindex):
+ """Creates an XFRM interface with the specified parameters."""
+ # The netlink attribute structure is essentially identical to the one
+ # for VTI above (q.v).
+ ifdata = self._NlAttrU32(IFLA_XFRM_LINK, underlying_ifindex)
+ ifdata += self._NlAttrU32(IFLA_XFRM_IF_ID, xfrm_if_id)
+
+ linkinfo = self._NlAttrStr(IFLA_INFO_KIND, "xfrm")
+ linkinfo += self._NlAttr(IFLA_INFO_DATA, ifdata)
+
+ msg = IfinfoMsg().Pack()
+ msg += self._NlAttrStr(IFLA_IFNAME, dev_name)
+ msg += self._NlAttr(IFLA_LINKINFO, linkinfo)
+
+ return self._SendNlRequest(RTM_NEWLINK, msg)
+
if __name__ == "__main__":
iproute = IPRoute()
diff --git a/net/test/parameterization_test.py b/net/test/parameterization_test.py
new file mode 100755
index 0000000..8f9e130
--- /dev/null
+++ b/net/test/parameterization_test.py
@@ -0,0 +1,83 @@
+#!/usr/bin/python
+#
+# Copyright 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import unittest
+
+import net_test
+import util
+
+
+def InjectTests():
+ ParmeterizationTest.InjectTests()
+
+
+# This test class ensures that the Parameterized Test generator in utils.py
+# works properly. It injects test methods into itself, and ensures that they
+# are generated as expected, and that the TestClosures being run are properly
+# defined, and running different parameterized tests each time.
+class ParmeterizationTest(net_test.NetworkTest):
+ tests_run_list = []
+
+ @staticmethod
+ def NameGenerator(a, b, c):
+ return str(a) + "_" + str(b) + "_" + str(c)
+
+ @classmethod
+ def InjectTests(cls):
+ PARAMS_A = (1, 2)
+ PARAMS_B = (3, 4)
+ PARAMS_C = (5, 6)
+
+ param_list = itertools.product(PARAMS_A, PARAMS_B, PARAMS_C)
+ util.InjectParameterizedTest(cls, param_list, cls.NameGenerator)
+
+ def ParamTestDummyFunc(self, a, b, c):
+ self.tests_run_list.append(
+ "testDummyFunc_" + ParmeterizationTest.NameGenerator(a, b, c))
+
+ def testParameterization(self):
+ expected = [
+ "testDummyFunc_1_3_5",
+ "testDummyFunc_1_3_6",
+ "testDummyFunc_1_4_5",
+ "testDummyFunc_1_4_6",
+ "testDummyFunc_2_3_5",
+ "testDummyFunc_2_3_6",
+ "testDummyFunc_2_4_5",
+ "testDummyFunc_2_4_6",
+ ]
+
+ actual = [name for name in dir(self) if name.startswith("testDummyFunc")]
+
+ # Check that name and contents are equal
+ self.assertEqual(len(expected), len(actual))
+ self.assertEqual(sorted(expected), sorted(actual))
+
+ # Start a clean list, and run all the tests.
+ self.tests_run_list = list()
+ for test_name in expected:
+ test_method = getattr(self, test_name)
+ test_method()
+
+ # Make sure all tests have been run with the correct parameters
+ for test_name in expected:
+ self.assertTrue(test_name in self.tests_run_list)
+
+
+if __name__ == "__main__":
+ ParmeterizationTest.InjectTests()
+ unittest.main()
diff --git a/net/test/run_net_test.sh b/net/test/run_net_test.sh
index 8c256b4..c7c18d3 100755
--- a/net/test/run_net_test.sh
+++ b/net/test/run_net_test.sh
@@ -30,7 +30,7 @@ OPTIONS="$OPTIONS TRANSPORT INET_XFRM_MODE_TUNNEL INET6_AH INET6_ESP"
OPTIONS="$OPTIONS INET6_XFRM_MODE_TRANSPORT INET6_XFRM_MODE_TUNNEL"
OPTIONS="$OPTIONS CRYPTO_SHA256 CRYPTO_SHA512 CRYPTO_AES_X86_64 CRYPTO_NULL"
OPTIONS="$OPTIONS CRYPTO_GCM CRYPTO_ECHAINIV NET_IPVTI IPV6_VTI"
-OPTIONS="$OPTIONS SOCK_CGROUP_DATA CGROUP_BPF"
+OPTIONS="$OPTIONS SOCK_CGROUP_DATA CGROUP_BPF CONFIG_XFRM_INTERFACE"
# For 4.14 kernels, where UBD and HOSTFS are not set
OPTIONS="$OPTIONS CONFIG_BLK_DEV_UBD CONFIG_HOSTFS"
diff --git a/net/test/util.py b/net/test/util.py
index bed3e1d..cbcd2d0 100644
--- a/net/test/util.py
+++ b/net/test/util.py
@@ -13,4 +13,59 @@
# limitations under the License.
def GetPadLength(block_size, length):
- return (block_size - (length % block_size)) % block_size \ No newline at end of file
+ return (block_size - (length % block_size)) % block_size
+
+
+def InjectParameterizedTest(cls, param_list, name_generator):
+ """Injects parameterized tests into the provided class
+
+ This method searches for all tests that start with the name "ParamTest",
+ and injects a test method for each set of parameters in param_list. Names
+ are generated via the use of the name_generator.
+
+ Args:
+ cls: the class for which to inject all parameterized tests
+ param_list: a list of tuples, where each tuple is a combination of
+ of parameters to test (i.e. representing a single test case)
+ name_generator: A function that takes a combination of parameters and
+ returns a string that identifies the test case.
+ """
+ param_test_names = [name for name in dir(cls) if name.startswith("ParamTest")]
+
+ # Force param_list to an actual list; otherwise itertools.Product will hit
+ # the end, resulting in only the first ParamTest* method actually being
+ # parameterized
+ param_list = list(param_list)
+
+ # Parameterize each test method starting with "ParamTest"
+ for test_name in param_test_names:
+ func = getattr(cls, test_name)
+
+ for params in param_list:
+ # Give the test method a readable, debuggable name.
+ param_string = name_generator(*params)
+ new_name = "%s_%s" % (func.__name__.replace("ParamTest", "test"),
+ param_string)
+ new_name = new_name.replace("(", "-").replace(")", "") # remove parens
+
+ # Inject the test method
+ setattr(cls, new_name, _GetTestClosure(func, params))
+
+
+def _GetTestClosure(func, params):
+ """ Creates a no-argument test method for the given function and parameters.
+
+ This is required to be separate from the InjectParameterizedTest method, due
+ to some interesting scoping issues with internal function declarations. If
+ left in InjectParameterizedTest, all the tests end up using the same
+ instance of TestClosure
+
+ Args:
+ func: the function for which this test closure should run
+ params: the parameters for the run of this test function
+ """
+
+ def TestClosure(self):
+ func(self, *params)
+
+ return TestClosure
diff --git a/net/test/xfrm.py b/net/test/xfrm.py
index 1bd10da..6a03c13 100755
--- a/net/test/xfrm.py
+++ b/net/test/xfrm.py
@@ -85,6 +85,8 @@ XFRMA_ADDRESS_FILTER = 26
XFRMA_PAD = 27
XFRMA_OFFLOAD_DEV = 28
XFRMA_OUTPUT_MARK = 29
+XFRMA_INPUT_MARK = 30
+XFRMA_IF_ID = 31
# Other netlink constants. See include/uapi/linux/xfrm.h.
@@ -369,21 +371,25 @@ class Xfrm(netlink.NetlinkSocket):
data = struct.unpack("=I", nla_data)[0]
elif name == "XFRMA_TMPL":
data = cstruct.Read(nla_data, XfrmUserTmpl)[0]
+ elif name == "XFRMA_IF_ID":
+ data = struct.unpack("=I", nla_data)[0]
else:
data = nla_data
return name, data
- def _UpdatePolicyInfo(self, msg, policy, tmpl, mark):
+ def _UpdatePolicyInfo(self, msg, policy, tmpl, mark, xfrm_if_id):
"""Send a policy to the Security Policy Database"""
nlattrs = []
if tmpl is not None:
nlattrs.append((XFRMA_TMPL, tmpl))
if mark is not None:
nlattrs.append((XFRMA_MARK, mark))
+ if xfrm_if_id is not None:
+ nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id)))
self.SendXfrmNlRequest(msg, policy, nlattrs)
- def AddPolicyInfo(self, policy, tmpl, mark):
+ def AddPolicyInfo(self, policy, tmpl, mark, xfrm_if_id=None):
"""Add a new policy to the Security Policy Database
If the policy exists, then return an error (EEXIST).
@@ -392,10 +398,11 @@ class Xfrm(netlink.NetlinkSocket):
policy: an unpacked XfrmUserpolicyInfo
tmpl: an unpacked XfrmUserTmpl
mark: an unpacked XfrmMark
+ xfrm_if_id: the XFRM interface ID as an integer, or None
"""
- self._UpdatePolicyInfo(XFRM_MSG_NEWPOLICY, policy, tmpl, mark)
+ self._UpdatePolicyInfo(XFRM_MSG_NEWPOLICY, policy, tmpl, mark, xfrm_if_id)
- def UpdatePolicyInfo(self, policy, tmpl, mark):
+ def UpdatePolicyInfo(self, policy, tmpl, mark, xfrm_if_id):
"""Update an existing policy in the Security Policy Database
If the policy does not exist, then create it; otherwise, update the
@@ -405,10 +412,11 @@ class Xfrm(netlink.NetlinkSocket):
policy: an unpacked XfrmUserpolicyInfo
tmpl: an unpacked XfrmUserTmpl to update
mark: an unpacked XfrmMark to match the existing policy or None
+ xfrm_if_id: an XFRM interface ID or None
"""
- self._UpdatePolicyInfo(XFRM_MSG_UPDPOLICY, policy, tmpl, mark)
+ self._UpdatePolicyInfo(XFRM_MSG_UPDPOLICY, policy, tmpl, mark, xfrm_if_id)
- def DeletePolicyInfo(self, selector, direction, mark):
+ def DeletePolicyInfo(self, selector, direction, mark, xfrm_if_id=None):
"""Delete a policy from the Security Policy Database
Args:
@@ -419,6 +427,8 @@ class Xfrm(netlink.NetlinkSocket):
nlattrs = []
if mark is not None:
nlattrs.append((XFRMA_MARK, mark))
+ if xfrm_if_id is not None:
+ nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id)))
self.SendXfrmNlRequest(XFRM_MSG_DELPOLICY,
XfrmUserpolicyId(sel=selector, dir=direction),
nlattrs)
@@ -440,11 +450,35 @@ class Xfrm(netlink.NetlinkSocket):
if nlattrs is None:
nlattrs = []
for attr_type, attr_msg in nlattrs:
- msg += self._NlAttr(attr_type, attr_msg.Pack())
+ # TODO: find a better way to deal with the fact that many XFRM messages
+ # use nlattrs that aren't cstructs.
+ #
+ # This code allows callers to pass in either something that has a Pack()
+ # method or a packed netlink attr, but not other types of attributes.
+ # Alternatives include:
+ #
+ # 1. Require callers to marshal netlink attributes themselves and call
+ # _SendNlRequest directly. Delete this method.
+ # 2. Rename this function to _SendXfrmNlRequestCstructOnly (or other name
+ # that makes it clear that this only takes cstructs). Switch callers
+ # that need non-cstruct elements to calling _SendNlRequest directly.
+ # 3. Make this function somehow automatically detect what to do for
+ # all types of XFRM attributes today and in the future. This may be
+ # feasible because all XFRM attributes today occupy the same number
+ # space, but what about nested attributes? It is unlikley feasible via
+ # things like "if isinstance(attr_msg, str): ...", because that would
+ # not be able to determine the right size or byte order for non-struct
+ # types such as int.
+ # 4. Define fictitious cstructs which have no correspondence to actual
+ # kernel structs such as the following to represent a raw integer.
+ # XfrmAttrOutputMark = cstruct.Struct("=I", mark)
+ if hasattr(attr_msg, "Pack"):
+ attr_msg = attr_msg.Pack()
+ msg += self._NlAttr(attr_type, attr_msg)
return self._SendNlRequest(msg_type, msg, flags)
def AddSaInfo(self, src, dst, spi, mode, reqid, encryption, auth_trunc, aead,
- encap, mark, output_mark, is_update=False):
+ encap, mark, output_mark, is_update=False, xfrm_if_id=None):
"""Adds an IPsec security association.
Args:
@@ -463,6 +497,7 @@ class Xfrm(netlink.NetlinkSocket):
output_mark: An integer, the output mark. 0 means unset.
is_update: If true, update an existing SA otherwise create a new SA. For
compatibility reasons, this value defaults to False.
+ xfrm_if_id: The XFRM interface ID, or None.
"""
proto = IPPROTO_ESP
xfrm_id = XfrmId((PaddedAddress(dst), spi, proto))
@@ -488,6 +523,8 @@ class Xfrm(netlink.NetlinkSocket):
nlattrs += self._NlAttr(XFRMA_ENCAP, encap.Pack())
if output_mark is not None:
nlattrs += self._NlAttrU32(XFRMA_OUTPUT_MARK, output_mark)
+ if xfrm_if_id is not None:
+ nlattrs += self._NlAttrU32(XFRMA_IF_ID, xfrm_if_id)
# The kernel ignores these on input, so make them empty.
cur = XfrmLifetimeCur()
@@ -519,7 +556,7 @@ class Xfrm(netlink.NetlinkSocket):
nl_msg_type = XFRM_MSG_UPDSA if is_update else XFRM_MSG_NEWSA
self._SendNlRequest(nl_msg_type, msg, flags)
- def DeleteSaInfo(self, dst, spi, proto, mark=None):
+ def DeleteSaInfo(self, dst, spi, proto, mark=None, xfrm_if_id=None):
"""Delete an SA from the SAD
Args:
@@ -530,12 +567,13 @@ class Xfrm(netlink.NetlinkSocket):
mark: A mark match specifier, such as returned by ExactMatchMark(), or
None for an SA without a Mark attribute.
"""
- # TODO: deletes take a mark as well.
family = AF_INET6 if ":" in dst else AF_INET
usersa_id = XfrmUsersaId((PaddedAddress(dst), spi, family, proto))
nlattrs = []
if mark is not None:
nlattrs.append((XFRMA_MARK, mark))
+ if xfrm_if_id is not None:
+ nlattrs.append((XFRMA_IF_ID, struct.pack("=I", xfrm_if_id)))
self.SendXfrmNlRequest(XFRM_MSG_DELSA, usersa_id, nlattrs)
def AllocSpi(self, dst, proto, min_spi, max_spi):
@@ -592,7 +630,7 @@ class Xfrm(netlink.NetlinkSocket):
self._SendNlRequest(XFRM_MSG_FLUSHSA, usersa_flush.Pack(), flags)
def CreateTunnel(self, direction, selector, src, dst, spi, encryption,
- auth_trunc, mark, output_mark):
+ auth_trunc, mark, output_mark, xfrm_if_id):
"""Create an XFRM Tunnel Consisting of a Policy and an SA.
Create a unidirectional XFRM tunnel, which entails one Policy and one
@@ -614,11 +652,12 @@ class Xfrm(netlink.NetlinkSocket):
unspecified.
output_mark: The mark used to select the underlying network for packets
outbound from xfrm. None means unspecified.
+ xfrm_if_id: The ID of the XFRM interface to use or None.
"""
outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst))
self.AddSaInfo(src, dst, spi, XFRM_MODE_TUNNEL, 0, encryption, auth_trunc,
- None, None, mark, output_mark)
+ None, None, mark, output_mark, xfrm_if_id=xfrm_if_id)
if selector is None:
selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)]
@@ -628,16 +667,19 @@ class Xfrm(netlink.NetlinkSocket):
for selector in selectors:
policy = UserPolicy(direction, selector)
tmpl = UserTemplate(outer_family, spi, 0, (src, dst))
- self.AddPolicyInfo(policy, tmpl, mark)
+ self.AddPolicyInfo(policy, tmpl, mark, xfrm_if_id=xfrm_if_id)
+
+ def DeleteTunnel(self, direction, selector, dst, spi, mark, xfrm_if_id):
+ if mark is not None:
+ mark = ExactMatchMark(mark)
- def DeleteTunnel(self, direction, selector, dst, spi, mark):
- self.DeleteSaInfo(dst, spi, IPPROTO_ESP, ExactMatchMark(mark))
+ self.DeleteSaInfo(dst, spi, IPPROTO_ESP, mark, xfrm_if_id)
if selector is None:
selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)]
else:
selectors = [selector]
for selector in selectors:
- self.DeletePolicyInfo(selector, direction, ExactMatchMark(mark))
+ self.DeletePolicyInfo(selector, direction, mark, xfrm_if_id)
if __name__ == "__main__":
diff --git a/net/test/xfrm_algorithm_test.py b/net/test/xfrm_algorithm_test.py
index 6adc461..0176265 100755
--- a/net/test/xfrm_algorithm_test.py
+++ b/net/test/xfrm_algorithm_test.py
@@ -27,6 +27,7 @@ import unittest
import multinetwork_base
import net_test
from tun_twister import TapTwister
+import util
import xfrm
import xfrm_base
@@ -72,49 +73,26 @@ AEAD_ALGOS = [
]
def InjectTests():
- XfrmAlgorithmTest.InjectTests()
+ XfrmAlgorithmTest.InjectTests()
+
class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
@classmethod
def InjectTests(cls):
- """Inject parameterized test cases into this class.
-
- Because a library for parameterized testing is not availble in
- net_test.rootfs.20150203, this does a minimal parameterization.
-
- This finds methods named like "ParamTestFoo" and replaces them with several
- "testFoo(*)" methods taking different parameter dicts. A set of test
- parameters is generated from every combination of encryption,
- authentication, IP version, and TCP/UDP.
-
- The benefit of this approach is that an individually failing tests have a
- clearly separated stack trace, and one failed test doesn't prevent the rest
- from running.
- """
- param_test_names = [
- name for name in dir(cls) if name.startswith("ParamTest")
- ]
VERSIONS = (4, 6)
TYPES = (SOCK_DGRAM, SOCK_STREAM)
# Tests all combinations of auth & crypt. Mutually exclusive with aead.
- for crypt, auth, version, proto, name in itertools.product(
- CRYPT_ALGOS, AUTH_ALGOS, VERSIONS, TYPES, param_test_names):
- XfrmAlgorithmTest.InjectSingleTest(name, version, proto, crypt=crypt, auth=auth)
+ param_list = itertools.product(VERSIONS, TYPES, AUTH_ALGOS, CRYPT_ALGOS,
+ [None])
+ util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator)
# Tests all combinations of aead. Mutually exclusive with auth/crypt.
- for aead, version, proto, name in itertools.product(
- AEAD_ALGOS, VERSIONS, TYPES, param_test_names):
- XfrmAlgorithmTest.InjectSingleTest(name, version, proto, aead=aead)
-
- @classmethod
- def InjectSingleTest(cls, name, version, proto, crypt=None, auth=None, aead=None):
- func = getattr(cls, name)
-
- def TestClosure(self):
- func(self, {"crypt": crypt, "auth": auth, "aead": aead,
- "version": version, "proto": proto})
+ param_list = itertools.product(VERSIONS, TYPES, [None], [None], AEAD_ALGOS)
+ util.InjectParameterizedTest(cls, param_list, cls.TestNameGenerator)
+ @staticmethod
+ def TestNameGenerator(version, proto, auth, crypt, aead):
# Produce a unique and readable name for each test. e.g.
# testSocketPolicySimple_cbc-aes_256_hmac-sha512_512_256_IPv6_UDP
param_string = ""
@@ -131,12 +109,9 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
param_string += "%s_%s" % ("IPv4" if version == 4 else "IPv6",
"UDP" if proto == SOCK_DGRAM else "TCP")
- new_name = "%s_%s" % (func.__name__.replace("ParamTest", "test"),
- param_string)
- new_name = new_name.replace("(", "-").replace(")", "") # remove parens
- setattr(cls, new_name, TestClosure)
+ return param_string
- def ParamTestSocketPolicySimple(self, params):
+ def ParamTestSocketPolicySimple(self, version, proto, auth, crypt, aead):
"""Test two-way traffic using transport mode and socket policies."""
def AssertEncrypted(packet):
@@ -153,37 +128,21 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
# other using transport mode ESP. Because of TapTwister, both sockets
# perceive each other as owning "remote_addr".
netid = self.RandomNetid()
- family = net_test.GetAddressFamily(params["version"])
- local_addr = self.MyAddress(params["version"], netid)
- remote_addr = self.GetRemoteSocketAddress(params["version"])
- crypt_left = (xfrm.XfrmAlgo((
- params["crypt"].name,
- params["crypt"].key_len)),
- os.urandom(params["crypt"].key_len / 8)) if params["crypt"] else None
- crypt_right = (xfrm.XfrmAlgo((
- params["crypt"].name,
- params["crypt"].key_len)),
- os.urandom(params["crypt"].key_len / 8)) if params["crypt"] else None
- auth_left = (xfrm.XfrmAlgoAuth((
- params["auth"].name,
- params["auth"].key_len,
- params["auth"].trunc_len)),
- os.urandom(params["auth"].key_len / 8)) if params["auth"] else None
- auth_right = (xfrm.XfrmAlgoAuth((
- params["auth"].name,
- params["auth"].key_len,
- params["auth"].trunc_len)),
- os.urandom(params["auth"].key_len / 8)) if params["auth"] else None
- aead_left = (xfrm.XfrmAlgoAead((
- params["aead"].name,
- params["aead"].key_len,
- params["aead"].icv_len)),
- os.urandom(params["aead"].key_len / 8)) if params["aead"] else None
- aead_right = (xfrm.XfrmAlgoAead((
- params["aead"].name,
- params["aead"].key_len,
- params["aead"].icv_len)),
- os.urandom(params["aead"].key_len / 8)) if params["aead"] else None
+ family = net_test.GetAddressFamily(version)
+ local_addr = self.MyAddress(version, netid)
+ remote_addr = self.GetRemoteSocketAddress(version)
+ auth_left = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)),
+ os.urandom(auth.key_len / 8)) if auth else None
+ auth_right = (xfrm.XfrmAlgoAuth((auth.name, auth.key_len, auth.trunc_len)),
+ os.urandom(auth.key_len / 8)) if auth else None
+ crypt_left = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)),
+ os.urandom(crypt.key_len / 8)) if crypt else None
+ crypt_right = (xfrm.XfrmAlgo((crypt.name, crypt.key_len)),
+ os.urandom(crypt.key_len / 8)) if crypt else None
+ aead_left = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)),
+ os.urandom(aead.key_len / 8)) if aead else None
+ aead_right = (xfrm.XfrmAlgoAead((aead.name, aead.key_len, aead.icv_len)),
+ os.urandom(aead.key_len / 8)) if aead else None
spi_left = 0xbeefface
spi_right = 0xcafed00d
req_ids = [100, 200, 300, 400] # Used to match templates and SAs.
@@ -242,20 +201,20 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
output_mark=None)
# Make two sockets.
- sock_left = socket(family, params["proto"], 0)
+ sock_left = socket(family, proto, 0)
sock_left.settimeout(2.0)
sock_left.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.SelectInterface(sock_left, netid, "mark")
- sock_right = socket(family, params["proto"], 0)
+ sock_right = socket(family, proto, 0)
sock_right.settimeout(2.0)
sock_right.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.SelectInterface(sock_right, netid, "mark")
# For UDP, set SO_LINGER to 0, to prevent TCP sockets from hanging around
# in a TIME_WAIT state.
- if params["proto"] == SOCK_STREAM:
- net_test.DisableFinWait(sock_left)
- net_test.DisableFinWait(sock_right)
+ if proto == SOCK_STREAM:
+ net_test.DisableFinWait(sock_left)
+ net_test.DisableFinWait(sock_right)
# Apply the left outbound socket policy.
xfrm_base.ApplySocketPolicy(sock_left, family, xfrm.XFRM_POLICY_OUT,
@@ -302,14 +261,14 @@ class XfrmAlgorithmTest(xfrm_base.XfrmLazyTest):
sock.close()
# Server and client need to know each other's port numbers in advance.
- wildcard_addr = net_test.GetWildcardAddress(params["version"])
+ wildcard_addr = net_test.GetWildcardAddress(version)
sock_left.bind((wildcard_addr, 0))
sock_right.bind((wildcard_addr, 0))
left_port = sock_left.getsockname()[1]
right_port = sock_right.getsockname()[1]
# Start the appropriate server type on sock_right.
- target = TcpServer if params["proto"] == SOCK_STREAM else UdpServer
+ target = TcpServer if proto == SOCK_STREAM else UdpServer
server = threading.Thread(
target=target,
args=(sock_right, left_port),
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 24f9edc..3a3d9b0 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -640,14 +640,14 @@ class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
self.assertEquals(attributes['XFRMA_TMPL'], tmpl)
# Create a new policy using update.
- self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark)
+ self.xfrm.UpdatePolicyInfo(policy, tmpl1, mark, None)
# NEWPOLICY will not update the existing policy. This checks both that
# UPDPOLICY created a policy and that NEWPOLICY will not perform updates.
_CheckTemplateMatch(tmpl1)
with self.assertRaisesErrno(EEXIST):
- self.xfrm.AddPolicyInfo(policy, tmpl2, mark)
+ self.xfrm.AddPolicyInfo(policy, tmpl2, mark, None)
# Update the policy using UPDPOLICY.
- self.xfrm.UpdatePolicyInfo(policy, tmpl2, mark)
+ self.xfrm.UpdatePolicyInfo(policy, tmpl2, mark, None)
# There should only be one policy after update, and it should have the
# updated template.
_CheckTemplateMatch(tmpl2)
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index 77cc8b3..5f243cc 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -19,32 +19,59 @@ from errno import * # pylint: disable=wildcard-import
from socket import * # pylint: disable=wildcard-import
import random
+import itertools
import struct
import unittest
+from scapy import all as scapy
from tun_twister import TunTwister
import csocket
import iproute
import multinetwork_base
import net_test
import packets
+import util
import xfrm
import xfrm_base
-# Parameters to Set up VTI as a special network
-_BASE_VTI_NETID = {4: 40, 6: 60}
+_LOOPBACK_IFINDEX = 1
+_TEST_XFRM_IFNAME = "ipsec42"
+_TEST_XFRM_IF_ID = 42
+
+# Does the kernel support xfrmi interfaces?
+def HaveXfrmInterfaces():
+ try:
+ i = iproute.IPRoute()
+ i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
+ _LOOPBACK_IFINDEX)
+ i.DeleteLink(_TEST_XFRM_IFNAME)
+ try:
+ i.GetIfIndex(_TEST_XFRM_IFNAME)
+ assert "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME
+ except IOError:
+ pass
+ return True
+ except IOError:
+ return False
+
+HAVE_XFRM_INTERFACES = HaveXfrmInterfaces()
+
+# Parameters to setup tunnels as special networks
+_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService
+_BASE_TUNNEL_NETID = {4: 40, 6: 60}
_BASE_VTI_OKEY = 2000000100
_BASE_VTI_IKEY = 2000000200
-_VTI_NETID = 50
-_VTI_IFNAME = "test_vti"
-
_TEST_OUT_SPI = 0x1234
_TEST_IN_SPI = _TEST_OUT_SPI
_TEST_OKEY = 2000000100
_TEST_IKEY = 2000000200
+_TEST_REMOTE_PORT = 1234
+
+_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}
+
def _GetLocalInnerAddress(version):
return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
@@ -58,61 +85,147 @@ def _GetRemoteOuterAddress(version):
return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]
+def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
+ src_port, dst_inner, dst_outer,
+ dst_port, spi, seq_num, ip_hdr_options={}):
+ ip_hdr_options.update({'src': src_inner, 'dst': dst_inner})
+
+ # Build and receive an ESP packet destined for the inner socket
+ IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
+ input_pkt = (
+ IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) /
+ net_test.UDP_PAYLOAD)
+ input_pkt = IpType(str(input_pkt)) # Compute length, checksum.
+ input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
+ (src_outer, dst_outer))
+
+ return input_pkt
+
+
+def _CreateReceiveSock(version, port=0):
+ # Create a socket to receive packets.
+ read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
+ read_sock.bind((net_test.GetWildcardAddress(version), port))
+ # The second parameter of the tuple is the port number regardless of AF.
+ local_port = read_sock.getsockname()[1]
+ # Guard against the eventuality of the receive failing.
+ net_test.SetNonBlocking(read_sock.fileno())
+
+ return read_sock, local_port
+
+
+def _SendPacket(testInstance, netid, version, remote, remote_port):
+ # Send a packet out via the tunnel-backed network, bound for the port number
+ # of the input socket.
+ write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
+ testInstance.SelectInterface(write_sock, netid, "mark")
+ write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port))
+ local_port = write_sock.getsockname()[1]
+
+ return local_port
+
+
+def InjectTests():
+ InjectParameterizedTests(XfrmTunnelTest)
+ InjectParameterizedTests(XfrmInterfaceTest)
+ InjectParameterizedTests(XfrmVtiTest)
+
+
+def InjectParameterizedTests(cls):
+ VERSIONS = (4, 6)
+ param_list = itertools.product(VERSIONS, VERSIONS)
+
+ def NameGenerator(*args):
+ return "IPv%d_in_IPv%d" % tuple(args)
+
+ util.InjectParameterizedTest(cls, param_list, NameGenerator)
+
+
class XfrmTunnelTest(xfrm_base.XfrmLazyTest):
- def _CheckTunnelOutput(self, inner_version, outer_version):
- """Test a bi-directional XFRM Tunnel with explicit selectors"""
+ def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid,
+ netid, local_inner, remote_inner, local_outer,
+ remote_outer, write_sock):
+
+ write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
+ self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
+ local_outer, remote_outer)
+
+ def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid,
+ netid, local_inner, remote_inner, local_outer,
+ remote_outer, read_sock):
+
+ # The second parameter of the tuple is the port number regardless of AF.
+ local_port = read_sock.getsockname()[1]
+
+ # Build and receive an ESP packet destined for the inner socket
+ input_pkt = _GetNullAuthCryptTunnelModePkt(
+ inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
+ local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
+ self.ReceivePacketOn(underlying_netid, input_pkt)
+
+ # Verify that the packet data and src are correct
+ data, src = read_sock.recvfrom(4096)
+ self.assertEquals(net_test.UDP_PAYLOAD, data)
+ self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2])
+
+ def _TestTunnel(self, inner_version, outer_version, func, direction):
+ """Test a unidirectional XFRM Tunnel with explicit selectors"""
# Select the underlying netid, which represents the external
# interface from/to which to route ESP packets.
- underlying_netid = self.RandomNetid()
+ u_netid = self.RandomNetid()
# Select a random netid that will originate traffic locally and
- # which represents the logical tunnel network.
- netid = self.RandomNetid(exclude=underlying_netid)
+ # which represents the netid on which the plaintext is sent
+ netid = self.RandomNetid(exclude=u_netid)
local_inner = self.MyAddress(inner_version, netid)
remote_inner = _GetRemoteInnerAddress(inner_version)
- local_outer = self.MyAddress(outer_version, underlying_netid)
+ local_outer = self.MyAddress(outer_version, u_netid)
remote_outer = _GetRemoteOuterAddress(outer_version)
+ # Create input/ouput SPs, SAs and sockets to simulate a more realistic
+ # environment.
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN,
+ xfrm.SrcDstSelector(remote_inner, local_inner),
+ remote_outer, local_outer, _TEST_IN_SPI,
+ xfrm_base._ALGO_CRYPT_NULL,
+ xfrm_base._ALGO_AUTH_NULL, None, None, None)
+
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT,
xfrm.SrcDstSelector(local_inner, remote_inner),
local_outer, remote_outer, _TEST_OUT_SPI,
xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- None, underlying_netid)
+ xfrm_base._ALGO_HMAC_SHA1, None, u_netid, None)
write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- # Select an interface, which provides the source address of the inner
- # packet.
self.SelectInterface(write_sock, netid, "mark")
- write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53))
- self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None,
- local_outer, remote_outer)
-
- # TODO: Add support for the input path.
+ read_sock, _ = _CreateReceiveSock(inner_version)
- def testIpv4InIpv4TunnelOutput(self):
- self._CheckTunnelOutput(4, 4)
+ sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
+ func(inner_version, outer_version, u_netid, netid, local_inner,
+ remote_inner, local_outer, remote_outer, sock)
- def testIpv4InIpv6TunnelOutput(self):
- self._CheckTunnelOutput(4, 6)
+ def ParamTestTunnelInput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput,
+ xfrm.XFRM_POLICY_IN)
- def testIpv6InIpv4TunnelOutput(self):
- self._CheckTunnelOutput(6, 4)
-
- def testIpv6InIpv6TunnelOutput(self):
- self._CheckTunnelOutput(6, 6)
+ def ParamTestTunnelOutput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
+ xfrm.XFRM_POLICY_OUT)
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
- def verifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, ikey, okey):
+ def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr,
+ ikey, okey):
self.assertEquals(vti_info_data["IFLA_VTI_IKEY"], ikey)
self.assertEquals(vti_info_data["IFLA_VTI_OKEY"], okey)
family = AF_INET if version == 4 else AF_INET6
- self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), local_addr)
- self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), remote_addr)
+ self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]),
+ local_addr)
+ self.assertEquals(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]),
+ remote_addr)
def testAddVti(self):
"""Test the creation of a Virtual Tunnel Interface."""
@@ -120,37 +233,37 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
netid = self.RandomNetid()
local_addr = self.MyAddress(version, netid)
self.iproute.CreateVirtualTunnelInterface(
- dev_name=_VTI_IFNAME,
+ dev_name=_TEST_XFRM_IFNAME,
local_addr=local_addr,
remote_addr=_GetRemoteOuterAddress(version),
o_key=_TEST_OKEY,
i_key=_TEST_IKEY)
- self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME),
- version, local_addr, _GetRemoteOuterAddress(version),
- _TEST_IKEY, _TEST_OKEY)
+ self._VerifyVtiInfoData(
+ self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
+ _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY)
new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
- new_okey = _TEST_OKEY + _VTI_NETID
- new_ikey = _TEST_IKEY + _VTI_NETID
+ new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
+ new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID
self.iproute.CreateVirtualTunnelInterface(
- dev_name=_VTI_IFNAME,
+ dev_name=_TEST_XFRM_IFNAME,
local_addr=local_addr,
remote_addr=new_remote_addr[version],
o_key=new_okey,
i_key=new_ikey,
is_update=True)
- self.verifyVtiInfoData(self.iproute.GetVtiInfoData(_VTI_IFNAME),
- version, local_addr, new_remote_addr[version],
- new_ikey, new_okey)
+ self._VerifyVtiInfoData(
+ self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
+ new_remote_addr[version], new_ikey, new_okey)
- if_index = self.iproute.GetIfIndex(_VTI_IFNAME)
+ if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
# Validate that the netlink interface matches the ioctl interface.
- self.assertEquals(net_test.GetInterfaceIndex(_VTI_IFNAME), if_index)
- self.iproute.DeleteLink(_VTI_IFNAME)
+ self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
+ self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
with self.assertRaises(IOError):
- self.iproute.GetIfIndex(_VTI_IFNAME)
+ self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
def _QuietDeleteLink(self, ifname):
try:
@@ -161,100 +274,204 @@ class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest):
def tearDown(self):
super(XfrmAddDeleteVtiTest, self).tearDown()
- self._QuietDeleteLink(_VTI_IFNAME)
+ self._QuietDeleteLink(_TEST_XFRM_IFNAME)
+
+
+class SaInfo(object):
+
+ def __init__(self, spi):
+ self.spi = spi
+ self.seq_num = 1
-class VtiInterface(object):
+class IpSecBaseInterface(object):
- def __init__(self, iface, netid, underlying_netid, local, remote):
+ def __init__(self, iface, netid, underlying_netid, local, remote, version):
self.iface = iface
self.netid = netid
self.underlying_netid = underlying_netid
self.local, self.remote = local, remote
+
+ # XFRM interfaces technically do not have a version. This keeps track of
+ # the IP version of the local and remote addresses.
+ self.version = version
self.rx = self.tx = 0
- self.ikey = _TEST_IKEY + netid
- self.okey = _TEST_OKEY + netid
- self.out_spi = self.in_spi = random.randint(0, 0x7fffffff)
+ self.addrs = {}
self.iproute = iproute.IPRoute()
self.xfrm = xfrm.Xfrm()
- self.SetupInterface()
- self.SetupXfrm()
- self.addrs = {}
-
def Teardown(self):
self.TeardownXfrm()
self.TeardownInterface()
- def SetupInterface(self):
- self.iproute.CreateVirtualTunnelInterface(
- self.iface, self.local, self.remote, self.ikey, self.okey)
-
def TeardownInterface(self):
self.iproute.DeleteLink(self.iface)
- def SetupXfrm(self):
+ def SetupXfrm(self, use_null_crypt):
+ rand_spi = random.randint(0, 0x7fffffff)
+ self.in_sa = SaInfo(rand_spi)
+ self.out_sa = SaInfo(rand_spi)
+
+ # Select algorithms:
+ if use_null_crypt:
+ auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL
+ else:
+ auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256
+
+ self._SetupXfrmByType(auth, crypt)
+
+ def TeardownXfrm(self):
+ raise NotImplementedError("Subclasses should implement this")
+
+ def _SetupXfrmByType(self, auth_algo, crypt_algo):
+ raise NotImplementedError("Subclasses should implement this")
+
+
+class VtiInterface(IpSecBaseInterface):
+
+ def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
+ super(VtiInterface, self).__init__(iface, netid, underlying_netid, local,
+ remote, version)
+
+ self.ikey = _TEST_IKEY + netid
+ self.okey = _TEST_OKEY + netid
+
+ self.SetupInterface()
+ self.SetupXfrm(False)
+
+ def SetupInterface(self):
+ return self.iproute.CreateVirtualTunnelInterface(
+ self.iface, self.local, self.remote, self.ikey, self.okey)
+
+ def _SetupXfrmByType(self, auth_algo, crypt_algo):
# For the VTI, the selectors are wildcard since packets will only
# be selected if they have the appropriate mark, hence the inner
# addresses are wildcard.
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
- self.out_spi, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
+ self.out_sa.spi, crypt_algo, auth_algo,
xfrm.ExactMatchMark(self.okey),
- self.underlying_netid)
+ self.underlying_netid, None)
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
- self.in_spi, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(self.ikey), None)
+ self.in_sa.spi, crypt_algo, auth_algo,
+ xfrm.ExactMatchMark(self.ikey), None, None)
def TeardownXfrm(self):
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
- self.out_spi, self.okey)
+ self.out_sa.spi, self.okey, None)
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
- self.in_spi, self.ikey)
+ self.in_sa.spi, self.ikey, None)
+
+
+@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
+class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest):
+ """Test the creation of an XFRM Interface."""
+
+ def testAddXfrmInterface(self):
+ self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID,
+ _LOOPBACK_IFINDEX)
+ if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
+ net_test.SetInterfaceUp(_TEST_XFRM_IFNAME)
+
+ # Validate that the netlink interface matches the ioctl interface.
+ self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
+ self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
+ with self.assertRaises(IOError):
+ self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
+
+
+class XfrmInterface(IpSecBaseInterface):
+
+ def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
+ version):
+ super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local,
+ remote, version)
+
+ self.ifindex = ifindex
+ self.xfrm_if_id = netid
+
+ self.SetupInterface()
+ self.SetupXfrm(False)
+
+ def SetupInterface(self):
+ """Create an XFRM interface."""
+ return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex)
+
+ def _SetupXfrmByType(self, auth_algo, crypt_algo):
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
+ self.out_sa.spi, crypt_algo, auth_algo, None,
+ self.underlying_netid, self.xfrm_if_id)
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
+ self.in_sa.spi, crypt_algo, auth_algo, None, None,
+ self.xfrm_if_id)
+
+ def TeardownXfrm(self):
+ self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
+ self.out_sa.spi, None, self.xfrm_if_id)
+ self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
+ self.in_sa.spi, None, self.xfrm_if_id)
-@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
-class XfrmVtiTest(xfrm_base.XfrmBaseTest):
+
+class XfrmTunnelBase(xfrm_base.XfrmBaseTest):
@classmethod
def setUpClass(cls):
xfrm_base.XfrmBaseTest.setUpClass()
- # VTI interfaces use marks extensively, so configure realistic packet
+ # Tunnel interfaces use marks extensively, so configure realistic packet
# marking rules to make the test representative, make PMTUD work, etc.
cls.SetInboundMarks(True)
cls.SetMarkReflectSysctls(1)
- cls.vtis = {}
+ # Group by tunnel version to ensure that we test at least one IPv4 and one
+ # IPv6 tunnel
+ cls.tunnelsV4 = {}
+ cls.tunnelsV6 = {}
for i, underlying_netid in enumerate(cls.tuns):
for version in 4, 6:
- netid = _BASE_VTI_NETID[version] + i
+ netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
iface = "ipsec%s" % netid
local = cls.MyAddress(version, underlying_netid)
if version == 4:
- remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR
+ remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
else:
- remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR
- vti = VtiInterface(iface, netid, underlying_netid, local, remote)
+ remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)
+
+ ifindex = cls.ifindices[underlying_netid]
+ tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
+ local, remote, version)
cls._SetInboundMarking(netid, iface, True)
- cls._SetupVtiNetwork(vti, True)
- cls.vtis[netid] = vti
+ cls._SetupTunnelNetwork(tunnel, True)
+
+ # On slower platforms, the test does not complete before the delay probe time fires.
+ # This causes the test to fail because of the unexpected NUD packet. b/123202162
+ if version == 6:
+ cls.SetSysctl("/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time"
+ % cls.GetInterfaceName(underlying_netid) , 10)
+ if version == 4:
+ cls.tunnelsV4[netid] = tunnel
+ else:
+ cls.tunnelsV6[netid] = tunnel
@classmethod
def tearDownClass(cls):
# The sysctls are restored by MultinetworkBaseTest.tearDownClass.
cls.SetInboundMarks(False)
- for vti in cls.vtis.values():
- cls._SetInboundMarking(vti.netid, vti.iface, False)
- cls._SetupVtiNetwork(vti, False)
- vti.Teardown()
+ for tunnel in cls.tunnelsV4.values() + cls.tunnelsV6.values():
+ cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
+ cls._SetupTunnelNetwork(tunnel, False)
+ tunnel.Teardown()
xfrm_base.XfrmBaseTest.tearDownClass()
+ def randomTunnel(self, outer_version):
+ version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
+ return random.choice(version_dict.values())
+
def setUp(self):
multinetwork_base.MultiNetworkBaseTest.setUp(self)
self.iproute = iproute.IPRoute()
+ self.xfrm = xfrm.Xfrm()
def tearDown(self):
multinetwork_base.MultiNetworkBaseTest.tearDown(self)
@@ -275,16 +492,23 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest):
net_test.AddressLengthBits(version), ifindex)
@classmethod
- def _SetupVtiNetwork(cls, vti, is_add):
- """Setup rules and routes for a VTI Network.
+ def _GetLocalAddress(cls, version, netid):
+ if version == 4:
+ return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
+ else:
+ return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"
+
+ @classmethod
+ def _SetupTunnelNetwork(cls, tunnel, is_add):
+ """Setup rules and routes for a tunnel Network.
Takes an interface and depending on the boolean
value of is_add, either adds or removes the rules
- and routes for a VTI to behave like an Android
- Network for purposes of testing.
+ and routes for a tunnel interface to behave like an
+ Android Network for purposes of testing.
Args:
- vti: A VtiInterface, the VTI to set up.
+ tunnel: A VtiInterface or XfrmInterface, the tunnel to set up.
is_add: Boolean that causes this method to perform setup if True or
teardown if False
"""
@@ -292,32 +516,30 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest):
# Disable router solicitations to avoid occasional spurious packets
# arriving on the underlying network; there are two possible behaviors
# when that occurred: either only the RA packet is read, and when it
- # is echoed back to the VTI, it causes the test to fail by not receiving
- # the UDP_PAYLOAD; or, two packets may arrive on the underlying
- # network which fails the assertion that only one ESP packet is received.
+ # is echoed back to the tunnel, it causes the test to fail by not
+ # receiving # the UDP_PAYLOAD; or, two packets may arrive on the
+ # underlying # network which fails the assertion that only one ESP packet
+ # is received.
cls.SetSysctl(
- "/proc/sys/net/ipv6/conf/%s/router_solicitations" % vti.iface, 0)
- net_test.SetInterfaceUp(vti.iface)
+ "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0)
+ net_test.SetInterfaceUp(tunnel.iface)
for version in [4, 6]:
- ifindex = net_test.GetInterfaceIndex(vti.iface)
- table = vti.netid
+ ifindex = net_test.GetInterfaceIndex(tunnel.iface)
+ table = tunnel.netid
# Set up routing rules.
- start, end = cls.UidRangeForNetid(vti.netid)
+ start, end = cls.UidRangeForNetid(tunnel.netid)
cls.iproute.UidRangeRule(version, is_add, start, end, table,
cls.PRIORITY_UID)
- cls.iproute.OifRule(version, is_add, vti.iface, table, cls.PRIORITY_OIF)
- cls.iproute.FwmarkRule(version, is_add, vti.netid, cls.NETID_FWMASK,
+ cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF)
+ cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK,
table, cls.PRIORITY_FWMARK)
# Configure IP addresses.
- if version == 4:
- addr = cls._MyIPv4Address(vti.netid)
- else:
- addr = cls.OnlinkPrefix(6, vti.netid) + "1"
+ addr = cls._GetLocalAddress(version, tunnel.netid)
prefixlen = net_test.AddressLengthBits(version)
- vti.addrs[version] = addr
+ tunnel.addrs[version] = addr
if is_add:
cls.iproute.AddAddress(addr, prefixlen, ifindex)
cls.iproute.AddRoute(version, table, "default", 0, None, ifindex)
@@ -325,100 +547,238 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest):
cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
cls.iproute.DelAddress(addr, prefixlen, ifindex)
- def assertReceivedPacket(self, vti):
- vti.rx += 1
- self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
-
- def assertSentPacket(self, vti):
- vti.tx += 1
- self.assertEquals((vti.rx, vti.tx), self.iproute.GetRxTxPackets(vti.iface))
-
- # TODO: Should we completely re-write this using null encryption and null
- # authentication? We could then assemble and disassemble packets for each
- # direction individually. This approach would improve debuggability, avoid the
- # complexity of the twister, and allow the test to more-closely validate
- # deployable configurations.
- def _CheckVtiInputOutput(self, vti, inner_version):
- local_outer = vti.local
- remote_outer = vti.remote
-
- # Create a socket to receive packets.
- read_sock = socket(
- net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- read_sock.bind((net_test.GetWildcardAddress(inner_version), 0))
- # The second parameter of the tuple is the port number regardless of AF.
- port = read_sock.getsockname()[1]
- # Guard against the eventuality of the receive failing.
- csocket.SetSocketTimeout(read_sock, 100)
-
- # Send a packet out via the vti-backed network, bound for the port number
- # of the input socket.
- write_sock = socket(
- net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- self.SelectInterface(write_sock, vti.netid, "mark")
- write_sock.sendto(net_test.UDP_PAYLOAD,
- (_GetRemoteInnerAddress(inner_version), port))
+ def assertReceivedPacket(self, tunnel, sa_info):
+ tunnel.rx += 1
+ self.assertEquals((tunnel.rx, tunnel.tx),
+ self.iproute.GetRxTxPackets(tunnel.iface))
+ sa_info.seq_num += 1
+
+ def assertSentPacket(self, tunnel, sa_info):
+ tunnel.tx += 1
+ self.assertEquals((tunnel.rx, tunnel.tx),
+ self.iproute.GetRxTxPackets(tunnel.iface))
+ sa_info.seq_num += 1
+
+ def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
+ sa_info=None):
+ """Test null-crypt input path over an IPsec interface."""
+ if sa_info is None:
+ sa_info = tunnel.in_sa
+ read_sock, local_port = _CreateReceiveSock(inner_version)
+
+ input_pkt = _GetNullAuthCryptTunnelModePkt(
+ inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
+ local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
+ self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)
+
+ # Verify that the packet data and src are correct
+ self.assertReceivedPacket(tunnel, sa_info)
+ data, src = read_sock.recvfrom(4096)
+ self.assertEquals(net_test.UDP_PAYLOAD, data)
+ self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2])
+
+ def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
+ remote_inner, sa_info=None):
+ """Test null-crypt output path over an IPsec interface."""
+ if sa_info is None:
+ sa_info = tunnel.out_sa
+ local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
+ _TEST_REMOTE_PORT)
# Read a tunneled IP packet on the underlying (outbound) network
# verifying that it is an ESP packet.
- self.assertSentPacket(vti)
- pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
- local_outer, remote_outer)
-
- # Perform an address switcheroo so that the inner address of the remote
- # end of the tunnel is now the address on the local VTI interface; this
- # way, the twisted inner packet finds a destination via the VTI once
- # decrypted.
- remote = _GetRemoteInnerAddress(inner_version)
- local = vti.addrs[inner_version]
- self._SwapInterfaceAddress(vti.iface, new_addr=remote, old_addr=local)
+ pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
+ sa_info.seq_num, None, tunnel.local,
+ tunnel.remote)
+
+ # Get and update the IP headers on the inner payload so that we can do a simple
+ # comparison of byte data. Unfortunately, due to the scapy version this runs on,
+ # we cannot parse past the ESP header to the inner IP header, and thus have to
+ # workaround in this manner
+ if inner_version == 4:
+ ip_hdr_options = {
+ 'id': scapy.IP(str(pkt.payload)[8:]).id,
+ 'flags': scapy.IP(str(pkt.payload)[8:]).flags
+ }
+ else:
+ ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl}
+
+ expected = _GetNullAuthCryptTunnelModePkt(
+ inner_version, local_inner, tunnel.local, local_port, remote_inner,
+ tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
+ ip_hdr_options)
+
+ # Check outer header manually (Avoids having to overwrite outer header's
+ # id, flags or flow label)
+ self.assertSentPacket(tunnel, sa_info)
+ self.assertEquals(expected.src, pkt.src)
+ self.assertEquals(expected.dst, pkt.dst)
+ self.assertEquals(len(expected), len(pkt))
+
+ # Check everything else
+ self.assertEquals(str(expected.payload), str(pkt.payload))
+
+ def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
+ remote_inner):
+ """Test both input and output paths over an encrypted IPsec interface.
+
+ This tests specifically makes sure that the both encryption and decryption
+ work together, as opposed to the _CheckTunnel(Input|Output) where the
+ input and output paths are tested separately, and using null encryption.
+ """
+ src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
+ _TEST_REMOTE_PORT)
+
+ # Make sure it appeared on the underlying interface
+ pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
+ tunnel.out_sa.seq_num, None, tunnel.local,
+ tunnel.remote)
+
+ # Check that packet is not sent in plaintext
+ self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt))
+
+ # Check src/dst
+ self.assertEquals(tunnel.local, pkt.src)
+ self.assertEquals(tunnel.remote, pkt.dst)
+
+ # Check that the interface statistics recorded the outbound packet
+ self.assertSentPacket(tunnel, tunnel.out_sa)
+
try:
- # Swap the packet's IP headers and write it back to the
- # underlying network.
+ # Swap the interface addresses to pretend we are the remote
+ self._SwapInterfaceAddress(
+ tunnel.iface, new_addr=remote_inner, old_addr=local_inner)
+
+ # Swap the packet's IP headers and write it back to the underlying
+ # network.
pkt = TunTwister.TwistPacket(pkt)
- self.ReceivePacketOn(vti.underlying_netid, pkt)
- # Receive the decrypted packet on the dest port number.
- read_packet = read_sock.recv(4096)
- self.assertEquals(read_packet, net_test.UDP_PAYLOAD)
- self.assertReceivedPacket(vti)
- finally:
- # Unwind the switcheroo
- self._SwapInterfaceAddress(vti.iface, new_addr=local, old_addr=remote)
+ read_sock, local_port = _CreateReceiveSock(inner_version,
+ _TEST_REMOTE_PORT)
+ self.ReceivePacketOn(tunnel.underlying_netid, pkt)
+ # Verify that the packet data and src are correct
+ data, src = read_sock.recvfrom(4096)
+ self.assertEquals(net_test.UDP_PAYLOAD, data)
+ self.assertEquals((local_inner, src_port), src[:2])
+
+ # Check that the interface statistics recorded the inbound packet
+ self.assertReceivedPacket(tunnel, tunnel.in_sa)
+ finally:
+ # Swap the interface addresses to pretend we are the remote
+ self._SwapInterfaceAddress(
+ tunnel.iface, new_addr=local_inner, old_addr=remote_inner)
+
+ def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
+ sa_info=None):
+ """Test ICMP error path over an IPsec interface."""
+ if sa_info is None:
+ sa_info = tunnel.out_sa
# Now attempt to provoke an ICMP error.
# TODO: deduplicate with multinetwork_test.py.
- version = net_test.GetAddressVersion(vti.remote)
dst_prefix, intermediate = {
4: ("172.19.", "172.16.9.12"),
6: ("2001:db8::", "2001:db8::1")
- }[version]
+ }[tunnel.version]
- write_sock.sendto(net_test.UDP_PAYLOAD,
- (_GetRemoteInnerAddress(inner_version), port))
- self.assertSentPacket(vti)
- pkt = self._ExpectEspPacketOn(vti.underlying_netid, vti.out_spi, vti.tx, None,
- local_outer, remote_outer)
- myaddr = self.MyAddress(version, vti.underlying_netid)
- _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt)
- self.ReceivePacketOn(vti.underlying_netid, toobig)
+ local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
+ _TEST_REMOTE_PORT)
+ pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
+ sa_info.seq_num, None, tunnel.local,
+ tunnel.remote)
+ self.assertSentPacket(tunnel, sa_info)
+
+ myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
+ _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
+ pkt)
+ self.ReceivePacketOn(tunnel.underlying_netid, toobig)
# Check that the packet too big reduced the MTU.
- routes = self.iproute.GetRoutes(vti.remote, 0, vti.underlying_netid, None)
+ routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None)
self.assertEquals(1, len(routes))
rtmsg, attributes = routes[0]
self.assertEquals(iproute.RTN_UNICAST, rtmsg.type)
self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
# Clear PMTU information so that future tests don't have to worry about it.
- self.InvalidateDstCache(version, vti.underlying_netid)
+ self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)
+
+ def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
+ remote_inner):
+ """Test combined encryption path with ICMP errors over an IPsec tunnel"""
+ self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
+ remote_inner)
+ self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
+ self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
+ remote_inner)
+
+ def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
+ """Bootstrap method to setup and run tests for the given parameters."""
+ tunnel = self.randomTunnel(outer_version)
+
+ try:
+ tunnel.TeardownXfrm()
+ tunnel.SetupXfrm(use_null_crypt)
+
+ local_inner = tunnel.addrs[inner_version]
+ remote_inner = _GetRemoteInnerAddress(inner_version)
+
+ # Run twice to ensure sequence numbers are tested
+ for i in range(2):
+ func(tunnel, inner_version, local_inner, remote_inner)
+ finally:
+ if use_null_crypt:
+ tunnel.TeardownXfrm()
+ tunnel.SetupXfrm(False)
+
+
+@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
+class XfrmVtiTest(XfrmTunnelBase):
+
+ INTERFACE_CLASS = VtiInterface
+
+ def ParamTestVtiInput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
+
+ def ParamTestVtiOutput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
+ True)
+
+ def ParamTestVtiInOutEncrypted(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
+ False)
+
+ def ParamTestVtiIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
+
+ def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version,
+ self._CheckTunnelEncryptionWithIcmp, False)
+
+
+@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
+class XfrmInterfaceTest(XfrmTunnelBase):
+
+ INTERFACE_CLASS = XfrmInterface
+
+ def ParamTestXfrmIntfInput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
+
+ def ParamTestXfrmIntfOutput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
+ True)
+
+ def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
+ False)
+
+ def ParamTestXfrmIntfIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
- def testVtiInputOutput(self):
- """Test packet input and output over a Virtual Tunnel Interface."""
- for i in xrange(3 * len(self.vtis.values())):
- vti = random.choice(self.vtis.values())
- self._CheckVtiInputOutput(vti, 4)
- self._CheckVtiInputOutput(vti, 6)
+ def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version,
+ self._CheckTunnelEncryptionWithIcmp, False)
if __name__ == "__main__":
+ InjectTests()
unittest.main()