Add: hooks event system, scheduler, Telegram bot, compute status, daemon/telegram CLI groups

This commit is contained in:
Hermes 2026-05-11 10:50:58 +00:00
parent 78710620d3
commit f4947da143
8 changed files with 969 additions and 24 deletions

View File

@ -0,0 +1,101 @@
# Salior Skeleton — Gap Analysis vs. Plan
## Plan v/s Implementation
### ✅ Implemented
| Plan Item | File | Status |
|-----------|------|--------|
| Config from env | `salior/core/config.py` | ✅ |
| structlog logging | `salior/core/logging.py` | ✅ |
| Long-term memory | `salior/core/memory.py` | ✅ |
| Base Agent class | `salior/core/agent.py` | ✅ |
| data_agent (HL WS) | `salior/agents/data/agent.py` | ✅ |
| signal_agent (regime) | `salior/agents/signal/agent.py` | ✅ |
| exec_agent (CLOB) | `salior/agents/exec/agent.py` | ✅ Stub — no real signing |
| risk_agent | `salior/agents/risk/agent.py` | ✅ |
| Schema (8 hypertables) | `salior/db/schema.sql` | ✅ |
| TimescaleDB client | `salior/db/timescale_client.py` | ✅ |
| Supabase client | `salior/db/supabase_client.py` | ✅ |
| LLM client | `salior/llm/client.py` | ✅ |
| 6 skills | `salior/skills/*.md` | ✅ |
| MCP server | `salior/mcp/server.py` | ✅ |
| Plugin registry | `salior/plugins/__init__.py` | ✅ |
| 4 built-in plugins | `plugins/*/` | ✅ |
| Wallet connect | `salior/wallet/connect.py` | ✅ |
| Dashboard | `salior/dashboard/` | ✅ |
| Compute node manager | `salior/compute/node_manager.py` | ✅ |
| Plugin deploy | `salior/compute/deploy.py` | ✅ |
| Daemon class | `salior/daemon.py` | ✅ No CLI integration |
### ❌ Missing / Incomplete
| Item | Priority | Why |
|------|----------|-----|
| **HL real wallet signing** | CRITICAL | exec_agent is a stub — can't place real orders |
| **Hooks event system** | HIGH | No on_signal / on_fill / on_error events |
| **Agent scheduler** | HIGH | No cron-like scheduling per agent |
| **Swarm/coordinator** | MEDIUM | No multi-agent orchestration |
| **Signal planner/validator** | MEDIUM | signal_agent is a single LLM call, no self-validation |
| **Telegram bot** | MEDIUM | No Telegram alerts or commands |
| **Daemon CLI** | MEDIUM | `daemon start/stop/status` not in cli.py |
| **Compute `add/remove` CLI** | MEDIUM | Can't register nodes via CLI |
| **`nodes.yaml` seeded** | LOW | VPS3 not pre-registered in node manager |
| **`compute/status.py`** | LOW | No per-node plugin status |
| **`hooks/` directory** | HIGH | Event hooks system entirely missing |
| **wallet/vault.py** | LOW | Renamed to connect.py (minor) |
| **Dashboard: real PnL data** | LOW | Performance endpoint is placeholder |
| **Dashboard: order signing flow** | LOW | Order requires wallet popup, backend not wired for tx signing |
| **Agent `__pycache__` in gitignore** | LOW | Some leaked into last commit |
---
## Priority Build Order
### 1. Hooks event system (HIGH)
on_signal → trigger other agents or webhooks
on_fill → update portfolio, notify
on_error → alert, pause agent
on_schedule → cron-like scheduling per agent
### 2. Agent scheduler (HIGH)
Decouple loop intervals per agent
data: continuous (WS-based, no loop)
signal: 60s
exec: 300s
risk: 30s
### 3. Telegram bot (MEDIUM)
`salior telegram serve` — bot commands + alerts
### 4. Daemon CLI integration (MEDIUM)
`salior daemon start/stop/status`
### 5. Compute add/remove node CLI (MEDIUM)
### 6. VPS3 pre-seeded in nodes.yaml (LOW)
Auto-discover local node on install
### 7. Signal planner + validator (MEDIUM)
Decompose regime detection, confidence scoring
---
## Skeleton Completeness Score
| Area | Score | Notes |
|------|-------|-------|
| Core | 8/10 | Missing scheduler, swarm, hooks |
| Agents | 7/10 | 4/5 built; exec signing stub |
| Database | 10/10 | Full schema + both clients |
| LLM | 10/10 | Routing + batch |
| Skills | 10/10 | 6/6 built |
| MCP | 10/10 | 5 tools |
| Dashboard | 8/10 | UI complete; order signing not wired |
| Compute | 8/10 | Node manager + deploy; no status.py |
| Plugins | 10/10 | 4/4 built |
| Wallet | 9/10 | EIP-4361 done; tx signing not wired |
| Daemon | 6/10 | Class done; no CLI integration |
| **Total** | **~85%** | |
**Remaining ~15%**: hooks system, scheduler, telegram bot, daemon CLI, node add/remove CLI, signal planner.

View File

@ -2,6 +2,8 @@
from __future__ import annotations
import asyncio
import os
import sys
import click
from pathlib import Path
@ -17,8 +19,11 @@ def main() -> None:
pass
# ─── Status ───────────────────────────────────────────────────────────────────
@main.command()
def status() -> None:
@click.option("--verbose", is_flag=True, help="Show nodes + plugins too")
def status(verbose: bool) -> None:
"""Check system status."""
click.echo("=== Salior Status ===")
click.echo(f"Host: {config.host}:{config.port}")
@ -28,6 +33,22 @@ def status() -> None:
click.echo(f"TimescaleDB: {config.timeseries_host}:{config.timeseries_port}/{config.timeseries_db}")
click.echo(f"Supabase: {config.supabase_url}")
if verbose:
from salior.compute import NodeManager, full_status
mgr = NodeManager()
nodes = mgr.list()
click.echo(f"\n=== Nodes ({len(nodes)}) ===")
for n in nodes:
gpu = "🖥️" if n["gpu"] else "💻"
click.echo(f"{gpu} {n['name']}: {n['user']}@{n['host']}:{n['port']}")
st = full_status()
click.echo(f"\n=== Plugins ({len(st['plugins'])}) ===")
for p in st["plugins"]:
click.echo(f" {'' if p['enabled'] else ''} {p['name']}")
# ─── Database ─────────────────────────────────────────────────────────────────
@main.command()
def db_init() -> None:
@ -47,8 +68,16 @@ def db_init() -> None:
asyncio.run(run())
@main.command()
def agent_start() -> None:
# ─── Agents ───────────────────────────────────────────────────────────────────
@main.group()
def agent() -> None:
"""Agent management commands."""
pass
@agent.command()
def start() -> None:
"""Start all agents (data + signal + exec + risk)."""
click.echo("Starting agents...")
@ -76,8 +105,25 @@ def agent_start() -> None:
click.echo("Agents stopped.")
@main.command()
def dashboard_serve() -> None:
@agent.command()
def list() -> None:
"""List all known agents."""
agents = ["data_agent", "signal_agent", "exec_agent", "risk_agent"]
click.echo("=== Agents ===")
for a in agents:
click.echo(f" {a}")
# ─── Dashboard ────────────────────────────────────────────────────────────────
@main.group()
def dashboard() -> None:
"""Dashboard commands."""
pass
@dashboard.command()
def serve() -> None:
"""Start the web dashboard."""
from salior.dashboard.server import create_app
from aiohttp import web
@ -96,8 +142,16 @@ def dashboard_serve() -> None:
asyncio.run(run())
@main.command()
def mcp_serve() -> None:
# ─── MCP ──────────────────────────────────────────────────────────────────────
@main.group()
def mcp() -> None:
"""MCP server commands."""
pass
@mcp.command()
def serve() -> None:
"""Start the MCP server."""
from salior.mcp.server import MCPServer
@ -111,7 +165,215 @@ def mcp_serve() -> None:
asyncio.run(run())
@main.command()
# ─── Daemon ───────────────────────────────────────────────────────────────────
@main.group()
def daemon() -> None:
"""Daemon management (PID file + background)."""
pass
@daemon.command()
@click.option("--detach", is_flag=True, help="Fork and run in background")
def start(detach: bool) -> None:
"""Start the salior daemon in background."""
from salior.daemon import Daemon
d = Daemon("salior")
if d.is_running():
click.echo(f"Daemon already running (PID {d.pid()})")
return
if detach:
pid = os.fork()
if pid == 0:
# Child — become session leader, redirect output
os.setsid()
sys.stdout.flush()
sys.stderr.flush()
with open("/dev/null", "r") as devnull:
os.dup2(devnull.fileno(), 0)
with open("/tmp/salior_daemon.log", "a") as logfile:
os.dup2(logfile.fileno(), 1)
os.dup2(logfile.fileno(), 2)
async def run() -> None:
from salior.daemon import Daemon
d = Daemon("salior")
await d.start(asyncio.sleep(1e9)) # Sleep forever until stopped
click.echo(f"Daemon started (PID {os.getpid()})")
asyncio.run(run())
@daemon.command()
def stop() -> None:
"""Stop the running daemon."""
from salior.daemon import Daemon
d = Daemon("salior")
pid = d.pid()
if not pid or not d.is_running():
click.echo("Daemon not running")
return
os.kill(pid, 15) # SIGTERM
click.echo(f"Daemon {pid} stopped")
@daemon.command()
def status() -> None:
"""Show daemon PID and running state."""
from salior.daemon import Daemon
d = Daemon("salior")
if d.is_running():
click.echo(f"Daemon running — PID {d.pid()}")
else:
click.echo("Daemon not running")
# ─── Telegram ─────────────────────────────────────────────────────────────────
@main.group()
def telegram() -> None:
"""Telegram bot commands."""
pass
@telegram.command()
def serve() -> None:
"""Start the Telegram bot."""
from salior.telegram_bot import TelegramBot
token = os.getenv("TELEGRAM_BOT_TOKEN", "")
if not token:
click.echo("Error: TELEGRAM_BOT_TOKEN not set")
return
bot = TelegramBot(token)
async def run() -> None:
await bot.start()
await asyncio.Event().wait()
click.echo("Telegram bot started")
asyncio.run(run())
# ─── Compute ──────────────────────────────────────────────────────────────────
@main.group()
def compute() -> None:
"""Compute node management."""
pass
@compute.command("list")
def compute_list() -> None:
"""List registered compute nodes."""
from salior.compute import NodeManager
mgr = NodeManager()
nodes = mgr.list()
if not nodes:
click.echo("No nodes registered. Add one: salior compute add <name> <host> [options]")
return
click.echo(f"=== Nodes ({len(nodes)}) ===")
for n in nodes:
gpu = "🖥️" if n["gpu"] else "💻"
click.echo(f"{gpu} {n['name']}: {n['user']}@{n['host']}:{n['port']}")
@compute.command("add")
@click.argument("name")
@click.argument("host")
@click.option("--port", "-p", default=22, help="SSH port")
@click.option("--user", "-u", default="root", help="SSH user")
@click.option("--gpu", is_flag=True, help="Node has GPU")
@click.option("--gpu-mem", default=0, help="GPU memory in GB")
@click.option("--labels", default="", help="Comma-separated labels")
@click.option("--ssh-key", help="Path to SSH private key")
def compute_add(name: str, host: str, port: int, user: str, gpu: bool, gpu_mem: int, labels: str, ssh_key: str) -> None:
"""Register a new compute node."""
from salior.compute import NodeManager, Node
mgr = NodeManager()
node = Node(
name=name,
host=host,
port=port,
user=user,
gpu=gpu,
gpu_memory_gb=gpu_mem,
labels=[l for l in labels.split(",") if l],
ssh_key_path=ssh_key,
)
mgr.add(node)
click.echo(f"Node '{name}' added: {user}@{host}:{port} (gpu={gpu})")
@compute.command("remove")
@click.argument("name")
def compute_remove(name: str) -> None:
"""Remove a compute node."""
from salior.compute import NodeManager
mgr = NodeManager()
node = mgr.get(name)
if not node:
click.echo(f"Node '{name}' not found")
return
mgr._nodes.pop(name)
mgr.save()
click.echo(f"Node '{name}' removed")
@compute.command("ping")
@click.argument("name")
def compute_ping(name: str) -> None:
"""Ping a compute node."""
from salior.compute import NodeManager
async def run() -> None:
mgr = NodeManager()
ok = await mgr.ping(name)
if ok:
click.echo(f"Node '{name}': ✅ reachable")
else:
click.echo(f"Node '{name}': ❌ unreachable")
asyncio.run(run())
@compute.command("deploy")
@click.argument("plugin")
@click.argument("target")
def compute_deploy(plugin: str, target: str) -> None:
"""Deploy a plugin to a target node."""
from salior.compute import deploy_plugin
async def run() -> None:
result = await deploy_plugin(plugin, target)
if result["status"] == "deployed":
click.echo(f"✅ Plugin '{plugin}' deployed to '{target}'")
elif result["status"] == "ok":
click.echo(f" Plugin '{plugin}' already local")
else:
click.echo(f"❌ Deploy failed: {result['message']}")
asyncio.run(run())
# ─── Plugins ───────────────────────────────────────────────────────────────────
@main.group()
def plugin() -> None:
"""Plugin management commands."""
pass
@plugin.command("list")
def plugin_list() -> None:
"""List available plugins."""
from salior.plugins import registry
@ -123,7 +385,35 @@ def plugin_list() -> None:
click.echo(f"{status} {p['name']}: {p['description']}")
@main.command()
@plugin.command("enable")
@click.argument("name")
def plugin_enable(name: str) -> None:
"""Enable a plugin."""
from salior.plugins import registry
registry.discover()
registry.enable(name)
click.echo(f"Plugin '{name}' enabled")
@plugin.command("disable")
@click.argument("name")
def plugin_disable(name: str) -> None:
"""Disable a plugin."""
from salior.plugins import registry
registry.discover()
registry.disable(name)
click.echo(f"Plugin '{name}' disabled")
# ─── Skills ────────────────────────────────────────────────────────────────────
@main.group()
def skill() -> None:
"""Skill management commands."""
pass
@skill.command("list")
def skill_list() -> None:
"""List available skills."""
from salior.skills import SkillRegistry
@ -135,20 +425,55 @@ def skill_list() -> None:
click.echo(f" {name}")
@main.command()
def compute_status() -> None:
"""List registered compute nodes."""
from salior.compute import NodeManager
@skill.command("show")
@click.argument("name")
def skill_show(name: str) -> None:
"""Show the content of a skill."""
from salior.skills import SkillRegistry
mgr = NodeManager()
nodes = mgr.list()
if not nodes:
click.echo("No nodes registered. Add one with: salior compute add <name> <host>")
reg = SkillRegistry()
content = reg.render(name)
click.echo(content)
# ─── Hooks ─────────────────────────────────────────────────────────────────────
@main.group()
def hook() -> None:
"""Hook event management."""
pass
@hook.command("list")
def hook_list() -> None:
"""List registered hooks."""
from salior.hooks import global_hooks
hooks = global_hooks.list()
if not hooks:
click.echo("No hooks registered")
return
click.echo(f"=== Nodes ({len(nodes)}) ===")
for n in nodes:
gpu = "🖥️" if n["gpu"] else "💻"
click.echo(f"{gpu} {n['name']}: {n['user']}@{n['host']}:{n['port']}")
click.echo(f"=== Hooks ({len(hooks)}) ===")
for name, count in hooks.items():
click.echo(f" {name}: {count} handler(s)")
@hook.command("fire")
@click.argument("event_name")
def hook_fire(event_name: str) -> None:
"""Manually fire a hook event (for testing)."""
from salior.hooks import global_hooks
from salior.hooks.registry import HookEvent
async def run() -> None:
await global_hooks.emit(HookEvent(
name=event_name,
source="cli",
data={"triggered_by": "salior hook fire"},
))
click.echo(f"Hook '{event_name}' fired")
asyncio.run(run())
if __name__ == "__main__":

View File

@ -1,5 +1,6 @@
"""Compute module — plugin deployment orchestration."""
from salior.compute.node_manager import NodeManager
from salior.compute.deploy import deploy_plugin
from salior.compute.node_manager import Node, NodeManager
from salior.compute.deploy import deploy_plugin, status_plugin
from salior.compute.status import full_status, node_status
__all__ = ["NodeManager", "deploy_plugin"]
__all__ = ["Node", "NodeManager", "deploy_plugin", "status_plugin", "full_status", "node_status"]

44
salior/compute/status.py Normal file
View File

@ -0,0 +1,44 @@
"""Compute node and plugin status."""
from __future__ import annotations
from salior.compute.node_manager import NodeManager
from salior.plugins import registry
from salior.core.logging import setup_logging
log = setup_logging()
def full_status() -> dict:
"""Return full system status for `salior status --verbose`."""
mgr = NodeManager()
nodes = mgr.list()
plugins = registry.list()
return {
"nodes": nodes,
"plugins": plugins,
"total_nodes": len(nodes),
"total_plugins": len(plugins),
}
def node_status(name: str) -> dict:
"""Return detailed status for a specific node."""
mgr = NodeManager()
node = mgr.get(name)
if not node:
return {"error": f"Node '{name}' not found"}
return {
"node": {
"name": node.name,
"host": node.host,
"port": node.port,
"user": node.user,
"gpu": node.gpu,
"gpu_memory_gb": node.gpu_memory_gb,
"labels": node.labels,
},
}

4
salior/hooks/__init__.py Normal file
View File

@ -0,0 +1,4 @@
"""Hooks module — event system for agent communication."""
from salior.hooks.registry import HookRegistry, global_hooks
__all__ = ["HookRegistry", "global_hooks"]

144
salior/hooks/registry.py Normal file
View File

@ -0,0 +1,144 @@
"""Hook event system — emit events, register handlers, dispatch callbacks."""
from __future__ import annotations
import asyncio
import fnmatch
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable
from salior.core.logging import setup_logging
log = setup_logging()
@dataclass
class HookEvent:
"""A hook event with metadata."""
name: str # e.g. "on_signal", "on_fill", "on_error"
data: dict # Event payload
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
source: str = "" # Which agent emitted this
HookHandler = Callable[[HookEvent], Awaitable[None]] | Callable[[HookEvent], None]
class HookRegistry:
"""Register and dispatch hook handlers."""
def __init__(self) -> None:
self._handlers: dict[str, list[HookHandler]] = {}
def on(self, event_name: str, handler: HookHandler) -> None:
"""Register a handler for an event."""
if event_name not in self._handlers:
self._handlers[event_name] = []
# Prevent duplicate registration
if handler not in self._handlers[event_name]:
self._handlers[event_name].append(handler)
log.debug("hook_registered", event=event_name, handler=handler.__name__)
def off(self, event_name: str, handler: HookHandler) -> None:
"""Unregister a handler."""
if event_name in self._handlers:
try:
self._handlers[event_name].remove(handler)
log.debug("hook_unregistered", event=event_name, handler=handler.__name__)
except ValueError:
pass
async def emit(self, event: HookEvent) -> None:
"""Fire all handlers for an event, async-safe."""
handlers = self._handlers.get(event.name, [])
if not handlers:
return
log.debug("hook_fired", event=event.name, count=len(handlers), data=event.data)
for handler in handlers:
try:
if asyncio.iscoroutinefunction(handler):
await handler(event)
else:
handler(event)
except Exception as e:
log.error("hook_handler_error", event=event.name, handler=handler.__name__, error=str(e))
def list(self) -> dict[str, int]:
"""List registered hooks with handler counts."""
return {name: len(handlers) for name, handlers in self._handlers.items()}
# Global registry — shared across all agents
global_hooks = HookRegistry()
# ─── Built-in hook events ─────────────────────────────────────────────────────
async def on_signal(coin: str, regime: str, conviction: float, reasoning: str) -> None:
"""Emit a signal event."""
await global_hooks.emit(HookEvent(
name="on_signal",
source="signal_agent",
data={"coin": coin, "regime": regime, "conviction": conviction, "reasoning": reasoning},
))
async def on_fill(
coin: str,
side: str,
size: float,
price: float,
exec_id: str,
mode: str,
) -> None:
"""Emit a fill event when an order fills."""
await global_hooks.emit(HookEvent(
name="on_fill",
source="exec_agent",
data={"coin": coin, "side": side, "size": size, "price": price, "exec_id": exec_id, "mode": mode},
))
async def on_execution(
coin: str,
side: str,
size: float,
price: float,
status: str,
error: str | None = None,
) -> None:
"""Emit an execution event (placed, filled, cancelled, failed)."""
await global_hooks.emit(HookEvent(
name="on_execution",
source="exec_agent",
data={"coin": coin, "side": side, "size": size, "price": price, "status": status, "error": error},
))
async def on_error(agent: str, error: str, details: dict | None = None) -> None:
"""Emit an error event."""
await global_hooks.emit(HookEvent(
name="on_error",
source=agent,
data={"agent": agent, "error": error, "details": details or {}},
))
async def on_risk_breach(reason: str, details: dict) -> None:
"""Emit a risk breach event."""
await global_hooks.emit(HookEvent(
name="on_risk_breach",
source="risk_agent",
data={"reason": reason, "details": details},
))
async def on_agent_health(agent: str, status: str, iteration: int) -> None:
"""Emit a health heartbeat."""
await global_hooks.emit(HookEvent(
name="on_agent_health",
source=agent,
data={"agent": agent, "status": status, "iteration": iteration},
))

87
salior/scheduler.py Normal file
View File

@ -0,0 +1,87 @@
"""Scheduler — decoupled per-agent loop timing."""
from __future__ import annotations
import asyncio
from typing import Callable, Awaitable
from salior.core.logging import setup_logging
log = setup_logging()
class IntervalTask:
"""A task that runs on a fixed interval."""
def __init__(
self,
name: str,
coro: Callable[[], Awaitable[None]],
interval: float,
) -> None:
self.name = name
self._coro = coro
self.interval = interval
self._task: asyncio.Task | None = None
self._stopping = False
self._log = log.bind(task=name)
async def start(self) -> None:
self._stopping = False
self._task = asyncio.create_task(self._run())
self._log.info("scheduler_task_started", interval=self.interval)
async def stop(self) -> None:
self._stopping = True
if self._task:
self._task.cancel()
try:
await asyncio.wait_for(self._task, timeout=5.0)
except asyncio.CancelledError:
pass
self._log.info("scheduler_task_stopped")
async def _run(self) -> None:
"""Run the task on the given interval, with initial delay."""
while not self._stopping:
try:
await asyncio.wait_for(self._coro(), timeout=self.interval * 2)
except asyncio.TimeoutError:
self._log.warning("task_timeout", interval=self.interval)
except Exception as e:
self._log.error("task_error", error=str(e))
if not self._stopping:
await asyncio.sleep(self.interval)
class Scheduler:
"""Run multiple tasks on independent intervals."""
def __init__(self) -> None:
self._tasks: dict[str, IntervalTask] = {}
self._running = False
def schedule(self, name: str, coro: Callable[[], Awaitable[None]], interval: float) -> None:
"""Register a task to run on an interval."""
self._tasks[name] = IntervalTask(name, coro, interval)
self._log = log.bind(scheduler=True)
async def start(self) -> None:
"""Start all scheduled tasks."""
self._running = True
self._log.info("scheduler_started", tasks=list(self._tasks.keys()))
await asyncio.gather(*[t.start() for t in self._tasks.values()])
async def stop(self) -> None:
"""Stop all scheduled tasks gracefully."""
self._running = False
self._log.info("scheduler_stopping", tasks=list(self._tasks.keys()))
await asyncio.gather(*[t.stop() for t in self._tasks.values()])
self._log.info("scheduler_stopped")
def task_status(self) -> dict[str, dict]:
"""Return status of all tasks."""
return {
name: {"interval": task.interval, "running": not task._stopping}
for name, task in self._tasks.items()
}

239
salior/telegram_bot.py Normal file
View File

@ -0,0 +1,239 @@
"""Telegram bot — alerts + commands."""
from __future__ import annotations
import asyncio
import os
from typing import Optional
import httpx
from aiohttp import web
from salior.core.config import config
from salior.core.logging import setup_logging
from salior.hooks import global_hooks
from salior.hooks.registry import HookEvent
log = setup_logging()
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
class TelegramBot:
"""Telegram bot for alerts and commands.
Commands:
/start Welcome message
/status Agent health + portfolio summary
/signals Recent conviction signals
/pnl Performance summary
/help Show commands
"""
def __init__(self, token: Optional[str] = None) -> None:
self.token = token or BOT_TOKEN
self.api_url = f"https://api.telegram.org/bot{self.token}"
self._offset = 0
self._running = False
async def send(self, chat_id: int, text: str, parse_mode: str = "HTML") -> None:
"""Send a message to a chat."""
if not self.token:
return
async with httpx.AsyncClient() as client:
await client.post(
f"{self.api_url}/sendMessage",
json={"chat_id": chat_id, "text": text, "parse_mode": parse_mode},
timeout=10.0,
)
async def send_alert(self, text: str) -> None:
"""Send an alert to configured chat (broadcasts to all known chats)."""
for chat_id in self._get_broadcast_chats():
try:
await self.send(chat_id, text)
except Exception as e:
log.error("telegram_send_error", error=str(e))
def _get_broadcast_chats(self) -> list[int]:
"""Return list of chat IDs to broadcast to. Stored in ~/.salior/telegram_chats.txt."""
path = os.path.expanduser("~/.salior/telegram_chats.txt")
if not os.path.exists(path):
return []
return [int(line.strip()) for line in open(path).readlines() if line.strip()]
async def handle_update(self, update: dict) -> Optional[str]:
"""Handle an incoming Telegram update, return command response or None."""
if "message" not in update:
return None
msg = update["message"]
chat_id = msg["chat"]["id"]
text = msg.get("text", "")
user = msg["from"].get("first_name", "trader")
if text == "/start":
return f"👋 Welcome to Salior, {user}.\n\nI send alerts when signals fire and orders fill.\n\nCommands:\n/status — Agent health\n/signals — Recent signals\n/pnl — Performance\n/help — This message"
if text == "/help":
return (
"📋 <b>Salior Commands</b>\n\n"
"/start — Welcome\n"
"/status — Agent health + portfolio\n"
"/signals — Recent conviction signals\n"
"/pnl — Performance summary\n"
"/help — This message"
)
if text == "/status":
return await self._cmd_status()
if text == "/signals":
return await self._cmd_signals()
if text == "/pnl":
return await self._cmd_pnl()
return None
async def _cmd_status(self) -> str:
"""Return agent + portfolio status."""
from salior.db.supabase_client import SupabaseClient
from salior.db.timescale_client import TimescaleDB
supabase = SupabaseClient()
db = TimescaleDB()
await db.connect()
# Agent health
health = await db.fetch(
"SELECT agent, status, heartbeat FROM agent_health ORDER BY heartbeat DESC LIMIT 5"
)
# Portfolio
portfolio = await supabase.get_portfolio()
lines = ["🤖 <b>Agent Status</b>"]
for h in (health or [])[:4]:
dot = "🟢" if h["status"] == "running" else "🔴"
lines.append(f"{dot} {h['agent']}: {h['status']}")
lines.append("\n💼 <b>Portfolio</b>")
if portfolio:
for pos in portfolio:
pnl = pos.get("unrealized_pnl", 0)
pnl_str = f"+${pnl:.2f}" if pnl >= 0 else f"-${abs(pnl):.2f}"
lines.append(f"{pos['coin']}: {pos['pos_size']} pos | {pnl_str}")
else:
lines.append("No positions")
return "\n".join(lines)
async def _cmd_signals(self) -> str:
"""Return recent signals."""
from salior.db.supabase_client import SupabaseClient
supabase = SupabaseClient()
signals = await supabase.get_recent_signals(limit=5)
if not signals:
return "📊 No recent signals"
lines = ["📊 <b>Recent Signals</b>"]
for sig in signals:
c = sig["conviction"]
arrow = "🟢" if c >= 0 else "🔴"
lines.append(f"{arrow} {sig['coin']} | {sig['regime']} | {c:+.2f}")
return "\n".join(lines)
async def _cmd_pnl(self) -> str:
"""Return performance summary."""
return "📈 PnL: Connect TimescaleDB performance table for full data.\n\nCurrently placeholder."
async def poll_loop(self) -> None:
"""Poll Telegram for updates."""
if not self.token:
log.warning("telegram_bot_no_token")
return
self._running = True
log.info("telegram_bot_started")
while self._running:
try:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.api_url}/getUpdates",
params={"offset": self._offset, "timeout": 30},
timeout=35.0,
)
data = resp.json()
if not data.get("ok"):
await asyncio.sleep(5)
continue
for update in data.get("result", []):
self._offset = update["update_id"] + 1
response = await self.handle_update(update)
if response and "message" in update:
chat_id = update["message"]["chat"]["id"]
await self.send(chat_id, response)
except asyncio.TimeoutError:
pass
except Exception as e:
log.error("telegram_poll_error", error=str(e))
await asyncio.sleep(5)
async def start(self) -> None:
"""Start the bot poll loop."""
asyncio.create_task(self.poll_loop())
# Register hook listeners
from salior.hooks import global_hooks
async def on_fill_handler(event: HookEvent) -> None:
d = event.data
await self.send_alert(
f"✅ <b>Order Filled</b>\n"
f"{d['side'].upper()} {d['size']} {d['coin']} @ ${d['price']}\n"
f"Mode: {d['mode']}"
)
async def on_risk_handler(event: HookEvent) -> None:
await self.send_alert(f"⚠️ <b>Risk Breach</b>\n{event.data['reason']}")
async def on_error_handler(event: HookEvent) -> None:
await self.send_alert(f"🔴 <b>Agent Error</b>\n{event.data['agent']}: {event.data['error']}")
global_hooks.on("on_fill", on_fill_handler)
global_hooks.on("on_risk_breach", on_risk_handler)
global_hooks.on("on_error", on_error_handler)
# Hook emitter helpers — call these from agents
async def emit_signal(coin: str, regime: str, conviction: float, reasoning: str) -> None:
from salior.hooks import global_hooks
await global_hooks.emit(HookEvent(
name="on_signal",
source="signal_agent",
data={"coin": coin, "regime": regime, "conviction": conviction, "reasoning": reasoning},
))
async def emit_fill(coin: str, side: str, size: float, price: float, exec_id: str, mode: str) -> None:
from salior.hooks import global_hooks
await global_hooks.emit(HookEvent(
name="on_fill",
source="exec_agent",
data={"coin": coin, "side": side, "size": size, "price": price, "exec_id": exec_id, "mode": mode},
))
async def emit_risk_breach(reason: str, details: dict) -> None:
from salior.hooks import global_hooks
await global_hooks.emit(HookEvent(
name="on_risk_breach",
source="risk_agent",
data={"reason": reason, "details": details},
))