author | Lorenzo Colitti <lorenzo@google.com> | 2018-02-15 14:58:38 +0000 |
---|---|---|
committer | android-build-merger <android-build-merger@google.com> | 2018-02-15 14:58:38 +0000 |
commit | 7a5cd50e7016708dc1ddbb863f44cd19377345c6 (patch) | |
tree | b6ac509af501858d4ed94ce92cbbfdf4fa8a62df | |
parent | 4179f9493d456a3a7e7c8fa1a895007004a6a9d5 (diff) | |
parent | 54e55c9cf14adc6f777c9fbcfde1bec80651e050 (diff) | |
download | tests-7a5cd50e7016708dc1ddbb863f44cd19377345c6.tar.gz |
Move some utility code from xfrm_base to xfrm.
am: 54e55c9cf1
Change-Id: I2919458b4be0ae4956c00bff306f07f105709978
-rwxr-xr-x | net/test/xfrm.py | 103 | ||||
-rw-r--r-- | net/test/xfrm_base.py | 112 | ||||
-rwxr-xr-x | net/test/xfrm_test.py | 16 | ||||
-rwxr-xr-x | net/test/xfrm_tunnel_test.py | 31 |
4 files changed, 130 insertions, 132 deletions
diff --git a/net/test/xfrm.py b/net/test/xfrm.py
index 04a434d..93d15ac 100755
--- a/net/test/xfrm.py
+++ b/net/test/xfrm.py
@@ -215,6 +215,7 @@ EspHdr = cstruct.Struct("EspHdr", "!II", "spi seqnum")
 
 # Local constants.
 _DEFAULT_REPLAY_WINDOW = 4
+ALL_ALGORITHMS = 0xffffffff
 
 
 def RawAddress(addr):
@@ -231,6 +232,9 @@ def PaddedAddress(addr):
   return padded
 
 
+XFRM_ADDR_ANY = PaddedAddress("::")
+
+
 def EmptySelector(family):
   """A selector that matches all packets of the specified address family."""
   return XfrmSelector(family=family)
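Review bot: The two new module-level constants land in xfrm.py without any comment, and `ALL_ALGORITHMS = 0xffffffff` in particular is only understandable once you find it used as the `aalgos`/`ealgos`/`calgos` value of the template further down. A one-line comment on each would save the next reader that search: `# Bitmask passed as a template's aalgos/ealgos/calgos: accept any algorithm.` above ALL_ALGORITHMS, and `# Wildcard address used as a template's saddr/daddr in transport mode.` above XFRM_ADDR_ANY. Both names are public (no leading underscore) while the neighbouring `_DEFAULT_REPLAY_WINDOW` is private; if they are meant only for UserTemplate, an underscore prefix would match the file's convention.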
@@ -249,6 +253,66 @@ def SrcDstSelector(src, dst):
                       prefixlen_s=prefixlen, prefixlen_d=prefixlen, family=family)
 
 
+def UserPolicy(direction, selector):
+  """Create an IPsec policy.
+
+  Args:
+    direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
+    selector: An XfrmSelector, the packets to transform.
+
+  Return: a XfrmUserpolicyInfo cstruct.
+  """
+  # Create a user policy that specifies that all packets in the specified
+  # direction matching the selector should be encrypted.
+  return XfrmUserpolicyInfo(
+      sel=selector,
+      lft=NO_LIFETIME_CFG,
+      curlft=NO_LIFETIME_CUR,
+      dir=direction,
+      action=XFRM_POLICY_ALLOW,
+      flags=XFRM_POLICY_LOCALOK,
+      share=XFRM_SHARE_UNIQUE)
+
+
+def UserTemplate(family, spi, reqid, tun_addrs):
+  """Create an ESP policy and template.
+
+  Args:
+    spi: 32-bit SPI in host byte order
+    reqid: 32-bit ID matched against SAs
+    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
+      to request a transport mode SA.
+
+  Return: a tuple of XfrmUserpolicyInfo, XfrmUserTmpl
+  """
+  # For transport mode, set template source and destination are empty.
+  # For tunnel mode, explicitly specify source and destination addresses.
+  if tun_addrs is None:
+    mode = XFRM_MODE_TRANSPORT
+    saddr = XFRM_ADDR_ANY
+    daddr = XFRM_ADDR_ANY
+  else:
+    mode = XFRM_MODE_TUNNEL
+    saddr = PaddedAddress(tun_addrs[0])
+    daddr = PaddedAddress(tun_addrs[1])
+
+  # Create a template that specifies the SPI and the protocol.
+  xfrmid = XfrmId(daddr=daddr, spi=spi, proto=IPPROTO_ESP)
+  template = XfrmUserTmpl(
+      id=xfrmid,
+      family=family,
+      saddr=saddr,
+      reqid=reqid,
+      mode=mode,
+      share=XFRM_SHARE_UNIQUE,
+      optional=0, #require
+      aalgos=ALL_ALGORITHMS,
+      ealgos=ALL_ALGORITHMS,
+      calgos=ALL_ALGORITHMS)
+
+  return template
+
+
 def ExactMatchMark(mark):
   """An XfrmMark that matches only the specified mark."""
   return XfrmMark((mark, 0xffffffff))
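Review bot: UserTemplate's docstring disagrees with the function as moved: the `Return:` line promises `a tuple of XfrmUserpolicyInfo, XfrmUserTmpl`, but the body returns only `template`, and the first parameter `family` is missing from `Args:` while UserPolicy above documents all of its parameters. Since the commit copies the function verbatim into a new module, this is the moment to fix it: `Return: an XfrmUserTmpl cstruct.` and `family: Address family of the template (the callers pass AF_INET, AF_INET6 or AF_UNSPEC).` — the AF_UNSPEC use is visible in xfrm_test.py in this same commit. The comment `# For transport mode, set template source and destination are empty.` also reads as a leftover edit; `# For transport mode, leave the template source and destination empty.` says what the code does. The trailing `optional=0, #require` would be clearer as `optional=0,  # 0 == required: the kernel must find a matching SA` only if that is indeed the semantics, which the hunk does not show, so confirm before wording it that way.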
@@ -527,6 +591,45 @@ class Xfrm(netlink.NetlinkSocket):
     flags = netlink.NLM_F_REQUEST | netlink.NLM_F_ACK
     self._SendNlRequest(XFRM_MSG_FLUSHSA, usersa_flush.Pack(), flags)
 
+  def CreateTunnel(self, direction, selector, src, dst, spi, encryption,
+                   auth_trunc, mark, output_mark):
+    """Create an XFRM Tunnel Consisting of a Policy and an SA.
+
+    Create a unidirectional XFRM tunnel, which entails one Policy and one
+    security association.
+
+    Args:
+      direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
+      selector: An XfrmSelector that specifies the packets to be transformed.
+        This is only applied to the policy; the selector in the SA is always
+        empty. If the passed-in selector is None, then the tunnel is made
+        dual-stack. This requires two policies, one for IPv4 and one for IPv6.
+      src: The source address of the tunneled packets
+      dst: The destination address of the tunneled packets
+      spi: The SPI for the IPsec SA that encapsulates the tunneled packet
+      encryption: A tuple (XfrmAlgo, key), the encryption parameters.
+      auth_trunc: A tuple (XfrmAlgoAuth, key), the authentication parameters.
+      mark: An XfrmMark, the mark used for selecting packets to be tunneled, and
+        for matching the security policy and security association. None means
+        unspecified.
+      output_mark: The mark used to select the underlying network for packets
+        outbound from xfrm. None means unspecified.
+    """
+    outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst))
+
+    self.AddSaInfo(src, dst, spi, XFRM_MODE_TUNNEL, 0, encryption, auth_trunc,
+                   None, None, mark, output_mark)
+
+    if selector is None:
+      selectors = [EmptySelector(AF_INET), EmptySelector(AF_INET6)]
+    else:
+      selectors = [selector]
+
+    for selector in selectors:
+      policy = UserPolicy(direction, selector)
+      tmpl = UserTemplate(outer_family, spi, 0, (src, dst))
+      self.AddPolicyInfo(policy, tmpl, mark)
+
 
 if __name__ == "__main__":
   x = Xfrm()
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index 7aba508..c9cfd49 100644
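Review bot: CreateTunnel now lives in xfrm.py but still references `net_test.GetAddressFamily`, `net_test.GetAddressVersion`, `AF_INET` and `AF_INET6`; nothing in this hunk shows xfrm.py importing those, so unless they are already in scope there (the hunk cannot tell), the first call will raise NameError rather than create a tunnel. The loop `for selector in selectors:` rebinds the `selector` parameter immediately after the `if selector is None` check; `for sel in selectors:` with `UserPolicy(direction, sel)` keeps the parameter's meaning stable. The `AddSaInfo(src, dst, spi, XFRM_MODE_TUNNEL, 0, encryption, auth_trunc, None, None, mark, output_mark)` call passes a bare `0` and two bare `None`s whose roles are not visible here; if AddSaInfo accepts keyword arguments, naming them (and the `0` reqid that UserTemplate is also given) would make the SA/policy pairing explicit, and the docstring should mention that the SA and policies are created with reqid 0. The rest of class Xfrm is outside this hunk.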
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -39,68 +39,6 @@ _ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)),
 # Match all bits of the mark
 MARK_MASK_ALL = 0xffffffff
 
-ALL_ALGORITHMS = 0xffffffff
-
-XFRM_ADDR_ANY = xfrm.PaddedAddress("::")
-
-
-def UserPolicy(direction, selector):
-  """Create an IPsec policy.
-
-  Args:
-    direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
-    selector: An XfrmSelector, the packets to transform.
-
-  Return: a XfrmUserpolicyInfo cstruct.
-  """
-  # Create a user policy that specifies that all packets in the specified
-  # direction matching the selector should be encrypted.
-  return xfrm.XfrmUserpolicyInfo(
-      sel=selector,
-      lft=xfrm.NO_LIFETIME_CFG,
-      curlft=xfrm.NO_LIFETIME_CUR,
-      dir=direction,
-      action=xfrm.XFRM_POLICY_ALLOW,
-      flags=xfrm.XFRM_POLICY_LOCALOK,
-      share=xfrm.XFRM_SHARE_UNIQUE)
-
-def UserTemplate(family, spi, reqid, tun_addrs):
-  """Create an ESP policy and template.
-
-  Args:
-    spi: 32-bit SPI in host byte order
-    reqid: 32-bit ID matched against SAs
-    tun_addrs: A tuple of (local, remote) addresses for tunnel mode, or None
-      to request a transport mode SA.
-
-  Return: a tuple of XfrmUserpolicyInfo, XfrmUserTmpl
-  """
-  # For transport mode, set template source and destination are empty.
-  # For tunnel mode, explicitly specify source and destination addresses.
-  if tun_addrs is None:
-    mode = xfrm.XFRM_MODE_TRANSPORT
-    saddr = XFRM_ADDR_ANY
-    daddr = XFRM_ADDR_ANY
-  else:
-    mode = xfrm.XFRM_MODE_TUNNEL
-    saddr = xfrm.PaddedAddress(tun_addrs[0])
-    daddr = xfrm.PaddedAddress(tun_addrs[1])
-
-  # Create a template that specifies the SPI and the protocol.
-  xfrmid = xfrm.XfrmId(daddr=daddr, spi=spi, proto=IPPROTO_ESP)
-  template = xfrm.XfrmUserTmpl(
-      id=xfrmid,
-      family=family,
-      saddr=saddr,
-      reqid=reqid,
-      mode=mode,
-      share=xfrm.XFRM_SHARE_UNIQUE,
-      optional=0, #require
-      aalgos=ALL_ALGORITHMS,
-      ealgos=ALL_ALGORITHMS,
-      calgos=ALL_ALGORITHMS)
-
-  return template
 
 
 def SetPolicySockopt(sock, family, opt_data):
@@ -131,8 +69,8 @@ def ApplySocketPolicy(sock, family, direction, spi, reqid, tun_addrs):
   selector = xfrm.EmptySelector(family)
 
   # Create an XFRM policy and template.
-  policy = UserPolicy(direction, selector)
-  template = UserTemplate(family, spi, reqid, tun_addrs)
+  policy = xfrm.UserPolicy(direction, selector)
+  template = xfrm.UserTemplate(family, spi, reqid, tun_addrs)
 
   # Set the policy and template on our socket.
   opt_data = policy.Pack() + template.Pack()
@@ -374,49 +312,3 @@ class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
     esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr)
     self.assertEquals(xfrm.EspHdr((spi, seq)), esp_hdr)
     return packet
-
-  def CreateTunnel(self, direction, selector, src, dst, spi, encryption,
-                   auth_trunc, mark, output_mark):
-    """Create an XFRM Tunnel Consisting of a Policy and an SA.
-
-    Create a unidirectional XFRM tunnel, which entails one Policy and one
-    security association.
-
-    Args:
-      direction: XFRM_POLICY_IN or XFRM_POLICY_OUT
-      selector: An XfrmSelector that specifies the packets to be transformed.
-        This is only applied to the policy; the selector in the SA is always
-        empty. If the passed-in selector is None, then the tunnel is made
-        dual-stack. This requires two policies, one for IPv4 and one for IPv6.
-      src: The source address of the tunneled packets
-      dst: The destination address of the tunneled packets
-      spi: The SPI for the IPsec SA that encapsulates the tunneled packet
-      encryption: A tuple (XfrmAlgo, key), the encryption parameters.
-      auth_trunc: A tuple (XfrmAlgoAuth, key), the authentication parameters.
-      mark: An XfrmMark, the mark used for selecting packets to be tunneled, and
-        for matching the security policy and security association. None means
-        unspecified.
-      output_mark: The mark used to select the underlying network for packets
-        outbound from xfrm. None means unspecified.
-    """
-    outer_family = net_test.GetAddressFamily(net_test.GetAddressVersion(dst))
-
-    self.xfrm.AddSaInfo(
-        src, dst,
-        spi, xfrm.XFRM_MODE_TUNNEL, 0,
-        encryption,
-        auth_trunc,
-        None,
-        None,
-        mark,
-        output_mark)
-
-    if selector is None:
-      selectors = [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]
-    else:
-      selectors = [selector]
-
-    for selector in selectors:
-      policy = UserPolicy(direction, selector)
-      tmpl = UserTemplate(outer_family, spi, 0, (src, dst))
-      self.xfrm.AddPolicyInfo(policy, tmpl, mark)
diff --git a/net/test/xfrm_test.py b/net/test/xfrm_test.py
index 42faecc..ace49be 100755
--- a/net/test/xfrm_test.py
+++ b/net/test/xfrm_test.py
@@ -598,8 +598,8 @@ class XfrmFunctionalTest(xfrm_base.XfrmBaseTest):
     mark1 = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)
     mark2 = xfrm.XfrmMark(mark=0xf00d, mask=xfrm_base.MARK_MASK_ALL)
     # Create a global policy.
-    policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
-    tmpl = xfrm_base.UserTemplate(AF_UNSPEC, 0xfeed, 0, None)
+    policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
+    tmpl = xfrm.UserTemplate(AF_UNSPEC, 0xfeed, 0, None)
     # Create the policy with the first mark.
     self.xfrm.AddPolicyInfo(policy, tmpl, mark1)
     # Create the same policy but with the second (different) mark.
@@ -617,10 +617,10 @@ class XfrmFunctionalTest(xfrm_base.XfrmBaseTest):
   def _CheckUpdatePolicy(self, version):
     """Tests that we can can update the template on a policy."""
     family = net_test.GetAddressFamily(version)
-    tmpl1 = xfrm_base.UserTemplate(family, 0xdead, 0, None)
-    tmpl2 = xfrm_base.UserTemplate(family, 0xbeef, 0, None)
+    tmpl1 = xfrm.UserTemplate(family, 0xdead, 0, None)
+    tmpl2 = xfrm.UserTemplate(family, 0xbeef, 0, None)
     sel = xfrm.EmptySelector(family)
-    policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
+    policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
     mark = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)
 
     def _CheckTemplateMatch(tmpl):
@@ -652,12 +652,12 @@ class XfrmFunctionalTest(xfrm_base.XfrmBaseTest):
   def _CheckPolicyDifferByDirection(self,version):
     """Tests that policies can differ only by direction."""
     family = net_test.GetAddressFamily(version)
-    tmpl = xfrm_base.UserTemplate(family, 0xdead, 0, None)
+    tmpl = xfrm.UserTemplate(family, 0xdead, 0, None)
     sel = xfrm.EmptySelector(family)
     mark = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)
-    policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
+    policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
     self.xfrm.AddPolicyInfo(policy, tmpl, mark)
-    policy = xfrm_base.UserPolicy(xfrm.XFRM_POLICY_IN, sel)
+    policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_IN, sel)
     self.xfrm.AddPolicyInfo(policy, tmpl, mark)
 
   def testPolicyDifferByDirectionV4(self):
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index 7825a1e..6cb73c0 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -69,11 +69,12 @@ class XfrmTunnelTest(xfrm_base.XfrmBaseTest):
     local_outer = self.MyAddress(outer_version, underlying_netid)
     remote_outer = _GetRemoteOuterAddress(outer_version)
 
-    self.CreateTunnel(xfrm.XFRM_POLICY_OUT,
-                      xfrm.SrcDstSelector(local_inner, remote_inner),
-                      local_outer, remote_outer, _TEST_OUT_SPI,
-                      xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1,
-                      None, underlying_netid)
+    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT,
+                           xfrm.SrcDstSelector(local_inner, remote_inner),
+                           local_outer, remote_outer, _TEST_OUT_SPI,
+                           xfrm_base._ALGO_CBC_AES_256,
+                           xfrm_base._ALGO_HMAC_SHA1,
+                           None, underlying_netid)
 
     write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
     # Select an interface, which provides the source address of the inner
@@ -224,15 +225,17 @@ class XfrmVtiTest(xfrm_base.XfrmBaseTest):
     # For the VTI, the selectors are wildcard since packets will only
     # be selected if they have the appropriate mark, hence the inner
     # addresses are wildcard.
-    self.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer, remote_outer,
-                      _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256,
-                      xfrm_base._ALGO_HMAC_SHA1,
-                      xfrm.ExactMatchMark(_TEST_OKEY), netid)
-
-    self.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer, local_outer,
-                      _TEST_IN_SPI, xfrm_base._ALGO_CBC_AES_256,
-                      xfrm_base._ALGO_HMAC_SHA1,
-                      xfrm.ExactMatchMark(_TEST_IKEY), None)
+    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, local_outer,
+                           remote_outer, _TEST_OUT_SPI,
+                           xfrm_base._ALGO_CBC_AES_256,
+                           xfrm_base._ALGO_HMAC_SHA1,
+                           xfrm.ExactMatchMark(_TEST_OKEY), netid)
+
+    self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, remote_outer,
+                           local_outer, _TEST_IN_SPI,
+                           xfrm_base._ALGO_CBC_AES_256,
+                           xfrm_base._ALGO_HMAC_SHA1,
+                           xfrm.ExactMatchMark(_TEST_IKEY), None)
 
   def _CheckVtiInputOutput(self, netid, vti_netid, iface, outer_version,
                            inner_version, rx, tx):