From ab1b028698195ccefdc8112ec76f8fdb4217a4d2 Mon Sep 17 00:00:00 2001 From: Peter Howell Date: Thu, 31 Jul 2025 13:36:47 -0700 Subject: [PATCH] first --- agent2.py | 205 ++++++++++++++++++++++++++++++++++++++++++ config.yaml | 4 + livroom.config.json | 8 ++ myagent.py | 215 ++++++++++++++++++++++++++++++++++++++++++++ orch.config.json | 8 ++ 5 files changed, 440 insertions(+) create mode 100644 agent2.py create mode 100644 config.yaml create mode 100644 livroom.config.json create mode 100644 myagent.py create mode 100644 orch.config.json diff --git a/agent2.py b/agent2.py new file mode 100644 index 0000000..e119c51 --- /dev/null +++ b/agent2.py @@ -0,0 +1,205 @@ +# media_node_orchestrator.py + +import json +import socket +import subprocess +import threading +import time +import yaml +import os +from flask import Flask, request, jsonify, render_template_string +import paho.mqtt.client as mqtt + +# ---------------------- CONFIG ---------------------- + +CONFIG_PATH = "config.yaml" + +if not os.path.exists(CONFIG_PATH): + raise FileNotFoundError(f"Missing config file: {CONFIG_PATH}") + +with open(CONFIG_PATH) as f: + config = yaml.safe_load(f) + +ROLE = config.get("role", "agent") +NODE_NAME = config.get("name", "unnamed_node") +MQTT_BROKER = config.get("mqtt_broker", "localhost") +MQTT_PORT = config.get("mqtt_port", 1883) +LOCATION = config.get("location", "Unknown") +CAPABILITIES = config.get("capabilities", []) + +COMMAND_TOPIC = f"media/commands/{NODE_NAME}/#" +STATUS_TOPIC = f"media/status/{NODE_NAME}" +HEARTBEAT_TOPIC = "media/heartbeat" + +player_proc = None +nodes = {} # only used by orchestrator + +app = Flask(__name__) +client = mqtt.Client() + +# ---------------------- AGENT FUNCTIONS ---------------------- + +def run_player(data): + global player_proc + try: + if player_proc and player_proc.poll() is None: + player_proc.terminate() + player_proc.wait() + + cmd = [data.get("player", "mpv"), data.get("resolved_url") or data.get("media")] + cmd += data.get("options", []) + + print("Launching:", " ".join(cmd)) + player_proc = subprocess.Popen(cmd) + + publish_status("playing", data.get("media")) + except Exception as e: + publish_status("error", str(e)) + +def stop_player(): + global player_proc + if player_proc and player_proc.poll() is None: + player_proc.terminate() + player_proc.wait() + publish_status("stopped", None) + +def publish_status(state, media): + payload = json.dumps({"state": state, "media": media, "node": NODE_NAME}) + client.publish(STATUS_TOPIC, payload) + +def send_heartbeat(): + while True: + ip = socket.gethostbyname(socket.gethostname()) + msg = { + "node": NODE_NAME, + "ip": ip, + "location": LOCATION, + "capabilities": CAPABILITIES, + "timestamp": time.time() + } + client.publish(HEARTBEAT_TOPIC, json.dumps(msg)) + time.sleep(10) + +# ---------------------- ORCHESTRATOR API ---------------------- + +@app.route("/play", methods=["POST"]) +def play(): + data = request.json + media = data.get("media") + target = data.get("target") + + if not media or not target: + return jsonify({"error": "Missing 'media' or 'target'"}), 400 + + msg = { + "command": "PLAYBACK", + "media": media, + "resolved_url": data.get("url", media), + "player": data.get("player", "mpv"), + "options": data.get("options", ["--fs"]) + } + + topic = f"media/commands/{target}/playback" + client.publish(topic, json.dumps(msg)) + return jsonify({"status": "sent", "target": target}) + +@app.route("/stop", methods=["POST"]) +def stop(): + data = request.json + target = data.get("target") + + if not target: + return jsonify({"error": "Missing 'target'"}), 400 + + msg = {"command": "STOP"} + topic = f"media/commands/{target}/stop" + client.publish(topic, json.dumps(msg)) + return jsonify({"status": "sent", "target": target}) + +@app.route("/nodes", methods=["GET"]) +def get_nodes(): + return jsonify(nodes) + +@app.route("/") +def dashboard(): + html = ''' + + + Media Node Dashboard + + + + +

Active Nodes

+ + + {% for node, data in nodes.items() %} + + + + + + + + {% endfor %} +
NodeIPLocationCapabilitiesLast Seen
{{ node }}{{ data.ip }}{{ data.location }}{{ ', '.join(data.capabilities) }}{{ data.timestamp | int }}
+ + + ''' + return render_template_string(html, nodes=nodes) + +# ---------------------- MQTT ---------------------- + +def on_connect(client, userdata, flags, rc): + print("Connected to MQTT broker") + client.subscribe(COMMAND_TOPIC) + if ROLE == "orchestrator": + client.subscribe(HEARTBEAT_TOPIC) + +def on_message(client, userdata, msg): + topic = msg.topic + payload = msg.payload.decode() + + if ROLE == "orchestrator" and topic.startswith("media/heartbeat"): + data = json.loads(payload) + nodes[data["node"]] = data + + elif topic.startswith(f"media/commands/{NODE_NAME}/"): + try: + payload = json.loads(payload) + command = payload.get("command") + print(f"Received command: {command}") + if command == "PLAYBACK": + run_player(payload) + elif command == "STOP": + stop_player() + else: + publish_status("error", f"Unknown command: {command}") + except Exception as e: + publish_status("error", str(e)) + +# ---------------------- MAIN ---------------------- + +if __name__ == "__main__": + client.on_connect = on_connect + client.on_message = on_message + + # todo try catch + client.connect(MQTT_BROKER, MQTT_PORT, 60) + + threading.Thread(target=client.loop_forever, daemon=True).start() + + if ROLE == "agent": + threading.Thread(target=send_heartbeat, daemon=True).start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + stop_player() + print("Agent shutting down") + + elif ROLE == "orchestrator": + app.run(host="0.0.0.0", port=5000) diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..c632053 --- /dev/null +++ b/config.yaml @@ -0,0 +1,4 @@ +role: orchestrator +name: orchestrator1 +mqtt_broker: 192.168.1.70 +mqtt_port: 1883 diff --git a/livroom.config.json b/livroom.config.json new file mode 100644 index 0000000..bd5ed77 --- /dev/null +++ b/livroom.config.json @@ -0,0 +1,8 @@ +{ + "node_name": "livingroom", + "command_topic": "media/commands/livingroom/#", + "status_topic": "media/status/livingroom", + "heartbeat_topic": "media/heartbeat", + "capabilities": ["playback", "vlc", "mpv"], + "location": "Downstairs Living Room" +} diff --git a/myagent.py b/myagent.py new file mode 100644 index 0000000..1bc1585 --- /dev/null +++ b/myagent.py @@ -0,0 +1,215 @@ +# orchestrator_and_agent.py with Dashboard + +# to run: +# +# python media_node.py --role agent --name upstairs_pi +# python media_node.py --role orchestrator --name orchestrator01 + + +import json +import socket +import subprocess +import threading +import time +from flask import Flask, request, jsonify, render_template_string +import paho.mqtt.client as mqtt + +import argparse + +parser = argparse.ArgumentParser() +parser.add_argument("--role", choices=["agent", "orchestrator"], default="agent") +parser.add_argument("--name", default="unnamed_node") +args = parser.parse_args() + +NODE_NAME = args.name +is_orchestrator = args.role == "orchestrator" + + + +# Configuration +MQTT_BROKER = "192.168.1.70" +MQTT_PORT = 1883 + +config = json.loads( open('config.json','r').read()) + +# todo +config['node_name'] = NODE_NAME + +player_proc = None +nodes = {} + +app = Flask(__name__) + +client = mqtt.Client() + +# ---------------------- AGENT FUNCTIONS ---------------------- + +def run_player(data): + global player_proc + try: + if player_proc and player_proc.poll() is None: + player_proc.terminate() + player_proc.wait() + + cmd = [data.get("player", "mpv"), data.get("resolved_url") or data.get("media")] + cmd += data.get("options", []) + + print("Launching:", " ".join(cmd)) + player_proc = subprocess.Popen(cmd) + + publish_status("playing", data.get("media")) + except Exception as e: + publish_status("error", str(e)) + +def stop_player(): + global player_proc + if player_proc and player_proc.poll() is None: + player_proc.terminate() + player_proc.wait() + publish_status("stopped", None) + +def publish_status(state, media): + payload = json.dumps({"state": state, "media": media, "node": config['node_name']}) + client.publish(config['status_topic'], payload) + +def send_heartbeat(): + while True: + ip = socket.gethostbyname(socket.gethostname()) + msg = { + "node": config['node_name'], + "ip": ip, + "location": config['location'], + "capabilities": config['capabilities'], + "timestamp": time.time() + } + client.publish(config['heartbeat_topic'], json.dumps(msg)) + time.sleep(10) + +# ---------------------- ORCHESTRATOR API ---------------------- + +@app.route("/play", methods=["POST"]) +def play(): + data = request.json + media = data.get("media") + target = data.get("target") + + if not media or not target: + return jsonify({"error": "Missing 'media' or 'target'"}), 400 + + msg = { + "command": "PLAYBACK", + "media": media, + "resolved_url": data.get("url", media), + "player": data.get("player", "mpv"), + "options": data.get("options", ["--fs"]) + } + + topic = f"media/commands/{target}/playback" + client.publish(topic, json.dumps(msg)) + return jsonify({"status": "sent", "target": target}) + +@app.route("/stop", methods=["POST"]) +def stop(): + data = request.json + target = data.get("target") + + if not target: + return jsonify({"error": "Missing 'target'"}), 400 + + msg = {"command": "STOP"} + topic = f"media/commands/{target}/stop" + client.publish(topic, json.dumps(msg)) + return jsonify({"status": "sent", "target": target}) + +@app.route("/nodes", methods=["GET"]) +def get_nodes(): + return jsonify(nodes) + +@app.route("/") +def dashboard(): + html = ''' + + + Media Node Dashboard + + + + +

Active Nodes

+ + + {% for node, data in nodes.items() %} + + + + + + + + {% endfor %} +
NodeIPLocationCapabilitiesLast Seen
{{ node }}{{ data.ip }}{{ data.location }}{{ ', '.join(data.capabilities) }}{{ data.timestamp | int }}
+ + + ''' + return render_template_string(html, nodes=nodes) + +# ---------------------- MQTT ---------------------- + +def on_connect(client, userdata, flags, rc): + print("Connected to MQTT broker") + client.subscribe(config['command_topic']) + client.subscribe(config['heartbeat_topic']) + +def on_message(client, userdata, msg): + topic = msg.topic + payload = msg.payload.decode() + + if topic.startswith("media/heartbeat"): + data = json.loads(payload) + nodes[data["node"]] = data + elif topic.startswith(f"media/commands/{config['node_name']}/"): + try: + payload = json.loads(msg.payload.decode()) + command = payload.get("command") + print(f"Received command: {command}") + if command == "PLAYBACK": + run_player(payload) + elif command == "STOP": + stop_player() + else: + publish_status("error", f"Unknown command: {command}") + except Exception as e: + publish_status("error", str(e)) + +# ---------------------- MAIN ---------------------- + + +if is_orchestrator: + @app.route("/nodes") + def get_nodes(): ... + + if __name__ == "__main__": + client.on_connect = on_connect + client.on_message = on_message + client.connect(MQTT_BROKER, MQTT_PORT, 60) + + threading.Thread(target=client.loop_forever, daemon=True).start() + threading.Thread(target=send_heartbeat, daemon=True).start() + + try: + app.run(host="0.0.0.0", port=5000) + except KeyboardInterrupt: + stop_player() + print("Shutting down") + +else: + if __name__ == "__main__": + client.on_connect = on_connect + client.on_message = on_message + client.connect(MQTT_BROKER, MQTT_PORT, 60) + threading.Thread(target=client.loop_forever, daemon=True).start() + threading.Thread(target=send_heartbeat, daemon=True).start() + diff --git a/orch.config.json b/orch.config.json new file mode 100644 index 0000000..9b72d69 --- /dev/null +++ b/orch.config.json @@ -0,0 +1,8 @@ +{ + "node_name": "orchestrator", + "command_topic": "media/commands/orchestrator/#", + "status_topic": "media/status/orchestrator", + "heartbeat_topic": "media/orchestrator", + "capabilities": ["orchestrator"], + "location": "server closet" +}