# Keepalive

Maintain persistent gRPC connections to NextBlock for optimal performance using Python's asyncio.

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone

# Connection health tracking
@dataclass
class ConnectionHealth:
    """Running statistics for keepalive pings on a single connection.

    The fields are plain flags/counters that are mutated in place by
    ``update_success`` and ``update_failure``.
    """

    # Set to False by update_failure once the failure streak reaches its
    # limit; set back to True by the next update_success.
    is_healthy: bool = True
    # UTC timestamp of the most recent success. Defaults to the creation time
    # of this object, so it is populated even before any ping was sent.
    last_successful_ping: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Failures since the last success.
    consecutive_failures: int = 0
    # Every ping attempt, successful or not.
    total_pings_sent: int = 0
    total_pings_successful: int = 0
    # Mean of the ``ping_time`` values passed to update_success.
    average_ping_time: float = 0.0

    def success_rate(self) -> float:
        """Return the percentage (0-100) of pings that succeeded.

        Returns 100.0 when no ping has been recorded yet.
        """
        if self.total_pings_sent == 0:
            return 100.0
        return (self.total_pings_successful / self.total_pings_sent) * 100.0

    def update_success(self, ping_time: float):
        """Record a successful ping that took ``ping_time``.

        Marks the connection healthy, clears the failure streak, refreshes
        ``last_successful_ping`` and folds ``ping_time`` into the average.
        """
        self.is_healthy = True
        self.last_successful_ping = datetime.now(timezone.utc)
        self.consecutive_failures = 0
        self.total_pings_sent += 1
        self.total_pings_successful += 1

        # Incremental mean: new_avg = old_avg + (x - old_avg) / n.
        # For n == 1 this yields exactly ``ping_time``, so no special case is
        # needed, and the old mean is never scaled back up by n - 1.
        self.average_ping_time += (
            (ping_time - self.average_ping_time) / self.total_pings_successful
        )

    def update_failure(self, max_failures: int = 3):
        """Record a failed ping.

        Args:
            max_failures: Length of the failure streak at which the
                connection is flagged unhealthy.
        """
        self.consecutive_failures += 1
        self.total_pings_sent += 1

        if self.consecutive_failures >= max_failures:
            self.is_healthy = False

# Keepalive configuration
@dataclass
class KeepaliveConfig:
    """Tunable settings for the keepalive helpers.

    Raises:
        ValueError: If ``ping_interval`` or ``timeout`` is not positive.
    """

    # Pause between two pings, in seconds.
    ping_interval: float = 60.0
    # Failure streak after which the connection is considered unhealthy.
    max_consecutive_failures: int = 3
    # Pause before a recovery attempt, in seconds.
    reconnect_delay: float = 5.0
    # NOTE(review): not read by any code visible in this document.
    health_check_enabled: bool = True
    # Upper bound for a single ping, in seconds.
    timeout: float = 15.0

    def __post_init__(self) -> None:
        # A zero/negative interval turns the keepalive loop into a busy loop,
        # and a zero/negative timeout makes every ping time out immediately,
        # so both are rejected up front. ``not x > 0`` also rejects NaN.
        if not self.ping_interval > 0:
            raise ValueError(
                f"ping_interval must be positive, got {self.ping_interval!r}"
            )
        # None is tolerated because asyncio.wait_for treats it as "no timeout".
        if self.timeout is not None and not self.timeout > 0:
            raise ValueError(f"timeout must be positive, got {self.timeout!r}")

# Basic keepalive implementation
async def start_keepalive_task(
    # nextblock_client,  # Your generated gRPC client
    config: Optional[KeepaliveConfig] = None,
) -> None:
    """Run a simple keepalive loop until it errors or is cancelled.

    Each iteration sleeps for ``config.ping_interval`` seconds and then
    performs one (currently mocked) ping, printing how long it took.

    Args:
        config: Keepalive settings; a default ``KeepaliveConfig`` is used
            when omitted.

    The loop ends after logging the first exception caught by
    ``except Exception``. Cancelling the surrounding task also ends it.
    """
    if config is None:
        config = KeepaliveConfig()

    print(f"Starting keepalive task with {config.ping_interval}s interval")

    while True:
        try:
            await asyncio.sleep(config.ping_interval)

            # Send ping request. Durations are measured with a monotonic
            # clock so that system clock adjustments cannot distort them.
            start_time = time.monotonic()

            # Uncomment when you have the generated gRPC client:
            #
            # try:
            #     await asyncio.wait_for(
            #         nextblock_client.ping(Empty()),
            #         timeout=config.timeout
            #     )
            #
            #     ping_time = time.monotonic() - start_time
            #     print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")
            #
            # except asyncio.TimeoutError:
            #     print(f"Keepalive ping timeout after {config.timeout}s")
            # except Exception as e:
            #     print(f"Keepalive ping failed: {e}")
            #     # Optionally implement reconnection logic
            #     break

            # Mock ping for demonstration
            await asyncio.sleep(0.1)  # Simulate network delay
            ping_time = time.monotonic() - start_time
            print(f"Mock keepalive ping successful ({ping_time*1000:.1f}ms) at {datetime.now().strftime('%H:%M:%S')}")

        except Exception as e:
            logging.error(f"Keepalive task error: {e}")
            break

# Advanced keepalive manager
class KeepaliveManager:
    """Background keepalive loop that records results in a ConnectionHealth.

    ``start`` spawns the loop as an asyncio task and ``stop`` cancels it.
    Ping outcomes are accumulated in ``self.health``.
    """

    def __init__(
        self,
        # nextblock_client,  # Your generated gRPC client
        config: Optional[KeepaliveConfig] = None,
    ):
        """Create a stopped manager.

        Args:
            config: Keepalive settings; defaults to ``KeepaliveConfig()``.
        """
        # self.client = nextblock_client
        self.config = config or KeepaliveConfig()
        self.health = ConnectionHealth()
        self.keepalive_task: Optional[asyncio.Task] = None
        self.is_running = False

    async def start(self) -> None:
        """Start the keepalive loop; a no-op if it is already running."""
        if self.is_running:
            return

        self.is_running = True
        self.keepalive_task = asyncio.create_task(self._keepalive_loop())
        print("Keepalive manager started")

    async def stop(self) -> None:
        """Stop the keepalive loop and wait for its task to finish."""
        self.is_running = False

        if self.keepalive_task and not self.keepalive_task.done():
            self.keepalive_task.cancel()
            try:
                await self.keepalive_task
            except asyncio.CancelledError:
                pass

        print("Keepalive manager stopped")

    async def _keepalive_loop(self) -> None:
        """Sleep, ping, and trigger recovery while the manager is running."""
        while self.is_running:
            try:
                await asyncio.sleep(self.config.ping_interval)
                await self._send_ping()

                # _send_ping records failures in self.health instead of
                # raising, so the health flag has to be checked here. Doing
                # it only in the exception handler below would never run it.
                if not self.health.is_healthy:
                    await self._handle_connection_recovery()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logging.error(f"Keepalive loop error: {e}")

    async def _send_ping(self) -> None:
        """Send a single ping and update ``self.health`` with the outcome.

        Failures and timeouts are logged and recorded; they are not raised.
        """
        # Monotonic clock: elapsed time must not be affected by system clock
        # adjustments.
        start_time = time.monotonic()

        try:
            # Uncomment when you have the generated gRPC client:
            #
            # await asyncio.wait_for(
            #     self.client.ping(Empty()),
            #     timeout=self.config.timeout
            # )

            # Mock ping delay
            await asyncio.sleep(0.05 + (time.time() % 0.1))  # Variable delay 50-150ms

            ping_time = time.monotonic() - start_time
            self.health.update_success(ping_time)

            print(f"Keepalive ping successful ({ping_time*1000:.1f}ms) - "
                  f"Health: {self.health.success_rate():.1f}%")

        except asyncio.TimeoutError:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.warning(f"Keepalive ping timeout after {self.config.timeout}s")

        except Exception as e:
            self.health.update_failure(self.config.max_consecutive_failures)
            logging.error(f"Keepalive ping failed: {e}")

    async def _handle_connection_recovery(self) -> None:
        """Wait ``reconnect_delay`` seconds, then attempt to recover.

        The recovery step itself is a placeholder. The health flag is only
        reset by a later successful ping.
        """
        logging.warning("Connection unhealthy, attempting recovery...")

        # Wait before attempting recovery
        await asyncio.sleep(self.config.reconnect_delay)

        # Try to recover connection
        try:
            # Implement connection recovery logic here
            # await self._reconnect()
            print("Connection recovery attempted")

        except Exception as e:
            logging.error(f"Connection recovery failed: {e}")

    def get_health(self) -> ConnectionHealth:
        """Return the live ConnectionHealth object (not a copy)."""
        return self.health

    def is_healthy(self) -> bool:
        """Return True while the connection is flagged healthy."""
        return self.health.is_healthy

# Connection manager with integrated keepalive
class ConnectionManagerWithKeepalive:
    """Owns a NextBlock connection together with its keepalive manager.

    Usable as an async context manager: entering calls ``connect`` and
    leaving calls ``disconnect``. ``connect`` reports failure through its
    return value and ``is_connected`` rather than by raising.
    """

    def __init__(self, nextblock_config, keepalive_config: Optional[KeepaliveConfig] = None):
        """Store the configuration; no connection is made yet.

        Args:
            nextblock_config: Connection settings, kept for use by the
                (currently commented-out) client creation step.
            keepalive_config: Keepalive settings; defaults to
                ``KeepaliveConfig()``.
        """
        self.nextblock_config = nextblock_config
        self.keepalive_config = keepalive_config or KeepaliveConfig()
        # self.client = None
        self.keepalive_manager: Optional[KeepaliveManager] = None
        self.is_connected = False

    async def connect(self) -> bool:
        """Establish the connection and start keepalive.

        Returns:
            True when connected (including when already connected),
            False if an exception occurred while connecting.
        """
        # Calling connect() twice must not replace the running keepalive
        # manager: the old task would keep pinging with no reference left
        # to stop it.
        if self.is_connected and self.keepalive_manager is not None:
            return True

        try:
            # Create connection (see connection.md)
            # channel, client = await create_nextblock_client(self.nextblock_config)
            # self.client = client

            # Test connection
            # await self.client.ping(Empty())

            self.is_connected = True
            print("Successfully connected to NextBlock")

            # Start keepalive
            self.keepalive_manager = KeepaliveManager(
                # self.client,
                self.keepalive_config
            )
            await self.keepalive_manager.start()

            return True

        except Exception as e:
            logging.error(f"Failed to connect: {e}")
            self.is_connected = False
            return False

    async def disconnect(self) -> None:
        """Stop keepalive (if it was started) and mark as disconnected."""
        if self.keepalive_manager:
            await self.keepalive_manager.stop()

        self.is_connected = False
        print("Disconnected from NextBlock")

    def get_connection_health(self) -> Optional[ConnectionHealth]:
        """Return the keepalive health, or None before the first connect."""
        if self.keepalive_manager:
            return self.keepalive_manager.get_health()
        return None

    async def __aenter__(self):
        # The result of connect() is not checked here; callers inspect
        # ``is_connected`` inside the ``async with`` body.
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

# Health monitoring and alerting
class HealthMonitor:
    """Periodically compares keepalive health against alert thresholds."""

    DEFAULT_ALERT_THRESHOLDS = {
        "success_rate": 90.0,  # Alert if success rate is below 90%
        "avg_ping_time": 1.0,  # Alert if avg ping time > 1s
        "consecutive_failures": 2,  # Alert after 2 consecutive failures
    }

    def __init__(
        self,
        keepalive_manager: KeepaliveManager,
        alert_thresholds: Optional[dict] = None,
    ):
        """Create a monitor for ``keepalive_manager``.

        Args:
            keepalive_manager: Source of the health statistics.
            alert_thresholds: Optional overrides for individual entries of
                ``DEFAULT_ALERT_THRESHOLDS``; missing keys keep the default.
        """
        self.keepalive_manager = keepalive_manager
        # Each instance gets its own dict so it can be adjusted independently.
        self.alert_thresholds = {
            **self.DEFAULT_ALERT_THRESHOLDS,
            **(alert_thresholds or {}),
        }

    async def start_monitoring(self, check_interval: float = 30.0) -> None:
        """Check health every ``check_interval`` seconds, forever.

        The loop has no exit condition of its own; cancel the task running
        it to stop monitoring.
        """
        while True:
            await asyncio.sleep(check_interval)
            await self._check_health()

    async def _check_health(self) -> None:
        """Compare current health with the thresholds and raise alerts."""
        health = self.keepalive_manager.get_health()

        # Check success rate (alert when it drops below the threshold)
        success_rate = health.success_rate()
        if self.alert_thresholds["success_rate"] > success_rate:
            await self._trigger_alert(
                "Low success rate",
                f"Success rate: {success_rate:.1f}%"
            )

        # Check average ping time
        if health.average_ping_time > self.alert_thresholds["avg_ping_time"]:
            await self._trigger_alert(
                "High ping time",
                f"Average ping time: {health.average_ping_time*1000:.1f}ms"
            )

        # Check consecutive failures
        if health.consecutive_failures >= self.alert_thresholds["consecutive_failures"]:
            await self._trigger_alert(
                "Connection issues",
                f"Consecutive failures: {health.consecutive_failures}"
            )

    async def _trigger_alert(self, alert_type: str, details: str) -> None:
        """Print a timestamped alert line (local time)."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"HEALTH ALERT [{timestamp}] {alert_type}: {details}")

        # Implement additional alerting logic here
        # - Send email notifications
        # - Post to Slack/Discord
        # - Write to monitoring system
</code></pre>

## Usage Examples

<pre class="language-python"><code class="lang-python"><strong>async def basic_keepalive_example():
</strong><strong>    """Basic keepalive usage"""
</strong>    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # channel, client = await create_nextblock_client(config)
    
<strong>    # Start keepalive task
</strong>    keepalive_task = asyncio.create_task(
        start_keepalive_task(
            # client,
            KeepaliveConfig(ping_interval=60.0)
        )
    )
    
<strong>    # Your main application logic here
</strong>    print("Application running with keepalive...")
    
    try:
<strong>        # Simulate application work
</strong>        await asyncio.sleep(300)  # Run for 5 minutes
    except KeyboardInterrupt:
        print("Stopping application...")
    finally:
        keepalive_task.cancel()
        try:
            await keepalive_task
        except asyncio.CancelledError:
            pass

async def advanced_keepalive_example():
    """Run the connection manager with keepalive and health monitoring.

    Prints a health summary every 30 seconds, ten times. The monitor task
    is cancelled and awaited before the connection manager is left.
    """
    # from connection import NextBlockConfig

    # Configuration
    # nextblock_config = NextBlockConfig.from_env()
    keepalive_config = KeepaliveConfig(
        ping_interval=30.0,  # Ping every 30 seconds
        max_consecutive_failures=3,
        reconnect_delay=10.0,
        timeout=15.0,
    )

    # Use connection manager with integrated keepalive
    async with ConnectionManagerWithKeepalive(
        # nextblock_config,
        None,  # Placeholder
        keepalive_config
    ) as manager:

        if manager.is_connected:
            print("Connected with keepalive enabled")

            # Start health monitoring
            health_monitor = HealthMonitor(manager.keepalive_manager)
            monitor_task = asyncio.create_task(
                health_monitor.start_monitoring(check_interval=60.0)
            )

            try:
                # Your application logic here
                for i in range(10):
                    await asyncio.sleep(30)

                    health = manager.get_connection_health()
                    if health:
                        print(f"Connection health check {i+1}:")
                        print(f"  Healthy: {health.is_healthy}")
                        print(f"  Success rate: {health.success_rate():.1f}%")
                        print(f"  Avg ping time: {health.average_ping_time*1000:.1f}ms")
                        print(f"  Total pings: {health.total_pings_sent}")

            except KeyboardInterrupt:
                print("Application interrupted")
            except asyncio.CancelledError:
                # Under asyncio.run() on Python 3.11+, Ctrl-C is delivered by
                # cancelling the main task, so it shows up here as
                # CancelledError. Re-raise so the shutdown can complete.
                print("Application interrupted")
                raise
            finally:
                monitor_task.cancel()
                try:
                    await monitor_task
                except asyncio.CancelledError:
                    pass

async def main():
    """Ask which example to run and run it.

    Any answer other than "1" or "2", including end-of-input on stdin,
    prints "Invalid choice".
    """
    print("Choose example:")
    print("1. Basic keepalive")
    print("2. Advanced keepalive with monitoring")

    try:
        # input() blocks the event loop; that is acceptable here because no
        # other task has been started yet.
        choice = input("Enter choice (1 or 2): ").strip()
    except EOFError:
        # stdin is closed or exhausted (for example a non-interactive run).
        choice = ""

    if choice == "1":
        await basic_keepalive_example()
    elif choice == "2":
        await advanced_keepalive_example()
    else:
        print("Invalid choice")

if __name__ == "__main__":
    # Script entry point: configure INFO-level logging with a timestamped
    # format, then run the interactive example chooser.
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    asyncio.run(main())
</code></pre>

## Best Practices

1. **Appropriate intervals**: Use 30-60 second ping intervals for most applications
2. **Health monitoring**: Track connection health and implement alerting
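3. **Graceful recovery**: Handle connection failures with exponential backoff (the sample above waits a fixed `reconnect_delay`; in production, increase the delay after each failed attempt)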
4. **Resource cleanup**: Always stop keepalive tasks when shutting down
5. **Timeout handling**: Set reasonable timeouts for ping requests
6. **Logging**: Log keepalive events for debugging and monitoring
7. **Integration**: Integrate keepalive with your connection management system


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.nextblock.io/api/examples/python/keepalive.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, when you need clarification or additional context, or when you want to retrieve related documentation sections.
