Keepalive

Use Python's asyncio to send periodic pings that keep gRPC connections to NextBlock open, so requests do not pay connection setup costs.

import asyncio
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone

# Connection health tracking
@dataclass
class ConnectionHealth:
    """Mutable tally of keepalive ping outcomes for one connection.

    The counters are advanced by ``update_success`` and ``update_failure``;
    ``success_rate`` derives a percentage from them.
    """

    # Flipped to False by update_failure once the failure streak hits the limit.
    is_healthy: bool = True
    # Timezone-aware (UTC) time of the latest success; creation time until then.
    last_successful_ping: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Length of the current run of failed pings.
    consecutive_failures: int = 0
    total_pings_sent: int = 0
    total_pings_successful: int = 0
    # Arithmetic mean of the durations passed to update_success.
    average_ping_time: float = 0.0

    def success_rate(self) -> float:
        """Return the share of pings that succeeded, as a percentage.

        Reports 100.0 while no ping has been recorded yet.
        """
        sent = self.total_pings_sent
        if not sent:
            return 100.0
        return (self.total_pings_successful / sent) * 100.0

    def update_success(self, ping_time: float):
        """Record a successful ping that took ``ping_time``.

        Resets the failure streak, marks the connection healthy, refreshes
        the last-success timestamp and folds ``ping_time`` into the mean.
        """
        self.total_pings_sent += 1
        self.total_pings_successful += 1
        self.consecutive_failures = 0
        self.last_successful_ping = datetime.now(timezone.utc)
        self.is_healthy = True

        successes = self.total_pings_successful
        if successes == 1:
            # First sample: the mean is the sample itself.
            self.average_ping_time = ping_time
            return

        # Rebuild the previous sum from the old mean, add the new sample,
        # then divide by the new count.
        previous_sum = self.average_ping_time * (successes - 1)
        self.average_ping_time = (previous_sum + ping_time) / successes

    def update_failure(self, max_failures: int = 3):
        """Record a failed ping.

        The connection is marked unhealthy once the number of consecutive
        failures reaches ``max_failures``.
        """
        self.total_pings_sent += 1
        self.consecutive_failures += 1

        if self.consecutive_failures < max_failures:
            return
        self.is_healthy = False

# Keepalive configuration
@dataclass
class KeepaliveConfig:
    """Settings consumed by the keepalive task and manager defined in this file."""
    # Pause between consecutive pings (passed to asyncio.sleep).
    ping_interval: float = 60.0  # seconds
    # Failure streak at which KeepaliveManager marks the connection unhealthy.
    max_consecutive_failures: int = 3
    # Pause taken by KeepaliveManager before it attempts connection recovery.
    reconnect_delay: float = 5.0  # seconds
    # NOTE(review): not read by any code visible in this file - confirm intended use.
    health_check_enabled: bool = True
    # Per-ping timeout; used by the commented-out gRPC call and in timeout log messages.
    timeout: float = 15.0  # seconds

# Basic keepalive implementation
async def start_keepalive_task(
    # nextblock_client,  # Your generated gRPC client
    config: Optional[KeepaliveConfig] = None,
) -> None:
    """Run a basic keepalive loop until it is cancelled or hits an unexpected error.

    Each iteration sleeps ``config.ping_interval`` seconds, then performs a
    ping (mocked here with a 0.1 s sleep) and prints how long it took.

    Args:
        config: Keepalive settings. A default ``KeepaliveConfig`` is used
            when omitted.

    Raises:
        asyncio.CancelledError: Propagated when the surrounding task is
            cancelled, so callers awaiting the task observe the cancellation.
    """
    if config is None:
        config = KeepaliveConfig()

    print(f"Starting keepalive task with {config.ping_interval}s interval")

    while True:
        try:
            await asyncio.sleep(config.ping_interval)

            # Send ping request.
            # perf_counter is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments the way time.time() can.
            start_time = time.perf_counter()

            # Uncomment when you have the generated gRPC client:
            #
            # try:
            #     await asyncio.wait_for(
            #         nextblock_client.ping(Empty()),
            #         timeout=config.timeout
            #     )
            #
            #     ping_time = time.perf_counter() - start_time
            #     print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
            #
            # except asyncio.TimeoutError:
            #     print(f"Keepalive ping timeout after {config.timeout}s")
            # except Exception as e:
            #     print(f"Keepalive ping failed: {e}")
            #     # Optionally implement reconnection logic
            #     break

            # Mock ping for demonstration
            await asyncio.sleep(0.1)  # Simulate network delay
            ping_time = time.perf_counter() - start_time
            print(f"Mock keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")

        except asyncio.CancelledError:
            # Never let the broad handler below absorb a cancellation
            # (CancelledError derived from Exception before Python 3.8).
            raise
        except Exception as e:
            logging.error(f"Keepalive task error: {e}")
            break

# Advanced keepalive manager
class KeepaliveManager:
    """Background keepalive pinger that records connection health.

    ``start`` launches an asyncio task that repeatedly sleeps
    ``config.ping_interval`` seconds, sends a ping (mocked until a real gRPC
    client is wired in) and records the outcome in ``self.health``. When the
    health record turns unhealthy, a recovery attempt is made.
    """

    def __init__(
        self,
        # nextblock_client,  # Your generated gRPC client
        config: Optional[KeepaliveConfig] = None,
    ):
        """Create a stopped manager.

        Args:
            config: Keepalive settings. A default ``KeepaliveConfig`` is used
                when omitted.
        """
        # self.client = nextblock_client
        self.config = config if config is not None else KeepaliveConfig()
        self.health = ConnectionHealth()
        self.keepalive_task: Optional[asyncio.Task] = None
        self.is_running = False

    async def start(self) -> None:
        """Start the keepalive loop in a background task (no-op if running)."""
        if self.is_running:
            return

        self.is_running = True
        self.keepalive_task = asyncio.create_task(self._keepalive_loop())
        print("Keepalive manager started")

    async def stop(self) -> None:
        """Stop the keepalive loop and wait for its task to finish."""
        self.is_running = False

        if self.keepalive_task and not self.keepalive_task.done():
            self.keepalive_task.cancel()
            try:
                await self.keepalive_task
            except asyncio.CancelledError:
                # Expected outcome of the cancel() above.
                pass

        print("Keepalive manager stopped")

    async def _keepalive_loop(self) -> None:
        """Ping on every interval and trigger recovery while unhealthy."""
        while self.is_running:
            try:
                await asyncio.sleep(self.config.ping_interval)
                await self._send_ping()

                # _send_ping records failures on self.health instead of
                # raising, so health has to be inspected here; checking it
                # only in an exception handler would never trigger recovery.
                if not self.health.is_healthy:
                    await self._handle_connection_recovery()

            except asyncio.CancelledError:
                # Let cancellation propagate; stop() awaits and absorbs it.
                raise
            except Exception as e:
                logging.error(f"Keepalive loop error: {e}")

    async def _send_ping(self) -> None:
        """Send a single ping and update health.

        Failures are recorded on ``self.health`` and logged; they are not
        raised to the caller.
        """
        # Monotonic clock so the measured duration is immune to wall-clock jumps.
        start_time = time.perf_counter()

        try:
            # Uncomment when you have the generated gRPC client:
            #
            # await asyncio.wait_for(
            #     self.client.ping(Empty()),
            #     timeout=self.config.timeout
            # )

            # Mock ping delay; the wall-clock fraction only serves as cheap jitter.
            await asyncio.sleep(0.05 + (time.time() % 0.1))  # Variable delay 50-150ms

            ping_time = time.perf_counter() - start_time
            self.health.update_success(ping_time)

            print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) - "
                  f"Health: {self.health.success_rate():.1f}%")

        except asyncio.TimeoutError:
            # Only reachable once the wait_for call above is enabled.
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.warning(f"Keepalive ping timeout after {self.config.timeout}s")

        except Exception as e:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.error(f"Keepalive ping failed: {e}")

    async def _handle_connection_recovery(self) -> None:
        """Wait ``config.reconnect_delay`` seconds, then attempt recovery."""
        logging.warning("Connection unhealthy, attempting recovery...")

        # Wait before attempting recovery
        await asyncio.sleep(self.config.reconnect_delay)

        # Try to recover connection
        try:
            # Implement connection recovery logic here
            # await self._reconnect()
            print("Connection recovery attempted")

        except Exception as e:
            logging.error(f"Connection recovery failed: {e}")

    def get_health(self) -> ConnectionHealth:
        """Return the live health record (the same object the loop mutates)."""
        return self.health

    def is_healthy(self) -> bool:
        """Return whether the connection is currently considered healthy."""
        return self.health.is_healthy

# Connection manager with integrated keepalive
class ConnectionManagerWithKeepalive:
    """Owns a NextBlock connection together with its keepalive manager.

    Usable as an async context manager: entering calls ``connect`` and
    exiting calls ``disconnect``. A failed connect does not raise on entry;
    check ``is_connected`` inside the ``async with`` body.
    """

    def __init__(self, nextblock_config, keepalive_config: Optional[KeepaliveConfig] = None):
        """Store the configuration; no connection is made yet.

        Args:
            nextblock_config: Connection settings, kept for the (currently
                commented-out) client creation.
            keepalive_config: Keepalive settings. A default
                ``KeepaliveConfig`` is used when omitted.
        """
        self.nextblock_config = nextblock_config
        self.keepalive_config = (
            keepalive_config if keepalive_config is not None else KeepaliveConfig()
        )
        # self.client = None
        self.keepalive_manager: Optional[KeepaliveManager] = None
        self.is_connected = False

    async def connect(self) -> bool:
        """Establish the connection and start keepalive.

        Calling this while already connected is a no-op, so an existing
        keepalive manager is never replaced and left running unowned.

        Returns:
            True when connected (or already connected), False when any step
            failed; the failure is logged rather than raised.
        """
        if self.is_connected and self.keepalive_manager is not None:
            # A second KeepaliveManager here would orphan the first one's
            # background task, which disconnect() could then no longer stop.
            return True

        try:
            # Create connection (see connection.md)
            # channel, client = await create_nextblock_client(self.nextblock_config)
            # self.client = client

            # Test connection
            # await self.client.ping(Empty())

            self.is_connected = True
            print("Successfully connected to NextBlock")

            # Start keepalive
            self.keepalive_manager = KeepaliveManager(
                # self.client,
                self.keepalive_config
            )
            await self.keepalive_manager.start()

            return True

        except Exception as e:
            logging.error(f"Failed to connect: {e}")
            self.is_connected = False
            return False

    async def disconnect(self) -> None:
        """Stop keepalive (if it was started) and mark the connection closed."""
        if self.keepalive_manager:
            await self.keepalive_manager.stop()

        self.is_connected = False
        print("Disconnected from NextBlock")

    def get_connection_health(self) -> Optional[ConnectionHealth]:
        """Return the keepalive health record, or None before the first connect."""
        if self.keepalive_manager:
            return self.keepalive_manager.get_health()
        return None

    async def __aenter__(self):
        """Connect on entry and return this manager."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect on exit; exceptions from the body are not suppressed."""
        await self.disconnect()

# Health monitoring and alerting
class HealthMonitor:
    """Periodically inspects a keepalive manager's health and raises alerts."""

    # Baseline limits; per-instance overrides are merged on top in __init__.
    DEFAULT_ALERT_THRESHOLDS = {
        "success_rate": 90.0,  # Alert if success rate < 90%
        "avg_ping_time": 1.0,  # Alert if avg ping time > 1s
        "consecutive_failures": 2,  # Alert after 2 consecutive failures
    }

    def __init__(
        self,
        keepalive_manager: KeepaliveManager,
        alert_thresholds: Optional[dict] = None,
    ):
        """Create a monitor for ``keepalive_manager``.

        Args:
            keepalive_manager: Object whose ``get_health()`` supplies the
                health record to inspect.
            alert_thresholds: Optional overrides for any of the keys in
                ``DEFAULT_ALERT_THRESHOLDS``; keys left out keep their
                default value. The mapping passed in is not modified.
        """
        self.keepalive_manager = keepalive_manager
        # Build a fresh dict so neither the class defaults nor the caller's
        # mapping are shared with (or mutated through) this instance.
        self.alert_thresholds = {
            **self.DEFAULT_ALERT_THRESHOLDS,
            **(alert_thresholds or {}),
        }

    async def start_monitoring(self, check_interval: float = 30.0) -> None:
        """Run health checks every ``check_interval`` seconds until cancelled."""
        while True:
            await asyncio.sleep(check_interval)
            await self._check_health()

    async def _check_health(self) -> None:
        """Compare current health against the thresholds and alert on breaches."""
        health = self.keepalive_manager.get_health()
        limits = self.alert_thresholds

        # Check success rate (computed once, reused in the alert text)
        success_rate = health.success_rate()
        if success_rate < limits["success_rate"]:
            await self._trigger_alert(
                "Low success rate",
                f"Success rate: {success_rate:.1f}%"
            )

        # Check average ping time
        if health.average_ping_time > limits["avg_ping_time"]:
            await self._trigger_alert(
                "High ping time",
                f"Average ping time: {health.average_ping_time*1000:.1f}ms"
            )

        # Check consecutive failures
        if health.consecutive_failures >= limits["consecutive_failures"]:
            await self._trigger_alert(
                "Connection issues",
                f"Consecutive failures: {health.consecutive_failures}"
            )

    async def _trigger_alert(self, alert_type: str, details: str) -> None:
        """Emit one alert line, stamped with the current local time."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"HEALTH ALERT [{timestamp}] {alert_type}: {details}")

        # Implement additional alerting logic here
        # - Send email notifications
        # - Post to Slack/Discord
        # - Write to monitoring system

Usage Examples

async def basic_keepalive_example():
    """Demonstrate the plain keepalive task alongside simulated application work.

    Starts ``start_keepalive_task`` in the background with a 60 s interval,
    idles for five minutes in place of real work, then cancels the task.
    """
    # Connect to NextBlock first (see connection.md):
    # config = NextBlockConfig.from_env()
    # channel, client = await create_nextblock_client(config)

    # Launch the pinger in the background (pass `client` first once available).
    ping_settings = KeepaliveConfig(ping_interval=60.0)
    background_pinger = asyncio.create_task(start_keepalive_task(ping_settings))

    # Your main application logic goes below.
    print("Application running with keepalive...")

    try:
        # Stand-in for real work: idle for 5 minutes.
        await asyncio.sleep(300)
    except KeyboardInterrupt:
        print("Stopping application...")
    finally:
        # Always tear the background task down, whatever ended the work above.
        background_pinger.cancel()
        try:
            await background_pinger
        except asyncio.CancelledError:
            pass

async def advanced_keepalive_example():
    """Demonstrate the connection manager combined with health monitoring.

    Opens a ``ConnectionManagerWithKeepalive``, runs a ``HealthMonitor`` in
    the background, and prints a health report every 30 s for ten rounds.
    """
    # from connection import NextBlockConfig

    # Configuration
    # nextblock_config = NextBlockConfig.from_env()
    settings = KeepaliveConfig(
        timeout=15.0,
        reconnect_delay=10.0,
        max_consecutive_failures=3,
        ping_interval=30.0,  # Ping every 30 seconds
    )

    # `None` stands in for nextblock_config until a real config is available.
    async with ConnectionManagerWithKeepalive(None, settings) as manager:
        if not manager.is_connected:
            # Nothing to demonstrate; leaving the block still disconnects.
            return

        print("Connected with keepalive enabled")

        # Run the health monitor next to the application logic.
        monitor = HealthMonitor(manager.keepalive_manager)
        watcher = asyncio.create_task(monitor.start_monitoring(check_interval=60.0))

        try:
            # Your application logic here
            for round_index in range(10):
                await asyncio.sleep(30)

                snapshot = manager.get_connection_health()
                if not snapshot:
                    continue

                report = (
                    f"Connection health check {round_index+1}:",
                    f"  Healthy: {snapshot.is_healthy}",
                    f"  Success rate: {snapshot.success_rate():.1f}%",
                    f"  Avg ping time: {snapshot.average_ping_time*1000:.1f}ms",
                    f"  Total pings: {snapshot.total_pings_sent}",
                )
                for line in report:
                    print(line)

        except KeyboardInterrupt:
            print("Application interrupted")
        finally:
            # Stop the monitor before the connection manager shuts down.
            watcher.cancel()
            try:
                await watcher
            except asyncio.CancelledError:
                pass

async def main():
    """Prompt for an example number on stdin and run the matching example."""
    for menu_line in (
        "Choose example:",
        "1. Basic keepalive",
        "2. Advanced keepalive with monitoring",
    ):
        print(menu_line)

    selection = input("Enter choice (1 or 2): ").strip()

    # Map each accepted answer to the coroutine function that implements it.
    examples = {
        "1": basic_keepalive_example,
        "2": advanced_keepalive_example,
    }
    chosen = examples.get(selection)
    if chosen is None:
        print("Invalid choice")
        return

    await chosen()

if __name__ == "__main__":
    # Script entry point: configure root logging at INFO, then run the
    # interactive example chooser on a fresh event loop.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    asyncio.run(main())

Best Practices

  1. Appropriate intervals: Use 30-60 second ping intervals for most applications

  2. Health monitoring: Track connection health and implement alerting

  3. Graceful recovery: Handle connection failures with exponential backoff (the examples above wait a fixed `reconnect_delay` for simplicity)

  4. Resource cleanup: Always stop keepalive tasks when shutting down

  5. Timeout handling: Set reasonable timeouts for ping requests

  6. Logging: Log keepalive events for debugging and monitoring

  7. Integration: Integrate keepalive with your connection management system

Last updated