diff options
author | Lorenzo Colitti <lorenzo@google.com> | 2014-03-27 20:10:14 +0900 |
---|---|---|
committer | Lorenzo Colitti <lorenzo@google.com> | 2015-02-02 17:47:24 +0900 |
commit | 6ef3e4fdfd823b08aca5ea2c2368e956b5e0eaea (patch) | |
tree | 60182c5dc9b112003bc36ef357fc23d88d781bd1 | |
parent | 22669b72523d4aee0f5756515c42df82ed67eda7 (diff) | |
download | extras-6ef3e4fdfd823b08aca5ea2c2368e956b5e0eaea.tar.gz |
Kernel networking test scripts using UML.
Change-Id: I28a9901e01a53e3dde551b6c8abf6391fe4354a4
-rwxr-xr-x | tests/net_test/mark_test.py | 427 | ||||
-rwxr-xr-x | tests/net_test/net_test.py | 238 | ||||
-rwxr-xr-x | tests/net_test/net_test.sh | 20 | ||||
-rwxr-xr-x | tests/net_test/ping6_test.py | 459 | ||||
-rwxr-xr-x | tests/net_test/ping6_test.sh | 16 | ||||
-rwxr-xr-x | tests/net_test/run_net_test.sh | 76 |
6 files changed, 1236 insertions, 0 deletions
diff --git a/tests/net_test/mark_test.py b/tests/net_test/mark_test.py new file mode 100755 index 00000000..edfa4765 --- /dev/null +++ b/tests/net_test/mark_test.py @@ -0,0 +1,427 @@ +#!/usr/bin/python + +import fcntl +import errno +import os +import posix +import struct +import time +import unittest +from scapy import all as scapy +from socket import * + +import net_test + + +IFF_TUN = 1 +IFF_TAP = 2 +IFF_NO_PI = 0x1000 +TUNSETIFF = 0x400454ca + +AUTOCONF_TABLE_SYSCTL = "/proc/sys/net/ipv6/route/autoconf_table_offset" + +class ConfigurationError(AssertionError): + pass + + +class UnexpectedPacketError(AssertionError): + pass + + +class Packets(object): + + @staticmethod + def _GetIpLayer(version): + return {4: scapy.IP, 6: scapy.IPv6}[version] + + @staticmethod + def _SetPacketTos(packet, tos): + if isinstance(packet, scapy.IPv6): + packet.tc = tos + elif isinstance(packet, scapy.IP): + packet.tos = tos + else: + raise ValueError("Can't find ToS Field") + + @classmethod + def UdpPacket(self, version, srcaddr, dstaddr): + ip = self._GetIpLayer(version) + return ("UDPv%d packet" % version, + ip(src=srcaddr, dst=dstaddr) / + scapy.UDP(sport=999, dport=1234) / "hello") + + @classmethod + def SYN(self, port, version, srcaddr, dstaddr): + ip = self._GetIpLayer(version) + return ("TCP SYN", + ip(src=srcaddr, dst=dstaddr) / + scapy.TCP(sport=50999, dport=port, seq=1692871236, ack=0, + flags=2, window=14400)) + + @classmethod + def RST(self, version, srcaddr, dstaddr, packet): + ip = self._GetIpLayer(version) + original = packet.getlayer("TCP") + return ("TCP RST", + ip(src=srcaddr, dst=dstaddr) / + scapy.TCP(sport=original.dport, dport=original.sport, + ack=original.seq + 1, seq=None, + flags=20, window=None)) + + @classmethod + def SYNACK(self, version, srcaddr, dstaddr, packet): + ip = self._GetIpLayer(version) + original = packet.getlayer("TCP") + return ("TCP SYN+ACK", + ip(src=srcaddr, dst=dstaddr) / + scapy.TCP(sport=original.dport, dport=original.sport, + ack=original.seq + 1, seq=None, + flags=18, window=None)) + + @classmethod + def ICMPPortUnreachable(self, version, srcaddr, dstaddr, packet): + if version == 4: + # Linux hardcodes the ToS on ICMP errors to 0xc0 or greater because of + # RFC 1812 4.3.2.5 (!). + return ("ICMPv4 port unreachable", + scapy.IP(src=srcaddr, dst=dstaddr, proto=1, tos=0xc0) / + scapy.ICMPerror(type=3, code=3) / packet) + else: + return ("ICMPv6 port unreachable", + scapy.IPv6(src=srcaddr, dst=dstaddr) / + scapy.ICMPv6DestUnreach(code=4) / packet) + + @classmethod + def ICMPEcho(self, version, srcaddr, dstaddr): + ip = self._GetIpLayer(version) + icmp = {4: scapy.ICMP, 6: scapy.ICMPv6EchoRequest}[version] + packet = (ip(src=srcaddr, dst=dstaddr) / + icmp(id=0xff19, seq=3) / "foobarbaz") + self._SetPacketTos(packet, 0x83) + return ("ICMPv%d echo" % version, packet) + + @classmethod + def ICMPReply(self, version, srcaddr, dstaddr, packet, tos=None): + ip = self._GetIpLayer(version) + + # Scapy doesn't provide an ICMP echo reply constructor. + icmpv4_reply = lambda **kwargs: scapy.ICMP(type=0, **kwargs) + icmp = {4: icmpv4_reply, 6: scapy.ICMPv6EchoReply}[version] + packet = (ip(src=srcaddr, dst=dstaddr) / + icmp(id=0xff19, seq=3) / "foobarbaz") + self._SetPacketTos(packet, 0x83) + return ("ICMPv%d echo" % version, packet) + + +class MarkTest(net_test.NetworkTest): + + NETIDS = [100, 200] + + @staticmethod + def _RouterMacAddress(netid): + return "02:00:00:00:%02x:00" % netid + + @staticmethod + def _MyMacAddress(netid): + return "02:00:00:00:%02x:01" % netid + + @staticmethod + def _RouterAddress(netid, version): + if version == 6: + return "fe80::%02x00" % netid + elif version == 4: + return "10.0.%d.1" % netid + else: + raise ValueError("Don't support IPv%s" % version) + + @staticmethod + def _MyIPv4Address(netid): + return "10.0.%d.2" % netid + + @classmethod + def _CreateTunInterface(self, netid): + iface = self._GetInterfaceName(netid) + f = open("/dev/net/tun", "r+b") + ifr = struct.pack("16sH", iface, IFF_TAP | IFF_NO_PI) + ifr = ifr + "\x00" * (40 - len(ifr)) + fcntl.ioctl(f, TUNSETIFF, ifr) + # Give ourselves a predictable MAC address. + macaddr = self._MyMacAddress(netid) + net_test.SetInterfaceHWAddr(iface, self._MyMacAddress(netid)) + # Disable DAD so we don't have to wait for it. + open("/proc/sys/net/ipv6/conf/%s/dad_transmits" % iface, "w").write("0") + net_test.SetInterfaceUp(iface) + net_test.SetNonBlocking(f) + return f + + @staticmethod + def _GetInterfaceName(netid): + return "nettest%d" % netid + + @classmethod + def _SendRA(self, netid): + validity = 300 # seconds + validity_ms = validity * 1000 # milliseconds + macaddr = self._RouterMacAddress(netid) + lladdr = self._RouterAddress(netid, 6) + ra = (scapy.Ether(src=macaddr, dst="33:33:00:00:00:01") / + scapy.IPv6(src=lladdr, hlim=255) / + scapy.ICMPv6ND_RA(retranstimer=validity_ms, + routerlifetime=validity) / + scapy.ICMPv6NDOptSrcLLAddr(lladdr=macaddr) / + scapy.ICMPv6NDOptPrefixInfo(prefix="2001:db8:%d::" % netid, + prefixlen=64, + L=1, A=1, + validlifetime=validity, + preferredlifetime=validity)) + posix.write(self.tuns[netid].fileno(), str(ra)) + + COMMANDS = [ + "/sbin/%(iptables)s %(append_delete)s INPUT -t mangle -i %(iface)s" + " -j MARK --set-mark %(netid)d", + "ip -%(version)d rule %(add_del)s fwmark %(netid)s lookup %(table)s", + ] + ROUTE_COMMANDS = [ + "ip -%(version)d route %(add_del)s table %(table)s" + " default dev %(iface)s via %(router)s", + ] + IPV4_COMMANDS = [ + "ip -4 nei %(add_del)s %(router)s dev %(iface)s" + " lladdr %(macaddr)s nud permanent", + "ip -4 addr %(add_del)s 10.0.%(netid)d.2/24 dev %(iface)s", + ] + + @classmethod + def _RunSetupCommands(self, netid, is_add): + iface = self._GetInterfaceName(netid) + for version, iptables in zip([4, 6], ["iptables", "ip6tables"]): + + if version == 6: + cmds = self.COMMANDS + if self.AUTOCONF_TABLE_OFFSET < 0: + # Set up routing manually. + cmds += self.ROUTE_COMMANDS + + if version == 4: + # Deleting addresses also causes routes to be deleted, so watch the + # order or the test will output lots of ENOENT errors. + if is_add: + cmds = self.COMMANDS + self.IPV4_COMMANDS + self.ROUTE_COMMANDS + else: + cmds = self.COMMANDS + self.ROUTE_COMMANDS + self.IPV4_COMMANDS + + cmds = str("\n".join(cmds) % { + "add_del": "add" if is_add else "del", + "append_delete": "-A" if is_add else "-D", + "iface": iface, + "iptables": iptables, + "ipv4addr": self._MyIPv4Address(netid), + "macaddr": self._RouterMacAddress(netid), + "netid": netid, + "router": self._RouterAddress(netid, version), + "table": self._TableForNetid(netid), + "version": version, + }).split("\n") + for cmd in cmds: + cmd = cmd.split(" ") + #print cmd + ret = os.spawnvp(os.P_WAIT, cmd[0], cmd) + if ret: + raise ConfigurationError("Setup command failed: %s" % " ".join(cmd)) + + @classmethod + def _SetAutoconfTableSysctl(self, offset): + try: + open(AUTOCONF_TABLE_SYSCTL, "w").write(str(offset)) + self.AUTOCONF_TABLE_OFFSET = offset + except IOError: + self.AUTOCONF_TABLE_OFFSET = -1 + + @classmethod + def _TableForNetid(self, netid): + if self.AUTOCONF_TABLE_OFFSET >= 0: + return self.ifindices[netid] + self.AUTOCONF_TABLE_OFFSET + else: + return netid + + @classmethod + def setUpClass(self): + self.tuns = {} + self.ifindices = {} + self._SetAutoconfTableSysctl(1000) + for netid in self.NETIDS: + self.tuns[netid] = self._CreateTunInterface(netid) + + iface = self._GetInterfaceName(netid) + self.ifindices[netid] = net_test.GetInterfaceIndex(iface) + + self._SendRA(netid) + self._RunSetupCommands(netid, True) + + # Open a port so we can observe SYN+ACKs. Since it's a dual-stack socket it + # will accept both IPv4 and IPv6 connections. We do this here instead of in + # each test so we can use the same socket every time. That way, if a kernel + # bug causes incoming packets to mark the listening socket instead of the + # accepted socket, the test will fail as soon as the next address/interface + # combination is tried. + self.listenport = 1234 + self.listensocket = net_test.IPv6TCPSocket() + self.listensocket.bind(("::", self.listenport)) + self.listensocket.listen(100) + + # Give time for unknown things to settle down. + time.sleep(0.5) + # Uncomment to look around at interface and rule configuration while + # running in the background. (Once the test finishes running, all the + # interfaces and rules are gone.) + #time.sleep(30) + + @classmethod + def tearDownClass(self): + for netid in self.tuns: + self._RunSetupCommands(netid, False) + self.tuns[netid].close() + + def CheckExpectedPacket(self, expected, actual, msg): + # Remove the Ethernet header from the incoming packet. + actual = scapy.Ether(actual).payload + + # Blank out IPv4 fields that we can't predict, like ID and the DF bit. + actualip = actual.getlayer("IP") + expectedip = expected.getlayer("IP") + if actualip and expectedip: + actualip.id = expectedip.id + actualip.flags &= 5 + actualip.chksum = None # Change the header, recalculate the checksum. + + # Blank out TCP fields that we can't predict. + actualtcp = actual.getlayer("TCP") + expectedtcp = expected.getlayer("TCP") + if actualtcp and expectedtcp: + actualtcp.dataofs = expectedtcp.dataofs + actualtcp.options = expectedtcp.options + actualtcp.window = expectedtcp.window + if expectedtcp.seq is None: + actualtcp.seq = None + if expectedtcp.ack is None: + actualtcp.ack = None + actualtcp.chksum = None + + # Serialize the packet so: + # - Expected packet fields that are only set when a packet is serialized + # (e.g., the checksum) are filled in. + # - The packet is readable. Scapy has detailed dissection capabilities, + # but they only seem to be usable to print the packet, not return its + # dissection as a string. + # TODO: Check if this is true. + self.assertMultiLineEqual(str(expected).encode("hex"), + str(actual).encode("hex")) + + def ExpectPacketOn(self, netid, msg, expected): + try: + actual = self.tuns[netid].read(4096) + except IOError, e: + raise AssertionError(msg + ": " + str(e)) + + self.assertTrue(actual) + if expected: + self.CheckExpectedPacket(expected, actual, msg) + + def assertNoPacketsOn(self, netids, msg): + for netid in netids: + try: + self.assertRaisesErrno(errno.EAGAIN, self.tuns[netid].read, 4096) + except AssertionError, e: + raise UnexpectedPacketError("%s: Unexpected packet on %s" % ( + msg, self._GetInterfaceName(netid))) + + def assertNoOtherPackets(self, msg): + self.assertNoPacketsOn([netid for netid in self.tuns], msg) + + def assertNoPacketsExceptOn(self, netid, msg): + self.assertNoPacketsOn([n for n in self.tuns if n != netid], msg) + + def ReceivePacketOn(self, netid, ip_packet): + routermac = self._RouterMacAddress(netid) + mymac = self._MyMacAddress(netid) + packet = scapy.Ether(src=routermac, dst=mymac) / ip_packet + posix.write(self.tuns[netid].fileno(), str(packet)) + + def ClearTunQueues(self): + for f in self.tuns.values(): + try: + f.read(4096) + except IOError: + continue + self.assertNoOtherPackets("Unexpected packets after clearing queues") + + def setUp(self): + self.ClearTunQueues() + + @staticmethod + def _GetRemoteAddress(version): + return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] + + def CheckReflection(self, version, packet_generator, reply_generator): + # Test packets addressed to the IP addresses of all our interfaces... + for dest_ip_netid in self.tuns: + dest_ip_iface = self._GetInterfaceName(dest_ip_netid) + + if version == 4: + myaddr = self._MyIPv4Address(dest_ip_netid) + else: + myaddr = net_test.GetLinkAddress(self._GetInterfaceName(dest_ip_netid), + False) + remote_addr = self._GetRemoteAddress(version) + + # ... coming in on all our interfaces. + for iif_netid in self.tuns: + iif = self._GetInterfaceName(iif_netid) + desc, packet = packet_generator(version, remote_addr, myaddr) + if reply_generator: + # We know what we want a reply to. + reply_desc, reply = reply_generator(version, myaddr, remote_addr, + packet) + else: + # Expect any reply. + reply_desc, reply = "any packet", None + msg = "Receiving %s on %s to %s IP: Expecting %s on %s" % ( + desc, iif, dest_ip_iface, reply_desc, iif) + + # Expect a reply on the interface the original packet came in on. + self.ClearTunQueues() + self.ReceivePacketOn(iif_netid, packet) + self.assertNoPacketsExceptOn(iif_netid, msg) + self.ExpectPacketOn(iif_netid, msg, reply) + + def SYNToClosedPort(self, *args): + return Packets.SYN(999, *args) + + def SYNToOpenPort(self, *args): + return Packets.SYN(self.listenport, *args) + + def testIPv4ICMPErrorsReflectMark(self): + self.CheckReflection(4, Packets.UdpPacket, Packets.ICMPPortUnreachable) + + def testIPv6ICMPErrorsReflectMark(self): + self.CheckReflection(6, Packets.UdpPacket, Packets.ICMPPortUnreachable) + + def testIPv4PingRepliesReflectMarkAndTos(self): + self.CheckReflection(4, Packets.ICMPEcho, Packets.ICMPReply) + + def testIPv6PingRepliesReflectMarkAndTos(self): + self.CheckReflection(6, Packets.ICMPEcho, Packets.ICMPReply) + + def testIPv4RSTsReflectMark(self): + self.CheckReflection(4, self.SYNToClosedPort, Packets.RST) + + def testIPv6RSTsReflectMark(self): + self.CheckReflection(6, self.SYNToClosedPort, Packets.RST) + + @unittest.skipUnless(False, "skipping: doesn't work yet") + def testIPv6SYNACKsReflectMark(self): + self.CheckReflection(6, Packets.SYNToOpenPort, Packets.SYNACK) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/net_test/net_test.py b/tests/net_test/net_test.py new file mode 100755 index 00000000..da892f0b --- /dev/null +++ b/tests/net_test/net_test.py @@ -0,0 +1,238 @@ +#!/usr/bin/python + +import cProfile + +import ctypes +import ctypes.util +import errno +import fcntl +import os +import posix +import re +import struct +import time +import unittest + +from socket import * +from scapy import all as scapy + +SOL_IPV6 = 41 +IP_TRANSPARENT = 19 +IPV6_TRANSPARENT = 75 +IP_RECVERR = 11 +IPV6_RECVERR = 25 +SO_BINDTODEVICE = 25 +SO_MARK = 36 +IPV6_FLOWLABEL_MGR = 32 +IPV6_FLOWINFO_SEND = 33 + +ETH_P_IPV6 = 0x86dd + +SIOCSIFHWADDR = 0x8924 + +IPV6_FL_A_GET = 0 +IPV6_FL_A_PUT = 1 +IPV6_FL_A_RENEW = 1 + +IPV6_FL_F_CREATE = 1 +IPV6_FL_F_EXCL = 2 + +IPV6_FL_S_NONE = 0 +IPV6_FL_S_EXCL = 1 +IPV6_FL_S_ANY = 255 + +IPV4_PING = "\x08\x00\x00\x00\x0a\xce\x00\x03" +IPV6_PING = "\x80\x00\x00\x00\x0a\xce\x00\x03" + +IPV4_ADDR = "8.8.8.8" +IPV6_ADDR = "2001:4860:4860::8888" + +IPV6_SEQ_DGRAM_HEADER = (" sl " + "local_address " + "remote_address " + "st tx_queue rx_queue tr tm->when retrnsmt" + " uid timeout inode ref pointer drops\n") + +LIBC = ctypes.CDLL(ctypes.util.find_library('c')) + +def SetSocketTimeout(sock, ms): + s = ms / 1000 + us = (ms % 1000) * 1000 + sock.setsockopt(SOL_SOCKET, SO_RCVTIMEO, struct.pack("LL", s, us)) + +# Convenience functions to create ping sockets. +def Socket(family, sock_type, protocol): + s = socket(family, sock_type, protocol) + SetSocketTimeout(s, 1000) + return s + +def IPv4PingSocket(): + return Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) + +def IPv6PingSocket(): + return Socket(AF_INET6, SOCK_DGRAM, IPPROTO_ICMPV6) + +def IPv4TCPSocket(): + return Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) + +def IPv6TCPSocket(): + return Socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP) + +def IPv6PacketSocket(): + return Socket(AF_PACKET, SOCK_DGRAM, htons(ETH_P_IPV6)) + +def IPv4RawSocket(protocol): + s = Socket(AF_INET, SOCK_RAW, protocol) + s.setsockopt(SOL_IP, IP_HDRINCL, 1) + return s + +def SetNonBlocking(fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + +def GetInterfaceIndex(ifname): + s = IPv4PingSocket() + ifr = struct.pack("16si", ifname, 0) + ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr) + return struct.unpack("16si", ifr)[1] + +def SetInterfaceHWAddr(ifname, hwaddr): + hwaddr = hwaddr.replace(":", "") + hwaddr = hwaddr.decode("hex") + if len(hwaddr) != 6: + raise ValueError("Unknown hardware address length %d" % len(hwaddr)) + ifr = struct.pack("16sH6s", ifname, scapy.ARPHDR_ETHER, hwaddr) + fcntl.ioctl(s, SIOCSIFHWADDR, ifr) + +def SetInterfaceState(ifname, up): + s = IPv4PingSocket() + ifr = struct.pack("16sH", ifname, 0) + ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr) + _, flags = struct.unpack("16sH", ifr) + if up: + flags |= scapy.IFF_UP + else: + flags &= ~scapy.IFF_UP + ifr = struct.pack("16sH", ifname, flags) + ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr) + +def SetInterfaceUp(ifname): + return SetInterfaceState(ifname, True) + +def SetInterfaceDown(ifname): + return SetInterfaceState(ifname, False) + +def FormatProcAddress(unformatted): + groups = [] + for i in xrange(0, len(unformatted), 4): + groups.append(unformatted[i:i+4]) + formatted = ":".join(groups) + # Compress the address. + address = inet_ntop(AF_INET6, inet_pton(AF_INET6, formatted)) + return address + + +def FormatSockStatAddress(address): + if ":" in address: + family = AF_INET6 + else: + family = AF_INET + binary = inet_pton(family, address) + out = "" + for i in xrange(0, len(binary), 4): + out += "%08X" % struct.unpack("=L", binary[i:i+4]) + return out + + +def GetLinkAddress(ifname, linklocal): + addresses = open("/proc/net/if_inet6").readlines() + for address in addresses: + address = [s for s in address.strip().split(" ") if s != ""] + if address[5] == ifname: + if (linklocal and address[0].startswith("fe80") + or not linklocal and not address[0].startswith("fe80")): + # Convert the address from raw hex to something with colons in it. + return FormatProcAddress(address[0]) + return None + +def GetDefaultRoute(version=6): + if version == 6: + routes = open("/proc/net/ipv6_route").readlines() + for route in routes: + route = [s for s in route.strip().split(" ") if s != ""] + if (route[0] == "00000000000000000000000000000000" and route[1] == "00" + # Routes in non-default tables end up in /proc/net/ipv6_route!!! + and route[9] != "lo" and not route[9].startswith("nettest")): + return FormatProcAddress(route[4]), route[9] + raise ValueError("No IPv6 default route found") + elif version == 4: + routes = open("/proc/net/route").readlines() + for route in routes: + route = [s for s in route.strip().split("\t") if s != ""] + if (route[1] == "00000000" and route[7] == "00000000"): + gw, iface = route[2], route[0] + gw = inet_ntop(AF_INET, gw.decode("hex")[::-1]) + return gw, iface + raise ValueError("No IPv4 default route found") + else: + raise ValueError("Don't know about IPv%s" % version) + +def GetDefaultRouteInterface(): + gw, iface = GetDefaultRoute() + return iface + + +def MakeFlowLabelOption(addr, label): + # struct in6_flowlabel_req { + # struct in6_addr flr_dst; + # __be32 flr_label; + # __u8 flr_action; + # __u8 flr_share; + # __u16 flr_flags; + # __u16 flr_expires; + # __u16 flr_linger; + # __u32 __flr_pad; + # /* Options in format of IPV6_PKTOPTIONS */ + # }; + fmt = "16sIBBHHH4s"; struct.calcsize(fmt) + assert struct.calcsize(fmt) == 32 + addr = inet_pton(AF_INET6, addr) + assert len(addr) == 16 + label = htonl(label & 0xfffff) + action = IPV6_FL_A_GET + share = IPV6_FL_S_ANY + flags = IPV6_FL_F_CREATE + pad = '\x00' * 4 + return struct.pack(fmt, addr, label, action, share, flags, 0, 0, pad) + + +def SetFlowLabel(sock, addr, label): + opt = MakeFlowLabelOption("2001:db8::f", 0x1234) + s.setsockopt(SOL_IPV6, IPV6_FLOWLABEL_MGR, opt) + s.setsockopt(SOL_IPV6, IPV6_FLOWINFO_SEND, 1) + + +# Determine IPv6 configuration. +try: + GetDefaultRoute(version=6) + HAVE_IPV6 = True +except ValueError: + HAVE_IPV6 = False +HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6") + +try: + s = IPv4RawSocket(IPPROTO_ICMP) + HAVE_CAP_NET_RAW = True +except error: # Actually socket.error, because we import * from socket. + HAVE_CAP_NET_RAW = False + + +class NetworkTest(unittest.TestCase): + + def assertRaisesErrno(self, err_num, f, *args): + msg = os.strerror(err_num) + self.assertRaisesRegexp(EnvironmentError, msg, f, *args) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/net_test/net_test.sh b/tests/net_test/net_test.sh new file mode 100755 index 00000000..acac6602 --- /dev/null +++ b/tests/net_test/net_test.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# In case IPv6 is compiled as a module. +[ -f /proc/net/if_inet6 ] || insmod $DIR/kernel/net-next/net/ipv6/ipv6.ko + +# Minimal network setup. +ip link set lo up +ip link set lo mtu 16436 +ip link set eth0 up + +# Allow people to run ping. +echo "0 65536" > /proc/sys/net/ipv4/ping_group_range + +# Fall out to a shell once the test completes or if there's an error. +trap "exec /bin/bash" ERR EXIT + +# Find and run the test. +test=$(cat /proc/cmdline | sed -re 's/.*net_test=([^ ]*).*/\1/g') +echo -e "Running $test\n" +$test diff --git a/tests/net_test/ping6_test.py b/tests/net_test/ping6_test.py new file mode 100755 index 00000000..366b7243 --- /dev/null +++ b/tests/net_test/ping6_test.py @@ -0,0 +1,459 @@ +#!/usr/bin/python + +import errno +import os +import re +import posix +import unittest +from socket import * + +import net_test + + +class Ping6Test(net_test.NetworkTest): + + def setUp(self): + if net_test.HAVE_IPV6: + self.ifname = net_test.GetDefaultRouteInterface() + self.ifindex = net_test.GetInterfaceIndex(self.ifname) + self.lladdr = net_test.GetLinkAddress(self.ifname, True) + self.globaladdr = net_test.GetLinkAddress(self.ifname, False) + + def assertValidPingResponse(self, s, data): + family = s.family + + # Check the data being sent is valid. + self.assertGreater(len(data), 7, "Not enough data for ping packet") + if family == AF_INET: + self.assertTrue(data.startswith('\x08\x00'), "Not an IPv4 echo request") + elif family == AF_INET6: + self.assertTrue(data.startswith('\x80\x00'), "Not an IPv4 echo request") + else: + self.fail("Unknown socket address family %d", s.family) + + # Receive the reply. + rcvd, src = s.recvfrom(32768) + self.assertNotEqual(0, len(rcvd), "No data received") + + # Check address, ICMP type, and ICMP code. + if family == AF_INET: + addr, port = src + self.assertGreaterEqual(len(addr), len("1.1.1.1")) + self.assertTrue(rcvd.startswith('\x00\x00'), "Not an IPv4 echo reply") + else: + addr, port, flowlabel, scope_id = src + self.assertGreaterEqual(len(addr), len("::")) + self.assertTrue(rcvd.startswith('\x81\x00'), "Not an IPv6 echo reply") + # Check that the flow label is zero and that the scope ID is sane. + self.assertEqual(flowlabel, 0) + self.assertLess(scope_id, 100) + + # TODO: check the checksum. We can't do this easily now for ICMPv6 because + # we don't have the IP addresses so we can't construct the pseudoheader. + + # Check the sequence number and the data. + self.assertEqual(len(data), len(rcvd)) + self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex")) + + def ReadProcNetSocket(self, protocol): + # Read file. + lines = open("/proc/net/%s" % protocol).readlines() + + # Possibly check, and strip, header. + if protocol in ["icmp6", "raw6", "udp6"]: + self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0]) + lines = lines[1:] + + # Check contents. + if (protocol.endswith("6")): + addrlen = 32 + else: + addrlen = 8 + regexp = re.compile(" *(\d+): " # bucket + "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port + "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port + "([0-9A-F][0-9A-F]) " # state + "([0-9A-F]{8}:[0-9A-F]{8}) " # mem + "([0-9A-F]{2}:[0-9A-F]{8}) " # ? + "([0-9A-F]{8}) +" # ? + "([0-9]+) +" # uid + "([0-9]+) +" # ? + "([0-9]+) +" # inode + "([0-9]+) +" # refcnt + "([0-9a-f]+) +" # sp + "([0-9]+) *$" # drops, icmp has spaces + % (addrlen, addrlen)) + # Return a list of lists with only source / dest addresses for now. + out = [] + for line in lines: + (_, src, dst, state, mem, + _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups() + out.append([src, dst, state, mem, uid, refcnt, drops]) + return out + + def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state, + txmem=0, rxmem=0): + expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport), + "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport), + "%02X" % state, + "%08X:%08X" % (txmem, rxmem), + str(os.getuid()), "2", "0"] + actual = self.ReadProcNetSocket(name)[-1] + self.assertListEqual(expected, actual) + + def testIPv4SendWithNoConnection(self): + s = net_test.IPv4PingSocket() + self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6SendWithNoConnection(self): + s = net_test.IPv6PingSocket() + self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING) + + def testIPv4LoopbackPingWithConnect(self): + s = net_test.IPv4PingSocket() + s.connect(("127.0.0.1", 55)) + data = net_test.IPV4_PING + "foobarbaz" + s.send(data) + self.assertValidPingResponse(s, data) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6LoopbackPingWithConnect(self): + s = net_test.IPv6PingSocket() + s.connect(("::1", 55)) + s.send(net_test.IPV6_PING) + self.assertValidPingResponse(s, net_test.IPV6_PING) + + def testIPv4PingUsingSendto(self): + s = net_test.IPv4PingSocket() + written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) + self.assertEquals(len(net_test.IPV4_PING), written) + self.assertValidPingResponse(s, net_test.IPV4_PING) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6PingUsingSendto(self): + s = net_test.IPv6PingSocket() + written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) + self.assertEquals(len(net_test.IPV6_PING), written) + self.assertValidPingResponse(s, net_test.IPV6_PING) + + def testIPv4NoCrash(self): + # Python 2.x does not provide either read() or recvmsg. + s = net_test.IPv4PingSocket() + written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55)) + self.assertEquals(len(net_test.IPV4_PING), written) + fd = s.fileno() + reply = posix.read(fd, 4096) + self.assertEquals(written, len(reply)) + + def testIPv6NoCrash(self): + # Python 2.x does not provide either read() or recvmsg. + s = net_test.IPv6PingSocket() + written = s.sendto(net_test.IPV6_PING, ("::1", 55)) + self.assertEquals(len(net_test.IPV6_PING), written) + fd = s.fileno() + reply = posix.read(fd, 4096) + self.assertEquals(written, len(reply)) + + def testIPv4Bind(self): + # Bind to unspecified address. + s = net_test.IPv4PingSocket() + s.bind(("0.0.0.0", 544)) + self.assertEquals(("0.0.0.0", 544), s.getsockname()) + + # Bind to loopback. + s = net_test.IPv4PingSocket() + s.bind(("127.0.0.1", 99)) + self.assertEquals(("127.0.0.1", 99), s.getsockname()) + + # Binding twice is not allowed. + self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22)) + + # But binding two different sockets to the same ID is allowed. + s2 = net_test.IPv4PingSocket() + s2.bind(("127.0.0.1", 99)) + self.assertEquals(("127.0.0.1", 99), s2.getsockname()) + s3 = net_test.IPv4PingSocket() + s3.bind(("127.0.0.1", 99)) + self.assertEquals(("127.0.0.1", 99), s3.getsockname()) + + # If two sockets bind to the same port, the first one to call read() gets + # the response. + s4 = net_test.IPv4PingSocket() + s5 = net_test.IPv4PingSocket() + s4.bind(("0.0.0.0", 167)) + s5.bind(("0.0.0.0", 167)) + s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44)) + self.assertValidPingResponse(s5, net_test.IPV4_PING) + net_test.SetSocketTimeout(s4, 100) + self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768) + + # If SO_REUSEADDR is turned off, then we get EADDRINUSE. + s6 = net_test.IPv4PingSocket() + s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0) + self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167)) + + # Can't bind after sendto. + s = net_test.IPv4PingSocket() + s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132)) + self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429)) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6Bind(self): + # Bind to unspecified address. + s = net_test.IPv6PingSocket() + s.bind(("::", 769)) + self.assertEquals(("::", 769, 0, 0), s.getsockname()) + + # Bind to loopback. + s = net_test.IPv6PingSocket() + s.bind(("::1", 99)) + self.assertEquals(("::1", 99, 0, 0), s.getsockname()) + + # Binding twice is not allowed. + self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22)) + + # But binding two different sockets to the same ID is allowed. + s2 = net_test.IPv6PingSocket() + s2.bind(("::1", 99)) + self.assertEquals(("::1", 99, 0, 0), s2.getsockname()) + s3 = net_test.IPv6PingSocket() + s3.bind(("::1", 99)) + self.assertEquals(("::1", 99, 0, 0), s3.getsockname()) + + # Binding both IPv4 and IPv6 to the same socket works. + s4 = net_test.IPv4PingSocket() + s6 = net_test.IPv6PingSocket() + s4.bind(("0.0.0.0", 444)) + s6.bind(("::", 666, 0, 0)) + + # Can't bind after sendto. + s = net_test.IPv6PingSocket() + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132)) + self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429)) + + def testIPv4InvalidBind(self): + s = net_test.IPv4PingSocket() + self.assertRaisesErrno(errno.EADDRNOTAVAIL, + s.bind, ("255.255.255.255", 1026)) + self.assertRaisesErrno(errno.EADDRNOTAVAIL, + s.bind, ("224.0.0.1", 651)) + # Binding to an address we don't have only works with net_test.IP_TRANSPARENT. + self.assertRaisesErrno(errno.EADDRNOTAVAIL, + s.bind, (net_test.IPV4_ADDR, 651)) + try: + s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1) + s.bind((net_test.IPV4_ADDR, 651)) + except IOError, e: + if e.errno == errno.EACCES: + pass # We're not root. let it go for now. + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6InvalidBind(self): + s = net_test.IPv6PingSocket() + self.assertRaisesErrno(errno.EINVAL, + s.bind, ("ff02::2", 1026)) + + # Binding to an address we don't have only works with IPV6_TRANSPARENT. + self.assertRaisesErrno(errno.EADDRNOTAVAIL, + s.bind, (net_test.IPV6_ADDR, 651)) + try: + s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1) + s.bind((net_test.IPV6_ADDR, 651)) + except IOError, e: + if e.errno == errno.EACCES: + pass # We're not root. let it go for now. + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6ScopedBind(self): + # Can't bind to a link-local address without a scope ID. + s = net_test.IPv6PingSocket() + self.assertRaisesErrno(errno.EINVAL, + s.bind, (self.lladdr, 1026, 0, 0)) + + # Binding to a link-local address with a scope ID works, and the scope ID is + # returned by a subsequent getsockname. Interestingly, Python's getsockname + # returns "fe80:1%foo", even though it does not understand it. + expected = self.lladdr + "%" + self.ifname + s.bind((self.lladdr, 4646, 0, self.ifindex)) + self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname()) + + # Of course, for the above to work the address actually has to be configured + # on the machine. + self.assertRaisesErrno(errno.EADDRNOTAVAIL, + s.bind, ("fe80::f00", 1026, 0, 1)) + + # Scope IDs on non-link-local addresses are silently ignored. + s = net_test.IPv6PingSocket() + s.bind(("::1", 1234, 0, 1)) + self.assertEquals(("::1", 1234, 0, 0), s.getsockname()) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testBindAffectsIdentifier(self): + s = net_test.IPv6PingSocket() + s.bind((self.globaladdr, 0xf976)) + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) + self.assertEquals("\xf9\x76", s.recv(32768)[4:6]) + + s = net_test.IPv6PingSocket() + s.bind((self.globaladdr, 0xace)) + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) + self.assertEquals("\x0a\xce", s.recv(32768)[4:6]) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testLinkLocalAddress(self): + s = net_test.IPv6PingSocket() + # Sending to a link-local address with no scope fails with EINVAL. + msg = os.strerror(errno.EINVAL) + self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, ("fe80::1", 55)) + # Sending to link-local address with a scope succeeds. Note that Python + # doesn't understand the "fe80::1%lo" format, even though it returns it. + s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex)) + # No exceptions? Good. + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testMappedAddressFails(self): + s = net_test.IPv6PingSocket() + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) + self.assertValidPingResponse(s, net_test.IPV6_PING) + s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55)) + self.assertValidPingResponse(s, net_test.IPV6_PING) + msg = os.strerror(errno.EINVAL) + self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, + ("::ffff:192.0.2.1", 55)) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testFlowLabel(self): + s = net_test.IPv6PingSocket() + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) + self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks the flow label is 0. + + s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1) + self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND)) + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0)) + rcvd, src = s.recvfrom(32768) + addr, port, flowlabel, scope_id = src + self.assertEqual(0, flowlabel & 0xfffff) + + def testIPv4Error(self): + s = net_test.IPv4PingSocket() + s.setsockopt(SOL_IP, IP_TTL, 2) + s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1) + s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55)) + # We can't check the actual error because Python 2.7 doesn't implement + # recvmsg, but we can at least check that the socket returns an error. + self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6Error(self): + s = net_test.IPv6PingSocket() + s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2) + s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1) + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55)) + # We can't check the actual error because Python 2.7 doesn't implement + # recvmsg, but we can at least check that the socket returns an error. + self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response. + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6MulticastPing(self): + s = net_test.IPv6PingSocket() + # Send a multicast ping and check we get at least one duplicate. + s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex)) + self.assertValidPingResponse(s, net_test.IPV6_PING) + self.assertValidPingResponse(s, net_test.IPV6_PING) + + def testIPv4LargePacket(self): + s = net_test.IPv4PingSocket() + data = net_test.IPV4_PING + 20000 * "a" + s.sendto(data, ("127.0.0.1", 987)) + self.assertValidPingResponse(s, data) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testIPv6LargePacket(self): + s = net_test.IPv6PingSocket() + s.bind(("::", 0xace)) + data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa" + s.sendto(data, ("::1", 953)) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + @unittest.skipUnless(net_test.HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") + def testIcmpSocketsNotInIcmp6(self): + numrows = len(self.ReadProcNetSocket("icmp")) + numrows6 = len(self.ReadProcNetSocket("icmp6")) + s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) + s.bind(("127.0.0.1", 0xace)) + s.connect(("127.0.0.1", 0xbeef)) + self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp"))) + self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + @unittest.skipUnless(net_test.HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") + def testIcmp6SocketsNotInIcmp(self): + numrows = len(self.ReadProcNetSocket("icmp")) + numrows6 = len(self.ReadProcNetSocket("icmp6")) + s = net_test.IPv6PingSocket() + s.bind(("::1", 0xace)) + s.connect(("::1", 0xbeef)) + self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp"))) + self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6"))) + + def testProcNetIcmp(self): + s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP) + s.bind(("127.0.0.1", 0xace)) + s.connect(("127.0.0.1", 0xbeef)) + self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + @unittest.skipUnless(net_test.HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6") + def testProcNetIcmp6(self): + numrows6 = len(self.ReadProcNetSocket("icmp6")) + s = net_test.IPv6PingSocket() + s.bind(("::1", 0xace)) + s.connect(("::1", 0xbeef)) + self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1) + + # Check the row goes away when the socket is closed. + s.close() + self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6"))) + + # Try send, bind and connect to check the addresses and the state. + s = net_test.IPv6PingSocket() + self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) + s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345)) + self.assertEqual(1, len(self.ReadProcNetSocket("icmp6"))) + + # Can't bind after sendto, apparently. + s = net_test.IPv6PingSocket() + self.assertEqual(0, len(self.ReadProcNetSocket("icmp6"))) + s.bind((self.lladdr, 0xd00d, 0, self.ifindex)) + self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7) + + # Check receive bytes. + s.connect(("ff02::1", 0xdead)) + self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1) + s.send(net_test.IPV6_PING) + #time.sleep(0.1) + self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, + txmem=0, rxmem=0x880) + self.assertValidPingResponse(s, net_test.IPV6_PING) + self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1, + txmem=0, rxmem=0) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testProcNetUdp6(self): + s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP) + s.bind(("::1", 0xace)) + s.connect(("::1", 0xbeef)) + self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1) + + @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6") + def testProcNetRaw6(self): + s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW) + s.bind(("::1", 0xace)) + s.connect(("::1", 0xbeef)) + self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1) + + +if __name__ == "__main__": + unittest.main() + diff --git a/tests/net_test/ping6_test.sh b/tests/net_test/ping6_test.sh new file mode 100755 index 00000000..41dabcea --- /dev/null +++ b/tests/net_test/ping6_test.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +# Minimal network initialization. +ip link set eth0 up + +# Wait for autoconf and DAD to complete. +sleep 3 & + +# Block on starting DHCPv4. +udhcpc -i eth0 + +# If DHCPv4 took less than 3 seconds, keep waiting. +wait + +# Run the test. +$(dirname $0)/ping6_test.py diff --git a/tests/net_test/run_net_test.sh b/tests/net_test/run_net_test.sh new file mode 100755 index 00000000..d896175d --- /dev/null +++ b/tests/net_test/run_net_test.sh @@ -0,0 +1,76 @@ +#!/bin/bash + +# Kernel configration options. +OPTIONS=" IPV6 IPV6_ROUTER_PREF IPV6_MULTIPLE_TABLES IPV6_ROUTE_INFO" +OPTIONS="$OPTIONS TUN IP_ADVANCED_ROUTER IP_MULTIPLE_TABLES" +OPTIONS="$OPTIONS NETFILTER NETFILTER_ADVANCED NETFILTER_XTABLES" +OPTIONS="$OPTIONS NETFILTER_XT_MARK NETFILTER_XT_TARGET_MARK" +OPTIONS="$OPTIONS IP_NF_IPTABLES IP_NF_MANGLE" +OPTIONS="$OPTIONS IP6_NF_IPTABLES IP6_NF_MANGLE" + +# How many tap interfaces to create. +NUMTAPINTERFACES=2 + +# The root filesystem disk image we'll use. +ROOTFS=$(dirname $0)/net_test.rootfs + +# Figure out which test to run. +if [ -z "$1" ]; then + echo "Usage: $0 <test>" >&2 + exit 1 +fi +test=$1 + +set -e + +# Check if we need to uncompress the disk image. +# We use xz because it compresses better: to 42M vs 72M (gzip) / 62M (bzip2). +if [ $ROOTFS.xz -nt $ROOTFS ]; then + echo "Uncompressing $ROOTFS.xz" >&2 + unxz --keep $ROOTFS.xz +fi + + +# Create NUMTAPINTERFACES tap interfaces on the host, and prepare UML command +# line params to use them. The interfaces are called <user>TAP0, <user>TAP1, +# ..., on the host, and eth0, eth1, ..., in the VM. +user=${USER:0:10} +tapinterfaces= +netconfig= +for id in $(seq 0 $(( NUMTAPINTERFACES - 1 )) ); do + tap=${user}TAP$id + tapinterfaces="$tapinterfaces $tap" + mac=$(printf fe:fd:00:00:00:%02x $id) + netconfig="$netconfig eth$id=tuntap,$tap,$mac" +done + +for tap in $tapinterfaces; do + if ! ip link list $tap > /dev/null; then + echo "Creating tap interface $tap" >&2 + sudo tunctl -u $USER -t $tap + sudo ip link set $tap up + fi +done + +# Set kernel compilation options for make. +export ARCH=um +export SUBARCH=x86_64 + +# If there's no kernel config at all, create one or UML won't work. +[ -f .config ] || make defconfig + +# Enable the kernel config options listed in $OPTIONS. +cmdline=${OPTIONS// / -e } +./scripts/config $cmdline +make olddefconfig + +# Compile the kernel. +make -j12 linux + +# Get the absolute path to the test file that's being run. +dir=/host$(dirname $(readlink -f $0)) + +# Start the VM. +exec ./linux umid=net_test ubda=$(dirname $0)/net_test.rootfs \ + mem=512M init=/sbin/net_test.sh net_test=$dir/$test \ + $netconfig |