about summary refs log tree commit diff
diff options
context:
space:
mode:
author Alexander V. Chernikov <melifaro@FreeBSD.org> 2023-06-01 10:45:29 +0000
committer Alexander V. Chernikov <melifaro@FreeBSD.org> 2023-06-01 10:45:29 +0000
commit 54b955f4df5e76b5679ba7f3eb6bb2d5fc62923d (patch)
tree cb1c0e2b080d982c4410661c1da4cad5a924dc95
parent fd7edfcdc3c329cdbd3f5e7a554f7153e805ab04 (diff)
download src-54b955f4df5e76b5679ba7f3eb6bb2d5fc62923d.tar.gz
src-54b955f4df5e76b5679ba7f3eb6bb2d5fc62923d.zip
netlink: add support for decoding genl ops/groups in pytest
MFC after: 2 weeks
-rw-r--r-- tests/atf_python/sys/netlink/message.py 39
-rw-r--r-- tests/atf_python/sys/netlink/netlink.py 3
-rw-r--r-- tests/atf_python/sys/netlink/netlink_generic.py 39
-rw-r--r-- tests/atf_python/sys/netlink/utils.py 2
4 files changed, 73 insertions, 10 deletions
diff --git a/tests/atf_python/sys/netlink/message.py b/tests/atf_python/sys/netlink/message.py
index 1e2b71775102..98a1e3bb21c5 100644
--- a/tests/atf_python/sys/netlink/message.py
+++ b/tests/atf_python/sys/netlink/message.py
@@ -194,11 +194,32 @@ class StdNetlinkMessage(BaseNetlinkMessage):
raise
return self
+ def parse_child(self, data: bytes, attr_key, attr_map):
+ attrs, _ = self.parse_attrs(data, attr_map)
+ return NlAttrNested(attr_key, attrs)
+
+ def parse_child_array(self, data: bytes, attr_key, attr_map):
+ ret = []
+ off = 0
+ while len(data) - off >= 4:
+ nla_len, raw_nla_type = struct.unpack("@HH", data[off : off + 4])
+ if nla_len + off > len(data):
+ raise ValueError(
+ "attr length {} > than the remaining length {}".format(
+ nla_len, len(data) - off
+ )
+ )
+ nla_type = raw_nla_type & 0x3FFF
+ val = self.parse_child(data[off + 4 : off + nla_len], nla_type, attr_map)
+ ret.append(val)
+ off += align4(nla_len)
+ return NlAttrNested(attr_key, ret)
+
def parse_attrs(self, data: bytes, attr_map):
ret = []
off = 0
while len(data) - off >= 4:
- nla_len, raw_nla_type = struct.unpack("@HH", data[off:off + 4])
+ nla_len, raw_nla_type = struct.unpack("@HH", data[off : off + 4])
if nla_len + off > len(data):
raise ValueError(
"attr length {} > than the remaining length {}".format(
@@ -208,16 +229,20 @@ class StdNetlinkMessage(BaseNetlinkMessage):
nla_type = raw_nla_type & 0x3FFF
if nla_type in attr_map:
v = attr_map[nla_type]
- val = v["ad"].cls.from_bytes(data[off:off + nla_len], v["ad"].val)
+ val = v["ad"].cls.from_bytes(data[off : off + nla_len], v["ad"].val)
if "child" in v:
# nested
- attrs, _ = self.parse_attrs(
- data[off + 4:off + nla_len], v["child"]
- )
- val = NlAttrNested(v["ad"].val, attrs)
+ child_data = data[off + 4 : off + nla_len]
+ if v.get("is_array", False):
+ # Array of nested attributes
+ val = self.parse_child_array(
+ child_data, v["ad"].val, v["child"]
+ )
+ else:
+ val = self.parse_child(child_data, v["ad"].val, v["child"])
else:
# unknown attribute
- val = NlAttr(raw_nla_type, data[off + 4:off + nla_len])
+ val = NlAttr(raw_nla_type, data[off + 4 : off + nla_len])
ret.append(val)
off += align4(nla_len)
return ret, off
diff --git a/tests/atf_python/sys/netlink/netlink.py b/tests/atf_python/sys/netlink/netlink.py
index 9b5906815489..f8f886b09b24 100644
--- a/tests/atf_python/sys/netlink/netlink.py
+++ b/tests/atf_python/sys/netlink/netlink.py
@@ -265,6 +265,9 @@ class Nlsock:
# k = struct.pack("@BBHII", 12, 38, 0, self.pid, mask)
# self.sock_fd.bind(k)
+ def join_group(self, group_id: int):
+ self.sock_fd.setsockopt(270, 1, group_id)
+
def write_message(self, msg, verbose=True):
if verbose:
print("vvvvvvvv OUT vvvvvvvv")
diff --git a/tests/atf_python/sys/netlink/netlink_generic.py b/tests/atf_python/sys/netlink/netlink_generic.py
index b49a30c1e8e7..80c6eea72a93 100644
--- a/tests/atf_python/sys/netlink/netlink_generic.py
+++ b/tests/atf_python/sys/netlink/netlink_generic.py
@@ -9,6 +9,7 @@ from enum import Enum
from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrIp4
from atf_python.sys.netlink.attrs import NlAttrIp6
+from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrS32
from atf_python.sys.netlink.attrs import NlAttrStr
from atf_python.sys.netlink.attrs import NlAttrU16
@@ -94,6 +95,16 @@ class GenlCtrlAttrType(Enum):
CTRL_ATTR_OP = 10
+class GenlCtrlAttrOpType(Enum):
+ CTRL_ATTR_OP_ID = 1
+ CTRL_ATTR_OP_FLAGS = 2
+
+
+class GenlCtrlAttrMcastGroupsType(Enum):
+ CTRL_ATTR_MCAST_GRP_NAME = 1
+ CTRL_ATTR_MCAST_GRP_ID = 2
+
+
genl_ctrl_attrs = prepare_attrs_map(
[
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
@@ -101,6 +112,28 @@ genl_ctrl_attrs = prepare_attrs_map(
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
+ AttrDescr(
+ GenlCtrlAttrType.CTRL_ATTR_OPS,
+ NlAttrNested,
+ [
+ AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_ID, NlAttrU32),
+ AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_FLAGS, NlAttrU32),
+ ],
+ True,
+ ),
+ AttrDescr(
+ GenlCtrlAttrType.CTRL_ATTR_MCAST_GROUPS,
+ NlAttrNested,
+ [
+ AttrDescr(
+ GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_NAME, NlAttrStr
+ ),
+ AttrDescr(
+ GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_ID, NlAttrU32
+ ),
+ ],
+ True,
+ ),
]
)
@@ -220,13 +253,13 @@ class NlAttrTS(NlAttr):
@staticmethod
def _validate(data):
assert len(data) == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
- nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
+ nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
assert nla_len == NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
@classmethod
def _parse(cls, data):
- nla_len, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
- val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:])
+ nla_len, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
+ val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN :])
return cls(nla_type, val)
def __bytes__(self):
diff --git a/tests/atf_python/sys/netlink/utils.py b/tests/atf_python/sys/netlink/utils.py
index 7a41791b5318..f1d0ba3321ed 100644
--- a/tests/atf_python/sys/netlink/utils.py
+++ b/tests/atf_python/sys/netlink/utils.py
@@ -34,6 +34,7 @@ class AttrDescr(NamedTuple):
val: Enum
cls: "NlAttr"
child_map: Any = None
+ is_array: bool = False
def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
@@ -42,6 +43,7 @@ def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
ret[ad.val.value] = {"ad": ad}
if ad.child_map:
ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map)
+ ret[ad.val.value]["is_array"] = ad.is_array
return ret