summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenzo Colitti <lorenzo@google.com>2018-02-15 14:58:38 +0000
committerandroid-build-merger <android-build-merger@google.com>2018-02-15 14:58:38 +0000
commit7a5cd50e7016708dc1ddbb863f44cd19377345c6 (patch)
treeb6ac509af501858d4ed94ce92cbbfdf4fa8a62df
parent4179f9493d456a3a7e7c8fa1a895007004a6a9d5 (diff)
parent54e55c9cf14adc6f777c9fbcfde1bec80651e050 (diff)
downloadtests-7a5cd50e7016708dc1ddbb863f44cd19377345c6.tar.gz
Move some utility code from xfrm_base to xfrm.
am: 54e55c9cf1 Change-Id: I2919458b4be0ae4956c00bff306f07f105709978
-rwxr-xr-xnet/test/xfrm.py103
-rw-r--r--net/test/xfrm_base.py112
-rwxr-xr-xnet/test/xfrm_test.py16
-rwxr-xr-xnet/test/xfrm_tunnel_test.py31
4 files changed, 130 insertions, 132 deletions
diff --git a/net/test/xfrm.py b/net/test/xfrm.py
index 04a434d..93d15ac 100755
--- a/net/test/xfrm.py
+++ b/net/test/xfrm.py
@@ -215,6 +215,7 @@ EspHdr = cstruct.Struct("EspHdr", "!II", "spi seqnum")
# Local constants.
_DEFAULT_REPLAY_WINDOW = 4
+ALL_ALGORITHMS = 0xffffffff
def RawAddress(addr):
@@ -231,6 +232,9 @@ def PaddedAddress(addr):
return padded
+XFRM_ADDR_ANY = PaddedAddress("::")
+
+
def EmptySelector(family):
"""A selector that matches all packets of the specified address family."""
return XfrmSelector(family=family)
@@ -249,6 +253,66 @@ def SrcDstSelector(src, dst):
prefixlen_s=prefixlen, prefixlen_d=prefixlen, family=family)
+def UserPolicy(direction, selector):
+ """Create an IPsec policy.
+
+ Args:
+ direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
+ selector: An XfrmSelector, the packets to transform.
+
+ Return: a XfrmUserpolicyInfo cstruct.
+ """
+ # Create a user policy that specifies that all packets in the specified
+ # direction matching the selector should be encrypted.
+ return XfrmUserpolicyInfo(
+ sel=selector,
+ lft=NO_LIFETIME_CFG,
+ curlft=NO_LIFETIME_CUR,
+ dir=direction,
+ action=XFRM_POLICY_ALLOW,
+ flags=XFRM_POLICY_LOCALOK,
+ share=XFRM_SHARE_UNIQUE)
+
+
+def UserTemplate(family, spi, reqid, tun_addrs):
+ """Create an ESP policy and template.
+
+ Args:
+ spi: 32-bit SPI in host byte order
+ reqid: 32-bit ID matched against SAs
+ tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
+ to request a transport mode SA.
+
+ Return: a tuple of XfrmUserpolicyInfo, XfrmUserTmpl
+ """
+ # For transport mode, set template source and destination are empty.
+ # For tunnel mode, explicitly specify source and destination addresses.
+ if tun_addrs is None:
+ mode = XFRM_MODE_TRANSPORT
+ saddr = XFRM_ADDR_ANY
+ daddr = XFRM_ADDR_ANY
+ else:
+ mode = XFRM_MODE_TUNNEL
+ saddr = PaddedAddress(tun_addrs[0])
+ daddr = PaddedAddress(tun_addrs[1])
+
+ # Create a template that specifies the SPI and the protocol.
+ xfrmid = XfrmId(daddr=daddr, spi=spi, proto=IPPROTO_ESP)
+ template = XfrmUserTmpl(
+ id=xfrmid,
+ family=family,
+ saddr=saddr,
+ reqid=reqid,
+ mode=mode,
+ share=XFRM_SHARE_UNIQUE,
+ optional=0, #require
+ aalgos=ALL_ALGORITHMS,
+ ealgos=ALL_ALGORITHMS,
+ calgos=ALL_ALGORITHMS)
+
+ return template
+
+
def ExactMatchMark(mark):
"""An XfrmMark that matches only the specified mark."""
return XfrmMark((mark, 0xffffffff))
@@ -527,6 +591,45 @@ class Xfrm(netlink.NetlinkSocket):
flags = netlink.NLM_F_REQUEST | netlink.NLM_F_ACK
self._SendNlRequest(XFRM_MSG_FLUSHSA, usersa_flush.Pack(), flags)
+ def CreateTunnel(self, direction, selector, src, dst, spi, encryption,
+ auth_trunc, mark, output_mark):
+ """Create an XFRM Tunnel Consisting of a Policy and an SA.
+
+ Create a unidirectional XFRM tunnel, which entails one Policy and one
+ security association.
+
+ Args:
+ direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
+ selector: An XfrmSelector that specifies the packets to be transformed.
+ This is only applied to the policy; the selector in the SA is always
+ empty. If the passed-in selector is None, then the tunnel is made
+ dual-stack. This requires two policies, one for IPv4 and one for IPv6.
+ src: The source address of the tunneled packets
+ dst: The destination address of the tunneled packets
+ spi: The SPI for the IPsec SA that encapsulates the tunneled packet
+ encryption: A tuple (XfrmAlgo, key), the encryption parameters.
+ auth_trunc: A tuple (XfrmAlgoAuth, key), the authentication parameters.
+ mark: An XfrmMark, the mark used for selecting packets to be tunneled, and
+ for matching the security policy and security association. None means
+ unspecified.
+ output_mark: The mark used to select the underlying network for packets
+ outbound from xfrm. None means unspecified.
+ """
+ outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst))
+
+ self.AddSaInfo(src, dst, spi, XFRM_MODE_TUNNEL, 0, encryption, auth_trunc,
+ None, None, mark, output_mark)
+
+ if selector is None:
+ selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)]
+ else:
+ selectors = [selector]
+
+ for selector in selectors:
+ policy = UserPolicy(direction, selector)
+ tmpl = UserTemplate(outer_family, spi, 0, (src, dst))
+ self.AddPolicyInfo(policy, tmpl, mark)
+
if __name__ == "__main__":
x = Xfrm()
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 7aba508..c9cfd49 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -39,68 +39,6 @@ _ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)),
# Match all bits of the mark
MARK_MASK_ALL = 0xffffffff
-ALL_ALGORITHMS = 0xffffffff
-
-XFRM_ADDR_ANY = xfrm.PaddedAddress("::")
-
-
-def UserPolicy(direction, selector):
- """Create an IPsec policy.
-
- Args:
- direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
- selector: An XfrmSelector, the packets to transform.
-
- Return: a XfrmUserpolicyInfo cstruct.
- """
- # Create a user policy that specifies that all packets in the specified
- # direction matching the selector should be encrypted.
- return xfrm.XfrmUserpolicyInfo(
- sel=selector,
- lft=xfrm.NO_LIFETIME_CFG,
- curlft=xfrm.NO_LIFETIME_CUR,
- dir=direction,
- action=xfrm.XFRM_POLICY_ALLOW,
- flags=xfrm.XFRM_POLICY_LOCALOK,
- share=xfrm.XFRM_SHARE_UNIQUE)
-
-def UserTemplate(family, spi, reqid, tun_addrs):
- """Create an ESP policy and template.
-
- Args:
- spi: 32-bit SPI in host byte order
- reqid: 32-bit ID matched against SAs
- tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
- to request a transport mode SA.
-
- Return: a tuple of XfrmUserpolicyInfo, XfrmUserTmpl
- """
- # For transport mode, set template source and destination are empty.
- # For tunnel mode, explicitly specify source and destination addresses.
- if tun_addrs is None:
- mode = xfrm.XFRM_MODE_TRANSPORT
- saddr = XFRM_ADDR_ANY
- daddr = XFRM_ADDR_ANY
- else:
- mode = xfrm.XFRM_MODE_TUNNEL
- saddr = xfrm.PaddedAddress(tun_addrs[0])
- daddr = xfrm.PaddedAddress(tun_addrs[1])
-
- # Create a template that specifies the SPI and the protocol.
- xfrmid = xfrm.XfrmId(daddr=daddr, spi=spi, proto=IPPROTO_ESP)
- template = xfrm.XfrmUserTmpl(
- id=xfrmid,
- family=family,
- saddr=saddr,
- reqid=reqid,
- mode=mode,
- share=xfrm.XFRM_SHARE_UNIQUE,
- optional=0, #require
- aalgos=ALL_ALGORITHMS,
- ealgos=ALL_ALGORITHMS,
- calgos=ALL_ALGORITHMS)
-
- return template
def SetPolicySockopt(sock, family, opt_data):
@@ -131,8 +69,8 @@ def ApplySocketPolicy(sock, family, direction, spi, reqid, tun_addrs):
selector = xfrm.EmptySelector(family)
# Create an XFRM policy and template.
- policy = UserPolicy(direction, selector)
- template = UserTemplate(family, spi, reqid, tun_addrs)
+ policy = xfrm.UserPolicy(direction, selector)
+ template = xfrm.UserTemplate(family, spi, reqid, tun_addrs)
# Set the policy and template on our socket.
opt_data = policy.Pack() + template.Pack()
@@ -374,49 +312,3 @@ class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr)
self.assertEquals(xfrm.EspHdr((spi, seq)), esp_hdr)
return packet
-
- def CreateTunnel(self, direction, selector, src, dst, spi, encryption,
- auth_trunc, mark, output_mark):
- """Create an XFRM Tunnel Consisting of a Policy and an SA.
-
- Create a unidirectional XFRM tunnel, which entails one Policy and one
- security association.
-
- Args:
- direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
- selector: An XfrmSelector that specifies the packets to be transformed.
- This is only applied to the policy; the selector in the SA is always
- empty. If the passed-in selector is None, then the tunnel is made
- dual-stack. This requires two policies, one for IPv4 and one for IPv6.
- src: The source address of the tunneled packets
- dst: The destination address of the tunneled packets
- spi: The SPI for the IPsec SA that encapsulates the tunneled packet
- encryption: A tuple (XfrmAlgo, key), the encryption parameters.
- auth_trunc: A tuple (XfrmAlgoAuth, key), the authentication parameters.
- mark: An XfrmMark, the mark used for selecting packets to be tunneled, and
- for matching the security policy and security association. None means
- unspecified.
- output_mark: The mark used to select the underlying network for packets
- outbound from xfrm. None means unspecified.
- """
- outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst))
-
- self.xfrm.AddSaInfo(
- src, dst,
- spi, xfrm.XFRM_MODE_TUNNEL, 0,
- encryption,
- auth_trunc,
- None,
- None,
- mark,
- output_mark)
-
- if selector is None:
- selectors = [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]
- else:
- selectors = [selector]
-
- for selector in selectors:
- policy = UserPolicy(direction, selector)
- tmpl = UserTemplate(outer_family, spi, 0, (src, dst))
- self.xfrm.AddPolicyInfo(policy, tmpl, mark)
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 42faecc..ace49be 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -598,8 +598,8 @@ class XfrmFunctionalTest(xfrm_base.XfrmBaseTest):
mark1 = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)
mark2 = xfrm.XfrmMark(mark=0xf00d, mask=xfrm_base.MARK_MASK_ALL)
# Create a global policy.
- policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
- tmpl = xfrm_base.UserTemplate(AF_UNSPEC, 0xfeed, 0, None)
+ policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
+ tmpl = xfrm.UserTemplate(AF_UNSPEC, 0xfeed, 0, None)
# Create the policy with the first mark.
self.xfrm.AddPolicyInfo(policy, tmpl, mark1)
# Create the same policy but with the second (different) mark.
@@ -617,10 +617,10 @@ class XfrmFunctionalTest(xfrm_base.XfrmBaseTest):
def _CheckUpdatePolicy(self, version):
"""Tests that we can can update the template on a policy."""
family = net_test.GetAddressFamily(version)
- tmpl1 = xfrm_base.UserTemplate(family, 0xdead, 0, None)
- tmpl2 = xfrm_base.UserTemplate(family, 0xbeef, 0, None)
+ tmpl1 = xfrm.UserTemplate(family, 0xdead, 0, None)
+ tmpl2 = xfrm.UserTemplate(family, 0xbeef, 0, None)
sel = xfrm.EmptySelector(family)
- policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
+ policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
mark = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)
def _CheckTemplateMatch(tmpl):
@@ -652,12 +652,12 @@ class XfrmFunctionalTest(xfrm_base.XfrmBaseTest):
def _CheckPolicyDifferByDirection(self,version):
"""Tests that policies can differ only by direction."""
family = net_test.GetAddressFamily(version)
- tmpl = xfrm_base.UserTemplate(family, 0xdead, 0, None)
+ tmpl = xfrm.UserTemplate(family, 0xdead, 0, None)
sel = xfrm.EmptySelector(family)
mark = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)
- policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
+ policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
self.xfrm.AddPolicyInfo(policy, tmpl, mark)
- policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_IN, sel)
+ policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_IN, sel)
self.xfrm.AddPolicyInfo(policy, tmpl, mark)
def testPolicyDifferByDirectionV4(self):
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index 7825a1e..6cb73c0 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -69,11 +69,12 @@ class XfrmTunnelTest(xfrm_base.XfrmBaseTest):
local_outer = self.MyAddress(outer_version, underlying_netid)
remote_outer = _GetRemoteOuterAddress(outer_version)
- self.CreateTunnel(xfrm.XFRM_POLICY_OUT,
- xfrm.SrcDstSelector(local_inner, remote_inner),
- local_outer, remote_outer, _TEST_OUT_SPI,
- xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1,
- None, underlying_netid)
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT,
+ xfrm.SrcDstSelector(local_inner, remote_inner),
+ local_outer, remote_outer, _TEST_OUT_SPI,
+ xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ None, underlying_netid)
write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
# Select an interface, which provides the source address of the inner
@@ -224,15 +225,17 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest):
# For the VTI, the selectors are wildcard since packets will only
# be selected if they have the appropriate mark, hence the inner
# addresses are wildcard.
- self.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer, remote_outer,
- _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(_TEST_OKEY), netid)
-
- self.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer, local_outer,
- _TEST_IN_SPI, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- xfrm.ExactMatchMark(_TEST_IKEY), None)
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer,
+ remote_outer, _TEST_OUT_SPI,
+ xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ xfrm.ExactMatchMark(_TEST_OKEY), netid)
+
+ self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer,
+ local_outer, _TEST_IN_SPI,
+ xfrm_base._ALGO_CBC_AES_256,
+ xfrm_base._ALGO_HMAC_SHA1,
+ xfrm.ExactMatchMark(_TEST_IKEY), None)
def _CheckVtiInputOutput(self, netid, vti_netid, iface, outer_version,
inner_version, rx, tx):