diff options
Diffstat (limited to 'net/test/xfrm_base.py')
-rw-r--r-- | net/test/xfrm_base.py | 65 |
1 files changed, 35 insertions, 30 deletions
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py index e61322e..e5aadf3 100644 --- a/net/test/xfrm_base.py +++ b/net/test/xfrm_base.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/python3 # # Copyright 2017 The Android Open Source Project # @@ -16,6 +16,7 @@ from socket import * # pylint: disable=wildcard-import from scapy import all as scapy +import binascii import struct import csocket @@ -25,15 +26,15 @@ import net_test import util import xfrm -_ENCRYPTION_KEY_256 = ("308146eb3bd84b044573d60f5a5fd159" - "57c7d4fe567a2120f35bae0f9869ec22".decode("hex")) -_AUTHENTICATION_KEY_128 = "af442892cdcd0ef650e9c299f9a8436a".decode("hex") +_ENCRYPTION_KEY_256 = binascii.unhexlify("308146eb3bd84b044573d60f5a5fd159" + "57c7d4fe567a2120f35bae0f9869ec22") +_AUTHENTICATION_KEY_128 = binascii.unhexlify("af442892cdcd0ef650e9c299f9a8436a") -_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth(("digest_null", 0, 0)), "") +_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth((b"digest_null", 0, 0)), b"") _ALGO_HMAC_SHA1 = (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 128, 96)), _AUTHENTICATION_KEY_128) -_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo(("ecb(cipher_null)", 0)), "") +_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo((b"ecb(cipher_null)", 0)), b"") _ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)), _ENCRYPTION_KEY_256) @@ -88,12 +89,12 @@ def _GetCryptParameters(crypt_alg): Returns: A tuple of the block size, and IV length """ - cryptParameters = { - _ALGO_CRYPT_NULL: (4, 0), - _ALGO_CBC_AES_256: (16, 16) - } + if crypt_alg == _ALGO_CRYPT_NULL: + return (4, 0) + if crypt_alg == _ALGO_CBC_AES_256: + return (16, 16) + return (0, 0) - return cryptParameters.get(crypt_alg, (0, 0)) def GetEspPacketLength(mode, version, udp_encap, payload, auth_alg, crypt_alg): @@ -120,7 +121,7 @@ def GetEspPacketLength(mode, version, udp_encap, payload, # Size constants esp_hdr_len = len(xfrm.EspHdr) # SPI + Seq number - icv_len = auth_trunc_len / 8 + icv_len = auth_trunc_len // 8 # Add inner IP header if tunnel mode if mode == xfrm.XFRM_MODE_TUNNEL: @@ -142,6 +143,18 @@ def GetEspPacketLength(mode, version, udp_encap, payload, return payload_len +def GetEspTrailer(length, nexthdr): + # ESP padding per RFC 4303 section 2.4. + # For a null cipher with a block size of 1, padding is only necessary to + # ensure that the 1-byte Pad Length and Next Header fields are right aligned + # on a 4-byte boundary. + esplen = length + 2 # Packet length plus Pad Length and Next Header. + padlen = util.GetPadLength(4, esplen) + # The pad bytes are consecutive integers starting from 0x01. + padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8") + return padding + struct.pack("BB", padlen, nexthdr) + + def EncryptPacketWithNull(packet, spi, seq, tun_addrs): """Apply null encryption to a packet. @@ -151,7 +164,7 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs): The input packet is assumed to be a UDP packet. The input packet *MUST* have its length and checksum fields in IP and UDP headers set appropriately. This can be done by "rebuilding" the scapy object. e.g., - ip6_packet = scapy.IPv6(str(ip6_packet)) + ip6_packet = scapy.IPv6(bytes(ip6_packet)) TODO: Support TCP @@ -188,21 +201,12 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs): inner_layer = udp_layer esp_nexthdr = IPPROTO_UDP - - # ESP padding per RFC 4303 section 2.4. - # For a null cipher with a block size of 1, padding is only necessary to - # ensure that the 1-byte Pad Length and Next Header fields are right aligned - # on a 4-byte boundary. - esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header. - padlen = util.GetPadLength(4, esplen) - # The pad bytes are consecutive integers starting from 0x01. - padding = "".join((chr(i) for i in range(1, padlen + 1))) - trailer = padding + struct.pack("BB", padlen, esp_nexthdr) + trailer = GetEspTrailer(len(inner_layer), esp_nexthdr) # Assemble the packet. esp_packet.payload = scapy.Raw(inner_layer) packet = new_ip_layer if new_ip_layer else packet - packet.payload = scapy.Raw(str(esp_packet) + trailer) + packet.payload = scapy.Raw(bytes(esp_packet) + trailer) # TODO: Can we simplify this and avoid the initial copy()? # Fix the IPv4/IPv6 headers. @@ -210,13 +214,13 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs): packet.nh = IPPROTO_ESP # Recompute plen. packet.plen = None - packet = scapy.IPv6(str(packet)) + packet = scapy.IPv6(bytes(packet)) elif type(packet) is scapy.IP: packet.proto = IPPROTO_ESP # Recompute IPv4 len and checksum. packet.len = None packet.chksum = None - packet = scapy.IP(str(packet)) + packet = scapy.IP(bytes(packet)) else: raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet)) return packet @@ -237,7 +241,7 @@ def DecryptPacketWithNull(packet): Returns: A tuple of decrypted packet (scapy.IPv6 or scapy.IP) and EspHdr """ - esp_hdr, esp_data = cstruct.Read(str(packet.payload), xfrm.EspHdr) + esp_hdr, esp_data = cstruct.Read(bytes(packet.payload), xfrm.EspHdr) # Parse and strip ESP trailer. pad_len, esp_nexthdr = struct.unpack("BB", esp_data[-2:]) trailer_len = pad_len + 2 # Add the size of the pad_len and next_hdr fields. @@ -256,12 +260,12 @@ def DecryptPacketWithNull(packet): if type(packet) is scapy.IPv6: packet.nh = IPPROTO_UDP packet.plen = None # Recompute packet length. - packet = scapy.IPv6(str(packet)) + packet = scapy.IPv6(bytes(packet)) elif type(packet) is scapy.IP: packet.proto = IPPROTO_UDP packet.len = None # Recompute packet length. packet.chksum = None # Recompute IPv4 checksum. - packet = scapy.IP(str(packet)) + packet = scapy.IP(bytes(packet)) else: raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet)) return packet, esp_hdr @@ -305,7 +309,7 @@ class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest): if src_addr is not None: self.assertEqual(src_addr, packet.src) # extract the ESP header - esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr) + esp_hdr, _ = cstruct.Read(bytes(packet.payload), xfrm.EspHdr) self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr) return packet @@ -323,3 +327,4 @@ class XfrmLazyTest(XfrmBaseTest): super(XfrmBaseTest, self).tearDown() self.xfrm.FlushSaInfo() self.xfrm.FlushPolicyInfo() + self.xfrm.close() |