aboutsummaryrefslogtreecommitdiff
path: root/tests/sys/netlink/test_rtnl_ifaddr.py
blob: bd1c55e268e738c1c01681a0bb269c22243a23f9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import ipaddress
import socket
import struct

from atf_python.sys.net.netlink import IfattrType
from atf_python.sys.net.netlink import NetlinkIfaMessage
from atf_python.sys.net.netlink import NetlinkTestTemplate
from atf_python.sys.net.netlink import NlConst
from atf_python.sys.net.netlink import NlHelper
from atf_python.sys.net.netlink import NlmBaseFlags
from atf_python.sys.net.netlink import NlMsgType
from atf_python.sys.net.netlink import NlRtMsgType
from atf_python.sys.net.netlink import Nlsock
from atf_python.sys.net.netlink import RtScope
from atf_python.sys.net.vnet import SingleVnetTestTemplate


class TestRtNlIfaddr(NetlinkTestTemplate, SingleVnetTestTemplate):
    """RTM_GETADDR dump tests against the "if1" interface of a single vnet.

    The digits in a test method name select which prefixes are configured
    for that test (see setup_method()).
    """

    def setup_method(self, method):
        """Pick the prefixes for this test and open a NETLINK_ROUTE socket.

        A "4" in the method name requests the IPv4 prefix and a "6" requests
        the IPv6 prefix.  Both attributes are assigned before the parent
        setup_method() runs (presumably the vnet template consumes them when
        configuring the interface -- confirm against SingleVnetTestTemplate).
        """
        method_name = method.__name__
        if "4" in method_name:
            self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def _dump_addrs(self, family=None, ifname=None):
        """Send an RTM_GETADDR request and return the RTM_NEWADDR replies.

        family: value stored in ifa_family; the header field is left at its
            default when None.
        ifname: interface name whose index is stored in ifa_index; the header
            field is left at its default when None.
        Returns the list of messages yielded by read_msg_list() for this
        request's sequence number.
        """
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.nl_hdr.nlmsg_flags = (
            NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
        )
        if family is not None:
            msg.base_hdr.ifa_family = family
        if ifname is not None:
            msg.base_hdr.ifa_index = socket.if_nametoindex(ifname)
        self.write_message(msg)
        return list(self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR))

    @staticmethod
    def _iface_families(rx_msgs):
        """Return one (interface name, address family) pair per message."""
        return [
            (socket.if_indextoname(m.base_hdr.ifa_index), m.base_hdr.ifa_family)
            for m in rx_msgs
        ]

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        pairs = self._iface_families(self._dump_addrs())

        # An unfiltered dump must include the loopback interface as well.
        assert any(name == "lo0" for name, _ in pairs)

        # One configured IPv4 address; two IPv6 addresses (test_6_broadcast
        # checks these are the configured global one and a link-local one).
        ifname = self.vnet.iface_alias_map["if1"].name
        assert pairs.count((ifname, socket.AF_INET)) == 1
        assert pairs.count((ifname, socket.AF_INET6)) == 2

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name
        pairs = self._iface_families(self._dump_addrs(ifname=epair_ifname))

        assert pairs.count((epair_ifname, socket.AF_INET)) == 1
        assert pairs.count((epair_ifname, socket.AF_INET6)) == 2
        # Nothing from any other interface or family may leak into the dump.
        assert len(pairs) == 3

    def filter_iface_family(self, family, num_items):
        """Dump the "if1" addresses of one family and validate the replies.

        family: address family (e.g. socket.AF_INET) used as the dump filter.
        num_items: exact number of RTM_NEWADDR messages expected.
        Asserts that every reply carries the requested family and the index
        of the test interface, then returns the list of reply messages.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name
        ret = self._dump_addrs(family=family, ifname=epair_ifname)
        for rx_msg in ret:
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfattrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        # Should be the global 2001:db8::1/64 plus a link-local address; the
        # reply order is not relied upon, so tell them apart by scope.
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret

        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex have been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfattrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfattrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded: the 16 bits that follow the
        # leading fe80 word must be zero in the reported address.
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfattrType.IFA_LOCAL) is None
        assert msg.get_nla(IfattrType.IFA_BROADCAST) is None
        assert msg.get_nla(IfattrType.IFA_LABEL).text == epair_ifname