Keepalive
Maintain persistent gRPC connections to NextBlock by sending periodic keepalive pings with Python's asyncio, so idle connections stay open and requests avoid reconnection latency.
import asyncio
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
# Connection health tracking
@dataclass
class ConnectionHealth:
    """Running statistics about keepalive pings for a single connection.

    The counters are mutated in place by ``update_success`` and
    ``update_failure``. ``is_healthy`` becomes False once the number of
    back-to-back failures reaches the limit passed to ``update_failure``, and
    returns to True on the next successful ping.
    """

    is_healthy: bool = True
    # Initialised to construction time, i.e. before any ping has been sent.
    last_successful_ping: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    consecutive_failures: int = 0
    total_pings_sent: int = 0
    total_pings_successful: int = 0
    # Mean duration (seconds) over successful pings only.
    average_ping_time: float = 0.0

    def success_rate(self) -> float:
        """Return the percentage of sent pings that succeeded.

        Reports a perfect 100.0 while nothing has been sent yet.
        """
        sent = self.total_pings_sent
        if not sent:
            return 100.0
        return (self.total_pings_successful / sent) * 100.0

    def update_success(self, ping_time: float):
        """Record a successful ping that took *ping_time* seconds."""
        self.total_pings_sent += 1
        self.total_pings_successful += 1
        n = self.total_pings_successful
        # Fold the new sample into the running arithmetic mean.
        self.average_ping_time = (
            ping_time
            if n == 1
            else (self.average_ping_time * (n - 1) + ping_time) / n
        )
        self.consecutive_failures = 0
        self.last_successful_ping = datetime.now(timezone.utc)
        self.is_healthy = True

    def update_failure(self, max_failures: int = 3):
        """Record a failed ping; mark unhealthy after *max_failures* in a row."""
        self.total_pings_sent += 1
        self.consecutive_failures += 1
        if max_failures <= self.consecutive_failures:
            self.is_healthy = False
# Keepalive configuration
@dataclass
class KeepaliveConfig:
    """Tunable settings for the keepalive loop.

    Every duration below is expressed in seconds.
    """

    ping_interval: float = 60.0  # pause between consecutive pings
    max_consecutive_failures: int = 3  # failures in a row before "unhealthy"
    reconnect_delay: float = 5.0  # wait before a recovery attempt
    health_check_enabled: bool = True  # NOTE(review): not read by the code shown here
    timeout: float = 15.0  # deadline applied to a single ping
# Basic keepalive implementation
async def start_keepalive_task(
    # nextblock_client,  # Your generated gRPC client
    config: Optional["KeepaliveConfig"] = None,
) -> None:
    """Ping NextBlock every ``config.ping_interval`` seconds until cancelled.

    This is the minimal keepalive variant: it prints each ping result but
    keeps no health statistics (``KeepaliveManager`` does that).

    Args:
        config: Keepalive settings; a default ``KeepaliveConfig()`` is used
            when omitted.

    The loop runs until the surrounding task is cancelled
    (``asyncio.CancelledError`` is not an ``Exception`` subclass on
    Python 3.8+, so it passes through the handler below), or it stops after
    logging the first unexpected error.
    """
    if config is None:
        config = KeepaliveConfig()
    print(f"Starting keepalive task with {config.ping_interval}s interval")
    while True:
        try:
            await asyncio.sleep(config.ping_interval)
            # Send ping request. Durations use the monotonic clock so that
            # wall-clock adjustments (NTP sync, manual changes) cannot skew them.
            start_time = time.monotonic()

            # Uncomment when you have the generated gRPC client:
            # try:
            #     await asyncio.wait_for(
            #         nextblock_client.ping(Empty()),
            #         timeout=config.timeout,
            #     )
            #     ping_time = time.monotonic() - start_time
            #     print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
            # except asyncio.TimeoutError:
            #     print(f"Keepalive ping timeout after {config.timeout}s")
            # except Exception as e:
            #     print(f"Keepalive ping failed: {e}")
            #     # Optionally implement reconnection logic
            #     break

            # Mock ping for demonstration
            await asyncio.sleep(0.1)  # Simulate network delay
            ping_time = time.monotonic() - start_time
            print(f"Mock keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
        except Exception as e:
            # logging.exception keeps the traceback that a plain error log drops.
            logging.exception("Keepalive task error: %s", e)
            break
# Advanced keepalive manager
class KeepaliveManager:
    """Run periodic keepalive pings in a background task and track health.

    ``start()`` schedules ``_keepalive_loop`` as an asyncio task. Each
    iteration sleeps ``config.ping_interval`` seconds, sends one ping and
    records the outcome in ``self.health`` (a ``ConnectionHealth``). While
    that object reports unhealthy, ``_handle_connection_recovery`` runs after
    each iteration.
    """

    def __init__(
        self,
        # nextblock_client,  # Your generated gRPC client
        config: Optional["KeepaliveConfig"] = None,
    ):
        # self.client = nextblock_client
        self.config = config or KeepaliveConfig()
        self.health = ConnectionHealth()
        self.keepalive_task: Optional[asyncio.Task] = None
        self.is_running = False

    async def start(self) -> None:
        """Start the background keepalive task.

        No-op while a live task exists. If the previous task has finished
        (for example, because it was cancelled externally without ``stop()``),
        a fresh task is created instead of refusing to restart.
        """
        task = self.keepalive_task
        if self.is_running and task is not None and not task.done():
            return
        self.is_running = True
        self.keepalive_task = asyncio.create_task(self._keepalive_loop())
        print("Keepalive manager started")

    async def stop(self) -> None:
        """Stop the keepalive task and wait until it has finished."""
        self.is_running = False
        task = self.keepalive_task
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        print("Keepalive manager stopped")

    async def _keepalive_loop(self) -> None:
        """Ping every ``ping_interval`` seconds while ``is_running`` is set."""
        while self.is_running:
            try:
                await asyncio.sleep(self.config.ping_interval)
                await self._send_ping()
            except asyncio.CancelledError:
                # Propagate so the task finishes in the cancelled state rather
                # than looking like a normal exit; stop() expects either.
                raise
            except Exception as e:
                logging.exception("Keepalive loop error: %s", e)
            # Handle connection recovery
            if not self.health.is_healthy:
                await self._handle_connection_recovery()

    async def _send_ping(self) -> None:
        """Send one ping and record its outcome in ``self.health``.

        Only the ping itself is inside ``try``: an error raised while
        recording or printing a success is no longer counted as a failed ping.
        """
        # Monotonic clock: immune to wall-clock jumps when timing the ping.
        start_time = time.monotonic()
        try:
            # Uncomment when you have the generated gRPC client:
            # await asyncio.wait_for(
            #     self.client.ping(Empty()),
            #     timeout=self.config.timeout,
            # )

            # Mock ping delay: wall-clock jitter gives a variable 50-150ms delay.
            await asyncio.sleep(0.05 + (time.time() % 0.1))
            ping_time = time.monotonic() - start_time
        except asyncio.TimeoutError:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.warning(f"Keepalive ping timeout after {self.config.timeout}s")
        except Exception as e:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.error(f"Keepalive ping failed: {e}")
        else:
            self.health.update_success(ping_time)
            print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) - "
                  f"Health: {self.health.success_rate():.1f}%")

    async def _handle_connection_recovery(self) -> None:
        """Wait ``reconnect_delay`` seconds, then attempt recovery (placeholder)."""
        logging.warning("Connection unhealthy, attempting recovery...")
        # Wait before attempting recovery
        await asyncio.sleep(self.config.reconnect_delay)
        # Try to recover connection
        try:
            # Implement connection recovery logic here
            # await self._reconnect()
            print("Connection recovery attempted")
        except Exception as e:
            logging.error(f"Connection recovery failed: {e}")

    def get_health(self) -> "ConnectionHealth":
        """Return the live (mutable) health statistics object."""
        return self.health

    def is_healthy(self) -> bool:
        """Return whether the tracked connection is currently healthy."""
        return self.health.is_healthy
# Connection manager with integrated keepalive
class ConnectionManagerWithKeepalive:
    """Own a NextBlock connection together with its ``KeepaliveManager``.

    Usable as an async context manager: entering calls ``connect()`` and
    exiting calls ``disconnect()``. Entering does not raise when connecting
    fails, so check ``is_connected`` inside the ``async with`` body.
    """

    def __init__(self, nextblock_config, keepalive_config: Optional["KeepaliveConfig"] = None):
        self.nextblock_config = nextblock_config
        self.keepalive_config = keepalive_config or KeepaliveConfig()
        # self.client = None
        self.keepalive_manager: Optional["KeepaliveManager"] = None
        self.is_connected = False

    async def connect(self) -> bool:
        """Establish the connection and start keepalive.

        Idempotent: when already connected, returns True without creating a
        second ``KeepaliveManager``. A second manager would leave the first
        one's background task running with nothing left to stop it.

        Returns:
            True on success, or False if any step raised (the error is logged).
        """
        if self.is_connected and self.keepalive_manager is not None:
            return True
        try:
            # Create connection (see connection.md)
            # channel, client = await create_nextblock_client(self.nextblock_config)
            # self.client = client
            # Test connection
            # await self.client.ping(Empty())
            self.is_connected = True
            print("Successfully connected to NextBlock")
            # Start keepalive
            self.keepalive_manager = KeepaliveManager(
                # self.client,
                self.keepalive_config
            )
            await self.keepalive_manager.start()
            return True
        except Exception as e:
            logging.error(f"Failed to connect: {e}")
            self.is_connected = False
            return False

    async def disconnect(self) -> None:
        """Stop keepalive (if started) and mark the manager disconnected."""
        if self.keepalive_manager:
            await self.keepalive_manager.stop()
        self.is_connected = False
        print("Disconnected from NextBlock")

    def get_connection_health(self) -> Optional["ConnectionHealth"]:
        """Return the keepalive health stats, or None before the first connect()."""
        if self.keepalive_manager:
            return self.keepalive_manager.get_health()
        return None

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()
# Health monitoring and alerting
class HealthMonitor:
    """Periodically inspect a keepalive manager's health and emit alerts.

    An alert fires when any threshold is crossed (defaults in parentheses):

    * ``success_rate`` (90.0): ping success rate, in percent, drops below it.
    * ``avg_ping_time`` (1.0): average ping time, in seconds, exceeds it.
    * ``consecutive_failures`` (2): consecutive failed pings reach it.
    """

    def __init__(
        self,
        keepalive_manager: "KeepaliveManager",
        alert_thresholds: Optional[dict] = None,
    ):
        """
        Args:
            keepalive_manager: Object whose ``get_health()`` is polled.
            alert_thresholds: Optional overrides for any subset of the
                default thresholds; keys not given keep their defaults.

        Raises:
            ValueError: if ``alert_thresholds`` contains an unknown key,
                which would otherwise be silently ignored.
        """
        self.keepalive_manager = keepalive_manager
        self.alert_thresholds = {
            "success_rate": 90.0,  # Alert if success rate < 90%
            "avg_ping_time": 1.0,  # Alert if avg ping time > 1s
            "consecutive_failures": 2,  # Alert after 2 consecutive failures
        }
        if alert_thresholds:
            unknown = set(alert_thresholds) - set(self.alert_thresholds)
            if unknown:
                raise ValueError(f"Unknown alert threshold(s): {sorted(unknown)}")
            self.alert_thresholds.update(alert_thresholds)

    async def start_monitoring(self, check_interval: float = 30.0) -> None:
        """Check health every *check_interval* seconds until cancelled.

        An error from a single check is logged and monitoring continues, so
        one bad check does not silently end the monitoring task.
        """
        while True:
            await asyncio.sleep(check_interval)
            try:
                await self._check_health()
            except Exception as e:
                logging.exception("Health check failed: %s", e)

    async def _check_health(self) -> None:
        """Compare current health against the thresholds and alert on breaches."""
        health = self.keepalive_manager.get_health()
        thresholds = self.alert_thresholds
        # Read the rate once so that the comparison and the message agree.
        success_rate = health.success_rate()
        # Check success rate
        if success_rate < thresholds["success_rate"]:
            await self._trigger_alert(
                "Low success rate",
                f"Success rate: {success_rate:.1f}%"
            )
        # Check average ping time
        if health.average_ping_time > thresholds["avg_ping_time"]:
            await self._trigger_alert(
                "High ping time",
                f"Average ping time: {health.average_ping_time*1000:.1f}ms"
            )
        # Check consecutive failures
        if health.consecutive_failures >= thresholds["consecutive_failures"]:
            await self._trigger_alert(
                "Connection issues",
                f"Consecutive failures: {health.consecutive_failures}"
            )

    async def _trigger_alert(self, alert_type: str, details: str) -> None:
        """Emit a health alert (currently printed with a local timestamp)."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"HEALTH ALERT [{timestamp}] {alert_type}: {details}")
        # Implement additional alerting logic here
        # - Send email notifications
        # - Post to Slack/Discord
        # - Write to monitoring system
Usage Examples
async def basic_keepalive_example():
    """Run the simple keepalive coroutine alongside placeholder app work.

    The keepalive coroutine runs as a background task during 300 seconds of
    simulated work, and is always cancelled and awaited before returning.
    """
    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # channel, client = await create_nextblock_client(config)

    # Start keepalive task (pass `client` as the first argument once available)
    settings = KeepaliveConfig(ping_interval=60.0)
    background = asyncio.create_task(start_keepalive_task(settings))

    # Your main application logic here
    print("Application running with keepalive...")
    try:
        await asyncio.sleep(300)  # stand-in for five minutes of real work
    except KeyboardInterrupt:
        print("Stopping application...")
    finally:
        # Cancel the background task and wait for it to acknowledge.
        background.cancel()
        try:
            await background
        except asyncio.CancelledError:
            pass
async def advanced_keepalive_example():
    """Demonstrate the connection manager together with a HealthMonitor.

    Prints the connection health ten times, 30 seconds apart, while a
    background monitor runs its own checks every 60 seconds.
    """
    # from connection import NextBlockConfig
    # nextblock_config = NextBlockConfig.from_env()
    settings = KeepaliveConfig(
        ping_interval=30.0,  # Ping every 30 seconds
        max_consecutive_failures=3,
        reconnect_delay=10.0,
        timeout=15.0,
    )

    # Replace the None placeholder with `nextblock_config` once available.
    async with ConnectionManagerWithKeepalive(None, settings) as manager:
        if not manager.is_connected:
            return
        print("Connected with keepalive enabled")

        # Start health monitoring in the background
        monitor = HealthMonitor(manager.keepalive_manager)
        watcher = asyncio.create_task(
            monitor.start_monitoring(check_interval=60.0)
        )
        try:
            # Your application logic here
            for round_no in range(1, 11):
                await asyncio.sleep(30)
                snapshot = manager.get_connection_health()
                if not snapshot:
                    continue
                print(f"Connection health check {round_no}:")
                print(f" Healthy: {snapshot.is_healthy}")
                print(f" Success rate: {snapshot.success_rate():.1f}%")
                print(f" Avg ping time: {snapshot.average_ping_time*1000:.1f}ms")
                print(f" Total pings: {snapshot.total_pings_sent}")
        except KeyboardInterrupt:
            print("Application interrupted")
        finally:
            watcher.cancel()
            try:
                await watcher
            except asyncio.CancelledError:
                pass
async def main():
    """Ask which demo to run and execute it."""
    print("Choose example:")
    print("1. Basic keepalive")
    print("2. Advanced keepalive with monitoring")
    answer = input("Enter choice (1 or 2): ").strip()
    runners = {
        "1": basic_keepalive_example,
        "2": advanced_keepalive_example,
    }
    runner = runners.get(answer)
    if runner is None:
        print("Invalid choice")
        return
    await runner()
if __name__ == "__main__":
    # Timestamped INFO-level logging for the keepalive demos.
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    asyncio.run(main())
Best Practices
- **Appropriate intervals**: Use 30–60 second ping intervals for most applications.
- **Health monitoring**: Track connection health and set up alerting.
- **Graceful recovery**: Handle connection failures with exponential backoff. The examples above wait a fixed `reconnect_delay`; add backoff for production use.
- **Resource cleanup**: Always stop keepalive tasks when shutting down.
- **Timeout handling**: Set reasonable timeouts for ping requests.
- **Logging**: Log keepalive events for debugging and monitoring.
- **Integration**: Integrate keepalive with your connection management system.
Last updated