# Keepalive
import asyncio
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
# Connection health tracking
@dataclass
class ConnectionHealth:
    """Running statistics about keepalive pings for a single connection.

    ``is_healthy`` becomes False once ``update_failure`` has been called a
    given number of times in a row, and becomes True again on the next
    ``update_success``.
    """

    is_healthy: bool = True
    # UTC time of the most recent successful ping (the creation time until one succeeds).
    last_successful_ping: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    consecutive_failures: int = 0
    total_pings_sent: int = 0
    total_pings_successful: int = 0
    # Mean duration of successful pings, in seconds.
    average_ping_time: float = 0.0

    def success_rate(self) -> float:
        """Return the percentage of sent pings that succeeded (100.0 before any ping)."""
        sent = self.total_pings_sent
        if not sent:
            return 100.0
        return self.total_pings_successful / sent * 100.0

    def update_success(self, ping_time: float):
        """Record a successful ping that took ``ping_time`` seconds."""
        self.total_pings_sent += 1
        self.total_pings_successful += 1
        self.consecutive_failures = 0
        self.is_healthy = True
        self.last_successful_ping = datetime.now(timezone.utc)

        # Fold the new sample into the running mean without keeping a history.
        n = self.total_pings_successful
        if n == 1:
            self.average_ping_time = ping_time
        else:
            self.average_ping_time = (self.average_ping_time * (n - 1) + ping_time) / n

    def update_failure(self, max_failures: int = 3):
        """Record a failed ping; flag the connection unhealthy after ``max_failures`` in a row."""
        self.total_pings_sent += 1
        self.consecutive_failures += 1
        if self.consecutive_failures >= max_failures:
            self.is_healthy = False
# Keepalive configuration
@dataclass
class KeepaliveConfig:
    """Tunable settings for the keepalive loop.

    Attributes:
        ping_interval: Seconds to sleep between pings.
        max_consecutive_failures: Number of failed pings in a row after which
            the connection health is marked unhealthy.
        reconnect_delay: Seconds to wait before a connection-recovery attempt.
        health_check_enabled: Health-check switch. NOTE(review): nothing in
            this module reads it; confirm where (or whether) it is used.
        timeout: Seconds a ping may take. It is passed to ``asyncio.wait_for``
            in the (currently commented-out) real gRPC ping and reported in
            timeout log messages.
    """

    # Delay between pings, in seconds.
    ping_interval: float = 60.0
    # Consecutive failed pings before the connection is considered unhealthy.
    max_consecutive_failures: int = 3
    # Pause before trying to recover an unhealthy connection, in seconds.
    reconnect_delay: float = 5.0
    # Health-check toggle (not consulted by the code in this module).
    health_check_enabled: bool = True
    # Per-ping time budget, in seconds.
    timeout: float = 15.0
# Basic keepalive implementation
async def start_keepalive_task(
    # nextblock_client,  # Your generated gRPC client
    config: Optional[KeepaliveConfig] = None,
) -> None:
    """Run a basic keepalive loop that pings every ``config.ping_interval`` seconds.

    The loop runs until the task is cancelled or an unexpected error occurs.
    An unexpected error is logged and ends the loop.

    Args:
        config: Keepalive settings; ``KeepaliveConfig()`` defaults are used
            when omitted.
    """
    if config is None:
        config = KeepaliveConfig()

    print(f"Starting keepalive task with {config.ping_interval}s interval")

    while True:
        try:
            await asyncio.sleep(config.ping_interval)

            # Send ping request.
            # perf_counter() is monotonic. time.time() follows the wall clock,
            # which can jump (NTP sync, manual changes) and yield wrong or even
            # negative ping durations.
            start_time = time.perf_counter()

            # Real ping -- uncomment when you have the generated gRPC client:
            # try:
            #     await asyncio.wait_for(
            #         nextblock_client.ping(Empty()),
            #         timeout=config.timeout,
            #     )
            #     ping_time = time.perf_counter() - start_time
            #     print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
            # except asyncio.TimeoutError:
            #     print(f"Keepalive ping timeout after {config.timeout}s")
            # except Exception as e:
            #     print(f"Keepalive ping failed: {e}")
            #     # Optionally implement reconnection logic
            #     break

            # Mock ping for demonstration
            await asyncio.sleep(0.1)  # Simulate network delay
            ping_time = time.perf_counter() - start_time
            print(f"Mock keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
        except asyncio.CancelledError:
            # Cancellation is how callers stop this loop; never treat it as an error.
            raise
        except Exception as e:
            logging.error(f"Keepalive task error: {e}")
            break
# Advanced keepalive manager
class KeepaliveManager:
    """Run a background keepalive loop and track the resulting connection health.

    ``start()`` spawns a task that repeatedly does the following:
      - sleeps ``config.ping_interval`` seconds;
      - sends a ping and records the outcome in ``self.health``;
      - runs the recovery hook while the connection is marked unhealthy.

    ``stop()`` cancels that task and waits for it to finish.
    """

    def __init__(
        self,
        # nextblock_client,  # Your generated gRPC client
        config: Optional[KeepaliveConfig] = None,
    ):
        # self.client = nextblock_client
        self.config = config or KeepaliveConfig()
        self.health = ConnectionHealth()
        self.keepalive_task: Optional[asyncio.Task] = None
        self.is_running = False

    async def start(self) -> None:
        """Start the keepalive loop in a background task.

        Does nothing if a keepalive task is already running.
        """
        task = self.keepalive_task
        # Check the task itself, not just the flag. If the task was cancelled by
        # anything other than stop(), is_running was never cleared; without this
        # check the manager could then never be restarted.
        if self.is_running and task is not None and not task.done():
            return
        self.is_running = True
        self.keepalive_task = asyncio.create_task(self._keepalive_loop())
        print("Keepalive manager started")

    async def stop(self) -> None:
        """Stop the keepalive loop and wait for its task to finish."""
        self.is_running = False
        if self.keepalive_task and not self.keepalive_task.done():
            self.keepalive_task.cancel()
            try:
                await self.keepalive_task
            except asyncio.CancelledError:
                pass
        print("Keepalive manager stopped")

    async def _keepalive_loop(self) -> None:
        """Ping every ``ping_interval`` seconds until stopped or cancelled."""
        while self.is_running:
            try:
                await asyncio.sleep(self.config.ping_interval)
                await self._send_ping()
            except asyncio.CancelledError:
                # Propagate cancellation so the task ends in the cancelled state.
                # stop() awaits the task and absorbs this exception.
                raise
            except Exception as e:
                logging.error(f"Keepalive loop error: {e}")

            # Handle connection recovery
            if not self.health.is_healthy:
                await self._handle_connection_recovery()

    async def _send_ping(self) -> None:
        """Send one ping and record its success or failure in ``self.health``."""
        # perf_counter() is monotonic. time.time() follows the wall clock, which
        # can jump (e.g. NTP adjustments) and produce wrong or negative ping times.
        start_time = time.perf_counter()
        try:
            # Real ping -- uncomment when you have the generated gRPC client:
            # await asyncio.wait_for(
            #     self.client.ping(Empty()),
            #     timeout=self.config.timeout,
            # )

            # Mock ping delay; the wall clock is used here only as a cheap jitter source.
            await asyncio.sleep(0.05 + (time.time() % 0.1))  # Variable delay 50-150ms
            ping_time = time.perf_counter() - start_time
            self.health.update_success(ping_time)
            print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) - "
                  f"Health: {self.health.success_rate():.1f}%")
        except asyncio.CancelledError:
            # A cancelled ping is not a connection failure; do not count it.
            raise
        except asyncio.TimeoutError:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.warning(f"Keepalive ping timeout after {self.config.timeout}s")
        except Exception as e:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.error(f"Keepalive ping failed: {e}")

    async def _handle_connection_recovery(self) -> None:
        """Wait ``reconnect_delay`` seconds, then run the (placeholder) reconnect step.

        Health is not reset here. It only becomes healthy again after the next
        successful ping.
        """
        logging.warning("Connection unhealthy, attempting recovery...")
        # Wait before attempting recovery
        await asyncio.sleep(self.config.reconnect_delay)
        # Try to recover connection
        try:
            # Implement connection recovery logic here
            # await self._reconnect()
            print("Connection recovery attempted")
        except Exception as e:
            logging.error(f"Connection recovery failed: {e}")

    def get_health(self) -> ConnectionHealth:
        """Return the live (mutable) health record updated by the keepalive loop."""
        return self.health

    def is_healthy(self) -> bool:
        """Return whether the connection is currently considered healthy."""
        return self.health.is_healthy
# Connection manager with integrated keepalive
class ConnectionManagerWithKeepalive:
    """Own a NextBlock connection together with its keepalive manager.

    Usable as ``async with ConnectionManagerWithKeepalive(cfg) as conn:``.
    ``connect()`` runs on entry and ``disconnect()`` on exit.
    """

    def __init__(self, nextblock_config, keepalive_config: Optional[KeepaliveConfig] = None):
        # Connection settings. Presumably consumed by create_nextblock_client()
        # once the commented-out client code in connect() is enabled.
        self.nextblock_config = nextblock_config
        self.keepalive_config = keepalive_config or KeepaliveConfig()
        # self.client = None
        self.keepalive_manager: Optional[KeepaliveManager] = None
        self.is_connected = False

    async def connect(self) -> bool:
        """Establish the connection and start a fresh keepalive manager.

        Returns:
            True on success. False if any step raised; the error is logged
            and ``is_connected`` is reset to False.
        """
        try:
            # A repeated connect() must not orphan the previous keepalive task,
            # which would otherwise keep pinging in the background forever.
            if self.keepalive_manager is not None:
                await self.keepalive_manager.stop()
                self.keepalive_manager = None

            # Create connection (see connection.md)
            # channel, client = await create_nextblock_client(self.nextblock_config)
            # self.client = client
            # Test connection
            # await self.client.ping(Empty())
            self.is_connected = True
            print("Successfully connected to NextBlock")
            # Start keepalive
            self.keepalive_manager = KeepaliveManager(
                # self.client,
                self.keepalive_config
            )
            await self.keepalive_manager.start()
            return True
        except Exception as e:
            logging.error(f"Failed to connect: {e}")
            self.is_connected = False
            return False

    async def disconnect(self) -> None:
        """Stop the keepalive manager (if any) and mark the connection closed."""
        if self.keepalive_manager:
            await self.keepalive_manager.stop()
        self.is_connected = False
        print("Disconnected from NextBlock")

    def get_connection_health(self) -> Optional[ConnectionHealth]:
        """Return the keepalive health record, or None if connect() never created one."""
        if self.keepalive_manager:
            return self.keepalive_manager.get_health()
        return None

    async def __aenter__(self):
        """Connect on entry.

        A failed connect() is logged, not raised; check ``is_connected``
        inside the block.
        """
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect on exit; exceptions from the block are not suppressed."""
        await self.disconnect()
# Health monitoring and alerting
class HealthMonitor:
    """Periodically inspect a keepalive manager's health and emit alerts.

    An alert fires when any of these holds:
      - the ping success rate is below ``success_rate`` (percent);
      - the average ping time is above ``avg_ping_time`` (seconds);
      - at least ``consecutive_failures`` pings have failed in a row.
    """

    def __init__(self, keepalive_manager: "KeepaliveManager", alert_thresholds: Optional[dict] = None):
        """
        Args:
            keepalive_manager: Source of health data; only its ``get_health()`` is used.
            alert_thresholds: Optional overrides for any of the default thresholds
                (keys ``"success_rate"``, ``"avg_ping_time"``, ``"consecutive_failures"``).
                Keys that are not given keep their defaults. The mapping is
                copied, so later changes to it have no effect.
        """
        self.keepalive_manager = keepalive_manager
        self.alert_thresholds = {
            "success_rate": 90.0,  # Alert if success rate < 90%
            "avg_ping_time": 1.0,  # Alert if avg ping time > 1s
            "consecutive_failures": 2,  # Alert after 2 consecutive failures
        }
        if alert_thresholds:
            self.alert_thresholds.update(alert_thresholds)

    async def start_monitoring(self, check_interval: float = 30.0) -> None:
        """Check health every ``check_interval`` seconds.

        Runs forever; cancel the task running it to stop.
        """
        while True:
            await asyncio.sleep(check_interval)
            await self._check_health()

    async def _check_health(self) -> None:
        """Take one health snapshot and trigger an alert for every exceeded threshold."""
        health = self.keepalive_manager.get_health()
        thresholds = self.alert_thresholds

        # Check success rate
        success_rate = health.success_rate()
        if success_rate < thresholds["success_rate"]:
            await self._trigger_alert(
                "Low success rate",
                f"Success rate: {success_rate:.1f}%"
            )

        # Check average ping time (stored in seconds, reported in milliseconds)
        if health.average_ping_time > thresholds["avg_ping_time"]:
            await self._trigger_alert(
                "High ping time",
                f"Average ping time: {health.average_ping_time*1000:.1f}ms"
            )

        # Check consecutive failures
        if health.consecutive_failures >= thresholds["consecutive_failures"]:
            await self._trigger_alert(
                "Connection issues",
                f"Consecutive failures: {health.consecutive_failures}"
            )

    async def _trigger_alert(self, alert_type: str, details: str) -> None:
        """Print a timestamped (local time) health alert."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"HEALTH ALERT [{timestamp}] {alert_type}: {details}")
        # Implement additional alerting logic here
        # - Send email notifications
        # - Post to Slack/Discord
        # - Write to monitoring system
# Usage Examples
# Best Practices
# Last updated