diff --git a/docs/skeleton-gap-analysis.md b/docs/skeleton-gap-analysis.md new file mode 100644 index 0000000..ab69822 --- /dev/null +++ b/docs/skeleton-gap-analysis.md @@ -0,0 +1,101 @@ +# Salior Skeleton — Gap Analysis vs. Plan + +## Plan v/s Implementation + +### ✅ Implemented + +| Plan Item | File | Status | +|-----------|------|--------| +| Config from env | `salior/core/config.py` | ✅ | +| structlog logging | `salior/core/logging.py` | ✅ | +| Long-term memory | `salior/core/memory.py` | ✅ | +| Base Agent class | `salior/core/agent.py` | ✅ | +| data_agent (HL WS) | `salior/agents/data/agent.py` | ✅ | +| signal_agent (regime) | `salior/agents/signal/agent.py` | ✅ | +| exec_agent (CLOB) | `salior/agents/exec/agent.py` | ✅ Stub — no real signing | +| risk_agent | `salior/agents/risk/agent.py` | ✅ | +| Schema (8 hypertables) | `salior/db/schema.sql` | ✅ | +| TimescaleDB client | `salior/db/timescale_client.py` | ✅ | +| Supabase client | `salior/db/supabase_client.py` | ✅ | +| LLM client | `salior/llm/client.py` | ✅ | +| 6 skills | `salior/skills/*.md` | ✅ | +| MCP server | `salior/mcp/server.py` | ✅ | +| Plugin registry | `salior/plugins/__init__.py` | ✅ | +| 4 built-in plugins | `plugins/*/` | ✅ | +| Wallet connect | `salior/wallet/connect.py` | ✅ | +| Dashboard | `salior/dashboard/` | ✅ | +| Compute node manager | `salior/compute/node_manager.py` | ✅ | +| Plugin deploy | `salior/compute/deploy.py` | ✅ | +| Daemon class | `salior/daemon.py` | ✅ No CLI integration | + +### ❌ Missing / Incomplete + +| Item | Priority | Why | +|------|----------|-----| +| **HL real wallet signing** | CRITICAL | exec_agent is a stub — can't place real orders | +| **Hooks event system** | HIGH | No on_signal / on_fill / on_error events | +| **Agent scheduler** | HIGH | No cron-like scheduling per agent | +| **Swarm/coordinator** | MEDIUM | No multi-agent orchestration | +| **Signal planner/validator** | MEDIUM | signal_agent is a single LLM call, no self-validation | +| **Telegram bot** | MEDIUM | No Telegram alerts or commands | +| **Daemon CLI** | MEDIUM | `daemon start/stop/status` not in cli.py | +| **Compute `add/remove` CLI** | MEDIUM | Can't register nodes via CLI | +| **`nodes.yaml` seeded** | LOW | VPS3 not pre-registered in node manager | +| **`compute/status.py`** | LOW | No per-node plugin status | +| **`hooks/` directory** | HIGH | Event hooks system entirely missing | +| **wallet/vault.py** | LOW | Renamed to connect.py (minor) | +| **Dashboard: real PnL data** | LOW | Performance endpoint is placeholder | +| **Dashboard: order signing flow** | LOW | Order requires wallet popup, backend not wired for tx signing | +| **Agent `__pycache__` in gitignore** | LOW | Some leaked into last commit | + +--- + +## Priority Build Order + +### 1. Hooks event system (HIGH) +on_signal → trigger other agents or webhooks +on_fill → update portfolio, notify +on_error → alert, pause agent +on_schedule → cron-like scheduling per agent + +### 2. Agent scheduler (HIGH) +Decouple loop intervals per agent +data: continuous (WS-based, no loop) +signal: 60s +exec: 300s +risk: 30s + +### 3. Telegram bot (MEDIUM) +`salior telegram serve` — bot commands + alerts + +### 4. Daemon CLI integration (MEDIUM) +`salior daemon start/stop/status` + +### 5. Compute add/remove node CLI (MEDIUM) + +### 6. VPS3 pre-seeded in nodes.yaml (LOW) +Auto-discover local node on install + +### 7. Signal planner + validator (MEDIUM) +Decompose regime detection, confidence scoring + +--- + +## Skeleton Completeness Score + +| Area | Score | Notes | +|------|-------|-------| +| Core | 8/10 | Missing scheduler, swarm, hooks | +| Agents | 7/10 | 4/5 built; exec signing stub | +| Database | 10/10 | Full schema + both clients | +| LLM | 10/10 | Routing + batch | +| Skills | 10/10 | 6/6 built | +| MCP | 10/10 | 5 tools | +| Dashboard | 8/10 | UI complete; order signing not wired | +| Compute | 8/10 | Node manager + deploy; no status.py | +| Plugins | 10/10 | 4/4 built | +| Wallet | 9/10 | EIP-4361 done; tx signing not wired | +| Daemon | 6/10 | Class done; no CLI integration | +| **Total** | **~85%** | | + +**Remaining ~15%**: hooks system, scheduler, telegram bot, daemon CLI, node add/remove CLI, signal planner. \ No newline at end of file diff --git a/salior/cli.py b/salior/cli.py index f3e595a..7e3a798 100644 --- a/salior/cli.py +++ b/salior/cli.py @@ -2,6 +2,8 @@ from __future__ import annotations import asyncio +import os +import sys import click from pathlib import Path @@ -17,8 +19,11 @@ def main() -> None: pass +# ─── Status ─────────────────────────────────────────────────────────────────── + @main.command() -def status() -> None: +@click.option("--verbose", is_flag=True, help="Show nodes + plugins too") +def status(verbose: bool) -> None: """Check system status.""" click.echo("=== Salior Status ===") click.echo(f"Host: {config.host}:{config.port}") @@ -28,6 +33,22 @@ def status() -> None: click.echo(f"TimescaleDB: {config.timeseries_host}:{config.timeseries_port}/{config.timeseries_db}") click.echo(f"Supabase: {config.supabase_url}") + if verbose: + from salior.compute import NodeManager, full_status + mgr = NodeManager() + nodes = mgr.list() + click.echo(f"\n=== Nodes ({len(nodes)}) ===") + for n in nodes: + gpu = "🖥️" if n["gpu"] else "💻" + click.echo(f"{gpu} {n['name']}: {n['user']}@{n['host']}:{n['port']}") + + st = full_status() + click.echo(f"\n=== Plugins ({len(st['plugins'])}) ===") + for p in st["plugins"]: + click.echo(f" {'✅' if p['enabled'] else '❌'} {p['name']}") + + +# ─── Database ───────────────────────────────────────────────────────────────── @main.command() def db_init() -> None: @@ -47,8 +68,16 @@ def db_init() -> None: asyncio.run(run()) -@main.command() -def agent_start() -> None: +# ─── Agents ─────────────────────────────────────────────────────────────────── + +@main.group() +def agent() -> None: + """Agent management commands.""" + pass + + +@agent.command() +def start() -> None: """Start all agents (data + signal + exec + risk).""" click.echo("Starting agents...") @@ -76,8 +105,25 @@ def agent_start() -> None: click.echo("Agents stopped.") -@main.command() -def dashboard_serve() -> None: +@agent.command() +def list() -> None: + """List all known agents.""" + agents = ["data_agent", "signal_agent", "exec_agent", "risk_agent"] + click.echo("=== Agents ===") + for a in agents: + click.echo(f" {a}") + + +# ─── Dashboard ──────────────────────────────────────────────────────────────── + +@main.group() +def dashboard() -> None: + """Dashboard commands.""" + pass + + +@dashboard.command() +def serve() -> None: """Start the web dashboard.""" from salior.dashboard.server import create_app from aiohttp import web @@ -96,8 +142,16 @@ def dashboard_serve() -> None: asyncio.run(run()) -@main.command() -def mcp_serve() -> None: +# ─── MCP ────────────────────────────────────────────────────────────────────── + +@main.group() +def mcp() -> None: + """MCP server commands.""" + pass + + +@mcp.command() +def serve() -> None: """Start the MCP server.""" from salior.mcp.server import MCPServer @@ -111,7 +165,215 @@ def mcp_serve() -> None: asyncio.run(run()) -@main.command() +# ─── Daemon ─────────────────────────────────────────────────────────────────── + +@main.group() +def daemon() -> None: + """Daemon management (PID file + background).""" + pass + + +@daemon.command() +@click.option("--detach", is_flag=True, help="Fork and run in background") +def start(detach: bool) -> None: + """Start the salior daemon in background.""" + from salior.daemon import Daemon + + d = Daemon("salior") + if d.is_running(): + click.echo(f"Daemon already running (PID {d.pid()})") + return + + if detach: + pid = os.fork() + if pid == 0: + # Child — become session leader, redirect output + os.setsid() + sys.stdout.flush() + sys.stderr.flush() + with open("/dev/null", "r") as devnull: + os.dup2(devnull.fileno(), 0) + with open("/tmp/salior_daemon.log", "a") as logfile: + os.dup2(logfile.fileno(), 1) + os.dup2(logfile.fileno(), 2) + + async def run() -> None: + from salior.daemon import Daemon + d = Daemon("salior") + await d.start(asyncio.sleep(1e9)) # Sleep forever until stopped + + click.echo(f"Daemon started (PID {os.getpid()})") + asyncio.run(run()) + + +@daemon.command() +def stop() -> None: + """Stop the running daemon.""" + from salior.daemon import Daemon + + d = Daemon("salior") + pid = d.pid() + if not pid or not d.is_running(): + click.echo("Daemon not running") + return + + os.kill(pid, 15) # SIGTERM + click.echo(f"Daemon {pid} stopped") + + +@daemon.command() +def status() -> None: + """Show daemon PID and running state.""" + from salior.daemon import Daemon + + d = Daemon("salior") + if d.is_running(): + click.echo(f"Daemon running — PID {d.pid()}") + else: + click.echo("Daemon not running") + + +# ─── Telegram ───────────────────────────────────────────────────────────────── + +@main.group() +def telegram() -> None: + """Telegram bot commands.""" + pass + + +@telegram.command() +def serve() -> None: + """Start the Telegram bot.""" + from salior.telegram_bot import TelegramBot + + token = os.getenv("TELEGRAM_BOT_TOKEN", "") + if not token: + click.echo("Error: TELEGRAM_BOT_TOKEN not set") + return + + bot = TelegramBot(token) + + async def run() -> None: + await bot.start() + await asyncio.Event().wait() + + click.echo("Telegram bot started") + asyncio.run(run()) + + +# ─── Compute ────────────────────────────────────────────────────────────────── + +@main.group() +def compute() -> None: + """Compute node management.""" + pass + + +@compute.command("list") +def compute_list() -> None: + """List registered compute nodes.""" + from salior.compute import NodeManager + + mgr = NodeManager() + nodes = mgr.list() + if not nodes: + click.echo("No nodes registered. Add one: salior compute add [options]") + return + click.echo(f"=== Nodes ({len(nodes)}) ===") + for n in nodes: + gpu = "🖥️" if n["gpu"] else "💻" + click.echo(f"{gpu} {n['name']}: {n['user']}@{n['host']}:{n['port']}") + + +@compute.command("add") +@click.argument("name") +@click.argument("host") +@click.option("--port", "-p", default=22, help="SSH port") +@click.option("--user", "-u", default="root", help="SSH user") +@click.option("--gpu", is_flag=True, help="Node has GPU") +@click.option("--gpu-mem", default=0, help="GPU memory in GB") +@click.option("--labels", default="", help="Comma-separated labels") +@click.option("--ssh-key", help="Path to SSH private key") +def compute_add(name: str, host: str, port: int, user: str, gpu: bool, gpu_mem: int, labels: str, ssh_key: str) -> None: + """Register a new compute node.""" + from salior.compute import NodeManager, Node + + mgr = NodeManager() + node = Node( + name=name, + host=host, + port=port, + user=user, + gpu=gpu, + gpu_memory_gb=gpu_mem, + labels=[l for l in labels.split(",") if l], + ssh_key_path=ssh_key, + ) + mgr.add(node) + click.echo(f"Node '{name}' added: {user}@{host}:{port} (gpu={gpu})") + + +@compute.command("remove") +@click.argument("name") +def compute_remove(name: str) -> None: + """Remove a compute node.""" + from salior.compute import NodeManager + + mgr = NodeManager() + node = mgr.get(name) + if not node: + click.echo(f"Node '{name}' not found") + return + mgr._nodes.pop(name) + mgr.save() + click.echo(f"Node '{name}' removed") + + +@compute.command("ping") +@click.argument("name") +def compute_ping(name: str) -> None: + """Ping a compute node.""" + from salior.compute import NodeManager + + async def run() -> None: + mgr = NodeManager() + ok = await mgr.ping(name) + if ok: + click.echo(f"Node '{name}': ✅ reachable") + else: + click.echo(f"Node '{name}': ❌ unreachable") + + asyncio.run(run()) + + +@compute.command("deploy") +@click.argument("plugin") +@click.argument("target") +def compute_deploy(plugin: str, target: str) -> None: + """Deploy a plugin to a target node.""" + from salior.compute import deploy_plugin + + async def run() -> None: + result = await deploy_plugin(plugin, target) + if result["status"] == "deployed": + click.echo(f"✅ Plugin '{plugin}' deployed to '{target}'") + elif result["status"] == "ok": + click.echo(f"ℹ️ Plugin '{plugin}' already local") + else: + click.echo(f"❌ Deploy failed: {result['message']}") + + asyncio.run(run()) + + +# ─── Plugins ─────────────────────────────────────────────────────────────────── + +@main.group() +def plugin() -> None: + """Plugin management commands.""" + pass + + +@plugin.command("list") def plugin_list() -> None: """List available plugins.""" from salior.plugins import registry @@ -123,7 +385,35 @@ def plugin_list() -> None: click.echo(f"{status} {p['name']}: {p['description']}") -@main.command() +@plugin.command("enable") +@click.argument("name") +def plugin_enable(name: str) -> None: + """Enable a plugin.""" + from salior.plugins import registry + registry.discover() + registry.enable(name) + click.echo(f"Plugin '{name}' enabled") + + +@plugin.command("disable") +@click.argument("name") +def plugin_disable(name: str) -> None: + """Disable a plugin.""" + from salior.plugins import registry + registry.discover() + registry.disable(name) + click.echo(f"Plugin '{name}' disabled") + + +# ─── Skills ──────────────────────────────────────────────────────────────────── + +@main.group() +def skill() -> None: + """Skill management commands.""" + pass + + +@skill.command("list") def skill_list() -> None: """List available skills.""" from salior.skills import SkillRegistry @@ -135,20 +425,55 @@ def skill_list() -> None: click.echo(f" {name}") -@main.command() -def compute_status() -> None: - """List registered compute nodes.""" - from salior.compute import NodeManager +@skill.command("show") +@click.argument("name") +def skill_show(name: str) -> None: + """Show the content of a skill.""" + from salior.skills import SkillRegistry - mgr = NodeManager() - nodes = mgr.list() - if not nodes: - click.echo("No nodes registered. Add one with: salior compute add ") + reg = SkillRegistry() + content = reg.render(name) + click.echo(content) + + +# ─── Hooks ───────────────────────────────────────────────────────────────────── + +@main.group() +def hook() -> None: + """Hook event management.""" + pass + + +@hook.command("list") +def hook_list() -> None: + """List registered hooks.""" + from salior.hooks import global_hooks + + hooks = global_hooks.list() + if not hooks: + click.echo("No hooks registered") return - click.echo(f"=== Nodes ({len(nodes)}) ===") - for n in nodes: - gpu = "🖥️" if n["gpu"] else "💻" - click.echo(f"{gpu} {n['name']}: {n['user']}@{n['host']}:{n['port']}") + click.echo(f"=== Hooks ({len(hooks)}) ===") + for name, count in hooks.items(): + click.echo(f" {name}: {count} handler(s)") + + +@hook.command("fire") +@click.argument("event_name") +def hook_fire(event_name: str) -> None: + """Manually fire a hook event (for testing).""" + from salior.hooks import global_hooks + from salior.hooks.registry import HookEvent + + async def run() -> None: + await global_hooks.emit(HookEvent( + name=event_name, + source="cli", + data={"triggered_by": "salior hook fire"}, + )) + click.echo(f"Hook '{event_name}' fired") + + asyncio.run(run()) if __name__ == "__main__": diff --git a/salior/compute/__init__.py b/salior/compute/__init__.py index 0f06a80..d3204b0 100644 --- a/salior/compute/__init__.py +++ b/salior/compute/__init__.py @@ -1,5 +1,6 @@ """Compute module — plugin deployment orchestration.""" -from salior.compute.node_manager import NodeManager -from salior.compute.deploy import deploy_plugin +from salior.compute.node_manager import Node, NodeManager +from salior.compute.deploy import deploy_plugin, status_plugin +from salior.compute.status import full_status, node_status -__all__ = ["NodeManager", "deploy_plugin"] \ No newline at end of file +__all__ = ["Node", "NodeManager", "deploy_plugin", "status_plugin", "full_status", "node_status"] \ No newline at end of file diff --git a/salior/compute/status.py b/salior/compute/status.py new file mode 100644 index 0000000..4a9b2d9 --- /dev/null +++ b/salior/compute/status.py @@ -0,0 +1,44 @@ +"""Compute node and plugin status.""" +from __future__ import annotations + +from salior.compute.node_manager import NodeManager +from salior.plugins import registry + +from salior.core.logging import setup_logging + +log = setup_logging() + + +def full_status() -> dict: + """Return full system status for `salior status --verbose`.""" + mgr = NodeManager() + nodes = mgr.list() + plugins = registry.list() + + return { + "nodes": nodes, + "plugins": plugins, + "total_nodes": len(nodes), + "total_plugins": len(plugins), + } + + +def node_status(name: str) -> dict: + """Return detailed status for a specific node.""" + mgr = NodeManager() + node = mgr.get(name) + + if not node: + return {"error": f"Node '{name}' not found"} + + return { + "node": { + "name": node.name, + "host": node.host, + "port": node.port, + "user": node.user, + "gpu": node.gpu, + "gpu_memory_gb": node.gpu_memory_gb, + "labels": node.labels, + }, + } \ No newline at end of file diff --git a/salior/hooks/__init__.py b/salior/hooks/__init__.py new file mode 100644 index 0000000..5634849 --- /dev/null +++ b/salior/hooks/__init__.py @@ -0,0 +1,4 @@ +"""Hooks module — event system for agent communication.""" +from salior.hooks.registry import HookRegistry, global_hooks + +__all__ = ["HookRegistry", "global_hooks"] \ No newline at end of file diff --git a/salior/hooks/registry.py b/salior/hooks/registry.py new file mode 100644 index 0000000..05b9d8d --- /dev/null +++ b/salior/hooks/registry.py @@ -0,0 +1,144 @@ +"""Hook event system — emit events, register handlers, dispatch callbacks.""" +from __future__ import annotations + +import asyncio +import fnmatch +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Awaitable, Callable + +from salior.core.logging import setup_logging + +log = setup_logging() + + +@dataclass +class HookEvent: + """A hook event with metadata.""" + name: str # e.g. "on_signal", "on_fill", "on_error" + data: dict # Event payload + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + source: str = "" # Which agent emitted this + + +HookHandler = Callable[[HookEvent], Awaitable[None]] | Callable[[HookEvent], None] + + +class HookRegistry: + """Register and dispatch hook handlers.""" + + def __init__(self) -> None: + self._handlers: dict[str, list[HookHandler]] = {} + + def on(self, event_name: str, handler: HookHandler) -> None: + """Register a handler for an event.""" + if event_name not in self._handlers: + self._handlers[event_name] = [] + # Prevent duplicate registration + if handler not in self._handlers[event_name]: + self._handlers[event_name].append(handler) + log.debug("hook_registered", event=event_name, handler=handler.__name__) + + def off(self, event_name: str, handler: HookHandler) -> None: + """Unregister a handler.""" + if event_name in self._handlers: + try: + self._handlers[event_name].remove(handler) + log.debug("hook_unregistered", event=event_name, handler=handler.__name__) + except ValueError: + pass + + async def emit(self, event: HookEvent) -> None: + """Fire all handlers for an event, async-safe.""" + handlers = self._handlers.get(event.name, []) + if not handlers: + return + + log.debug("hook_fired", event=event.name, count=len(handlers), data=event.data) + + for handler in handlers: + try: + if asyncio.iscoroutinefunction(handler): + await handler(event) + else: + handler(event) + except Exception as e: + log.error("hook_handler_error", event=event.name, handler=handler.__name__, error=str(e)) + + def list(self) -> dict[str, int]: + """List registered hooks with handler counts.""" + return {name: len(handlers) for name, handlers in self._handlers.items()} + + +# Global registry — shared across all agents +global_hooks = HookRegistry() + + +# ─── Built-in hook events ───────────────────────────────────────────────────── + +async def on_signal(coin: str, regime: str, conviction: float, reasoning: str) -> None: + """Emit a signal event.""" + await global_hooks.emit(HookEvent( + name="on_signal", + source="signal_agent", + data={"coin": coin, "regime": regime, "conviction": conviction, "reasoning": reasoning}, + )) + + +async def on_fill( + coin: str, + side: str, + size: float, + price: float, + exec_id: str, + mode: str, +) -> None: + """Emit a fill event when an order fills.""" + await global_hooks.emit(HookEvent( + name="on_fill", + source="exec_agent", + data={"coin": coin, "side": side, "size": size, "price": price, "exec_id": exec_id, "mode": mode}, + )) + + +async def on_execution( + coin: str, + side: str, + size: float, + price: float, + status: str, + error: str | None = None, +) -> None: + """Emit an execution event (placed, filled, cancelled, failed).""" + await global_hooks.emit(HookEvent( + name="on_execution", + source="exec_agent", + data={"coin": coin, "side": side, "size": size, "price": price, "status": status, "error": error}, + )) + + +async def on_error(agent: str, error: str, details: dict | None = None) -> None: + """Emit an error event.""" + await global_hooks.emit(HookEvent( + name="on_error", + source=agent, + data={"agent": agent, "error": error, "details": details or {}}, + )) + + +async def on_risk_breach(reason: str, details: dict) -> None: + """Emit a risk breach event.""" + await global_hooks.emit(HookEvent( + name="on_risk_breach", + source="risk_agent", + data={"reason": reason, "details": details}, + )) + + +async def on_agent_health(agent: str, status: str, iteration: int) -> None: + """Emit a health heartbeat.""" + await global_hooks.emit(HookEvent( + name="on_agent_health", + source=agent, + data={"agent": agent, "status": status, "iteration": iteration}, + )) \ No newline at end of file diff --git a/salior/scheduler.py b/salior/scheduler.py new file mode 100644 index 0000000..12a4902 --- /dev/null +++ b/salior/scheduler.py @@ -0,0 +1,87 @@ +"""Scheduler — decoupled per-agent loop timing.""" +from __future__ import annotations + +import asyncio +from typing import Callable, Awaitable + +from salior.core.logging import setup_logging + +log = setup_logging() + + +class IntervalTask: + """A task that runs on a fixed interval.""" + + def __init__( + self, + name: str, + coro: Callable[[], Awaitable[None]], + interval: float, + ) -> None: + self.name = name + self._coro = coro + self.interval = interval + self._task: asyncio.Task | None = None + self._stopping = False + self._log = log.bind(task=name) + + async def start(self) -> None: + self._stopping = False + self._task = asyncio.create_task(self._run()) + self._log.info("scheduler_task_started", interval=self.interval) + + async def stop(self) -> None: + self._stopping = True + if self._task: + self._task.cancel() + try: + await asyncio.wait_for(self._task, timeout=5.0) + except asyncio.CancelledError: + pass + self._log.info("scheduler_task_stopped") + + async def _run(self) -> None: + """Run the task on the given interval, with initial delay.""" + while not self._stopping: + try: + await asyncio.wait_for(self._coro(), timeout=self.interval * 2) + except asyncio.TimeoutError: + self._log.warning("task_timeout", interval=self.interval) + except Exception as e: + self._log.error("task_error", error=str(e)) + + if not self._stopping: + await asyncio.sleep(self.interval) + + +class Scheduler: + """Run multiple tasks on independent intervals.""" + + def __init__(self) -> None: + self._tasks: dict[str, IntervalTask] = {} + self._running = False + + def schedule(self, name: str, coro: Callable[[], Awaitable[None]], interval: float) -> None: + """Register a task to run on an interval.""" + self._tasks[name] = IntervalTask(name, coro, interval) + self._log = log.bind(scheduler=True) + + async def start(self) -> None: + """Start all scheduled tasks.""" + self._running = True + self._log.info("scheduler_started", tasks=list(self._tasks.keys())) + await asyncio.gather(*[t.start() for t in self._tasks.values()]) + + async def stop(self) -> None: + """Stop all scheduled tasks gracefully.""" + self._running = False + self._log.info("scheduler_stopping", tasks=list(self._tasks.keys())) + await asyncio.gather(*[t.stop() for t in self._tasks.values()]) + self._log.info("scheduler_stopped") + + def task_status(self) -> dict[str, dict]: + """Return status of all tasks.""" + return { + name: {"interval": task.interval, "running": not task._stopping} + for name, task in self._tasks.items() + } \ No newline at end of file diff --git a/salior/telegram_bot.py b/salior/telegram_bot.py new file mode 100644 index 0000000..32a0a2d --- /dev/null +++ b/salior/telegram_bot.py @@ -0,0 +1,239 @@ +"""Telegram bot — alerts + commands.""" +from __future__ import annotations + +import asyncio +import os +from typing import Optional + +import httpx +from aiohttp import web + +from salior.core.config import config +from salior.core.logging import setup_logging +from salior.hooks import global_hooks +from salior.hooks.registry import HookEvent + +log = setup_logging() + +BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") + + +class TelegramBot: + """Telegram bot for alerts and commands. + + Commands: + /start — Welcome message + /status — Agent health + portfolio summary + /signals — Recent conviction signals + /pnl — Performance summary + /help — Show commands + """ + + def __init__(self, token: Optional[str] = None) -> None: + self.token = token or BOT_TOKEN + self.api_url = f"https://api.telegram.org/bot{self.token}" + self._offset = 0 + self._running = False + + async def send(self, chat_id: int, text: str, parse_mode: str = "HTML") -> None: + """Send a message to a chat.""" + if not self.token: + return + async with httpx.AsyncClient() as client: + await client.post( + f"{self.api_url}/sendMessage", + json={"chat_id": chat_id, "text": text, "parse_mode": parse_mode}, + timeout=10.0, + ) + + async def send_alert(self, text: str) -> None: + """Send an alert to configured chat (broadcasts to all known chats).""" + for chat_id in self._get_broadcast_chats(): + try: + await self.send(chat_id, text) + except Exception as e: + log.error("telegram_send_error", error=str(e)) + + def _get_broadcast_chats(self) -> list[int]: + """Return list of chat IDs to broadcast to. Stored in ~/.salior/telegram_chats.txt.""" + path = os.path.expanduser("~/.salior/telegram_chats.txt") + if not os.path.exists(path): + return [] + return [int(line.strip()) for line in open(path).readlines() if line.strip()] + + async def handle_update(self, update: dict) -> Optional[str]: + """Handle an incoming Telegram update, return command response or None.""" + if "message" not in update: + return None + + msg = update["message"] + chat_id = msg["chat"]["id"] + text = msg.get("text", "") + user = msg["from"].get("first_name", "trader") + + if text == "/start": + return f"👋 Welcome to Salior, {user}.\n\nI send alerts when signals fire and orders fill.\n\nCommands:\n/status — Agent health\n/signals — Recent signals\n/pnl — Performance\n/help — This message" + + if text == "/help": + return ( + "📋 Salior Commands\n\n" + "/start — Welcome\n" + "/status — Agent health + portfolio\n" + "/signals — Recent conviction signals\n" + "/pnl — Performance summary\n" + "/help — This message" + ) + + if text == "/status": + return await self._cmd_status() + + if text == "/signals": + return await self._cmd_signals() + + if text == "/pnl": + return await self._cmd_pnl() + + return None + + async def _cmd_status(self) -> str: + """Return agent + portfolio status.""" + from salior.db.supabase_client import SupabaseClient + from salior.db.timescale_client import TimescaleDB + + supabase = SupabaseClient() + db = TimescaleDB() + await db.connect() + + # Agent health + health = await db.fetch( + "SELECT agent, status, heartbeat FROM agent_health ORDER BY heartbeat DESC LIMIT 5" + ) + + # Portfolio + portfolio = await supabase.get_portfolio() + + lines = ["🤖 Agent Status"] + for h in (health or [])[:4]: + dot = "🟢" if h["status"] == "running" else "🔴" + lines.append(f"{dot} {h['agent']}: {h['status']}") + + lines.append("\n💼 Portfolio") + if portfolio: + for pos in portfolio: + pnl = pos.get("unrealized_pnl", 0) + pnl_str = f"+${pnl:.2f}" if pnl >= 0 else f"-${abs(pnl):.2f}" + lines.append(f"{pos['coin']}: {pos['pos_size']} pos | {pnl_str}") + else: + lines.append("No positions") + + return "\n".join(lines) + + async def _cmd_signals(self) -> str: + """Return recent signals.""" + from salior.db.supabase_client import SupabaseClient + + supabase = SupabaseClient() + signals = await supabase.get_recent_signals(limit=5) + + if not signals: + return "📊 No recent signals" + + lines = ["📊 Recent Signals"] + for sig in signals: + c = sig["conviction"] + arrow = "🟢" if c >= 0 else "🔴" + lines.append(f"{arrow} {sig['coin']} | {sig['regime']} | {c:+.2f}") + + return "\n".join(lines) + + async def _cmd_pnl(self) -> str: + """Return performance summary.""" + return "📈 PnL: Connect TimescaleDB performance table for full data.\n\nCurrently placeholder." + + async def poll_loop(self) -> None: + """Poll Telegram for updates.""" + if not self.token: + log.warning("telegram_bot_no_token") + return + + self._running = True + log.info("telegram_bot_started") + + while self._running: + try: + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{self.api_url}/getUpdates", + params={"offset": self._offset, "timeout": 30}, + timeout=35.0, + ) + data = resp.json() + + if not data.get("ok"): + await asyncio.sleep(5) + continue + + for update in data.get("result", []): + self._offset = update["update_id"] + 1 + response = await self.handle_update(update) + if response and "message" in update: + chat_id = update["message"]["chat"]["id"] + await self.send(chat_id, response) + + except asyncio.TimeoutError: + pass + except Exception as e: + log.error("telegram_poll_error", error=str(e)) + await asyncio.sleep(5) + + async def start(self) -> None: + """Start the bot poll loop.""" + asyncio.create_task(self.poll_loop()) + # Register hook listeners + from salior.hooks import global_hooks + + async def on_fill_handler(event: HookEvent) -> None: + d = event.data + await self.send_alert( + f"✅ Order Filled\n" + f"{d['side'].upper()} {d['size']} {d['coin']} @ ${d['price']}\n" + f"Mode: {d['mode']}" + ) + + async def on_risk_handler(event: HookEvent) -> None: + await self.send_alert(f"⚠️ Risk Breach\n{event.data['reason']}") + + async def on_error_handler(event: HookEvent) -> None: + await self.send_alert(f"🔴 Agent Error\n{event.data['agent']}: {event.data['error']}") + + global_hooks.on("on_fill", on_fill_handler) + global_hooks.on("on_risk_breach", on_risk_handler) + global_hooks.on("on_error", on_error_handler) + + +# Hook emitter helpers — call these from agents +async def emit_signal(coin: str, regime: str, conviction: float, reasoning: str) -> None: + from salior.hooks import global_hooks + await global_hooks.emit(HookEvent( + name="on_signal", + source="signal_agent", + data={"coin": coin, "regime": regime, "conviction": conviction, "reasoning": reasoning}, + )) + + +async def emit_fill(coin: str, side: str, size: float, price: float, exec_id: str, mode: str) -> None: + from salior.hooks import global_hooks + await global_hooks.emit(HookEvent( + name="on_fill", + source="exec_agent", + data={"coin": coin, "side": side, "size": size, "price": price, "exec_id": exec_id, "mode": mode}, + )) + + +async def emit_risk_breach(reason: str, details: dict) -> None: + from salior.hooks import global_hooks + await global_hooks.emit(HookEvent( + name="on_risk_breach", + source="risk_agent", + data={"reason": reason, "details": details}, + )) \ No newline at end of file