Connection

Establish a secure gRPC connection to NextBlock's API using Python and grpcio.

Prerequisites

Install the required dependencies:

pip install grpcio grpcio-tools
pip install solders solana
pip install backoff  # asyncio ships with the Python standard library and needs no install

Generate the Python gRPC client from the proto specs:

# Clone the proto repository
git clone https://github.com/nextblock-ag/nextblock-proto
cd nextblock-proto

# Generate Python gRPC client
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto

Connection Setup

import asyncio
import grpc
from typing import Optional, Dict, Any
from dataclasses import dataclass
import ssl

# Authentication metadata interceptor
class AuthInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
):
    """Client interceptor that adds the API key to outgoing call metadata.

    grpc.aio channels only accept interceptors derived from one of the
    per-call-type interceptor classes; an object that merely subclasses the
    generic ``grpc.aio.ClientInterceptor`` is rejected with ``ValueError``
    when the channel is built. This class therefore derives from the
    unary-unary and unary-stream variants it implements.

    NOTE(review): some grpcio releases may register an interceptor under only
    the first call type it matches -- confirm that server-streaming calls
    actually carry the ``authorization`` entry on the installed version.

    Args:
        api_key: Value sent verbatim as the ``authorization`` metadata entry.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key

    def _with_auth(self, client_call_details):
        """Return a copy of *client_call_details* with the auth entry appended."""
        # Metadata may be None when the caller supplied none; copy it so the
        # caller's own metadata object is never mutated.
        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', self.api_key))
        return client_call_details._replace(metadata=metadata)

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Add the authorization entry to a unary-unary call, then continue it."""
        return await continuation(self._with_auth(client_call_details), request)

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        """Add the authorization entry to a unary-stream call, then continue it."""
        return await continuation(self._with_auth(client_call_details), request)

# Connection configuration
@dataclass
class NextBlockConfig:
    """Connection settings for a NextBlock gRPC endpoint."""

    # host:port target handed to the gRPC channel factory
    endpoint: str = "fra.nextblock.io:443"
    # value sent as the 'authorization' metadata entry
    api_key: str = ""
    # True -> TLS channel, False -> plaintext channel
    use_tls: bool = True
    # timeout value; unit is presumably seconds -- confirm against callers
    timeout: float = 30.0
    keepalive_time_ms: int = 60000  # 60 seconds
    keepalive_timeout_ms: int = 15000  # 15 seconds

    @classmethod
    def from_env(cls) -> 'NextBlockConfig':
        """Build a configuration from ``NEXTBLOCK_*`` environment variables.

        Every field can be overridden; unset variables fall back to the
        dataclass defaults, so the defaults are declared in one place only.
        ``NEXTBLOCK_USE_TLS`` is true only for the case-insensitive string
        ``"true"``.

        Raises:
            ValueError: If a numeric variable cannot be parsed.
        """
        import os

        return cls(
            endpoint=os.getenv('NEXTBLOCK_ENDPOINT', cls.endpoint),
            api_key=os.getenv('NEXTBLOCK_API_KEY', cls.api_key),
            use_tls=os.getenv('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
            timeout=float(os.getenv('NEXTBLOCK_TIMEOUT', str(cls.timeout))),
            # Keepalive settings are read as well, matching the variables
            # honored by load_config_from_env().
            keepalive_time_ms=int(
                os.getenv('NEXTBLOCK_KEEPALIVE_TIME_MS', str(cls.keepalive_time_ms))
            ),
            keepalive_timeout_ms=int(
                os.getenv('NEXTBLOCK_KEEPALIVE_TIMEOUT_MS', str(cls.keepalive_timeout_ms))
            ),
        )

# Create secure channel with authentication
async def create_nextblock_channel(
    config: NextBlockConfig,
    interceptors: Optional[list] = None,
) -> grpc.aio.Channel:
    """Create a gRPC asyncio channel for the configured endpoint.

    Args:
        config: Endpoint, TLS flag and keepalive settings.
        interceptors: Optional client interceptors to install. grpc.aio has
            no ``intercept_channel`` helper, so interceptors can only be
            attached here, when the channel is constructed.

    Returns:
        A TLS channel when ``config.use_tls`` is true, otherwise a plaintext
        channel.
    """
    # Configure channel options
    options = [
        ('grpc.keepalive_time_ms', config.keepalive_time_ms),
        ('grpc.keepalive_timeout_ms', config.keepalive_timeout_ms),
        # Send keepalive pings even while no RPC is in flight. The gRPC core
        # argument is "..._without_calls"; an unrecognized key has no effect.
        ('grpc.keepalive_permit_without_calls', 1),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),  # 5 minutes
    ]

    if config.use_tls:
        # Create secure channel with TLS (default root certificates)
        return grpc.aio.secure_channel(
            config.endpoint,
            grpc.ssl_channel_credentials(),
            options=options,
            interceptors=interceptors,
        )

    # Create insecure channel
    return grpc.aio.insecure_channel(
        config.endpoint,
        options=options,
        interceptors=interceptors,
    )

# Create authenticated client
async def create_nextblock_client(config: NextBlockConfig):
    """Create a channel whose calls carry the API key.

    The interceptor must derive from one of grpc.aio's per-call-type
    interceptor classes, otherwise channel construction raises ``ValueError``.

    Returns:
        A ``(channel, client)`` tuple. ``client`` is ``None`` until the
        generated stub is wired in below.
    """
    # Authentication is attached at construction time via ``interceptors``.
    auth_interceptor = AuthInterceptor(config.api_key)
    channel = await create_nextblock_channel(config, interceptors=[auth_interceptor])

    # Create the client stub
    # Import your generated gRPC client here
    # from your_generated_proto import api_pb2_grpc
    # client = api_pb2_grpc.ApiStub(channel)

    return channel, None  # Return channel and client

# Connection manager with health checking
class NextBlockConnectionManager:
    """Own one NextBlock channel and track whether it is usable.

    Can be used as an async context manager: entering calls ``connect()``
    and exiting calls ``disconnect()``. Check ``is_connected`` after
    entering, because a failed connect does not raise.
    """

    def __init__(self, config: NextBlockConfig):
        self.config = config
        self.channel: Optional[grpc.aio.Channel] = None
        self.client = None
        self.is_connected = False

    async def _release_channel(self) -> bool:
        """Close and forget the current channel; return True if one was open."""
        channel, self.channel, self.client = self.channel, None, None
        self.is_connected = False
        if channel is None:
            return False
        await channel.close()
        return True

    async def connect(self) -> bool:
        """Establish connection to NextBlock.

        Returns:
            True when the channel was created and passed the health check,
            False otherwise. Failures are reported via print, not raised.
        """
        try:
            # A repeated connect() must not orphan the previous channel.
            await self._release_channel()
            self.channel, self.client = await create_nextblock_client(self.config)

            # Test the connection; a failed check counts as a failed connect.
            if not await self.health_check():
                raise ConnectionError("health check failed")
            self.is_connected = True
            print(f"Successfully connected to NextBlock at {self.config.endpoint}")
            return True

        except Exception as e:
            print(f"Failed to connect to NextBlock: {e}")
            # Do not keep a half-open channel around after a failed attempt.
            try:
                await self._release_channel()
            except Exception as close_error:
                print(f"Error while closing failed channel: {close_error}")
            self.is_connected = False
            return False

    async def health_check(self) -> bool:
        """Check if the connection is healthy.

        Waits for the channel to report READY, bounded by ``config.timeout``.

        Returns:
            True when the channel became ready in time, False when there is
            no channel, the wait timed out, or an error occurred.
        """
        if not self.channel:
            return False

        try:
            await asyncio.wait_for(
                self.channel.channel_ready(),
                timeout=self.config.timeout,
            )
            # Test connection with a simple call
            # await self.client.ping(Empty())  # Uncomment with real client
            print("Connection health check passed")
            return True
        except asyncio.TimeoutError:
            # TimeoutError carries no message, so report the limit explicitly.
            print(f"Health check failed: channel not ready after {self.config.timeout}s")
            return False
        except Exception as e:
            print(f"Health check failed: {e}")
            return False

    async def disconnect(self):
        """Close the connection; a no-op when nothing is open."""
        if await self._release_channel():
            print("Disconnected from NextBlock")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

Usage Example

async def main():
    """Demonstrate the two ways of opening a NextBlock connection."""
    # Configure connection
    settings = NextBlockConfig(
        endpoint="fra.nextblock.io:443",
        api_key="<your-api-key-here>",
        use_tls=True,
        timeout=30.0,
    )

    # Method 1: Direct connection -- the caller owns the channel and must
    # close it, hence the try/finally.
    channel, stub = await create_nextblock_client(settings)
    try:
        print("Connected to NextBlock!")

        # Your API calls would go here, e.g.
        # response = await stub.ping(Empty())
    finally:
        await channel.close()

    # Method 2: Using connection manager (recommended) -- the context
    # manager connects on entry and disconnects on exit.
    async with NextBlockConnectionManager(settings) as manager:
        if not manager.is_connected:
            print("Failed to establish connection")
        else:
            print("Connection manager established connection")

            # Use manager.client for API calls
            # response = await manager.client.ping(Empty())


# Run the example
if __name__ == "__main__":
    asyncio.run(main())

Advanced Connection Features

import logging
from typing import Callable, Any
import backoff

# Enhanced connection manager with retry logic
class EnhancedConnectionManager(NextBlockConnectionManager):
    """Connection manager that retries with exponential backoff.

    Args:
        config: Connection settings passed to the base manager.
        max_retries: Total number of attempts (the first try included) made
            by ``connect_with_retry`` and ``call_with_retry``.
    """

    def __init__(self, config: NextBlockConfig, max_retries: int = 3):
        super().__init__(config)
        self.max_retries = max_retries
        self.retry_count = 0

    async def connect_with_retry(self) -> bool:
        """Connect with exponential backoff retry.

        Returns:
            True once a connection attempt succeeds.

        Raises:
            ConnectionError: When every attempt failed.
        """
        # Start each retry cycle from zero so attempt numbers in logs and
        # error messages are not carried over from an earlier failed cycle.
        self.retry_count = 0

        # The decorator is applied here rather than on the method so that
        # the per-instance ``max_retries`` is honored.
        @backoff.on_exception(
            backoff.expo,
            (grpc.RpcError, ConnectionError),
            max_tries=self.max_retries,
            base=2,
            max_value=60,
        )
        async def _attempt() -> bool:
            self.retry_count += 1
            logging.info("Connection attempt %s", self.retry_count)

            # connect() reports failure by returning False; turn that into
            # an exception so backoff schedules another attempt.
            if not await self.connect():
                raise ConnectionError(f"Failed to connect on attempt {self.retry_count}")
            return True

        await _attempt()
        self.retry_count = 0  # Reset on success
        return True

    async def call_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute gRPC call with automatic retry.

        Args:
            func: Awaitable callable performing the RPC.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            grpc.RpcError: When the attempts are exhausted, or immediately
                for UNAUTHENTICATED, which retrying cannot fix.
        """
        @backoff.on_exception(
            backoff.expo,
            grpc.RpcError,
            max_tries=self.max_retries,
            giveup=lambda e: e.code() == grpc.StatusCode.UNAUTHENTICATED,
        )
        async def _call():
            return await func(*args, **kwargs)

        return await _call()

# Connection pool for high-throughput applications
class ConnectionPool:
    """Fixed-size pool of connection managers handed out round-robin.

    Args:
        config: Connection settings shared by every pooled manager.
        pool_size: Number of connections ``initialize`` tries to open.
    """

    def __init__(self, config: NextBlockConfig, pool_size: int = 5):
        self.config = config
        self.pool_size = pool_size
        # Builtin generic instead of ``List``, which is never imported here.
        self.connections: list[NextBlockConnectionManager] = []
        # Index of the manager the next get_connection() call returns.
        self.current_index = 0

    async def initialize(self):
        """Initialize connection pool.

        Managers that fail to connect are skipped with a warning, so the
        pool may end up smaller than ``pool_size``.
        """
        for i in range(self.pool_size):
            manager = NextBlockConnectionManager(self.config)
            if await manager.connect():
                self.connections.append(manager)
            else:
                logging.warning("Failed to create connection %s/%s", i + 1, self.pool_size)

        logging.info("Connection pool initialized with %s connections", len(self.connections))

    def get_connection(self) -> NextBlockConnectionManager:
        """Get next available connection (round-robin).

        Raises:
            RuntimeError: When the pool holds no connections.
        """
        if not self.connections:
            raise RuntimeError("No available connections in pool")

        connection = self.connections[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.connections)
        return connection

    async def close_all(self):
        """Close all connections in the pool.

        Every manager is disconnected even if an earlier one fails; the
        first failure is re-raised after the pool has been emptied.
        """
        first_error = None
        for connection in self.connections:
            try:
                await connection.disconnect()
            except Exception as exc:
                logging.warning("Failed to close pooled connection: %s", exc)
                if first_error is None:
                    first_error = exc

        self.connections.clear()
        # Reset the cursor so a re-initialized pool starts from index 0.
        self.current_index = 0

        if first_error is not None:
            raise first_error

Environment Configuration

import os
from typing import Dict, Any

# Load configuration from environment variables
def load_config_from_env() -> NextBlockConfig:
    """Build a NextBlockConfig from ``NEXTBLOCK_*`` environment variables.

    Unset variables fall back to the literal defaults below. TLS is enabled
    only when ``NEXTBLOCK_USE_TLS`` equals "true" (case-insensitive).
    Numeric variables that cannot be parsed raise ``ValueError``.
    """
    read = os.getenv

    # Values are resolved in field order so a malformed number surfaces for
    # the first offending variable.
    settings = {
        'endpoint': read('NEXTBLOCK_ENDPOINT', 'fra.nextblock.io:443'),
        'api_key': read('NEXTBLOCK_API_KEY', ''),
        'use_tls': read('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
        'timeout': float(read('NEXTBLOCK_TIMEOUT', '30.0')),
        'keepalive_time_ms': int(read('NEXTBLOCK_KEEPALIVE_TIME_MS', '60000')),
        'keepalive_timeout_ms': int(read('NEXTBLOCK_KEEPALIVE_TIMEOUT_MS', '15000')),
    }
    return NextBlockConfig(**settings)

# Configuration validation
def validate_config(config: NextBlockConfig) -> bool:
    """Return True when *config* is usable, otherwise raise ValueError.

    Checks run in order -- API key, endpoint, timeout -- and the first
    failing check determines the error message.
    """
    # Each entry pairs a lazily evaluated "is this invalid?" test with the
    # message to raise; laziness keeps later fields untouched once an
    # earlier check has failed.
    failure_checks = (
        (lambda: not config.api_key, "API key is required"),
        (lambda: not config.endpoint, "Endpoint is required"),
        (lambda: config.timeout <= 0, "Timeout must be positive"),
    )

    for is_invalid, message in failure_checks:
        if is_invalid():
            raise ValueError(message)

    return True

# Example with configuration validation
async def main_with_validation():
    """Load and validate the configuration, then connect with retries.

    Configuration problems (missing values, unparsable numbers) are reported
    as "Configuration error"; anything else as "Connection error".
    """
    try:
        config = load_config_from_env()
        validate_config(config)

        # The manager is not entered with ``async with`` here: __aenter__
        # already calls connect() once without retry, so a following
        # connect_with_retry() would open a second channel.
        manager = EnhancedConnectionManager(config)
        try:
            await manager.connect_with_retry()
            print("Successfully connected with retry logic!")
        finally:
            # Always release the channel, including when the retries fail.
            await manager.disconnect()

    except ValueError as e:
        print(f"Configuration error: {e}")
    except Exception as e:
        print(f"Connection error: {e}")

if __name__ == "__main__":
    asyncio.run(main_with_validation())

Available Endpoints

  • Frankfurt: frankfurt.nextblock.io:80 (Europe)

  • Amsterdam: amsterdam.nextblock.io:80 (Europe)

  • London: london.nextblock.io:80 (Europe)

  • Singapore: singapore.nextblock.io:80 (Asia)

  • Tokyo: tokyo.nextblock.io:80 (Asia)

  • New York: ny.nextblock.io:80 (US East)

  • Salt Lake City: slc.nextblock.io:80 (US West)

Best Practices

  1. Use async/await: Leverage Python's asyncio for non-blocking operations

  2. Use insecure connections: Disable TLS for better performance (NextBlock handles security at network level)

  3. Implement keepalive: Configure appropriate keepalive settings

  4. Handle errors gracefully: Use retry logic with exponential backoff

  5. Validate configuration: Check all required settings before connecting

  6. Use connection pooling: For high-throughput applications

  7. Monitor connection health: Implement regular health checks

  8. Close connections properly: Always close connections when done

Last updated