summaryrefslogtreecommitdiff
path: root/tests/net_test/srcaddr_selection_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/net_test/srcaddr_selection_test.py')
-rwxr-xr-xtests/net_test/srcaddr_selection_test.py311
1 files changed, 311 insertions, 0 deletions
diff --git a/tests/net_test/srcaddr_selection_test.py b/tests/net_test/srcaddr_selection_test.py
new file mode 100755
index 00000000..eb09b7fb
--- /dev/null
+++ b/tests/net_test/srcaddr_selection_test.py
@@ -0,0 +1,311 @@
+#!/usr/bin/python
+#
+# Copyright 2014 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import errno
+import os
+import random
+from socket import * # pylint: disable=wildcard-import
+import time
+import unittest
+
+from scapy import all as scapy
+
+import csocket
+import iproute
+import multinetwork_base
+import multinetwork_test
+import net_test
+
+# Setsockopt values.
+IPV6_ADDR_PREFERENCES = 72
+IPV6_PREFER_SRC_PUBLIC = 0x0002
+
+
+USE_OPTIMISTIC_SYSCTL = "/proc/sys/net/ipv6/conf/default/use_optimistic"
+
+HAVE_USE_OPTIMISTIC = os.path.isfile(USE_OPTIMISTIC_SYSCTL)
+
+
+class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
+
+ def SetDAD(self, ifname, value):
+ self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value)
+ self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value)
+
+ def SetOptimisticDAD(self, ifname, value):
+ self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value)
+
+ def SetUseTempaddrs(self, ifname, value):
+ self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value)
+
+ def SetUseOptimistic(self, ifname, value):
+ self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value)
+
+ def GetSourceIP(self, netid, mode="mark"):
+ s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
+ # Because why not...testing for temporary addresses is a separate thing.
+ s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)
+
+ s.connect((net_test.IPV6_ADDR, 123))
+ src_addr = s.getsockname()[0]
+ self.assertTrue(src_addr)
+ return src_addr
+
+ def assertAddressNotPresent(self, address):
+ self.assertRaises(IOError, self.iproute.GetAddress, address)
+
+ def assertAddressHasExpectedAttributes(
+ self, address, expected_ifindex, expected_flags):
+ ifa_msg = self.iproute.GetAddress(address)[0]
+ self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
+ self.assertEquals(64, ifa_msg.prefixlen)
+ self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
+ self.assertEquals(expected_ifindex, ifa_msg.index)
+ self.assertEquals(expected_flags, ifa_msg.flags & expected_flags)
+
+ def AddressIsTentative(self, address):
+ ifa_msg = self.iproute.GetAddress(address)[0]
+ return ifa_msg.flags & iproute.IFA_F_TENTATIVE
+
+ def BindToAddress(self, address):
+ s = net_test.UDPSocket(AF_INET6)
+ s.bind((address, 0, 0, 0))
+
+ def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
+ pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
+ cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
+ s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
+ return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
+
+ def assertAddressUsable(self, address, netid):
+ self.BindToAddress(address)
+ self.SendWithSourceAddress(address, netid)
+ # No exceptions? Good.
+
+ def assertAddressNotUsable(self, address, netid):
+ self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
+ self.assertRaisesErrno(errno.EINVAL,
+ self.SendWithSourceAddress, address, netid)
+
+ def assertAddressSelected(self, address, netid):
+ self.assertEquals(address, self.GetSourceIP(netid))
+
+ def assertAddressNotSelected(self, address, netid):
+ self.assertNotEquals(address, self.GetSourceIP(netid))
+
+ def WaitForDad(self, address):
+ for _ in xrange(20):
+ if not self.AddressIsTentative(address):
+ return
+ time.sleep(0.1)
+ raise AssertionError("%s did not complete DAD after 2 seconds")
+
+
+class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
+
+ def setUp(self):
+ # [0] Make sure DAD, optimistic DAD, and the use_optimistic option
+ # are all consistently disabled at the outset.
+ for netid in self.tuns:
+ self.SetDAD(self.GetInterfaceName(netid), 0)
+ self.SetOptimisticDAD(self.GetInterfaceName(netid), 0)
+ self.SetUseTempaddrs(self.GetInterfaceName(netid), 0)
+ if HAVE_USE_OPTIMISTIC:
+ self.SetUseOptimistic(self.GetInterfaceName(netid), 0)
+
+ # [1] Pick an interface on which to test.
+ self.test_netid = random.choice(self.tuns.keys())
+ self.test_ip = self.MyAddress(6, self.test_netid)
+ self.test_ifindex = self.ifindices[self.test_netid]
+ self.test_ifname = self.GetInterfaceName(self.test_netid)
+
+ # [2] Delete the test interface's IPv6 address.
+ self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
+ self.assertAddressNotPresent(self.test_ip)
+
+ self.assertAddressNotUsable(self.test_ip, self.test_netid)
+
+
+class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
+
+ def testRfc6724Behaviour(self):
+ # [3] Get an IPv6 address back, in DAD start-up.
+ self.SetDAD(self.test_ifname, 1) # Enable DAD
+ # Send a RA to start SLAAC and subsequent DAD.
+ self.SendRA(self.test_netid, 0)
+ # Get flags and prove tentative-ness.
+ self.assertAddressHasExpectedAttributes(
+ self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE)
+
+ # Even though the interface has an IPv6 address, its tentative nature
+ # prevents it from being selected.
+ self.assertAddressNotUsable(self.test_ip, self.test_netid)
+ self.assertAddressNotSelected(self.test_ip, self.test_netid)
+
+ # Busy wait for DAD to complete (should be less than 1 second).
+ self.WaitForDad(self.test_ip)
+
+ # The test_ip should have completed DAD by now, and should be the
+ # chosen source address, eligible to bind to, etc.
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressSelected(self.test_ip, self.test_netid)
+
+
+class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
+
+ def testRfc6724Behaviour(self):
+ # [3] Get an IPv6 address back, in optimistic DAD start-up.
+ self.SetDAD(self.test_ifname, 1) # Enable DAD
+ self.SetOptimisticDAD(self.test_ifname, 1)
+ # Send a RA to start SLAAC and subsequent DAD.
+ self.SendRA(self.test_netid, 0)
+ # Get flags and prove optimism.
+ self.assertAddressHasExpectedAttributes(
+ self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
+
+ # Optimistic addresses are usable but are not selected.
+ if net_test.LinuxVersion() >= (3, 18, 0):
+ # The version checked in to android kernels <= 3.10 requires the
+ # use_optimistic sysctl to be turned on.
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressNotSelected(self.test_ip, self.test_netid)
+
+ # Busy wait for DAD to complete (should be less than 1 second).
+ self.WaitForDad(self.test_ip)
+
+ # The test_ip should have completed DAD by now, and should be the
+ # chosen source address.
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressSelected(self.test_ip, self.test_netid)
+
+
+class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
+
+ @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
+ def testModifiedRfc6724Behaviour(self):
+ # [3] Get an IPv6 address back, in optimistic DAD start-up.
+ self.SetDAD(self.test_ifname, 1) # Enable DAD
+ self.SetOptimisticDAD(self.test_ifname, 1)
+ self.SetUseOptimistic(self.test_ifname, 1)
+ # Send a RA to start SLAAC and subsequent DAD.
+ self.SendRA(self.test_netid, 0)
+ # Get flags and prove optimistism.
+ self.assertAddressHasExpectedAttributes(
+ self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
+
+ # The interface has an IPv6 address and, despite its optimistic nature,
+ # the use_optimistic option allows it to be selected.
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressSelected(self.test_ip, self.test_netid)
+
+
+class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
+
+ @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
+ def testModifiedRfc6724Behaviour(self):
+ # [3] Add a valid IPv6 address to this interface and verify it is
+ # selected as the source address.
+ preferred_ip = self.IPv6Prefix(self.test_netid) + "cafe"
+ self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex)
+ self.assertAddressHasExpectedAttributes(
+ preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT)
+ self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid))
+
+ # [4] Get another IPv6 address, in optimistic DAD start-up.
+ self.SetDAD(self.test_ifname, 1) # Enable DAD
+ self.SetOptimisticDAD(self.test_ifname, 1)
+ self.SetUseOptimistic(self.test_ifname, 1)
+ # Send a RA to start SLAAC and subsequent DAD.
+ self.SendRA(self.test_netid, 0)
+ # Get flags and prove optimism.
+ self.assertAddressHasExpectedAttributes(
+ self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
+
+ # Since the interface has another IPv6 address, the optimistic address
+ # is not selected--the other, valid address is chosen.
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressNotSelected(self.test_ip, self.test_netid)
+ self.assertAddressSelected(preferred_ip, self.test_netid)
+
+
+class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
+
+ @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
+ def testDadFailure(self):
+ # [3] Get an IPv6 address back, in optimistic DAD start-up.
+ self.SetDAD(self.test_ifname, 1) # Enable DAD
+ self.SetOptimisticDAD(self.test_ifname, 1)
+ self.SetUseOptimistic(self.test_ifname, 1)
+ # Send a RA to start SLAAC and subsequent DAD.
+ self.SendRA(self.test_netid, 0)
+ # Prove optimism and usability.
+ self.assertAddressHasExpectedAttributes(
+ self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressSelected(self.test_ip, self.test_netid)
+
+ # Send a NA for the optimistic address, indicating address conflict
+ # ("DAD defense").
+ conflict_macaddr = "02:00:0b:ad:d0:0d"
+ dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") /
+ scapy.IPv6(src=self.test_ip, dst="ff02::1") /
+ scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) /
+ scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr))
+ self.ReceiveEtherPacketOn(self.test_netid, dad_defense)
+
+ # The address should have failed DAD, and therefore no longer be usable.
+ self.assertAddressNotUsable(self.test_ip, self.test_netid)
+ self.assertAddressNotSelected(self.test_ip, self.test_netid)
+
+ # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.
+
+
+class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
+
+ @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported")
+ @unittest.skipUnless(net_test.LinuxVersion() >= (3, 18, 0),
+ "correct optimistic bind() not supported")
+ def testSendToOnlinkDestination(self):
+ # [3] Get an IPv6 address back, in optimistic DAD start-up.
+ self.SetDAD(self.test_ifname, 1) # Enable DAD
+ self.SetOptimisticDAD(self.test_ifname, 1)
+ self.SetUseOptimistic(self.test_ifname, 1)
+ # Send a RA to start SLAAC and subsequent DAD.
+ self.SendRA(self.test_netid, 0)
+ # Prove optimism and usability.
+ self.assertAddressHasExpectedAttributes(
+ self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
+ self.assertAddressUsable(self.test_ip, self.test_netid)
+ self.assertAddressSelected(self.test_ip, self.test_netid)
+
+ # [4] Send to an on-link destination and observe a Neighbor Solicitation
+ # packet with a source address that is NOT the optimistic address.
+ # In this setup, the only usable address is the link-local address.
+ onlink_dest = self.GetRandomDestination(self.IPv6Prefix(self.test_netid))
+ self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest)
+
+ expected_ns = multinetwork_test.Packets.NS(
+ net_test.GetLinkAddress(self.test_ifname, True),
+ onlink_dest,
+ self.MyMacAddress(self.test_netid))[1]
+ self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns)
+
+
+# TODO(ek): add tests listening for netlink events.
+
+
+if __name__ == "__main__":
+ unittest.main()