# media_node_orchestrator.py import json import socket import subprocess import threading import time import yaml import os from flask import Flask, request, jsonify, render_template_string import paho.mqtt.client as mqtt # ---------------------- CONFIG ---------------------- CONFIG_PATH = "config.yaml" if not os.path.exists(CONFIG_PATH): raise FileNotFoundError(f"Missing config file: {CONFIG_PATH}") with open(CONFIG_PATH) as f: config = yaml.safe_load(f) ROLE = config.get("role", "agent") NODE_NAME = config.get("name", "unnamed_node") MQTT_BROKER = config.get("mqtt_broker", "localhost") MQTT_PORT = config.get("mqtt_port", 1883) LOCATION = config.get("location", "Unknown") CAPABILITIES = config.get("capabilities", []) COMMAND_TOPIC = f"media/commands/{NODE_NAME}/#" STATUS_TOPIC = f"media/status/{NODE_NAME}" HEARTBEAT_TOPIC = "media/heartbeat" player_proc = None nodes = {} # only used by orchestrator app = Flask(__name__) client = mqtt.Client() # ---------------------- AGENT FUNCTIONS ---------------------- def run_player(data): global player_proc try: if player_proc and player_proc.poll() is None: player_proc.terminate() player_proc.wait() cmd = [data.get("player", "mpv"), data.get("resolved_url") or data.get("media")] cmd += data.get("options", []) print("Launching:", " ".join(cmd)) player_proc = subprocess.Popen(cmd) publish_status("playing", data.get("media")) except Exception as e: publish_status("error", str(e)) def stop_player(): global player_proc if player_proc and player_proc.poll() is None: player_proc.terminate() player_proc.wait() publish_status("stopped", None) def publish_status(state, media): payload = json.dumps({"state": state, "media": media, "node": NODE_NAME}) client.publish(STATUS_TOPIC, payload) def send_heartbeat(): while True: ip = socket.gethostbyname(socket.gethostname()) msg = { "node": NODE_NAME, "ip": ip, "location": LOCATION, "capabilities": CAPABILITIES, "timestamp": time.time() } client.publish(HEARTBEAT_TOPIC, json.dumps(msg)) time.sleep(10) # ---------------------- ORCHESTRATOR API ---------------------- @app.route("/play", methods=["POST"]) def play(): data = request.json media = data.get("media") target = data.get("target") if not media or not target: return jsonify({"error": "Missing 'media' or 'target'"}), 400 msg = { "command": "PLAYBACK", "media": media, "resolved_url": data.get("url", media), "player": data.get("player", "mpv"), "options": data.get("options", ["--fs"]) } topic = f"media/commands/{target}/playback" client.publish(topic, json.dumps(msg)) return jsonify({"status": "sent", "target": target}) @app.route("/stop", methods=["POST"]) def stop(): data = request.json target = data.get("target") if not target: return jsonify({"error": "Missing 'target'"}), 400 msg = {"command": "STOP"} topic = f"media/commands/{target}/stop" client.publish(topic, json.dumps(msg)) return jsonify({"status": "sent", "target": target}) @app.route("/nodes", methods=["GET"]) def get_nodes(): return jsonify(nodes) @app.route("/") def dashboard(): html = '''
| Node | IP | Location | Capabilities | Last Seen |
|---|---|---|---|---|
| {{ node }} | {{ data.ip }} | {{ data.location }} | {{ ', '.join(data.capabilities) }} | {{ data.timestamp | int }} |