# Tip Floor Stream

Stream real-time tip floor data from NextBlock to optimize transaction tips dynamically using Python.

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import json
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
import logging

<strong># Tip floor data structure
</strong>@dataclass
class TipFloorData:
    time: str
    landed_tips_25th_percentile: float
    landed_tips_50th_percentile: float
    landed_tips_75th_percentile: float
    landed_tips_95th_percentile: float
    landed_tips_99th_percentile: float
    ema_landed_tips_50th_percentile: float
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TipFloorData':
        return cls(**data)
    
    def to_lamports(self, percentile: str) -> int:
<strong>        """Convert SOL amounts to lamports"""
</strong>        sol_amount = getattr(self, f"landed_tips_{percentile}_percentile")
        return int(sol_amount * 1_000_000_000)

<strong># Tip strategy management
</strong>@dataclass
class TipStrategy:
    conservative_tip: int = 500_000     # 25th percentile
    normal_tip: int = 1_000_000        # 50th percentile
    aggressive_tip: int = 2_000_000    # 75th percentile
    priority_tip: int = 5_000_000      # 95th percentile
    last_updated: datetime = None
    
    def update_from_tip_floor(self, tip_floor: TipFloorData):
<strong>        """Update strategy based on tip floor data"""
</strong>        self.conservative_tip = tip_floor.to_lamports("25th")
        self.normal_tip = tip_floor.to_lamports("50th")
        self.aggressive_tip = tip_floor.to_lamports("75th")
        self.priority_tip = tip_floor.to_lamports("95th")
        self.last_updated = datetime.now(timezone.utc)
    
    def get_tip_for_priority(self, priority: str) -> int:
<strong>        """Get tip amount for given priority level"""
</strong>        return {
            "conservative": self.conservative_tip,
            "normal": self.normal_tip,
            "aggressive": self.aggressive_tip,
            "priority": self.priority_tip,
        }.get(priority, self.normal_tip)

<strong># Global tip strategy instance
</strong>global_tip_strategy = TipStrategy()

<strong># Stream tip floor data
</strong>async def stream_tip_floor(
    # nextblock_client,  # Your generated gRPC client
    update_frequency: str = "1m",
    callback: Optional[callable] = None,
) -> None:
<strong>    """Stream tip floor updates from NextBlock"""
</strong>    print(f"Starting tip floor stream with frequency: {update_frequency}")
    
    """ Uncomment when you have the generated gRPC client
    try:
        request = TipFloorStreamRequest(update_frequency=update_frequency)
        stream = nextblock_client.stream_tip_floor(request)
        
        print("Streaming tip floor data:")
        
        async for tip_floor_response in stream:
            try:
                # Convert protobuf response to TipFloorData
                tip_floor = TipFloorData(
                    time=tip_floor_response.time,
                    landed_tips_25th_percentile=tip_floor_response.landed_tips_25th_percentile,
                    landed_tips_50th_percentile=tip_floor_response.landed_tips_50th_percentile,
                    landed_tips_75th_percentile=tip_floor_response.landed_tips_75th_percentile,
                    landed_tips_95th_percentile=tip_floor_response.landed_tips_95th_percentile,
                    landed_tips_99th_percentile=tip_floor_response.landed_tips_99th_percentile,
                    ema_landed_tips_50th_percentile=tip_floor_response.ema_landed_tips_50th_percentile,
                )
                
                print(f"Received tip floor update:")
                print(f"  Time: {tip_floor.time}")
                print(f"  25th percentile: {tip_floor.landed_tips_25th_percentile:.6f} SOL")
                print(f"  50th percentile: {tip_floor.landed_tips_50th_percentile:.6f} SOL")
                print(f"  75th percentile: {tip_floor.landed_tips_75th_percentile:.6f} SOL")
                print(f"  95th percentile: {tip_floor.landed_tips_95th_percentile:.6f} SOL")
                print(f"  EMA 50th percentile: {tip_floor.ema_landed_tips_50th_percentile:.6f} SOL")
                print("  ---")
                
                # Update global tip strategy
                await process_tip_floor_update(tip_floor)
                
                # Call custom callback if provided
                if callback:
                    await callback(tip_floor)
                    
            except Exception as e:
                logging.error(f"Error processing tip floor update: {e}")
                
    except Exception as e:
        logging.error(f"Tip floor stream error: {e}")
        # Implement reconnection logic here
    """
    
<strong>    # Mock streaming for demonstration
</strong>    print("Mock tip floor streaming started...")
    
    while True:
        await asyncio.sleep(60)  # Update every minute
        
<strong>        # Generate mock tip floor data
</strong>        mock_tip_floor = TipFloorData(
            time=datetime.now(timezone.utc).isoformat(),
            landed_tips_25th_percentile=0.0011,
            landed_tips_50th_percentile=0.005000001,
            landed_tips_75th_percentile=0.01555,
            landed_tips_95th_percentile=0.09339195639999975,
            landed_tips_99th_percentile=0.4846427910400001,
            ema_landed_tips_50th_percentile=0.005989477267191758,
        )
        
        print(f"Mock tip floor update: {asdict(mock_tip_floor)}")
        await process_tip_floor_update(mock_tip_floor)
        
        if callback:
            await callback(mock_tip_floor)

<strong># Process tip floor updates
</strong>async def process_tip_floor_update(tip_floor: TipFloorData) -> None:
<strong>    """Process incoming tip floor data"""
</strong><strong>    # Update global strategy
</strong>    global_tip_strategy.update_from_tip_floor(tip_floor)
    
<strong>    # Log the update
</strong>    logging.info(f"Updated tip strategy at {tip_floor.time}")
    logging.info(f"  Conservative: {global_tip_strategy.conservative_tip} lamports")
    logging.info(f"  Normal: {global_tip_strategy.normal_tip} lamports")
    logging.info(f"  Aggressive: {global_tip_strategy.aggressive_tip} lamports")
    logging.info(f"  Priority: {global_tip_strategy.priority_tip} lamports")
    
<strong>    # Store historical data
</strong>    await store_tip_floor_data(tip_floor)
    
<strong>    # Trigger any pending transactions
</strong>    await trigger_pending_transactions()

<strong># Historical data management
</strong>class TipFloorHistory:
    def __init__(self, max_size: int = 1000):
        self.data: List[TipFloorData] = []
        self.max_size = max_size
    
    def add(self, tip_floor: TipFloorData):
<strong>        """Add tip floor data to history"""
</strong>        if len(self.data) >= self.max_size:
            self.data.pop(0)  # Remove oldest
        self.data.append(tip_floor)
    
    def get_trend(self, percentile: str = "50th", window: int = 10) -> float:
<strong>        """Calculate tip trend over time window"""
</strong>        if len(self.data) &#x3C; 2:
            return 0.0
        
        recent_data = self.data[-window:] if len(self.data) >= window else self.data
        
        if len(recent_data) &#x3C; 2:
            return 0.0
        
        start_value = getattr(recent_data[0], f"landed_tips_{percentile}_percentile")
        end_value = getattr(recent_data[-1], f"landed_tips_{percentile}_percentile")
        
        return end_value - start_value
    
    def get_average(self, percentile: str = "50th", window: int = 10) -> float:
<strong>        """Get average tip over time window"""
</strong>        if not self.data:
            return 0.0
        
        recent_data = self.data[-window:] if len(self.data) >= window else self.data
        values = [getattr(d, f"landed_tips_{percentile}_percentile") for d in recent_data]
        
        return sum(values) / len(values)

<strong># Global history tracker
</strong>tip_floor_history = TipFloorHistory()

<strong># Smart tip calculation with trend analysis
</strong>async def get_smart_tip(
    base_priority: str = "normal",
    consider_trend: bool = True,
) -> int:
<strong>    """Calculate smart tip amount based on current data and trends"""
</strong>    base_tip = global_tip_strategy.get_tip_for_priority(base_priority)
    
    if not consider_trend or len(tip_floor_history.data) &#x3C; 2:
        return base_tip
    
<strong>    # Analyze trend
</strong>    trend = tip_floor_history.get_trend("50th", window=5)
    
<strong>    # Adjust tip based on trend
</strong>    if trend > 0.001:  # Tips increasing
        adjustment_factor = 1.2
        print(f"Tips trending up (+{trend:.6f}), increasing tip by 20%")
    elif trend &#x3C; -0.001:  # Tips decreasing
        adjustment_factor = 0.9
        print(f"Tips trending down ({trend:.6f}), decreasing tip by 10%")
    else:
        adjustment_factor = 1.0
        print(f"Tips stable ({trend:.6f}), no adjustment")
    
    smart_tip = int(base_tip * adjustment_factor)
    return max(smart_tip, 100_000)  # Minimum tip of 0.0001 SOL

<strong># Store tip floor data
</strong>async def store_tip_floor_data(tip_floor: TipFloorData) -> None:
<strong>    """Store tip floor data for analysis"""
</strong><strong>    # Add to history
</strong>    tip_floor_history.add(tip_floor)
    
<strong>    # Optionally save to file
</strong>    filename = f"tip_data_{datetime.now().strftime('%Y%m%d')}.jsonl"
    
    with open(filename, "a") as f:
        json.dump(asdict(tip_floor), f)
        f.write("\n")

<strong># Trigger pending transactions
</strong>async def trigger_pending_transactions() -> None:
<strong>    """Check and trigger any pending transactions with updated tips"""
</strong>    print("Checking for pending transactions to trigger...")
<strong>    # Implementation would check your pending transaction queue
</strong>    # and submit them with updated tip amounts
</code></pre>

## Usage Example

<pre class="language-python"><code class="lang-python"><strong>async def tip_floor_example():
</strong><strong>    # Connect to NextBlock (see connection.md)
</strong>    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client
    
<strong>    # Custom callback for tip floor updates
</strong>    async def on_tip_floor_update(tip_floor: TipFloorData):
        print(f"Custom handler: Received update at {tip_floor.time}")
        
<strong>        # Example: Trigger high-priority transactions when tips are low
</strong>        if tip_floor.landed_tips_50th_percentile &#x3C; 0.002:  # Less than 0.002 SOL
            print("Tips are low - good time for high-priority transactions!")
            # await submit_priority_transactions()
    
<strong>    # Start streaming in background
</strong>    stream_task = asyncio.create_task(
        stream_tip_floor(
            # nextblock_client,
            update_frequency="1m",
            callback=on_tip_floor_update
        )
    )
    
<strong>    # Example usage of dynamic tips
</strong>    await asyncio.sleep(5)  # Wait for initial data
    
<strong>    # Get current optimal tips
</strong>    conservative_tip = global_tip_strategy.get_tip_for_priority("conservative")
    normal_tip = global_tip_strategy.get_tip_for_priority("normal")
    aggressive_tip = global_tip_strategy.get_tip_for_priority("aggressive")
    
    print(f"Current optimal tips:")
    print(f"  Conservative: {conservative_tip} lamports")
    print(f"  Normal: {normal_tip} lamports")
    print(f"  Aggressive: {aggressive_tip} lamports")
    
<strong>    # Get smart tip with trend analysis
</strong>    smart_tip = await get_smart_tip("normal", consider_trend=True)
    print(f"  Smart tip: {smart_tip} lamports")
    
<strong>    # Keep streaming
</strong>    try:
        await stream_task
    except KeyboardInterrupt:
        stream_task.cancel()
        print("Tip floor streaming stopped")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(tip_floor_example())
</code></pre>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.nextblock.io/api/examples/python/tip-floor-stream.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
