mediahouse/myagent.py

216 lines
6.0 KiB
Python

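# orchestrator_and_agent.py with Dashboard
# To run (NOTE: the commands below use the name media_node.py; substitute this
# file's actual name if it differs):
#
#   python media_node.py --role agent --name upstairs_pi
#   python media_node.py --role orchestrator --name orchestrator01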
import json
import socket
import subprocess
import threading
import time
from flask import Flask, request, jsonify, render_template_string
import paho.mqtt.client as mqtt
import argparse
# Command-line interface: the same script runs either as a playback agent or
# as the orchestrator, selected with --role.
parser = argparse.ArgumentParser()
parser.add_argument("--role", choices=["agent", "orchestrator"], default="agent")
parser.add_argument("--name", default="unnamed_node")
args = parser.parse_args()
NODE_NAME = args.name
is_orchestrator = args.role == "orchestrator"

# Configuration
MQTT_BROKER = "192.168.1.70"
MQTT_PORT = 1883

# config.json is read from the current working directory. The context manager
# closes the file handle deterministically, and the explicit encoding avoids
# depending on the platform's locale (JSON text is UTF-8).
with open('config.json', 'r', encoding='utf-8') as config_file:
    config = json.load(config_file)
# todo (left unspecified by the original author). Note that the --name value
# always overrides any node_name stored in config.json.
config['node_name'] = NODE_NAME

# Shared module state.
player_proc = None      # subprocess.Popen of the running player, or None
nodes = {}              # node name -> last heartbeat payload received
app = Flask(__name__)
client = mqtt.Client()
# ---------------------- AGENT FUNCTIONS ----------------------
def run_player(data):
    """Start the media player described by a PLAYBACK command.

    A player previously launched by this node is stopped first, then a new
    process is spawned and a "playing" status is published. Failures are
    reported via publish_status("error", ...) instead of being raised.

    Args:
        data: Decoded command payload (dict). Keys read here: "player"
            (executable, default "mpv"), "resolved_url" or "media" (what to
            play), and "options" (list of extra command-line arguments).
    """
    global player_proc
    try:
        if player_proc and player_proc.poll() is None:
            player_proc.terminate()
            try:
                # Bounded wait: a player that ignores SIGTERM must not block
                # the calling thread forever.
                player_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                player_proc.kill()
                player_proc.wait()

        source = data.get("resolved_url") or data.get("media")
        if not source:
            # Report a clear error instead of the TypeError that a None
            # entry in the argument list would otherwise produce.
            publish_status("error", "PLAYBACK command has no 'resolved_url' or 'media'")
            return

        # NOTE(security): the executable and its options come straight from
        # the command payload, so anyone able to publish such a command can
        # run arbitrary programs on this node. Consider an allow-list.
        cmd = [data.get("player", "mpv"), source]
        cmd += data.get("options") or []
        print("Launching:", " ".join(cmd))
        player_proc = subprocess.Popen(cmd)
        publish_status("playing", data.get("media"))
    except Exception as e:
        publish_status("error", str(e))
def stop_player():
    """Stop the running player process, if any, and publish a "stopped" status.

    The status is published even when no player was running.
    """
    global player_proc
    if player_proc and player_proc.poll() is None:
        player_proc.terminate()
        try:
            # Bounded wait: escalate to kill() if the player ignores the
            # terminate request, rather than blocking the caller forever.
            player_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            player_proc.kill()
            player_proc.wait()
    publish_status("stopped", None)
def publish_status(state, media):
    """Publish this node's playback state on config['status_topic'].

    Args:
        state: Status label placed under the "state" key (callers in this
            file pass "playing", "stopped" or "error").
        media: Value placed under the "media" key; callers pass the media
            identifier, an error message, or None.
    """
    body = json.dumps(dict(state=state, media=media, node=config['node_name']))
    topic = config['status_topic']
    client.publish(topic, body)
def send_heartbeat():
    """Publish this node's heartbeat on config['heartbeat_topic'] every 10 s.

    Loops forever, so it is meant to run in a background thread. Each message
    carries the node name, resolved IP, location, capabilities and the
    current time.time() timestamp.
    """
    while True:
        try:
            ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            # Name resolution can fail (socket.gaierror is an OSError); keep
            # the heartbeat thread alive instead of letting it die silently.
            ip = "unknown"
        msg = {
            "node": config['node_name'],
            "ip": ip,
            "location": config['location'],
            "capabilities": config['capabilities'],
            "timestamp": time.time(),
        }
        client.publish(config['heartbeat_topic'], json.dumps(msg))
        time.sleep(10)
# ---------------------- ORCHESTRATOR API ----------------------
@app.route("/play", methods=["POST"])
def play():
    """HTTP endpoint: ask a target node to start playback.

    Expects a JSON object with "media" and "target"; optional "url",
    "player" and "options". Publishes a PLAYBACK command on
    media/commands/<target>/playback.

    Returns:
        200 with {"status": "sent", "target": ...}, or 400 when "media" or
        "target" is missing (including when the body is not a JSON object).
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising; anything that is not a JSON object is treated as empty so the
    # caller gets the 400 below rather than a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    media = data.get("media")
    target = data.get("target")
    if not media or not target:
        return jsonify({"error": "Missing 'media' or 'target'"}), 400
    msg = {
        "command": "PLAYBACK",
        "media": media,
        "resolved_url": data.get("url", media),
        "player": data.get("player", "mpv"),
        "options": data.get("options", ["--fs"]),
    }
    topic = f"media/commands/{target}/playback"
    client.publish(topic, json.dumps(msg))
    return jsonify({"status": "sent", "target": target})
@app.route("/stop", methods=["POST"])
def stop():
    """HTTP endpoint: ask a target node to stop playback.

    Expects a JSON object with "target". Publishes a STOP command on
    media/commands/<target>/stop.

    Returns:
        200 with {"status": "sent", "target": ...}, or 400 when "target" is
        missing (including when the body is not a JSON object).
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising; non-object bodies are treated as empty so the caller gets the
    # 400 below rather than a server error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    target = data.get("target")
    if not target:
        return jsonify({"error": "Missing 'target'"}), 400
    msg = {"command": "STOP"}
    topic = f"media/commands/{target}/stop"
    client.publish(topic, json.dumps(msg))
    return jsonify({"status": "sent", "target": target})
@app.route("/nodes", methods=["GET"])
def get_nodes():
    """HTTP endpoint: return the known nodes (name -> last heartbeat) as JSON."""
    # Serialise a snapshot: the MQTT callback inserts into `nodes` from
    # another thread, and iterating a dict while it grows raises RuntimeError.
    return jsonify(dict(nodes))
@app.route("/")
def dashboard():
    """HTTP endpoint: render an auto-refreshing HTML table of known nodes.

    The page reloads itself every 5 seconds (meta refresh) and shows, per
    node, its IP, location, capabilities and last heartbeat timestamp.
    """
    html = '''
<html>
<head>
<title>Media Node Dashboard</title>
<meta http-equiv="refresh" content="5">
<style>
table { border-collapse: collapse; width: 100%; }
th, td { padding: 8px 12px; border: 1px solid #ccc; }
</style>
</head>
<body>
<h2>Active Nodes</h2>
<table>
<tr><th>Node</th><th>IP</th><th>Location</th><th>Capabilities</th><th>Last Seen</th></tr>
{% for node, data in nodes.items() %}
<tr>
<td>{{ node }}</td>
<td>{{ data.ip }}</td>
<td>{{ data.location }}</td>
<td>{{ ', '.join(data.capabilities) }}</td>
<td>{{ data.timestamp | int }}</td>
</tr>
{% endfor %}
</table>
</body>
</html>
'''
    # Render from a snapshot: the MQTT callback inserts into `nodes` from
    # another thread, and the template iterates the mapping.
    return render_template_string(html, nodes=dict(nodes))
# ---------------------- MQTT ----------------------
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to the command and heartbeat topics.

    Args:
        client: The MQTT client that connected; subscriptions are made on it.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the broker accepted the
            connection, anything else is a refusal.
    """
    if rc != 0:
        # Do not claim success or subscribe on a refused connection.
        print(f"MQTT connection failed (rc={rc})")
        return
    print("Connected to MQTT broker")
    client.subscribe(config['command_topic'])
    client.subscribe(config['heartbeat_topic'])
def on_message(client, userdata, msg):
    """MQTT message callback: record heartbeats and execute node commands.

    Topics starting with "media/heartbeat" update the `nodes` registry, keyed
    by the payload's "node" field. Topics under
    media/commands/<this node's name>/ are decoded as JSON commands:
    "PLAYBACK" starts the player, "STOP" stops it, anything else is reported
    as an error status. Other topics are ignored.

    Args:
        client: Unused (the module-level client is used for publishing).
        userdata: Unused.
        msg: Received message; its .topic and .payload (bytes) are read.
    """
    topic = msg.topic
    if topic.startswith("media/heartbeat"):
        try:
            data = json.loads(msg.payload.decode())
            node_name = data["node"]
            nodes[node_name] = data
        except (ValueError, KeyError, TypeError) as e:
            # Bad UTF-8 / bad JSON (both ValueError), a missing "node" key,
            # or a non-object payload: drop it rather than let the exception
            # escape from the MQTT callback.
            print(f"Ignoring malformed heartbeat on {topic}: {e}")
    elif topic.startswith(f"media/commands/{config['node_name']}/"):
        try:
            payload = json.loads(msg.payload.decode())
            command = payload.get("command")
            print(f"Received command: {command}")
            if command == "PLAYBACK":
                run_player(payload)
            elif command == "STOP":
                stop_player()
            else:
                publish_status("error", f"Unknown command: {command}")
        except Exception as e:
            publish_status("error", str(e))
# ---------------------- MAIN ----------------------
def main():
    """Connect to the MQTT broker and run this node until interrupted.

    Both roles install the MQTT callbacks, connect, and start the heartbeat
    thread. The orchestrator then runs the MQTT loop in a background thread
    and serves the Flask app on port 5000; an agent runs the MQTT loop in the
    foreground.
    """
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_BROKER, MQTT_PORT, 60)

    if is_orchestrator:
        # The /nodes route is already registered on `app` at module level;
        # registering a second view with the same name would collide with it.
        threading.Thread(target=client.loop_forever, daemon=True).start()
        threading.Thread(target=send_heartbeat, daemon=True).start()
        try:
            app.run(host="0.0.0.0", port=5000)
        except KeyboardInterrupt:
            stop_player()
            print("Shutting down")
    else:
        threading.Thread(target=send_heartbeat, daemon=True).start()
        try:
            # Run the network loop in the main thread: with only daemon
            # threads running, the process would exit immediately.
            client.loop_forever()
        except KeyboardInterrupt:
            stop_player()
            print("Shutting down")


if __name__ == "__main__":
    main()