salior/salior/core/agent.py

101 lines
3.2 KiB
Python

"""Base agent class."""
from __future__ import annotations
import asyncio
import signal
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
import structlog
logger = structlog.get_logger("salior.agent")
class Agent(ABC):
"""Base class for all Salior agents.
Provides:
- Lifecycle (start/stop/health)
- Self-validation hooks
- Loop detection (max iterations)
- Graceful shutdown
"""
name: str = "agent"
def __init__(self) -> None:
self._running = False
self._stopping = False
self._task: Optional[asyncio.Task] = None
self._last_heartbeat: Optional[datetime] = None
self._iteration = 0
self._log = logger.bind(agent=self.name)
async def start(self) -> None:
"""Start the agent. Override in subclass."""
self._running = True
self._last_heartbeat = datetime.utcnow()
self._task = asyncio.create_task(self._run())
self._log.info("agent_started")
async def stop(self) -> None:
"""Stop the agent gracefully."""
self._stopping = True
self._running = False
self._log.info("agent_stopping")
if self._task:
try:
self._task.cancel()
await asyncio.wait_for(self._task, timeout=5.0)
except asyncio.CancelledError:
pass
except Exception as e:
self._log.error("agent_stop_error", error=str(e))
self._log.info("agent_stopped")
async def _run(self) -> None:
"""Main agent loop. Calls run() repeatedly until stopped."""
try:
while self._running and not self._stopping:
try:
await asyncio.wait_for(self.run(), timeout=self.loop_interval())
except asyncio.TimeoutError:
# run() took too long — continue to next iteration
self._log.warning("agent_loop_timeout", iteration=self._iteration)
self._iteration += 1
self._last_heartbeat = datetime.utcnow()
except Exception as e:
self._log.error("agent_loop_error", error=str(e))
self._running = False
@abstractmethod
async def run(self) -> None:
"""One iteration of the agent loop. Implement in subclass."""
...
def loop_interval(self) -> float:
"""Seconds between run() calls. Override in subclass."""
return 60.0
def is_healthy(self) -> bool:
"""Return True if agent is running and heartbeat is recent."""
if not self._running:
return False
if self._last_heartbeat is None:
return True
# Unhealthy if no heartbeat in 5x the loop interval
elapsed = (datetime.utcnow() - self._last_heartbeat).total_seconds()
return elapsed < self.loop_interval() * 5
def health_status(self) -> dict:
"""Return health status dict."""
return {
"agent": self.name,
"running": self._running,
"iteration": self._iteration,
"last_heartbeat": (
self._last_heartbeat.isoformat() if self._last_heartbeat else None
),
"healthy": self.is_healthy(),
}