summaryrefslogtreecommitdiff
path: root/net/test/xfrm_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'net/test/xfrm_base.py')
-rw-r--r--net/test/xfrm_base.py65
1 files changed, 35 insertions, 30 deletions
diff --git a/net/test/xfrm_base.py b/net/test/xfrm_base.py
index e61322e..e5aadf3 100644
--- a/net/test/xfrm_base.py
+++ b/net/test/xfrm_base.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/python3
#
# Copyright 2017 The Android Open Source Project
#
@@ -16,6 +16,7 @@
from socket import * # pylint: disable=wildcard-import
from scapy import all as scapy
+import binascii
import struct
import csocket
@@ -25,15 +26,15 @@ import net_test
import util
import xfrm
-_ENCRYPTION_KEY_256 = ("308146eb3bd84b044573d60f5a5fd159"
- "57c7d4fe567a2120f35bae0f9869ec22".decode("hex"))
-_AUTHENTICATION_KEY_128 = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")
+_ENCRYPTION_KEY_256 = binascii.unhexlify("308146eb3bd84b044573d60f5a5fd159"
+ "57c7d4fe567a2120f35bae0f9869ec22")
+_AUTHENTICATION_KEY_128 = binascii.unhexlify("af442892cdcd0ef650e9c299f9a8436a")
-_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth(("digest_null", 0, 0)), "")
+_ALGO_AUTH_NULL = (xfrm.XfrmAlgoAuth((b"digest_null", 0, 0)), b"")
_ALGO_HMAC_SHA1 = (xfrm.XfrmAlgoAuth((xfrm.XFRM_AALG_HMAC_SHA1, 128, 96)),
_AUTHENTICATION_KEY_128)
-_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo(("ecb(cipher_null)", 0)), "")
+_ALGO_CRYPT_NULL = (xfrm.XfrmAlgo((b"ecb(cipher_null)", 0)), b"")
_ALGO_CBC_AES_256 = (xfrm.XfrmAlgo((xfrm.XFRM_EALG_CBC_AES, 256)),
_ENCRYPTION_KEY_256)
@@ -88,12 +89,12 @@ def _GetCryptParameters(crypt_alg):
Returns:
A tuple of the block size, and IV length
"""
- cryptParameters = {
- _ALGO_CRYPT_NULL: (4, 0),
- _ALGO_CBC_AES_256: (16, 16)
- }
+ if crypt_alg == _ALGO_CRYPT_NULL:
+ return (4, 0)
+ if crypt_alg == _ALGO_CBC_AES_256:
+ return (16, 16)
+ return (0, 0)
- return cryptParameters.get(crypt_alg, (0, 0))
def GetEspPacketLength(mode, version, udp_encap, payload,
auth_alg, crypt_alg):
@@ -120,7 +121,7 @@ def GetEspPacketLength(mode, version, udp_encap, payload,
# Size constants
esp_hdr_len = len(xfrm.EspHdr) # SPI + Seq number
- icv_len = auth_trunc_len / 8
+ icv_len = auth_trunc_len // 8
# Add inner IP header if tunnel mode
if mode == xfrm.XFRM_MODE_TUNNEL:
@@ -142,6 +143,18 @@ def GetEspPacketLength(mode, version, udp_encap, payload,
return payload_len
+def GetEspTrailer(length, nexthdr):
+ # ESP padding per RFC 4303 section 2.4.
+ # For a null cipher with a block size of 1, padding is only necessary to
+ # ensure that the 1-byte Pad Length and Next Header fields are right aligned
+ # on a 4-byte boundary.
+ esplen = length + 2 # Packet length plus Pad Length and Next Header.
+ padlen = util.GetPadLength(4, esplen)
+ # The pad bytes are consecutive integers starting from 0x01.
+ padding = "".join((chr(i) for i in range(1, padlen + 1))).encode("utf-8")
+ return padding + struct.pack("BB", padlen, nexthdr)
+
+
def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
"""Apply null encryption to a packet.
@@ -151,7 +164,7 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
The input packet is assumed to be a UDP packet. The input packet *MUST* have
its length and checksum fields in IP and UDP headers set appropriately. This
can be done by "rebuilding" the scapy object. e.g.,
- ip6_packet = scapy.IPv6(str(ip6_packet))
+ ip6_packet = scapy.IPv6(bytes(ip6_packet))
TODO: Support TCP
@@ -188,21 +201,12 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
inner_layer = udp_layer
esp_nexthdr = IPPROTO_UDP
-
- # ESP padding per RFC 4303 section 2.4.
- # For a null cipher with a block size of 1, padding is only necessary to
- # ensure that the 1-byte Pad Length and Next Header fields are right aligned
- # on a 4-byte boundary.
- esplen = (len(inner_layer) + 2) # UDP length plus Pad Length and Next Header.
- padlen = util.GetPadLength(4, esplen)
- # The pad bytes are consecutive integers starting from 0x01.
- padding = "".join((chr(i) for i in range(1, padlen + 1)))
- trailer = padding + struct.pack("BB", padlen, esp_nexthdr)
+ trailer = GetEspTrailer(len(inner_layer), esp_nexthdr)
# Assemble the packet.
esp_packet.payload = scapy.Raw(inner_layer)
packet = new_ip_layer if new_ip_layer else packet
- packet.payload = scapy.Raw(str(esp_packet) + trailer)
+ packet.payload = scapy.Raw(bytes(esp_packet) + trailer)
# TODO: Can we simplify this and avoid the initial copy()?
# Fix the IPv4/IPv6 headers.
@@ -210,13 +214,13 @@ def EncryptPacketWithNull(packet, spi, seq, tun_addrs):
packet.nh = IPPROTO_ESP
# Recompute plen.
packet.plen = None
- packet = scapy.IPv6(str(packet))
+ packet = scapy.IPv6(bytes(packet))
elif type(packet) is scapy.IP:
packet.proto = IPPROTO_ESP
# Recompute IPv4 len and checksum.
packet.len = None
packet.chksum = None
- packet = scapy.IP(str(packet))
+ packet = scapy.IP(bytes(packet))
else:
raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet))
return packet
@@ -237,7 +241,7 @@ def DecryptPacketWithNull(packet):
Returns:
A tuple of decrypted packet (scapy.IPv6 or scapy.IP) and EspHdr
"""
- esp_hdr, esp_data = cstruct.Read(str(packet.payload), xfrm.EspHdr)
+ esp_hdr, esp_data = cstruct.Read(bytes(packet.payload), xfrm.EspHdr)
# Parse and strip ESP trailer.
pad_len, esp_nexthdr = struct.unpack("BB", esp_data[-2:])
trailer_len = pad_len + 2 # Add the size of the pad_len and next_hdr fields.
@@ -256,12 +260,12 @@ def DecryptPacketWithNull(packet):
if type(packet) is scapy.IPv6:
packet.nh = IPPROTO_UDP
packet.plen = None # Recompute packet length.
- packet = scapy.IPv6(str(packet))
+ packet = scapy.IPv6(bytes(packet))
elif type(packet) is scapy.IP:
packet.proto = IPPROTO_UDP
packet.len = None # Recompute packet length.
packet.chksum = None # Recompute IPv4 checksum.
- packet = scapy.IP(str(packet))
+ packet = scapy.IP(bytes(packet))
else:
raise ValueError("First layer in packet should be IPv4 or IPv6: " + repr(packet))
return packet, esp_hdr
@@ -305,7 +309,7 @@ class XfrmBaseTest(multinetwork_base.MultiNetworkBaseTest):
if src_addr is not None:
self.assertEqual(src_addr, packet.src)
# extract the ESP header
- esp_hdr, _ = cstruct.Read(str(packet.payload), xfrm.EspHdr)
+ esp_hdr, _ = cstruct.Read(bytes(packet.payload), xfrm.EspHdr)
self.assertEqual(xfrm.EspHdr((spi, seq)), esp_hdr)
return packet
@@ -323,3 +327,4 @@ class XfrmLazyTest(XfrmBaseTest):
super(XfrmBaseTest, self).tearDown()
self.xfrm.FlushSaInfo()
self.xfrm.FlushPolicyInfo()
+ self.xfrm.close()