diff options
Diffstat (limited to 'wpa_supplicant/examples/dpp-nfc.py')
-rwxr-xr-x | wpa_supplicant/examples/dpp-nfc.py | 1186 |
1 files changed, 0 insertions, 1186 deletions
diff --git a/wpa_supplicant/examples/dpp-nfc.py b/wpa_supplicant/examples/dpp-nfc.py deleted file mode 100755 index 8e865f3fcd33..000000000000 --- a/wpa_supplicant/examples/dpp-nfc.py +++ /dev/null @@ -1,1186 +0,0 @@ -#!/usr/bin/python3 -# -# Example nfcpy to wpa_supplicant wrapper for DPP NFC operations -# Copyright (c) 2012-2013, Jouni Malinen <j@w1.fi> -# Copyright (c) 2019-2020, The Linux Foundation -# -# This software may be distributed under the terms of the BSD license. -# See README for more details. - -import binascii -import errno -import os -import struct -import sys -import time -import threading -import argparse - -import nfc -import ndef - -import logging - -scriptsdir = os.path.dirname(os.path.realpath(sys.modules[__name__].__file__)) -sys.path.append(os.path.join(scriptsdir, '..', '..', 'wpaspy')) -import wpaspy - -wpas_ctrl = '/var/run/wpa_supplicant' -ifname = None -init_on_touch = False -in_raw_mode = False -prev_tcgetattr = 0 -no_input = False -continue_loop = True -terminate_now = False -summary_file = None -success_file = None -netrole = None -operation_success = False -mutex = threading.Lock() - -C_NORMAL = '\033[0m' -C_RED = '\033[91m' -C_GREEN = '\033[92m' -C_YELLOW = '\033[93m' -C_BLUE = '\033[94m' -C_MAGENTA = '\033[95m' -C_CYAN = '\033[96m' - -def summary(txt, color=None): - with mutex: - if color: - print(color + txt + C_NORMAL) - else: - print(txt) - if summary_file: - with open(summary_file, 'a') as f: - f.write(txt + "\n") - -def success_report(txt): - summary(txt) - if success_file: - with open(success_file, 'a') as f: - f.write(txt + "\n") - -def wpas_connect(): - ifaces = [] - if os.path.isdir(wpas_ctrl): - try: - ifaces = [os.path.join(wpas_ctrl, i) for i in os.listdir(wpas_ctrl)] - except OSError as error: - summary("Could not find wpa_supplicant: %s", str(error)) - return None - - if len(ifaces) < 1: - summary("No wpa_supplicant control interface found") - return None - - for ctrl in ifaces: - if ifname and ifname not in ctrl: - continue - if os.path.basename(ctrl).startswith("p2p-dev-"): - # skip P2P management interface - continue - try: - summary("Trying to use control interface " + ctrl) - wpas = wpaspy.Ctrl(ctrl) - return wpas - except Exception as e: - pass - summary("Could not connect to wpa_supplicant") - return None - -def dpp_nfc_uri_process(uri): - wpas = wpas_connect() - if wpas is None: - return False - peer_id = wpas.request("DPP_NFC_URI " + uri) - if "FAIL" in peer_id: - summary("Could not parse DPP URI from NFC URI record", color=C_RED) - return False - peer_id = int(peer_id) - summary("peer_id=%d for URI from NFC Tag: %s" % (peer_id, uri)) - cmd = "DPP_AUTH_INIT peer=%d" % peer_id - global enrollee_only, configurator_only, config_params - if enrollee_only: - cmd += " role=enrollee" - elif configurator_only: - cmd += " role=configurator" - if config_params: - cmd += " " + config_params - summary("Initiate DPP authentication: " + cmd) - res = wpas.request(cmd) - if "OK" not in res: - summary("Failed to initiate DPP Authentication", color=C_RED) - return False - summary("DPP Authentication initiated") - return True - -def dpp_hs_tag_read(record): - wpas = wpas_connect() - if wpas is None: - return False - summary(record) - if len(record.data) < 5: - summary("Too short DPP HS", color=C_RED) - return False - if record.data[0] != 0: - summary("Unexpected URI Identifier Code", color=C_RED) - return False - uribuf = record.data[1:] - try: - uri = uribuf.decode() - except: - summary("Invalid URI payload", color=C_RED) - return False - summary("URI: " + uri) - if not uri.startswith("DPP:"): - summary("Not a DPP URI", color=C_RED) - return False - return dpp_nfc_uri_process(uri) - -def get_status(wpas, extra=None): - if extra: - extra = "-" + extra - else: - extra = "" - res = wpas.request("STATUS" + extra) - lines = res.splitlines() - vals = dict() - for l in lines: - try: - [name, value] = l.split('=', 1) - except ValueError: - summary("Ignore unexpected status line: %s" % l) - continue - vals[name] = value - return vals - -def get_status_field(wpas, field, extra=None): - vals = get_status(wpas, extra) - if field in vals: - return vals[field] - return None - -def own_addr(wpas): - addr = get_status_field(wpas, "address") - if addr is None: - addr = get_status_field(wpas, "bssid[0]") - return addr - -def dpp_bootstrap_gen(wpas, type="qrcode", chan=None, mac=None, info=None, - curve=None, key=None): - cmd = "DPP_BOOTSTRAP_GEN type=" + type - if chan: - cmd += " chan=" + chan - if mac: - if mac is True: - mac = own_addr(wpas) - if mac is None: - summary("Could not determine local MAC address for bootstrap info") - else: - cmd += " mac=" + mac.replace(':', '') - if info: - cmd += " info=" + info - if curve: - cmd += " curve=" + curve - if key: - cmd += " key=" + key - res = wpas.request(cmd) - if "FAIL" in res: - raise Exception("Failed to generate bootstrapping info") - return int(res) - -def dpp_start_listen(wpas, freq): - if get_status_field(wpas, "bssid[0]"): - summary("Own AP freq: %s MHz" % str(get_status_field(wpas, "freq"))) - if get_status_field(wpas, "beacon_set", extra="DRIVER") is None: - summary("Enable beaconing to have radio ready for RX") - wpas.request("DISABLE") - wpas.request("SET start_disabled 0") - wpas.request("ENABLE") - cmd = "DPP_LISTEN %d" % freq - global enrollee_only - global configurator_only - if enrollee_only: - cmd += " role=enrollee" - elif configurator_only: - cmd += " role=configurator" - global netrole - if netrole: - cmd += " netrole=" + netrole - summary(cmd) - res = wpas.request(cmd) - if "OK" not in res: - summary("Failed to start DPP listen", color=C_RED) - return False - return True - -def wpas_get_nfc_uri(start_listen=True, pick_channel=False, chan_override=None): - listen_freq = 2412 - wpas = wpas_connect() - if wpas is None: - return None - global own_id, chanlist - if chan_override: - chan = chan_override - else: - chan = chanlist - if chan and chan.startswith("81/"): - listen_freq = int(chan[3:].split(',')[0]) * 5 + 2407 - if chan is None and get_status_field(wpas, "bssid[0]"): - freq = get_status_field(wpas, "freq") - if freq: - freq = int(freq) - if freq >= 2412 and freq <= 2462: - chan = "81/%d" % ((freq - 2407) / 5) - summary("Use current AP operating channel (%d MHz) as the URI channel list (%s)" % (freq, chan)) - listen_freq = freq - if chan is None and pick_channel: - chan = "81/6" - summary("Use channel 2437 MHz since no other preference provided") - listen_freq = 2437 - own_id = dpp_bootstrap_gen(wpas, type="nfc-uri", chan=chan, mac=True) - res = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip() - if "FAIL" in res: - return None - if start_listen: - if not dpp_start_listen(wpas, listen_freq): - raise Exception("Failed to start listen operation on %d MHz" % listen_freq) - return res - -def wpas_report_handover_req(uri): - wpas = wpas_connect() - if wpas is None: - return None - global own_id - cmd = "DPP_NFC_HANDOVER_REQ own=%d uri=%s" % (own_id, uri) - return wpas.request(cmd) - -def wpas_report_handover_sel(uri): - wpas = wpas_connect() - if wpas is None: - return None - global own_id - cmd = "DPP_NFC_HANDOVER_SEL own=%d uri=%s" % (own_id, uri) - return wpas.request(cmd) - -def dpp_handover_client(handover, alt=False): - summary("About to start run_dpp_handover_client (alt=%s)" % str(alt)) - if alt: - handover.i_m_selector = False - run_dpp_handover_client(handover, alt) - summary("Done run_dpp_handover_client (alt=%s)" % str(alt)) - -def run_client_alt(handover, alt): - if handover.start_client_alt and not alt: - handover.start_client_alt = False - summary("Try to send alternative handover request") - dpp_handover_client(handover, alt=True) - -class HandoverClient(nfc.handover.HandoverClient): - def __init__(self, handover, llc): - super(HandoverClient, self).__init__(llc) - self.handover = handover - - def recv_records(self, timeout=None): - msg = self.recv_octets(timeout) - if msg is None: - return None - records = list(ndef.message_decoder(msg, 'relax')) - if records and records[0].type == 'urn:nfc:wkt:Hs': - summary("Handover client received message '{0}'".format(records[0].type)) - return list(ndef.message_decoder(msg, 'relax')) - summary("Handover client received invalid message: %s" + binascii.hexlify(msg)) - return None - - def recv_octets(self, timeout=None): - start = time.time() - msg = bytearray() - while True: - poll_timeout = 0.1 if timeout is None or timeout > 0.1 else timeout - if not self.socket.poll('recv', poll_timeout): - if timeout: - timeout -= time.time() - start - if timeout <= 0: - return None - start = time.time() - continue - try: - r = self.socket.recv() - if r is None: - return None - msg += r - except TypeError: - return b'' - try: - list(ndef.message_decoder(msg, 'strict', {})) - return bytes(msg) - except ndef.DecodeError: - if timeout: - timeout -= time.time() - start - if timeout <= 0: - return None - start = time.time() - continue - return None - -def run_dpp_handover_client(handover, alt=False): - chan_override = None - if alt: - chan_override = handover.altchanlist - handover.alt_proposal_used = True - global test_uri, test_alt_uri - if test_uri: - summary("TEST MODE: Using specified URI (alt=%s)" % str(alt)) - uri = test_alt_uri if alt else test_uri - else: - uri = wpas_get_nfc_uri(start_listen=False, chan_override=chan_override) - if uri is None: - summary("Cannot start handover client - no bootstrap URI available", - color=C_RED) - return - handover.my_uri = uri - uri = ndef.UriRecord(uri) - summary("NFC URI record for DPP: " + str(uri)) - carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data) - global test_crn - if test_crn: - prev, = struct.unpack('>H', test_crn) - summary("TEST MODE: Use specified crn %d" % prev) - crn = test_crn - test_crn = struct.pack('>H', prev + 0x10) - else: - crn = os.urandom(2) - hr = ndef.HandoverRequestRecord(version="1.4", crn=crn) - hr.add_alternative_carrier('active', carrier.name) - message = [hr, carrier] - summary("NFC Handover Request message for DPP: " + str(message)) - - if handover.peer_crn is not None and not alt: - summary("NFC handover request from peer was already received - do not send own") - return - if handover.client: - summary("Use already started handover client") - client = handover.client - else: - summary("Start handover client") - client = HandoverClient(handover, handover.llc) - try: - summary("Trying to initiate NFC connection handover") - client.connect() - summary("Connected for handover") - except nfc.llcp.ConnectRefused: - summary("Handover connection refused") - client.close() - return - except Exception as e: - summary("Other exception: " + str(e)) - client.close() - return - handover.client = client - - if handover.peer_crn is not None and not alt: - summary("NFC handover request from peer was already received - do not send own") - return - - summary("Sending handover request") - - handover.my_crn_ready = True - - if not client.send_records(message): - handover.my_crn_ready = False - summary("Failed to send handover request", color=C_RED) - run_client_alt(handover, alt) - return - - handover.my_crn, = struct.unpack('>H', crn) - - summary("Receiving handover response") - try: - start = time.time() - message = client.recv_records(timeout=3.0) - end = time.time() - summary("Received {} record(s) in {} seconds".format(len(message) if message is not None else -1, end - start)) - except Exception as e: - # This is fine if we are the handover selector - if handover.hs_sent: - summary("Client receive failed as expected since I'm the handover server: %s" % str(e)) - elif handover.alt_proposal_used and not alt: - summary("Client received failed for initial proposal as expected since alternative proposal was also used: %s" % str(e)) - else: - summary("Client receive failed: %s" % str(e), color=C_RED) - message = None - if message is None: - if handover.hs_sent: - summary("No response received as expected since I'm the handover server") - elif handover.alt_proposal_used and not alt: - summary("No response received for initial proposal as expected since alternative proposal was also used") - elif handover.try_own and not alt: - summary("No response received for initial proposal as expected since alternative proposal will also be sent") - else: - summary("No response received", color=C_RED) - run_client_alt(handover, alt) - return - summary("Received message: " + str(message)) - if len(message) < 1 or \ - not isinstance(message[0], ndef.HandoverSelectRecord): - summary("Response was not Hs - received: " + message.type) - return - - summary("Received handover select message") - summary("alternative carriers: " + str(message[0].alternative_carriers)) - if handover.i_m_selector: - summary("Ignore the received select since I'm the handover selector") - run_client_alt(handover, alt) - return - - if handover.alt_proposal_used and not alt: - summary("Ignore received handover select for the initial proposal since alternative proposal was sent") - client.close() - return - - dpp_found = False - for carrier in message: - if isinstance(carrier, ndef.HandoverSelectRecord): - continue - summary("Remote carrier type: " + carrier.type) - if carrier.type == "application/vnd.wfa.dpp": - if len(carrier.data) == 0 or carrier.data[0] != 0: - summary("URI Identifier Code 'None' not seen", color=C_RED) - continue - summary("DPP carrier type match - send to wpa_supplicant") - dpp_found = True - uri = carrier.data[1:].decode("utf-8") - summary("DPP URI: " + uri) - handover.peer_uri = uri - if test_uri: - summary("TEST MODE: Fake processing") - break - res = wpas_report_handover_sel(uri) - if res is None or "FAIL" in res: - summary("DPP handover report rejected", color=C_RED) - break - - success_report("DPP handover reported successfully (initiator)") - summary("peer_id=" + res) - peer_id = int(res) - wpas = wpas_connect() - if wpas is None: - break - - global enrollee_only - global config_params - if enrollee_only: - extra = " role=enrollee" - elif config_params: - extra = " role=configurator " + config_params - else: - # TODO: Single Configurator instance - res = wpas.request("DPP_CONFIGURATOR_ADD") - if "FAIL" in res: - summary("Failed to initiate Configurator", color=C_RED) - break - conf_id = int(res) - extra = " conf=sta-dpp configurator=%d" % conf_id - global own_id - summary("Initiate DPP authentication") - cmd = "DPP_AUTH_INIT peer=%d own=%d" % (peer_id, own_id) - cmd += extra - res = wpas.request(cmd) - if "FAIL" in res: - summary("Failed to initiate DPP authentication", color=C_RED) - break - - if not dpp_found and handover.no_alt_proposal: - summary("DPP carrier not seen in response - do not allow alternative proposal anymore") - elif not dpp_found: - summary("DPP carrier not seen in response - allow peer to initiate a new handover with different parameters") - handover.alt_proposal = True - handover.my_crn_ready = False - handover.my_crn = None - handover.peer_crn = None - handover.hs_sent = False - summary("Returning from dpp_handover_client") - return - - summary("Remove peer") - handover.close() - summary("Done with handover") - global only_one - if only_one: - print("only_one -> stop loop") - global continue_loop - continue_loop = False - - global no_wait - if no_wait or only_one: - summary("Trying to exit..") - global terminate_now - terminate_now = True - - summary("Returning from dpp_handover_client") - -class HandoverServer(nfc.handover.HandoverServer): - def __init__(self, handover, llc): - super(HandoverServer, self).__init__(llc) - self.sent_carrier = None - self.ho_server_processing = False - self.success = False - self.llc = llc - self.handover = handover - - def serve(self, socket): - peer_sap = socket.getpeername() - summary("Serving handover client on remote sap {0}".format(peer_sap)) - send_miu = socket.getsockopt(nfc.llcp.SO_SNDMIU) - try: - while socket.poll("recv"): - req = bytearray() - while socket.poll("recv"): - r = socket.recv() - if r is None: - return None - summary("Received %d octets" % len(r)) - req += r - if len(req) == 0: - continue - try: - list(ndef.message_decoder(req, 'strict', {})) - except ndef.DecodeError: - continue - summary("Full message received") - resp = self._process_request_data(req) - if resp is None or len(resp) == 0: - summary("No handover select to send out - wait for a possible alternative handover request") - handover.alt_proposal = True - req = bytearray() - continue - - for offset in range(0, len(resp), send_miu): - if not socket.send(resp[offset:offset + send_miu]): - summary("Failed to send handover select - connection closed") - return - summary("Sent out full handover select") - if handover.terminate_on_hs_send_completion: - handover.delayed_exit() - - except nfc.llcp.Error as e: - global terminate_now - summary("HandoverServer exception: %s" % e, - color=None if e.errno == errno.EPIPE or terminate_now else C_RED) - finally: - socket.close() - summary("Handover serve thread exiting") - - def process_handover_request_message(self, records): - handover = self.handover - self.ho_server_processing = True - global in_raw_mode - was_in_raw_mode = in_raw_mode - clear_raw_mode() - if was_in_raw_mode: - print("\n") - summary("HandoverServer - request received: " + str(records)) - - for carrier in records: - if not isinstance(carrier, ndef.HandoverRequestRecord): - continue - if carrier.collision_resolution_number: - handover.peer_crn = carrier.collision_resolution_number - summary("peer_crn: %d" % handover.peer_crn) - - if handover.my_crn is None and handover.my_crn_ready: - summary("Still trying to send own handover request - wait a moment to see if that succeeds before checking crn values") - for i in range(10): - if handover.my_crn is not None: - break - time.sleep(0.01) - if handover.my_crn is not None: - summary("my_crn: %d" % handover.my_crn) - - if handover.my_crn is not None and handover.peer_crn is not None: - if handover.my_crn == handover.peer_crn: - summary("Same crn used - automatic collision resolution failed") - # TODO: Should generate a new Handover Request message - return '' - if ((handover.my_crn & 1) == (handover.peer_crn & 1) and \ - handover.my_crn > handover.peer_crn) or \ - ((handover.my_crn & 1) != (handover.peer_crn & 1) and \ - handover.my_crn < handover.peer_crn): - summary("I'm the Handover Selector Device") - handover.i_m_selector = True - else: - summary("Peer is the Handover Selector device") - summary("Ignore the received request.") - return '' - - hs = ndef.HandoverSelectRecord('1.4') - sel = [hs] - - found = False - - for carrier in records: - if isinstance(carrier, ndef.HandoverRequestRecord): - continue - summary("Remote carrier type: " + carrier.type) - if carrier.type == "application/vnd.wfa.dpp": - summary("DPP carrier type match - add DPP carrier record") - if len(carrier.data) == 0 or carrier.data[0] != 0: - summary("URI Identifier Code 'None' not seen", color=C_RED) - continue - uri = carrier.data[1:].decode("utf-8") - summary("Received DPP URI: " + uri) - - global test_uri, test_alt_uri - if test_uri: - summary("TEST MODE: Using specified URI") - data = test_sel_uri if test_sel_uri else test_uri - elif handover.alt_proposal and handover.altchanlist: - summary("Use alternative channel list while processing alternative proposal from peer") - data = wpas_get_nfc_uri(start_listen=False, - chan_override=handover.altchanlist, - pick_channel=True) - else: - data = wpas_get_nfc_uri(start_listen=False, - pick_channel=True) - summary("Own URI (pre-processing): %s" % data) - - if test_uri: - summary("TEST MODE: Fake processing") - res = "OK" - data += " [%s]" % uri - else: - res = wpas_report_handover_req(uri) - if res is None or "FAIL" in res: - summary("DPP handover request processing failed", - color=C_RED) - if handover.altchanlist: - data = wpas_get_nfc_uri(start_listen=False, - chan_override=handover.altchanlist) - summary("Own URI (try another channel list): %s" % data) - continue - - if test_alt_uri: - summary("TEST MODE: Reject initial proposal") - continue - - found = True - - if not test_uri: - wpas = wpas_connect() - if wpas is None: - continue - global own_id - data = wpas.request("DPP_BOOTSTRAP_GET_URI %d" % own_id).rstrip() - if "FAIL" in data: - continue - summary("Own URI (post-processing): %s" % data) - handover.my_uri = data - handover.peer_uri = uri - uri = ndef.UriRecord(data) - summary("Own bootstrapping NFC URI record: " + str(uri)) - - if not test_uri: - info = wpas.request("DPP_BOOTSTRAP_INFO %d" % own_id) - freq = None - for line in info.splitlines(): - if line.startswith("use_freq="): - freq = int(line.split('=')[1]) - if freq is None or freq == 0: - summary("No channel negotiated over NFC - use channel 6") - freq = 2437 - else: - summary("Negotiated channel: %d MHz" % freq) - if not dpp_start_listen(wpas, freq): - break - - carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data) - summary("Own DPP carrier record: " + str(carrier)) - hs.add_alternative_carrier('active', carrier.name) - sel = [hs, carrier] - break - - summary("Sending handover select: " + str(sel)) - if found: - summary("Handover completed successfully") - handover.terminate_on_hs_send_completion = True - self.success = True - handover.hs_sent = True - handover.i_m_selector = True - elif handover.no_alt_proposal: - summary("Do not try alternative proposal anymore - handover failed", - color=C_RED) - handover.hs_sent = True - else: - summary("Try to initiate with alternative parameters") - handover.try_own = True - handover.hs_sent = False - handover.no_alt_proposal = True - if handover.client_thread: - handover.start_client_alt = True - else: - handover.client_thread = threading.Thread(target=llcp_worker, - args=(self.llc, True)) - handover.client_thread.start() - return sel - -def clear_raw_mode(): - import sys, tty, termios - global prev_tcgetattr, in_raw_mode - if not in_raw_mode: - return - fd = sys.stdin.fileno() - termios.tcsetattr(fd, termios.TCSADRAIN, prev_tcgetattr) - in_raw_mode = False - -def getch(): - import sys, tty, termios, select - global prev_tcgetattr, in_raw_mode - fd = sys.stdin.fileno() - prev_tcgetattr = termios.tcgetattr(fd) - ch = None - try: - tty.setraw(fd) - in_raw_mode = True - [i, o, e] = select.select([fd], [], [], 0.05) - if i: - ch = sys.stdin.read(1) - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, prev_tcgetattr) - in_raw_mode = False - return ch - -def dpp_tag_read(tag): - success = False - for record in tag.ndef.records: - summary(record) - summary("record type " + record.type) - if record.type == "application/vnd.wfa.dpp": - summary("DPP HS tag - send to wpa_supplicant") - success = dpp_hs_tag_read(record) - break - if isinstance(record, ndef.UriRecord): - summary("URI record: uri=" + record.uri) - summary("URI record: iri=" + record.iri) - if record.iri.startswith("DPP:"): - summary("DPP URI") - if not dpp_nfc_uri_process(record.iri): - break - success = True - else: - summary("Ignore unknown URI") - break - - if success: - success_report("Tag read succeeded") - - return success - -def rdwr_connected_write_tag(tag): - summary("Tag found - writing - " + str(tag)) - if not tag.ndef: - summary("Not a formatted NDEF tag", color=C_RED) - return - if not tag.ndef.is_writeable: - summary("Not a writable tag", color=C_RED) - return - global dpp_tag_data - if tag.ndef.capacity < len(dpp_tag_data): - summary("Not enough room for the message") - return - try: - tag.ndef.records = dpp_tag_data - except ValueError as e: - summary("Writing the tag failed: %s" % str(e), color=C_RED) - return - success_report("Tag write succeeded") - summary("Tag writing completed - remove tag", color=C_GREEN) - global only_one, operation_success - operation_success = True - if only_one: - global continue_loop - continue_loop = False - global dpp_sel_wait_remove - return dpp_sel_wait_remove - -def write_nfc_uri(clf, wait_remove=True): - summary("Write NFC URI record") - data = wpas_get_nfc_uri() - if data is None: - summary("Could not get NFC URI from wpa_supplicant", color=C_RED) - return - - global dpp_sel_wait_remove - dpp_sel_wait_remove = wait_remove - summary("URI: %s" % data) - uri = ndef.UriRecord(data) - summary(uri) - - summary("Touch an NFC tag to write URI record", color=C_CYAN) - global dpp_tag_data - dpp_tag_data = [uri] - clf.connect(rdwr={'on-connect': rdwr_connected_write_tag}) - -def write_nfc_hs(clf, wait_remove=True): - summary("Write NFC Handover Select record on a tag") - data = wpas_get_nfc_uri() - if data is None: - summary("Could not get NFC URI from wpa_supplicant", color=C_RED) - return - - global dpp_sel_wait_remove - dpp_sel_wait_remove = wait_remove - summary("URI: %s" % data) - uri = ndef.UriRecord(data) - summary(uri) - carrier = ndef.Record('application/vnd.wfa.dpp', 'A', uri.data) - hs = ndef.HandoverSelectRecord('1.4') - hs.add_alternative_carrier('active', carrier.name) - summary(hs) - summary(carrier) - - summary("Touch an NFC tag to write HS record", color=C_CYAN) - global dpp_tag_data - dpp_tag_data = [hs, carrier] - summary(dpp_tag_data) - clf.connect(rdwr={'on-connect': rdwr_connected_write_tag}) - -def rdwr_connected(tag): - global only_one, no_wait - summary("Tag connected: " + str(tag)) - - if tag.ndef: - summary("NDEF tag: " + tag.type) - summary(tag.ndef.records) - success = dpp_tag_read(tag) - if only_one and success: - global continue_loop - continue_loop = False - else: - summary("Not an NDEF tag - remove tag", color=C_RED) - return True - - return not no_wait - -def llcp_worker(llc, try_alt): - global handover - print("Start of llcp_worker()") - if try_alt: - summary("Starting handover client (try_alt)") - dpp_handover_client(handover, alt=True) - summary("Exiting llcp_worker thread (try_alt)") - return - global init_on_touch - if init_on_touch: - summary("Starting handover client (init_on_touch)") - dpp_handover_client(handover) - summary("Exiting llcp_worker thread (init_on_touch)") - return - - global no_input - if no_input: - summary("Wait for handover to complete") - else: - print("Wait for handover to complete - press 'i' to initiate") - while not handover.wait_connection and handover.srv.sent_carrier is None: - if handover.try_own: - handover.try_own = False - summary("Try to initiate another handover with own parameters") - handover.my_crn_ready = False - handover.my_crn = None - handover.peer_crn = None - handover.hs_sent = False - dpp_handover_client(handover, alt=True) - summary("Exiting llcp_worker thread (retry with own parameters)") - return - if handover.srv.ho_server_processing: - time.sleep(0.025) - elif no_input: - time.sleep(0.5) - else: - res = getch() - if res != 'i': - continue - clear_raw_mode() - summary("Starting handover client") - dpp_handover_client(handover) - summary("Exiting llcp_worker thread (manual init)") - return - - global in_raw_mode - was_in_raw_mode = in_raw_mode - clear_raw_mode() - if was_in_raw_mode: - print("\r") - summary("Exiting llcp_worker thread") - -class ConnectionHandover(): - def __init__(self): - self.client = None - self.client_thread = None - self.reset() - self.exit_thread = None - - def reset(self): - self.wait_connection = False - self.my_crn_ready = False - self.my_crn = None - self.peer_crn = None - self.hs_sent = False - self.no_alt_proposal = False - self.alt_proposal_used = False - self.i_m_selector = False - self.start_client_alt = False - self.terminate_on_hs_send_completion = False - self.try_own = False - self.my_uri = None - self.peer_uri = None - self.connected = False - self.alt_proposal = False - - def start_handover_server(self, llc): - summary("Start handover server") - self.llc = llc - self.srv = HandoverServer(self, llc) - - def close(self): - if self.client: - self.client.close() - self.client = None - - def run_delayed_exit(self): - summary("Trying to exit (delayed)..") - time.sleep(0.25) - summary("Trying to exit (after wait)..") - global terminate_now - terminate_now = True - - def delayed_exit(self): - global only_one - if only_one: - self.exit_thread = threading.Thread(target=self.run_delayed_exit) - self.exit_thread.start() - -def llcp_startup(llc): - global handover - handover.start_handover_server(llc) - return llc - -def llcp_connected(llc): - summary("P2P LLCP connected") - global handover - handover.connected = True - handover.srv.start() - if init_on_touch or not no_input: - handover.client_thread = threading.Thread(target=llcp_worker, - args=(llc, False)) - handover.client_thread.start() - return True - -def llcp_release(llc): - summary("LLCP release") - global handover - handover.close() - return True - -def terminate_loop(): - global terminate_now - return terminate_now - -def main(): - clf = nfc.ContactlessFrontend() - - parser = argparse.ArgumentParser(description='nfcpy to wpa_supplicant integration for DPP NFC operations') - parser.add_argument('-d', const=logging.DEBUG, default=logging.INFO, - action='store_const', dest='loglevel', - help='verbose debug output') - parser.add_argument('-q', const=logging.WARNING, action='store_const', - dest='loglevel', help='be quiet') - parser.add_argument('--only-one', '-1', action='store_true', - help='run only one operation and exit') - parser.add_argument('--init-on-touch', '-I', action='store_true', - help='initiate handover on touch') - parser.add_argument('--no-wait', action='store_true', - help='do not wait for tag to be removed before exiting') - parser.add_argument('--ifname', '-i', - help='network interface name') - parser.add_argument('--no-input', '-a', action='store_true', - help='do not use stdout input to initiate handover') - parser.add_argument('--tag-read-only', '-t', action='store_true', - help='tag read only (do not allow connection handover)') - parser.add_argument('--handover-only', action='store_true', - help='connection handover only (do not allow tag read)') - parser.add_argument('--enrollee', action='store_true', - help='run as Enrollee-only') - parser.add_argument('--configurator', action='store_true', - help='run as Configurator-only') - parser.add_argument('--config-params', default='', - help='configurator parameters') - parser.add_argument('--ctrl', default='/var/run/wpa_supplicant', - help='wpa_supplicant/hostapd control interface') - parser.add_argument('--summary', - help='summary file for writing status updates') - parser.add_argument('--success', - help='success file for writing success update') - parser.add_argument('--device', default='usb', help='NFC device to open') - parser.add_argument('--chan', default=None, help='channel list') - parser.add_argument('--altchan', default=None, help='alternative channel list') - parser.add_argument('--netrole', default=None, help='netrole for Enrollee') - parser.add_argument('--test-uri', default=None, - help='test mode: initial URI') - parser.add_argument('--test-alt-uri', default=None, - help='test mode: alternative URI') - parser.add_argument('--test-sel-uri', default=None, - help='test mode: handover select URI') - parser.add_argument('--test-crn', default=None, - help='test mode: hardcoded crn') - parser.add_argument('command', choices=['write-nfc-uri', - 'write-nfc-hs'], - nargs='?') - args = parser.parse_args() - summary(args) - - global handover - handover = ConnectionHandover() - - global only_one - only_one = args.only_one - - global no_wait - no_wait = args.no_wait - - global chanlist, netrole, test_uri, test_alt_uri, test_sel_uri - global test_crn - chanlist = args.chan - handover.altchanlist = args.altchan - netrole = args.netrole - test_uri = args.test_uri - test_alt_uri = args.test_alt_uri - test_sel_uri = args.test_sel_uri - if args.test_crn: - test_crn = struct.pack('>H', int(args.test_crn)) - else: - test_crn = None - - logging.basicConfig(level=args.loglevel) - for l in ['nfc.clf.rcs380', - 'nfc.clf.transport', - 'nfc.clf.device', - 'nfc.clf.__init__', - 'nfc.llcp', - 'nfc.handover']: - log = logging.getLogger(l) - log.setLevel(args.loglevel) - - global init_on_touch - init_on_touch = args.init_on_touch - - global enrollee_only - enrollee_only = args.enrollee - - global configurator_only - configurator_only = args.configurator - - global config_params - config_params = args.config_params - - if args.ifname: - global ifname - ifname = args.ifname - summary("Selected ifname " + ifname) - - if args.ctrl: - global wpas_ctrl - wpas_ctrl = args.ctrl - - if args.summary: - global summary_file - summary_file = args.summary - - if args.success: - global success_file - success_file = args.success - - if args.no_input: - global no_input - no_input = True - - clf = nfc.ContactlessFrontend() - - try: - if not clf.open(args.device): - summary("Could not open connection with an NFC device", color=C_RED) - raise SystemExit(1) - - if args.command == "write-nfc-uri": - write_nfc_uri(clf, wait_remove=not args.no_wait) - if not operation_success: - raise SystemExit(1) - raise SystemExit - - if args.command == "write-nfc-hs": - write_nfc_hs(clf, wait_remove=not args.no_wait) - if not operation_success: - raise SystemExit(1) - raise SystemExit - - global continue_loop - while continue_loop: - global in_raw_mode - was_in_raw_mode = in_raw_mode - clear_raw_mode() - if was_in_raw_mode: - print("\r") - if args.handover_only: - summary("Waiting a peer to be touched", color=C_MAGENTA) - elif args.tag_read_only: - summary("Waiting for a tag to be touched", color=C_BLUE) - else: - summary("Waiting for a tag or peer to be touched", - color=C_GREEN) - handover.wait_connection = True - try: - if args.tag_read_only: - if not clf.connect(rdwr={'on-connect': rdwr_connected}): - break - elif args.handover_only: - if not clf.connect(llcp={'on-startup': llcp_startup, - 'on-connect': llcp_connected, - 'on-release': llcp_release}, - terminate=terminate_loop): - break - else: - if not clf.connect(rdwr={'on-connect': rdwr_connected}, - llcp={'on-startup': llcp_startup, - 'on-connect': llcp_connected, - 'on-release': llcp_release}, - terminate=terminate_loop): - break - except Exception as e: - summary("clf.connect failed: " + str(e)) - break - - if only_one and handover.connected: - role = "selector" if handover.i_m_selector else "requestor" - summary("Connection handover result: I'm the %s" % role, - color=C_YELLOW) - if handover.peer_uri: - summary("Peer URI: " + handover.peer_uri, color=C_YELLOW) - if handover.my_uri: - summary("My URI: " + handover.my_uri, color=C_YELLOW) - if not (handover.peer_uri and handover.my_uri): - summary("Negotiated connection handover failed", - color=C_YELLOW) - break - - except KeyboardInterrupt: - raise SystemExit - finally: - clf.close() - - raise SystemExit - -if __name__ == '__main__': - main() |