From cc301951fca3034993f374aac8b3505336002dfa Mon Sep 17 00:00:00 2001 From: Jakob Lechner Date: Tue, 11 Nov 2025 19:34:40 +0100 Subject: [PATCH] Apply black --- .../extcap/SnifferAPI/CaptureFiles.py | 8 +- .../wireshark/extcap/SnifferAPI/Devices.py | 21 +- .../wireshark/extcap/SnifferAPI/Exceptions.py | 6 + .../wireshark/extcap/SnifferAPI/Filelock.py | 26 +- modules/wireshark/extcap/SnifferAPI/Logger.py | 17 +- .../extcap/SnifferAPI/Notifications.py | 18 +- modules/wireshark/extcap/SnifferAPI/Packet.py | 250 ++++++---- modules/wireshark/extcap/SnifferAPI/Pcap.py | 29 +- .../wireshark/extcap/SnifferAPI/Sniffer.py | 42 +- .../extcap/SnifferAPI/SnifferCollector.py | 113 +++-- modules/wireshark/extcap/SnifferAPI/Types.py | 90 ++-- modules/wireshark/extcap/SnifferAPI/UART.py | 21 +- .../wireshark/extcap/SnifferAPI/version.py | 1 - modules/wireshark/extcap/nrf_sniffer_ble.py | 429 ++++++++++++------ 14 files changed, 684 insertions(+), 387 deletions(-) diff --git a/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py b/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py index f5cf6cb..8c218e5 100644 --- a/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py +++ b/modules/wireshark/extcap/SnifferAPI/CaptureFiles.py @@ -59,13 +59,13 @@ class CaptureFileHandler: if not os.path.isdir(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) self.filename = filename - self.backupFilename = self.filename+".1" + self.backupFilename = self.filename + ".1" if not os.path.isfile(self.filename): self.startNewFile() elif os.path.getsize(self.filename) > 20000000: self.doRollover() if clear: - #clear file + # clear file self.startNewFile() def startNewFile(self): @@ -86,6 +86,6 @@ class CaptureFileHandler: def writePacket(self, packet): with open(self.filename, "ab") as f: packet = Pcap.create_packet( - bytes([packet.boardId] + packet.getList()), - packet.time) + bytes([packet.boardId] + packet.getList()), packet.time + ) f.write(packet) diff --git a/modules/wireshark/extcap/SnifferAPI/Devices.py b/modules/wireshark/extcap/SnifferAPI/Devices.py index 8ea3c0e..61ac961 100644 --- a/modules/wireshark/extcap/SnifferAPI/Devices.py +++ b/modules/wireshark/extcap/SnifferAPI/Devices.py @@ -39,6 +39,7 @@ from . import Notifications import logging, threading + class DeviceList(Notifications.Notifier): def __init__(self, *args, **kwargs): Notifications.Notifier.__init__(self, *args, **kwargs) @@ -52,7 +53,7 @@ class DeviceList(Notifications.Notifier): return len(self.devices) def __repr__(self): - return "Sniffer Device List: "+str(self.asList()) + return "Sniffer Device List: " + str(self.asList()) def clear(self): logging.info("Clearing") @@ -69,11 +70,15 @@ class DeviceList(Notifications.Notifier): self.append(newDevice) else: updated = False - if (newDevice.name != "\"\"") and (existingDevice.name == "\"\""): + if (newDevice.name != '""') and (existingDevice.name == '""'): existingDevice.name = newDevice.name updated = True - if (newDevice.RSSI != 0 and (existingDevice.RSSI < (newDevice.RSSI - 5)) or (existingDevice.RSSI > (newDevice.RSSI+2))): + if ( + newDevice.RSSI != 0 + and (existingDevice.RSSI < (newDevice.RSSI - 5)) + or (existingDevice.RSSI > (newDevice.RSSI + 2)) + ): existingDevice.RSSI = newDevice.RSSI updated = True @@ -93,14 +98,14 @@ class DeviceList(Notifications.Notifier): return self.devices[id] elif type(id) == str: for dev in self.devices: - if dev.name in [id, '"'+id+'"']: + if dev.name in [id, '"' + id + '"']: return dev elif id.__class__.__name__ == "Device": return self.find(id.address) return None def remove(self, id): - if type(id) == list: #address + if type(id) == list: # address device = self.devices.pop(self.devices.index(self.find(id))) elif type(id) == int: device = self.devices.pop(id) @@ -126,6 +131,7 @@ class DeviceList(Notifications.Notifier): def asList(self): return self.devices[:] + class Device: def __init__(self, address, name, RSSI): self.address = address @@ -134,10 +140,11 @@ class Device: self.followed = False def __repr__(self): - return 'Bluetooth LE device "'+self.name+'" ('+str(self.address)+')' + return 'Bluetooth LE device "' + self.name + '" (' + str(self.address) + ")" + def listToString(list): str = "" for i in list: - str+=chr(i) + str += chr(i) return str diff --git a/modules/wireshark/extcap/SnifferAPI/Exceptions.py b/modules/wireshark/extcap/SnifferAPI/Exceptions.py index f3ab20c..86f356a 100644 --- a/modules/wireshark/extcap/SnifferAPI/Exceptions.py +++ b/modules/wireshark/extcap/SnifferAPI/Exceptions.py @@ -38,23 +38,29 @@ class SnifferTimeout(Exception): pass + class UARTPacketError(Exception): pass + class LockedException(Exception): def __init__(self, message): self.message = message + class InvalidPacketException(Exception): pass + class InvalidAdvChannel(Exception): pass + # Internal Use class SnifferWatchDogTimeout(SnifferTimeout): pass + # Internal Use class ExitCodeException(Exception): pass diff --git a/modules/wireshark/extcap/SnifferAPI/Filelock.py b/modules/wireshark/extcap/SnifferAPI/Filelock.py index 5570942..7bf21b5 100644 --- a/modules/wireshark/extcap/SnifferAPI/Filelock.py +++ b/modules/wireshark/extcap/SnifferAPI/Filelock.py @@ -2,7 +2,7 @@ import os import logging from sys import platform -if platform == 'linux': +if platform == "linux": import psutil from . import Exceptions @@ -16,8 +16,9 @@ from . import Exceptions # HDB UUCP lock file format: # process identifier (PID) as a ten byte ASCII decimal number, with a trailing newline + def lockpid(lockfile): - if (os.path.isfile(lockfile)): + if os.path.isfile(lockfile): with open(lockfile) as fd: lockpid = fd.read() @@ -30,17 +31,13 @@ def lockpid(lockfile): return 0 + def lock(port): - if platform != 'linux': + if platform != "linux": return tty = os.path.basename(port) - lockfile = os.path.join( - '/run', - 'user', - f'{os.getuid()}', - f'{tty}.lock' - ) + lockfile = os.path.join("/run", "user", f"{os.getuid()}", f"{tty}.lock") lockedpid = lockpid(lockfile) if lockedpid: @@ -53,16 +50,17 @@ def lock(port): logging.info("Lockfile is stale. Overriding it..") os.remove(lockfile) - fd = open(lockfile, 'w') - with open(lockfile, 'w') as fd: - fd.write(f'{os.getpid():10}') + fd = open(lockfile, "w") + with open(lockfile, "w") as fd: + fd.write(f"{os.getpid():10}") + def unlock(port): - if platform != 'linux': + if platform != "linux": return tty = os.path.basename(port) - lockfile = f'/var/lock/LCK..{tty}' + lockfile = f"/var/lock/LCK..{tty}" lockedpid = lockpid(lockfile) if lockedpid == os.getpid(): diff --git a/modules/wireshark/extcap/SnifferAPI/Logger.py b/modules/wireshark/extcap/SnifferAPI/Logger.py index cbb1e7c..228a0f1 100644 --- a/modules/wireshark/extcap/SnifferAPI/Logger.py +++ b/modules/wireshark/extcap/SnifferAPI/Logger.py @@ -48,9 +48,11 @@ import logging.handlers as logHandlers # will result in the line being appended to the log file # ################################################################# -appdata = os.getenv('appdata') +appdata = os.getenv("appdata") if appdata: - DEFAULT_LOG_FILE_DIR = os.path.join(appdata, 'Nordic Semiconductor', 'Sniffer', 'logs') + DEFAULT_LOG_FILE_DIR = os.path.join( + appdata, "Nordic Semiconductor", "Sniffer", "logs" + ) else: DEFAULT_LOG_FILE_DIR = "/tmp/logs" @@ -89,8 +91,12 @@ def initLogger(): global logFlusher global logHandlerArray - logHandler = MyRotatingFileHandler(logFileName, mode='a', maxBytes=myMaxBytes, backupCount=3) - logFormatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%d-%b-%Y %H:%M:%S (%z)') + logHandler = MyRotatingFileHandler( + logFileName, mode="a", maxBytes=myMaxBytes, backupCount=3 + ) + logFormatter = logging.Formatter( + "%(asctime)s %(levelname)s: %(message)s", datefmt="%d-%b-%Y %H:%M:%S (%z)" + ) logHandler.setFormatter(logFormatter) logger = logging.getLogger() logger.addHandler(logHandler) @@ -154,6 +160,7 @@ def addLogHandler(logHandler): logger.setLevel(logging.INFO) logHandlerArray.append(logHandler) + def removeLogHandler(logHandler): global logHandlerArray logger = logging.getLogger() @@ -200,7 +207,7 @@ class LogFlusher(threading.Thread): self.exit.set() -if __name__ == '__main__': +if __name__ == "__main__": initLogger() for i in range(50): logging.info("test log no. " + str(i)) diff --git a/modules/wireshark/extcap/SnifferAPI/Notifications.py b/modules/wireshark/extcap/SnifferAPI/Notifications.py index d9fd8e4..b7cba37 100644 --- a/modules/wireshark/extcap/SnifferAPI/Notifications.py +++ b/modules/wireshark/extcap/SnifferAPI/Notifications.py @@ -37,18 +37,20 @@ import threading, logging -class Notification(): - def __init__(self, key, msg = None): + +class Notification: + def __init__(self, key, msg=None): if type(key) is not str: - raise TypeError("Invalid notification key: "+str(key)) + raise TypeError("Invalid notification key: " + str(key)) self.key = key self.msg = msg def __repr__(self): return "Notification (key: %s, msg: %s)" % (str(self.key), str(self.msg)) -class Notifier(): - def __init__(self, callbacks = [], **kwargs): + +class Notifier: + def __init__(self, callbacks=[], **kwargs): self.callbacks = {} self.callbackLock = threading.RLock() @@ -75,10 +77,10 @@ class Notifier(): self.callbacks[key] = [] return self.callbacks[key] - def notify(self, key = None, msg = None, notification = None): + def notify(self, key=None, msg=None, notification=None): with self.callbackLock: if notification == None: - notification = Notification(key,msg) + notification = Notification(key, msg) for callback in self.getCallbacks(notification.key): callback(notification) @@ -87,4 +89,4 @@ class Notifier(): callback(notification) def passOnNotification(self, notification): - self.notify(notification = notification) + self.notify(notification=notification) diff --git a/modules/wireshark/extcap/SnifferAPI/Packet.py b/modules/wireshark/extcap/SnifferAPI/Packet.py index ddac78d..bc4abd9 100644 --- a/modules/wireshark/extcap/SnifferAPI/Packet.py +++ b/modules/wireshark/extcap/SnifferAPI/Packet.py @@ -43,17 +43,17 @@ ADV_ACCESS_ADDRESS = [0xD6, 0xBE, 0x89, 0x8E] SYNCWORD_POS = 0 PAYLOAD_LEN_POS_V1 = 1 PAYLOAD_LEN_POS = 0 -PROTOVER_POS = PAYLOAD_LEN_POS+2 -PACKETCOUNTER_POS = PROTOVER_POS+1 -ID_POS = PACKETCOUNTER_POS+2 +PROTOVER_POS = PAYLOAD_LEN_POS + 2 +PACKETCOUNTER_POS = PROTOVER_POS + 1 +ID_POS = PACKETCOUNTER_POS + 2 -BLE_HEADER_LEN_POS = ID_POS+1 -FLAGS_POS = BLE_HEADER_LEN_POS+1 -CHANNEL_POS = FLAGS_POS+1 -RSSI_POS = CHANNEL_POS+1 -EVENTCOUNTER_POS = RSSI_POS+1 -TIMESTAMP_POS = EVENTCOUNTER_POS+2 -BLEPACKET_POS = TIMESTAMP_POS+4 +BLE_HEADER_LEN_POS = ID_POS + 1 +FLAGS_POS = BLE_HEADER_LEN_POS + 1 +CHANNEL_POS = FLAGS_POS + 1 +RSSI_POS = CHANNEL_POS + 1 +EVENTCOUNTER_POS = RSSI_POS + 1 +TIMESTAMP_POS = EVENTCOUNTER_POS + 2 +BLEPACKET_POS = TIMESTAMP_POS + 4 TXADD_POS = BLEPACKET_POS + 4 TXADD_MSK = 0x40 PAYLOAD_POS = BLE_HEADER_LEN_POS @@ -109,7 +109,7 @@ class PacketReader(Notifications.Notifier): tempSLIPBuffer.append(SLIP_END) return tempSLIPBuffer - # This function uses getSerialByte() function to get SLIP encoded bytes from the serial port and return a decoded byte list + # This function uses getSerialByte() function to get SLIP encoded bytes from the serial port and return a decoded byte list # Based on https://github.com/mehdix/pyslip/ def decodeFromSLIP(self, timeout=None, complete_timeout=None): dataBuffer = [] @@ -119,11 +119,15 @@ class PacketReader(Notifications.Notifier): if complete_timeout is not None: time_start = time.time() - while not startOfPacket and (complete_timeout is None or (time.time() - time_start < complete_timeout)): + while not startOfPacket and ( + complete_timeout is None or (time.time() - time_start < complete_timeout) + ): res = self.getSerialByte(timeout) - startOfPacket = (res == SLIP_START) + startOfPacket = res == SLIP_START - while not endOfPacket and (complete_timeout is None or (time.time() - time_start < complete_timeout)): + while not endOfPacket and ( + complete_timeout is None or (time.time() - time_start < complete_timeout) + ): serialByte = self.getSerialByte(timeout) if serialByte == SLIP_END: endOfPacket = True @@ -138,9 +142,11 @@ class PacketReader(Notifications.Notifier): else: dataBuffer.append(SLIP_END) else: - dataBuffer.append(serialByte) + dataBuffer.append(serialByte) if not endOfPacket: - raise Exceptions.UARTPacketError("Exceeded max timeout of %f seconds." % complete_timeout) + raise Exceptions.UARTPacketError( + "Exceeded max timeout of %f seconds." % complete_timeout + ) return dataBuffer # This function read byte chuncks from the serial port and return one byte at a time @@ -153,13 +159,23 @@ class PacketReader(Notifications.Notifier): def handlePacketHistory(self, packet): # Reads and validates packet counter - if self.lastReceivedPacket is not None \ - and packet.packetCounter != (self.lastReceivedPacket.packetCounter + 1) % PACKET_COUNTER_CAP \ - and self.lastReceivedPacket.packetCounter != 0: + if ( + self.lastReceivedPacket is not None + and packet.packetCounter + != (self.lastReceivedPacket.packetCounter + 1) % PACKET_COUNTER_CAP + and self.lastReceivedPacket.packetCounter != 0 + ): - logging.info("gap in packets, between " + str(self.lastReceivedPacket.packetCounter) + " and " - + str(packet.packetCounter) + " packet before: " + str(self.lastReceivedPacket.packetList) - + " packet after: " + str(packet.packetList)) + logging.info( + "gap in packets, between " + + str(self.lastReceivedPacket.packetCounter) + + " and " + + str(packet.packetCounter) + + " packet before: " + + str(self.lastReceivedPacket.packetList) + + " packet after: " + + str(packet.packetList) + ) self.lastReceivedPacket = packet if packet.id in [EVENT_PACKET_DATA_PDU, EVENT_PACKET_ADV_PDU]: @@ -198,25 +214,34 @@ class PacketReader(Notifications.Notifier): # Convert time-stamp to End to Start delta time_delta = 0 - if self.lastReceivedTimestampPacket is not None and self.lastReceivedTimestampPacket.valid: - time_delta = (packet.timestamp - - (self.lastReceivedTimestampPacket.timestamp + - self.getPacketTime(self.lastReceivedTimestampPacket))) + if ( + self.lastReceivedTimestampPacket is not None + and self.lastReceivedTimestampPacket.valid + ): + time_delta = packet.timestamp - ( + self.lastReceivedTimestampPacket.timestamp + + self.getPacketTime(self.lastReceivedTimestampPacket) + ) time_delta = toLittleEndian(time_delta, 4) - packet.packetList[TIMESTAMP_POS ] = time_delta[0] - packet.packetList[TIMESTAMP_POS+1] = time_delta[1] - packet.packetList[TIMESTAMP_POS+2] = time_delta[2] - packet.packetList[TIMESTAMP_POS+3] = time_delta[3] - + packet.packetList[TIMESTAMP_POS] = time_delta[0] + packet.packetList[TIMESTAMP_POS + 1] = time_delta[1] + packet.packetList[TIMESTAMP_POS + 2] = time_delta[2] + packet.packetList[TIMESTAMP_POS + 3] = time_delta[3] def handlePacketCompatibility(self, packet): - if self.supportedProtocolVersion == PROTOVER_V2 and packet.packetList[PROTOVER_POS] > PROTOVER_V2: + if ( + self.supportedProtocolVersion == PROTOVER_V2 + and packet.packetList[PROTOVER_POS] > PROTOVER_V2 + ): self.convertPacketListProtoVer2(packet) def setSupportedProtocolVersion(self, supportedProtocolVersion): - if (supportedProtocolVersion != PROTOVER_V3): - logging.info("Using packet compatibility, converting packets to protocol version %d", supportedProtocolVersion) + if supportedProtocolVersion != PROTOVER_V3: + logging.info( + "Using packet compatibility, converting packets to protocol version %d", + supportedProtocolVersion, + ) self.supportedProtocolVersion = supportedProtocolVersion def getPacket(self, timeout=None): @@ -234,17 +259,30 @@ class PacketReader(Notifications.Notifier): return packet def sendPacket(self, id, payload): - packetList = [HEADER_LENGTH] + [len(payload)] + [PROTOVER_V1] + toLittleEndian(self.packetCounter, 2) + [id] + payload + packetList = ( + [HEADER_LENGTH] + + [len(payload)] + + [PROTOVER_V1] + + toLittleEndian(self.packetCounter, 2) + + [id] + + payload + ) packetList = self.encodeToSLIP(packetList) self.packetCounter += 1 self.uart.writeList(packetList) - def sendScan(self, findScanRsp = False, findAux = False, scanCoded = False): + def sendScan(self, findScanRsp=False, findAux=False, scanCoded=False): flags0 = findScanRsp | (findAux << 1) | (scanCoded << 2) self.sendPacket(REQ_SCAN_CONT, [flags0]) logging.info("Scan flags: %s" % bin(flags0)) - def sendFollow(self, addr, followOnlyAdvertisements = False, followOnlyLegacy = False, followCoded = False): + def sendFollow( + self, + addr, + followOnlyAdvertisements=False, + followOnlyLegacy=False, + followCoded=False, + ): flags0 = followOnlyAdvertisements | (followOnlyLegacy << 1) | (followCoded << 2) logging.info("Follow flags: %s" % bin(flags0)) self.sendPacket(REQ_FOLLOW, addr + [flags0]) @@ -253,7 +291,7 @@ class PacketReader(Notifications.Notifier): self.sendPacket(PING_REQ, []) def getBytes(self, value, size): - if (len(value) < size): + if len(value) < size: value = [0] * (size - len(value)) + value else: value = value[:size] @@ -294,10 +332,12 @@ class PacketReader(Notifications.Notifier): def sendHopSequence(self, hopSequence): for chan in hopSequence: if chan not in VALID_ADV_CHANS: - raise Exceptions.InvalidAdvChannel("%s is not an adv channel" % str(chan)) - payload = [len(hopSequence)] + hopSequence + [37]*(3-len(hopSequence)) + raise Exceptions.InvalidAdvChannel( + "%s is not an adv channel" % str(chan) + ) + payload = [len(hopSequence)] + hopSequence + [37] * (3 - len(hopSequence)) self.sendPacket(SET_ADV_CHANNEL_HOP_SEQ, payload) - self.notify("NEW_ADV_HOP_SEQ", {"hopSequence":hopSequence}) + self.notify("NEW_ADV_HOP_SEQ", {"hopSequence": hopSequence}) def sendVersionReq(self): self.sendPacket(REQ_VERSION, []) @@ -313,21 +353,31 @@ class Packet: def __init__(self, packetList): try: if not packetList: - raise Exceptions.InvalidPacketException("packet list not valid: %s" % str(packetList)) + raise Exceptions.InvalidPacketException( + "packet list not valid: %s" % str(packetList) + ) self.protover = packetList[PROTOVER_POS] if self.protover > PROTOVER_V3: - logging.exception("Unsupported protocol version %s" % str(self.protover)) - raise RuntimeError("Unsupported protocol version %s" % str(self.protover)) + logging.exception( + "Unsupported protocol version %s" % str(self.protover) + ) + raise RuntimeError( + "Unsupported protocol version %s" % str(self.protover) + ) - self.packetCounter = parseLittleEndian(packetList[PACKETCOUNTER_POS:PACKETCOUNTER_POS + 2]) + self.packetCounter = parseLittleEndian( + packetList[PACKETCOUNTER_POS : PACKETCOUNTER_POS + 2] + ) self.id = packetList[ID_POS] if int(self.protover) == PROTOVER_V1: self.payloadLength = packetList[PAYLOAD_LEN_POS_V1] else: - self.payloadLength = parseLittleEndian(packetList[PAYLOAD_LEN_POS:PAYLOAD_LEN_POS + 2]) + self.payloadLength = parseLittleEndian( + packetList[PAYLOAD_LEN_POS : PAYLOAD_LEN_POS + 2] + ) self.packetList = packetList self.readPayload(packetList) @@ -337,24 +387,26 @@ class Packet: self.OK = False self.valid = False except Exception as e: - logging.exception("packet creation error %s" %str(e)) + logging.exception("packet creation error %s" % str(e)) logging.info("packetList: " + str(packetList)) self.OK = False self.valid = False def __repr__(self): - return "UART packet, type: "+str(self.id)+", PC: "+str(self.packetCounter) + return "UART packet, type: " + str(self.id) + ", PC: " + str(self.packetCounter) def readPayload(self, packetList): self.blePacket = None self.OK = False if not self.validatePacketList(packetList): - raise Exceptions.InvalidPacketException("packet list not valid: %s" % str(packetList)) + raise Exceptions.InvalidPacketException( + "packet list not valid: %s" % str(packetList) + ) else: self.valid = True - self.payload = packetList[PAYLOAD_POS:PAYLOAD_POS+self.payloadLength] + self.payload = packetList[PAYLOAD_POS : PAYLOAD_POS + self.payloadLength] if self.id == EVENT_PACKET_ADV_PDU or self.id == EVENT_PACKET_DATA_PDU: try: @@ -365,23 +417,27 @@ class Packet: self.channel = packetList[CHANNEL_POS] self.rawRSSI = packetList[RSSI_POS] self.RSSI = -self.rawRSSI - self.eventCounter = parseLittleEndian(packetList[EVENTCOUNTER_POS:EVENTCOUNTER_POS+2]) + self.eventCounter = parseLittleEndian( + packetList[EVENTCOUNTER_POS : EVENTCOUNTER_POS + 2] + ) - self.timestamp = parseLittleEndian(packetList[TIMESTAMP_POS:TIMESTAMP_POS+4]) + self.timestamp = parseLittleEndian( + packetList[TIMESTAMP_POS : TIMESTAMP_POS + 4] + ) # The hardware adds a padding byte which isn't sent on air. # We remove it, and update the payload length in the packet list. if self.phy == PHY_CODED: - self.packetList.pop(BLEPACKET_POS+6+1) + self.packetList.pop(BLEPACKET_POS + 6 + 1) else: - self.packetList.pop(BLEPACKET_POS+6) + self.packetList.pop(BLEPACKET_POS + 6) self.payloadLength -= 1 if self.protover >= PROTOVER_V2: # Write updated payload length back to the packet list. payloadLength = toLittleEndian(self.payloadLength, 2) - packetList[PAYLOAD_LEN_POS ] = payloadLength[0] - packetList[PAYLOAD_LEN_POS+1] = payloadLength[1] - else: # PROTOVER_V1 + packetList[PAYLOAD_LEN_POS] = payloadLength[0] + packetList[PAYLOAD_LEN_POS + 1] = payloadLength[1] + else: # PROTOVER_V1 packetList[PAYLOAD_LEN_POS_V1] = self.payloadLength else: logging.info("Invalid BLE Header Length " + str(packetList)) @@ -390,15 +446,22 @@ class Packet: if self.OK: try: if self.protover >= PROTOVER_V3: - packet_type = (PACKET_TYPE_ADVERTISING - if self.id == EVENT_PACKET_ADV_PDU else - PACKET_TYPE_DATA) + packet_type = ( + PACKET_TYPE_ADVERTISING + if self.id == EVENT_PACKET_ADV_PDU + else PACKET_TYPE_DATA + ) else: - packet_type = (PACKET_TYPE_ADVERTISING - if packetList[BLEPACKET_POS : BLEPACKET_POS + 4] == ADV_ACCESS_ADDRESS else - PACKET_TYPE_DATA) + packet_type = ( + PACKET_TYPE_ADVERTISING + if packetList[BLEPACKET_POS : BLEPACKET_POS + 4] + == ADV_ACCESS_ADDRESS + else PACKET_TYPE_DATA + ) - self.blePacket = BlePacket(packet_type, packetList[BLEPACKET_POS:], self.phy) + self.blePacket = BlePacket( + packet_type, packetList[BLEPACKET_POS:], self.phy + ) except Exception as e: logging.exception("blePacket error %s" % str(e)) except Exception as e: @@ -407,13 +470,17 @@ class Packet: self.OK = False elif self.id == PING_RESP: if self.protover < PROTOVER_V3: - self.version = parseLittleEndian(packetList[PAYLOAD_POS:PAYLOAD_POS+2]) + self.version = parseLittleEndian( + packetList[PAYLOAD_POS : PAYLOAD_POS + 2] + ) elif self.id == RESP_VERSION: - self.version = ''.join([chr(i) for i in packetList[PAYLOAD_POS:]]) + self.version = "".join([chr(i) for i in packetList[PAYLOAD_POS:]]) elif self.id == RESP_TIMESTAMP: - self.timestamp = parseLittleEndian(packetList[PAYLOAD_POS:PAYLOAD_POS+4]) + self.timestamp = parseLittleEndian( + packetList[PAYLOAD_POS : PAYLOAD_POS + 4] + ) elif self.id == SWITCH_BAUD_RATE_RESP or self.id == SWITCH_BAUD_RATE_REQ: - self.baudRate = parseLittleEndian(packetList[PAYLOAD_POS:PAYLOAD_POS+4]) + self.baudRate = parseLittleEndian(packetList[PAYLOAD_POS : PAYLOAD_POS + 4]) else: logging.info("Unknown packet ID") @@ -438,7 +505,8 @@ class Packet: logging.exception("Invalid packet: %s" % str(packetList)) return False -class BlePacket(): + +class BlePacket: def __init__(self, type, packetList, phy): self.type = type @@ -458,12 +526,11 @@ class BlePacket(): offset = self.extractAddresses(packetList, offset) self.extractName(packetList, offset) - def __repr__(self): - return "BLE packet, AAddr: "+str(self.accessAddress) + return "BLE packet, AAddr: " + str(self.accessAddress) def extractAccessAddress(self, packetList, offset): - self.accessAddress = packetList[offset:offset+4] + self.accessAddress = packetList[offset : offset + 4] return offset + 4 def extractFormat(self, packetList, phy, offset): @@ -497,29 +564,29 @@ class BlePacket(): scanAddr = None if self.advType in [0, 1, 2, 4, 6]: - addr = packetList[offset:offset+6] + addr = packetList[offset : offset + 6] addr.reverse() addr += [self.txAddrType] offset += 6 if self.advType in [3, 5]: - scanAddr = packetList[offset:offset+6] + scanAddr = packetList[offset : offset + 6] scanAddr.reverse() scanAddr += [self.txAddrType] offset += 6 - addr = packetList[offset:offset+6] + addr = packetList[offset : offset + 6] addr.reverse() addr += [self.rxAddrType] offset += 6 if self.advType == 1: - scanAddr = packetList[offset:offset+6] + scanAddr = packetList[offset : offset + 6] scanAddr.reverse() scanAddr += [self.rxAddrType] offset += 6 if self.advType == 7: - ext_header_len = packetList[offset] & 0x3f + ext_header_len = packetList[offset] & 0x3F offset += 1 ext_header_offset = offset @@ -527,13 +594,13 @@ class BlePacket(): ext_header_offset += 1 if flags & 0x01: - addr = packetList[ext_header_offset:ext_header_offset+6] + addr = packetList[ext_header_offset : ext_header_offset + 6] addr.reverse() addr += [self.txAddrType] ext_header_offset += 6 if flags & 0x02: - scanAddr = packetList[ext_header_offset:ext_header_offset+6] + scanAddr = packetList[ext_header_offset : ext_header_offset + 6] scanAddr.reverse() scanAddr += [self.rxAddrType] ext_header_offset += 6 @@ -550,17 +617,17 @@ class BlePacket(): i = offset while i < len(packetList): length = packetList[i] - if (i+length+1) > len(packetList) or length == 0: + if (i + length + 1) > len(packetList) or length == 0: break - type = packetList[i+1] + type = packetList[i + 1] if type == 8 or type == 9: - nameList = packetList[i+2:i+length+1] + nameList = packetList[i + 2 : i + length + 1] name = "" for j in nameList: name += chr(j) - i += (length+1) - name = '"'+name+'"' - elif (self.advType == 1): + i += length + 1 + name = '"' + name + '"' + elif self.advType == 1: name = "[ADV_DIRECT_IND]" self.name = name @@ -569,15 +636,16 @@ class BlePacket(): self.length = packetList[offset] return offset + 1 + def parseLittleEndian(list): total = 0 for i in range(len(list)): - total+=(list[i] << (8*i)) + total += list[i] << (8 * i) return total -def toLittleEndian(value, size): - list = [0]*size - for i in range(size): - list[i] = (value >> (i*8)) % 256 - return list +def toLittleEndian(value, size): + list = [0] * size + for i in range(size): + list[i] = (value >> (i * 8)) % 256 + return list diff --git a/modules/wireshark/extcap/SnifferAPI/Pcap.py b/modules/wireshark/extcap/SnifferAPI/Pcap.py index 63cbe41..8b0445a 100644 --- a/modules/wireshark/extcap/SnifferAPI/Pcap.py +++ b/modules/wireshark/extcap/SnifferAPI/Pcap.py @@ -44,14 +44,16 @@ import struct # - https://github.com/pcapng/pcapng # - https://www.tcpdump.org/linktypes/LINKTYPE_NORDIC_BLE.html PACKET_HEADER = struct.Struct(" empty string ('') return None, None, None - _, _, length, arg, typ = struct.unpack('>sBHBB', header) + _, _, length, arg, typ = struct.unpack(">sBHBB", header) payload = bytearray() if length > 2: @@ -239,6 +321,7 @@ def control_read(): return arg, typ, payload + def control_write(arg, typ, message): """Write the message to the control channel""" @@ -247,8 +330,8 @@ def control_write(arg, typ, message): return packet = bytearray() - packet += struct.pack('>BBHBB', ord('T'), 0, len(message) + 2, arg, typ) - packet += message.encode('utf-8') + packet += struct.pack(">BBHBB", ord("T"), 0, len(message) + 2, arg, typ) + packet += message.encode("utf-8") fn_ctrl_out.write(packet) @@ -278,11 +361,13 @@ def device_added(notification): # Extcap selector uses \0 character to separate value and display value, # therefore the display value cannot contain the \0 character as this # would lead to truncation of the display value. - display = (device.name.replace('\0', '\\0') + - (" " + str(device.RSSI) + " dBm " if device.RSSI != 0 else " ") + - string_address(device.address)) + display = ( + device.name.replace("\0", "\\0") + + (" " + str(device.RSSI) + " dBm " if device.RSSI != 0 else " ") + + string_address(device.address) + ) - message = str(device.address) + '\0' + display + message = str(device.address) + "\0" + display control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, message) @@ -298,33 +383,35 @@ def device_removed(notification): control_write(CTRL_ARG_DEVICE, CTRL_CMD_REMOVE, message) logging.info("Removed: " + display) + def devices_cleared(notification): """Devices have been cleared""" message = "" control_write(CTRL_ARG_DEVICE, CTRL_CMD_REMOVE, message) - control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, " " + '\0' + "All advertising devices") - control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, zero_addr + '\0' + "Follow IRK") + control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, " " + "\0" + "All advertising devices") + control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, zero_addr + "\0" + "Follow IRK") control_write(CTRL_ARG_DEVICE, CTRL_CMD_SET, " ") + def handle_control_command(sniffer, arg, typ, payload): """Handle command from control channel""" global last_used_key_type if arg == CTRL_ARG_DEVICE: - if payload == b' ': + if payload == b" ": scan_for_devices(sniffer) else: values = payload - values = values.replace(b'[', b'') - values = values.replace(b']', b'') - device_address = values.split(b',') + values = values.replace(b"[", b"") + values = values.replace(b"]", b"") + device_address = values.split(b",") - logging.info('follow_device: {}'.format(device_address)) + logging.info("follow_device: {}".format(device_address)) for i in range(6): device_address[i] = int(device_address[i]) - device_address[6] = 1 if device_address[6] == b' 1' else 0 + device_address[6] = 1 if device_address[6] == b" 1" else 0 device = Devices.Device(address=device_address, name='""', RSSI=0) @@ -333,7 +420,7 @@ def handle_control_command(sniffer, arg, typ, payload): elif arg == CTRL_ARG_DEVICE_CLEAR: clear_devices(sniffer) elif arg == CTRL_ARG_KEY_TYPE: - last_used_key_type = int(payload.decode('utf-8')) + last_used_key_type = int(payload.decode("utf-8")) elif arg == CTRL_ARG_KEY_VAL: set_key_value(sniffer, payload) elif arg == CTRL_ARG_ADVHOP: @@ -384,8 +471,10 @@ def follow_device(sniffer, device): """Follow the selected device""" global write_new_packets, in_follow_mode - sniffer.follow(device, capture_only_advertising, capture_only_legacy_advertising, capture_coded) - time.sleep(.1) + sniffer.follow( + device, capture_only_advertising, capture_only_legacy_advertising, capture_coded + ) + time.sleep(0.1) in_follow_mode = True logging.info("Following " + string_address(device.address)) @@ -395,58 +484,64 @@ def set_key_value(sniffer, payload): """Send key value to device""" global last_used_key_val - payload = payload.decode('utf-8') + payload = payload.decode("utf-8") last_used_key_val = payload - if (last_used_key_type == CTRL_KEY_TYPE_PASSKEY): + if last_used_key_type == CTRL_KEY_TYPE_PASSKEY: if re.match("^[0-9]{6}$", payload): set_passkey(sniffer, payload) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_OOB): + elif last_used_key_type == CTRL_KEY_TYPE_OOB: if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): set_OOB(sniffer, payload[2:]) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_DH_PRIVATE_KEY): - if (re.match("^0[xX][0-9A-Za-z]{1,64}$", payload)): + elif last_used_key_type == CTRL_KEY_TYPE_DH_PRIVATE_KEY: + if re.match("^0[xX][0-9A-Za-z]{1,64}$", payload): set_dh_private_key(sniffer, payload[2:]) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_LEGACY_LTK): - if (re.match("^0[xX][0-9A-Za-z]{1,32}$", payload)): + elif last_used_key_type == CTRL_KEY_TYPE_LEGACY_LTK: + if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): set_legacy_ltk(sniffer, payload[2:]) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_SC_LTK): - if (re.match("^0[xX][0-9A-Za-z]{1,32}$", payload)): + elif last_used_key_type == CTRL_KEY_TYPE_SC_LTK: + if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): set_sc_ltk(sniffer, payload[2:]) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_IRK): - if (re.match("^0[xX][0-9A-Za-z]{1,32}$", payload)): + elif last_used_key_type == CTRL_KEY_TYPE_IRK: + if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload): set_irk(sniffer, payload[2:]) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_ADD_ADDR): - if (re.match("^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload)): + elif last_used_key_type == CTRL_KEY_TYPE_ADD_ADDR: + if re.match( + "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload + ): add_address(sniffer, payload) else: logging.info("Invalid key value: " + str(payload)) - elif (last_used_key_type == CTRL_KEY_TYPE_FOLLOW_ADDR): - if (re.match("^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload)): + elif last_used_key_type == CTRL_KEY_TYPE_FOLLOW_ADDR: + if re.match( + "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload + ): follow_address(sniffer, payload) else: logging.info("Invalid key value: " + str(payload)) else: logging.info("Invalid key type: " + str(last_used_key_type)) -def parse_hex(value): - if len(value) % 2 != 0: - value = '0' + value - a = list(value) - return [int(x + y, 16) for x,y in zip(a[::2], a[1::2])] +def parse_hex(value): + if len(value) % 2 != 0: + value = "0" + value + + a = list(value) + return [int(x + y, 16) for x, y in zip(a[::2], a[1::2])] + def set_passkey(sniffer, payload): """Send passkey to device""" @@ -461,36 +556,42 @@ def set_passkey(sniffer, payload): sniffer.sendTK(passkey) + def set_OOB(sniffer, payload): """Send OOB to device""" logging.info("Setting OOB data: " + payload) sniffer.sendTK(parse_hex(payload)) + def set_dh_private_key(sniffer, payload): """Send Diffie-Hellman private key to device""" logging.info("Setting DH private key: " + payload) sniffer.sendPrivateKey(parse_hex(payload)) + def set_legacy_ltk(sniffer, payload): """Send Legacy Long Term Key (LTK) to device""" logging.info("Setting Legacy LTK: " + payload) sniffer.sendLegacyLTK(parse_hex(payload)) + def set_sc_ltk(sniffer, payload): """Send LE secure connections Long Term Key (LTK) to device""" logging.info("Setting SC LTK: " + payload) sniffer.sendSCLTK(parse_hex(payload)) + def set_irk(sniffer, payload): """Send Identity Resolving Key (IRK) to device""" logging.info("Setting IRK: " + payload) sniffer.sendIRK(parse_hex(payload)) + def add_address(sniffer, payload): """Add LE address to device list""" logging.info("Adding LE address: " + payload) - (addr,addr_type) = payload.split(' ') + (addr, addr_type) = payload.split(" ") device = [int(a, 16) for a in addr.split(":")] device.append(1 if addr_type == "random" else 0) @@ -498,11 +599,12 @@ def add_address(sniffer, payload): new_device = Devices.Device(address=device, name='""', RSSI=0) sniffer.addDevice(new_device) + def follow_address(sniffer, payload): """Add LE address to device list""" logging.info("Adding LE address: " + payload) - (addr,addr_type) = payload.split(' ') + (addr, addr_type) = payload.split(" ") device = [int(a, 16) for a in addr.split(":")] device.append(1 if addr_type == "random" else 0) @@ -513,15 +615,16 @@ def follow_address(sniffer, payload): control_write(CTRL_ARG_DEVICE, CTRL_CMD_SET, f"{new_device.address}") follow_device(sniffer, new_device) + def set_advhop(sniffer, payload): """Set advertising channel hop sequence""" global last_used_advhop - payload = payload.decode('utf-8') + payload = payload.decode("utf-8") last_used_advhop = payload - hops = [int(channel) for channel in payload.split(',')] + hops = [int(channel) for channel in payload.split(",")] sniffer.setAdvHopSequence(hops) @@ -546,7 +649,7 @@ def error_interface_not_found(interface, fifo): def validate_interface(interface, fifo): """Check if interface exists""" - if sys.platform != 'win32' and not os.path.exists(interface): + if sys.platform != "win32" and not os.path.exists(interface): error_interface_not_found(interface, fifo) @@ -557,12 +660,13 @@ def get_default_baudrate(interface, fifo): error_interface_not_found(interface, fifo) return rates["default"] + def get_supported_protocol_version(extcap_version): """Return the maximum supported Packet Protocol Version""" - if extcap_version == 'None': + if extcap_version == "None": return 2 - (major, minor) = extcap_version.split('.') + (major, minor) = extcap_version.split(".") major = int(major) minor = int(minor) @@ -572,6 +676,7 @@ def get_supported_protocol_version(extcap_version): else: return 2 + def setup_extcap_log_handler(): """Add the a handler that emits log messages through the extcap control out channel""" global extcap_log_handler @@ -593,18 +698,18 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out): global fn_capture, fn_ctrl_in, fn_ctrl_out, write_new_packets, extcap_log_handler try: - fn_capture = open(fifo, 'wb', 0) + fn_capture = open(fifo, "wb", 0) if control_out is not None: - fn_ctrl_out = open(control_out, 'wb', 0) + fn_ctrl_out = open(control_out, "wb", 0) setup_extcap_log_handler() if control_in is not None: - fn_ctrl_in = open(control_in, 'rb', 0) + fn_ctrl_in = open(control_in, "rb", 0) logging.info("Log started at %s", time.strftime("%c")) - interface, extcap_version = interface.split('-') + interface, extcap_version = interface.split("-") logging.info("Extcap version %s", str(extcap_version)) capture_write(Pcap.get_global_header()) @@ -619,7 +724,9 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out): sniffer.subscribe("DEVICE_REMOVED", device_removed) sniffer.subscribe("DEVICES_CLEARED", devices_cleared) sniffer.setAdvHopSequence([37, 38, 39]) - sniffer.setSupportedProtocolVersion(get_supported_protocol_version(extcap_version)) + sniffer.setSupportedProtocolVersion( + get_supported_protocol_version(extcap_version) + ) logging.info("Sniffer created") logging.info("Software version: %s" % sniffer.swversion) @@ -655,7 +762,7 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out): pass except Exceptions.LockedException as e: - logging.info('{}'.format(e.message)) + logging.info("{}".format(e.message)) except OSError: # We'll get OSError=22 when/if wireshark kills the pipe(s) on capture @@ -689,14 +796,14 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out): def extcap_close_fifo(fifo): - """"Close extcap fifo""" + """ "Close extcap fifo""" if not os.path.exists(fifo): print("FIFO does not exist!", file=sys.stderr) return # This is apparently needed to workaround an issue on Windows/macOS # where the message cannot be read. (really?) - fh = open(fifo, 'wb', 0) + fh = open(fifo, "wb", 0) fh.close() @@ -705,13 +812,13 @@ class ExtcapLoggerHandler(logging.Handler): def emit(self, record): """Send log message to extcap""" - message = record.message.replace('\0', '\\0') + message = record.message.replace("\0", "\\0") log_message = f"{record.levelname}: {message}\n" control_write(CTRL_ARG_LOG, CTRL_CMD_ADD, log_message) def parse_capture_filter(capture_filter): - """"Parse given capture filter""" + """ "Parse given capture filter""" global rssi_filter m = re.search(r"^\s*rssi\s*(>=?)\s*(-?[0-9]+)\s*$", capture_filter, re.IGNORECASE) if m: @@ -720,66 +827,89 @@ def parse_capture_filter(capture_filter): print("Illegal RSSI value, must be between -10 and -256") # Handle >= by modifying the threshold, since comparisons are always done with # the > operator - if m.group(1) == '>=': + if m.group(1) == ">=": rssi_filter = rssi_filter - 1 else: - print("Filter syntax: \"RSSI >= -value\"") + print('Filter syntax: "RSSI >= -value"') + import atexit + @atexit.register def goodbye(): - logging.info("Exiting PID {}".format(os.getpid())) + logging.info("Exiting PID {}".format(os.getpid())) -if __name__ == '__main__': +if __name__ == "__main__": # Capture options - parser = argparse.ArgumentParser(description="Nordic Semiconductor nRF Sniffer for Bluetooth LE extcap plugin") + parser = argparse.ArgumentParser( + description="Nordic Semiconductor nRF Sniffer for Bluetooth LE extcap plugin" + ) # Extcap Arguments - parser.add_argument("--capture", - help="Start the capture", - action="store_true") + parser.add_argument("--capture", help="Start the capture", action="store_true") - parser.add_argument("--extcap-interfaces", - help="List available interfaces to capture from", - action="store_true") + parser.add_argument( + "--extcap-interfaces", + help="List available interfaces to capture from", + action="store_true", + ) - parser.add_argument("--extcap-interface", - help="The interface to capture from") + parser.add_argument("--extcap-interface", help="The interface to capture from") - parser.add_argument("--extcap-dlts", - help="List DLTs for the given interface", - action="store_true") + parser.add_argument( + "--extcap-dlts", help="List DLTs for the given interface", action="store_true" + ) - parser.add_argument("--extcap-config", - help="List configurations for the given interface", - action="store_true") + parser.add_argument( + "--extcap-config", + help="List configurations for the given interface", + action="store_true", + ) - parser.add_argument("--extcap-capture-filter", - help="Used together with capture to provide a capture filter") + parser.add_argument( + "--extcap-capture-filter", + help="Used together with capture to provide a capture filter", + ) - parser.add_argument("--fifo", - help="Use together with capture to provide the fifo to dump data to") + parser.add_argument( + "--fifo", help="Use together with capture to provide the fifo to dump data to" + ) - parser.add_argument("--extcap-control-in", - help="Used together with capture to get control messages from toolbar") + parser.add_argument( + "--extcap-control-in", + help="Used together with capture to get control messages from toolbar", + ) - parser.add_argument("--extcap-control-out", - help="Used together with capture to send control messages to toolbar") + parser.add_argument( + "--extcap-control-out", + help="Used together with capture to send control messages to toolbar", + ) - parser.add_argument("--extcap-version", - help="Set extcap supported version") + parser.add_argument("--extcap-version", help="Set extcap supported version") # Interface Arguments parser.add_argument("--device", help="Device", default="") parser.add_argument("--baudrate", type=int, help="The sniffer baud rate") - parser.add_argument("--only-advertising", help="Only advertising packets", action="store_true") - parser.add_argument("--only-legacy-advertising", help="Only legacy advertising packets", action="store_true") - parser.add_argument("--scan-follow-rsp", help="Find scan response data ", action="store_true") - parser.add_argument("--scan-follow-aux", help="Find auxiliary pointer data", action="store_true") - parser.add_argument("--coded", help="Scan and follow on LE Coded PHY", action="store_true") + parser.add_argument( + "--only-advertising", help="Only advertising packets", action="store_true" + ) + parser.add_argument( + "--only-legacy-advertising", + help="Only legacy advertising packets", + action="store_true", + ) + parser.add_argument( + "--scan-follow-rsp", help="Find scan response data ", action="store_true" + ) + parser.add_argument( + "--scan-follow-aux", help="Find auxiliary pointer data", action="store_true" + ) + parser.add_argument( + "--coded", help="Scan and follow on LE Coded PHY", action="store_true" + ) logging.info("Started PID {}".format(os.getpid())) @@ -839,16 +969,23 @@ if __name__ == '__main__': parser.print_help() sys.exit(ERROR_FIFO) try: - logging.info('sniffer capture') - sniffer_capture(interface, args.baudrate, args.fifo, args.extcap_control_in, args.extcap_control_out) + logging.info("sniffer capture") + sniffer_capture( + interface, + args.baudrate, + args.fifo, + args.extcap_control_in, + args.extcap_control_out, + ) except KeyboardInterrupt: pass except Exception as e: import traceback + logging.info(traceback.format_exc()) - logging.info('internal error: {}'.format(repr(e))) + logging.info("internal error: {}".format(repr(e))) sys.exit(ERROR_INTERNAL) else: parser.print_help() sys.exit(ERROR_USAGE) - logging.info('main exit PID {}'.format(os.getpid())) + logging.info("main exit PID {}".format(os.getpid()))