41 lines
1.5 KiB
Python
41 lines
1.5 KiB
Python
class Counter:
    """Count tracked objects that cross a line and report them to metrics.

    For every track the counter remembers the last centre coordinate along
    the axis selected by ``line_orientation`` and whether the track has
    already been counted. A move from strictly below ``line_position`` to
    at-or-above it is counted as "in"; a move from strictly above to
    at-or-below it is counted as "out". Each track id is counted at most
    once, in whichever direction it crosses first.

    Attributes:
        in_count: Number of "in" crossings recorded so far.
        out_count: Number of "out" crossings recorded so far.
        track_memory: Maps track id to a two-item list
            ``[last_center, already_counted]``. Entries are never removed
            by this class, so the dict grows with every new track id.
        line_orientation: ``"horizontal"`` or ``"vertical"``.
        metrics: Mapping with ``"people_in"`` and ``"people_out"`` entries;
            each must support ``.labels(stream=...).inc()``.
        stream_id: Value passed as the ``stream`` label on every metric.
    """

    def __init__(self, stream_id, line_orientation, metrics):
        """Store the configuration and start with empty counts and memory.

        ``line_orientation`` is not validated here; an unsupported value
        only surfaces when ``update`` processes its first track.
        """
        self.stream_id = stream_id
        self.metrics = metrics
        self.line_orientation = line_orientation
        # track id -> [last centre coordinate, already-counted flag]
        self.track_memory = {}
        self.in_count = 0
        self.out_count = 0

    def _axis_center(self, bbox):
        """Return the integer box centre along the axis the line is tested on.

        ``bbox`` is unpacked as ``(x, y, w, h)``. A "horizontal" line uses
        ``y + h / 2`` and a "vertical" line uses ``x + w / 2``; the result
        is truncated with ``int``.

        Raises:
            NotImplementedError: If ``line_orientation`` is neither
                ``"horizontal"`` nor ``"vertical"``.
        """
        left, top, width, height = bbox
        orientation = self.line_orientation
        if orientation == "horizontal":
            return int(top + height / 2)
        if orientation == "vertical":
            return int(left + width / 2)
        raise NotImplementedError(
            f"Line orientation {self.line_orientation} is invalid!"
        )

    def update(self, tracks, line_position):
        """Process one batch of tracks and record any new line crossings.

        Args:
            tracks: Iterable of objects exposing ``id`` and a four-value
                ``bbox``.
            line_position: Coordinate of the counting line on the axis
                selected by ``line_orientation``.

        Raises:
            NotImplementedError: If ``line_orientation`` is unsupported and
                ``tracks`` yields at least one track.
        """
        for track in tracks:
            track_id = track.id
            center = self._axis_center(track.bbox)

            state = self.track_memory.get(track_id)
            if state is None:
                # First sighting: no previous position to compare against.
                self.track_memory[track_id] = [center, False]
                continue

            # Always refresh the stored position, even for counted tracks.
            previous, state[0] = state[0], center
            if state[1]:
                continue

            if previous < line_position <= center:
                self.in_count += 1
                self.metrics["people_in"].labels(stream=self.stream_id).inc()
            elif previous > line_position >= center:
                self.out_count += 1
                self.metrics["people_out"].labels(stream=self.stream_id).inc()
            else:
                continue

            # Flag is set only after the metric call succeeded.
            state[1] = True