aboutsummaryrefslogtreecommitdiff
path: root/tests/hwsim/test_dscp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/hwsim/test_dscp.py')
-rw-r--r--tests/hwsim/test_dscp.py407
1 files changed, 0 insertions, 407 deletions
diff --git a/tests/hwsim/test_dscp.py b/tests/hwsim/test_dscp.py
deleted file mode 100644
index e017938bc355..000000000000
--- a/tests/hwsim/test_dscp.py
+++ /dev/null
@@ -1,407 +0,0 @@
-# Test cases for dscp policy
-# Copyright (c) 2021, Jouni Malinen <j@w1.fi>
-# Copyright (c) 2021, The Linux Foundation
-#
-# This software may be distributed under the terms of the BSD license.
-# See README for more details.
-
-import struct
-import time
-import sys
-import socket
-
-import hostapd
-from wpasupplicant import WpaSupplicant
-from utils import *
-
-def register_dscp_req(hapd):
- type = 0x00d0
- match = "7e506f9a1a"
- if "OK" not in hapd.request("REGISTER_FRAME %04x %s" % (type, match)):
- raise Exception("Could not register frame reception for Vendor specific protected type")
-
-def send_dscp_req(hapd, da, oui_subtype, dialog_token, req_control, qos_ie,
- truncate=False):
- type = 0
- subtype = 13
- category = 126
- oui_type = 0x506f9a1a
- if truncate:
- req = struct.pack('>BLBB', category, oui_type, oui_subtype,
- dialog_token)
- else:
- req = struct.pack('>BLBBB', category, oui_type, oui_subtype,
- dialog_token, req_control)
- if qos_ie:
- req += qos_ie
-
- msg = {}
- msg['fc'] = 0x00d0
- msg['sa'] = hapd.own_addr()
- msg['da'] = da
- msg['bssid'] = hapd.own_addr()
- msg['type'] = type
- msg['subtype'] = subtype
- msg['payload'] = req
-
- hapd.mgmt_tx(msg)
- ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
- if ev is None or "stype=13 ok=1" not in ev:
- raise Exception("No DSCP Policy Request sent")
-
-def prepare_qos_ie(policy_id, req_type, dscp, start_port=0, end_port=0,
- frame_classifier=None, frame_class_len=0, domain_name=None):
- qos_elem_oui_type = 0x229a6f50
- qos_elem_id = 221
-
- if policy_id:
- qos_attr = struct.pack('BBBBB', 2, 3, policy_id, req_type, dscp)
- qos_attr_len = 5
- else:
- qos_attr = 0
- qos_attr_len = 0
-
- if start_port and end_port:
- port_range_attr = struct.pack('>BBHH', 1, 4, start_port, end_port)
- if qos_attr:
- qos_attr += port_range_attr
- else:
- qos_attr = port_range_attr
- qos_attr_len += 6
-
- if frame_classifier and frame_class_len:
- tclas_attr = struct.pack('>BB%ds' % (len(frame_classifier),), 3,
- len(frame_classifier), frame_classifier)
- if qos_attr:
- qos_attr += tclas_attr
- else:
- qos_attr = tclas_attr
- qos_attr_len += 2 + len(frame_classifier)
-
- if domain_name:
- s = bytes(domain_name, 'utf-8')
- domain_name_attr = struct.pack('>BB%ds' % (len(s),), 4, len(s), s)
- if qos_attr:
- qos_attr += domain_name_attr
- else:
- qos_attr = domain_name_attr
- qos_attr_len += 2 + len(s)
-
- qos_attr_len += 4
- qos_ie = struct.pack('<BBL', qos_elem_id, qos_attr_len,
- qos_elem_oui_type) + qos_attr
-
- return qos_ie
-
-def validate_dscp_req_event(dev, event):
- ev = dev.wait_event(["CTRL-EVENT-DSCP-POLICY"], timeout=2)
- if ev is None:
- raise Exception("No DSCP request reported")
- if ev != event:
- raise Exception("Invalid DSCP event received (%s; expected: %s)" % (ev, event))
-
-def handle_dscp_query(hapd, query):
- msg = hapd.mgmt_rx()
- if msg['payload'] != query:
- raise Exception("Invalid DSCP Query received at AP")
-
-def handle_dscp_response(hapd, response):
- msg = hapd.mgmt_rx()
- if msg['payload'] != response:
- raise Exception("Invalid DSCP Response received at AP")
-
-def ap_sta_connectivity(dev, apdev, params):
- p = hostapd.wpa2_params(passphrase="12345678")
- p["wpa_key_mgmt"] = "WPA-PSK"
- p["ieee80211w"] = "1"
- p.update(params)
- hapd = hostapd.add_ap(apdev[0], p)
- register_dscp_req(hapd)
-
- dev[0].request("SET enable_dscp_policy_capa 1")
- dev[0].connect("dscp", psk="12345678", ieee80211w="1",
- key_mgmt="WPA-PSK WPA-PSK-SHA256", scan_freq="2412")
- hapd.wait_sta()
-
- hapd.dump_monitor()
- hapd.set("ext_mgmt_frame_handling", "1")
- return hapd
-
-def test_dscp_query(dev, apdev):
- """DSCP Policy Query"""
-
- # Positive tests
- #AP with DSCP Capabilities
- params = {"ssid": "dscp",
- "ext_capa": 6*"00" + "40",
- "assocresp_elements": "dd06506f9a230101",
- "vendor_elements": "dd06506f9a230101"}
-
- hapd = ap_sta_connectivity(dev, apdev, params)
- da = dev[0].own_addr()
-
- # Query 1
- cmd = "DSCP_QUERY wildcard"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Query failed")
- query = b'\x7e\x50\x6f\x9a\x1a\x00\x01'
- handle_dscp_query(hapd, query)
-
- # Query 2
- cmd = "DSCP_QUERY domain_name=example.com"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Query failed")
- query = b'\x7e\x50\x6f\x9a\x1a\x00\x02\xdd\x11\x50\x6f\x9a\x22\x04\x0b\x65\x78\x61\x6d\x70\x6c\x65\x2e\x63\x6f\x6d'
- handle_dscp_query(hapd, query)
-
- # Negative tests
-
- cmd = "DSCP_QUERY domain_name=" + 250*'a' + ".example.com"
- if "FAIL" not in dev[0].request(cmd):
- raise Exception("Invalid DSCP_QUERY accepted")
-
- dev[0].disconnect_and_stop_scan()
- # AP without DSCP Capabilities
- params = {"ssid": "dscp",
- "ext_capa": 6*"00" + "40"}
- hapd = ap_sta_connectivity(dev, apdev, params)
-
- # Query 3
- cmd = "DSCP_QUERY wildcard"
- if "FAIL" not in dev[0].request(cmd):
- raise Exception("Able to send invalid DSCP Query")
-
-def test_dscp_request(dev, apdev):
- """DSCP Policy Request"""
-
- # Positive tests
-
- #AP with DSCP Capabilities
- params = {"ssid": "dscp",
- "ext_capa": 6*"00" + "40",
- "assocresp_elements": "dd06506f9a230101",
- "vendor_elements": "dd06506f9a230101"}
-
- hapd = ap_sta_connectivity(dev, apdev, params)
- da = dev[0].own_addr()
-
- # Request 1
- dialog_token = 5
- send_dscp_req(hapd, da, 1, dialog_token, 2, 0)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_start clear_all"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
- validate_dscp_req_event(dev[0], event)
-
- # DSCP Request with multiple QoS IEs
- # QoS IE 1
- dialog_token = 1
- domain_name = "example.com"
- ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1")
- ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2")
- frame_classifier_start = [4, 91, 4]
- frame_classifier_end = [12, 34, 12, 34, 0, 17, 0]
- frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end)
- frame_len = len(frame_classifier)
- qos_ie = prepare_qos_ie(1, 0, 22, 0, 0, frame_classifier, frame_len, domain_name)
-
- # QoS IE 2
- ipv6_src_addr = socket.inet_pton(socket.AF_INET6, "aaaa:bbbb:cccc::1")
- ipv6_dest_addr = socket.inet_pton(socket.AF_INET6, "aaaa:bbbb:cccc::2")
- frame_classifier_start = [4, 79, 6]
- frame_classifier_end = [0, 12, 34, 0, 0, 17, 0, 0, 0]
- frame_classifier = bytes(frame_classifier_start) + ipv6_src_addr + ipv6_dest_addr + bytes(frame_classifier_end)
- frame_len = len(frame_classifier)
- ie = prepare_qos_ie(5, 0, 48, 12345, 23456, frame_classifier, frame_len,
- None)
- qos_ie += ie
-
- # QoS IE 3
- ie = prepare_qos_ie(4, 0, 32, 12345, 23456, 0, 0, domain_name)
- qos_ie += ie
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
-
- event = "<3>CTRL-EVENT-DSCP-POLICY request_start"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY add policy_id=1 dscp=22 ip_version=4 src_ip=192.168.0.1 src_port=3106 dst_port=3106 protocol=17 domain_name=example.com"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY add policy_id=5 dscp=48 ip_version=6 src_ip=aaaa:bbbb:cccc::1 dst_ip=aaaa:bbbb:cccc::2 src_port=12 protocol=17 start_port=12345 end_port=23456"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY add policy_id=4 dscp=32 ip_version=0 start_port=12345 end_port=23456 domain_name=example.com"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
- validate_dscp_req_event(dev[0], event)
-
- # Negative Tests
-
- # No DSCP policy attribute
- dialog_token = 4
- domain_name = "example.com"
- qos_ie = prepare_qos_ie(0, 0, 0, 12345, 23456, frame_classifier, frame_len,
- domain_name)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_start"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
- validate_dscp_req_event(dev[0], event)
-
- # No DSCP stream classifier params
- dialog_token = 6
- qos_ie = prepare_qos_ie(1, 0, 32, 0, 0, 0, 0, None)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_start"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
- validate_dscp_req_event(dev[0], event)
-
- # DSCP request with both destination and domain name
- dialog_token = 7
- domain_name = "example.com"
- ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1")
- ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2")
- frame_classifier_start = [4, 69, 4]
- frame_classifier_end = [0, 0, 0, 0, 0, 17, 0]
- frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end)
- frame_len = len(frame_classifier)
- qos_ie = prepare_qos_ie(1, 0, 36, 0, 0, frame_classifier, frame_len,
- domain_name)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_start"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
- validate_dscp_req_event(dev[0], event)
-
- # DSCP request with both port range and destination port
- frame_classifier_start = [4, 81, 4]
- frame_classifier_end = [0, 0, 23, 45, 0, 17, 0]
- frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end)
- frame_len = len(frame_classifier)
- qos_ie = prepare_qos_ie(1, 0, 36, 12345, 23456, frame_classifier, frame_len,
- None)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_start"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1"
- validate_dscp_req_event(dev[0], event)
- event = "<3>CTRL-EVENT-DSCP-POLICY request_end"
- validate_dscp_req_event(dev[0], event)
-
- # Too short DSCP Policy Request frame
- dialog_token += 1
- send_dscp_req(hapd, da, 1, dialog_token, 0, None, truncate=True)
-
- # Request Type: Remove
- dialog_token += 1
- qos_ie = prepare_qos_ie(1, 1, 36)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start")
- validate_dscp_req_event(dev[0],
- "<3>CTRL-EVENT-DSCP-POLICY remove policy_id=1")
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end")
-
- # Request Type: Reserved
- dialog_token += 1
- qos_ie = prepare_qos_ie(1, 2, 36)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start")
- validate_dscp_req_event(dev[0],
- "<3>CTRL-EVENT-DSCP-POLICY reject policy_id=1")
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end")
-
-def test_dscp_response(dev, apdev):
- """DSCP Policy Response"""
-
- # Positive tests
-
- # AP with DSCP Capabilities
- params = {"ssid": "dscp",
- "ext_capa": 6*"00" + "40",
- "assocresp_elements": "dd06506f9a230101",
- "vendor_elements": "dd06506f9a230101"}
- hapd = ap_sta_connectivity(dev, apdev, params)
- da = dev[0].own_addr()
-
- # Sending solicited DSCP response after receiving DSCP request
- dialog_token = 1
- domain_name = "example.com"
- ipv4_src_addr = socket.inet_pton(socket.AF_INET, "192.168.0.1")
- ipv4_dest_addr = socket.inet_pton(socket.AF_INET, "192.168.0.2")
- frame_classifier_start = [4,91,4]
- frame_classifier_end = [12,34,12,34,0,17,0]
- frame_classifier = bytes(frame_classifier_start) + ipv4_src_addr + ipv4_dest_addr + bytes(frame_classifier_end)
- frame_len = len(frame_classifier)
- qos_ie = prepare_qos_ie(1, 0, 22, 0, 0, frame_classifier, frame_len,
- domain_name)
- ie = prepare_qos_ie(4, 0, 32, 12345, 23456, 0, 0, domain_name)
- qos_ie += ie
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
-
- cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=4 status=0"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Response failed")
- response = b'\x7e\x50\x6f\x9a\x1a\x02\x01\x00\x02\x01\x00\x04\x00'
- handle_dscp_response(hapd, response)
-
- # Unsolicited DSCP Response without status duples
- cmd = "DSCP_RESP reset more"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Response failed")
- response = b'\x7e\x50\x6f\x9a\x1a\x02\x00\x03\x00'
- handle_dscp_response(hapd, response)
-
- # Unsolicited DSCP Response with one status duple
- cmd = "DSCP_RESP policy_id=2 status=0"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Response failed")
- response = b'\x7e\x50\x6f\x9a\x1a\x02\x00\x00\x01\x02\x00'
- handle_dscp_response(hapd, response)
-
- # Negative tests
-
- # Send solicited DSCP Response without prior DSCP request
- cmd = "DSCP_RESP solicited policy_id=1 status=0 policy_id=5 status=0"
- if "FAIL" not in dev[0].request(cmd):
- raise Exception("Able to send invalid DSCP response")
-
-def test_dscp_unsolicited_req_at_assoc(dev, apdev):
- """DSCP Policy and unsolicited request at association"""
- params = {"ssid": "dscp",
- "ext_capa": 6*"00" + "40",
- "assocresp_elements": "dd06506f9a230103",
- "vendor_elements": "dd06506f9a230103"}
- hapd = ap_sta_connectivity(dev, apdev, params)
- da = dev[0].own_addr()
-
- dialog_token = 1
- qos_ie = prepare_qos_ie(1, 0, 36, 12345, 23456)
- send_dscp_req(hapd, da, 1, dialog_token, 0, qos_ie)
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_start")
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY add policy_id=1 dscp=36 ip_version=0 start_port=12345 end_port=23456")
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_end")
-
- cmd = "DSCP_QUERY wildcard"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Query failed")
-
-def test_dscp_missing_unsolicited_req_at_assoc(dev, apdev):
- """DSCP Policy and missing unsolicited request at association"""
- params = {"ssid": "dscp",
- "ext_capa": 6*"00" + "40",
- "assocresp_elements": "dd06506f9a230103",
- "vendor_elements": "dd06506f9a230103"}
- hapd = ap_sta_connectivity(dev, apdev, params)
- da = dev[0].own_addr()
-
- cmd = "DSCP_QUERY wildcard"
- if "FAIL" not in dev[0].request(cmd):
- raise Exception("DSCP_QUERY accepted during wait for unsolicited requesdt")
- time.sleep(5)
- validate_dscp_req_event(dev[0], "<3>CTRL-EVENT-DSCP-POLICY request_wait end")
-
- cmd = "DSCP_QUERY wildcard"
- if "OK" not in dev[0].request(cmd):
- raise Exception("Sending DSCP Query failed")