# media_node_orchestrator.py
"""Media node agent / orchestrator.

A single script that runs in one of two roles, selected by ``role`` in
``config.yaml``:

* ``agent``        - subscribes to ``media/commands/<name>/#`` on the MQTT
                     broker, launches or stops a local media player process
                     on request, and publishes a heartbeat every 10 seconds.
* ``orchestrator`` - additionally subscribes to ``media/heartbeat`` to keep a
                     table of known nodes, and serves a small Flask HTTP API
                     (``/play``, ``/stop``, ``/nodes``, ``/``) that forwards
                     commands to agents over MQTT.
"""
import json
import os
import socket
import subprocess
import threading
import time

import paho.mqtt.client as mqtt
import yaml
from flask import Flask, request, jsonify, render_template_string

# ---------------------- CONFIG ----------------------
CONFIG_PATH = "config.yaml"

if not os.path.exists(CONFIG_PATH):
    raise FileNotFoundError(f"Missing config file: {CONFIG_PATH}")

with open(CONFIG_PATH, encoding="utf-8") as f:
    # safe_load returns None for an empty file; fall back to an empty mapping
    # so the defaults below apply instead of crashing on None.get(...).
    config = yaml.safe_load(f) or {}

ROLE = config.get("role", "agent")
NODE_NAME = config.get("name", "unnamed_node")
MQTT_BROKER = config.get("mqtt_broker", "localhost")
MQTT_PORT = config.get("mqtt_port", 1883)
LOCATION = config.get("location", "Unknown")
CAPABILITIES = config.get("capabilities", [])

COMMAND_TOPIC = f"media/commands/{NODE_NAME}/#"
STATUS_TOPIC = f"media/status/{NODE_NAME}"
HEARTBEAT_TOPIC = "media/heartbeat"

# Seconds to wait for the player to exit after terminate() before killing it.
PLAYER_STOP_TIMEOUT = 5

player_proc = None
nodes = {}  # only used by orchestrator

app = Flask(__name__)
client = mqtt.Client()


# ---------------------- AGENT FUNCTIONS ----------------------
def _terminate_player():
    """Stop the current player process, if one is still running.

    Sends terminate() first and escalates to kill() if the process has not
    exited within PLAYER_STOP_TIMEOUT seconds, so a hung player cannot block
    the calling thread forever.
    """
    proc = player_proc
    if proc is None or proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=PLAYER_STOP_TIMEOUT)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def run_player(data):
    """Launch a media player for a PLAYBACK command and publish the result.

    Args:
        data: Decoded command payload. Reads ``player`` (default ``"mpv"``),
            ``resolved_url`` (falling back to ``media``) and ``options``
            (extra command-line arguments, default none).

    Any running player is stopped first. On success a ``playing`` status is
    published; any failure is published as an ``error`` status rather than
    raised.
    """
    global player_proc
    try:
        _terminate_player()

        source = data.get("resolved_url") or data.get("media")
        if not source:
            raise ValueError("No 'media' or 'resolved_url' in playback command")

        # NOTE(security): the player executable and its options come straight
        # from the MQTT payload, so anyone able to publish to this node's
        # command topic can run arbitrary programs here. Restrict access to
        # the broker, or validate against an allow-list, before exposing it.
        cmd = [data.get("player", "mpv"), source]
        cmd += data.get("options") or []
        # Popen requires string arguments; JSON may deliver numbers.
        cmd = [str(part) for part in cmd]

        print("Launching:", " ".join(cmd))
        player_proc = subprocess.Popen(cmd)
        publish_status("playing", data.get("media"))
    except Exception as e:
        publish_status("error", str(e))


def stop_player():
    """Stop the running player (if any) and publish a ``stopped`` status."""
    _terminate_player()
    publish_status("stopped", None)


def publish_status(state, media):
    """Publish this node's state and current media on STATUS_TOPIC as JSON."""
    payload = json.dumps({"state": state, "media": media, "node": NODE_NAME})
    client.publish(STATUS_TOPIC, payload)


def send_heartbeat():
    """Publish a heartbeat on HEARTBEAT_TOPIC every 10 seconds, forever.

    Intended to run in a daemon thread. The heartbeat carries the node name,
    its resolved IP address, location, capabilities and a Unix timestamp.
    """
    while True:
        try:
            ip = socket.gethostbyname(socket.gethostname())
        except OSError:
            # Hostname resolution can fail (e.g. no matching hosts entry);
            # keep the heartbeat alive rather than letting the thread die.
            ip = "unknown"
        msg = {
            "node": NODE_NAME,
            "ip": ip,
            "location": LOCATION,
            "capabilities": CAPABILITIES,
            "timestamp": time.time(),
        }
        client.publish(HEARTBEAT_TOPIC, json.dumps(msg))
        time.sleep(10)


# ---------------------- ORCHESTRATOR API ----------------------
def _json_body():
    """Return the request's JSON object, or an empty dict if absent/invalid."""
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


@app.route("/play", methods=["POST"])
def play():
    """Forward a PLAYBACK command to the agent named by ``target``.

    Expects a JSON body with ``media`` and ``target``; optional ``url``,
    ``player`` and ``options``. Responds 400 if a required field is missing.
    """
    data = _json_body()
    media = data.get("media")
    target = data.get("target")
    if not media or not target:
        return jsonify({"error": "Missing 'media' or 'target'"}), 400

    msg = {
        "command": "PLAYBACK",
        "media": media,
        "resolved_url": data.get("url", media),
        "player": data.get("player", "mpv"),
        "options": data.get("options", ["--fs"]),
    }
    topic = f"media/commands/{target}/playback"
    client.publish(topic, json.dumps(msg))
    return jsonify({"status": "sent", "target": target})


@app.route("/stop", methods=["POST"])
def stop():
    """Forward a STOP command to the agent named by ``target``.

    Expects a JSON body with ``target``; responds 400 if it is missing.
    """
    data = _json_body()
    target = data.get("target")
    if not target:
        return jsonify({"error": "Missing 'target'"}), 400

    msg = {"command": "STOP"}
    topic = f"media/commands/{target}/stop"
    client.publish(topic, json.dumps(msg))
    return jsonify({"status": "sent", "target": target})


@app.route("/nodes", methods=["GET"])
def get_nodes():
    """Return the table of nodes seen via heartbeat, keyed by node name."""
    # Snapshot first: the MQTT thread may insert entries while we serialize.
    return jsonify(dict(nodes))


@app.route("/")
def dashboard():
    """Render an HTML table of the nodes seen via heartbeat."""
    html = '''
<!DOCTYPE html>
<html>
<head>
  <title>Media Node Dashboard</title>
</head>
<body>
  <h1>Active Nodes</h1>
  <table border="1">
    <tr>
      <th>Node</th>
      <th>IP</th>
      <th>Location</th>
      <th>Capabilities</th>
      <th>Last Seen</th>
    </tr>
    {% for node, data in nodes.items() %}
    <tr>
      <td>{{ node }}</td>
      <td>{{ data.ip }}</td>
      <td>{{ data.location }}</td>
      <td>{{ ', '.join(data.capabilities) }}</td>
      <td>{{ data.timestamp | int }}</td>
    </tr>
    {% endfor %}
  </table>
</body>
</html>
'''
    # Snapshot first: the MQTT thread may insert entries while we render.
    return render_template_string(html, nodes=dict(nodes))


# ---------------------- MQTT ----------------------
def on_connect(client, userdata, flags, rc):
    """Subscribe to this node's topics once the broker accepts the connection.

    Every role subscribes to COMMAND_TOPIC; the orchestrator also subscribes
    to HEARTBEAT_TOPIC. A non-zero ``rc`` means the broker refused the
    connection, in which case nothing is subscribed.
    """
    if rc != 0:
        print(f"MQTT connection failed with result code {rc}")
        return
    print("Connected to MQTT broker")
    client.subscribe(COMMAND_TOPIC)
    if ROLE == "orchestrator":
        client.subscribe(HEARTBEAT_TOPIC)


def _record_heartbeat(raw):
    """Store a heartbeat payload in ``nodes``; ignore malformed messages."""
    try:
        data = json.loads(raw.decode())
        nodes[data["node"]] = data
    except (ValueError, KeyError, TypeError) as e:
        # Bad UTF-8 / bad JSON (ValueError), missing "node" (KeyError) or a
        # non-object payload (TypeError). Letting these escape the paho
        # callback could stop the network loop thread, so just drop them.
        print(f"Ignoring malformed heartbeat: {e}")


def _dispatch_command(raw):
    """Decode a command payload and run the matching agent action."""
    try:
        payload = json.loads(raw.decode())
        command = payload.get("command")
        print(f"Received command: {command}")
        if command == "PLAYBACK":
            run_player(payload)
        elif command == "STOP":
            stop_player()
        else:
            publish_status("error", f"Unknown command: {command}")
    except Exception as e:
        # Callback boundary: report any failure as a status message instead
        # of letting it propagate into the MQTT loop.
        publish_status("error", str(e))


def on_message(client, userdata, msg):
    """Route an incoming MQTT message to heartbeat or command handling."""
    topic = msg.topic
    if ROLE == "orchestrator" and topic.startswith("media/heartbeat"):
        _record_heartbeat(msg.payload)
    elif topic.startswith(f"media/commands/{NODE_NAME}/"):
        _dispatch_command(msg.payload)


# ---------------------- MAIN ----------------------
def main():
    """Connect to the broker and run the loop for the configured role."""
    client.on_connect = on_connect
    client.on_message = on_message

    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
    except OSError as e:
        raise SystemExit(
            f"Could not connect to MQTT broker {MQTT_BROKER}:{MQTT_PORT}: {e}"
        )

    threading.Thread(target=client.loop_forever, daemon=True).start()

    if ROLE == "agent":
        threading.Thread(target=send_heartbeat, daemon=True).start()
        try:
            # Keep the main thread alive; all work happens in daemon threads.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            stop_player()
            print("Agent shutting down")
    elif ROLE == "orchestrator":
        app.run(host="0.0.0.0", port=5000)
    else:
        raise SystemExit(
            f"Unknown role {ROLE!r}; expected 'agent' or 'orchestrator'"
        )


if __name__ == "__main__":
    main()