diff options
Diffstat (limited to 'tests/net_test/srcaddr_selection_test.py')
-rwxr-xr-x | tests/net_test/srcaddr_selection_test.py | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/tests/net_test/srcaddr_selection_test.py b/tests/net_test/srcaddr_selection_test.py new file mode 100755 index 00000000..eb09b7fb --- /dev/null +++ b/tests/net_test/srcaddr_selection_test.py @@ -0,0 +1,311 @@ +#!/usr/bin/python +# +# Copyright 2014 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import errno +import os +import random +from socket import * # pylint: disable=wildcard-import +import time +import unittest + +from scapy import all as scapy + +import csocket +import iproute +import multinetwork_base +import multinetwork_test +import net_test + +# Setsockopt values. +IPV6_ADDR_PREFERENCES = 72 +IPV6_PREFER_SRC_PUBLIC = 0x0002 + + +USE_OPTIMISTIC_SYSCTL = "/proc/sys/net/ipv6/conf/default/use_optimistic" + +HAVE_USE_OPTIMISTIC = os.path.isfile(USE_OPTIMISTIC_SYSCTL) + + +class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest): + + def SetDAD(self, ifname, value): + self.SetSysctl("/proc/sys/net/ipv6/conf/%s/accept_dad" % ifname, value) + self.SetSysctl("/proc/sys/net/ipv6/conf/%s/dad_transmits" % ifname, value) + + def SetOptimisticDAD(self, ifname, value): + self.SetSysctl("/proc/sys/net/ipv6/conf/%s/optimistic_dad" % ifname, value) + + def SetUseTempaddrs(self, ifname, value): + self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_tempaddr" % ifname, value) + + def SetUseOptimistic(self, ifname, value): + self.SetSysctl("/proc/sys/net/ipv6/conf/%s/use_optimistic" % ifname, value) + + def GetSourceIP(self, netid, mode="mark"): + s = self.BuildSocket(6, net_test.UDPSocket, netid, mode) + # Because why not...testing for temporary addresses is a separate thing. + s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC) + + s.connect((net_test.IPV6_ADDR, 123)) + src_addr = s.getsockname()[0] + self.assertTrue(src_addr) + return src_addr + + def assertAddressNotPresent(self, address): + self.assertRaises(IOError, self.iproute.GetAddress, address) + + def assertAddressHasExpectedAttributes( + self, address, expected_ifindex, expected_flags): + ifa_msg = self.iproute.GetAddress(address)[0] + self.assertEquals(AF_INET6 if ":" in address else AF_INET, ifa_msg.family) + self.assertEquals(64, ifa_msg.prefixlen) + self.assertEquals(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope) + self.assertEquals(expected_ifindex, ifa_msg.index) + self.assertEquals(expected_flags, ifa_msg.flags & expected_flags) + + def AddressIsTentative(self, address): + ifa_msg = self.iproute.GetAddress(address)[0] + return ifa_msg.flags & iproute.IFA_F_TENTATIVE + + def BindToAddress(self, address): + s = net_test.UDPSocket(AF_INET6) + s.bind((address, 0, 0, 0)) + + def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR): + pktinfo = multinetwork_base.MakePktInfo(6, address, 0) + cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)] + s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark") + return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0) + + def assertAddressUsable(self, address, netid): + self.BindToAddress(address) + self.SendWithSourceAddress(address, netid) + # No exceptions? Good. + + def assertAddressNotUsable(self, address, netid): + self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address) + self.assertRaisesErrno(errno.EINVAL, + self.SendWithSourceAddress, address, netid) + + def assertAddressSelected(self, address, netid): + self.assertEquals(address, self.GetSourceIP(netid)) + + def assertAddressNotSelected(self, address, netid): + self.assertNotEquals(address, self.GetSourceIP(netid)) + + def WaitForDad(self, address): + for _ in xrange(20): + if not self.AddressIsTentative(address): + return + time.sleep(0.1) + raise AssertionError("%s did not complete DAD after 2 seconds") + + +class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest): + + def setUp(self): + # [0] Make sure DAD, optimistic DAD, and the use_optimistic option + # are all consistently disabled at the outset. + for netid in self.tuns: + self.SetDAD(self.GetInterfaceName(netid), 0) + self.SetOptimisticDAD(self.GetInterfaceName(netid), 0) + self.SetUseTempaddrs(self.GetInterfaceName(netid), 0) + if HAVE_USE_OPTIMISTIC: + self.SetUseOptimistic(self.GetInterfaceName(netid), 0) + + # [1] Pick an interface on which to test. + self.test_netid = random.choice(self.tuns.keys()) + self.test_ip = self.MyAddress(6, self.test_netid) + self.test_ifindex = self.ifindices[self.test_netid] + self.test_ifname = self.GetInterfaceName(self.test_netid) + + # [2] Delete the test interface's IPv6 address. + self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex) + self.assertAddressNotPresent(self.test_ip) + + self.assertAddressNotUsable(self.test_ip, self.test_netid) + + +class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest): + + def testRfc6724Behaviour(self): + # [3] Get an IPv6 address back, in DAD start-up. + self.SetDAD(self.test_ifname, 1) # Enable DAD + # Send a RA to start SLAAC and subsequent DAD. + self.SendRA(self.test_netid, 0) + # Get flags and prove tentative-ness. + self.assertAddressHasExpectedAttributes( + self.test_ip, self.test_ifindex, iproute.IFA_F_TENTATIVE) + + # Even though the interface has an IPv6 address, its tentative nature + # prevents it from being selected. + self.assertAddressNotUsable(self.test_ip, self.test_netid) + self.assertAddressNotSelected(self.test_ip, self.test_netid) + + # Busy wait for DAD to complete (should be less than 1 second). + self.WaitForDad(self.test_ip) + + # The test_ip should have completed DAD by now, and should be the + # chosen source address, eligible to bind to, etc. + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressSelected(self.test_ip, self.test_netid) + + +class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest): + + def testRfc6724Behaviour(self): + # [3] Get an IPv6 address back, in optimistic DAD start-up. + self.SetDAD(self.test_ifname, 1) # Enable DAD + self.SetOptimisticDAD(self.test_ifname, 1) + # Send a RA to start SLAAC and subsequent DAD. + self.SendRA(self.test_netid, 0) + # Get flags and prove optimism. + self.assertAddressHasExpectedAttributes( + self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) + + # Optimistic addresses are usable but are not selected. + if net_test.LinuxVersion() >= (3, 18, 0): + # The version checked in to android kernels <= 3.10 requires the + # use_optimistic sysctl to be turned on. + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressNotSelected(self.test_ip, self.test_netid) + + # Busy wait for DAD to complete (should be less than 1 second). + self.WaitForDad(self.test_ip) + + # The test_ip should have completed DAD by now, and should be the + # chosen source address. + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressSelected(self.test_ip, self.test_netid) + + +class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest): + + @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported") + def testModifiedRfc6724Behaviour(self): + # [3] Get an IPv6 address back, in optimistic DAD start-up. + self.SetDAD(self.test_ifname, 1) # Enable DAD + self.SetOptimisticDAD(self.test_ifname, 1) + self.SetUseOptimistic(self.test_ifname, 1) + # Send a RA to start SLAAC and subsequent DAD. + self.SendRA(self.test_netid, 0) + # Get flags and prove optimistism. + self.assertAddressHasExpectedAttributes( + self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) + + # The interface has an IPv6 address and, despite its optimistic nature, + # the use_optimistic option allows it to be selected. + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressSelected(self.test_ip, self.test_netid) + + +class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest): + + @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported") + def testModifiedRfc6724Behaviour(self): + # [3] Add a valid IPv6 address to this interface and verify it is + # selected as the source address. + preferred_ip = self.IPv6Prefix(self.test_netid) + "cafe" + self.iproute.AddAddress(preferred_ip, 64, self.test_ifindex) + self.assertAddressHasExpectedAttributes( + preferred_ip, self.test_ifindex, iproute.IFA_F_PERMANENT) + self.assertEquals(preferred_ip, self.GetSourceIP(self.test_netid)) + + # [4] Get another IPv6 address, in optimistic DAD start-up. + self.SetDAD(self.test_ifname, 1) # Enable DAD + self.SetOptimisticDAD(self.test_ifname, 1) + self.SetUseOptimistic(self.test_ifname, 1) + # Send a RA to start SLAAC and subsequent DAD. + self.SendRA(self.test_netid, 0) + # Get flags and prove optimism. + self.assertAddressHasExpectedAttributes( + self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) + + # Since the interface has another IPv6 address, the optimistic address + # is not selected--the other, valid address is chosen. + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressNotSelected(self.test_ip, self.test_netid) + self.assertAddressSelected(preferred_ip, self.test_netid) + + +class DadFailureTest(MultiInterfaceSourceAddressSelectionTest): + + @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported") + def testDadFailure(self): + # [3] Get an IPv6 address back, in optimistic DAD start-up. + self.SetDAD(self.test_ifname, 1) # Enable DAD + self.SetOptimisticDAD(self.test_ifname, 1) + self.SetUseOptimistic(self.test_ifname, 1) + # Send a RA to start SLAAC and subsequent DAD. + self.SendRA(self.test_netid, 0) + # Prove optimism and usability. + self.assertAddressHasExpectedAttributes( + self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressSelected(self.test_ip, self.test_netid) + + # Send a NA for the optimistic address, indicating address conflict + # ("DAD defense"). + conflict_macaddr = "02:00:0b:ad:d0:0d" + dad_defense = (scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01") / + scapy.IPv6(src=self.test_ip, dst="ff02::1") / + scapy.ICMPv6ND_NA(tgt=self.test_ip, R=0, S=0, O=1) / + scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)) + self.ReceiveEtherPacketOn(self.test_netid, dad_defense) + + # The address should have failed DAD, and therefore no longer be usable. + self.assertAddressNotUsable(self.test_ip, self.test_netid) + self.assertAddressNotSelected(self.test_ip, self.test_netid) + + # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address. + + +class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest): + + @unittest.skipUnless(HAVE_USE_OPTIMISTIC, "use_optimistic not supported") + @unittest.skipUnless(net_test.LinuxVersion() >= (3, 18, 0), + "correct optimistic bind() not supported") + def testSendToOnlinkDestination(self): + # [3] Get an IPv6 address back, in optimistic DAD start-up. + self.SetDAD(self.test_ifname, 1) # Enable DAD + self.SetOptimisticDAD(self.test_ifname, 1) + self.SetUseOptimistic(self.test_ifname, 1) + # Send a RA to start SLAAC and subsequent DAD. + self.SendRA(self.test_netid, 0) + # Prove optimism and usability. + self.assertAddressHasExpectedAttributes( + self.test_ip, self.test_ifindex, iproute.IFA_F_OPTIMISTIC) + self.assertAddressUsable(self.test_ip, self.test_netid) + self.assertAddressSelected(self.test_ip, self.test_netid) + + # [4] Send to an on-link destination and observe a Neighbor Solicitation + # packet with a source address that is NOT the optimistic address. + # In this setup, the only usable address is the link-local address. + onlink_dest = self.GetRandomDestination(self.IPv6Prefix(self.test_netid)) + self.SendWithSourceAddress(self.test_ip, self.test_netid, onlink_dest) + + expected_ns = multinetwork_test.Packets.NS( + net_test.GetLinkAddress(self.test_ifname, True), + onlink_dest, + self.MyMacAddress(self.test_netid))[1] + self.ExpectPacketOn(self.test_netid, "link-local NS", expected_ns) + + +# TODO(ek): add tests listening for netlink events. + + +if __name__ == "__main__": + unittest.main() |