Apply black

This commit is contained in:
Jakob Lechner 2025-11-11 19:34:40 +01:00
parent 9dafacebda
commit cc301951fc
14 changed files with 684 additions and 387 deletions

View file

@ -59,13 +59,13 @@ class CaptureFileHandler:
if not os.path.isdir(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
self.filename = filename
self.backupFilename = self.filename+".1"
self.backupFilename = self.filename + ".1"
if not os.path.isfile(self.filename):
self.startNewFile()
elif os.path.getsize(self.filename) > 20000000:
self.doRollover()
if clear:
#clear file
# clear file
self.startNewFile()
def startNewFile(self):
@ -86,6 +86,6 @@ class CaptureFileHandler:
def writePacket(self, packet):
with open(self.filename, "ab") as f:
packet = Pcap.create_packet(
bytes([packet.boardId] + packet.getList()),
packet.time)
bytes([packet.boardId] + packet.getList()), packet.time
)
f.write(packet)

View file

@ -39,6 +39,7 @@
from . import Notifications
import logging, threading
class DeviceList(Notifications.Notifier):
def __init__(self, *args, **kwargs):
Notifications.Notifier.__init__(self, *args, **kwargs)
@ -52,7 +53,7 @@ class DeviceList(Notifications.Notifier):
return len(self.devices)
def __repr__(self):
return "Sniffer Device List: "+str(self.asList())
return "Sniffer Device List: " + str(self.asList())
def clear(self):
logging.info("Clearing")
@ -69,11 +70,15 @@ class DeviceList(Notifications.Notifier):
self.append(newDevice)
else:
updated = False
if (newDevice.name != "\"\"") and (existingDevice.name == "\"\""):
if (newDevice.name != '""') and (existingDevice.name == '""'):
existingDevice.name = newDevice.name
updated = True
if (newDevice.RSSI != 0 and (existingDevice.RSSI < (newDevice.RSSI - 5)) or (existingDevice.RSSI > (newDevice.RSSI+2))):
if (
newDevice.RSSI != 0
and (existingDevice.RSSI < (newDevice.RSSI - 5))
or (existingDevice.RSSI > (newDevice.RSSI + 2))
):
existingDevice.RSSI = newDevice.RSSI
updated = True
@ -93,14 +98,14 @@ class DeviceList(Notifications.Notifier):
return self.devices[id]
elif type(id) == str:
for dev in self.devices:
if dev.name in [id, '"'+id+'"']:
if dev.name in [id, '"' + id + '"']:
return dev
elif id.__class__.__name__ == "Device":
return self.find(id.address)
return None
def remove(self, id):
if type(id) == list: #address
if type(id) == list: # address
device = self.devices.pop(self.devices.index(self.find(id)))
elif type(id) == int:
device = self.devices.pop(id)
@ -126,6 +131,7 @@ class DeviceList(Notifications.Notifier):
def asList(self):
return self.devices[:]
class Device:
def __init__(self, address, name, RSSI):
self.address = address
@ -134,10 +140,11 @@ class Device:
self.followed = False
def __repr__(self):
return 'Bluetooth LE device "'+self.name+'" ('+str(self.address)+')'
return 'Bluetooth LE device "' + self.name + '" (' + str(self.address) + ")"
def listToString(list):
str = ""
for i in list:
str+=chr(i)
str += chr(i)
return str

View file

@ -38,23 +38,29 @@
class SnifferTimeout(Exception):
pass
class UARTPacketError(Exception):
pass
class LockedException(Exception):
def __init__(self, message):
self.message = message
class InvalidPacketException(Exception):
pass
class InvalidAdvChannel(Exception):
pass
# Internal Use
class SnifferWatchDogTimeout(SnifferTimeout):
pass
# Internal Use
class ExitCodeException(Exception):
pass

View file

@ -2,7 +2,7 @@ import os
import logging
from sys import platform
if platform == 'linux':
if platform == "linux":
import psutil
from . import Exceptions
@ -16,8 +16,9 @@ from . import Exceptions
# HDB UUCP lock file format:
# process identifier (PID) as a ten byte ASCII decimal number, with a trailing newline
def lockpid(lockfile):
if (os.path.isfile(lockfile)):
if os.path.isfile(lockfile):
with open(lockfile) as fd:
lockpid = fd.read()
@ -30,17 +31,13 @@ def lockpid(lockfile):
return 0
def lock(port):
if platform != 'linux':
if platform != "linux":
return
tty = os.path.basename(port)
lockfile = os.path.join(
'/run',
'user',
f'{os.getuid()}',
f'{tty}.lock'
)
lockfile = os.path.join("/run", "user", f"{os.getuid()}", f"{tty}.lock")
lockedpid = lockpid(lockfile)
if lockedpid:
@ -53,16 +50,17 @@ def lock(port):
logging.info("Lockfile is stale. Overriding it..")
os.remove(lockfile)
fd = open(lockfile, 'w')
with open(lockfile, 'w') as fd:
fd.write(f'{os.getpid():10}')
fd = open(lockfile, "w")
with open(lockfile, "w") as fd:
fd.write(f"{os.getpid():10}")
def unlock(port):
if platform != 'linux':
if platform != "linux":
return
tty = os.path.basename(port)
lockfile = f'/var/lock/LCK..{tty}'
lockfile = f"/var/lock/LCK..{tty}"
lockedpid = lockpid(lockfile)
if lockedpid == os.getpid():

View file

@ -48,9 +48,11 @@ import logging.handlers as logHandlers
# will result in the line being appended to the log file #
#################################################################
appdata = os.getenv('appdata')
appdata = os.getenv("appdata")
if appdata:
DEFAULT_LOG_FILE_DIR = os.path.join(appdata, 'Nordic Semiconductor', 'Sniffer', 'logs')
DEFAULT_LOG_FILE_DIR = os.path.join(
appdata, "Nordic Semiconductor", "Sniffer", "logs"
)
else:
DEFAULT_LOG_FILE_DIR = "/tmp/logs"
@ -89,8 +91,12 @@ def initLogger():
global logFlusher
global logHandlerArray
logHandler = MyRotatingFileHandler(logFileName, mode='a', maxBytes=myMaxBytes, backupCount=3)
logFormatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s', datefmt='%d-%b-%Y %H:%M:%S (%z)')
logHandler = MyRotatingFileHandler(
logFileName, mode="a", maxBytes=myMaxBytes, backupCount=3
)
logFormatter = logging.Formatter(
"%(asctime)s %(levelname)s: %(message)s", datefmt="%d-%b-%Y %H:%M:%S (%z)"
)
logHandler.setFormatter(logFormatter)
logger = logging.getLogger()
logger.addHandler(logHandler)
@ -154,6 +160,7 @@ def addLogHandler(logHandler):
logger.setLevel(logging.INFO)
logHandlerArray.append(logHandler)
def removeLogHandler(logHandler):
global logHandlerArray
logger = logging.getLogger()
@ -200,7 +207,7 @@ class LogFlusher(threading.Thread):
self.exit.set()
if __name__ == '__main__':
if __name__ == "__main__":
initLogger()
for i in range(50):
logging.info("test log no. " + str(i))

View file

@ -37,18 +37,20 @@
import threading, logging
class Notification():
def __init__(self, key, msg = None):
class Notification:
def __init__(self, key, msg=None):
if type(key) is not str:
raise TypeError("Invalid notification key: "+str(key))
raise TypeError("Invalid notification key: " + str(key))
self.key = key
self.msg = msg
def __repr__(self):
return "Notification (key: %s, msg: %s)" % (str(self.key), str(self.msg))
class Notifier():
def __init__(self, callbacks = [], **kwargs):
class Notifier:
def __init__(self, callbacks=[], **kwargs):
self.callbacks = {}
self.callbackLock = threading.RLock()
@ -75,10 +77,10 @@ class Notifier():
self.callbacks[key] = []
return self.callbacks[key]
def notify(self, key = None, msg = None, notification = None):
def notify(self, key=None, msg=None, notification=None):
with self.callbackLock:
if notification == None:
notification = Notification(key,msg)
notification = Notification(key, msg)
for callback in self.getCallbacks(notification.key):
callback(notification)
@ -87,4 +89,4 @@ class Notifier():
callback(notification)
def passOnNotification(self, notification):
self.notify(notification = notification)
self.notify(notification=notification)

View file

@ -43,17 +43,17 @@ ADV_ACCESS_ADDRESS = [0xD6, 0xBE, 0x89, 0x8E]
SYNCWORD_POS = 0
PAYLOAD_LEN_POS_V1 = 1
PAYLOAD_LEN_POS = 0
PROTOVER_POS = PAYLOAD_LEN_POS+2
PACKETCOUNTER_POS = PROTOVER_POS+1
ID_POS = PACKETCOUNTER_POS+2
PROTOVER_POS = PAYLOAD_LEN_POS + 2
PACKETCOUNTER_POS = PROTOVER_POS + 1
ID_POS = PACKETCOUNTER_POS + 2
BLE_HEADER_LEN_POS = ID_POS+1
FLAGS_POS = BLE_HEADER_LEN_POS+1
CHANNEL_POS = FLAGS_POS+1
RSSI_POS = CHANNEL_POS+1
EVENTCOUNTER_POS = RSSI_POS+1
TIMESTAMP_POS = EVENTCOUNTER_POS+2
BLEPACKET_POS = TIMESTAMP_POS+4
BLE_HEADER_LEN_POS = ID_POS + 1
FLAGS_POS = BLE_HEADER_LEN_POS + 1
CHANNEL_POS = FLAGS_POS + 1
RSSI_POS = CHANNEL_POS + 1
EVENTCOUNTER_POS = RSSI_POS + 1
TIMESTAMP_POS = EVENTCOUNTER_POS + 2
BLEPACKET_POS = TIMESTAMP_POS + 4
TXADD_POS = BLEPACKET_POS + 4
TXADD_MSK = 0x40
PAYLOAD_POS = BLE_HEADER_LEN_POS
@ -119,11 +119,15 @@ class PacketReader(Notifications.Notifier):
if complete_timeout is not None:
time_start = time.time()
while not startOfPacket and (complete_timeout is None or (time.time() - time_start < complete_timeout)):
while not startOfPacket and (
complete_timeout is None or (time.time() - time_start < complete_timeout)
):
res = self.getSerialByte(timeout)
startOfPacket = (res == SLIP_START)
startOfPacket = res == SLIP_START
while not endOfPacket and (complete_timeout is None or (time.time() - time_start < complete_timeout)):
while not endOfPacket and (
complete_timeout is None or (time.time() - time_start < complete_timeout)
):
serialByte = self.getSerialByte(timeout)
if serialByte == SLIP_END:
endOfPacket = True
@ -138,9 +142,11 @@ class PacketReader(Notifications.Notifier):
else:
dataBuffer.append(SLIP_END)
else:
dataBuffer.append(serialByte)
dataBuffer.append(serialByte)
if not endOfPacket:
raise Exceptions.UARTPacketError("Exceeded max timeout of %f seconds." % complete_timeout)
raise Exceptions.UARTPacketError(
"Exceeded max timeout of %f seconds." % complete_timeout
)
return dataBuffer
# This function read byte chuncks from the serial port and return one byte at a time
@ -153,13 +159,23 @@ class PacketReader(Notifications.Notifier):
def handlePacketHistory(self, packet):
# Reads and validates packet counter
if self.lastReceivedPacket is not None \
and packet.packetCounter != (self.lastReceivedPacket.packetCounter + 1) % PACKET_COUNTER_CAP \
and self.lastReceivedPacket.packetCounter != 0:
if (
self.lastReceivedPacket is not None
and packet.packetCounter
!= (self.lastReceivedPacket.packetCounter + 1) % PACKET_COUNTER_CAP
and self.lastReceivedPacket.packetCounter != 0
):
logging.info("gap in packets, between " + str(self.lastReceivedPacket.packetCounter) + " and "
+ str(packet.packetCounter) + " packet before: " + str(self.lastReceivedPacket.packetList)
+ " packet after: " + str(packet.packetList))
logging.info(
"gap in packets, between "
+ str(self.lastReceivedPacket.packetCounter)
+ " and "
+ str(packet.packetCounter)
+ " packet before: "
+ str(self.lastReceivedPacket.packetList)
+ " packet after: "
+ str(packet.packetList)
)
self.lastReceivedPacket = packet
if packet.id in [EVENT_PACKET_DATA_PDU, EVENT_PACKET_ADV_PDU]:
@ -198,25 +214,34 @@ class PacketReader(Notifications.Notifier):
# Convert time-stamp to End to Start delta
time_delta = 0
if self.lastReceivedTimestampPacket is not None and self.lastReceivedTimestampPacket.valid:
time_delta = (packet.timestamp -
(self.lastReceivedTimestampPacket.timestamp +
self.getPacketTime(self.lastReceivedTimestampPacket)))
if (
self.lastReceivedTimestampPacket is not None
and self.lastReceivedTimestampPacket.valid
):
time_delta = packet.timestamp - (
self.lastReceivedTimestampPacket.timestamp
+ self.getPacketTime(self.lastReceivedTimestampPacket)
)
time_delta = toLittleEndian(time_delta, 4)
packet.packetList[TIMESTAMP_POS ] = time_delta[0]
packet.packetList[TIMESTAMP_POS+1] = time_delta[1]
packet.packetList[TIMESTAMP_POS+2] = time_delta[2]
packet.packetList[TIMESTAMP_POS+3] = time_delta[3]
packet.packetList[TIMESTAMP_POS] = time_delta[0]
packet.packetList[TIMESTAMP_POS + 1] = time_delta[1]
packet.packetList[TIMESTAMP_POS + 2] = time_delta[2]
packet.packetList[TIMESTAMP_POS + 3] = time_delta[3]
def handlePacketCompatibility(self, packet):
if self.supportedProtocolVersion == PROTOVER_V2 and packet.packetList[PROTOVER_POS] > PROTOVER_V2:
if (
self.supportedProtocolVersion == PROTOVER_V2
and packet.packetList[PROTOVER_POS] > PROTOVER_V2
):
self.convertPacketListProtoVer2(packet)
def setSupportedProtocolVersion(self, supportedProtocolVersion):
if (supportedProtocolVersion != PROTOVER_V3):
logging.info("Using packet compatibility, converting packets to protocol version %d", supportedProtocolVersion)
if supportedProtocolVersion != PROTOVER_V3:
logging.info(
"Using packet compatibility, converting packets to protocol version %d",
supportedProtocolVersion,
)
self.supportedProtocolVersion = supportedProtocolVersion
def getPacket(self, timeout=None):
@ -234,17 +259,30 @@ class PacketReader(Notifications.Notifier):
return packet
def sendPacket(self, id, payload):
packetList = [HEADER_LENGTH] + [len(payload)] + [PROTOVER_V1] + toLittleEndian(self.packetCounter, 2) + [id] + payload
packetList = (
[HEADER_LENGTH]
+ [len(payload)]
+ [PROTOVER_V1]
+ toLittleEndian(self.packetCounter, 2)
+ [id]
+ payload
)
packetList = self.encodeToSLIP(packetList)
self.packetCounter += 1
self.uart.writeList(packetList)
def sendScan(self, findScanRsp = False, findAux = False, scanCoded = False):
def sendScan(self, findScanRsp=False, findAux=False, scanCoded=False):
flags0 = findScanRsp | (findAux << 1) | (scanCoded << 2)
self.sendPacket(REQ_SCAN_CONT, [flags0])
logging.info("Scan flags: %s" % bin(flags0))
def sendFollow(self, addr, followOnlyAdvertisements = False, followOnlyLegacy = False, followCoded = False):
def sendFollow(
self,
addr,
followOnlyAdvertisements=False,
followOnlyLegacy=False,
followCoded=False,
):
flags0 = followOnlyAdvertisements | (followOnlyLegacy << 1) | (followCoded << 2)
logging.info("Follow flags: %s" % bin(flags0))
self.sendPacket(REQ_FOLLOW, addr + [flags0])
@ -253,7 +291,7 @@ class PacketReader(Notifications.Notifier):
self.sendPacket(PING_REQ, [])
def getBytes(self, value, size):
if (len(value) < size):
if len(value) < size:
value = [0] * (size - len(value)) + value
else:
value = value[:size]
@ -294,10 +332,12 @@ class PacketReader(Notifications.Notifier):
def sendHopSequence(self, hopSequence):
for chan in hopSequence:
if chan not in VALID_ADV_CHANS:
raise Exceptions.InvalidAdvChannel("%s is not an adv channel" % str(chan))
payload = [len(hopSequence)] + hopSequence + [37]*(3-len(hopSequence))
raise Exceptions.InvalidAdvChannel(
"%s is not an adv channel" % str(chan)
)
payload = [len(hopSequence)] + hopSequence + [37] * (3 - len(hopSequence))
self.sendPacket(SET_ADV_CHANNEL_HOP_SEQ, payload)
self.notify("NEW_ADV_HOP_SEQ", {"hopSequence":hopSequence})
self.notify("NEW_ADV_HOP_SEQ", {"hopSequence": hopSequence})
def sendVersionReq(self):
self.sendPacket(REQ_VERSION, [])
@ -313,21 +353,31 @@ class Packet:
def __init__(self, packetList):
try:
if not packetList:
raise Exceptions.InvalidPacketException("packet list not valid: %s" % str(packetList))
raise Exceptions.InvalidPacketException(
"packet list not valid: %s" % str(packetList)
)
self.protover = packetList[PROTOVER_POS]
if self.protover > PROTOVER_V3:
logging.exception("Unsupported protocol version %s" % str(self.protover))
raise RuntimeError("Unsupported protocol version %s" % str(self.protover))
logging.exception(
"Unsupported protocol version %s" % str(self.protover)
)
raise RuntimeError(
"Unsupported protocol version %s" % str(self.protover)
)
self.packetCounter = parseLittleEndian(packetList[PACKETCOUNTER_POS:PACKETCOUNTER_POS + 2])
self.packetCounter = parseLittleEndian(
packetList[PACKETCOUNTER_POS : PACKETCOUNTER_POS + 2]
)
self.id = packetList[ID_POS]
if int(self.protover) == PROTOVER_V1:
self.payloadLength = packetList[PAYLOAD_LEN_POS_V1]
else:
self.payloadLength = parseLittleEndian(packetList[PAYLOAD_LEN_POS:PAYLOAD_LEN_POS + 2])
self.payloadLength = parseLittleEndian(
packetList[PAYLOAD_LEN_POS : PAYLOAD_LEN_POS + 2]
)
self.packetList = packetList
self.readPayload(packetList)
@ -337,24 +387,26 @@ class Packet:
self.OK = False
self.valid = False
except Exception as e:
logging.exception("packet creation error %s" %str(e))
logging.exception("packet creation error %s" % str(e))
logging.info("packetList: " + str(packetList))
self.OK = False
self.valid = False
def __repr__(self):
return "UART packet, type: "+str(self.id)+", PC: "+str(self.packetCounter)
return "UART packet, type: " + str(self.id) + ", PC: " + str(self.packetCounter)
def readPayload(self, packetList):
self.blePacket = None
self.OK = False
if not self.validatePacketList(packetList):
raise Exceptions.InvalidPacketException("packet list not valid: %s" % str(packetList))
raise Exceptions.InvalidPacketException(
"packet list not valid: %s" % str(packetList)
)
else:
self.valid = True
self.payload = packetList[PAYLOAD_POS:PAYLOAD_POS+self.payloadLength]
self.payload = packetList[PAYLOAD_POS : PAYLOAD_POS + self.payloadLength]
if self.id == EVENT_PACKET_ADV_PDU or self.id == EVENT_PACKET_DATA_PDU:
try:
@ -365,23 +417,27 @@ class Packet:
self.channel = packetList[CHANNEL_POS]
self.rawRSSI = packetList[RSSI_POS]
self.RSSI = -self.rawRSSI
self.eventCounter = parseLittleEndian(packetList[EVENTCOUNTER_POS:EVENTCOUNTER_POS+2])
self.eventCounter = parseLittleEndian(
packetList[EVENTCOUNTER_POS : EVENTCOUNTER_POS + 2]
)
self.timestamp = parseLittleEndian(packetList[TIMESTAMP_POS:TIMESTAMP_POS+4])
self.timestamp = parseLittleEndian(
packetList[TIMESTAMP_POS : TIMESTAMP_POS + 4]
)
# The hardware adds a padding byte which isn't sent on air.
# We remove it, and update the payload length in the packet list.
if self.phy == PHY_CODED:
self.packetList.pop(BLEPACKET_POS+6+1)
self.packetList.pop(BLEPACKET_POS + 6 + 1)
else:
self.packetList.pop(BLEPACKET_POS+6)
self.packetList.pop(BLEPACKET_POS + 6)
self.payloadLength -= 1
if self.protover >= PROTOVER_V2:
# Write updated payload length back to the packet list.
payloadLength = toLittleEndian(self.payloadLength, 2)
packetList[PAYLOAD_LEN_POS ] = payloadLength[0]
packetList[PAYLOAD_LEN_POS+1] = payloadLength[1]
else: # PROTOVER_V1
packetList[PAYLOAD_LEN_POS] = payloadLength[0]
packetList[PAYLOAD_LEN_POS + 1] = payloadLength[1]
else: # PROTOVER_V1
packetList[PAYLOAD_LEN_POS_V1] = self.payloadLength
else:
logging.info("Invalid BLE Header Length " + str(packetList))
@ -390,15 +446,22 @@ class Packet:
if self.OK:
try:
if self.protover >= PROTOVER_V3:
packet_type = (PACKET_TYPE_ADVERTISING
if self.id == EVENT_PACKET_ADV_PDU else
PACKET_TYPE_DATA)
packet_type = (
PACKET_TYPE_ADVERTISING
if self.id == EVENT_PACKET_ADV_PDU
else PACKET_TYPE_DATA
)
else:
packet_type = (PACKET_TYPE_ADVERTISING
if packetList[BLEPACKET_POS : BLEPACKET_POS + 4] == ADV_ACCESS_ADDRESS else
PACKET_TYPE_DATA)
packet_type = (
PACKET_TYPE_ADVERTISING
if packetList[BLEPACKET_POS : BLEPACKET_POS + 4]
== ADV_ACCESS_ADDRESS
else PACKET_TYPE_DATA
)
self.blePacket = BlePacket(packet_type, packetList[BLEPACKET_POS:], self.phy)
self.blePacket = BlePacket(
packet_type, packetList[BLEPACKET_POS:], self.phy
)
except Exception as e:
logging.exception("blePacket error %s" % str(e))
except Exception as e:
@ -407,13 +470,17 @@ class Packet:
self.OK = False
elif self.id == PING_RESP:
if self.protover < PROTOVER_V3:
self.version = parseLittleEndian(packetList[PAYLOAD_POS:PAYLOAD_POS+2])
self.version = parseLittleEndian(
packetList[PAYLOAD_POS : PAYLOAD_POS + 2]
)
elif self.id == RESP_VERSION:
self.version = ''.join([chr(i) for i in packetList[PAYLOAD_POS:]])
self.version = "".join([chr(i) for i in packetList[PAYLOAD_POS:]])
elif self.id == RESP_TIMESTAMP:
self.timestamp = parseLittleEndian(packetList[PAYLOAD_POS:PAYLOAD_POS+4])
self.timestamp = parseLittleEndian(
packetList[PAYLOAD_POS : PAYLOAD_POS + 4]
)
elif self.id == SWITCH_BAUD_RATE_RESP or self.id == SWITCH_BAUD_RATE_REQ:
self.baudRate = parseLittleEndian(packetList[PAYLOAD_POS:PAYLOAD_POS+4])
self.baudRate = parseLittleEndian(packetList[PAYLOAD_POS : PAYLOAD_POS + 4])
else:
logging.info("Unknown packet ID")
@ -438,7 +505,8 @@ class Packet:
logging.exception("Invalid packet: %s" % str(packetList))
return False
class BlePacket():
class BlePacket:
def __init__(self, type, packetList, phy):
self.type = type
@ -458,12 +526,11 @@ class BlePacket():
offset = self.extractAddresses(packetList, offset)
self.extractName(packetList, offset)
def __repr__(self):
return "BLE packet, AAddr: "+str(self.accessAddress)
return "BLE packet, AAddr: " + str(self.accessAddress)
def extractAccessAddress(self, packetList, offset):
self.accessAddress = packetList[offset:offset+4]
self.accessAddress = packetList[offset : offset + 4]
return offset + 4
def extractFormat(self, packetList, phy, offset):
@ -497,29 +564,29 @@ class BlePacket():
scanAddr = None
if self.advType in [0, 1, 2, 4, 6]:
addr = packetList[offset:offset+6]
addr = packetList[offset : offset + 6]
addr.reverse()
addr += [self.txAddrType]
offset += 6
if self.advType in [3, 5]:
scanAddr = packetList[offset:offset+6]
scanAddr = packetList[offset : offset + 6]
scanAddr.reverse()
scanAddr += [self.txAddrType]
offset += 6
addr = packetList[offset:offset+6]
addr = packetList[offset : offset + 6]
addr.reverse()
addr += [self.rxAddrType]
offset += 6
if self.advType == 1:
scanAddr = packetList[offset:offset+6]
scanAddr = packetList[offset : offset + 6]
scanAddr.reverse()
scanAddr += [self.rxAddrType]
offset += 6
if self.advType == 7:
ext_header_len = packetList[offset] & 0x3f
ext_header_len = packetList[offset] & 0x3F
offset += 1
ext_header_offset = offset
@ -527,13 +594,13 @@ class BlePacket():
ext_header_offset += 1
if flags & 0x01:
addr = packetList[ext_header_offset:ext_header_offset+6]
addr = packetList[ext_header_offset : ext_header_offset + 6]
addr.reverse()
addr += [self.txAddrType]
ext_header_offset += 6
if flags & 0x02:
scanAddr = packetList[ext_header_offset:ext_header_offset+6]
scanAddr = packetList[ext_header_offset : ext_header_offset + 6]
scanAddr.reverse()
scanAddr += [self.rxAddrType]
ext_header_offset += 6
@ -550,17 +617,17 @@ class BlePacket():
i = offset
while i < len(packetList):
length = packetList[i]
if (i+length+1) > len(packetList) or length == 0:
if (i + length + 1) > len(packetList) or length == 0:
break
type = packetList[i+1]
type = packetList[i + 1]
if type == 8 or type == 9:
nameList = packetList[i+2:i+length+1]
nameList = packetList[i + 2 : i + length + 1]
name = ""
for j in nameList:
name += chr(j)
i += (length+1)
name = '"'+name+'"'
elif (self.advType == 1):
i += length + 1
name = '"' + name + '"'
elif self.advType == 1:
name = "[ADV_DIRECT_IND]"
self.name = name
@ -569,15 +636,16 @@ class BlePacket():
self.length = packetList[offset]
return offset + 1
def parseLittleEndian(list):
total = 0
for i in range(len(list)):
total+=(list[i] << (8*i))
total += list[i] << (8 * i)
return total
def toLittleEndian(value, size):
list = [0]*size
for i in range(size):
list[i] = (value >> (i*8)) % 256
return list
def toLittleEndian(value, size):
list = [0] * size
for i in range(size):
list[i] = (value >> (i * 8)) % 256
return list

View file

@ -44,14 +44,16 @@ import struct
# - https://github.com/pcapng/pcapng
# - https://www.tcpdump.org/linktypes/LINKTYPE_NORDIC_BLE.html
PACKET_HEADER = struct.Struct("<LLLL")
GLOBAL_HEADER = struct.pack("<LHHIILL",
0xa1b2c3d4, # PCAP magic number
2, # PCAP major version
4, # PCAP minor version
0, # Reserved
0, # Reserved
0x0000ffff, # Max length of capture frame
272) # Nordic BLE link type
GLOBAL_HEADER = struct.pack(
"<LHHIILL",
0xA1B2C3D4, # PCAP magic number
2, # PCAP major version
4, # PCAP minor version
0, # Reserved
0, # Reserved
0x0000FFFF, # Max length of capture frame
272,
) # Nordic BLE link type
def get_global_header():
@ -72,8 +74,9 @@ def create_packet(packet: bytes, timestamp_seconds: float):
timestamp_floor = int(timestamp_seconds)
timestamp_offset_us = int((timestamp_seconds - timestamp_floor) * 1_000_000)
return struct.pack("<LLLL",
timestamp_floor,
timestamp_offset_us,
len(packet),
len(packet)) + packet
return (
struct.pack(
"<LLLL", timestamp_floor, timestamp_offset_us, len(packet), len(packet)
)
+ packet
)

View file

@ -39,6 +39,7 @@ from . import Logger
from . import UART
from .Types import *
try:
from .version import VERSION_STRING
except:
@ -55,25 +56,28 @@ def initLog():
initLog()
import sys, os, threading
from . import SnifferCollector
class Sniffer(threading.Thread, SnifferCollector.SnifferCollector):
# Sniffer constructor. portnum argument is optional. If not provided,
# the software will try to locate the firwmare automatically (may take time).
# NOTE: portnum is 0-indexed, while Windows names are 1-indexed
def __init__(self, portnum=None, baudrate=UART.SNIFFER_OLD_DEFAULT_BAUDRATE, **kwargs):
def __init__(
self, portnum=None, baudrate=UART.SNIFFER_OLD_DEFAULT_BAUDRATE, **kwargs
):
threading.Thread.__init__(self)
SnifferCollector.SnifferCollector.__init__(self, portnum, baudrate=baudrate, **kwargs)
SnifferCollector.SnifferCollector.__init__(
self, portnum, baudrate=baudrate, **kwargs
)
self.daemon = True
self.subscribe("COMPORT_FOUND", self.comPortFound)
# API STARTS HERE
# Get [number] number of packets since last fetch (-1 means all)
# Note that the packet buffer is limited to about 80000 packets.
# Returns: A list of Packet objects
@ -93,8 +97,16 @@ class Sniffer(threading.Thread, SnifferCollector.SnifferCollector):
# "device" argument is of type Device
# if "followOnlyAdvertisements" is True, the sniffer will not follow the device into a connection.
# Returns nothing
def follow(self, device=None, followOnlyAdvertisements = False, followOnlyLegacy = False, followCoded = False):
self._startFollowing(device, followOnlyAdvertisements, followOnlyLegacy, followCoded)
def follow(
self,
device=None,
followOnlyAdvertisements=False,
followOnlyLegacy=False,
followCoded=False,
):
self._startFollowing(
device, followOnlyAdvertisements, followOnlyLegacy, followCoded
)
# Clear the list of devices
def clearDevices(self):
@ -103,7 +115,7 @@ class Sniffer(threading.Thread, SnifferCollector.SnifferCollector):
# Signal the Sniffer to scan for advertising devices by sending the REQ_SCAN_CONT UART packet.
# This will cause it to stop sniffing any device it is sniffing at the moment.
# Returns nothing.
def scan(self, findScanRsp = False, findAux = False, scanCoded = False):
def scan(self, findScanRsp=False, findAux=False, scanCoded=False):
self._startScanning(findScanRsp, findAux, scanCoded)
# Send a temporary key to the sniffer to use when decrypting encrypted communication.
@ -217,20 +229,30 @@ class Sniffer(threading.Thread, SnifferCollector.SnifferCollector):
try:
self._setup()
self.runSniffer()
except (KeyboardInterrupt) as e:
except KeyboardInterrupt as e:
_, _, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
lineno = exc_tb.tb_lineno
logging.info("exiting ("+str(type(e))+" in "+fname+" at "+str(lineno)+"): "+str(e))
logging.info(
"exiting ("
+ str(type(e))
+ " in "
+ fname
+ " at "
+ str(lineno)
+ "): "
+ str(e)
)
self.goodExit = False
except (BrokenPipeError, OSError):
logging.info("capture pipe closed before sniffer thread was stopped")
self.goodExit = True
except Exception as e:
import traceback
logging.exception("CRASH: {}".format(e))
logging.exception(traceback.format_exc())
logging.exception('internal error: {}'.format(repr(e)))
logging.exception("internal error: {}".format(repr(e)))
self.goodExit = False
else:
self.goodExit = True

View file

@ -43,21 +43,25 @@ STATE_INITIALIZING = 0
STATE_SCANNING = 1
STATE_FOLLOWING = 2
class SnifferCollector(Notifications.Notifier):
def __init__(self, portnum=None, baudrate=None, *args, **kwargs):
Notifications.Notifier.__init__(self, *args, **kwargs)
self._portnum = portnum
self._fwversion = "Unknown version"
self._setState(STATE_INITIALIZING)
self._captureHandler = CaptureFiles.CaptureFileHandler(capture_file_path=kwargs.get("capture_file_path", None))
self._captureHandler = CaptureFiles.CaptureFileHandler(
capture_file_path=kwargs.get("capture_file_path", None)
)
self._exit = False
self._connectionAccessAddress = None
self._packetListLock = threading.RLock()
with self._packetListLock:
self._packets = []
self._packetReader = Packet.PacketReader(self._portnum, baudrate=baudrate,
callbacks=[("*", self.passOnNotification)])
self._packetReader = Packet.PacketReader(
self._portnum, baudrate=baudrate, callbacks=[("*", self.passOnNotification)]
)
self._devices = Devices.DeviceList(callbacks=[("*", self.passOnNotification)])
self._missedPackets = 0
@ -84,10 +88,10 @@ class SnifferCollector(Notifications.Notifier):
def _makeBoardId(self):
try:
if sys.platform == 'win32':
if sys.platform == "win32":
boardId = int(self._packetReader.portnum.split("COM")[1])
logging.info("board ID: %d" % boardId)
elif sys.platform == 'linux':
elif sys.platform == "linux":
boardId = int(self._packetReader.portnum.split("ttyACM")[1])
logging.info("board ID: %d" % boardId)
else:
@ -95,8 +99,9 @@ class SnifferCollector(Notifications.Notifier):
raise IndexError()
except (IndexError, AttributeError):
import random
random.seed()
boardId = random.randint(0,255)
boardId = random.randint(0, 255)
logging.info("board ID (random): %d" % boardId)
return boardId
@ -131,7 +136,7 @@ class SnifferCollector(Notifications.Notifier):
if packet.timestamp < self._last_timestamp:
time_diff = (1 << 32) - (self._last_timestamp - packet.timestamp)
else:
time_diff = (packet.timestamp - self._last_timestamp)
time_diff = packet.timestamp - self._last_timestamp
packet.time = self._last_time + (time_diff / 1_000_000)
@ -155,16 +160,25 @@ class SnifferCollector(Notifications.Notifier):
self._connectionAccessAddress = packet.blePacket.accessAddress
if self.state == STATE_FOLLOWING and packet.blePacket.advType == 4:
newDevice = Devices.Device(address=packet.blePacket.advAddress, name=packet.blePacket.name, RSSI=packet.RSSI)
newDevice = Devices.Device(
address=packet.blePacket.advAddress,
name=packet.blePacket.name,
RSSI=packet.RSSI,
)
self._devices.appendOrUpdate(newDevice)
if self.state == STATE_SCANNING:
if (packet.blePacket.advType in [0, 1, 2, 4, 6, 7] and
packet.blePacket.advAddress != None and
packet.crcOK and
not packet.direction
):
newDevice = Devices.Device(address=packet.blePacket.advAddress, name=packet.blePacket.name, RSSI=packet.RSSI)
if (
packet.blePacket.advType in [0, 1, 2, 4, 6, 7]
and packet.blePacket.advAddress != None
and packet.crcOK
and not packet.direction
):
newDevice = Devices.Device(
address=packet.blePacket.advAddress,
name=packet.blePacket.name,
RSSI=packet.RSSI,
)
self._devices.appendOrUpdate(newDevice)
except Exception as e:
@ -187,7 +201,10 @@ class SnifferCollector(Notifications.Notifier):
except Exceptions.InvalidPacketException:
pass
else:
if packet.id == EVENT_PACKET_DATA_PDU or packet.id == EVENT_PACKET_ADV_PDU:
if (
packet.id == EVENT_PACKET_DATA_PDU
or packet.id == EVENT_PACKET_ADV_PDU
):
self._processBLEPacket(packet)
elif packet.id == EVENT_FOLLOW:
# This packet has no value for the user.
@ -195,26 +212,36 @@ class SnifferCollector(Notifications.Notifier):
elif packet.id == EVENT_CONNECT:
self._connectEventPacketCounterValue = packet.packetCounter
self._inConnection = True
# copy it because packets are eventually deleted
self._currentConnectRequest = copy.copy(self._findPacketByPacketCounter(self._connectEventPacketCounterValue-1))
# copy it because packets are eventually deleted
self._currentConnectRequest = copy.copy(
self._findPacketByPacketCounter(
self._connectEventPacketCounterValue - 1
)
)
elif packet.id == EVENT_DISCONNECT:
if self._inConnection:
self._packetsInLastConnection = packet.packetCounter - self._connectEventPacketCounterValue
self._packetsInLastConnection = (
packet.packetCounter - self._connectEventPacketCounterValue
)
self._inConnection = False
elif packet.id == SWITCH_BAUD_RATE_RESP and self._switchingBaudRate:
self._switchingBaudRate = False
if (packet.baudRate == self._proposedBaudRate):
if packet.baudRate == self._proposedBaudRate:
self._packetReader.switchBaudRate(self._proposedBaudRate)
else:
self._switchBaudRate(packet.baudRate)
elif packet.id == PING_RESP:
if hasattr(packet, 'version'):
versions = { 1116: '3.1.0',
1115: '3.0.0',
1114: '2.0.0',
1113: '2.0.0-beta-3',
1112: '2.0.0-beta-1' }
self._fwversion = versions.get(packet.version, 'SVN rev: %d' % packet.version)
if hasattr(packet, "version"):
versions = {
1116: "3.1.0",
1115: "3.0.0",
1114: "2.0.0",
1113: "2.0.0-beta-3",
1112: "2.0.0-beta-1",
}
self._fwversion = versions.get(
packet.version, "SVN rev: %d" % packet.version
)
logging.info("Firmware version %s" % self._fwversion)
elif packet.id == RESP_VERSION:
self._fwversion = packet.version
@ -226,20 +253,22 @@ class SnifferCollector(Notifications.Notifier):
lt = time.localtime(self._last_time)
usecs = int((self._last_time - int(self._last_time)) * 1_000_000)
logging.info(f'Firmware timestamp {self._last_timestamp} reference: '
f'{time.strftime("%b %d %Y %X", lt)}.{usecs} {time.strftime("%Z", lt)}')
logging.info(
f"Firmware timestamp {self._last_timestamp} reference: "
f'{time.strftime("%b %d %Y %X", lt)}.{usecs} {time.strftime("%Z", lt)}'
)
else:
logging.info("Unknown packet ID")
def _findPacketByPacketCounter(self, packetCounterValue):
with self._packetListLock:
for i in range(-1, -1-len(self._packets), -1):
# iterate backwards through packets
for i in range(-1, -1 - len(self._packets), -1):
# iterate backwards through packets
if self._packets[i].packetCounter == packetCounterValue:
return self._packets[i]
return None
def _startScanning(self, findScanRsp = False, findAux = False, scanCoded = False):
def _startScanning(self, findScanRsp=False, findAux=False, scanCoded=False):
logging.info("starting scan")
if self.state == STATE_FOLLOWING:
@ -257,10 +286,24 @@ class SnifferCollector(Notifications.Notifier):
self.clearCallbacks()
self._devices.clearCallbacks()
def _startFollowing(self, device, followOnlyAdvertisements = False, followOnlyLegacy = False, followCoded = False):
def _startFollowing(
self,
device,
followOnlyAdvertisements=False,
followOnlyLegacy=False,
followCoded=False,
):
self._devices.setFollowed(device)
logging.info("Sniffing device " + str(self._devices.index(device)) + ' - "'+device.name+'"')
self._packetReader.sendFollow(device.address, followOnlyAdvertisements, followOnlyLegacy, followCoded)
logging.info(
"Sniffing device "
+ str(self._devices.index(device))
+ ' - "'
+ device.name
+ '"'
)
self._packetReader.sendFollow(
device.address, followOnlyAdvertisements, followOnlyLegacy, followCoded
)
self._setState(STATE_FOLLOWING)
def _clearDevices(self):
@ -272,7 +315,7 @@ class SnifferCollector(Notifications.Notifier):
self._packets = self._packets[20000:]
self._packets.append(packet)
def _getPackets(self, number = -1):
def _getPackets(self, number=-1):
with self._packetListLock:
returnList = self._packets[0:number]
self._packets = self._packets[number:]

View file

@ -35,56 +35,56 @@
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
SLIP_START = 0xAB
SLIP_END = 0xBC
SLIP_ESC = 0xCD
SLIP_END = 0xBC
SLIP_ESC = 0xCD
SLIP_ESC_START = SLIP_START + 1
SLIP_ESC_END = SLIP_END + 1
SLIP_ESC_ESC = SLIP_ESC + 1
SLIP_ESC_END = SLIP_END + 1
SLIP_ESC_ESC = SLIP_ESC + 1
PROTOVER_V3 = 3
PROTOVER_V2 = 2
PROTOVER_V1 = 1
PROTOVER_V3 = 3
PROTOVER_V2 = 2
PROTOVER_V1 = 1
# UART protocol packet codes (see sniffer_uart_protocol.pdf)
REQ_FOLLOW = 0x00
EVENT_FOLLOW = 0x01
EVENT_PACKET_ADV_PDU = 0x02
EVENT_CONNECT = 0x05
EVENT_PACKET_DATA_PDU = 0x06
REQ_SCAN_CONT = 0x07
EVENT_DISCONNECT = 0x09
SET_TEMPORARY_KEY = 0x0C
PING_REQ = 0x0D
PING_RESP = 0x0E
SWITCH_BAUD_RATE_REQ = 0x13
SWITCH_BAUD_RATE_RESP = 0x14
SET_ADV_CHANNEL_HOP_SEQ = 0x17
SET_PRIVATE_KEY = 0x18
SET_LEGACY_LONG_TERM_KEY = 0x19
SET_SC_LONG_TERM_KEY = 0x1A
REQ_VERSION = 0x1B
RESP_VERSION = 0x1C
REQ_TIMESTAMP = 0x1D
RESP_TIMESTAMP = 0x1E
SET_IDENTITY_RESOLVING_KEY= 0x1F
GO_IDLE = 0xFE
REQ_FOLLOW = 0x00
EVENT_FOLLOW = 0x01
EVENT_PACKET_ADV_PDU = 0x02
EVENT_CONNECT = 0x05
EVENT_PACKET_DATA_PDU = 0x06
REQ_SCAN_CONT = 0x07
EVENT_DISCONNECT = 0x09
SET_TEMPORARY_KEY = 0x0C
PING_REQ = 0x0D
PING_RESP = 0x0E
SWITCH_BAUD_RATE_REQ = 0x13
SWITCH_BAUD_RATE_RESP = 0x14
SET_ADV_CHANNEL_HOP_SEQ = 0x17
SET_PRIVATE_KEY = 0x18
SET_LEGACY_LONG_TERM_KEY = 0x19
SET_SC_LONG_TERM_KEY = 0x1A
REQ_VERSION = 0x1B
RESP_VERSION = 0x1C
REQ_TIMESTAMP = 0x1D
RESP_TIMESTAMP = 0x1E
SET_IDENTITY_RESOLVING_KEY = 0x1F
GO_IDLE = 0xFE
PACKET_TYPE_UNKNOWN = 0x00
PACKET_TYPE_ADVERTISING = 0x01
PACKET_TYPE_DATA = 0x02
PACKET_TYPE_UNKNOWN = 0x00
PACKET_TYPE_ADVERTISING = 0x01
PACKET_TYPE_DATA = 0x02
ADV_TYPE_ADV_IND = 0x0
ADV_TYPE_ADV_DIRECT_IND = 0x1
ADV_TYPE_ADV_NONCONN_IND = 0x2
ADV_TYPE_ADV_SCAN_IND = 0x6
ADV_TYPE_SCAN_REQ = 0x3
ADV_TYPE_SCAN_RSP = 0x4
ADV_TYPE_CONNECT_REQ = 0x5
ADV_TYPE_ADV_EXT_IND = 0x7
ADV_TYPE_ADV_IND = 0x0
ADV_TYPE_ADV_DIRECT_IND = 0x1
ADV_TYPE_ADV_NONCONN_IND = 0x2
ADV_TYPE_ADV_SCAN_IND = 0x6
ADV_TYPE_SCAN_REQ = 0x3
ADV_TYPE_SCAN_RSP = 0x4
ADV_TYPE_CONNECT_REQ = 0x5
ADV_TYPE_ADV_EXT_IND = 0x7
PHY_1M = 0
PHY_2M = 1
PHY_CODED = 2
PHY_1M = 0
PHY_2M = 1
PHY_CODED = 2
PHY_CODED_CI_S8 = 0
PHY_CODED_CI_S2 = 1
PHY_CODED_CI_S8 = 0
PHY_CODED_CI_S2 = 1

View file

@ -46,6 +46,7 @@ from . import Packet
from . import Filelock
import os
if os.name == "posix":
import termios
@ -61,8 +62,13 @@ def find_sniffer(write_data=False):
for port in [x.device for x in open_ports]:
for rate in SNIFFER_BAUDRATES:
reader = None
l_errors = [serial.SerialException, ValueError, Exceptions.LockedException, OSError]
if os.name == 'posix':
l_errors = [
serial.SerialException,
ValueError,
Exceptions.LockedException,
OSError,
]
if os.name == "posix":
l_errors.append(termios.error)
try:
reader = Packet.PacketReader(portnum=port, baudrate=rate)
@ -115,17 +121,14 @@ class Uart:
if baudrate is not None and baudrate not in SNIFFER_BAUDRATES:
raise Exception("Invalid baudrate: " + str(baudrate))
logging.info('Opening serial port {}'.format(portnum))
logging.info("Opening serial port {}".format(portnum))
self.portnum = portnum
if self.portnum:
Filelock.lock(portnum)
self.ser = serial.Serial(
port=portnum,
baudrate=9600,
rtscts=True,
exclusive=True
port=portnum, baudrate=9600, rtscts=True, exclusive=True
)
self.ser.baudrate = baudrate
@ -149,7 +152,7 @@ class Uart:
try:
# Read any data available, or wait for at least one byte
data_read = self.ser.read(self.ser.in_waiting or 1)
#logging.info('type: {}'.format(data_read.__class__))
# logging.info('type: {}'.format(data_read.__class__))
self._read_queue_extend(data_read)
except serial.SerialException as e:
logging.info("Unable to read UART: %s" % e)
@ -217,8 +220,10 @@ def list_serial_ports():
# Scan for available ports.
return list_ports.comports()
if __name__ == "__main__":
import time
t_start = time.time()
s = find_sniffer()
tn = time.time()

View file

@ -35,4 +35,3 @@
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
VERSION_STRING = "4.1.1"

View file

@ -50,12 +50,17 @@ import struct
import logging
from SnifferAPI import Logger
try:
import serial
except ImportError:
Logger.initLogger()
logging.error(f'pyserial not found, please run: "{sys.executable} -m pip install -r requirements.txt" and retry')
sys.exit(f'pyserial not found, please run: "{sys.executable} -m pip install -r requirements.txt" and retry')
logging.error(
f'pyserial not found, please run: "{sys.executable} -m pip install -r requirements.txt" and retry'
)
sys.exit(
f'pyserial not found, please run: "{sys.executable} -m pip install -r requirements.txt" and retry'
)
from SnifferAPI import Sniffer, UART, Devices, Pcap, Exceptions
@ -129,18 +134,30 @@ capture_scan_response = True
capture_scan_aux_pointer = True
capture_coded = False
def extcap_config(interface):
"""List configuration for the given interface"""
print("arg {number=0}{call=--only-advertising}{display=Only advertising packets}"
"{tooltip=The sniffer will only capture advertising packets from the selected device}{type=boolflag}{save=true}")
print("arg {number=1}{call=--only-legacy-advertising}{display=Only legacy advertising packets}"
"{tooltip=The sniffer will only capture legacy advertising packets from the selected device}{type=boolflag}{save=true}")
print("arg {number=2}{call=--scan-follow-rsp}{display=Find scan response data}"
"{tooltip=The sniffer will follow scan requests and scan responses in scan mode}{type=boolflag}{default=true}{save=true}")
print("arg {number=3}{call=--scan-follow-aux}{display=Find auxiliary pointer data}"
"{tooltip=The sniffer will follow aux pointers in scan mode}{type=boolflag}{default=true}{save=true}")
print("arg {number=3}{call=--coded}{display=Scan and follow devices on LE Coded PHY}"
"{tooltip=Scan for devices and follow advertiser on LE Coded PHY}{type=boolflag}{default=false}{save=true}")
print(
"arg {number=0}{call=--only-advertising}{display=Only advertising packets}"
"{tooltip=The sniffer will only capture advertising packets from the selected device}{type=boolflag}{save=true}"
)
print(
"arg {number=1}{call=--only-legacy-advertising}{display=Only legacy advertising packets}"
"{tooltip=The sniffer will only capture legacy advertising packets from the selected device}{type=boolflag}{save=true}"
)
print(
"arg {number=2}{call=--scan-follow-rsp}{display=Find scan response data}"
"{tooltip=The sniffer will follow scan requests and scan responses in scan mode}{type=boolflag}{default=true}{save=true}"
)
print(
"arg {number=3}{call=--scan-follow-aux}{display=Find auxiliary pointer data}"
"{tooltip=The sniffer will follow aux pointers in scan mode}{type=boolflag}{default=true}{save=true}"
)
print(
"arg {number=3}{call=--coded}{display=Scan and follow devices on LE Coded PHY}"
"{tooltip=Scan for devices and follow advertiser on LE Coded PHY}{type=boolflag}{default=false}{save=true}"
)
def extcap_dlts(interface):
"""List DLTs for the given interface"""
@ -148,14 +165,18 @@ def extcap_dlts(interface):
def get_baud_rates(interface):
if not hasattr(serial, "__version__") or not serial.__version__.startswith('3.'):
raise RuntimeError("Too old version of python 'serial' Library. Version 3 required.")
if not hasattr(serial, "__version__") or not serial.__version__.startswith("3."):
raise RuntimeError(
"Too old version of python 'serial' Library. Version 3 required."
)
return UART.find_sniffer_baudrates(interface)
def get_interfaces():
if not hasattr(serial, "__version__") or not serial.__version__.startswith('3.'):
raise RuntimeError("Too old version of python 'serial' Library. Version 3 required.")
if not hasattr(serial, "__version__") or not serial.__version__.startswith("3."):
raise RuntimeError(
"Too old version of python 'serial' Library. Version 3 required."
)
devices = UART.find_sniffer()
return devices
@ -163,44 +184,105 @@ def get_interfaces():
def extcap_interfaces():
"""List available interfaces to capture from"""
print("extcap {version=%s}{display=nRF Sniffer for Bluetooth LE}"
"{help=https://www.nordicsemi.com/Software-and-Tools/Development-Tools/nRF-Sniffer-for-Bluetooth-LE}"
% Sniffer.VERSION_STRING)
print(
"extcap {version=%s}{display=nRF Sniffer for Bluetooth LE}"
"{help=https://www.nordicsemi.com/Software-and-Tools/Development-Tools/nRF-Sniffer-for-Bluetooth-LE}"
% Sniffer.VERSION_STRING
)
for interface_port in get_interfaces():
if sys.platform == 'win32':
print("interface {value=%s-%s}{display=nRF Sniffer for Bluetooth LE %s}" % (interface_port, extcap_version, interface_port))
if sys.platform == "win32":
print(
"interface {value=%s-%s}{display=nRF Sniffer for Bluetooth LE %s}"
% (interface_port, extcap_version, interface_port)
)
else:
print("interface {value=%s-%s}{display=nRF Sniffer for Bluetooth LE}" % (interface_port, extcap_version))
print(
"interface {value=%s-%s}{display=nRF Sniffer for Bluetooth LE}"
% (interface_port, extcap_version)
)
print("control {number=%d}{type=selector}{display=Device}{tooltip=Device list}" % CTRL_ARG_DEVICE)
print("control {number=%d}{type=selector}{display=Key}{tooltip=}" % CTRL_ARG_KEY_TYPE)
print("control {number=%d}{type=string}{display=Value}"
"{tooltip=6 digit passkey or 16 or 32 bytes encryption key in hexadecimal starting with '0x', big endian format."
"If the entered key is shorter than 16 or 32 bytes, it will be zero-padded in front'}"
"{validation=\\b^(([0-9]{6})|(0x[0-9a-fA-F]{1,64})|([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random))$\\b}" % CTRL_ARG_KEY_VAL)
print("control {number=%d}{type=string}{display=Adv Hop}"
"{default=37,38,39}"
"{tooltip=Advertising channel hop sequence. "
"Change the order in which the sniffer switches advertising channels. "
"Valid channels are 37, 38 and 39 separated by comma.}"
r"{validation=^\s*((37|38|39)\s*,\s*){0,2}(37|38|39){1}\s*$}{required=true}" % CTRL_ARG_ADVHOP)
print("control {number=%d}{type=button}{display=Clear}{tooltop=Clear or remove device from Device list}" % CTRL_ARG_DEVICE_CLEAR)
print("control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Access user guide (launches browser)}" % CTRL_ARG_HELP)
print("control {number=%d}{type=button}{role=restore}{display=Defaults}{tooltip=Resets the user interface and clears the log file}" % CTRL_ARG_RESTORE)
print("control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Log per interface}" % CTRL_ARG_LOG)
print(
"control {number=%d}{type=selector}{display=Device}{tooltip=Device list}"
% CTRL_ARG_DEVICE
)
print(
"control {number=%d}{type=selector}{display=Key}{tooltip=}" % CTRL_ARG_KEY_TYPE
)
print(
"control {number=%d}{type=string}{display=Value}"
"{tooltip=6 digit passkey or 16 or 32 bytes encryption key in hexadecimal starting with '0x', big endian format."
"If the entered key is shorter than 16 or 32 bytes, it will be zero-padded in front'}"
"{validation=\\b^(([0-9]{6})|(0x[0-9a-fA-F]{1,64})|([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random))$\\b}"
% CTRL_ARG_KEY_VAL
)
print(
"control {number=%d}{type=string}{display=Adv Hop}"
"{default=37,38,39}"
"{tooltip=Advertising channel hop sequence. "
"Change the order in which the sniffer switches advertising channels. "
"Valid channels are 37, 38 and 39 separated by comma.}"
r"{validation=^\s*((37|38|39)\s*,\s*){0,2}(37|38|39){1}\s*$}{required=true}"
% CTRL_ARG_ADVHOP
)
print(
"control {number=%d}{type=button}{display=Clear}{tooltop=Clear or remove device from Device list}"
% CTRL_ARG_DEVICE_CLEAR
)
print(
"control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Access user guide (launches browser)}"
% CTRL_ARG_HELP
)
print(
"control {number=%d}{type=button}{role=restore}{display=Defaults}{tooltip=Resets the user interface and clears the log file}"
% CTRL_ARG_RESTORE
)
print(
"control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Log per interface}"
% CTRL_ARG_LOG
)
print("value {control=%d}{value= }{display=All advertising devices}{default=true}" % CTRL_ARG_DEVICE)
print("value {control=%d}{value=%s}{display=Follow IRK}" % (CTRL_ARG_DEVICE, zero_addr))
print(
"value {control=%d}{value= }{display=All advertising devices}{default=true}"
% CTRL_ARG_DEVICE
)
print(
"value {control=%d}{value=%s}{display=Follow IRK}"
% (CTRL_ARG_DEVICE, zero_addr)
)
print("value {control=%d}{value=%d}{display=Legacy Passkey}{default=true}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_PASSKEY))
print("value {control=%d}{value=%d}{display=Legacy OOB data}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_OOB))
print("value {control=%d}{value=%d}{display=Legacy LTK}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_LEGACY_LTK))
print("value {control=%d}{value=%d}{display=SC LTK}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_SC_LTK))
print("value {control=%d}{value=%d}{display=SC Private Key}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_DH_PRIVATE_KEY))
print("value {control=%d}{value=%d}{display=IRK}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_IRK))
print("value {control=%d}{value=%d}{display=Add LE address}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_ADD_ADDR))
print("value {control=%d}{value=%d}{display=Follow LE address}" % (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_FOLLOW_ADDR))
print(
"value {control=%d}{value=%d}{display=Legacy Passkey}{default=true}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_PASSKEY)
)
print(
"value {control=%d}{value=%d}{display=Legacy OOB data}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_OOB)
)
print(
"value {control=%d}{value=%d}{display=Legacy LTK}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_LEGACY_LTK)
)
print(
"value {control=%d}{value=%d}{display=SC LTK}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_SC_LTK)
)
print(
"value {control=%d}{value=%d}{display=SC Private Key}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_DH_PRIVATE_KEY)
)
print(
"value {control=%d}{value=%d}{display=IRK}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_IRK)
)
print(
"value {control=%d}{value=%d}{display=Add LE address}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_ADD_ADDR)
)
print(
"value {control=%d}{value=%d}{display=Follow LE address}"
% (CTRL_ARG_KEY_TYPE, CTRL_KEY_TYPE_FOLLOW_ADDR)
)
def string_address(address):
@ -208,16 +290,16 @@ def string_address(address):
if len(address) < 7:
return None
addr_string = ''
addr_string = ""
for i in range(5):
addr_string += (format(address[i], '02x') + ':')
addr_string += format(address[5], '02x') + ' '
addr_string += format(address[i], "02x") + ":"
addr_string += format(address[5], "02x") + " "
if address[6]:
addr_string += ' random '
addr_string += " random "
else:
addr_string += ' public '
addr_string += " public "
return addr_string
@ -231,7 +313,7 @@ def control_read():
# > empty string ('')
return None, None, None
_, _, length, arg, typ = struct.unpack('>sBHBB', header)
_, _, length, arg, typ = struct.unpack(">sBHBB", header)
payload = bytearray()
if length > 2:
@ -239,6 +321,7 @@ def control_read():
return arg, typ, payload
def control_write(arg, typ, message):
"""Write the message to the control channel"""
@ -247,8 +330,8 @@ def control_write(arg, typ, message):
return
packet = bytearray()
packet += struct.pack('>BBHBB', ord('T'), 0, len(message) + 2, arg, typ)
packet += message.encode('utf-8')
packet += struct.pack(">BBHBB", ord("T"), 0, len(message) + 2, arg, typ)
packet += message.encode("utf-8")
fn_ctrl_out.write(packet)
@ -278,11 +361,13 @@ def device_added(notification):
# Extcap selector uses \0 character to separate value and display value,
# therefore the display value cannot contain the \0 character as this
# would lead to truncation of the display value.
display = (device.name.replace('\0', '\\0') +
(" " + str(device.RSSI) + " dBm " if device.RSSI != 0 else " ") +
string_address(device.address))
display = (
device.name.replace("\0", "\\0")
+ (" " + str(device.RSSI) + " dBm " if device.RSSI != 0 else " ")
+ string_address(device.address)
)
message = str(device.address) + '\0' + display
message = str(device.address) + "\0" + display
control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, message)
@ -298,33 +383,35 @@ def device_removed(notification):
control_write(CTRL_ARG_DEVICE, CTRL_CMD_REMOVE, message)
logging.info("Removed: " + display)
def devices_cleared(notification):
"""Devices have been cleared"""
message = ""
control_write(CTRL_ARG_DEVICE, CTRL_CMD_REMOVE, message)
control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, " " + '\0' + "All advertising devices")
control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, zero_addr + '\0' + "Follow IRK")
control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, " " + "\0" + "All advertising devices")
control_write(CTRL_ARG_DEVICE, CTRL_CMD_ADD, zero_addr + "\0" + "Follow IRK")
control_write(CTRL_ARG_DEVICE, CTRL_CMD_SET, " ")
def handle_control_command(sniffer, arg, typ, payload):
"""Handle command from control channel"""
global last_used_key_type
if arg == CTRL_ARG_DEVICE:
if payload == b' ':
if payload == b" ":
scan_for_devices(sniffer)
else:
values = payload
values = values.replace(b'[', b'')
values = values.replace(b']', b'')
device_address = values.split(b',')
values = values.replace(b"[", b"")
values = values.replace(b"]", b"")
device_address = values.split(b",")
logging.info('follow_device: {}'.format(device_address))
logging.info("follow_device: {}".format(device_address))
for i in range(6):
device_address[i] = int(device_address[i])
device_address[6] = 1 if device_address[6] == b' 1' else 0
device_address[6] = 1 if device_address[6] == b" 1" else 0
device = Devices.Device(address=device_address, name='""', RSSI=0)
@ -333,7 +420,7 @@ def handle_control_command(sniffer, arg, typ, payload):
elif arg == CTRL_ARG_DEVICE_CLEAR:
clear_devices(sniffer)
elif arg == CTRL_ARG_KEY_TYPE:
last_used_key_type = int(payload.decode('utf-8'))
last_used_key_type = int(payload.decode("utf-8"))
elif arg == CTRL_ARG_KEY_VAL:
set_key_value(sniffer, payload)
elif arg == CTRL_ARG_ADVHOP:
@ -384,8 +471,10 @@ def follow_device(sniffer, device):
"""Follow the selected device"""
global write_new_packets, in_follow_mode
sniffer.follow(device, capture_only_advertising, capture_only_legacy_advertising, capture_coded)
time.sleep(.1)
sniffer.follow(
device, capture_only_advertising, capture_only_legacy_advertising, capture_coded
)
time.sleep(0.1)
in_follow_mode = True
logging.info("Following " + string_address(device.address))
@ -395,58 +484,64 @@ def set_key_value(sniffer, payload):
"""Send key value to device"""
global last_used_key_val
payload = payload.decode('utf-8')
payload = payload.decode("utf-8")
last_used_key_val = payload
if (last_used_key_type == CTRL_KEY_TYPE_PASSKEY):
if last_used_key_type == CTRL_KEY_TYPE_PASSKEY:
if re.match("^[0-9]{6}$", payload):
set_passkey(sniffer, payload)
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_OOB):
elif last_used_key_type == CTRL_KEY_TYPE_OOB:
if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload):
set_OOB(sniffer, payload[2:])
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_DH_PRIVATE_KEY):
if (re.match("^0[xX][0-9A-Za-z]{1,64}$", payload)):
elif last_used_key_type == CTRL_KEY_TYPE_DH_PRIVATE_KEY:
if re.match("^0[xX][0-9A-Za-z]{1,64}$", payload):
set_dh_private_key(sniffer, payload[2:])
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_LEGACY_LTK):
if (re.match("^0[xX][0-9A-Za-z]{1,32}$", payload)):
elif last_used_key_type == CTRL_KEY_TYPE_LEGACY_LTK:
if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload):
set_legacy_ltk(sniffer, payload[2:])
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_SC_LTK):
if (re.match("^0[xX][0-9A-Za-z]{1,32}$", payload)):
elif last_used_key_type == CTRL_KEY_TYPE_SC_LTK:
if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload):
set_sc_ltk(sniffer, payload[2:])
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_IRK):
if (re.match("^0[xX][0-9A-Za-z]{1,32}$", payload)):
elif last_used_key_type == CTRL_KEY_TYPE_IRK:
if re.match("^0[xX][0-9A-Za-z]{1,32}$", payload):
set_irk(sniffer, payload[2:])
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_ADD_ADDR):
if (re.match("^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload)):
elif last_used_key_type == CTRL_KEY_TYPE_ADD_ADDR:
if re.match(
"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload
):
add_address(sniffer, payload)
else:
logging.info("Invalid key value: " + str(payload))
elif (last_used_key_type == CTRL_KEY_TYPE_FOLLOW_ADDR):
if (re.match("^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload)):
elif last_used_key_type == CTRL_KEY_TYPE_FOLLOW_ADDR:
if re.match(
"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) (public|random)$", payload
):
follow_address(sniffer, payload)
else:
logging.info("Invalid key value: " + str(payload))
else:
logging.info("Invalid key type: " + str(last_used_key_type))
def parse_hex(value):
if len(value) % 2 != 0:
value = '0' + value
a = list(value)
return [int(x + y, 16) for x,y in zip(a[::2], a[1::2])]
def parse_hex(value):
if len(value) % 2 != 0:
value = "0" + value
a = list(value)
return [int(x + y, 16) for x, y in zip(a[::2], a[1::2])]
def set_passkey(sniffer, payload):
"""Send passkey to device"""
@ -461,36 +556,42 @@ def set_passkey(sniffer, payload):
sniffer.sendTK(passkey)
def set_OOB(sniffer, payload):
"""Send OOB to device"""
logging.info("Setting OOB data: " + payload)
sniffer.sendTK(parse_hex(payload))
def set_dh_private_key(sniffer, payload):
"""Send Diffie-Hellman private key to device"""
logging.info("Setting DH private key: " + payload)
sniffer.sendPrivateKey(parse_hex(payload))
def set_legacy_ltk(sniffer, payload):
"""Send Legacy Long Term Key (LTK) to device"""
logging.info("Setting Legacy LTK: " + payload)
sniffer.sendLegacyLTK(parse_hex(payload))
def set_sc_ltk(sniffer, payload):
"""Send LE secure connections Long Term Key (LTK) to device"""
logging.info("Setting SC LTK: " + payload)
sniffer.sendSCLTK(parse_hex(payload))
def set_irk(sniffer, payload):
"""Send Identity Resolving Key (IRK) to device"""
logging.info("Setting IRK: " + payload)
sniffer.sendIRK(parse_hex(payload))
def add_address(sniffer, payload):
"""Add LE address to device list"""
logging.info("Adding LE address: " + payload)
(addr,addr_type) = payload.split(' ')
(addr, addr_type) = payload.split(" ")
device = [int(a, 16) for a in addr.split(":")]
device.append(1 if addr_type == "random" else 0)
@ -498,11 +599,12 @@ def add_address(sniffer, payload):
new_device = Devices.Device(address=device, name='""', RSSI=0)
sniffer.addDevice(new_device)
def follow_address(sniffer, payload):
"""Add LE address to device list"""
logging.info("Adding LE address: " + payload)
(addr,addr_type) = payload.split(' ')
(addr, addr_type) = payload.split(" ")
device = [int(a, 16) for a in addr.split(":")]
device.append(1 if addr_type == "random" else 0)
@ -513,15 +615,16 @@ def follow_address(sniffer, payload):
control_write(CTRL_ARG_DEVICE, CTRL_CMD_SET, f"{new_device.address}")
follow_device(sniffer, new_device)
def set_advhop(sniffer, payload):
"""Set advertising channel hop sequence"""
global last_used_advhop
payload = payload.decode('utf-8')
payload = payload.decode("utf-8")
last_used_advhop = payload
hops = [int(channel) for channel in payload.split(',')]
hops = [int(channel) for channel in payload.split(",")]
sniffer.setAdvHopSequence(hops)
@ -546,7 +649,7 @@ def error_interface_not_found(interface, fifo):
def validate_interface(interface, fifo):
"""Check if interface exists"""
if sys.platform != 'win32' and not os.path.exists(interface):
if sys.platform != "win32" and not os.path.exists(interface):
error_interface_not_found(interface, fifo)
@ -557,12 +660,13 @@ def get_default_baudrate(interface, fifo):
error_interface_not_found(interface, fifo)
return rates["default"]
def get_supported_protocol_version(extcap_version):
"""Return the maximum supported Packet Protocol Version"""
if extcap_version == 'None':
if extcap_version == "None":
return 2
(major, minor) = extcap_version.split('.')
(major, minor) = extcap_version.split(".")
major = int(major)
minor = int(minor)
@ -572,6 +676,7 @@ def get_supported_protocol_version(extcap_version):
else:
return 2
def setup_extcap_log_handler():
"""Add the a handler that emits log messages through the extcap control out channel"""
global extcap_log_handler
@ -593,18 +698,18 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out):
global fn_capture, fn_ctrl_in, fn_ctrl_out, write_new_packets, extcap_log_handler
try:
fn_capture = open(fifo, 'wb', 0)
fn_capture = open(fifo, "wb", 0)
if control_out is not None:
fn_ctrl_out = open(control_out, 'wb', 0)
fn_ctrl_out = open(control_out, "wb", 0)
setup_extcap_log_handler()
if control_in is not None:
fn_ctrl_in = open(control_in, 'rb', 0)
fn_ctrl_in = open(control_in, "rb", 0)
logging.info("Log started at %s", time.strftime("%c"))
interface, extcap_version = interface.split('-')
interface, extcap_version = interface.split("-")
logging.info("Extcap version %s", str(extcap_version))
capture_write(Pcap.get_global_header())
@ -619,7 +724,9 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out):
sniffer.subscribe("DEVICE_REMOVED", device_removed)
sniffer.subscribe("DEVICES_CLEARED", devices_cleared)
sniffer.setAdvHopSequence([37, 38, 39])
sniffer.setSupportedProtocolVersion(get_supported_protocol_version(extcap_version))
sniffer.setSupportedProtocolVersion(
get_supported_protocol_version(extcap_version)
)
logging.info("Sniffer created")
logging.info("Software version: %s" % sniffer.swversion)
@ -655,7 +762,7 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out):
pass
except Exceptions.LockedException as e:
logging.info('{}'.format(e.message))
logging.info("{}".format(e.message))
except OSError:
# We'll get OSError=22 when/if wireshark kills the pipe(s) on capture
@ -689,14 +796,14 @@ def sniffer_capture(interface, baudrate, fifo, control_in, control_out):
def extcap_close_fifo(fifo):
""""Close extcap fifo"""
""" "Close extcap fifo"""
if not os.path.exists(fifo):
print("FIFO does not exist!", file=sys.stderr)
return
# This is apparently needed to workaround an issue on Windows/macOS
# where the message cannot be read. (really?)
fh = open(fifo, 'wb', 0)
fh = open(fifo, "wb", 0)
fh.close()
@ -705,13 +812,13 @@ class ExtcapLoggerHandler(logging.Handler):
def emit(self, record):
"""Send log message to extcap"""
message = record.message.replace('\0', '\\0')
message = record.message.replace("\0", "\\0")
log_message = f"{record.levelname}: {message}\n"
control_write(CTRL_ARG_LOG, CTRL_CMD_ADD, log_message)
def parse_capture_filter(capture_filter):
""""Parse given capture filter"""
""" "Parse given capture filter"""
global rssi_filter
m = re.search(r"^\s*rssi\s*(>=?)\s*(-?[0-9]+)\s*$", capture_filter, re.IGNORECASE)
if m:
@ -720,66 +827,89 @@ def parse_capture_filter(capture_filter):
print("Illegal RSSI value, must be between -10 and -256")
# Handle >= by modifying the threshold, since comparisons are always done with
# the > operator
if m.group(1) == '>=':
if m.group(1) == ">=":
rssi_filter = rssi_filter - 1
else:
print("Filter syntax: \"RSSI >= -value\"")
print('Filter syntax: "RSSI >= -value"')
import atexit
@atexit.register
def goodbye():
logging.info("Exiting PID {}".format(os.getpid()))
logging.info("Exiting PID {}".format(os.getpid()))
if __name__ == '__main__':
if __name__ == "__main__":
# Capture options
parser = argparse.ArgumentParser(description="Nordic Semiconductor nRF Sniffer for Bluetooth LE extcap plugin")
parser = argparse.ArgumentParser(
description="Nordic Semiconductor nRF Sniffer for Bluetooth LE extcap plugin"
)
# Extcap Arguments
parser.add_argument("--capture",
help="Start the capture",
action="store_true")
parser.add_argument("--capture", help="Start the capture", action="store_true")
parser.add_argument("--extcap-interfaces",
help="List available interfaces to capture from",
action="store_true")
parser.add_argument(
"--extcap-interfaces",
help="List available interfaces to capture from",
action="store_true",
)
parser.add_argument("--extcap-interface",
help="The interface to capture from")
parser.add_argument("--extcap-interface", help="The interface to capture from")
parser.add_argument("--extcap-dlts",
help="List DLTs for the given interface",
action="store_true")
parser.add_argument(
"--extcap-dlts", help="List DLTs for the given interface", action="store_true"
)
parser.add_argument("--extcap-config",
help="List configurations for the given interface",
action="store_true")
parser.add_argument(
"--extcap-config",
help="List configurations for the given interface",
action="store_true",
)
parser.add_argument("--extcap-capture-filter",
help="Used together with capture to provide a capture filter")
parser.add_argument(
"--extcap-capture-filter",
help="Used together with capture to provide a capture filter",
)
parser.add_argument("--fifo",
help="Use together with capture to provide the fifo to dump data to")
parser.add_argument(
"--fifo", help="Use together with capture to provide the fifo to dump data to"
)
parser.add_argument("--extcap-control-in",
help="Used together with capture to get control messages from toolbar")
parser.add_argument(
"--extcap-control-in",
help="Used together with capture to get control messages from toolbar",
)
parser.add_argument("--extcap-control-out",
help="Used together with capture to send control messages to toolbar")
parser.add_argument(
"--extcap-control-out",
help="Used together with capture to send control messages to toolbar",
)
parser.add_argument("--extcap-version",
help="Set extcap supported version")
parser.add_argument("--extcap-version", help="Set extcap supported version")
# Interface Arguments
parser.add_argument("--device", help="Device", default="")
parser.add_argument("--baudrate", type=int, help="The sniffer baud rate")
parser.add_argument("--only-advertising", help="Only advertising packets", action="store_true")
parser.add_argument("--only-legacy-advertising", help="Only legacy advertising packets", action="store_true")
parser.add_argument("--scan-follow-rsp", help="Find scan response data ", action="store_true")
parser.add_argument("--scan-follow-aux", help="Find auxiliary pointer data", action="store_true")
parser.add_argument("--coded", help="Scan and follow on LE Coded PHY", action="store_true")
parser.add_argument(
"--only-advertising", help="Only advertising packets", action="store_true"
)
parser.add_argument(
"--only-legacy-advertising",
help="Only legacy advertising packets",
action="store_true",
)
parser.add_argument(
"--scan-follow-rsp", help="Find scan response data ", action="store_true"
)
parser.add_argument(
"--scan-follow-aux", help="Find auxiliary pointer data", action="store_true"
)
parser.add_argument(
"--coded", help="Scan and follow on LE Coded PHY", action="store_true"
)
logging.info("Started PID {}".format(os.getpid()))
@ -839,16 +969,23 @@ if __name__ == '__main__':
parser.print_help()
sys.exit(ERROR_FIFO)
try:
logging.info('sniffer capture')
sniffer_capture(interface, args.baudrate, args.fifo, args.extcap_control_in, args.extcap_control_out)
logging.info("sniffer capture")
sniffer_capture(
interface,
args.baudrate,
args.fifo,
args.extcap_control_in,
args.extcap_control_out,
)
except KeyboardInterrupt:
pass
except Exception as e:
import traceback
logging.info(traceback.format_exc())
logging.info('internal error: {}'.format(repr(e)))
logging.info("internal error: {}".format(repr(e)))
sys.exit(ERROR_INTERNAL)
else:
parser.print_help()
sys.exit(ERROR_USAGE)
logging.info('main exit PID {}'.format(os.getpid()))
logging.info("main exit PID {}".format(os.getpid()))