Connection
Establish a secure gRPC connection to NextBlock's API using Python and grpcio.
Prerequisites
Install the required dependencies:
pip install grpcio grpcio-tools
pip install solders solana
# asyncio is part of the Python standard library and needs no installation
pip install backoff
Generate the Python gRPC client from the proto specs:
# Clone the proto repository
git clone https://github.com/nextblock-ag/nextblock-proto
cd nextblock-proto
# Generate Python gRPC client
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto
Connection Setup
import asyncio
import grpc
from typing import Optional, Dict, Any
from dataclasses import dataclass
import ssl
# Authentication metadata interceptor
class AuthInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    grpc.aio.UnaryStreamClientInterceptor,
):
    """Client interceptor that adds the API key to outgoing calls.

    The key is appended as an ``authorization`` metadata entry on
    unary-unary and unary-stream calls.  The class derives from the
    concrete ``grpc.aio`` interceptor interfaces because an aio channel
    only accepts interceptors implementing one of them; deriving from the
    bare ``grpc.aio.ClientInterceptor`` base is not enough.

    NOTE(review): some grpcio releases appear to register an interceptor
    under only the first interface it matches -- confirm that streaming
    calls carry the header on the grpcio version in use.
    """

    def __init__(self, api_key: str):
        """Store the API key sent with every intercepted call."""
        self.api_key = api_key

    def _with_auth(self, client_call_details):
        """Return a copy of *client_call_details* with the auth header added."""
        # Copy into a new list so the caller's metadata is never mutated.
        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', self.api_key))
        return client_call_details._replace(metadata=metadata)

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Add the authorization header to a unary-unary call."""
        return await continuation(self._with_auth(client_call_details), request)

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        """Add the authorization header to a unary-stream call."""
        return await continuation(self._with_auth(client_call_details), request)
# Connection configuration
@dataclass
class NextBlockConfig:
    """Connection settings for the NextBlock gRPC API."""

    endpoint: str = "fra.nextblock.io:443"  # host:port of the gRPC endpoint
    api_key: str = ""
    use_tls: bool = True
    timeout: float = 30.0
    keepalive_time_ms: int = 60000  # 60 seconds
    keepalive_timeout_ms: int = 15000  # 15 seconds

    @classmethod
    def from_env(cls) -> 'NextBlockConfig':
        """Build a configuration from ``NEXTBLOCK_*`` environment variables.

        Variables that are not set fall back to the dataclass defaults.
        ``NEXTBLOCK_USE_TLS`` enables TLS only when it equals ``true``
        (case-insensitive).

        Raises:
            ValueError: if a numeric variable cannot be parsed.
        """
        import os

        env = os.environ.get
        return cls(
            endpoint=env('NEXTBLOCK_ENDPOINT', cls.endpoint),
            api_key=env('NEXTBLOCK_API_KEY', cls.api_key),
            use_tls=env('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
            timeout=float(env('NEXTBLOCK_TIMEOUT', str(cls.timeout))),
            # Keepalive settings are configurable from the environment too,
            # so every field of the config can be overridden.
            keepalive_time_ms=int(
                env('NEXTBLOCK_KEEPALIVE_TIME_MS', str(cls.keepalive_time_ms))
            ),
            keepalive_timeout_ms=int(
                env('NEXTBLOCK_KEEPALIVE_TIMEOUT_MS', str(cls.keepalive_timeout_ms))
            ),
        )
# Create secure channel with authentication
async def create_nextblock_channel(
    config: NextBlockConfig,
    interceptors=None,
) -> grpc.aio.Channel:
    """Create a ``grpc.aio`` channel to the configured endpoint.

    Args:
        config: Endpoint, TLS flag and keepalive settings to use.
        interceptors: Optional sequence of ``grpc.aio`` client interceptors.
            They are passed to the channel constructor, which is where
            ``grpc.aio`` takes them.

    Returns:
        A TLS channel when ``config.use_tls`` is true, otherwise an
        insecure (plaintext) channel.
    """
    # Configure channel options
    options = [
        ('grpc.keepalive_time_ms', config.keepalive_time_ms),
        ('grpc.keepalive_timeout_ms', config.keepalive_timeout_ms),
        ('grpc.keepalive_permit_without_stream', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),  # 5 minutes
    ]
    if config.use_tls:
        # Create secure channel with TLS
        return grpc.aio.secure_channel(
            config.endpoint,
            grpc.ssl_channel_credentials(),
            options=options,
            interceptors=interceptors,
        )
    # Create insecure channel
    return grpc.aio.insecure_channel(
        config.endpoint,
        options=options,
        interceptors=interceptors,
    )


# Create authenticated client
async def create_nextblock_client(config: NextBlockConfig):
    """Create an authenticated channel and, once wired up, the client stub.

    The authentication interceptor is installed when the channel is built:
    ``grpc.aio`` has no ``intercept_channel`` helper for wrapping an
    existing channel.

    Returns:
        A ``(channel, client)`` tuple.  ``client`` is ``None`` until the
        generated stub is imported and instantiated below.
    """
    channel = await create_nextblock_channel(
        config,
        interceptors=[AuthInterceptor(config.api_key)],
    )
    # Create the client stub
    # Import your generated gRPC client here
    # from your_generated_proto import api_pb2_grpc
    # client = api_pb2_grpc.ApiStub(channel)
    return channel, None  # Return channel and client
# Connection manager with health checking
class NextBlockConnectionManager:
    """Own one NextBlock channel and track whether it is connected.

    Usable as an async context manager: entering calls ``connect()`` and
    exiting calls ``disconnect()``.
    """

    def __init__(self, config: NextBlockConfig):
        self.config = config
        self.channel: Optional[grpc.aio.Channel] = None
        self.client = None
        self.is_connected = False

    async def _close_channel(self) -> bool:
        """Close and forget the current channel.

        Returns:
            True if a channel was open, False if there was nothing to close.
        """
        # Clear the references first so a failing close() cannot leave a
        # half-closed channel behind to be closed a second time.
        channel, self.channel, self.client = self.channel, None, None
        self.is_connected = False
        if channel is None:
            return False
        await channel.close()
        return True

    async def connect(self) -> bool:
        """Establish connection to NextBlock.

        Any channel left over from an earlier call is closed first, so
        calling ``connect()`` repeatedly does not leak channels.

        Returns:
            True when the channel was created and the health check passed,
            False otherwise (the error is printed, not raised).
        """
        try:
            await self._close_channel()
            self.channel, self.client = await create_nextblock_client(self.config)
            # health_check() reports failure by returning False, so its
            # result has to be checked explicitly.
            if not await self.health_check():
                raise ConnectionError("health check failed")
        except Exception as e:
            print(f"Failed to connect to NextBlock: {e}")
            # A channel created before the failure stays referenced and is
            # closed by the next connect() or disconnect().
            self.is_connected = False
            return False
        self.is_connected = True
        print(f"Successfully connected to NextBlock at {self.config.endpoint}")
        return True

    async def health_check(self) -> bool:
        """Check if the connection is healthy.

        Returns:
            False when no channel exists or the probe raises, True otherwise.
            Until a real client call is uncommented below, the probe only
            confirms that a channel object exists.
        """
        if self.channel is None:
            return False
        try:
            # Test connection with a simple call
            # await self.client.ping(Empty())  # Uncomment with real client
            print("Connection health check passed")
            return True
        except Exception as e:
            print(f"Health check failed: {e}")
            return False

    async def disconnect(self):
        """Close the connection; does nothing when no channel is open."""
        if await self._close_channel():
            print("Disconnected from NextBlock")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()
Usage Example
async def main():
    """Demonstrate the two ways of connecting to NextBlock."""
    settings = NextBlockConfig(
        endpoint="fra.nextblock.io:443",
        api_key="<your-api-key-here>",
        use_tls=True,
        timeout=30.0,
    )

    # Method 1: Direct connection -- the caller owns and closes the channel
    direct_channel, client = await create_nextblock_client(settings)
    try:
        print("Connected to NextBlock!")
        # Your API calls would go here, using the client
        # response = await client.ping(Empty())
    finally:
        await direct_channel.close()

    # Method 2: Using connection manager (recommended)
    async with NextBlockConnectionManager(settings) as manager:
        if not manager.is_connected:
            print("Failed to establish connection")
        else:
            print("Connection manager established connection")
            # Use manager.client for API calls
            # response = await manager.client.ping(Empty())


# Run the example
if __name__ == "__main__":
    asyncio.run(main())
Advanced Connection Features
import logging
from typing import Callable, Any
import backoff
# Enhanced connection manager with retry logic
class EnhancedConnectionManager(NextBlockConnectionManager):
    """Connection manager that retries with exponential backoff.

    Args:
        config: Connection settings passed to the base manager.
        max_retries: Maximum number of attempts (``backoff``'s ``max_tries``)
            used by both ``connect_with_retry`` and ``call_with_retry``.
    """

    def __init__(self, config: NextBlockConfig, max_retries: int = 3):
        super().__init__(config)
        self.max_retries = max_retries
        self.retry_count = 0

    async def connect_with_retry(self) -> bool:
        """Connect with exponential backoff retry.

        Returns:
            True once a connection attempt succeeds.

        Raises:
            ConnectionError: when every attempt fails.
        """
        # Start counting from zero so an earlier failed run does not skew
        # the attempt numbers in the log and error messages.
        self.retry_count = 0

        # The decorator is applied here rather than on the method so that
        # it can read the per-instance max_retries setting.
        @backoff.on_exception(
            backoff.expo,
            (grpc.RpcError, ConnectionError),
            max_tries=self.max_retries,
            base=2,
            max_value=60,
        )
        async def _attempt() -> bool:
            self.retry_count += 1
            logging.info(f"Connection attempt {self.retry_count}")
            if not await self.connect():
                raise ConnectionError(f"Failed to connect on attempt {self.retry_count}")
            return True

        await _attempt()
        self.retry_count = 0  # Reset on success
        return True

    async def call_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute gRPC call with automatic retry.

        ``func(*args, **kwargs)`` is awaited and retried on ``grpc.RpcError``
        up to ``max_retries`` attempts.  An UNAUTHENTICATED status is not
        retried, since repeating the call with the same key cannot succeed.
        """

        def _is_fatal(error) -> bool:
            # Not every RpcError subclass is guaranteed to expose code().
            code = getattr(error, 'code', None)
            return callable(code) and code() == grpc.StatusCode.UNAUTHENTICATED

        @backoff.on_exception(
            backoff.expo,
            grpc.RpcError,
            max_tries=self.max_retries,
            giveup=_is_fatal,
        )
        async def _call():
            return await func(*args, **kwargs)

        return await _call()
# Connection pool for high-throughput applications
class ConnectionPool:
    """Fixed-size pool of connection managers handed out round-robin.

    Args:
        config: Connection settings shared by every pooled connection.
        pool_size: Number of connections ``initialize()`` tries to open.
    """

    def __init__(self, config: NextBlockConfig, pool_size: int = 5):
        self.config = config
        self.pool_size = pool_size
        self.connections: list[NextBlockConnectionManager] = []
        self.current_index = 0

    async def initialize(self):
        """Initialize connection pool.

        Connections that fail to open are logged and skipped, so the pool
        may end up holding fewer than ``pool_size`` connections.
        """
        for i in range(self.pool_size):
            manager = NextBlockConnectionManager(self.config)
            if await manager.connect():
                self.connections.append(manager)
            else:
                logging.warning(f"Failed to create connection {i+1}/{self.pool_size}")
        logging.info(f"Connection pool initialized with {len(self.connections)} connections")

    def get_connection(self) -> NextBlockConnectionManager:
        """Get next available connection (round-robin).

        Raises:
            RuntimeError: if the pool holds no connections.
        """
        if not self.connections:
            raise RuntimeError("No available connections in pool")
        # Wrap before indexing: the stored index may be stale if the pool
        # was rebuilt with fewer connections than before.
        index = self.current_index % len(self.connections)
        self.current_index = (index + 1) % len(self.connections)
        return self.connections[index]

    async def close_all(self):
        """Close all connections in the pool and reset the rotation."""
        for connection in self.connections:
            await connection.disconnect()
        self.connections.clear()
        self.current_index = 0
Environment Configuration
import os
from typing import Dict, Any
# Load configuration from environment variables
def load_config_from_env() -> NextBlockConfig:
    """Build a NextBlockConfig from ``NEXTBLOCK_*`` environment variables.

    Unset variables fall back to the defaults given below.  TLS is enabled
    only when ``NEXTBLOCK_USE_TLS`` equals ``true`` (case-insensitive).
    Numeric values that cannot be parsed raise ``ValueError``.
    """
    env = os.getenv
    tls_flag = env('NEXTBLOCK_USE_TLS', 'true')
    settings = {
        'endpoint': env('NEXTBLOCK_ENDPOINT', 'fra.nextblock.io:443'),
        'api_key': env('NEXTBLOCK_API_KEY', ''),
        'use_tls': tls_flag.lower() == 'true',
        'timeout': float(env('NEXTBLOCK_TIMEOUT', '30.0')),
        'keepalive_time_ms': int(env('NEXTBLOCK_KEEPALIVE_TIME_MS', '60000')),
        'keepalive_timeout_ms': int(env('NEXTBLOCK_KEEPALIVE_TIMEOUT_MS', '15000')),
    }
    return NextBlockConfig(**settings)
# Configuration validation
def validate_config(config: NextBlockConfig) -> bool:
    """Check that *config* has the settings needed to connect.

    Returns:
        True when every check passes.

    Raises:
        ValueError: for a missing API key, a missing endpoint or a
            non-positive timeout, checked in that order.
    """
    # Each rule is a lazy predicate so later checks never run once an
    # earlier one has failed.
    rules = (
        (lambda: not config.api_key, "API key is required"),
        (lambda: not config.endpoint, "Endpoint is required"),
        (lambda: config.timeout <= 0, "Timeout must be positive"),
    )
    for is_violated, message in rules:
        if is_violated():
            raise ValueError(message)
    return True
# Example with configuration validation
async def main_with_validation():
    """Load and validate the configuration, then connect with retries.

    Configuration problems and connection failures are printed rather
    than raised.
    """
    try:
        config = load_config_from_env()
        validate_config(config)
        manager = EnhancedConnectionManager(config)
        try:
            # Connect exactly once, through the retrying path.  Entering the
            # manager with ``async with`` would already call connect(), and
            # a second connect_with_retry() on top of that would open
            # another channel.
            await manager.connect_with_retry()
            print("Successfully connected with retry logic!")
        finally:
            await manager.disconnect()
    except ValueError as e:
        print(f"Configuration error: {e}")
    except Exception as e:
        print(f"Connection error: {e}")


if __name__ == "__main__":
    asyncio.run(main_with_validation())
Available Endpoints
Frankfurt (Europe): frankfurt.nextblock.io:80
Amsterdam (Europe): amsterdam.nextblock.io:80
London (Europe): london.nextblock.io:80
Singapore (Asia): singapore.nextblock.io:80
Tokyo (Asia): tokyo.nextblock.io:80
New York (US East): ny.nextblock.io:80
Salt Lake City (US West): slc.nextblock.io:80
Best Practices
Use async/await: Leverage Python's asyncio for non-blocking operations
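Use insecure connections only where appropriate: Disabling TLS can give better performance (NextBlock handles security at network level), but the API key is then sent unencrypted, so keep use_tls=True whenever the endpoint you connect to supports TLS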
Implement keepalive: Configure appropriate keepalive settings
Handle errors gracefully: Use retry logic with exponential backoff
Validate configuration: Check all required settings before connecting
Use connection pooling: For high-throughput applications
Monitor connection health: Implement regular health checks
Close connections properly: Always close connections when done
Last updated