diff --git a/modules/default.nix b/modules/default.nix index 38dc354..d2f35d2 100644 --- a/modules/default.nix +++ b/modules/default.nix @@ -49,7 +49,7 @@ ./uefi.nix ./unfree.nix ./upgrade-diff.nix - ./wireshark + ./wireshark.nix ./yubikey-gpg.nix ]; diff --git a/modules/wireshark.nix b/modules/wireshark.nix new file mode 100644 index 0000000..878f649 --- /dev/null +++ b/modules/wireshark.nix @@ -0,0 +1,7 @@ +{ config, lib, pkgs, ... }: +lib.mkIf config.jalr.gui.enable { + programs.wireshark = { + enable = true; + package = pkgs.wireshark; + }; +} diff --git a/modules/wireshark/default.nix b/modules/wireshark/default.nix deleted file mode 100644 index 6c1b0c0..0000000 --- a/modules/wireshark/default.nix +++ /dev/null @@ -1,23 +0,0 @@ -{ config, lib, pkgs, ... }: -let - extcap = ./extcap; - pythonWithPackages = pkgs.python3.withPackages (pp: with pp; [ - pyserial - psutil - ]); - nrf_sniffer_ble = pkgs.writeShellScript "nrf_sniffer_ble" '' - script_path=$(dirname `which $0`) - - exec ${pythonWithPackages}/bin/python3 $script_path/nrf_sniffer_ble.py "$@" - ''; -in -lib.mkIf config.jalr.gui.enable { - programs.wireshark = { - enable = true; - package = pkgs.wireshark.overrideAttrs (o: { - postInstall = '' - cp -r ${extcap}/* ${nrf_sniffer_ble} $out/lib/wireshark/extcap - '' + o.postInstall; - }); - }; -} diff --git a/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py b/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py deleted file mode 100644 index 8c218e5..0000000 --- a/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -import time, os, logging -from . import Logger -from . import Pcap - - -DEFAULT_CAPTURE_FILE_DIR = Logger.DEFAULT_LOG_FILE_DIR -DEFAULT_CAPTURE_FILE_NAME = "capture.pcap" - - -def get_capture_file_path(capture_file_path=None): - default_path = os.path.join(DEFAULT_CAPTURE_FILE_DIR, DEFAULT_CAPTURE_FILE_NAME) - if capture_file_path is None: - return default_path - if os.path.splitext(capture_file_path)[1] != ".pcap": - return default_path - return os.path.abspath(capture_file_path) - - -class CaptureFileHandler: - def __init__(self, capture_file_path=None, clear=False): - filename = get_capture_file_path(capture_file_path) - if not os.path.isdir(os.path.dirname(filename)): - os.makedirs(os.path.dirname(filename)) - self.filename = filename - self.backupFilename = self.filename + ".1" - if not os.path.isfile(self.filename): - self.startNewFile() - elif os.path.getsize(self.filename) > 20000000: - self.doRollover() - if clear: - # clear file - self.startNewFile() - - def startNewFile(self): - with open(self.filename, "wb") as f: - f.write(Pcap.get_global_header()) - - def doRollover(self): - try: - os.remove(self.backupFilename) - except: - logging.exception("capture file rollover remove backup failed") - try: - os.rename(self.filename, self.backupFilename) - self.startNewFile() - except: - logging.exception("capture file rollover failed") - - def writePacket(self, packet): - with open(self.filename, "ab") as f: - packet = Pcap.create_packet( - bytes([packet.boardId] + packet.getList()), packet.time - ) - f.write(packet) diff --git a/modules/wireshark/extcap/SnifferAPI/Devices.py b/modules/wireshark/extcap/SnifferAPI/Devices.py deleted file mode 100644 index 61ac961..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Devices.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright (c) 2017, Nordic Semiconductor ASA -# -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from . import Notifications -import logging, threading - - -class DeviceList(Notifications.Notifier): - def __init__(self, *args, **kwargs): - Notifications.Notifier.__init__(self, *args, **kwargs) - logging.info("args: " + str(args)) - logging.info("kwargs: " + str(kwargs)) - self._deviceListLock = threading.RLock() - with self._deviceListLock: - self.devices = [] - - def __len__(self): - return len(self.devices) - - def __repr__(self): - return "Sniffer Device List: " + str(self.asList()) - - def clear(self): - logging.info("Clearing") - with self._deviceListLock: - self.devices = [] - self.notify("DEVICES_CLEARED") - - def appendOrUpdate(self, newDevice): - with self._deviceListLock: - existingDevice = self.find(newDevice) - - # Add device to the list of devices being displayed, but only if CRC is OK - if existingDevice == None: - self.append(newDevice) - else: - updated = False - if (newDevice.name != '""') and (existingDevice.name == '""'): - existingDevice.name = newDevice.name - updated = True - - if ( - newDevice.RSSI != 0 - and (existingDevice.RSSI < (newDevice.RSSI - 5)) - or (existingDevice.RSSI > (newDevice.RSSI + 2)) - ): - existingDevice.RSSI = newDevice.RSSI - updated = True - - if updated: - self.notify("DEVICE_UPDATED", existingDevice) - - def append(self, device): - self.devices.append(device) - self.notify("DEVICE_ADDED", device) - - def find(self, id): - if type(id) == list: - for dev in self.devices: - if dev.address == id: - return dev - elif type(id) == int: - return self.devices[id] - elif type(id) == str: - for dev in self.devices: - if dev.name in [id, '"' + id + '"']: - return dev - elif id.__class__.__name__ == "Device": - return self.find(id.address) - return None - - def remove(self, id): - if type(id) == list: # address - device = self.devices.pop(self.devices.index(self.find(id))) - elif type(id) == int: - device = self.devices.pop(id) - elif type(id) == Device: - device = self.devices.pop(self.devices.index(self.find(id.address))) - self.notify("DEVICE_REMOVED", device) - - def index(self, device): - index = 0 - for dev in self.devices: - if dev.address == device.address: - return index - index += 1 - return None - - def setFollowed(self, device): - if device in self.devices: - for dev in self.devices: - dev.followed = False - device.followed = True - self.notify("DEVICE_FOLLOWED", device) - - def asList(self): - return self.devices[:] - - -class Device: - def __init__(self, address, name, RSSI): - self.address = address - self.name = name - self.RSSI = RSSI - self.followed = False - - def __repr__(self): - return 'Bluetooth LE device "' + self.name + '" (' + str(self.address) + ")" - - -def listToString(list): - str = "" - for i in list: - str += chr(i) - return str diff --git a/modules/wireshark/extcap/SnifferAPI/Exceptions.py b/modules/wireshark/extcap/SnifferAPI/Exceptions.py deleted file mode 100644 index 86f356a..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Exceptions.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -class SnifferTimeout(Exception): - pass - - -class UARTPacketError(Exception): - pass - - -class LockedException(Exception): - def __init__(self, message): - self.message = message - - -class InvalidPacketException(Exception): - pass - - -class InvalidAdvChannel(Exception): - pass - - -# Internal Use -class SnifferWatchDogTimeout(SnifferTimeout): - pass - - -# Internal Use -class ExitCodeException(Exception): - pass diff --git a/modules/wireshark/extcap/SnifferAPI/Filelock.py b/modules/wireshark/extcap/SnifferAPI/Filelock.py deleted file mode 100644 index 7bf21b5..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Filelock.py +++ /dev/null @@ -1,67 +0,0 @@ -import os -import logging -from sys import platform - -if platform == "linux": - import psutil - -from . import Exceptions - -# Lock file management. -# ref: https://refspecs.linuxfoundation.org/FHS_3.0/fhs/ch05s09.html -# -# Stored in /var/lock: -# The naming convention which must be used is "LCK.." followed by the base name of the device. -# For example, to lock /dev/ttyS0 the file "LCK..ttyS0" would be created. -# HDB UUCP lock file format: -# process identifier (PID) as a ten byte ASCII decimal number, with a trailing newline - - -def lockpid(lockfile): - if os.path.isfile(lockfile): - with open(lockfile) as fd: - lockpid = fd.read() - - try: - return int(lockpid) - except: - logging.info("Lockfile is invalid. Overriding it..") - os.remove(lockfile) - return 0 - - return 0 - - -def lock(port): - if platform != "linux": - return - - tty = os.path.basename(port) - lockfile = os.path.join("/run", "user", f"{os.getuid()}", f"{tty}.lock") - - lockedpid = lockpid(lockfile) - if lockedpid: - if lockedpid == os.getpid(): - return - - if psutil.pid_exists(lockedpid): - raise Exceptions.LockedException(f"Device {port} is locked") - else: - logging.info("Lockfile is stale. Overriding it..") - os.remove(lockfile) - - fd = open(lockfile, "w") - with open(lockfile, "w") as fd: - fd.write(f"{os.getpid():10}") - - -def unlock(port): - if platform != "linux": - return - - tty = os.path.basename(port) - lockfile = f"/var/lock/LCK..{tty}" - - lockedpid = lockpid(lockfile) - if lockedpid == os.getpid(): - os.remove(lockfile) diff --git a/modules/wireshark/extcap/SnifferAPI/Logger.py b/modules/wireshark/extcap/SnifferAPI/Logger.py deleted file mode 100644 index 228a0f1..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Logger.py +++ /dev/null @@ -1,214 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -import time, os, logging, traceback, threading -import logging.handlers as logHandlers - -################################################################# -# This file contains the logger. To log a line, simply write # -# 'logging.[level]("whatever you want to log")' # -# [level] is one of {info, debug, warning, error, critical, # -# exception} # -# See python logging documentation # -# As long as Logger.initLogger has been called beforehand, this # -# will result in the line being appended to the log file # -################################################################# - -appdata = os.getenv("appdata") -if appdata: - DEFAULT_LOG_FILE_DIR = os.path.join( - appdata, "Nordic Semiconductor", "Sniffer", "logs" - ) -else: - DEFAULT_LOG_FILE_DIR = "/tmp/logs" - -DEFAULT_LOG_FILE_NAME = "log.txt" - -logFileName = None -logHandler = None -logHandlerArray = [] -logFlusher = None - -myMaxBytes = 1000000 - - -def setLogFileName(log_file_path): - global logFileName - logFileName = os.path.abspath(log_file_path) - - -# Ensure that the directory we are writing the log file to exists. -# Create our logfile, and write the timestamp in the first line. -def initLogger(): - try: - global logFileName - if logFileName is None: - logFileName = os.path.join(DEFAULT_LOG_FILE_DIR, DEFAULT_LOG_FILE_NAME) - - # First, make sure that the directory exists - if not os.path.isdir(os.path.dirname(logFileName)): - os.makedirs(os.path.dirname(logFileName)) - - # If the file does not exist, create it, and save the timestamp - if not os.path.isfile(logFileName): - with open(logFileName, "w") as f: - f.write(str(time.time()) + str(os.linesep)) - - global logFlusher - global logHandlerArray - - logHandler = MyRotatingFileHandler( - logFileName, mode="a", maxBytes=myMaxBytes, backupCount=3 - ) - logFormatter = logging.Formatter( - "%(asctime)s %(levelname)s: %(message)s", datefmt="%d-%b-%Y %H:%M:%S (%z)" - ) - logHandler.setFormatter(logFormatter) - logger = logging.getLogger() - logger.addHandler(logHandler) - logger.setLevel(logging.INFO) - logFlusher = LogFlusher(logHandler) - logHandlerArray.append(logHandler) - except: - print("LOGGING FAILED") - print(traceback.format_exc()) - raise - - -def shutdownLogger(): - if logFlusher is not None: - logFlusher.stop() - logging.shutdown() - - -# Clear the log (typically after it has been sent on email) -def clearLog(): - try: - logHandler.doRollover() - except: - print("LOGGING FAILED") - raise - - -# Returns the timestamp residing on the first line of the logfile. Used for checking the time of creation -def getTimestamp(): - try: - with open(logFileName, "r") as f: - f.seek(0) - return f.readline() - except: - print("LOGGING FAILED") - - -def addTimestamp(): - try: - with open(logFileName, "a") as f: - f.write(str(time.time()) + os.linesep) - except: - print("LOGGING FAILED") - - -# Returns the entire content of the logfile. Used when sending emails -def readAll(): - try: - text = "" - with open(logFileName, "r") as f: - text = f.read() - return text - except: - print("LOGGING FAILED") - - -def addLogHandler(logHandler): - global logHandlerArray - logger = logging.getLogger() - logger.addHandler(logHandler) - logger.setLevel(logging.INFO) - logHandlerArray.append(logHandler) - - -def removeLogHandler(logHandler): - global logHandlerArray - logger = logging.getLogger() - logger.removeHandler(logHandler) - logHandlerArray.remove(logHandler) - - -class MyRotatingFileHandler(logHandlers.RotatingFileHandler): - def doRollover(self): - try: - logHandlers.RotatingFileHandler.doRollover(self) - addTimestamp() - self.maxBytes = myMaxBytes - except: - # There have been permissions issues with the log files. - self.maxBytes += int(myMaxBytes / 2) - - -class LogFlusher(threading.Thread): - def __init__(self, logHandler): - threading.Thread.__init__(self) - - self.daemon = True - self.handler = logHandler - self.exit = threading.Event() - - self.start() - - def run(self): - while True: - if self.exit.wait(10): - try: - self.doFlush() - except AttributeError as e: - print(e) - break - self.doFlush() - - def doFlush(self): - self.handler.flush() - os.fsync(self.handler.stream.fileno()) - - def stop(self): - self.exit.set() - - -if __name__ == "__main__": - initLogger() - for i in range(50): - logging.info("test log no. " + str(i)) - print("test log no. ", i) diff --git a/modules/wireshark/extcap/SnifferAPI/Notifications.py b/modules/wireshark/extcap/SnifferAPI/Notifications.py deleted file mode 100644 index b7cba37..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Notifications.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -import threading, logging - - -class Notification: - def __init__(self, key, msg=None): - if type(key) is not str: - raise TypeError("Invalid notification key: " + str(key)) - self.key = key - self.msg = msg - - def __repr__(self): - return "Notification (key: %s, msg: %s)" % (str(self.key), str(self.msg)) - - -class Notifier: - def __init__(self, callbacks=[], **kwargs): - self.callbacks = {} - self.callbackLock = threading.RLock() - - for callback in callbacks: - self.subscribe(*callback) - - def clearCallbacks(self): - with self.callbackLock: - self.callbacks.clear() - - def subscribe(self, key, callback): - with self.callbackLock: - if callback not in self.getCallbacks(key): - self.getCallbacks(key).append(callback) - - def unSubscribe(self, key, callback): - with self.callbackLock: - if callback in self.getCallbacks(key): - self.getCallbacks(key).remove(callback) - - def getCallbacks(self, key): - with self.callbackLock: - if key not in self.callbacks: - self.callbacks[key] = [] - return self.callbacks[key] - - def notify(self, key=None, msg=None, notification=None): - with self.callbackLock: - if notification == None: - notification = Notification(key, msg) - - for callback in self.getCallbacks(notification.key): - callback(notification) - - for callback in self.getCallbacks("*"): - callback(notification) - - def passOnNotification(self, notification): - self.notify(notification=notification) diff --git a/modules/wireshark/extcap/SnifferAPI/Packet.py b/modules/wireshark/extcap/SnifferAPI/Packet.py deleted file mode 100644 index bc4abd9..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Packet.py +++ /dev/null @@ -1,651 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from . import UART, Exceptions, Notifications -import time, logging, os, sys, serial -from .Types import * - -ADV_ACCESS_ADDRESS = [0xD6, 0xBE, 0x89, 0x8E] - -SYNCWORD_POS = 0 -PAYLOAD_LEN_POS_V1 = 1 -PAYLOAD_LEN_POS = 0 -PROTOVER_POS = PAYLOAD_LEN_POS + 2 -PACKETCOUNTER_POS = PROTOVER_POS + 1 -ID_POS = PACKETCOUNTER_POS + 2 - -BLE_HEADER_LEN_POS = ID_POS + 1 -FLAGS_POS = BLE_HEADER_LEN_POS + 1 -CHANNEL_POS = FLAGS_POS + 1 -RSSI_POS = CHANNEL_POS + 1 -EVENTCOUNTER_POS = RSSI_POS + 1 -TIMESTAMP_POS = EVENTCOUNTER_POS + 2 -BLEPACKET_POS = TIMESTAMP_POS + 4 -TXADD_POS = BLEPACKET_POS + 4 -TXADD_MSK = 0x40 -PAYLOAD_POS = BLE_HEADER_LEN_POS - -HEADER_LENGTH = 6 -BLE_HEADER_LENGTH = 10 - -VALID_ADV_CHANS = [37, 38, 39] - -PACKET_COUNTER_CAP = 2**16 - - -class PacketReader(Notifications.Notifier): - def __init__(self, portnum=None, callbacks=[], baudrate=None): - Notifications.Notifier.__init__(self, callbacks) - self.portnum = portnum - try: - self.uart = UART.Uart(portnum, baudrate) - except serial.SerialException as e: - logging.exception("Error opening UART %s" % str(e)) - self.uart = UART.Uart() - self.packetCounter = 0 - self.lastReceivedPacketCounter = 0 - self.lastReceivedPacket = None - self.lastReceivedTimestampPacket = None - self.supportedProtocolVersion = PROTOVER_V3 - - def setup(self): - pass - - def doExit(self): - # This method will always join the Uart worker thread - self.uart.close() - # Clear method references to avoid uncollectable cyclic references - self.clearCallbacks() - - # This function takes a byte list, encode it in SLIP protocol and return the encoded byte list - def encodeToSLIP(self, byteList): - tempSLIPBuffer = [] - tempSLIPBuffer.append(SLIP_START) - for i in byteList: - if i == SLIP_START: - tempSLIPBuffer.append(SLIP_ESC) - tempSLIPBuffer.append(SLIP_ESC_START) - elif i == SLIP_END: - tempSLIPBuffer.append(SLIP_ESC) - tempSLIPBuffer.append(SLIP_ESC_END) - elif i == SLIP_ESC: - tempSLIPBuffer.append(SLIP_ESC) - tempSLIPBuffer.append(SLIP_ESC_ESC) - else: - tempSLIPBuffer.append(i) - tempSLIPBuffer.append(SLIP_END) - return tempSLIPBuffer - - # This function uses getSerialByte() function to get SLIP encoded bytes from the serial port and return a decoded byte list - # Based on https://github.com/mehdix/pyslip/ - def decodeFromSLIP(self, timeout=None, complete_timeout=None): - dataBuffer = [] - startOfPacket = False - endOfPacket = False - - if complete_timeout is not None: - time_start = time.time() - - while not startOfPacket and ( - complete_timeout is None or (time.time() - time_start < complete_timeout) - ): - res = self.getSerialByte(timeout) - startOfPacket = res == SLIP_START - - while not endOfPacket and ( - complete_timeout is None or (time.time() - time_start < complete_timeout) - ): - serialByte = self.getSerialByte(timeout) - if serialByte == SLIP_END: - endOfPacket = True - elif serialByte == SLIP_ESC: - serialByte = self.getSerialByte(timeout) - if serialByte == SLIP_ESC_START: - dataBuffer.append(SLIP_START) - elif serialByte == SLIP_ESC_END: - dataBuffer.append(SLIP_END) - elif serialByte == SLIP_ESC_ESC: - dataBuffer.append(SLIP_ESC) - else: - dataBuffer.append(SLIP_END) - else: - dataBuffer.append(serialByte) - if not endOfPacket: - raise Exceptions.UARTPacketError( - "Exceeded max timeout of %f seconds." % complete_timeout - ) - return dataBuffer - - # This function read byte chuncks from the serial port and return one byte at a time - # Based on https://github.com/mehdix/pyslip/ - def getSerialByte(self, timeout=None): - serialByte = self.uart.readByte(timeout) - if serialByte is None: - raise Exceptions.SnifferTimeout("Packet read timed out.") - return serialByte - - def handlePacketHistory(self, packet): - # Reads and validates packet counter - if ( - self.lastReceivedPacket is not None - and packet.packetCounter - != (self.lastReceivedPacket.packetCounter + 1) % PACKET_COUNTER_CAP - and self.lastReceivedPacket.packetCounter != 0 - ): - - logging.info( - "gap in packets, between " - + str(self.lastReceivedPacket.packetCounter) - + " and " - + str(packet.packetCounter) - + " packet before: " - + str(self.lastReceivedPacket.packetList) - + " packet after: " - + str(packet.packetList) - ) - - self.lastReceivedPacket = packet - if packet.id in [EVENT_PACKET_DATA_PDU, EVENT_PACKET_ADV_PDU]: - self.lastReceivedTimestampPacket = packet - - def getPacketTime(self, packet): - ble_payload_length = self.lastReceivedPacket.payloadLength - BLE_HEADER_LENGTH - - if packet.phy == PHY_1M: - return 8 * (1 + ble_payload_length) - elif packet.phy == PHY_2M: - return 4 * (2 + ble_payload_length) - elif packet.phy == PHY_CODED: - # blePacket is not assigned if not packet is "OK" (CRC error) - ci = packet.packetList[BLEPACKET_POS + 4] - fec2_block_len = ble_payload_length - 4 - 1 - fec1_block_us = 80 + 256 + 16 + 24 - if ci == PHY_CODED_CI_S8: - return fec1_block_us + 64 * fec2_block_len + 24 - elif ci == PHY_CODED_CI_S2: - return fec1_block_us + 16 * fec2_block_len + 6 - # Unknown PHY or Coding Indicator - return 0 - - def convertPacketListProtoVer2(self, packet): - # Convert to version 2 - packet.packetList[PROTOVER_POS] = 2 - - # Convert to common packet ID - if packet.packetList[ID_POS] == EVENT_PACKET_ADV_PDU: - packet.packetList[ID_POS] = EVENT_PACKET_DATA_PDU - - if packet.packetList[ID_POS] != EVENT_PACKET_DATA_PDU: - # These types do not have a timestamp - return - - # Convert time-stamp to End to Start delta - time_delta = 0 - if ( - self.lastReceivedTimestampPacket is not None - and self.lastReceivedTimestampPacket.valid - ): - time_delta = packet.timestamp - ( - self.lastReceivedTimestampPacket.timestamp - + self.getPacketTime(self.lastReceivedTimestampPacket) - ) - - time_delta = toLittleEndian(time_delta, 4) - packet.packetList[TIMESTAMP_POS] = time_delta[0] - packet.packetList[TIMESTAMP_POS + 1] = time_delta[1] - packet.packetList[TIMESTAMP_POS + 2] = time_delta[2] - packet.packetList[TIMESTAMP_POS + 3] = time_delta[3] - - def handlePacketCompatibility(self, packet): - if ( - self.supportedProtocolVersion == PROTOVER_V2 - and packet.packetList[PROTOVER_POS] > PROTOVER_V2 - ): - self.convertPacketListProtoVer2(packet) - - def setSupportedProtocolVersion(self, supportedProtocolVersion): - if supportedProtocolVersion != PROTOVER_V3: - logging.info( - "Using packet compatibility, converting packets to protocol version %d", - supportedProtocolVersion, - ) - self.supportedProtocolVersion = supportedProtocolVersion - - def getPacket(self, timeout=None): - packetList = [] - try: - packetList = self.decodeFromSLIP(timeout) - except Exceptions.UARTPacketError: # FIXME: This is never thrown... - logging.exception("") - return None - else: - packet = Packet(packetList) - if packet.valid: - self.handlePacketCompatibility(packet) - self.handlePacketHistory(packet) - return packet - - def sendPacket(self, id, payload): - packetList = ( - [HEADER_LENGTH] - + [len(payload)] - + [PROTOVER_V1] - + toLittleEndian(self.packetCounter, 2) - + [id] - + payload - ) - packetList = self.encodeToSLIP(packetList) - self.packetCounter += 1 - self.uart.writeList(packetList) - - def sendScan(self, findScanRsp=False, findAux=False, scanCoded=False): - flags0 = findScanRsp | (findAux << 1) | (scanCoded << 2) - self.sendPacket(REQ_SCAN_CONT, [flags0]) - logging.info("Scan flags: %s" % bin(flags0)) - - def sendFollow( - self, - addr, - followOnlyAdvertisements=False, - followOnlyLegacy=False, - followCoded=False, - ): - flags0 = followOnlyAdvertisements | (followOnlyLegacy << 1) | (followCoded << 2) - logging.info("Follow flags: %s" % bin(flags0)) - self.sendPacket(REQ_FOLLOW, addr + [flags0]) - - def sendPingReq(self): - self.sendPacket(PING_REQ, []) - - def getBytes(self, value, size): - if len(value) < size: - value = [0] * (size - len(value)) + value - else: - value = value[:size] - - return value - - def sendTK(self, TK): - TK = self.getBytes(TK, 16) - self.sendPacket(SET_TEMPORARY_KEY, TK) - logging.info("Sent TK to sniffer: " + str(TK)) - - def sendPrivateKey(self, pk): - pk = self.getBytes(pk, 32) - self.sendPacket(SET_PRIVATE_KEY, pk) - logging.info("Sent private key to sniffer: " + str(pk)) - - def sendLegacyLTK(self, ltk): - ltk = self.getBytes(ltk, 16) - self.sendPacket(SET_LEGACY_LONG_TERM_KEY, ltk) - logging.info("Sent Legacy LTK to sniffer: " + str(ltk)) - - def sendSCLTK(self, ltk): - ltk = self.getBytes(ltk, 16) - self.sendPacket(SET_SC_LONG_TERM_KEY, ltk) - logging.info("Sent SC LTK to sniffer: " + str(ltk)) - - def sendIRK(self, irk): - irk = self.getBytes(irk, 16) - self.sendPacket(SET_IDENTITY_RESOLVING_KEY, irk) - logging.info("Sent IRK to sniffer: " + str(irk)) - - def sendSwitchBaudRate(self, newBaudRate): - self.sendPacket(SWITCH_BAUD_RATE_REQ, toLittleEndian(newBaudRate, 4)) - - def switchBaudRate(self, newBaudRate): - self.uart.switchBaudRate(newBaudRate) - - def sendHopSequence(self, hopSequence): - for chan in hopSequence: - if chan not in VALID_ADV_CHANS: - raise Exceptions.InvalidAdvChannel( - "%s is not an adv channel" % str(chan) - ) - payload = [len(hopSequence)] + hopSequence + [37] * (3 - len(hopSequence)) - self.sendPacket(SET_ADV_CHANNEL_HOP_SEQ, payload) - self.notify("NEW_ADV_HOP_SEQ", {"hopSequence": hopSequence}) - - def sendVersionReq(self): - self.sendPacket(REQ_VERSION, []) - - def sendTimestampReq(self): - self.sendPacket(REQ_TIMESTAMP, []) - - def sendGoIdle(self): - self.sendPacket(GO_IDLE, []) - - -class Packet: - def __init__(self, packetList): - try: - if not packetList: - raise Exceptions.InvalidPacketException( - "packet list not valid: %s" % str(packetList) - ) - - self.protover = packetList[PROTOVER_POS] - - if self.protover > PROTOVER_V3: - logging.exception( - "Unsupported protocol version %s" % str(self.protover) - ) - raise RuntimeError( - "Unsupported protocol version %s" % str(self.protover) - ) - - self.packetCounter = parseLittleEndian( - packetList[PACKETCOUNTER_POS : PACKETCOUNTER_POS + 2] - ) - self.id = packetList[ID_POS] - - if int(self.protover) == PROTOVER_V1: - self.payloadLength = packetList[PAYLOAD_LEN_POS_V1] - else: - self.payloadLength = parseLittleEndian( - packetList[PAYLOAD_LEN_POS : PAYLOAD_LEN_POS + 2] - ) - - self.packetList = packetList - self.readPayload(packetList) - - except Exceptions.InvalidPacketException as e: - logging.error("Invalid packet: %s" % str(e)) - self.OK = False - self.valid = False - except Exception as e: - logging.exception("packet creation error %s" % str(e)) - logging.info("packetList: " + str(packetList)) - self.OK = False - self.valid = False - - def __repr__(self): - return "UART packet, type: " + str(self.id) + ", PC: " + str(self.packetCounter) - - def readPayload(self, packetList): - self.blePacket = None - self.OK = False - - if not self.validatePacketList(packetList): - raise Exceptions.InvalidPacketException( - "packet list not valid: %s" % str(packetList) - ) - else: - self.valid = True - - self.payload = packetList[PAYLOAD_POS : PAYLOAD_POS + self.payloadLength] - - if self.id == EVENT_PACKET_ADV_PDU or self.id == EVENT_PACKET_DATA_PDU: - try: - self.bleHeaderLength = packetList[BLE_HEADER_LEN_POS] - if self.bleHeaderLength == BLE_HEADER_LENGTH: - self.flags = packetList[FLAGS_POS] - self.readFlags() - self.channel = packetList[CHANNEL_POS] - self.rawRSSI = packetList[RSSI_POS] - self.RSSI = -self.rawRSSI - self.eventCounter = parseLittleEndian( - packetList[EVENTCOUNTER_POS : EVENTCOUNTER_POS + 2] - ) - - self.timestamp = parseLittleEndian( - packetList[TIMESTAMP_POS : TIMESTAMP_POS + 4] - ) - - # The hardware adds a padding byte which isn't sent on air. - # We remove it, and update the payload length in the packet list. - if self.phy == PHY_CODED: - self.packetList.pop(BLEPACKET_POS + 6 + 1) - else: - self.packetList.pop(BLEPACKET_POS + 6) - self.payloadLength -= 1 - if self.protover >= PROTOVER_V2: - # Write updated payload length back to the packet list. - payloadLength = toLittleEndian(self.payloadLength, 2) - packetList[PAYLOAD_LEN_POS] = payloadLength[0] - packetList[PAYLOAD_LEN_POS + 1] = payloadLength[1] - else: # PROTOVER_V1 - packetList[PAYLOAD_LEN_POS_V1] = self.payloadLength - else: - logging.info("Invalid BLE Header Length " + str(packetList)) - self.valid = False - - if self.OK: - try: - if self.protover >= PROTOVER_V3: - packet_type = ( - PACKET_TYPE_ADVERTISING - if self.id == EVENT_PACKET_ADV_PDU - else PACKET_TYPE_DATA - ) - else: - packet_type = ( - PACKET_TYPE_ADVERTISING - if packetList[BLEPACKET_POS : BLEPACKET_POS + 4] - == ADV_ACCESS_ADDRESS - else PACKET_TYPE_DATA - ) - - self.blePacket = BlePacket( - packet_type, packetList[BLEPACKET_POS:], self.phy - ) - except Exception as e: - logging.exception("blePacket error %s" % str(e)) - except Exception as e: - # malformed packet - logging.exception("packet error %s" % str(e)) - self.OK = False - elif self.id == PING_RESP: - if self.protover < PROTOVER_V3: - self.version = parseLittleEndian( - packetList[PAYLOAD_POS : PAYLOAD_POS + 2] - ) - elif self.id == RESP_VERSION: - self.version = "".join([chr(i) for i in packetList[PAYLOAD_POS:]]) - elif self.id == RESP_TIMESTAMP: - self.timestamp = parseLittleEndian( - packetList[PAYLOAD_POS : PAYLOAD_POS + 4] - ) - elif self.id == SWITCH_BAUD_RATE_RESP or self.id == SWITCH_BAUD_RATE_REQ: - self.baudRate = parseLittleEndian(packetList[PAYLOAD_POS : PAYLOAD_POS + 4]) - else: - logging.info("Unknown packet ID") - - def readFlags(self): - self.crcOK = not not (self.flags & 1) - self.direction = not not (self.flags & 2) - self.encrypted = not not (self.flags & 4) - self.micOK = not not (self.flags & 8) - self.phy = (self.flags >> 4) & 7 - self.OK = self.crcOK and (self.micOK or not self.encrypted) - - def getList(self): - return self.packetList - - def validatePacketList(self, packetList): - try: - if (self.payloadLength + HEADER_LENGTH) == len(packetList): - return True - else: - return False - except: - logging.exception("Invalid packet: %s" % str(packetList)) - return False - - -class BlePacket: - def __init__(self, type, packetList, phy): - self.type = type - - offset = 0 - offset = self.extractAccessAddress(packetList, offset) - offset = self.extractFormat(packetList, phy, offset) - - if self.type == PACKET_TYPE_ADVERTISING: - offset = self.extractAdvHeader(packetList, offset) - else: - offset = self.extractConnHeader(packetList, offset) - - offset = self.extractLength(packetList, offset) - self.payload = packetList[offset:] - - if self.type == PACKET_TYPE_ADVERTISING: - offset = self.extractAddresses(packetList, offset) - self.extractName(packetList, offset) - - def __repr__(self): - return "BLE packet, AAddr: " + str(self.accessAddress) - - def extractAccessAddress(self, packetList, offset): - self.accessAddress = packetList[offset : offset + 4] - return offset + 4 - - def extractFormat(self, packetList, phy, offset): - self.coded = phy == PHY_CODED - if self.coded: - self.codingIndicator = packetList[offset] & 3 - return offset + 1 - - return offset - - def extractAdvHeader(self, packetList, offset): - self.advType = packetList[offset] & 15 - self.txAddrType = (packetList[offset] >> 6) & 1 - if self.advType in [1, 3, 5]: - self.rxAddrType = (packetList[offset] << 7) & 1 - elif self.advType == 7: - flags = packetList[offset + 2] - if flags & 0x02: - self.rxAddrType = (packetList[offset] << 7) & 1 - return offset + 1 - - def extractConnHeader(self, packetList, offset): - self.llid = packetList[offset] & 3 - self.sn = (packetList[offset] >> 2) & 1 - self.nesn = (packetList[offset] >> 3) & 1 - self.md = (packetList[offset] >> 4) & 1 - return offset + 1 - - def extractAddresses(self, packetList, offset): - addr = None - scanAddr = None - - if self.advType in [0, 1, 2, 4, 6]: - addr = packetList[offset : offset + 6] - addr.reverse() - addr += [self.txAddrType] - offset += 6 - - if self.advType in [3, 5]: - scanAddr = packetList[offset : offset + 6] - scanAddr.reverse() - scanAddr += [self.txAddrType] - offset += 6 - addr = packetList[offset : offset + 6] - addr.reverse() - addr += [self.rxAddrType] - offset += 6 - - if self.advType == 1: - scanAddr = packetList[offset : offset + 6] - scanAddr.reverse() - scanAddr += [self.rxAddrType] - offset += 6 - - if self.advType == 7: - ext_header_len = packetList[offset] & 0x3F - offset += 1 - - ext_header_offset = offset - flags = packetList[offset] - ext_header_offset += 1 - - if flags & 0x01: - addr = packetList[ext_header_offset : ext_header_offset + 6] - addr.reverse() - addr += [self.txAddrType] - ext_header_offset += 6 - - if flags & 0x02: - scanAddr = packetList[ext_header_offset : ext_header_offset + 6] - scanAddr.reverse() - scanAddr += [self.rxAddrType] - ext_header_offset += 6 - - offset += ext_header_len - - self.advAddress = addr - self.scanAddress = scanAddr - return offset - - def extractName(self, packetList, offset): - name = "" - if self.advType in [0, 2, 4, 6, 7]: - i = offset - while i < len(packetList): - length = packetList[i] - if (i + length + 1) > len(packetList) or length == 0: - break - type = packetList[i + 1] - if type == 8 or type == 9: - nameList = packetList[i + 2 : i + length + 1] - name = "" - for j in nameList: - name += chr(j) - i += length + 1 - name = '"' + name + '"' - elif self.advType == 1: - name = "[ADV_DIRECT_IND]" - - self.name = name - - def extractLength(self, packetList, offset): - self.length = packetList[offset] - return offset + 1 - - -def parseLittleEndian(list): - total = 0 - for i in range(len(list)): - total += list[i] << (8 * i) - return total - - -def toLittleEndian(value, size): - list = [0] * size - for i in range(size): - list[i] = (value >> (i * 8)) % 256 - return list diff --git a/modules/wireshark/extcap/SnifferAPI/Pcap.py b/modules/wireshark/extcap/SnifferAPI/Pcap.py deleted file mode 100644 index 8b0445a..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Pcap.py +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -import struct - - -# See: -# - https://github.com/pcapng/pcapng -# - https://www.tcpdump.org/linktypes/LINKTYPE_NORDIC_BLE.html -PACKET_HEADER = struct.Struct("= PROTOVER_V3: - if self._last_time is None: - # Timestamp from Host - packet.time = time.time() - else: - # Timestamp using reference and packet timestamp diff - if packet.timestamp < self._last_timestamp: - time_diff = (1 << 32) - (self._last_timestamp - packet.timestamp) - else: - time_diff = packet.timestamp - self._last_timestamp - - packet.time = self._last_time + (time_diff / 1_000_000) - - self._last_time = packet.time - self._last_timestamp = packet.timestamp - else: - # Timestamp from Host - packet.time = time.time() - - self._appendPacket(packet) - - self.notify("NEW_BLE_PACKET", {"packet": packet}) - self._captureHandler.writePacket(packet) - - self._nProcessedPackets += 1 - if packet.OK: - try: - if packet.blePacket.type == PACKET_TYPE_ADVERTISING: - - if self.state == STATE_FOLLOWING and packet.blePacket.advType == 5: - self._connectionAccessAddress = packet.blePacket.accessAddress - - if self.state == STATE_FOLLOWING and packet.blePacket.advType == 4: - newDevice = Devices.Device( - address=packet.blePacket.advAddress, - name=packet.blePacket.name, - RSSI=packet.RSSI, - ) - self._devices.appendOrUpdate(newDevice) - - if self.state == STATE_SCANNING: - if ( - packet.blePacket.advType in [0, 1, 2, 4, 6, 7] - and packet.blePacket.advAddress != None - and packet.crcOK - and not packet.direction - ): - newDevice = Devices.Device( - address=packet.blePacket.advAddress, - name=packet.blePacket.name, - RSSI=packet.RSSI, - ) - self._devices.appendOrUpdate(newDevice) - - except Exception as e: - logging.exception("packet processing error %s" % str(e)) - self.notify("PACKET_PROCESSING_ERROR", {"errorString": str(e)}) - - def _continuouslyPipe(self): - while not self._exit: - try: - packet = self._packetReader.getPacket(timeout=12) - if packet == None or not packet.valid: - raise Exceptions.InvalidPacketException("") - except Exceptions.SnifferTimeout as e: - logging.info(str(e)) - packet = None - except (SerialException, ValueError): - logging.exception("UART read error") - logging.error("Lost contact with sniffer hardware.") - self._doExit() - except Exceptions.InvalidPacketException: - pass - else: - if ( - packet.id == EVENT_PACKET_DATA_PDU - or packet.id == EVENT_PACKET_ADV_PDU - ): - self._processBLEPacket(packet) - elif packet.id == EVENT_FOLLOW: - # This packet has no value for the user. - pass - elif packet.id == EVENT_CONNECT: - self._connectEventPacketCounterValue = packet.packetCounter - self._inConnection = True - # copy it because packets are eventually deleted - self._currentConnectRequest = copy.copy( - self._findPacketByPacketCounter( - self._connectEventPacketCounterValue - 1 - ) - ) - elif packet.id == EVENT_DISCONNECT: - if self._inConnection: - self._packetsInLastConnection = ( - packet.packetCounter - self._connectEventPacketCounterValue - ) - self._inConnection = False - elif packet.id == SWITCH_BAUD_RATE_RESP and self._switchingBaudRate: - self._switchingBaudRate = False - if packet.baudRate == self._proposedBaudRate: - self._packetReader.switchBaudRate(self._proposedBaudRate) - else: - self._switchBaudRate(packet.baudRate) - elif packet.id == PING_RESP: - if hasattr(packet, "version"): - versions = { - 1116: "3.1.0", - 1115: "3.0.0", - 1114: "2.0.0", - 1113: "2.0.0-beta-3", - 1112: "2.0.0-beta-1", - } - self._fwversion = versions.get( - packet.version, "SVN rev: %d" % packet.version - ) - logging.info("Firmware version %s" % self._fwversion) - elif packet.id == RESP_VERSION: - self._fwversion = packet.version - logging.info("Firmware version %s" % self._fwversion) - elif packet.id == RESP_TIMESTAMP: - # Use current time as timestamp reference - self._last_time = time.time() - self._last_timestamp = packet.timestamp - - lt = time.localtime(self._last_time) - usecs = int((self._last_time - int(self._last_time)) * 1_000_000) - logging.info( - f"Firmware timestamp {self._last_timestamp} reference: " - f'{time.strftime("%b %d %Y %X", lt)}.{usecs} {time.strftime("%Z", lt)}' - ) - else: - logging.info("Unknown packet ID") - - def _findPacketByPacketCounter(self, packetCounterValue): - with self._packetListLock: - for i in range(-1, -1 - len(self._packets), -1): - # iterate backwards through packets - if self._packets[i].packetCounter == packetCounterValue: - return self._packets[i] - return None - - def _startScanning(self, findScanRsp=False, findAux=False, scanCoded=False): - logging.info("starting scan") - - if self.state == STATE_FOLLOWING: - logging.info("Stopped sniffing device") - - self._setState(STATE_SCANNING) - self._packetReader.sendScan(findScanRsp, findAux, scanCoded) - self._packetReader.sendTK([0]) - - def _doExit(self): - self._exit = True - self.notify("APP_EXIT") - self._packetReader.doExit() - # Clear method references to avoid uncollectable cyclic references - self.clearCallbacks() - self._devices.clearCallbacks() - - def _startFollowing( - self, - device, - followOnlyAdvertisements=False, - followOnlyLegacy=False, - followCoded=False, - ): - self._devices.setFollowed(device) - logging.info( - "Sniffing device " - + str(self._devices.index(device)) - + ' - "' - + device.name - + '"' - ) - self._packetReader.sendFollow( - device.address, followOnlyAdvertisements, followOnlyLegacy, followCoded - ) - self._setState(STATE_FOLLOWING) - - def _clearDevices(self): - self._devices.clear() - - def _appendPacket(self, packet): - with self._packetListLock: - if len(self._packets) > 100000: - self._packets = self._packets[20000:] - self._packets.append(packet) - - def _getPackets(self, number=-1): - with self._packetListLock: - returnList = self._packets[0:number] - self._packets = self._packets[number:] - return returnList - - def _clearPackets(self): - with self._packetListLock: - del self._packets[:] diff --git a/modules/wireshark/extcap/SnifferAPI/Types.py b/modules/wireshark/extcap/SnifferAPI/Types.py deleted file mode 100644 index eac7609..0000000 --- a/modules/wireshark/extcap/SnifferAPI/Types.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -SLIP_START = 0xAB -SLIP_END = 0xBC -SLIP_ESC = 0xCD -SLIP_ESC_START = SLIP_START + 1 -SLIP_ESC_END = SLIP_END + 1 -SLIP_ESC_ESC = SLIP_ESC + 1 - -PROTOVER_V3 = 3 -PROTOVER_V2 = 2 -PROTOVER_V1 = 1 - -# UART protocol packet codes (see sniffer_uart_protocol.pdf) -REQ_FOLLOW = 0x00 -EVENT_FOLLOW = 0x01 -EVENT_PACKET_ADV_PDU = 0x02 -EVENT_CONNECT = 0x05 -EVENT_PACKET_DATA_PDU = 0x06 -REQ_SCAN_CONT = 0x07 -EVENT_DISCONNECT = 0x09 -SET_TEMPORARY_KEY = 0x0C -PING_REQ = 0x0D -PING_RESP = 0x0E -SWITCH_BAUD_RATE_REQ = 0x13 -SWITCH_BAUD_RATE_RESP = 0x14 -SET_ADV_CHANNEL_HOP_SEQ = 0x17 -SET_PRIVATE_KEY = 0x18 -SET_LEGACY_LONG_TERM_KEY = 0x19 -SET_SC_LONG_TERM_KEY = 0x1A -REQ_VERSION = 0x1B -RESP_VERSION = 0x1C -REQ_TIMESTAMP = 0x1D -RESP_TIMESTAMP = 0x1E -SET_IDENTITY_RESOLVING_KEY = 0x1F -GO_IDLE = 0xFE - -PACKET_TYPE_UNKNOWN = 0x00 -PACKET_TYPE_ADVERTISING = 0x01 -PACKET_TYPE_DATA = 0x02 - -ADV_TYPE_ADV_IND = 0x0 -ADV_TYPE_ADV_DIRECT_IND = 0x1 -ADV_TYPE_ADV_NONCONN_IND = 0x2 -ADV_TYPE_ADV_SCAN_IND = 0x6 -ADV_TYPE_SCAN_REQ = 0x3 -ADV_TYPE_SCAN_RSP = 0x4 -ADV_TYPE_CONNECT_REQ = 0x5 -ADV_TYPE_ADV_EXT_IND = 0x7 - -PHY_1M = 0 -PHY_2M = 1 -PHY_CODED = 2 - -PHY_CODED_CI_S8 = 0 -PHY_CODED_CI_S2 = 1 diff --git a/modules/wireshark/extcap/SnifferAPI/UART.py b/modules/wireshark/extcap/SnifferAPI/UART.py deleted file mode 100644 index ecd16d2..0000000 --- a/modules/wireshark/extcap/SnifferAPI/UART.py +++ /dev/null @@ -1,238 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import collections -import logging -import serial -from threading import Thread, Event - -import serial.tools.list_ports as list_ports - -from . import Exceptions -from . import Packet -from . import Filelock - -import os - -if os.name == "posix": - import termios - -SNIFFER_OLD_DEFAULT_BAUDRATE = 460800 -# Baudrates that should be tried (add more if required) -SNIFFER_BAUDRATES = [1000000, 460800] - - -def find_sniffer(write_data=False): - open_ports = list_ports.comports() - - sniffers = [] - for port in [x.device for x in open_ports]: - for rate in SNIFFER_BAUDRATES: - reader = None - l_errors = [ - serial.SerialException, - ValueError, - Exceptions.LockedException, - OSError, - ] - if os.name == "posix": - l_errors.append(termios.error) - try: - reader = Packet.PacketReader(portnum=port, baudrate=rate) - try: - if write_data: - reader.sendPingReq() - _ = reader.decodeFromSLIP(0.1, complete_timeout=0.1) - else: - _ = reader.decodeFromSLIP(0.3, complete_timeout=0.3) - - # FIXME: Should add the baud rate here, but that will be a breaking change - sniffers.append(port) - break - except (Exceptions.SnifferTimeout, Exceptions.UARTPacketError): - pass - except tuple(l_errors): - continue - finally: - if reader is not None: - reader.doExit() - return sniffers - - -def find_sniffer_baudrates(port, write_data=False): - for rate in SNIFFER_BAUDRATES: - reader = None - try: - reader = Packet.PacketReader(portnum=port, baudrate=rate) - try: - if write_data: - reader.sendPingReq() - _ = reader.decodeFromSLIP(0.1, complete_timeout=0.1) - else: - _ = reader.decodeFromSLIP(0.3, complete_timeout=0.3) - - # TODO: possibly include additional rates based on protocol version - return {"default": rate, "other": []} - except (Exceptions.SnifferTimeout, Exceptions.UARTPacketError): - pass - finally: - if reader is not None: - reader.doExit() - return None - - -class Uart: - def __init__(self, portnum=None, baudrate=None): - self.ser = None - try: - if baudrate is not None and baudrate not in SNIFFER_BAUDRATES: - raise Exception("Invalid baudrate: " + str(baudrate)) - - logging.info("Opening serial port {}".format(portnum)) - - self.portnum = portnum - if self.portnum: - Filelock.lock(portnum) - - self.ser = serial.Serial( - port=portnum, baudrate=9600, rtscts=True, exclusive=True - ) - self.ser.baudrate = baudrate - - except Exception: - if self.ser: - self.ser.close() - self.ser = None - raise - - self.read_queue = collections.deque() - self.read_queue_has_data = Event() - - self.worker_thread = Thread(target=self._read_worker) - self.reading = True - self.worker_thread.setDaemon(True) - self.worker_thread.start() - - def _read_worker(self): - self.ser.reset_input_buffer() - while self.reading: - try: - # Read any data available, or wait for at least one byte - data_read = self.ser.read(self.ser.in_waiting or 1) - # logging.info('type: {}'.format(data_read.__class__)) - self._read_queue_extend(data_read) - except serial.SerialException as e: - logging.info("Unable to read UART: %s" % e) - self.reading = False - return - - def close(self): - if self.ser: - logging.info("closing UART") - self.reading = False - # Wake any threads waiting on the queue - self.read_queue_has_data.set() - if hasattr(self.ser, "cancel_read"): - self.ser.cancel_read() - self.worker_thread.join() - self.ser.close() - else: - self.ser.close() - self.worker_thread.join() - self.ser = None - - if self.portnum: - Filelock.unlock(self.portnum) - - def __del__(self): - self.close() - - def switchBaudRate(self, newBaudRate): - self.ser.baudrate = newBaudRate - - def readByte(self, timeout=None): - r = self._read_queue_get(timeout) - return r - - def writeList(self, array): - try: - self.ser.write(array) - except serial.SerialTimeoutException: - logging.info("Got write timeout, ignoring error") - - except serial.SerialException as e: - self.ser.close() - raise e - - def _read_queue_extend(self, data): - if len(data) > 0: - self.read_queue.extend(data) - self.read_queue_has_data.set() - - def _read_queue_get(self, timeout=None): - data = None - if self.read_queue_has_data.wait(timeout): - self.read_queue_has_data.clear() - try: - data = self.read_queue.popleft() - except IndexError: - # This will happen when the class is destroyed - return None - if len(self.read_queue) > 0: - self.read_queue_has_data.set() - return data - - -def list_serial_ports(): - # Scan for available ports. - return list_ports.comports() - - -if __name__ == "__main__": - import time - - t_start = time.time() - s = find_sniffer() - tn = time.time() - print(s) - print("find_sniffer took %f seconds" % (tn - t_start)) - for p in s: - t = time.time() - print(find_sniffer_baudrates(p)) - tn = time.time() - print("find_sniffer_baudrate took %f seconds" % (tn - t)) - tn = time.time() - print("total runtime %f" % (tn - t_start)) diff --git a/modules/wireshark/extcap/SnifferAPI/__init__.py b/modules/wireshark/extcap/SnifferAPI/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/modules/wireshark/extcap/SnifferAPI/version.py b/modules/wireshark/extcap/SnifferAPI/version.py deleted file mode 100644 index 5b94c32..0000000 --- a/modules/wireshark/extcap/SnifferAPI/version.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -VERSION_STRING = "4.1.1" diff --git a/modules/wireshark/extcap/nrf_sniffer_ble.py b/modules/wireshark/extcap/nrf_sniffer_ble.py deleted file mode 100644 index 1aa1380..0000000 --- a/modules/wireshark/extcap/nrf_sniffer_ble.py +++ /dev/null @@ -1,991 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) Nordic Semiconductor ASA -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without modification, -# are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form, except as embedded into a Nordic -# Semiconductor ASA integrated circuit in a product or a software update for -# such product, must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other -# materials provided with the distribution. -# -# 3. Neither the name of Nordic Semiconductor ASA nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# 4. This software, with or without modification, must only be used with a -# Nordic Semiconductor ASA integrated circuit. -# -# 5. Any software provided in binary form under this license must not be reverse -# engineered, decompiled, modified and/or disassembled. -# -# THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -# OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT -# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -""" -Wireshark extcap wrapper for the nRF Sniffer for Bluetooth LE by Nordic Semiconductor. -""" - -import os -import sys -import argparse -import re -import time -import struct -import logging - -from SnifferAPI import Logger - -try: - import serial -except ImportError: - Logger.initLogger() - logging.error( - f'pyserial not found, please run: "{sys.executable} -m pip install -r requirements.txt" and retry' - ) - sys.exit( - f'pyserial not found, please run: "{sys.executable} -m pip install -r requirements.txt" and retry' - ) - -from SnifferAPI import Sniffer, UART, Devices, Pcap, Exceptions - -ERROR_USAGE = 0 -ERROR_ARG = 1 -ERROR_INTERFACE = 2 -ERROR_FIFO = 3 -ERROR_INTERNAL = 4 - -CTRL_CMD_INIT = 0 -CTRL_CMD_SET = 1 -CTRL_CMD_ADD = 2 -CTRL_CMD_REMOVE = 3 -CTRL_CMD_ENABLE = 4 -CTRL_CMD_DISABLE = 5 -CTRL_CMD_STATUSBAR = 6 -CTRL_CMD_INFO_MSG = 7 -CTRL_CMD_WARN_MSG = 8 -CTRL_CMD_ERROR_MSG = 9 - -CTRL_ARG_DEVICE = 0 -CTRL_ARG_KEY_TYPE = 1 -CTRL_ARG_KEY_VAL = 2 -CTRL_ARG_ADVHOP = 3 -CTRL_ARG_HELP = 4 -CTRL_ARG_RESTORE = 5 -CTRL_ARG_LOG = 6 -CTRL_ARG_DEVICE_CLEAR = 7 -CTRL_ARG_NONE = 255 - -CTRL_KEY_TYPE_PASSKEY = 0 -CTRL_KEY_TYPE_OOB = 1 -CTRL_KEY_TYPE_LEGACY_LTK = 2 -CTRL_KEY_TYPE_SC_LTK = 3 -CTRL_KEY_TYPE_DH_PRIVATE_KEY = 4 -CTRL_KEY_TYPE_IRK = 5 -CTRL_KEY_TYPE_ADD_ADDR = 6 -CTRL_KEY_TYPE_FOLLOW_ADDR = 7 - -fn_capture = None -fn_ctrl_in = None -fn_ctrl_out = None - -extcap_log_handler = None -extcap_version = None - -# Wireshark nRF Sniffer for Bluetooth LE Toolbar will always cache the last used key and adv hop and send -# this when starting a capture. To ensure that the key and adv hop is always shown correctly -# in the Toolbar, even if the user has changed it but not applied it, we send the last used -# key and adv hop back as a default value. -last_used_key_type = CTRL_KEY_TYPE_PASSKEY -last_used_key_val = "" -last_used_advhop = "37,38,39" - -zero_addr = "[00,00,00,00,00,00,0]" - -# While searching for a selected Device we must not write packets to the pipe until -# the device is found to avoid getting advertising packets from other devices. -write_new_packets = False - -# The RSSI capture filter value given from Wireshark. -rssi_filter = 0 - -# The RSSI filtering is not on when in follow mode. -in_follow_mode = False - -# nRF Sniffer for Bluetooth LE interface option to only capture advertising packets -capture_only_advertising = False -capture_only_legacy_advertising = False -capture_scan_response = True -capture_scan_aux_pointer = True -capture_coded = False - - -def extcap_config(interface): - """List configuration for the given interface""" - print( - "arg {number=0}{call=--only-advertising}{display=Only advertising packets}" - "{tooltip=The sniffer will only capture advertising packets from the selected device}{type=boolflag}{save=true}" - ) - print( - "arg {number=1}{call=--only-legacy-advertising}{display=Only legacy advertising packets}" - "{tooltip=The sniffer will only capture legacy advertising packets from the selected device}{type=boolflag}{save=true}" - ) - print( - "arg {number=2}{call=--scan-follow-rsp}{display=Find scan response data}" - "{tooltip=The sniffer will follow scan requests and scan responses in scan mode}{type=boolflag}{default=true}{save=true}" - ) - print( - "arg {number=3}{call=--scan-follow-aux}{display=Find auxiliary pointer data}" - "{tooltip=The sniffer will follow aux pointers in scan mode}{type=boolflag}{default=true}{save=true}" - ) - print( - "arg {number=3}{call=--coded}{display=Scan and follow devices on LE Coded PHY}" - "{tooltip=Scan for devices and follow advertiser on LE Coded PHY}{type=boolflag}{default=false}{save=true}" - ) - - -def extcap_dlts(interface): - """List DLTs for the given interface""" - print("dlt {number=272}{name=NORDIC_BLE}{display=nRF Sniffer for Bluetooth LE}") - - -def get_baud_rates(interface): - if not hasattr(serial, "__version__") or not serial.__version__.startswith("3."): - raise RuntimeError( - "Too old version of python 'serial' Library. Version 3 required." - ) - return UART.find_sniffer_baudrates(interface) - - -def get_interfaces(): - if not hasattr(serial, "__version__") or not serial.__version__.startswith("3."): - raise RuntimeError( - "Too old version of python 'serial' Library. Version 3 required." - ) - - devices = UART.find_sniffer() - return devices - - -def extcap_interfaces(): - """List available interfaces to capture from""" - print( - "extcap {version=%s}{display=nRF Sniffer for Bluetooth LE}" - "{help=https://www.nordicsemi.com/Software-and-Tools/Development-Tools/nRF-Sniffer-for-Bluetooth-LE}" - % Sniffer.VERSION_STRING - ) - - for interface_port in get_interfaces(): - if sys.platform == "win32": - print( - "interface {value=%s-%s}{display=nRF Sniffer for Bluetooth LE %s}" - % (interface_port, extcap_version, interface_port) - ) - else: - print( - "interface {value=%s-%s}{display=nRF Sniffer for Bluetooth LE}" - % (interface_port, extcap_version) - ) - - print( - "control {number=%d}{type=selector}{display=Device}{tooltip=Device list}" - % CTRL_ARG_DEVICE - ) - print( - "control {number=%d}{type=selector}{display=Key}{tooltip=}" % CTRL_ARG_KEY_TYPE - ) - print( - "control {number=%d}{type=string}{display=Value}" - "{tooltip=6 digit passkey or 16 or 32 bytes encryption key in hexadecimal starting with '0x', big endian format." - "If the entered key is shorter than 16 or 32 bytes, it will be zero-padded in front'}" - "{validation=\\b^(([0-9]{6})|(0x[0-9a-fA-F]{1,64})|([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random))$\\b}" - % CTRL_ARG_KEY_VAL - ) - print( - "control {number=%d}{type=string}{display=Adv Hop}" - "{default=37,38,39}" - "{tooltip=Advertising channel hop sequence. " - "Change the order in which the sniffer switches advertising channels. " - "Valid channels are 37, 38 and 39 separated by comma.}" - r"{validation=^\s*((37|38|39)\s*,\s*){0,2}(37|38|39){1}\s*$}{required=true}" - % CTRL_ARG_ADVHOP - ) - print( - "control {number=%d}{type=button}{display=Clear}{tooltop=Clear or remove device from Device list}" - % CTRL_ARG_DEVICE_CLEAR - ) - print( - "control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Access user guide (launches browser)}" - % CTRL_ARG_HELP - ) - print( - "control {number=%d}{type=button}{role=restore}{display=Defaults}{tooltip=Resets the user interface and clears the log file}" - % CTRL_ARG_RESTORE - ) - print( - "control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Log per interface}" - % CTRL_ARG_LOG - ) - - print( - "value {control=%d}{value= }{display=All advertising devices}{default=true}" - % CTRL_ARG_DEVICE - ) - print( - "value {control=%d}{value=%s}{display=Follow IRK}" - % (CTRL_ARG_DEVICE, zero_addr) - ) - - print( - "value {control=%d}{value=%d}{display=Legacy Passkey}{default=true}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_PASSKEY) - ) - print( - "value {control=%d}{value=%d}{display=Legacy OOB data}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_OOB) - ) - print( - "value {control=%d}{value=%d}{display=Legacy LTK}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_LEGACY_LTK) - ) - print( - "value {control=%d}{value=%d}{display=SC LTK}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_SC_LTK) - ) - print( - "value {control=%d}{value=%d}{display=SC Private Key}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_DH_PRIVATE_KEY) - ) - print( - "value {control=%d}{value=%d}{display=IRK}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_IRK) - ) - print( - "value {control=%d}{value=%d}{display=Add LE address}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_ADD_ADDR) - ) - print( - "value {control=%d}{value=%d}{display=Follow LE address}" - % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_FOLLOW_ADDR) - ) - - -def string_address(address): - """Make a string representation of the address""" - if len(address) < 7: - return None - - addr_string = "" - - for i in range(5): - addr_string += format(address[i], "02x") + ":" - addr_string += format(address[5], "02x") + " " - - if address[6]: - addr_string += " random " - else: - addr_string += " public " - - return addr_string - - -def control_read(): - """Read a message from the control channel""" - header = fn_ctrl_in.read(6) - if not header: - # Ref. https://docs.python.org/3/tutorial/inputoutput.html#methods-of-file-objects: - # > If the end of the file has been reached, f.read() will return an - # > empty string ('') - return None, None, None - - _, _, length, arg, typ = struct.unpack(">sBHBB", header) - - payload = bytearray() - if length > 2: - payload = fn_ctrl_in.read(length - 2) - - return arg, typ, payload - - -def control_write(arg, typ, message): - """Write the message to the control channel""" - - if not fn_ctrl_out: - # No control out has been opened - return - - packet = bytearray() - packet += struct.pack(">BBHBB", ord("T"), 0, len(message) + 2, arg, typ) - packet += message.encode("utf-8") - - fn_ctrl_out.write(packet) - - -def capture_write(message): - """Write the message to the capture pipe""" - fn_capture.write(message) - fn_capture.flush() - - -def new_packet(notification): - """A new Bluetooth LE packet has arrived""" - if write_new_packets == True: - packet = notification.msg["packet"] - - if rssi_filter == 0 or in_follow_mode == True or packet.RSSI > rssi_filter: - p = bytes([packet.boardId] + packet.getList()) - capture_write(Pcap.create_packet(p, packet.time)) - - -def device_added(notification): - """A device is added or updated""" - device = notification.msg - - # Only add devices matching RSSI filter - if rssi_filter == 0 or device.RSSI > rssi_filter: - # Extcap selector uses \0 character to separate value and display value, - # therefore the display value cannot contain the \0 character as this - # would lead to truncation of the display value. - display = ( - device.name.replace("\0", "\\0") - + (" " + str(device.RSSI) + " dBm " if device.RSSI != 0 else " ") - + string_address(device.address) - ) - - message = str(device.address) + "\0" + display - - control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, message) - - -def device_removed(notification): - """A device is removed""" - device = notification.msg - display = device.name + " " + string_address(device.address) - - message = "" - message += str(device.address) - - control_write(CTRL_ARG_DEVICE, CTRL_CMD_REMOVE, message) - logging.info("Removed: " + display) - - -def devices_cleared(notification): - """Devices have been cleared""" - message = "" - control_write(CTRL_ARG_DEVICE, CTRL_CMD_REMOVE, message) - - control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, " " + "\0" + "All advertising devices") - control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, zero_addr + "\0" + "Follow IRK") - control_write(CTRL_ARG_DEVICE, CTRL_CMD_SET, " ") - - -def handle_control_command(sniffer, arg, typ, payload): - """Handle command from control channel""" - global last_used_key_type - - if arg == CTRL_ARG_DEVICE: - if payload == b" ": - scan_for_devices(sniffer) - else: - values = payload - values = values.replace(b"[", b"") - values = values.replace(b"]", b"") - device_address = values.split(b",") - - logging.info("follow_device: {}".format(device_address)) - for i in range(6): - device_address[i] = int(device_address[i]) - - device_address[6] = 1 if device_address[6] == b" 1" else 0 - - device = Devices.Device(address=device_address, name='""', RSSI=0) - - follow_device(sniffer, device) - - elif arg == CTRL_ARG_DEVICE_CLEAR: - clear_devices(sniffer) - elif arg == CTRL_ARG_KEY_TYPE: - last_used_key_type = int(payload.decode("utf-8")) - elif arg == CTRL_ARG_KEY_VAL: - set_key_value(sniffer, payload) - elif arg == CTRL_ARG_ADVHOP: - set_advhop(sniffer, payload) - - -def control_read_initial_values(sniffer): - """Read initial control values""" - initialized = False - - while not initialized: - arg, typ, payload = control_read() - if typ == CTRL_CMD_INIT: - initialized = True - else: - handle_control_command(sniffer, arg, typ, payload) - - -def control_write_defaults(): - """Write default control values""" - control_write(CTRL_ARG_KEY_TYPE, CTRL_CMD_SET, str(last_used_key_type)) - control_write(CTRL_ARG_KEY_VAL, CTRL_CMD_SET, last_used_key_val) - control_write(CTRL_ARG_ADVHOP, CTRL_CMD_SET, last_used_advhop) - - -def scan_for_devices(sniffer): - """Start scanning for advertising devices""" - global in_follow_mode - if sniffer.state == 2: - log = "Scanning all advertising devices" - logging.info(log) - sniffer.scan(capture_scan_response, capture_scan_aux_pointer, capture_coded) - - in_follow_mode = False - - -def clear_devices(sniffer): - """Clear the advertising devices list""" - global in_follow_mode - - sniffer.clearDevices() - scan_for_devices(sniffer) - - in_follow_mode = False - - -def follow_device(sniffer, device): - """Follow the selected device""" - global write_new_packets, in_follow_mode - - sniffer.follow( - device, capture_only_advertising, capture_only_legacy_advertising, capture_coded - ) - time.sleep(0.1) - - in_follow_mode = True - logging.info("Following " + string_address(device.address)) - - -def set_key_value(sniffer, payload): - """Send key value to device""" - global last_used_key_val - - payload = payload.decode("utf-8") - last_used_key_val = payload - - if last_used_key_type == CTRL_KEY_TYPE_PASSKEY: - if re.match("^[0-9]{6}$", payload): - set_passkey(sniffer, payload) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_OOB: - if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): - set_OOB(sniffer, payload[2:]) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_DH_PRIVATE_KEY: - if re.match("^0[xX][0-9A-Za-z]{1,64}$", payload): - set_dh_private_key(sniffer, payload[2:]) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_LEGACY_LTK: - if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): - set_legacy_ltk(sniffer, payload[2:]) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_SC_LTK: - if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): - set_sc_ltk(sniffer, payload[2:]) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_IRK: - if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): - set_irk(sniffer, payload[2:]) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_ADD_ADDR: - if re.match( - "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload - ): - add_address(sniffer, payload) - else: - logging.info("Invalid key value: " + str(payload)) - elif last_used_key_type == CTRL_KEY_TYPE_FOLLOW_ADDR: - if re.match( - "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload - ): - follow_address(sniffer, payload) - else: - logging.info("Invalid key value: " + str(payload)) - else: - logging.info("Invalid key type: " + str(last_used_key_type)) - - -def parse_hex(value): - if len(value) % 2 != 0: - value = "0" + value - - a = list(value) - return [int(x + y, 16) for x, y in zip(a[::2], a[1::2])] - - -def set_passkey(sniffer, payload): - """Send passkey to device""" - passkey = [] - logging.info("Setting Passkey: " + payload) - init_payload = int(payload, 10) - if len(payload) >= 6: - passkey = [] - passkey += [(init_payload >> 16) & 0xFF] - passkey += [(init_payload >> 8) & 0xFF] - passkey += [(init_payload >> 0) & 0xFF] - - sniffer.sendTK(passkey) - - -def set_OOB(sniffer, payload): - """Send OOB to device""" - logging.info("Setting OOB data: " + payload) - sniffer.sendTK(parse_hex(payload)) - - -def set_dh_private_key(sniffer, payload): - """Send Diffie-Hellman private key to device""" - logging.info("Setting DH private key: " + payload) - sniffer.sendPrivateKey(parse_hex(payload)) - - -def set_legacy_ltk(sniffer, payload): - """Send Legacy Long Term Key (LTK) to device""" - logging.info("Setting Legacy LTK: " + payload) - sniffer.sendLegacyLTK(parse_hex(payload)) - - -def set_sc_ltk(sniffer, payload): - """Send LE secure connections Long Term Key (LTK) to device""" - logging.info("Setting SC LTK: " + payload) - sniffer.sendSCLTK(parse_hex(payload)) - - -def set_irk(sniffer, payload): - """Send Identity Resolving Key (IRK) to device""" - logging.info("Setting IRK: " + payload) - sniffer.sendIRK(parse_hex(payload)) - - -def add_address(sniffer, payload): - """Add LE address to device list""" - logging.info("Adding LE address: " + payload) - - (addr, addr_type) = payload.split(" ") - device = [int(a, 16) for a in addr.split(":")] - - device.append(1 if addr_type == "random" else 0) - - new_device = Devices.Device(address=device, name='""', RSSI=0) - sniffer.addDevice(new_device) - - -def follow_address(sniffer, payload): - """Add LE address to device list""" - logging.info("Adding LE address: " + payload) - - (addr, addr_type) = payload.split(" ") - device = [int(a, 16) for a in addr.split(":")] - - device.append(1 if addr_type == "random" else 0) - - new_device = Devices.Device(address=device, name='""', RSSI=0) - sniffer.addDevice(new_device) - - control_write(CTRL_ARG_DEVICE, CTRL_CMD_SET, f"{new_device.address}") - follow_device(sniffer, new_device) - - -def set_advhop(sniffer, payload): - """Set advertising channel hop sequence""" - global last_used_advhop - - payload = payload.decode("utf-8") - - last_used_advhop = payload - - hops = [int(channel) for channel in payload.split(",")] - - sniffer.setAdvHopSequence(hops) - - log = "AdvHopSequence: " + str(hops) - logging.info(log) - - -def control_loop(sniffer): - """Main loop reading control messages""" - arg_read = CTRL_ARG_NONE - while arg_read is not None: - arg_read, typ, payload = control_read() - handle_control_command(sniffer, arg_read, typ, payload) - - -def error_interface_not_found(interface, fifo): - log = "nRF Sniffer for Bluetooth LE could not find interface: " + interface - control_write(CTRL_ARG_NONE, CTRL_CMD_ERROR_MSG, log) - extcap_close_fifo(fifo) - sys.exit(ERROR_INTERFACE) - - -def validate_interface(interface, fifo): - """Check if interface exists""" - if sys.platform != "win32" and not os.path.exists(interface): - error_interface_not_found(interface, fifo) - - -def get_default_baudrate(interface, fifo): - """Return the baud rate that interface is running at, or exit if the board is not found""" - rates = get_baud_rates(interface) - if rates is None: - error_interface_not_found(interface, fifo) - return rates["default"] - - -def get_supported_protocol_version(extcap_version): - """Return the maximum supported Packet Protocol Version""" - if extcap_version == "None": - return 2 - - (major, minor) = extcap_version.split(".") - - major = int(major) - minor = int(minor) - - if major > 3 or (major == 3 and minor >= 4): - return 3 - else: - return 2 - - -def setup_extcap_log_handler(): - """Add the a handler that emits log messages through the extcap control out channel""" - global extcap_log_handler - extcap_log_handler = ExtcapLoggerHandler() - Logger.addLogHandler(extcap_log_handler) - control_write(CTRL_ARG_LOG, CTRL_CMD_SET, "") - - -def teardown_extcap_log_handler(): - """Remove and reset the extcap log handler""" - global extcap_log_handler - if extcap_log_handler: - Logger.removeLogHandler(extcap_log_handler) - extcap_log_handler = None - - -def sniffer_capture(interface, baudrate, fifo, control_in, control_out): - """Start the sniffer to capture packets""" - global fn_capture, fn_ctrl_in, fn_ctrl_out, write_new_packets, extcap_log_handler - - try: - fn_capture = open(fifo, "wb", 0) - - if control_out is not None: - fn_ctrl_out = open(control_out, "wb", 0) - setup_extcap_log_handler() - - if control_in is not None: - fn_ctrl_in = open(control_in, "rb", 0) - - logging.info("Log started at %s", time.strftime("%c")) - - interface, extcap_version = interface.split("-") - logging.info("Extcap version %s", str(extcap_version)) - - capture_write(Pcap.get_global_header()) - validate_interface(interface, fifo) - if baudrate is None: - baudrate = get_default_baudrate(interface, fifo) - - sniffer = Sniffer.Sniffer(interface, baudrate) - sniffer.subscribe("NEW_BLE_PACKET", new_packet) - sniffer.subscribe("DEVICE_ADDED", device_added) - sniffer.subscribe("DEVICE_UPDATED", device_added) - sniffer.subscribe("DEVICE_REMOVED", device_removed) - sniffer.subscribe("DEVICES_CLEARED", devices_cleared) - sniffer.setAdvHopSequence([37, 38, 39]) - sniffer.setSupportedProtocolVersion( - get_supported_protocol_version(extcap_version) - ) - logging.info("Sniffer created") - - logging.info("Software version: %s" % sniffer.swversion) - sniffer.getFirmwareVersion() - sniffer.getTimestamp() - sniffer.start() - logging.info("sniffer started") - sniffer.scan(capture_scan_response, capture_scan_aux_pointer, capture_coded) - logging.info("scanning started") - - if fn_ctrl_in is not None and fn_ctrl_out is not None: - # First read initial control values - control_read_initial_values(sniffer) - - # Then write default values - control_write_defaults() - logging.info("defaults written") - - # Start receiving packets - write_new_packets = True - - # Start the control loop - logging.info("control loop") - control_loop(sniffer) - logging.info("exiting control loop") - - else: - logging.info("") - # Start receiving packets - write_new_packets = True - while True: - # Wait for keyboardinterrupt - pass - - except Exceptions.LockedException as e: - logging.info("{}".format(e.message)) - - except OSError: - # We'll get OSError=22 when/if wireshark kills the pipe(s) on capture - # stop. - pass - - finally: - # The first thing we should do is to tear down the extcap log handler. - # This might already have triggered an OSError, or we will trigger one - # by attempting to log at this point. - teardown_extcap_log_handler() - - # Safe to use logging again. - logging.info("Tearing down") - - sniffer.doExit() - if fn_capture is not None and not fn_capture.closed: - fn_capture.close() - - if fn_ctrl_in is not None and not fn_ctrl_in.closed: - fn_ctrl_in.close() - - if fn_ctrl_out is not None and not fn_ctrl_out.closed: - fn_ctrl_out.close() - - fn_capture = None - fn_ctrl_out = None - fn_ctrl_in = None - - logging.info("Exiting") - - -def extcap_close_fifo(fifo): - """ "Close extcap fifo""" - if not os.path.exists(fifo): - print("FIFO does not exist!", file=sys.stderr) - return - - # This is apparently needed to workaround an issue on Windows/macOS - # where the message cannot be read. (really?) - fh = open(fifo, "wb", 0) - fh.close() - - -class ExtcapLoggerHandler(logging.Handler): - """Handler used to display all logging messages in extcap""" - - def emit(self, record): - """Send log message to extcap""" - message = record.message.replace("\0", "\\0") - log_message = f"{record.levelname}: {message}\n" - control_write(CTRL_ARG_LOG, CTRL_CMD_ADD, log_message) - - -def parse_capture_filter(capture_filter): - """ "Parse given capture filter""" - global rssi_filter - m = re.search(r"^\s*rssi\s*(>=?)\s*(-?[0-9]+)\s*$", capture_filter, re.IGNORECASE) - if m: - rssi_filter = int(m.group(2)) - if rssi_filter > -10 or rssi_filter < -256: - print("Illegal RSSI value, must be between -10 and -256") - # Handle >= by modifying the threshold, since comparisons are always done with - # the > operator - if m.group(1) == ">=": - rssi_filter = rssi_filter - 1 - else: - print('Filter syntax: "RSSI >= -value"') - - -import atexit - - -@atexit.register -def goodbye(): - logging.info("Exiting PID {}".format(os.getpid())) - - -if __name__ == "__main__": - - # Capture options - parser = argparse.ArgumentParser( - description="Nordic Semiconductor nRF Sniffer for Bluetooth LE extcap plugin" - ) - - # Extcap Arguments - parser.add_argument("--capture", help="Start the capture", action="store_true") - - parser.add_argument( - "--extcap-interfaces", - help="List available interfaces to capture from", - action="store_true", - ) - - parser.add_argument("--extcap-interface", help="The interface to capture from") - - parser.add_argument( - "--extcap-dlts", help="List DLTs for the given interface", action="store_true" - ) - - parser.add_argument( - "--extcap-config", - help="List configurations for the given interface", - action="store_true", - ) - - parser.add_argument( - "--extcap-capture-filter", - help="Used together with capture to provide a capture filter", - ) - - parser.add_argument( - "--fifo", help="Use together with capture to provide the fifo to dump data to" - ) - - parser.add_argument( - "--extcap-control-in", - help="Used together with capture to get control messages from toolbar", - ) - - parser.add_argument( - "--extcap-control-out", - help="Used together with capture to send control messages to toolbar", - ) - - parser.add_argument("--extcap-version", help="Set extcap supported version") - - # Interface Arguments - parser.add_argument("--device", help="Device", default="") - parser.add_argument("--baudrate", type=int, help="The sniffer baud rate") - parser.add_argument( - "--only-advertising", help="Only advertising packets", action="store_true" - ) - parser.add_argument( - "--only-legacy-advertising", - help="Only legacy advertising packets", - action="store_true", - ) - parser.add_argument( - "--scan-follow-rsp", help="Find scan response data ", action="store_true" - ) - parser.add_argument( - "--scan-follow-aux", help="Find auxiliary pointer data", action="store_true" - ) - parser.add_argument( - "--coded", help="Scan and follow on LE Coded PHY", action="store_true" - ) - - logging.info("Started PID {}".format(os.getpid())) - - try: - args, unknown = parser.parse_known_args() - logging.info(args) - - except argparse.ArgumentError as exc: - print("%s" % exc, file=sys.stderr) - fifo_found = False - fifo = "" - for arg in sys.argv: - if arg == "--fifo" or arg == "--extcap-fifo": - fifo_found = True - elif fifo_found: - fifo = arg - break - extcap_close_fifo(fifo) - sys.exit(ERROR_ARG) - - if len(sys.argv) <= 1: - parser.exit("No arguments given!") - - if args.extcap_version: - extcap_version = args.extcap_version - - if args.extcap_capture_filter: - parse_capture_filter(args.extcap_capture_filter) - if args.extcap_interface and len(sys.argv) == 5: - sys.exit(0) - - if not args.extcap_interfaces and args.extcap_interface is None: - parser.exit("An interface must be provided or the selection must be displayed") - - if args.extcap_interfaces or args.extcap_interface is None: - extcap_interfaces() - sys.exit(0) - - if len(unknown) > 0: - print("Sniffer %d unknown arguments given" % len(unknown)) - logging.info("Sniffer %d unknown arguments given" % len(unknown)) - - interface = args.extcap_interface - - capture_only_advertising = args.only_advertising - capture_only_legacy_advertising = args.only_legacy_advertising - capture_scan_response = args.scan_follow_rsp - capture_scan_aux_pointer = args.scan_follow_aux - capture_coded = args.coded - - if args.extcap_config: - extcap_config(interface) - elif args.extcap_dlts: - extcap_dlts(interface) - elif args.capture: - if args.fifo is None: - parser.print_help() - sys.exit(ERROR_FIFO) - try: - logging.info("sniffer capture") - sniffer_capture( - interface, - args.baudrate, - args.fifo, - args.extcap_control_in, - args.extcap_control_out, - ) - except KeyboardInterrupt: - pass - except Exception as e: - import traceback - - logging.info(traceback.format_exc()) - logging.info("internal error: {}".format(repr(e))) - sys.exit(ERROR_INTERNAL) - else: - parser.print_help() - sys.exit(ERROR_USAGE) - logging.info("main exit PID {}".format(os.getpid()))