# Connection

Establish a gRPC connection to NextBlock's API using Python and grpcio.

This page shows the connection pattern and authentication flow. Replace the placeholder client creation with the client generated from [`nextblock-proto`](https://github.com/nextblock-ag/nextblock-proto).

## Prerequisites

Install the required dependencies:

```bash
pip install grpcio grpcio-tools
pip install solders solana
pip install asyncio
```

Generate the Python gRPC client from the proto specs:

```bash
# Clone the proto repository
git clone https://github.com/nextblock-ag/nextblock-proto
cd nextblock-proto

# Generate Python gRPC client
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto
```

## Connection Setup

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import grpc
from typing import Optional, Dict, Any
from dataclasses import dataclass
import ssl

<strong># Authentication metadata interceptor
</strong>class AuthInterceptor(grpc.aio.ClientInterceptor):
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    async def intercept_unary_unary(self, continuation, client_call_details, request):
<strong>        # Add authorization header to all requests
</strong>        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', self.api_key))
        
        new_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_details, request)
    
    async def intercept_unary_stream(self, continuation, client_call_details, request):
<strong>        # Add authorization header to streaming requests
</strong>        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', self.api_key))
        
        new_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_details, request)

<strong># Connection configuration
</strong>@dataclass
class NextBlockConfig:
    endpoint: str = "frankfurt.nextblock.io:443"
    api_key: str = ""
    use_tls: bool = True
    timeout: float = 30.0
    keepalive_time_ms: int = 60000  # 60 seconds
    keepalive_timeout_ms: int = 15000  # 15 seconds
    
    @classmethod
    def from_env(cls) -> 'NextBlockConfig':
        import os
        return cls(
            endpoint=os.getenv('NEXTBLOCK_ENDPOINT', 'frankfurt.nextblock.io:443'),
            api_key=os.getenv('NEXTBLOCK_API_KEY', ''),
            use_tls=os.getenv('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
            timeout=float(os.getenv('NEXTBLOCK_TIMEOUT', '30')),
        )

<strong># Create secure channel with authentication
</strong>async def create_nextblock_channel(config: NextBlockConfig) -> grpc.aio.Channel:
<strong>    # Configure channel options
</strong>    options = [
        ('grpc.keepalive_time_ms', config.keepalive_time_ms),
        ('grpc.keepalive_timeout_ms', config.keepalive_timeout_ms),
        ('grpc.keepalive_permit_without_stream', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),  # 5 minutes
    ]
    
    if config.use_tls:
<strong>        # Create secure channel with TLS
</strong>        credentials = grpc.ssl_channel_credentials()
        channel = grpc.aio.secure_channel(
            config.endpoint,
            credentials,
            options=options
        )
    else:
<strong>        # Create insecure channel
</strong>        channel = grpc.aio.insecure_channel(
            config.endpoint,
            options=options
        )
    
    return channel

<strong># Create authenticated client
</strong>async def create_nextblock_client(config: NextBlockConfig):
<strong>    # Create the channel
</strong>    channel = await create_nextblock_channel(config)
    
<strong>    # Add authentication interceptor
</strong>    auth_interceptor = AuthInterceptor(config.api_key)
    intercept_channel = grpc.aio.intercept_channel(channel, auth_interceptor)
    
<strong>    # Create the client stub
</strong>    # Import your generated gRPC client here
    # from your_generated_proto import api_pb2_grpc
    # client = api_pb2_grpc.ApiStub(intercept_channel)
    
    return intercept_channel, None  # Replace None with your generated client stub

<strong># Connection manager with health checking
</strong>class NextBlockConnectionManager:
    def __init__(self, config: NextBlockConfig):
        self.config = config
        self.channel: Optional[grpc.aio.Channel] = None
        self.client = None
        self.is_connected = False
        
    async def connect(self) -> bool:
<strong>        """Establish connection to NextBlock"""
</strong>        try:
            self.channel, self.client = await create_nextblock_client(self.config)
            
<strong>            # Test the connection
</strong>            await self.health_check()
            self.is_connected = True
            print(f"Successfully connected to NextBlock at {self.config.endpoint}")
            return True
            
        except Exception as e:
            print(f"Failed to connect to NextBlock: {e}")
            self.is_connected = False
            return False
    
    async def health_check(self) -> bool:
<strong>        """Check if the connection is healthy"""
</strong>        if not self.channel:
            return False
            
        try:
<strong>            # Test connection with a simple call
</strong>            # await self.client.ping(Empty())  # Uncomment with real client
            print("Connection health check passed")
            return True
        except Exception as e:
            print(f"Health check failed: {e}")
            return False
    
    async def disconnect(self):
<strong>        """Close the connection"""
</strong>        if self.channel:
            await self.channel.close()
            self.is_connected = False
            print("Disconnected from NextBlock")
    
    async def __aenter__(self):
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()
</code></pre>

## Usage Example

<pre class="language-python"><code class="lang-python"><strong>async def main():
</strong><strong>    # Configure connection
</strong>    config = NextBlockConfig(
        endpoint="frankfurt.nextblock.io:443",
        api_key="&#x3C;your-api-key-here>",
        use_tls=True,
        timeout=30.0
    )
    
<strong>    # Method 1: Direct connection
</strong>    channel, client = await create_nextblock_client(config)
    
    try:
<strong>        # Use the client for API calls
</strong>        print("Connected to NextBlock!")
        
<strong>        # Your API calls would go here
</strong>        # response = await client.ping(Empty())
        
    finally:
        await channel.close()
    
<strong>    # Method 2: Using connection manager (recommended)
</strong>    async with NextBlockConnectionManager(config) as manager:
        if manager.is_connected:
            print("Connection manager established connection")
            
<strong>            # Use manager.client for API calls
</strong>            # response = await manager.client.ping(Empty())
            
        else:
            print("Failed to establish connection")

<strong># Run the example
</strong>if __name__ == "__main__":
    asyncio.run(main())
</code></pre>

## Advanced Connection Features

<pre class="language-python"><code class="lang-python"><strong>import logging
</strong>from typing import Callable, Any
import backoff

<strong># Enhanced connection manager with retry logic
</strong>class EnhancedConnectionManager(NextBlockConnectionManager):
    def __init__(self, config: NextBlockConfig, max_retries: int = 3):
        super().__init__(config)
        self.max_retries = max_retries
        self.retry_count = 0
        
    @backoff.on_exception(
        backoff.expo,
        (grpc.RpcError, ConnectionError),
        max_tries=3,
        base=2,
        max_value=60
    )
    async def connect_with_retry(self) -> bool:
<strong>        """Connect with exponential backoff retry"""
</strong>        self.retry_count += 1
        logging.info(f"Connection attempt {self.retry_count}")
        
        success = await self.connect()
        if not success:
            raise ConnectionError(f"Failed to connect on attempt {self.retry_count}")
        
        self.retry_count = 0  # Reset on success
        return True
    
    async def call_with_retry(self, func: Callable, *args, **kwargs) -> Any:
<strong>        """Execute gRPC call with automatic retry"""
</strong>        @backoff.on_exception(
            backoff.expo,
            grpc.RpcError,
            max_tries=3,
            giveup=lambda e: e.code() == grpc.StatusCode.UNAUTHENTICATED
        )
        async def _call():
            return await func(*args, **kwargs)
        
        return await _call()

<strong># Connection pool for high-throughput applications
</strong>class ConnectionPool:
    def __init__(self, config: NextBlockConfig, pool_size: int = 5):
        self.config = config
        self.pool_size = pool_size
        self.connections: List[NextBlockConnectionManager] = []
        self.current_index = 0
        
    async def initialize(self):
<strong>        """Initialize connection pool"""
</strong>        for i in range(self.pool_size):
            manager = NextBlockConnectionManager(self.config)
            if await manager.connect():
                self.connections.append(manager)
            else:
                logging.warning(f"Failed to create connection {i+1}/{self.pool_size}")
        
        logging.info(f"Connection pool initialized with {len(self.connections)} connections")
    
    def get_connection(self) -> NextBlockConnectionManager:
<strong>        """Get next available connection (round-robin)"""
</strong>        if not self.connections:
            raise RuntimeError("No available connections in pool")
        
        connection = self.connections[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.connections)
        return connection
    
    async def close_all(self):
<strong>        """Close all connections in the pool"""
</strong>        for connection in self.connections:
            await connection.disconnect()
        self.connections.clear()
</code></pre>

## Environment Configuration

<pre class="language-python"><code class="lang-python"><strong>import os
</strong>from typing import Dict, Any

<strong># Load configuration from environment variables
</strong>def load_config_from_env() -> NextBlockConfig:
    return NextBlockConfig(
        endpoint=os.getenv('NEXTBLOCK_ENDPOINT', 'frankfurt.nextblock.io:443'),
        api_key=os.getenv('NEXTBLOCK_API_KEY', ''),
        use_tls=os.getenv('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
        timeout=float(os.getenv('NEXTBLOCK_TIMEOUT', '30.0')),
        keepalive_time_ms=int(os.getenv('NEXTBLOCK_KEEPALIVE_TIME_MS', '60000')),
        keepalive_timeout_ms=int(os.getenv('NEXTBLOCK_KEEPALIVE_TIMEOUT_MS', '15000')),
    )

<strong># Configuration validation
</strong>def validate_config(config: NextBlockConfig) -> bool:
    if not config.api_key:
        raise ValueError("API key is required")
    
    if not config.endpoint:
        raise ValueError("Endpoint is required")
    
    if config.timeout &#x3C;= 0:
        raise ValueError("Timeout must be positive")
    
    return True

<strong># Example with configuration validation
</strong>async def main_with_validation():
    try:
        config = load_config_from_env()
        validate_config(config)
        
        async with EnhancedConnectionManager(config) as manager:
            await manager.connect_with_retry()
            print("Successfully connected with retry logic!")
            
    except ValueError as e:
        print(f"Configuration error: {e}")
    except Exception as e:
        print(f"Connection error: {e}")

if __name__ == "__main__":
    asyncio.run(main_with_validation())
</code></pre>

## Available Endpoints

* **Frankfurt**: `frankfurt.nextblock.io` (Europe)
* **Amsterdam**: `amsterdam.nextblock.io` (Europe)
* **London**: `london.nextblock.io` (Europe)
* **Singapore**: `singapore.nextblock.io` (Asia)
* **Tokyo**: `tokyo.nextblock.io` (Asia)
* **New York**: `ny.nextblock.io` (US East)
* **Salt Lake City**: `slc.nextblock.io` (US West)
* **Dublin**: `dublin.nextblock.io` (Europe)
* **Vilnius**: `vilnius.nextblock.io` (Europe)

## Best Practices

1. **Use async/await**: Leverage Python's asyncio for non-blocking operations
2. **Use TLS by default**: Prefer secure connections unless you explicitly operate in a trusted internal environment
3. **Implement keepalive**: Configure appropriate keepalive settings
4. **Handle errors gracefully**: Use retry logic with exponential backoff
5. **Validate configuration**: Check all required settings before connecting
6. **Use connection pooling**: For high-throughput applications
7. **Monitor connection health**: Implement regular health checks
8. **Close connections properly**: Always close connections when done
