people_counter/tracker.py
2025-08-02 03:25:54 +02:00

130 lines
3.4 KiB
Python

import cv2
import threading
import time
from ultralytics import YOLO
from utils.counter import Counter
from utils.zones import draw_count_line_vertical
from collections import defaultdict
from metrics import get_metrics
registry, metrics = get_metrics()
STREAMS = {
"Kasse 1": "http://192.168.11.76:8080?action=stream",
"Kasse 2": "http://192.168.11.230:8080?action=stream",
}
FRAME_SKIP = 2
model = YOLO("yolo_weights/yolo11n.pt")
latest_frames = {}
def process_stream(stream_id, url):
print(f"PROCESS STREAM {stream_id} from {url}")
line_orientation = "vertical"
counter = Counter(stream_id, line_orientation, metrics)
cap = cv2.VideoCapture(url)
frame_idx = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
time.sleep(1)
continue
height, width, _ = frame.shape
frame_idx += 1
if frame_idx % FRAME_SKIP != 0:
continue
try:
results = model.track(
frame, persist=True, classes=0, tracker="bytetrack.yaml"
)
except Exception as e:
print(e)
continue
if line_orientation == "horizontal":
line_position = int(height / 2)
elif line_orientation == "vertical":
line_position = int(width / 2)
else:
raise NotImplementedError(
f"Line orientation {line_orientation} is invalid!"
)
boxes = results[0].boxes
if boxes.id is not None:
for box, track_id in zip(boxes.xywh.cpu(), boxes.id.cpu()):
x, y, w, h = map(int, box)
cv2.rectangle(
frame,
(x - w // 2, y - h // 2),
(x + w // 2, y + h // 2),
(0, 255, 0),
2,
)
cv2.putText(
frame,
str(track_id),
(x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 0),
2,
)
tracks = [
type("Track", (), {"id": int(track_id), "bbox": box.numpy()})
for box, track_id in zip(boxes.xywh.cpu(), boxes.id.cpu())
]
counter.update(tracks, line_position)
if line_orientation == "horizontal":
draw_count_line_horizontal(frame, line_position)
elif line_orientation == "vertical":
draw_count_line_vertical(frame, line_position)
cv2.putText(
frame,
f"In: {counter.in_count}",
(10, 40),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 255, 255),
2,
)
cv2.putText(
frame,
f"Out: {counter.out_count}",
(10, 80),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 255, 255),
2,
)
_, jpeg = cv2.imencode(".jpg", frame)
latest_frames[stream_id] = (jpeg.tobytes(), time.time())
print(f"[{stream_id}] Frame captured and stored")
cap.release()
def start_all_streams():
for stream_id, url in STREAMS.items():
threading.Thread(
target=process_stream, args=(stream_id, url), daemon=True
).start()
def get_metrics():
return registry
def get_latest_frame(stream_id):
return latest_frames.get(stream_id, (None, None))