nixos-configuration/pkgs/myintercom-doorbell/myintercom_doorbell/service.py
2023-11-08 23:49:46 +00:00

98 lines
2.8 KiB
Python

import json
import os
import tempfile
import urllib3
import time
def send_open_door_request(host, username, password):
    """Send the door-open command to the intercom at *host*.

    Issues a single HTTP GET to ``/local/Doorcom/door.cgi?r=1`` on the
    device, authenticating with HTTP basic auth. The response is not
    inspected and nothing is returned.

    Args:
        host: Host name or address of the intercom.
        username: User name for HTTP basic authentication.
        password: Password for HTTP basic authentication.
    """
    http = urllib3.PoolManager()
    door_url = f"http://{host}/local/Doorcom/door.cgi?r=1"
    auth_headers = urllib3.make_headers(basic_auth=f"{username}:{password}")
    http.request("GET", door_url, headers=auth_headers)
def get_ring_status(host, username, password):
    """Stream ring-status updates from the intercom's monitor endpoint.

    Opens a streaming GET request to ``/local/Doorcom/monitor.cgi?ring=1``
    with HTTP basic auth and reads the response line by line. A status part
    is recognised by the exact line sequence ``--ioboundary``,
    ``Content-Type: text/plain`` and an empty line; the lines that follow,
    up to the next empty line, make up one status update.

    Args:
        host: Host name or address of the intercom.
        username: User name for HTTP basic authentication.
        password: Password for HTTP basic authentication.

    Yields:
        For every non-empty status part, a list with its lines decoded to
        ``str`` and stripped of trailing whitespace.

    The generator finishes when the device ends the stream; a part that is
    cut off by the end of the stream is discarded.
    """
    response = urllib3.PoolManager().request(
        "GET",
        f"http://{host}/local/Doorcom/monitor.cgi?ring=1",
        headers=urllib3.make_headers(basic_auth=f"{username}:{password}"),
        preload_content=False,
        decode_content=True,
    )
    try:
        while True:
            line = response.readline()
            if not line:
                # readline() returns b"" once the stream has ended. Without
                # this check the loop would spin forever on a dead stream.
                return
            if line != b"--ioboundary\r\n":
                continue
            if response.readline() != b"Content-Type: text/plain\r\n":
                continue
            if response.readline() != b"\r\n":
                continue

            data = []
            while True:
                line = response.readline()
                if not line:
                    # Stream ended in the middle of a part: stop instead of
                    # appending empty strings to ``data`` without bound.
                    return
                if line == b"\r\n":
                    break
                data.append(line.decode().rstrip())
            if data:
                yield data
    finally:
        # Runs on normal end, on error, and when the consumer abandons the
        # generator, so the streaming connection is never left open.
        response.close()
def read_file_contents(file_name):
    """Return the complete content of *file_name*, decoded as UTF-8 text."""
    with open(file_name, encoding="utf-8") as handle:
        contents = handle.read()
    return contents
def open_door():
    """Open the door using the connection details from the settings file.

    Loads ``/etc/myintercom-doorbell/settings.json``, takes ``host`` and
    ``username`` from it, reads the password from the file named by
    ``passwordFile``, and sends the door-open request to the intercom.
    """
    settings_path = "/etc/myintercom-doorbell/settings.json"
    with open(settings_path, "r", encoding="utf-8") as settings_file:
        settings = json.load(settings_file)

    host = settings["host"]
    username = settings["username"]
    password = read_file_contents(settings["passwordFile"])
    send_open_door_request(host=host, username=username, password=password)
def poll():
    """Watch the intercom for rings and place an Asterisk call for each one.

    Loads ``/etc/myintercom-doorbell/settings.json`` once, builds the call
    file text from its ``audiosocket`` and ``callerId`` entries, and then
    consumes status updates from the intercom indefinitely. The password
    file is re-read every time the status stream is (re)opened.

    For a status equal to ``["1:H"]`` the function prints ``ringing``,
    writes the call file to a temporary file in ``/var/tmp``, hard-links it
    into ``/var/spool/asterisk/outgoing/`` and sleeps ``dialTime`` seconds.
    For any other status it prints a single ``.`` as a progress marker.

    This function does not return.
    """
    outgoing_dir = "/var/spool/asterisk/outgoing/"
    with open(
        "/etc/myintercom-doorbell/settings.json", "r", encoding="utf-8"
    ) as config_file:
        config = json.load(config_file)

    audiosocket = f"{config['audiosocket']['address']}:{config['audiosocket']['port']}/{config['audiosocket']['uuid']}"
    callfile_content = (
        f"Channel: Audiosocket/{audiosocket}\n"
        "Context: doorbell\n"
        f"CallerID: {config['callerId']}\n"
        "Extension: s\n"
        "Priority: 1\n"
    )

    while True:
        for status in get_ring_status(
            host=config["host"],
            username=config["username"],
            password=read_file_contents(config["passwordFile"]),
        ):
            if status == ["1:H"]:
                print("ringing", flush=True)
                with tempfile.NamedTemporaryFile(dir="/var/tmp", mode="w") as f:
                    f.write(callfile_content)
                    # Push the buffered text to disk before the file becomes
                    # visible in the spool directory; otherwise a reader of
                    # the linked name could see an empty call file.
                    f.flush()
                    os.chmod(f.name, 0o644)
                    # Publish via hard link so the complete file appears
                    # under the spool directory in a single step. The
                    # temporary name is removed when the ``with`` block ends;
                    # the linked name stays.
                    os.link(
                        f.name, os.path.join(outgoing_dir, os.path.basename(f.name))
                    )
                # Wait before handling further status updates.
                time.sleep(config["dialTime"])
            else:
                print(".", end="", flush=True)