# Keepalive

Maintain persistent gRPC connections to NextBlock for optimal performance using Python's asyncio.

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone

<strong># Connection health tracking
</strong>@dataclass
class ConnectionHealth:
    is_healthy: bool = True
    last_successful_ping: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    consecutive_failures: int = 0
    total_pings_sent: int = 0
    total_pings_successful: int = 0
    average_ping_time: float = 0.0
    
    def success_rate(self) -> float:
<strong>        """Calculate ping success rate percentage"""
</strong>        if self.total_pings_sent == 0:
            return 100.0
        return (self.total_pings_successful / self.total_pings_sent) * 100.0
    
    def update_success(self, ping_time: float):
<strong>        """Update health after successful ping"""
</strong>        self.is_healthy = True
        self.last_successful_ping = datetime.now(timezone.utc)
        self.consecutive_failures = 0
        self.total_pings_sent += 1
        self.total_pings_successful += 1
        
<strong>        # Update average ping time
</strong>        if self.total_pings_successful == 1:
            self.average_ping_time = ping_time
        else:
            self.average_ping_time = (
                (self.average_ping_time * (self.total_pings_successful - 1) + ping_time) 
                / self.total_pings_successful
            )
    
    def update_failure(self, max_failures: int = 3):
<strong>        """Update health after failed ping"""
</strong>        self.consecutive_failures += 1
        self.total_pings_sent += 1
        
        if self.consecutive_failures >= max_failures:
            self.is_healthy = False

<strong># Keepalive configuration
</strong>@dataclass
class KeepaliveConfig:
    ping_interval: float = 60.0  # seconds
    max_consecutive_failures: int = 3
    reconnect_delay: float = 5.0  # seconds
    health_check_enabled: bool = True
    timeout: float = 15.0  # seconds

<strong># Basic keepalive implementation
</strong>async def start_keepalive_task(
    # nextblock_client,  # Your generated gRPC client
    config: KeepaliveConfig = None,
) -> None:
<strong>    """Start basic keepalive task"""
</strong>    if config is None:
        config = KeepaliveConfig()
    
    print(f"Starting keepalive task with {config.ping_interval}s interval")
    
    while True:
        try:
            await asyncio.sleep(config.ping_interval)
            
<strong>            # Send ping request
</strong>            start_time = time.time()
            
            """ Uncomment when you have the generated gRPC client
            try:
                await asyncio.wait_for(
                    nextblock_client.ping(Empty()),
                    timeout=config.timeout
                )
                
                ping_time = time.time() - start_time
                print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
                
            except asyncio.TimeoutError:
                print(f"Keepalive ping timeout after {config.timeout}s")
            except Exception as e:
                print(f"Keepalive ping failed: {e}")
                # Optionally implement reconnection logic
                break
            """
            
<strong>            # Mock ping for demonstration
</strong>            await asyncio.sleep(0.1)  # Simulate network delay
            ping_time = time.time() - start_time
            print(f"Mock keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
            
        except Exception as e:
            logging.error(f"Keepalive task error: {e}")
            break

<strong># Advanced keepalive manager
</strong>class KeepaliveManager:
    def __init__(
        self,
        # nextblock_client,  # Your generated gRPC client
        config: KeepaliveConfig = None,
    ):
        # self.client = nextblock_client
        self.config = config or KeepaliveConfig()
        self.health = ConnectionHealth()
        self.keepalive_task: Optional[asyncio.Task] = None
        self.is_running = False
    
    async def start(self) -> None:
<strong>        """Start the keepalive manager"""
</strong>        if self.is_running:
            return
        
        self.is_running = True
        self.keepalive_task = asyncio.create_task(self._keepalive_loop())
        print("Keepalive manager started")
    
    async def stop(self) -> None:
<strong>        """Stop the keepalive manager"""
</strong>        self.is_running = False
        
        if self.keepalive_task and not self.keepalive_task.done():
            self.keepalive_task.cancel()
            try:
                await self.keepalive_task
            except asyncio.CancelledError:
                pass
        
        print("Keepalive manager stopped")
    
    async def _keepalive_loop(self) -> None:
<strong>        """Main keepalive loop"""
</strong>        while self.is_running:
            try:
                await asyncio.sleep(self.config.ping_interval)
                await self._send_ping()
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                logging.error(f"Keepalive loop error: {e}")
                
<strong>                # Handle connection recovery
</strong>                if not self.health.is_healthy:
                    await self._handle_connection_recovery()
    
    async def _send_ping(self) -> None:
<strong>        """Send a single ping and update health"""
</strong>        start_time = time.time()
        
        try:
            """ Uncomment when you have the generated gRPC client
            await asyncio.wait_for(
                self.client.ping(Empty()),
                timeout=self.config.timeout
            )
            """
            
<strong>            # Mock ping delay
</strong>            await asyncio.sleep(0.05 + (time.time() % 0.1))  # Variable delay 50-150ms
            
            ping_time = time.time() - start_time
            self.health.update_success(ping_time)
            
            print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) - "
                  f"Health: {self.health.success_rate():.1f}%")
            
        except asyncio.TimeoutError:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.warning(f"Keepalive ping timeout after {self.config.timeout}s")
            
        except Exception as e:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.error(f"Keepalive ping failed: {e}")
    
    async def _handle_connection_recovery(self) -> None:
<strong>        """Handle connection recovery when unhealthy"""
</strong>        logging.warning("Connection unhealthy, attempting recovery...")
        
<strong>        # Wait before attempting recovery
</strong>        await asyncio.sleep(self.config.reconnect_delay)
        
<strong>        # Try to recover connection
</strong>        try:
<strong>            # Implement connection recovery logic here
</strong>            # await self._reconnect()
            print("Connection recovery attempted")
            
        except Exception as e:
            logging.error(f"Connection recovery failed: {e}")
    
    def get_health(self) -> ConnectionHealth:
<strong>        """Get current connection health"""
</strong>        return self.health
    
    def is_healthy(self) -> bool:
<strong>        """Check if connection is healthy"""
</strong>        return self.health.is_healthy

<strong># Connection manager with integrated keepalive
</strong>class ConnectionManagerWithKeepalive:
    def __init__(self, nextblock_config, keepalive_config: KeepaliveConfig = None):
        self.nextblock_config = nextblock_config
        self.keepalive_config = keepalive_config or KeepaliveConfig()
        # self.client = None
        self.keepalive_manager: Optional[KeepaliveManager] = None
        self.is_connected = False
    
    async def connect(self) -> bool:
<strong>        """Establish connection and start keepalive"""
</strong>        try:
<strong>            # Create connection (see connection.md)
</strong>            # channel, client = await create_nextblock_client(self.nextblock_config)
            # self.client = client
            
<strong>            # Test connection
</strong>            # await self.client.ping(Empty())
            
            self.is_connected = True
            print("Successfully connected to NextBlock")
            
<strong>            # Start keepalive
</strong>            self.keepalive_manager = KeepaliveManager(
                # self.client,
                self.keepalive_config
            )
            await self.keepalive_manager.start()
            
            return True
            
        except Exception as e:
            logging.error(f"Failed to connect: {e}")
            self.is_connected = False
            return False
    
    async def disconnect(self) -> None:
<strong>        """Disconnect and stop keepalive"""
</strong>        if self.keepalive_manager:
            await self.keepalive_manager.stop()
        
        self.is_connected = False
        print("Disconnected from NextBlock")
    
    def get_connection_health(self) -> Optional[ConnectionHealth]:
<strong>        """Get current connection health"""
</strong>        if self.keepalive_manager:
            return self.keepalive_manager.get_health()
        return None
    
    async def __aenter__(self):
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

<strong># Health monitoring and alerting
</strong>class HealthMonitor:
    def __init__(self, keepalive_manager: KeepaliveManager):
        self.keepalive_manager = keepalive_manager
        self.alert_thresholds = {
            "success_rate": 90.0,  # Alert if success rate &#x3C; 90%
            "avg_ping_time": 1.0,  # Alert if avg ping time > 1s
            "consecutive_failures": 2,  # Alert after 2 consecutive failures
        }
    
    async def start_monitoring(self, check_interval: float = 30.0) -> None:
<strong>        """Start health monitoring with periodic checks"""
</strong>        while True:
            await asyncio.sleep(check_interval)
            await self._check_health()
    
    async def _check_health(self) -> None:
<strong>        """Check connection health and trigger alerts if needed"""
</strong>        health = self.keepalive_manager.get_health()
        
<strong>        # Check success rate
</strong>        if health.success_rate() &#x3C; self.alert_thresholds["success_rate"]:
            await self._trigger_alert(
                "Low success rate",
                f"Success rate: {health.success_rate():.1f}%"
            )
        
<strong>        # Check average ping time
</strong>        if health.average_ping_time > self.alert_thresholds["avg_ping_time"]:
            await self._trigger_alert(
                "High ping time",
                f"Average ping time: {health.average_ping_time*1000:.1f}ms"
            )
        
<strong>        # Check consecutive failures
</strong>        if health.consecutive_failures >= self.alert_thresholds["consecutive_failures"]:
            await self._trigger_alert(
                "Connection issues",
                f"Consecutive failures: {health.consecutive_failures}"
            )
    
    async def _trigger_alert(self, alert_type: str, details: str) -> None:
<strong>        """Trigger health alert"""
</strong>        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"HEALTH ALERT [{timestamp}] {alert_type}: {details}")
        
<strong>        # Implement additional alerting logic here
</strong>        # - Send email notifications
        # - Post to Slack/Discord
        # - Write to monitoring system
</code></pre>

## Usage Examples

<pre class="language-python"><code class="lang-python"><strong>async def basic_keepalive_example():
</strong><strong>    """Basic keepalive usage"""
</strong>    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # channel, client = await create_nextblock_client(config)
    
<strong>    # Start keepalive task
</strong>    keepalive_task = asyncio.create_task(
        start_keepalive_task(
            # client,
            KeepaliveConfig(ping_interval=60.0)
        )
    )
    
<strong>    # Your main application logic here
</strong>    print("Application running with keepalive...")
    
    try:
<strong>        # Simulate application work
</strong>        await asyncio.sleep(300)  # Run for 5 minutes
    except KeyboardInterrupt:
        print("Stopping application...")
    finally:
        keepalive_task.cancel()
        try:
            await keepalive_task
        except asyncio.CancelledError:
            pass

<strong>async def advanced_keepalive_example():
</strong><strong>    """Advanced keepalive with health monitoring"""
</strong>    # from connection import NextBlockConfig
    
<strong>    # Configuration
</strong>    # nextblock_config = NextBlockConfig.from_env()
    keepalive_config = KeepaliveConfig(
        ping_interval=30.0,  # Ping every 30 seconds
        max_consecutive_failures=3,
        reconnect_delay=10.0,
        timeout=15.0,
    )
    
<strong>    # Use connection manager with integrated keepalive
</strong>    async with ConnectionManagerWithKeepalive(
        # nextblock_config,
        None,  # Placeholder
        keepalive_config
    ) as manager:
        
        if manager.is_connected:
            print("Connected with keepalive enabled")
            
<strong>            # Start health monitoring
</strong>            health_monitor = HealthMonitor(manager.keepalive_manager)
            monitor_task = asyncio.create_task(
                health_monitor.start_monitoring(check_interval=60.0)
            )
            
            try:
<strong>                # Your application logic here
</strong>                for i in range(10):
                    await asyncio.sleep(30)
                    
                    health = manager.get_connection_health()
                    if health:
                        print(f"Connection health check {i+1}:")
                        print(f"  Healthy: {health.is_healthy}")
                        print(f"  Success rate: {health.success_rate():.1f}%")
                        print(f"  Avg ping time: {health.average_ping_time*1000:.1f}ms")
                        print(f"  Total pings: {health.total_pings_sent}")
                
            except KeyboardInterrupt:
                print("Application interrupted")
            finally:
                monitor_task.cancel()
                try:
                    await monitor_task
                except asyncio.CancelledError:
                    pass

<strong>async def main():
</strong><strong>    """Main example runner"""
</strong>    print("Choose example:")
    print("1. Basic keepalive")
    print("2. Advanced keepalive with monitoring")
    
    choice = input("Enter choice (1 or 2): ").strip()
    
    if choice == "1":
        await basic_keepalive_example()
    elif choice == "2":
        await advanced_keepalive_example()
    else:
        print("Invalid choice")

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    asyncio.run(main())
</code></pre>

## Best Practices

1. **Appropriate intervals**: Use 30-60 second ping intervals for most applications
2. **Health monitoring**: Track connection health and implement alerting
3. **Graceful recovery**: Handle connection failures with exponential backoff
4. **Resource cleanup**: Always stop keepalive tasks when shutting down
5. **Timeout handling**: Set reasonable timeouts for ping requests
6. **Logging**: Log keepalive events for debugging and monitoring
7. **Integration**: Integrate keepalive with your connection management system
