summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenzo Colitti <lorenzo@google.com>2014-03-27 20:10:14 +0900
committerLorenzo Colitti <lorenzo@google.com>2015-02-02 17:47:24 +0900
commit6ef3e4fdfd823b08aca5ea2c2368e956b5e0eaea (patch)
tree60182c5dc9b112003bc36ef357fc23d88d781bd1
parent22669b72523d4aee0f5756515c42df82ed67eda7 (diff)
downloadextras-6ef3e4fdfd823b08aca5ea2c2368e956b5e0eaea.tar.gz
Kernel networking test scripts using UML.
Change-Id: I28a9901e01a53e3dde551b6c8abf6391fe4354a4
-rwxr-xr-xtests/net_test/mark_test.py427
-rwxr-xr-xtests/net_test/net_test.py238
-rwxr-xr-xtests/net_test/net_test.sh20
-rwxr-xr-xtests/net_test/ping6_test.py459
-rwxr-xr-xtests/net_test/ping6_test.sh16
-rwxr-xr-xtests/net_test/run_net_test.sh76
6 files changed, 1236 insertions, 0 deletions
diff --git a/tests/net_test/mark_test.py b/tests/net_test/mark_test.py
new file mode 100755
index 00000000..edfa4765
--- /dev/null
+++ b/tests/net_test/mark_test.py
@@ -0,0 +1,427 @@
+#!/usr/bin/python
+
+import fcntl
+import errno
+import os
+import posix
+import struct
+import time
+import unittest
+from scapy import all as scapy
+from socket import *
+
+import net_test
+
+
+IFF_TUN = 1
+IFF_TAP = 2
+IFF_NO_PI = 0x1000
+TUNSETIFF = 0x400454ca
+
+AUTOCONF_TABLE_SYSCTL = "/proc/sys/net/ipv6/route/autoconf_table_offset"
+
+class ConfigurationError(AssertionError):
+ pass
+
+
+class UnexpectedPacketError(AssertionError):
+ pass
+
+
+class Packets(object):
+
+ @staticmethod
+ def _GetIpLayer(version):
+ return {4: scapy.IP, 6: scapy.IPv6}[version]
+
+ @staticmethod
+ def _SetPacketTos(packet, tos):
+ if isinstance(packet, scapy.IPv6):
+ packet.tc = tos
+ elif isinstance(packet, scapy.IP):
+ packet.tos = tos
+ else:
+ raise ValueError("Can't find ToS Field")
+
+ @classmethod
+ def UdpPacket(self, version, srcaddr, dstaddr):
+ ip = self._GetIpLayer(version)
+ return ("UDPv%d packet" % version,
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.UDP(sport=999, dport=1234) / "hello")
+
+ @classmethod
+ def SYN(self, port, version, srcaddr, dstaddr):
+ ip = self._GetIpLayer(version)
+ return ("TCP SYN",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=50999, dport=port, seq=1692871236, ack=0,
+ flags=2, window=14400))
+
+ @classmethod
+ def RST(self, version, srcaddr, dstaddr, packet):
+ ip = self._GetIpLayer(version)
+ original = packet.getlayer("TCP")
+ return ("TCP RST",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=original.dport, dport=original.sport,
+ ack=original.seq + 1, seq=None,
+ flags=20, window=None))
+
+ @classmethod
+ def SYNACK(self, version, srcaddr, dstaddr, packet):
+ ip = self._GetIpLayer(version)
+ original = packet.getlayer("TCP")
+ return ("TCP SYN+ACK",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=original.dport, dport=original.sport,
+ ack=original.seq + 1, seq=None,
+ flags=18, window=None))
+
+ @classmethod
+ def ICMPPortUnreachable(self, version, srcaddr, dstaddr, packet):
+ if version == 4:
+ # Linux hardcodes the ToS on ICMP errors to 0xc0 or greater because of
+ # RFC 1812 4.3.2.5 (!).
+ return ("ICMPv4 port unreachable",
+ scapy.IP(src=srcaddr, dst=dstaddr, proto=1, tos=0xc0) /
+ scapy.ICMPerror(type=3, code=3) / packet)
+ else:
+ return ("ICMPv6 port unreachable",
+ scapy.IPv6(src=srcaddr, dst=dstaddr) /
+ scapy.ICMPv6DestUnreach(code=4) / packet)
+
+ @classmethod
+ def ICMPEcho(self, version, srcaddr, dstaddr):
+ ip = self._GetIpLayer(version)
+ icmp = {4: scapy.ICMP, 6: scapy.ICMPv6EchoRequest}[version]
+ packet = (ip(src=srcaddr, dst=dstaddr) /
+ icmp(id=0xff19, seq=3) / "foobarbaz")
+ self._SetPacketTos(packet, 0x83)
+ return ("ICMPv%d echo" % version, packet)
+
+ @classmethod
+ def ICMPReply(self, version, srcaddr, dstaddr, packet, tos=None):
+ ip = self._GetIpLayer(version)
+
+ # Scapy doesn't provide an ICMP echo reply constructor.
+ icmpv4_reply = lambda **kwargs: scapy.ICMP(type=0, **kwargs)
+ icmp = {4: icmpv4_reply, 6: scapy.ICMPv6EchoReply}[version]
+ packet = (ip(src=srcaddr, dst=dstaddr) /
+ icmp(id=0xff19, seq=3) / "foobarbaz")
+ self._SetPacketTos(packet, 0x83)
+ return ("ICMPv%d echo" % version, packet)
+
+
+class MarkTest(net_test.NetworkTest):
+
+ NETIDS = [100, 200]
+
+ @staticmethod
+ def _RouterMacAddress(netid):
+ return "02:00:00:00:%02x:00" % netid
+
+ @staticmethod
+ def _MyMacAddress(netid):
+ return "02:00:00:00:%02x:01" % netid
+
+ @staticmethod
+ def _RouterAddress(netid, version):
+ if version == 6:
+ return "fe80::%02x00" % netid
+ elif version == 4:
+ return "10.0.%d.1" % netid
+ else:
+ raise ValueError("Don't support IPv%s" % version)
+
+ @staticmethod
+ def _MyIPv4Address(netid):
+ return "10.0.%d.2" % netid
+
+ @classmethod
+ def _CreateTunInterface(self, netid):
+ iface = self._GetInterfaceName(netid)
+ f = open("/dev/net/tun", "r+b")
+ ifr = struct.pack("16sH", iface, IFF_TAP | IFF_NO_PI)
+ ifr = ifr + "\x00" * (40 - len(ifr))
+ fcntl.ioctl(f, TUNSETIFF, ifr)
+ # Give ourselves a predictable MAC address.
+ macaddr = self._MyMacAddress(netid)
+ net_test.SetInterfaceHWAddr(iface, self._MyMacAddress(netid))
+ # Disable DAD so we don't have to wait for it.
+ open("/proc/sys/net/ipv6/conf/%s/dad_transmits" % iface, "w").write("0")
+ net_test.SetInterfaceUp(iface)
+ net_test.SetNonBlocking(f)
+ return f
+
+ @staticmethod
+ def _GetInterfaceName(netid):
+ return "nettest%d" % netid
+
+ @classmethod
+ def _SendRA(self, netid):
+ validity = 300 # seconds
+ validity_ms = validity * 1000 # milliseconds
+ macaddr = self._RouterMacAddress(netid)
+ lladdr = self._RouterAddress(netid, 6)
+ ra = (scapy.Ether(src=macaddr, dst="33:33:00:00:00:01") /
+ scapy.IPv6(src=lladdr, hlim=255) /
+ scapy.ICMPv6ND_RA(retranstimer=validity_ms,
+ routerlifetime=validity) /
+ scapy.ICMPv6NDOptSrcLLAddr(lladdr=macaddr) /
+ scapy.ICMPv6NDOptPrefixInfo(prefix="2001:db8:%d::" % netid,
+ prefixlen=64,
+ L=1, A=1,
+ validlifetime=validity,
+ preferredlifetime=validity))
+ posix.write(self.tuns[netid].fileno(), str(ra))
+
+ COMMANDS = [
+ "/sbin/%(iptables)s %(append_delete)s INPUT -t mangle -i %(iface)s"
+ " -j MARK --set-mark %(netid)d",
+ "ip -%(version)d rule %(add_del)s fwmark %(netid)s lookup %(table)s",
+ ]
+ ROUTE_COMMANDS = [
+ "ip -%(version)d route %(add_del)s table %(table)s"
+ " default dev %(iface)s via %(router)s",
+ ]
+ IPV4_COMMANDS = [
+ "ip -4 nei %(add_del)s %(router)s dev %(iface)s"
+ " lladdr %(macaddr)s nud permanent",
+ "ip -4 addr %(add_del)s 10.0.%(netid)d.2/24 dev %(iface)s",
+ ]
+
+ @classmethod
+ def _RunSetupCommands(self, netid, is_add):
+ iface = self._GetInterfaceName(netid)
+ for version, iptables in zip([4, 6], ["iptables", "ip6tables"]):
+
+ if version == 6:
+ cmds = self.COMMANDS
+ if self.AUTOCONF_TABLE_OFFSET < 0:
+ # Set up routing manually.
+ cmds += self.ROUTE_COMMANDS
+
+ if version == 4:
+ # Deleting addresses also causes routes to be deleted, so watch the
+ # order or the test will output lots of ENOENT errors.
+ if is_add:
+ cmds = self.COMMANDS + self.IPV4_COMMANDS + self.ROUTE_COMMANDS
+ else:
+ cmds = self.COMMANDS + self.ROUTE_COMMANDS + self.IPV4_COMMANDS
+
+ cmds = str("\n".join(cmds) % {
+ "add_del": "add" if is_add else "del",
+ "append_delete": "-A" if is_add else "-D",
+ "iface": iface,
+ "iptables": iptables,
+ "ipv4addr": self._MyIPv4Address(netid),
+ "macaddr": self._RouterMacAddress(netid),
+ "netid": netid,
+ "router": self._RouterAddress(netid, version),
+ "table": self._TableForNetid(netid),
+ "version": version,
+ }).split("\n")
+ for cmd in cmds:
+ cmd = cmd.split(" ")
+ #print cmd
+ ret = os.spawnvp(os.P_WAIT, cmd[0], cmd)
+ if ret:
+ raise ConfigurationError("Setup command failed: %s" % " ".join(cmd))
+
+ @classmethod
+ def _SetAutoconfTableSysctl(self, offset):
+ try:
+ open(AUTOCONF_TABLE_SYSCTL, "w").write(str(offset))
+ self.AUTOCONF_TABLE_OFFSET = offset
+ except IOError:
+ self.AUTOCONF_TABLE_OFFSET = -1
+
+ @classmethod
+ def _TableForNetid(self, netid):
+ if self.AUTOCONF_TABLE_OFFSET >= 0:
+ return self.ifindices[netid] + self.AUTOCONF_TABLE_OFFSET
+ else:
+ return netid
+
+ @classmethod
+ def setUpClass(self):
+ self.tuns = {}
+ self.ifindices = {}
+ self._SetAutoconfTableSysctl(1000)
+ for netid in self.NETIDS:
+ self.tuns[netid] = self._CreateTunInterface(netid)
+
+ iface = self._GetInterfaceName(netid)
+ self.ifindices[netid] = net_test.GetInterfaceIndex(iface)
+
+ self._SendRA(netid)
+ self._RunSetupCommands(netid, True)
+
+ # Open a port so we can observe SYN+ACKs. Since it's a dual-stack socket it
+ # will accept both IPv4 and IPv6 connections. We do this here instead of in
+ # each test so we can use the same socket every time. That way, if a kernel
+ # bug causes incoming packets to mark the listening socket instead of the
+ # accepted socket, the test will fail as soon as the next address/interface
+ # combination is tried.
+ self.listenport = 1234
+ self.listensocket = net_test.IPv6TCPSocket()
+ self.listensocket.bind(("::", self.listenport))
+ self.listensocket.listen(100)
+
+ # Give time for unknown things to settle down.
+ time.sleep(0.5)
+ # Uncomment to look around at interface and rule configuration while
+ # running in the background. (Once the test finishes running, all the
+ # interfaces and rules are gone.)
+ #time.sleep(30)
+
+ @classmethod
+ def tearDownClass(self):
+ for netid in self.tuns:
+ self._RunSetupCommands(netid, False)
+ self.tuns[netid].close()
+
+ def CheckExpectedPacket(self, expected, actual, msg):
+ # Remove the Ethernet header from the incoming packet.
+ actual = scapy.Ether(actual).payload
+
+ # Blank out IPv4 fields that we can't predict, like ID and the DF bit.
+ actualip = actual.getlayer("IP")
+ expectedip = expected.getlayer("IP")
+ if actualip and expectedip:
+ actualip.id = expectedip.id
+ actualip.flags &= 5
+ actualip.chksum = None # Change the header, recalculate the checksum.
+
+ # Blank out TCP fields that we can't predict.
+ actualtcp = actual.getlayer("TCP")
+ expectedtcp = expected.getlayer("TCP")
+ if actualtcp and expectedtcp:
+ actualtcp.dataofs = expectedtcp.dataofs
+ actualtcp.options = expectedtcp.options
+ actualtcp.window = expectedtcp.window
+ if expectedtcp.seq is None:
+ actualtcp.seq = None
+ if expectedtcp.ack is None:
+ actualtcp.ack = None
+ actualtcp.chksum = None
+
+ # Serialize the packet so:
+ # - Expected packet fields that are only set when a packet is serialized
+ # (e.g., the checksum) are filled in.
+ # - The packet is readable. Scapy has detailed dissection capabilities,
+ # but they only seem to be usable to print the packet, not return its
+ # dissection as a string.
+ # TODO: Check if this is true.
+ self.assertMultiLineEqual(str(expected).encode("hex"),
+ str(actual).encode("hex"))
+
+ def ExpectPacketOn(self, netid, msg, expected):
+ try:
+ actual = self.tuns[netid].read(4096)
+ except IOError, e:
+ raise AssertionError(msg + ": " + str(e))
+
+ self.assertTrue(actual)
+ if expected:
+ self.CheckExpectedPacket(expected, actual, msg)
+
+ def assertNoPacketsOn(self, netids, msg):
+ for netid in netids:
+ try:
+ self.assertRaisesErrno(errno.EAGAIN, self.tuns[netid].read, 4096)
+ except AssertionError, e:
+ raise UnexpectedPacketError("%s: Unexpected packet on %s" % (
+ msg, self._GetInterfaceName(netid)))
+
+ def assertNoOtherPackets(self, msg):
+ self.assertNoPacketsOn([netid for netid in self.tuns], msg)
+
+ def assertNoPacketsExceptOn(self, netid, msg):
+ self.assertNoPacketsOn([n for n in self.tuns if n != netid], msg)
+
+ def ReceivePacketOn(self, netid, ip_packet):
+ routermac = self._RouterMacAddress(netid)
+ mymac = self._MyMacAddress(netid)
+ packet = scapy.Ether(src=routermac, dst=mymac) / ip_packet
+ posix.write(self.tuns[netid].fileno(), str(packet))
+
+ def ClearTunQueues(self):
+ for f in self.tuns.values():
+ try:
+ f.read(4096)
+ except IOError:
+ continue
+ self.assertNoOtherPackets("Unexpected packets after clearing queues")
+
+ def setUp(self):
+ self.ClearTunQueues()
+
+ @staticmethod
+ def _GetRemoteAddress(version):
+ return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]
+
+ def CheckReflection(self, version, packet_generator, reply_generator):
+ # Test packets addressed to the IP addresses of all our interfaces...
+ for dest_ip_netid in self.tuns:
+ dest_ip_iface = self._GetInterfaceName(dest_ip_netid)
+
+ if version == 4:
+ myaddr = self._MyIPv4Address(dest_ip_netid)
+ else:
+ myaddr = net_test.GetLinkAddress(self._GetInterfaceName(dest_ip_netid),
+ False)
+ remote_addr = self._GetRemoteAddress(version)
+
+ # ... coming in on all our interfaces.
+ for iif_netid in self.tuns:
+ iif = self._GetInterfaceName(iif_netid)
+ desc, packet = packet_generator(version, remote_addr, myaddr)
+ if reply_generator:
+ # We know what we want a reply to.
+ reply_desc, reply = reply_generator(version, myaddr, remote_addr,
+ packet)
+ else:
+ # Expect any reply.
+ reply_desc, reply = "any packet", None
+ msg = "Receiving %s on %s to %s IP: Expecting %s on %s" % (
+ desc, iif, dest_ip_iface, reply_desc, iif)
+
+ # Expect a reply on the interface the original packet came in on.
+ self.ClearTunQueues()
+ self.ReceivePacketOn(iif_netid, packet)
+ self.assertNoPacketsExceptOn(iif_netid, msg)
+ self.ExpectPacketOn(iif_netid, msg, reply)
+
+ def SYNToClosedPort(self, *args):
+ return Packets.SYN(999, *args)
+
+ def SYNToOpenPort(self, *args):
+ return Packets.SYN(self.listenport, *args)
+
+ def testIPv4ICMPErrorsReflectMark(self):
+ self.CheckReflection(4, Packets.UdpPacket, Packets.ICMPPortUnreachable)
+
+ def testIPv6ICMPErrorsReflectMark(self):
+ self.CheckReflection(6, Packets.UdpPacket, Packets.ICMPPortUnreachable)
+
+ def testIPv4PingRepliesReflectMarkAndTos(self):
+ self.CheckReflection(4, Packets.ICMPEcho, Packets.ICMPReply)
+
+ def testIPv6PingRepliesReflectMarkAndTos(self):
+ self.CheckReflection(6, Packets.ICMPEcho, Packets.ICMPReply)
+
+ def testIPv4RSTsReflectMark(self):
+ self.CheckReflection(4, self.SYNToClosedPort, Packets.RST)
+
+ def testIPv6RSTsReflectMark(self):
+ self.CheckReflection(6, self.SYNToClosedPort, Packets.RST)
+
+ @unittest.skipUnless(False, "skipping: doesn't work yet")
+ def testIPv6SYNACKsReflectMark(self):
+ self.CheckReflection(6, Packets.SYNToOpenPort, Packets.SYNACK)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/net_test/net_test.py b/tests/net_test/net_test.py
new file mode 100755
index 00000000..da892f0b
--- /dev/null
+++ b/tests/net_test/net_test.py
@@ -0,0 +1,238 @@
+#!/usr/bin/python
+
+import cProfile
+
+import ctypes
+import ctypes.util
+import errno
+import fcntl
+import os
+import posix
+import re
+import struct
+import time
+import unittest
+
+from socket import *
+from scapy import all as scapy
+
+SOL_IPV6 = 41
+IP_TRANSPARENT = 19
+IPV6_TRANSPARENT = 75
+IP_RECVERR = 11
+IPV6_RECVERR = 25
+SO_BINDTODEVICE = 25
+SO_MARK = 36
+IPV6_FLOWLABEL_MGR = 32
+IPV6_FLOWINFO_SEND = 33
+
+ETH_P_IPV6 = 0x86dd
+
+SIOCSIFHWADDR = 0x8924
+
+IPV6_FL_A_GET = 0
+IPV6_FL_A_PUT = 1
+IPV6_FL_A_RENEW = 1
+
+IPV6_FL_F_CREATE = 1
+IPV6_FL_F_EXCL = 2
+
+IPV6_FL_S_NONE = 0
+IPV6_FL_S_EXCL = 1
+IPV6_FL_S_ANY = 255
+
+IPV4_PING = "\x08\x00\x00\x00\x0a\xce\x00\x03"
+IPV6_PING = "\x80\x00\x00\x00\x0a\xce\x00\x03"
+
+IPV4_ADDR = "8.8.8.8"
+IPV6_ADDR = "2001:4860:4860::8888"
+
+IPV6_SEQ_DGRAM_HEADER = (" sl "
+ "local_address "
+ "remote_address "
+ "st tx_queue rx_queue tr tm->when retrnsmt"
+ " uid timeout inode ref pointer drops\n")
+
+LIBC = ctypes.CDLL(ctypes.util.find_library('c'))
+
+def SetSocketTimeout(sock, ms):
+ s = ms / 1000
+ us = (ms % 1000) * 1000
+ sock.setsockopt(SOL_SOCKET, SO_RCVTIMEO, struct.pack("LL", s, us))
+
+# Convenience functions to create ping sockets.
+def Socket(family, sock_type, protocol):
+ s = socket(family, sock_type, protocol)
+ SetSocketTimeout(s, 1000)
+ return s
+
+def IPv4PingSocket():
+ return Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
+
+def IPv6PingSocket():
+ return Socket(AF_INET6, SOCK_DGRAM, IPPROTO_ICMPV6)
+
+def IPv4TCPSocket():
+ return Socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
+
+def IPv6TCPSocket():
+ return Socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP)
+
+def IPv6PacketSocket():
+ return Socket(AF_PACKET, SOCK_DGRAM, htons(ETH_P_IPV6))
+
+def IPv4RawSocket(protocol):
+ s = Socket(AF_INET, SOCK_RAW, protocol)
+ s.setsockopt(SOL_IP, IP_HDRINCL, 1)
+ return s
+
+def SetNonBlocking(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+def GetInterfaceIndex(ifname):
+ s = IPv4PingSocket()
+ ifr = struct.pack("16si", ifname, 0)
+ ifr = fcntl.ioctl(s, scapy.SIOCGIFINDEX, ifr)
+ return struct.unpack("16si", ifr)[1]
+
+def SetInterfaceHWAddr(ifname, hwaddr):
+ hwaddr = hwaddr.replace(":", "")
+ hwaddr = hwaddr.decode("hex")
+ if len(hwaddr) != 6:
+ raise ValueError("Unknown hardware address length %d" % len(hwaddr))
+ ifr = struct.pack("16sH6s", ifname, scapy.ARPHDR_ETHER, hwaddr)
+ fcntl.ioctl(s, SIOCSIFHWADDR, ifr)
+
+def SetInterfaceState(ifname, up):
+ s = IPv4PingSocket()
+ ifr = struct.pack("16sH", ifname, 0)
+ ifr = fcntl.ioctl(s, scapy.SIOCGIFFLAGS, ifr)
+ _, flags = struct.unpack("16sH", ifr)
+ if up:
+ flags |= scapy.IFF_UP
+ else:
+ flags &= ~scapy.IFF_UP
+ ifr = struct.pack("16sH", ifname, flags)
+ ifr = fcntl.ioctl(s, scapy.SIOCSIFFLAGS, ifr)
+
+def SetInterfaceUp(ifname):
+ return SetInterfaceState(ifname, True)
+
+def SetInterfaceDown(ifname):
+ return SetInterfaceState(ifname, False)
+
+def FormatProcAddress(unformatted):
+ groups = []
+ for i in xrange(0, len(unformatted), 4):
+ groups.append(unformatted[i:i+4])
+ formatted = ":".join(groups)
+ # Compress the address.
+ address = inet_ntop(AF_INET6, inet_pton(AF_INET6, formatted))
+ return address
+
+
+def FormatSockStatAddress(address):
+ if ":" in address:
+ family = AF_INET6
+ else:
+ family = AF_INET
+ binary = inet_pton(family, address)
+ out = ""
+ for i in xrange(0, len(binary), 4):
+ out += "%08X" % struct.unpack("=L", binary[i:i+4])
+ return out
+
+
+def GetLinkAddress(ifname, linklocal):
+ addresses = open("/proc/net/if_inet6").readlines()
+ for address in addresses:
+ address = [s for s in address.strip().split(" ") if s != ""]
+ if address[5] == ifname:
+ if (linklocal and address[0].startswith("fe80")
+ or not linklocal and not address[0].startswith("fe80")):
+ # Convert the address from raw hex to something with colons in it.
+ return FormatProcAddress(address[0])
+ return None
+
+def GetDefaultRoute(version=6):
+ if version == 6:
+ routes = open("/proc/net/ipv6_route").readlines()
+ for route in routes:
+ route = [s for s in route.strip().split(" ") if s != ""]
+ if (route[0] == "00000000000000000000000000000000" and route[1] == "00"
+ # Routes in non-default tables end up in /proc/net/ipv6_route!!!
+ and route[9] != "lo" and not route[9].startswith("nettest")):
+ return FormatProcAddress(route[4]), route[9]
+ raise ValueError("No IPv6 default route found")
+ elif version == 4:
+ routes = open("/proc/net/route").readlines()
+ for route in routes:
+ route = [s for s in route.strip().split("\t") if s != ""]
+ if (route[1] == "00000000" and route[7] == "00000000"):
+ gw, iface = route[2], route[0]
+ gw = inet_ntop(AF_INET, gw.decode("hex")[::-1])
+ return gw, iface
+ raise ValueError("No IPv4 default route found")
+ else:
+ raise ValueError("Don't know about IPv%s" % version)
+
+def GetDefaultRouteInterface():
+ gw, iface = GetDefaultRoute()
+ return iface
+
+
+def MakeFlowLabelOption(addr, label):
+ # struct in6_flowlabel_req {
+ # struct in6_addr flr_dst;
+ # __be32 flr_label;
+ # __u8 flr_action;
+ # __u8 flr_share;
+ # __u16 flr_flags;
+ # __u16 flr_expires;
+ # __u16 flr_linger;
+ # __u32 __flr_pad;
+ # /* Options in format of IPV6_PKTOPTIONS */
+ # };
+ fmt = "16sIBBHHH4s"; struct.calcsize(fmt)
+ assert struct.calcsize(fmt) == 32
+ addr = inet_pton(AF_INET6, addr)
+ assert len(addr) == 16
+ label = htonl(label & 0xfffff)
+ action = IPV6_FL_A_GET
+ share = IPV6_FL_S_ANY
+ flags = IPV6_FL_F_CREATE
+ pad = '\x00' * 4
+ return struct.pack(fmt, addr, label, action, share, flags, 0, 0, pad)
+
+
+def SetFlowLabel(sock, addr, label):
+ opt = MakeFlowLabelOption("2001:db8::f", 0x1234)
+ s.setsockopt(SOL_IPV6, IPV6_FLOWLABEL_MGR, opt)
+ s.setsockopt(SOL_IPV6, IPV6_FLOWINFO_SEND, 1)
+
+
+# Determine IPv6 configuration.
+try:
+ GetDefaultRoute(version=6)
+ HAVE_IPV6 = True
+except ValueError:
+ HAVE_IPV6 = False
+HAVE_PROC_NET_ICMP6 = os.path.isfile("/proc/net/icmp6")
+
+try:
+ s = IPv4RawSocket(IPPROTO_ICMP)
+ HAVE_CAP_NET_RAW = True
+except error: # Actually socket.error, because we import * from socket.
+ HAVE_CAP_NET_RAW = False
+
+
+class NetworkTest(unittest.TestCase):
+
+ def assertRaisesErrno(self, err_num, f, *args):
+ msg = os.strerror(err_num)
+ self.assertRaisesRegexp(EnvironmentError, msg, f, *args)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/net_test/net_test.sh b/tests/net_test/net_test.sh
new file mode 100755
index 00000000..acac6602
--- /dev/null
+++ b/tests/net_test/net_test.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+# In case IPv6 is compiled as a module.
+[ -f /proc/net/if_inet6 ] || insmod $DIR/kernel/net-next/net/ipv6/ipv6.ko
+
+# Minimal network setup.
+ip link set lo up
+ip link set lo mtu 16436
+ip link set eth0 up
+
+# Allow people to run ping.
+echo "0 65536" > /proc/sys/net/ipv4/ping_group_range
+
+# Fall out to a shell once the test completes or if there's an error.
+trap "exec /bin/bash" ERR EXIT
+
+# Find and run the test.
+test=$(cat /proc/cmdline | sed -re 's/.*net_test=([^ ]*).*/\1/g')
+echo -e "Running $test\n"
+$test
diff --git a/tests/net_test/ping6_test.py b/tests/net_test/ping6_test.py
new file mode 100755
index 00000000..366b7243
--- /dev/null
+++ b/tests/net_test/ping6_test.py
@@ -0,0 +1,459 @@
+#!/usr/bin/python
+
+import errno
+import os
+import re
+import posix
+import unittest
+from socket import *
+
+import net_test
+
+
+class Ping6Test(net_test.NetworkTest):
+
+ def setUp(self):
+ if net_test.HAVE_IPV6:
+ self.ifname = net_test.GetDefaultRouteInterface()
+ self.ifindex = net_test.GetInterfaceIndex(self.ifname)
+ self.lladdr = net_test.GetLinkAddress(self.ifname, True)
+ self.globaladdr = net_test.GetLinkAddress(self.ifname, False)
+
+ def assertValidPingResponse(self, s, data):
+ family = s.family
+
+ # Check the data being sent is valid.
+ self.assertGreater(len(data), 7, "Not enough data for ping packet")
+ if family == AF_INET:
+ self.assertTrue(data.startswith('\x08\x00'), "Not an IPv4 echo request")
+ elif family == AF_INET6:
+ self.assertTrue(data.startswith('\x80\x00'), "Not an IPv4 echo request")
+ else:
+ self.fail("Unknown socket address family %d", s.family)
+
+ # Receive the reply.
+ rcvd, src = s.recvfrom(32768)
+ self.assertNotEqual(0, len(rcvd), "No data received")
+
+ # Check address, ICMP type, and ICMP code.
+ if family == AF_INET:
+ addr, port = src
+ self.assertGreaterEqual(len(addr), len("1.1.1.1"))
+ self.assertTrue(rcvd.startswith('\x00\x00'), "Not an IPv4 echo reply")
+ else:
+ addr, port, flowlabel, scope_id = src
+ self.assertGreaterEqual(len(addr), len("::"))
+ self.assertTrue(rcvd.startswith('\x81\x00'), "Not an IPv6 echo reply")
+ # Check that the flow label is zero and that the scope ID is sane.
+ self.assertEqual(flowlabel, 0)
+ self.assertLess(scope_id, 100)
+
+ # TODO: check the checksum. We can't do this easily now for ICMPv6 because
+ # we don't have the IP addresses so we can't construct the pseudoheader.
+
+ # Check the sequence number and the data.
+ self.assertEqual(len(data), len(rcvd))
+ self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
+
+ def ReadProcNetSocket(self, protocol):
+ # Read file.
+ lines = open("/proc/net/%s" % protocol).readlines()
+
+ # Possibly check, and strip, header.
+ if protocol in ["icmp6", "raw6", "udp6"]:
+ self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
+ lines = lines[1:]
+
+ # Check contents.
+ if (protocol.endswith("6")):
+ addrlen = 32
+ else:
+ addrlen = 8
+ regexp = re.compile(" *(\d+): " # bucket
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
+ "([0-9A-F][0-9A-F]) " # state
+ "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
+ "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
+ "([0-9A-F]{8}) +" # ?
+ "([0-9]+) +" # uid
+ "([0-9]+) +" # ?
+ "([0-9]+) +" # inode
+ "([0-9]+) +" # refcnt
+ "([0-9a-f]+) +" # sp
+ "([0-9]+) *$" # drops, icmp has spaces
+ % (addrlen, addrlen))
+ # Return a list of lists with only source / dest addresses for now.
+ out = []
+ for line in lines:
+ (_, src, dst, state, mem,
+ _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
+ out.append([src, dst, state, mem, uid, refcnt, drops])
+ return out
+
+ def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
+ txmem=0, rxmem=0):
+ expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),
+ "%s:%04X" % (net_test.FormatSockStatAddress(dstaddr), dstport),
+ "%02X" % state,
+ "%08X:%08X" % (txmem, rxmem),
+ str(os.getuid()), "2", "0"]
+ actual = self.ReadProcNetSocket(name)[-1]
+ self.assertListEqual(expected, actual)
+
+ def testIPv4SendWithNoConnection(self):
+ s = net_test.IPv4PingSocket()
+ self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV4_PING)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6SendWithNoConnection(self):
+ s = net_test.IPv6PingSocket()
+ self.assertRaisesErrno(errno.EDESTADDRREQ, s.send, net_test.IPV6_PING)
+
+ def testIPv4LoopbackPingWithConnect(self):
+ s = net_test.IPv4PingSocket()
+ s.connect(("127.0.0.1", 55))
+ data = net_test.IPV4_PING + "foobarbaz"
+ s.send(data)
+ self.assertValidPingResponse(s, data)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6LoopbackPingWithConnect(self):
+ s = net_test.IPv6PingSocket()
+ s.connect(("::1", 55))
+ s.send(net_test.IPV6_PING)
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+
+ def testIPv4PingUsingSendto(self):
+ s = net_test.IPv4PingSocket()
+ written = s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
+ self.assertEquals(len(net_test.IPV4_PING), written)
+ self.assertValidPingResponse(s, net_test.IPV4_PING)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6PingUsingSendto(self):
+ s = net_test.IPv6PingSocket()
+ written = s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
+ self.assertEquals(len(net_test.IPV6_PING), written)
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+
+ def testIPv4NoCrash(self):
+ # Python 2.x does not provide either read() or recvmsg.
+ s = net_test.IPv4PingSocket()
+ written = s.sendto(net_test.IPV4_PING, ("127.0.0.1", 55))
+ self.assertEquals(len(net_test.IPV4_PING), written)
+ fd = s.fileno()
+ reply = posix.read(fd, 4096)
+ self.assertEquals(written, len(reply))
+
+ def testIPv6NoCrash(self):
+ # Python 2.x does not provide either read() or recvmsg.
+ s = net_test.IPv6PingSocket()
+ written = s.sendto(net_test.IPV6_PING, ("::1", 55))
+ self.assertEquals(len(net_test.IPV6_PING), written)
+ fd = s.fileno()
+ reply = posix.read(fd, 4096)
+ self.assertEquals(written, len(reply))
+
+ def testIPv4Bind(self):
+ # Bind to unspecified address.
+ s = net_test.IPv4PingSocket()
+ s.bind(("0.0.0.0", 544))
+ self.assertEquals(("0.0.0.0", 544), s.getsockname())
+
+ # Bind to loopback.
+ s = net_test.IPv4PingSocket()
+ s.bind(("127.0.0.1", 99))
+ self.assertEquals(("127.0.0.1", 99), s.getsockname())
+
+ # Binding twice is not allowed.
+ self.assertRaisesErrno(errno.EINVAL, s.bind, ("127.0.0.1", 22))
+
+ # But binding two different sockets to the same ID is allowed.
+ s2 = net_test.IPv4PingSocket()
+ s2.bind(("127.0.0.1", 99))
+ self.assertEquals(("127.0.0.1", 99), s2.getsockname())
+ s3 = net_test.IPv4PingSocket()
+ s3.bind(("127.0.0.1", 99))
+ self.assertEquals(("127.0.0.1", 99), s3.getsockname())
+
+ # If two sockets bind to the same port, the first one to call read() gets
+ # the response.
+ s4 = net_test.IPv4PingSocket()
+ s5 = net_test.IPv4PingSocket()
+ s4.bind(("0.0.0.0", 167))
+ s5.bind(("0.0.0.0", 167))
+ s4.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 44))
+ self.assertValidPingResponse(s5, net_test.IPV4_PING)
+ net_test.SetSocketTimeout(s4, 100)
+ self.assertRaisesErrno(errno.EAGAIN, s4.recv, 32768)
+
+ # If SO_REUSEADDR is turned off, then we get EADDRINUSE.
+ s6 = net_test.IPv4PingSocket()
+ s4.setsockopt(SOL_SOCKET, SO_REUSEADDR, 0)
+ self.assertRaisesErrno(errno.EADDRINUSE, s6.bind, ("0.0.0.0", 167))
+
+ # Can't bind after sendto.
+ s = net_test.IPv4PingSocket()
+ s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 9132))
+ self.assertRaisesErrno(errno.EINVAL, s.bind, ("0.0.0.0", 5429))
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6Bind(self):
+ # Bind to unspecified address.
+ s = net_test.IPv6PingSocket()
+ s.bind(("::", 769))
+ self.assertEquals(("::", 769, 0, 0), s.getsockname())
+
+ # Bind to loopback.
+ s = net_test.IPv6PingSocket()
+ s.bind(("::1", 99))
+ self.assertEquals(("::1", 99, 0, 0), s.getsockname())
+
+ # Binding twice is not allowed.
+ self.assertRaisesErrno(errno.EINVAL, s.bind, ("::1", 22))
+
+ # But binding two different sockets to the same ID is allowed.
+ s2 = net_test.IPv6PingSocket()
+ s2.bind(("::1", 99))
+ self.assertEquals(("::1", 99, 0, 0), s2.getsockname())
+ s3 = net_test.IPv6PingSocket()
+ s3.bind(("::1", 99))
+ self.assertEquals(("::1", 99, 0, 0), s3.getsockname())
+
+ # Binding both IPv4 and IPv6 to the same socket works.
+ s4 = net_test.IPv4PingSocket()
+ s6 = net_test.IPv6PingSocket()
+ s4.bind(("0.0.0.0", 444))
+ s6.bind(("::", 666, 0, 0))
+
+ # Can't bind after sendto.
+ s = net_test.IPv6PingSocket()
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 9132))
+ self.assertRaisesErrno(errno.EINVAL, s.bind, ("::", 5429))
+
+ def testIPv4InvalidBind(self):
+ s = net_test.IPv4PingSocket()
+ self.assertRaisesErrno(errno.EADDRNOTAVAIL,
+ s.bind, ("255.255.255.255", 1026))
+ self.assertRaisesErrno(errno.EADDRNOTAVAIL,
+ s.bind, ("224.0.0.1", 651))
+ # Binding to an address we don't have only works with net_test.IP_TRANSPARENT.
+ self.assertRaisesErrno(errno.EADDRNOTAVAIL,
+ s.bind, (net_test.IPV4_ADDR, 651))
+ try:
+ s.setsockopt(SOL_IP, net_test.IP_TRANSPARENT, 1)
+ s.bind((net_test.IPV4_ADDR, 651))
+ except IOError, e:
+ if e.errno == errno.EACCES:
+ pass # We're not root. let it go for now.
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6InvalidBind(self):
+ s = net_test.IPv6PingSocket()
+ self.assertRaisesErrno(errno.EINVAL,
+ s.bind, ("ff02::2", 1026))
+
+ # Binding to an address we don't have only works with IPV6_TRANSPARENT.
+ self.assertRaisesErrno(errno.EADDRNOTAVAIL,
+ s.bind, (net_test.IPV6_ADDR, 651))
+ try:
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_TRANSPARENT, 1)
+ s.bind((net_test.IPV6_ADDR, 651))
+ except IOError, e:
+ if e.errno == errno.EACCES:
+ pass # We're not root. let it go for now.
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6ScopedBind(self):
+ # Can't bind to a link-local address without a scope ID.
+ s = net_test.IPv6PingSocket()
+ self.assertRaisesErrno(errno.EINVAL,
+ s.bind, (self.lladdr, 1026, 0, 0))
+
+ # Binding to a link-local address with a scope ID works, and the scope ID is
+ # returned by a subsequent getsockname. Interestingly, Python's getsockname
+ # returns "fe80:1%foo", even though it does not understand it.
+ expected = self.lladdr + "%" + self.ifname
+ s.bind((self.lladdr, 4646, 0, self.ifindex))
+ self.assertEquals((expected, 4646, 0, self.ifindex), s.getsockname())
+
+ # Of course, for the above to work the address actually has to be configured
+ # on the machine.
+ self.assertRaisesErrno(errno.EADDRNOTAVAIL,
+ s.bind, ("fe80::f00", 1026, 0, 1))
+
+ # Scope IDs on non-link-local addresses are silently ignored.
+ s = net_test.IPv6PingSocket()
+ s.bind(("::1", 1234, 0, 1))
+ self.assertEquals(("::1", 1234, 0, 0), s.getsockname())
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testBindAffectsIdentifier(self):
+ s = net_test.IPv6PingSocket()
+ s.bind((self.globaladdr, 0xf976))
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
+ self.assertEquals("\xf9\x76", s.recv(32768)[4:6])
+
+ s = net_test.IPv6PingSocket()
+ s.bind((self.globaladdr, 0xace))
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
+ self.assertEquals("\x0a\xce", s.recv(32768)[4:6])
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testLinkLocalAddress(self):
+ s = net_test.IPv6PingSocket()
+ # Sending to a link-local address with no scope fails with EINVAL.
+ msg = os.strerror(errno.EINVAL)
+ self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING, ("fe80::1", 55))
+ # Sending to link-local address with a scope succeeds. Note that Python
+ # doesn't understand the "fe80::1%lo" format, even though it returns it.
+ s.sendto(net_test.IPV6_PING, ("fe80::1", 55, 0, self.ifindex))
+ # No exceptions? Good.
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testMappedAddressFails(self):
+ s = net_test.IPv6PingSocket()
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+ s.sendto(net_test.IPV6_PING, ("2001:4860:4860::8844", 55))
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+ msg = os.strerror(errno.EINVAL)
+ self.assertRaisesErrno(errno.EINVAL, s.sendto, net_test.IPV6_PING,
+ ("::ffff:192.0.2.1", 55))
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testFlowLabel(self):
+ s = net_test.IPv6PingSocket()
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
+ self.assertValidPingResponse(s, net_test.IPV6_PING) # Checks the flow label is 0.
+
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND, 1)
+ self.assertEqual(1, s.getsockopt(net_test.SOL_IPV6, net_test.IPV6_FLOWINFO_SEND))
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 93, 0xdead, 0))
+ rcvd, src = s.recvfrom(32768)
+ addr, port, flowlabel, scope_id = src
+ self.assertEqual(0, flowlabel & 0xfffff)
+
+ def testIPv4Error(self):
+ s = net_test.IPv4PingSocket()
+ s.setsockopt(SOL_IP, IP_TTL, 2)
+ s.setsockopt(SOL_IP, net_test.IP_RECVERR, 1)
+ s.sendto(net_test.IPV4_PING, (net_test.IPV4_ADDR, 55))
+ # We can't check the actual error because Python 2.7 doesn't implement
+ # recvmsg, but we can at least check that the socket returns an error.
+ self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6Error(self):
+ s = net_test.IPv6PingSocket()
+ s.setsockopt(net_test.SOL_IPV6, IPV6_UNICAST_HOPS, 2)
+ s.setsockopt(net_test.SOL_IPV6, net_test.IPV6_RECVERR, 1)
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 55))
+ # We can't check the actual error because Python 2.7 doesn't implement
+ # recvmsg, but we can at least check that the socket returns an error.
+ self.assertRaisesErrno(errno.EHOSTUNREACH, s.recv, 32768) # No response.
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6MulticastPing(self):
+ s = net_test.IPv6PingSocket()
+ # Send a multicast ping and check we get at least one duplicate.
+ s.sendto(net_test.IPV6_PING, ("ff02::1", 55, 0, self.ifindex))
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+
+ def testIPv4LargePacket(self):
+ s = net_test.IPv4PingSocket()
+ data = net_test.IPV4_PING + 20000 * "a"
+ s.sendto(data, ("127.0.0.1", 987))
+ self.assertValidPingResponse(s, data)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testIPv6LargePacket(self):
+ s = net_test.IPv6PingSocket()
+ s.bind(("::", 0xace))
+ data = net_test.IPV6_PING + "\x01" + 19994 * "\x00" + "aaaaa"
+ s.sendto(data, ("::1", 953))
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ @unittest.skipUnless(net_test.HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
+ def testIcmpSocketsNotInIcmp6(self):
+ numrows = len(self.ReadProcNetSocket("icmp"))
+ numrows6 = len(self.ReadProcNetSocket("icmp6"))
+ s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
+ s.bind(("127.0.0.1", 0xace))
+ s.connect(("127.0.0.1", 0xbeef))
+ self.assertEquals(numrows + 1, len(self.ReadProcNetSocket("icmp")))
+ self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ @unittest.skipUnless(net_test.HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
+ def testIcmp6SocketsNotInIcmp(self):
+ numrows = len(self.ReadProcNetSocket("icmp"))
+ numrows6 = len(self.ReadProcNetSocket("icmp6"))
+ s = net_test.IPv6PingSocket()
+ s.bind(("::1", 0xace))
+ s.connect(("::1", 0xbeef))
+ self.assertEquals(numrows, len(self.ReadProcNetSocket("icmp")))
+ self.assertEquals(numrows6 + 1, len(self.ReadProcNetSocket("icmp6")))
+
+ def testProcNetIcmp(self):
+ s = net_test.Socket(AF_INET, SOCK_DGRAM, IPPROTO_ICMP)
+ s.bind(("127.0.0.1", 0xace))
+ s.connect(("127.0.0.1", 0xbeef))
+ self.CheckSockStatFile("icmp", "127.0.0.1", 0xace, "127.0.0.1", 0xbeef, 1)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ @unittest.skipUnless(net_test.HAVE_PROC_NET_ICMP6, "skipping: no /proc/net/icmp6")
+ def testProcNetIcmp6(self):
+ numrows6 = len(self.ReadProcNetSocket("icmp6"))
+ s = net_test.IPv6PingSocket()
+ s.bind(("::1", 0xace))
+ s.connect(("::1", 0xbeef))
+ self.CheckSockStatFile("icmp6", "::1", 0xace, "::1", 0xbeef, 1)
+
+ # Check the row goes away when the socket is closed.
+ s.close()
+ self.assertEquals(numrows6, len(self.ReadProcNetSocket("icmp6")))
+
+ # Try send, bind and connect to check the addresses and the state.
+ s = net_test.IPv6PingSocket()
+ self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
+ s.sendto(net_test.IPV6_PING, (net_test.IPV6_ADDR, 12345))
+ self.assertEqual(1, len(self.ReadProcNetSocket("icmp6")))
+
+ # Can't bind after sendto, apparently.
+ s = net_test.IPv6PingSocket()
+ self.assertEqual(0, len(self.ReadProcNetSocket("icmp6")))
+ s.bind((self.lladdr, 0xd00d, 0, self.ifindex))
+ self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "::", 0, 7)
+
+ # Check receive bytes.
+ s.connect(("ff02::1", 0xdead))
+ self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1)
+ s.send(net_test.IPV6_PING)
+ #time.sleep(0.1)
+ self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
+ txmem=0, rxmem=0x880)
+ self.assertValidPingResponse(s, net_test.IPV6_PING)
+ self.CheckSockStatFile("icmp6", self.lladdr, 0xd00d, "ff02::1", 0xdead, 1,
+ txmem=0, rxmem=0)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testProcNetUdp6(self):
+ s = net_test.Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)
+ s.bind(("::1", 0xace))
+ s.connect(("::1", 0xbeef))
+ self.CheckSockStatFile("udp6", "::1", 0xace, "::1", 0xbeef, 1)
+
+ @unittest.skipUnless(net_test.HAVE_IPV6, "skipping: no IPv6")
+ def testProcNetRaw6(self):
+ s = net_test.Socket(AF_INET6, SOCK_RAW, IPPROTO_RAW)
+ s.bind(("::1", 0xace))
+ s.connect(("::1", 0xbeef))
+ self.CheckSockStatFile("raw6", "::1", 0xff, "::1", 0, 1)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
diff --git a/tests/net_test/ping6_test.sh b/tests/net_test/ping6_test.sh
new file mode 100755
index 00000000..41dabcea
--- /dev/null
+++ b/tests/net_test/ping6_test.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+# Minimal network initialization.
+ip link set eth0 up
+
+# Wait for autoconf and DAD to complete.
+sleep 3 &
+
+# Block on starting DHCPv4.
+udhcpc -i eth0
+
+# If DHCPv4 took less than 3 seconds, keep waiting.
+wait
+
+# Run the test.
+$(dirname $0)/ping6_test.py
diff --git a/tests/net_test/run_net_test.sh b/tests/net_test/run_net_test.sh
new file mode 100755
index 00000000..d896175d
--- /dev/null
+++ b/tests/net_test/run_net_test.sh
@@ -0,0 +1,76 @@
+#!/bin/bash
+
+# Kernel configration options.
+OPTIONS=" IPV6 IPV6_ROUTER_PREF IPV6_MULTIPLE_TABLES IPV6_ROUTE_INFO"
+OPTIONS="$OPTIONS TUN IP_ADVANCED_ROUTER IP_MULTIPLE_TABLES"
+OPTIONS="$OPTIONS NETFILTER NETFILTER_ADVANCED NETFILTER_XTABLES"
+OPTIONS="$OPTIONS NETFILTER_XT_MARK NETFILTER_XT_TARGET_MARK"
+OPTIONS="$OPTIONS IP_NF_IPTABLES IP_NF_MANGLE"
+OPTIONS="$OPTIONS IP6_NF_IPTABLES IP6_NF_MANGLE"
+
+# How many tap interfaces to create.
+NUMTAPINTERFACES=2
+
+# The root filesystem disk image we'll use.
+ROOTFS=$(dirname $0)/net_test.rootfs
+
+# Figure out which test to run.
+if [ -z "$1" ]; then
+ echo "Usage: $0 <test>" >&2
+ exit 1
+fi
+test=$1
+
+set -e
+
+# Check if we need to uncompress the disk image.
+# We use xz because it compresses better: to 42M vs 72M (gzip) / 62M (bzip2).
+if [ $ROOTFS.xz -nt $ROOTFS ]; then
+ echo "Uncompressing $ROOTFS.xz" >&2
+ unxz --keep $ROOTFS.xz
+fi
+
+
+# Create NUMTAPINTERFACES tap interfaces on the host, and prepare UML command
+# line params to use them. The interfaces are called <user>TAP0, <user>TAP1,
+# ..., on the host, and eth0, eth1, ..., in the VM.
+user=${USER:0:10}
+tapinterfaces=
+netconfig=
+for id in $(seq 0 $(( NUMTAPINTERFACES - 1 )) ); do
+ tap=${user}TAP$id
+ tapinterfaces="$tapinterfaces $tap"
+ mac=$(printf fe:fd:00:00:00:%02x $id)
+ netconfig="$netconfig eth$id=tuntap,$tap,$mac"
+done
+
+for tap in $tapinterfaces; do
+ if ! ip link list $tap > /dev/null; then
+ echo "Creating tap interface $tap" >&2
+ sudo tunctl -u $USER -t $tap
+ sudo ip link set $tap up
+ fi
+done
+
+# Set kernel compilation options for make.
+export ARCH=um
+export SUBARCH=x86_64
+
+# If there's no kernel config at all, create one or UML won't work.
+[ -f .config ] || make defconfig
+
+# Enable the kernel config options listed in $OPTIONS.
+cmdline=${OPTIONS// / -e }
+./scripts/config $cmdline
+make olddefconfig
+
+# Compile the kernel.
+make -j12 linux
+
+# Get the absolute path to the test file that's being run.
+dir=/host$(dirname $(readlink -f $0))
+
+# Start the VM.
+exec ./linux umid=net_test ubda=$(dirname $0)/net_test.rootfs \
+ mem=512M init=/sbin/net_test.sh net_test=$dir/$test \
+ $netconfig