# Connection

Establish a gRPC connection to NextBlock's API using Python and grpcio.

This page shows the connection pattern and authentication flow. Replace the placeholder client creation with the client generated from [`nextblock-proto`](https://github.com/nextblock-ag/nextblock-proto).

## Prerequisites

Install the required dependencies:

```bash
pip install grpcio grpcio-tools
pip install solders solana
pip install backoff  # used by the retry examples; asyncio ships with Python
```

Generate the Python gRPC client from the proto specs:

```bash
# Clone the proto repository
git clone https://github.com/nextblock-ag/nextblock-proto
cd nextblock-proto

# Generate Python gRPC client
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. *.proto
```

## Connection Setup

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import grpc
from typing import Optional
from dataclasses import dataclass

<strong># Authentication metadata interceptor
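</strong>class AuthInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor):
    # grpc.aio channels only accept interceptors that subclass these concrete types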
    def __init__(self, api_key: str):
        self.api_key = api_key
    
    async def intercept_unary_unary(self, continuation, client_call_details, request):
<strong>        # Add authorization header to all requests
</strong>        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', self.api_key))
        
        new_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_details, request)
    
    async def intercept_unary_stream(self, continuation, client_call_details, request):
<strong>        # Add authorization header to streaming requests
</strong>        metadata = list(client_call_details.metadata or [])
        metadata.append(('authorization', self.api_key))
        
        new_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_details, request)

<strong># Connection configuration
</strong>@dataclass
class NextBlockConfig:
    endpoint: str = "frankfurt.nextblock.io:443"
    api_key: str = ""
    use_tls: bool = True
    timeout: float = 30.0
    keepalive_time_ms: int = 60000  # 60 seconds
    keepalive_timeout_ms: int = 15000  # 15 seconds
    
    @classmethod
    def from_env(cls) -> 'NextBlockConfig':
        import os
        return cls(
            endpoint=os.getenv('NEXTBLOCK_ENDPOINT', 'frankfurt.nextblock.io:443'),
            api_key=os.getenv('NEXTBLOCK_API_KEY', ''),
            use_tls=os.getenv('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
            timeout=float(os.getenv('NEXTBLOCK_TIMEOUT', '30')),
        )

<strong># Create secure channel with authentication
</strong>async def create_nextblock_channel(config: NextBlockConfig, interceptors=None) -> grpc.aio.Channel:
<strong>    # Configure channel options
</strong>    options = [
        ('grpc.keepalive_time_ms', config.keepalive_time_ms),
        ('grpc.keepalive_timeout_ms', config.keepalive_timeout_ms),
        ('grpc.keepalive_permit_without_stream', True),
        ('grpc.http2.max_pings_without_data', 0),
        ('grpc.http2.min_ping_interval_without_data_ms', 300000),  # 5 minutes
    ]
    
    if config.use_tls:
<strong>        # Create secure channel with TLS
</strong>        credentials = grpc.ssl_channel_credentials()
        channel = grpc.aio.secure_channel(
            config.endpoint,
            credentials,
            options=options,
            interceptors=interceptors,
        )
    else:
<strong>        # Create insecure channel
</strong>        channel = grpc.aio.insecure_channel(
            config.endpoint,
            options=options,
            interceptors=interceptors,
        )
    
    return channel

<strong># Create authenticated client
</strong>async def create_nextblock_client(config: NextBlockConfig):
<strong>    # grpc.aio takes interceptors when the channel is created
</strong>    auth_interceptor = AuthInterceptor(config.api_key)
    channel = await create_nextblock_channel(config, interceptors=[auth_interceptor])
    
<strong>    # Create the client stub
</strong>    # Import your generated gRPC client here
    # from your_generated_proto import api_pb2_grpc
    # client = api_pb2_grpc.ApiStub(channel)
    
    return channel, None  # Replace None with your generated client stub

<strong># Connection manager with health checking
</strong>class NextBlockConnectionManager:
    def __init__(self, config: NextBlockConfig):
        self.config = config
        self.channel: Optional[grpc.aio.Channel] = None
        self.client = None
        self.is_connected = False
        
    async def connect(self) -> bool:
<strong>        """Establish connection to NextBlock"""
</strong>        try:
            self.channel, self.client = await create_nextblock_client(self.config)
            
<strong>            # Test the connection
</strong>            if not await self.health_check():
                raise ConnectionError("health check failed")
            self.is_connected = True
            print(f"Successfully connected to NextBlock at {self.config.endpoint}")
            return True
            
        except Exception as e:
            print(f"Failed to connect to NextBlock: {e}")
            self.is_connected = False
            return False
    
    async def health_check(self) -> bool:
<strong>        """Check if the connection is healthy"""
</strong>        if not self.channel:
            return False
            
        try:
<strong>            # Test connection with a simple call
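</strong>            # await self.client.ping(Empty())  # Uncomment with real client
            # Until a real RPC is wired in, wait for the channel to reach READY
            await asyncio.wait_for(self.channel.channel_ready(), timeout=self.config.timeout)
            print("Connection health check passed")
            return True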
        except Exception as e:
            print(f"Health check failed: {e}")
            return False
    
    async def disconnect(self):
<strong>        """Close the connection"""
</strong>        if self.channel:
            await self.channel.close()
            self.is_connected = False
            print("Disconnected from NextBlock")
    
    async def __aenter__(self):
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()
</code></pre>

## Usage Example

<pre class="language-python"><code class="lang-python"><strong>async def main():
</strong><strong>    # Configure connection
</strong>    config = NextBlockConfig(
        endpoint="frankfurt.nextblock.io:443",
        api_key="&#x3C;your-api-key-here>",
        use_tls=True,
        timeout=30.0
    )
    
<strong>    # Method 1: Direct connection
</strong>    channel, client = await create_nextblock_client(config)
    
    try:
<strong>        # Use the client for API calls
</strong>        print("Connected to NextBlock!")
        
<strong>        # Your API calls would go here
</strong>        # response = await client.ping(Empty())
        
    finally:
        await channel.close()
    
<strong>    # Method 2: Using connection manager (recommended)
</strong>    async with NextBlockConnectionManager(config) as manager:
        if manager.is_connected:
            print("Connection manager established connection")
            
<strong>            # Use manager.client for API calls
</strong>            # response = await manager.client.ping(Empty())
            
        else:
            print("Failed to establish connection")

<strong># Run the example
</strong>if __name__ == "__main__":
    asyncio.run(main())
</code></pre>

## Advanced Connection Features

<pre class="language-python"><code class="lang-python"><strong>import logging
</strong>from typing import Any, Callable, List
import backoff

<strong># Enhanced connection manager with retry logic
</strong>class EnhancedConnectionManager(NextBlockConnectionManager):
    def __init__(self, config: NextBlockConfig, max_retries: int = 3):
        super().__init__(config)
        self.max_retries = max_retries
        self.retry_count = 0
        
    @backoff.on_exception(
        backoff.expo,
        (grpc.RpcError, ConnectionError),
        max_tries=3,
        base=2,
        max_value=60
    )
    async def connect_with_retry(self) -> bool:
<strong>        """Connect with exponential backoff retry"""
</strong>        self.retry_count += 1
        logging.info(f"Connection attempt {self.retry_count}")
        
        success = await self.connect()
        if not success:
            raise ConnectionError(f"Failed to connect on attempt {self.retry_count}")
        
        self.retry_count = 0  # Reset on success
        return True
    
    async def call_with_retry(self, func: Callable, *args, **kwargs) -> Any:
<strong>        """Execute gRPC call with automatic retry"""
</strong>        @backoff.on_exception(
            backoff.expo,
            grpc.RpcError,
            max_tries=3,
            giveup=lambda e: e.code() == grpc.StatusCode.UNAUTHENTICATED
        )
        async def _call():
            return await func(*args, **kwargs)
        
        return await _call()

<strong># Connection pool for high-throughput applications
</strong>class ConnectionPool:
    def __init__(self, config: NextBlockConfig, pool_size: int = 5):
        self.config = config
        self.pool_size = pool_size
        self.connections: List[NextBlockConnectionManager] = []
        self.current_index = 0
        
    async def initialize(self):
<strong>        """Initialize connection pool"""
</strong>        for i in range(self.pool_size):
            manager = NextBlockConnectionManager(self.config)
            if await manager.connect():
                self.connections.append(manager)
            else:
                logging.warning(f"Failed to create connection {i+1}/{self.pool_size}")
        
        logging.info(f"Connection pool initialized with {len(self.connections)} connections")
    
    def get_connection(self) -> NextBlockConnectionManager:
<strong>        """Get next available connection (round-robin)"""
</strong>        if not self.connections:
            raise RuntimeError("No available connections in pool")
        
        connection = self.connections[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.connections)
        return connection
    
    async def close_all(self):
<strong>        """Close all connections in the pool"""
</strong>        for connection in self.connections:
            await connection.disconnect()
        self.connections.clear()
</code></pre>
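
The retry behaviour above relies on the third-party `backoff` package. For readers who prefer to avoid that dependency, the same idea can be approximated with the standard library. This is a minimal sketch rather than NextBlock's recommended approach: `retry_async` and its parameters are hypothetical names, and it retries on `ConnectionError` by default (pass `grpc.RpcError` in `retry_on` for real RPCs).

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_tries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 60.0,
) -> T:
    """Call func(), retrying on the given exceptions with exponential backoff."""
    for attempt in range(1, max_tries + 1):
        try:
            return await func()
        except retry_on:
            if attempt == max_tries:
                raise  # out of attempts: surface the last error
            # The upper bound doubles each attempt and is capped at max_delay;
            # a random delay below that bound spreads out simultaneous retries.
            bound = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, bound))
    raise RuntimeError("max_tries must be at least 1")
```

A call such as `await retry_async(lambda: manager.client.ping(Empty()), retry_on=(grpc.RpcError,))` would wrap a single RPC, assuming the generated stub exposes a `ping` method as in the commented examples on this page.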

## Environment Configuration

<pre class="language-python"><code class="lang-python"><strong>import os
</strong>from typing import Dict, Any

<strong># Load configuration from environment variables
</strong>def load_config_from_env() -> NextBlockConfig:
    return NextBlockConfig(
        endpoint=os.getenv('NEXTBLOCK_ENDPOINT', 'frankfurt.nextblock.io:443'),
        api_key=os.getenv('NEXTBLOCK_API_KEY', ''),
        use_tls=os.getenv('NEXTBLOCK_USE_TLS', 'true').lower() == 'true',
        timeout=float(os.getenv('NEXTBLOCK_TIMEOUT', '30.0')),
        keepalive_time_ms=int(os.getenv('NEXTBLOCK_KEEPALIVE_TIME_MS', '60000')),
        keepalive_timeout_ms=int(os.getenv('NEXTBLOCK_KEEPALIVE_TIMEOUT_MS', '15000')),
    )

<strong># Configuration validation
</strong>def validate_config(config: NextBlockConfig) -> bool:
    if not config.api_key:
        raise ValueError("API key is required")
    
    if not config.endpoint:
        raise ValueError("Endpoint is required")
    
    if config.timeout &#x3C;= 0:
        raise ValueError("Timeout must be positive")
    
    return True

<strong># Example with configuration validation
</strong>async def main_with_validation():
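    manager = None
    try:
        config = load_config_from_env()
        validate_config(config)
        
        # connect_with_retry() opens the channel itself, so avoid `async with`,
        # whose __aenter__ would call connect() a second time
        manager = EnhancedConnectionManager(config)
        await manager.connect_with_retry()
        print("Successfully connected with retry logic!")
            
    except ValueError as e:
        print(f"Configuration error: {e}")
    except Exception as e:
        print(f"Connection error: {e}")
    finally:
        if manager is not None:
            await manager.disconnect()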

if __name__ == "__main__":
    asyncio.run(main_with_validation())
</code></pre>
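
One detail of the loader above is worth noting: `NEXTBLOCK_USE_TLS` counts as true only when it is exactly `true` (case-insensitive), so a value such as `1` or `yes` silently turns TLS off. The sketch below shows one way to parse boolean variables more strictly. `env_bool` and the accepted spellings are assumptions made for illustration, not part of NextBlock's tooling.

```python
import os

# Accepted spellings are an assumption; adjust them to your own conventions.
_TRUE_VALUES = {"1", "true", "yes", "on"}
_FALSE_VALUES = {"0", "false", "no", "off"}


def env_bool(name: str, default: bool) -> bool:
    """Read a boolean environment variable, rejecting unrecognised values."""
    raw = os.getenv(name)
    if raw is None or raw.strip() == "":
        return default  # unset or empty: fall back to the default
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    # Failing loudly is safer than silently disabling TLS on a typo.
    raise ValueError(f"{name} must be a boolean value, got {raw!r}")
```

With this helper, the TLS setting could be loaded as `use_tls=env_bool('NEXTBLOCK_USE_TLS', True)`.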

## Available Endpoints

* **Frankfurt**: `frankfurt.nextblock.io` (Europe)
* **Amsterdam**: `amsterdam.nextblock.io` (Europe)
* **London**: `london.nextblock.io` (Europe)
* **Singapore**: `singapore.nextblock.io` (Asia)
* **Tokyo**: `tokyo.nextblock.io` (Asia)
* **New York**: `ny.nextblock.io` (US East)
* **Salt Lake City**: `slc.nextblock.io` (US West)
* **Dublin**: `dublin.nextblock.io` (Europe)
* **Vilnius**: `vilnius.nextblock.io` (Europe)
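
When several regions are reachable, one rough way to choose between them is to time a TCP connection to each host and keep the fastest. The sketch below assumes port 443, as in the default `frankfurt.nextblock.io:443` endpoint used on this page. The helper names (`tcp_connect_ms`, `fastest`, `pick_endpoint`) are hypothetical, and TCP connect time is only a proxy for actual gRPC latency.

```python
import asyncio
import time
from typing import Dict, List, Optional


async def tcp_connect_ms(host: str, port: int = 443, timeout: float = 2.0) -> Optional[float]:
    """Return the TCP connect time in milliseconds, or None if unreachable."""
    start = time.perf_counter()
    try:
        _, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout)
    except (OSError, asyncio.TimeoutError):
        return None
    elapsed_ms = (time.perf_counter() - start) * 1000
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the measurement is already taken; ignore close errors
    return elapsed_ms


def fastest(latencies: Dict[str, Optional[float]]) -> Optional[str]:
    """Pick the host with the lowest measured latency, skipping unreachable ones."""
    reachable = {host: ms for host, ms in latencies.items() if ms is not None}
    return min(reachable, key=reachable.get) if reachable else None


async def pick_endpoint(hosts: List[str], port: int = 443) -> Optional[str]:
    """Probe all hosts concurrently and return the fastest reachable one."""
    results = await asyncio.gather(*(tcp_connect_ms(h, port) for h in hosts))
    return fastest(dict(zip(hosts, results)))
```

For example, `asyncio.run(pick_endpoint(["frankfurt.nextblock.io", "amsterdam.nextblock.io"]))` returns one of the two host names, or `None` if neither could be reached within the timeout.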

## Best Practices

1. **Use async/await**: Use the `grpc.aio` API so that RPCs do not block the event loop
2. **Use TLS by default**: Prefer secure connections unless you explicitly operate in a trusted internal environment
3. **Implement keepalive**: Set `grpc.keepalive_time_ms` and `grpc.keepalive_timeout_ms` so that dead connections are detected promptly
4. **Handle errors gracefully**: Retry transient failures with exponential backoff, and do not retry `UNAUTHENTICATED` errors
5. **Validate configuration**: Check the API key, endpoint, and timeout before connecting
6. **Use connection pooling**: Spread requests across several channels in high-throughput applications
7. **Monitor connection health**: Run health checks at a regular interval
8. **Close connections properly**: Close channels when done, for example with `async with`
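
As an illustration of item 7, a background task can poll a health check and trigger a reconnect after repeated failures. This is a minimal sketch under stated assumptions: `monitor_health`, its parameters, and the thresholds are hypothetical, and the check and recovery coroutines are supplied by the caller.

```python
import asyncio
from typing import Awaitable, Callable


async def monitor_health(
    check: Callable[[], Awaitable[bool]],
    on_unhealthy: Callable[[], Awaitable[object]],
    interval: float = 30.0,
    max_failures: int = 2,
) -> None:
    """Poll check() forever; call on_unhealthy() after consecutive failures."""
    failures = 0
    while True:
        try:
            healthy = await check()
        except Exception:
            healthy = False  # treat an error raised by the check as a failure
        failures = 0 if healthy else failures + 1
        if failures >= max_failures:
            await on_unhealthy()
            failures = 0  # start counting again after the recovery attempt
        await asyncio.sleep(interval)
```

With the connection manager from this page, the monitor could be started with `task = asyncio.create_task(monitor_health(manager.health_check, manager.connect))` and stopped with `task.cancel()` during shutdown.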


