Keepalive
Maintain persistent gRPC connections to NextBlock for optimal performance using Rust's Tonic client.
Basic Keepalive Implementation
use std::time::Duration;
use tokio::time::{interval, sleep};
use tonic::Request;
/// Keeps the gRPC channel warm by pinging NextBlock on a fixed cadence.
///
/// `ping_interval` is the delay between pings. With the real client enabled,
/// the loop exits (and the function returns `Ok`) only after a ping error;
/// the mock path below loops forever, so `Ok(())` is unreachable in demo form.
async fn start_keepalive_task(
    // mut nextblock_client: YourGeneratedApiClient,
    ping_interval: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    println!("Starting keepalive task with interval: {:?}", ping_interval);
    let mut ticker = interval(ping_interval);
    loop {
        ticker.tick().await;
        /* Uncomment when you have the generated API client
        match nextblock_client.ping(Request::new(())).await {
            Ok(_) => {
                println!("Keepalive ping successful at {}",
                    chrono::Utc::now().format("%H:%M:%S"));
            }
            Err(e) => {
                eprintln!("Keepalive ping failed: {}", e);
                // Optionally implement reconnection logic here
                break;
            }
        }
        */
        // Demo stand-in for the real ping above.
        println!("Mock keepalive ping sent at {}",
            chrono::Utc::now().format("%H:%M:%S"));
    }
    Ok(())
}
Advanced Keepalive with Connection Management
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::transport::Channel;
/// Snapshot of keepalive health for a single gRPC connection, updated by the
/// background ping task and read by callers via `KeepaliveManager`.
#[derive(Debug, Clone)]
pub struct ConnectionHealth {
/// Whether the connection is currently considered healthy. Cleared once
/// `consecutive_failures` reaches the configured maximum.
pub is_healthy: bool,
/// UTC timestamp of the most recent successful ping.
pub last_successful_ping: chrono::DateTime<chrono::Utc>,
/// Number of ping failures in a row since the last success.
pub consecutive_failures: u32,
/// Total pings attempted over the connection's lifetime.
pub total_pings_sent: u64,
/// Total pings that succeeded over the connection's lifetime.
pub total_pings_successful: u64,
}
impl Default for ConnectionHealth {
fn default() -> Self {
Self {
is_healthy: true,
last_successful_ping: chrono::Utc::now(),
consecutive_failures: 0,
total_pings_sent: 0,
total_pings_successful: 0,
}
}
}
/// Advanced keepalive manager: owns the ping configuration and the shared
/// health state mutated by the background ping task.
pub struct KeepaliveManager {
// client: Arc<RwLock<YourGeneratedApiClient>>,
// Shared with the spawned ping task; guarded by Tokio's async RwLock.
health: Arc<RwLock<ConnectionHealth>>,
// Ping cadence and failure thresholds (see KeepaliveConfig).
config: KeepaliveConfig,
}
/// Tunable parameters for the keepalive loop.
#[derive(Debug, Clone)]
pub struct KeepaliveConfig {
    /// Delay between pings.
    pub ping_interval: Duration,
    /// Failures in a row before the connection is marked unhealthy.
    pub max_consecutive_failures: u32,
    /// Delay before a reconnect attempt.
    pub reconnect_delay: Duration,
    /// Whether health monitoring is enabled.
    pub health_check_enabled: bool,
}

impl Default for KeepaliveConfig {
    /// Defaults: ping once a minute, declare the connection unhealthy after
    /// three straight failures, wait five seconds before reconnecting.
    fn default() -> Self {
        KeepaliveConfig {
            ping_interval: Duration::from_secs(60),
            max_consecutive_failures: 3,
            reconnect_delay: Duration::from_secs(5),
            health_check_enabled: true,
        }
    }
}
impl KeepaliveManager {
/// Builds a manager with default (healthy, zero-counter) connection state.
pub fn new(
// client: YourGeneratedApiClient,
config: KeepaliveConfig,
) -> Self {
Self {
// client: Arc::new(RwLock::new(client)),
health: Arc::new(RwLock::new(ConnectionHealth::default())),
config,
}
}
// Start keepalive task.
/// Spawns the background ping loop on the Tokio runtime and returns
/// immediately. NOTE(review): the JoinHandle from tokio::spawn is dropped,
/// so the task is detached — it cannot be awaited, cancelled, or observed
/// for panics through this API.
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let health = Arc::clone(&self.health);
// let client = Arc::clone(&self.client);
let config = self.config.clone();
tokio::spawn(async move {
let mut interval_timer = interval(config.ping_interval);
loop {
interval_timer.tick().await;
// Send ping and update health
Self::send_ping_and_update_health(
// Arc::clone(&client),
Arc::clone(&health),
&config,
).await;
}
});
Ok(())
}
// Send one ping and fold the outcome into the shared health state.
// The "sent" counter is incremented under its own short write-lock before
// the ping, so sent/successful can briefly differ by one mid-ping.
async fn send_ping_and_update_health(
// client: Arc<RwLock<YourGeneratedApiClient>>,
health: Arc<RwLock<ConnectionHealth>>,
config: &KeepaliveConfig,
) {
let ping_start = std::time::Instant::now();
// Update ping attempt counter (scoped so the lock is released before pinging).
{
let mut health_guard = health.write().await;
health_guard.total_pings_sent += 1;
}
// Send ping
/* Uncomment when you have the generated API client
let ping_result = {
let mut client_guard = client.write().await;
client_guard.ping(Request::new(())).await
};
match ping_result {
Ok(_) => {
let ping_duration = ping_start.elapsed();
let mut health_guard = health.write().await;
health_guard.is_healthy = true;
health_guard.last_successful_ping = chrono::Utc::now();
health_guard.consecutive_failures = 0;
health_guard.total_pings_successful += 1;
println!("Keepalive ping successful ({}ms) - Health: {:.1}%",
ping_duration.as_millis(),
(health_guard.total_pings_successful as f64 /
health_guard.total_pings_sent as f64) * 100.0);
}
Err(e) => {
let mut health_guard = health.write().await;
health_guard.consecutive_failures += 1;
if health_guard.consecutive_failures >= config.max_consecutive_failures {
health_guard.is_healthy = false;
eprintln!("Connection marked as unhealthy after {} consecutive failures",
health_guard.consecutive_failures);
}
eprintln!("Keepalive ping failed (attempt {}): {}",
health_guard.consecutive_failures, e);
}
}
*/
// Mock ping result for demonstration: always records a success, so
// `config.max_consecutive_failures` is never exercised on this path.
let ping_duration = ping_start.elapsed();
let mut health_guard = health.write().await;
health_guard.is_healthy = true;
health_guard.last_successful_ping = chrono::Utc::now();
health_guard.consecutive_failures = 0;
health_guard.total_pings_successful += 1;
println!("Mock keepalive ping successful ({}ms) - Health: {:.1}%",
ping_duration.as_millis(),
(health_guard.total_pings_successful as f64 /
health_guard.total_pings_sent as f64) * 100.0);
}
// Get a point-in-time copy of the connection health.
pub async fn get_health(&self) -> ConnectionHealth {
self.health.read().await.clone()
}
// Check if the connection is currently marked healthy.
pub async fn is_healthy(&self) -> bool {
self.health.read().await.is_healthy
}
}
Connection Recovery
/// Automatic connection recovery: establishes the NextBlock connection,
/// starts keepalive, and watches connection health.
pub struct ConnectionManager {
// Endpoint/auth settings used for (re)connection attempts.
config: NextBlockConfig,
// NOTE(review): never assigned in the visible (mock) code — the assignment
// is commented out in start_keepalive — so health monitoring is a no-op
// until the real client path is enabled.
keepalive_manager: Option<KeepaliveManager>,
// client: Option<YourGeneratedApiClient>,
}
impl ConnectionManager {
    /// Creates a manager for the given endpoint configuration.
    /// No connection is attempted until `connect_with_recovery` is called.
    pub fn new(config: NextBlockConfig) -> Self {
        Self {
            config,
            keepalive_manager: None,
            // client: None,
        }
    }

    /// Establishes the connection, starts keepalive, then blocks monitoring
    /// connection health. When the monitor reports a lost connection, the
    /// whole connect/keepalive/monitor cycle is retried after a short delay.
    ///
    /// Fix vs. the original: the monitor error was propagated with `?`,
    /// which exited the function on the first connection loss — the
    /// "recovery" promised by the name (and by the "attempting recovery..."
    /// log line) never happened. Lost connections now loop back into a new
    /// connection attempt; only keepalive startup errors still propagate.
    pub async fn connect_with_recovery(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        loop {
            match self.attempt_connection().await {
                Ok(()) => {
                    println!("Successfully connected to NextBlock");
                    // Start keepalive
                    self.start_keepalive().await?;
                    // Monitor connection health; Err means the connection
                    // was lost, so reconnect instead of bailing out.
                    match self.monitor_connection_health().await {
                        Ok(()) => break,
                        Err(e) => {
                            eprintln!("Connection lost: {}. Reconnecting in 5 seconds...", e);
                            sleep(Duration::from_secs(5)).await;
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Connection failed: {}. Retrying in 5 seconds...", e);
                    sleep(Duration::from_secs(5)).await;
                }
            }
        }
        Ok(())
    }

    /// Attempts a single connection to NextBlock (mocked here).
    async fn attempt_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // let client = create_nextblock_client(self.config.clone()).await?;
        // self.client = Some(client);
        println!("Mock connection established");
        Ok(())
    }

    /// Starts the keepalive task for the current connection (mocked here —
    /// the real path also stores the manager in `self.keepalive_manager`).
    async fn start_keepalive(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        let keepalive_config = KeepaliveConfig::default();
        // let keepalive_manager = KeepaliveManager::new(
        //     self.client.as_ref().unwrap().clone(),
        //     keepalive_config,
        // );
        // keepalive_manager.start().await?;
        // self.keepalive_manager = Some(keepalive_manager);
        println!("Keepalive task started");
        Ok(())
    }

    /// Polls connection health every 30 seconds; returns Err once the
    /// keepalive manager reports the connection unhealthy.
    /// NOTE(review): while `keepalive_manager` is None (the mock path),
    /// this loops forever without ever returning — confirm before relying
    /// on it outside the demo.
    async fn monitor_connection_health(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut health_check_interval = interval(Duration::from_secs(30));
        loop {
            health_check_interval.tick().await;
            if let Some(ref keepalive_manager) = self.keepalive_manager {
                if !keepalive_manager.is_healthy().await {
                    eprintln!("Connection unhealthy, attempting recovery...");
                    // Signal the caller to re-run the connection cycle.
                    return Err("Connection lost".into());
                }
            }
        }
    }
}
Usage Example
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Basic keepalive usage
    // let config = NextBlockConfig::default();
    // let client = create_nextblock_client(config).await?;

    // Start simple keepalive task
    let keepalive_handle = tokio::spawn(async move {
        start_keepalive_task(
            // client,
            Duration::from_secs(60), // Ping every minute
        ).await
    });

    // Advanced usage with connection management.
    // Fix vs. the original: the binding must be `mut` — `connect_with_recovery`
    // takes `&mut self`, and a non-mut binding is a compile error even though
    // the value is moved into the task.
    let config = NextBlockConfig::default();
    let mut connection_manager = ConnectionManager::new(config);
    let manager_handle = tokio::spawn(async move {
        if let Err(e) = connection_manager.connect_with_recovery().await {
            eprintln!("Connection manager failed: {}", e);
        }
    });

    // Your main application logic here
    println!("Application running with keepalive...");

    // Wait for whichever task finishes first. `result?` surfaces a task
    // panic/cancellation (JoinError) as a main() error; the inner Result is
    // the task's own outcome.
    tokio::select! {
        result = keepalive_handle => {
            if let Err(e) = result? {
                eprintln!("Keepalive task failed: {}", e);
            }
        }
        result = manager_handle => {
            if let Err(e) = result {
                eprintln!("Connection manager task failed: {}", e);
            }
        }
    }
    Ok(())
}
Best Practices
Ping frequency: Use 60-second intervals for most applications
Health monitoring: Track connection health and implement recovery
Error handling: Handle ping failures gracefully with exponential backoff
Resource cleanup: Properly close connections on shutdown
Logging: Log keepalive status for debugging and monitoring
Timeouts: Set appropriate timeouts for ping requests
Reconnection logic: Implement automatic reconnection on connection loss
Last updated