Keepalive

Maintain persistent gRPC connections to NextBlock for optimal performance using Rust's Tonic client.

Basic Keepalive Implementation

use std::time::Duration;
use tokio::time::{interval, sleep};
use tonic::Request;

// Send periodic ping to keep connection alive
async fn start_keepalive_task(
    // mut nextblock_client: YourGeneratedApiClient,
    ping_interval: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut interval_timer = interval(ping_interval);
    
    println!("Starting keepalive task with interval: {:?}", ping_interval);
    
    loop {
        interval_timer.tick().await;
        
        // Send ping request
        /* Uncomment when you have the generated API client
        match nextblock_client.ping(Request::new(())).await {
            Ok(_) => {
                println!("Keepalive ping successful at {}", 
                        chrono::Utc::now().format("%H:%M:%S"));
            }
            Err(e) => {
                eprintln!("Keepalive ping failed: {}", e);
                // Optionally implement reconnection logic here
                break;
            }
        }
        */
        
        // Mock ping for demonstration
        println!("Mock keepalive ping sent at {}", 
                chrono::Utc::now().format("%H:%M:%S"));
    }
    
    Ok(())
}

Advanced Keepalive with Connection Management

use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::transport::Channel;

// Connection health tracker
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
    pub is_healthy: bool,
    pub last_successful_ping: chrono::DateTime<chrono::Utc>,
    pub consecutive_failures: u32,
    pub total_pings_sent: u64,
    pub total_pings_successful: u64,
}

impl Default for ConnectionHealth {
    fn default() -> Self {
        Self {
            is_healthy: true,
            last_successful_ping: chrono::Utc::now(),
            consecutive_failures: 0,
            total_pings_sent: 0,
            total_pings_successful: 0,
        }
    }
}

// Advanced keepalive manager
pub struct KeepaliveManager {
    // client: Arc<RwLock<YourGeneratedApiClient>>,
    health: Arc<RwLock<ConnectionHealth>>,
    config: KeepaliveConfig,
}

#[derive(Debug, Clone)]
pub struct KeepaliveConfig {
    pub ping_interval: Duration,
    pub max_consecutive_failures: u32,
    pub reconnect_delay: Duration,
    pub health_check_enabled: bool,
}

impl Default for KeepaliveConfig {
    fn default() -> Self {
        Self {
            ping_interval: Duration::from_secs(60),
            max_consecutive_failures: 3,
            reconnect_delay: Duration::from_secs(5),
            health_check_enabled: true,
        }
    }
}

impl KeepaliveManager {
    pub fn new(
        // client: YourGeneratedApiClient,
        config: KeepaliveConfig,
    ) -> Self {
        Self {
            // client: Arc::new(RwLock::new(client)),
            health: Arc::new(RwLock::new(ConnectionHealth::default())),
            config,
        }
    }
    
    // Start keepalive task
    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
        let health = Arc::clone(&self.health);
        // let client = Arc::clone(&self.client);
        let config = self.config.clone();
        
        tokio::spawn(async move {
            let mut interval_timer = interval(config.ping_interval);
            
            loop {
                interval_timer.tick().await;
                
                // Send ping and update health
                Self::send_ping_and_update_health(
                    // Arc::clone(&client),
                    Arc::clone(&health),
                    &config,
                ).await;
            }
        });
        
        Ok(())
    }
    
    // Send ping and update connection health
    async fn send_ping_and_update_health(
        // client: Arc<RwLock<YourGeneratedApiClient>>,
        health: Arc<RwLock<ConnectionHealth>>,
        config: &KeepaliveConfig,
    ) {
        let ping_start = std::time::Instant::now();
        
        // Update ping attempt counter
        {
            let mut health_guard = health.write().await;
            health_guard.total_pings_sent += 1;
        }
        
        // Send ping
        /* Uncomment when you have the generated API client
        let ping_result = {
            let mut client_guard = client.write().await;
            client_guard.ping(Request::new(())).await
        };
        
        match ping_result {
            Ok(_) => {
                let ping_duration = ping_start.elapsed();
                let mut health_guard = health.write().await;
                
                health_guard.is_healthy = true;
                health_guard.last_successful_ping = chrono::Utc::now();
                health_guard.consecutive_failures = 0;
                health_guard.total_pings_successful += 1;
                
                println!("Keepalive ping successful ({}ms) - Health: {:.1}%", 
                        ping_duration.as_millis(),
                        (health_guard.total_pings_successful as f64 / 
                         health_guard.total_pings_sent as f64) * 100.0);
            }
            Err(e) => {
                let mut health_guard = health.write().await;
                health_guard.consecutive_failures += 1;
                
                if health_guard.consecutive_failures >= config.max_consecutive_failures {
                    health_guard.is_healthy = false;
                    eprintln!("Connection marked as unhealthy after {} consecutive failures", 
                             health_guard.consecutive_failures);
                }
                
                eprintln!("Keepalive ping failed (attempt {}): {}", 
                         health_guard.consecutive_failures, e);
            }
        }
        */
        
        // Mock ping result for demonstration
        let ping_duration = ping_start.elapsed();
        let mut health_guard = health.write().await;
        health_guard.is_healthy = true;
        health_guard.last_successful_ping = chrono::Utc::now();
        health_guard.consecutive_failures = 0;
        health_guard.total_pings_successful += 1;
        
        println!("Mock keepalive ping successful ({}ms) - Health: {:.1}%", 
                ping_duration.as_millis(),
                (health_guard.total_pings_successful as f64 / 
                 health_guard.total_pings_sent as f64) * 100.0);
    }
    
    // Get connection health status
    pub async fn get_health(&self) -> ConnectionHealth {
        self.health.read().await.clone()
    }
    
    // Check if connection is healthy
    pub async fn is_healthy(&self) -> bool {
        self.health.read().await.is_healthy
    }
}

Connection Recovery

// Automatic connection recovery
pub struct ConnectionManager {
    config: NextBlockConfig,
    keepalive_manager: Option<KeepaliveManager>,
    // client: Option<YourGeneratedApiClient>,
}

impl ConnectionManager {
    pub fn new(config: NextBlockConfig) -> Self {
        Self {
            config,
            keepalive_manager: None,
            // client: None,
        }
    }
    
    // Establish connection with automatic recovery
    pub async fn connect_with_recovery(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        loop {
            match self.attempt_connection().await {
                Ok(()) => {
                    println!("Successfully connected to NextBlock");
                    
                    // Start keepalive
                    self.start_keepalive().await?;
                    
                    // Monitor connection health
                    self.monitor_connection_health().await?;
                    break;
                }
                Err(e) => {
                    eprintln!("Connection failed: {}. Retrying in 5 seconds...", e);
                    sleep(Duration::from_secs(5)).await;
                }
            }
        }
        
        Ok(())
    }
    
    // Attempt to establish connection
    async fn attempt_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // let client = create_nextblock_client(self.config.clone()).await?;
        // self.client = Some(client);
        println!("Mock connection established");
        Ok(())
    }
    
    // Start keepalive task
    async fn start_keepalive(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        let keepalive_config = KeepaliveConfig::default();
        
        // let keepalive_manager = KeepaliveManager::new(
        //     self.client.as_ref().unwrap().clone(),
        //     keepalive_config,
        // );
        
        // keepalive_manager.start().await?;
        // self.keepalive_manager = Some(keepalive_manager);
        
        println!("Keepalive task started");
        Ok(())
    }
    
    // Monitor connection health and trigger recovery
    async fn monitor_connection_health(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut health_check_interval = interval(Duration::from_secs(30));
        
        loop {
            health_check_interval.tick().await;
            
            if let Some(ref keepalive_manager) = self.keepalive_manager {
                if !keepalive_manager.is_healthy().await {
                    eprintln!("Connection unhealthy, attempting recovery...");
                    // Trigger reconnection logic
                    return Err("Connection lost".into());
                }
            }
        }
    }
}

Usage Example

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Basic keepalive usage
    // let config = NextBlockConfig::default();
    // let client = create_nextblock_client(config).await?;
    
    // Start simple keepalive task
    let keepalive_handle = tokio::spawn(async move {
        start_keepalive_task(
            // client,
            Duration::from_secs(60), // Ping every minute
        ).await
    });
    
    // Advanced usage with connection management
    let config = NextBlockConfig::default();
    let connection_manager = ConnectionManager::new(config);
    
    let manager_handle = tokio::spawn(async move {
        if let Err(e) = connection_manager.connect_with_recovery().await {
            eprintln!("Connection manager failed: {}", e);
        }
    });
    
    // Your main application logic here
    println!("Application running with keepalive...");
    
    // Wait for tasks
    tokio::select! {
        result = keepalive_handle => {
            if let Err(e) = result? {
                eprintln!("Keepalive task failed: {}", e);
            }
        }
        result = manager_handle => {
            if let Err(e) = result {
                eprintln!("Connection manager task failed: {}", e);
            }
        }
    }
    
    Ok(())
}

Best Practices

  1. Ping frequency: Use 60-second intervals for most applications

  2. Health monitoring: Track connection health and implement recovery

  3. Error handling: Handle ping failures gracefully with exponential backoff

  4. Resource cleanup: Properly close connections on shutdown

  5. Logging: Log keepalive status for debugging and monitoring

  6. Timeouts: Set appropriate timeouts for ping requests

  7. Reconnection logic: Implement automatic reconnection on connection loss

Last updated