"""HTTP front end: dashboard page, Prometheus metrics and MJPEG video feeds."""

import logging
import time

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse, PlainTextResponse
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

from tracker import start_all_streams, get_metrics, get_latest_frame, STREAMS

logger = logging.getLogger(__name__)

app = FastAPI()


@app.on_event("startup")
def startup_event():
    """Start every configured stream when the application boots."""
    start_all_streams()


@app.get("/", response_class=HTMLResponse)
def index(request: Request):
    """Serve the dashboard page.

    Reads the optional ``stream`` query parameter and builds one option
    entry per key in ``STREAMS``.
    """
    # NOTE(review): the option markup and the page template look like they
    # lost their HTML tags; as written, neither `current_stream` nor
    # `stream_options` is interpolated into the returned page. Restore the
    # template from version control, and HTML-escape `stream_id` and
    # `current_stream` when they are interpolated again.
    current_stream = request.query_params.get("stream")
    stream_options = "\n".join(
        [
            f''
            for stream_id in STREAMS.keys()
        ]
    )
    return f"""

People Counter Dashboard

Prometheus Metrics

"""


@app.get("/metrics")
def metrics():
    """Expose the result of ``get_metrics()`` in Prometheus text format."""
    return PlainTextResponse(
        generate_latest(get_metrics()), media_type=CONTENT_TYPE_LATEST
    )


@app.get("/video_feed/{stream_id}")
def video_feed(stream_id: str):
    """Stream JPEG frames for ``stream_id`` as ``multipart/x-mixed-replace``.

    The response never ends on its own: the generator polls
    ``get_latest_frame`` and emits a new part whenever a newer frame version
    shows up.
    """
    # RFC 2046: each body delimiter is "--" followed by the boundary value
    # advertised in the Content-Type header, so the header must carry the
    # bare token (without the leading dashes).
    boundary = "boundarydonotcross"
    # Loop-invariant per-part preamble, built once instead of per frame.
    part_header = (
        f"--{boundary}".encode()
        + b"\r\n"
        + b"Content-Type: image/jpeg\r\n\r\n"
    )

    def generate():
        """Yield one multipart part per new frame, polling every 50 ms."""
        # NOTE(review): versions are compared against wall-clock time, which
        # presumes get_latest_frame() returns time.time()-style timestamps;
        # frames produced before the client connected are skipped. Confirm
        # against tracker.get_latest_frame.
        last_version = time.time()
        while True:
            frame, version = get_latest_frame(stream_id)
            if frame is not None and version > last_version:
                last_version = version
                logger.debug("yielding new frame @%s", version)
                yield part_header + frame + b"\r\n"
            else:
                # No newer frame yet; back off briefly instead of spinning.
                time.sleep(0.05)

    return StreamingResponse(
        generate(),
        media_type=f"multipart/x-mixed-replace; boundary={boundary}",
    )