Tip Floor Stream

Stream real-time tip floor data from NextBlock to optimize transaction tips dynamically using Python.

import asyncio
import json
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
import logging

# Tip floor data structure
@dataclass
class TipFloorData:
    time: str
    landed_tips_25th_percentile: float
    landed_tips_50th_percentile: float
    landed_tips_75th_percentile: float
    landed_tips_95th_percentile: float
    landed_tips_99th_percentile: float
    ema_landed_tips_50th_percentile: float
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TipFloorData':
        return cls(**data)
    
    def to_lamports(self, percentile: str) -> int:
        """Convert SOL amounts to lamports"""
        sol_amount = getattr(self, f"landed_tips_{percentile}_percentile")
        return int(sol_amount * 1_000_000_000)

# Tip strategy management
@dataclass
class TipStrategy:
    conservative_tip: int = 500_000     # 25th percentile
    normal_tip: int = 1_000_000        # 50th percentile
    aggressive_tip: int = 2_000_000    # 75th percentile
    priority_tip: int = 5_000_000      # 95th percentile
    last_updated: datetime = None
    
    def update_from_tip_floor(self, tip_floor: TipFloorData):
        """Update strategy based on tip floor data"""
        self.conservative_tip = tip_floor.to_lamports("25th")
        self.normal_tip = tip_floor.to_lamports("50th")
        self.aggressive_tip = tip_floor.to_lamports("75th")
        self.priority_tip = tip_floor.to_lamports("95th")
        self.last_updated = datetime.now(timezone.utc)
    
    def get_tip_for_priority(self, priority: str) -> int:
        """Get tip amount for given priority level"""
        return {
            "conservative": self.conservative_tip,
            "normal": self.normal_tip,
            "aggressive": self.aggressive_tip,
            "priority": self.priority_tip,
        }.get(priority, self.normal_tip)

# Global tip strategy instance
global_tip_strategy = TipStrategy()

# Stream tip floor data
async def stream_tip_floor(
    # nextblock_client,  # Your generated gRPC client
    update_frequency: str = "1m",
    callback: Optional[callable] = None,
) -> None:
    """Stream tip floor updates from NextBlock"""
    print(f"Starting tip floor stream with frequency: {update_frequency}")
    
    """ Uncomment when you have the generated gRPC client
    try:
        request = TipFloorStreamRequest(update_frequency=update_frequency)
        stream = nextblock_client.stream_tip_floor(request)
        
        print("Streaming tip floor data:")
        
        async for tip_floor_response in stream:
            try:
                # Convert protobuf response to TipFloorData
                tip_floor = TipFloorData(
                    time=tip_floor_response.time,
                    landed_tips_25th_percentile=tip_floor_response.landed_tips_25th_percentile,
                    landed_tips_50th_percentile=tip_floor_response.landed_tips_50th_percentile,
                    landed_tips_75th_percentile=tip_floor_response.landed_tips_75th_percentile,
                    landed_tips_95th_percentile=tip_floor_response.landed_tips_95th_percentile,
                    landed_tips_99th_percentile=tip_floor_response.landed_tips_99th_percentile,
                    ema_landed_tips_50th_percentile=tip_floor_response.ema_landed_tips_50th_percentile,
                )
                
                print(f"Received tip floor update:")
                print(f"  Time: {tip_floor.time}")
                print(f"  25th percentile: {tip_floor.landed_tips_25th_percentile:.6f} SOL")
                print(f"  50th percentile: {tip_floor.landed_tips_50th_percentile:.6f} SOL")
                print(f"  75th percentile: {tip_floor.landed_tips_75th_percentile:.6f} SOL")
                print(f"  95th percentile: {tip_floor.landed_tips_95th_percentile:.6f} SOL")
                print(f"  EMA 50th percentile: {tip_floor.ema_landed_tips_50th_percentile:.6f} SOL")
                print("  ---")
                
                # Update global tip strategy
                await process_tip_floor_update(tip_floor)
                
                # Call custom callback if provided
                if callback:
                    await callback(tip_floor)
                    
            except Exception as e:
                logging.error(f"Error processing tip floor update: {e}")
                
    except Exception as e:
        logging.error(f"Tip floor stream error: {e}")
        # Implement reconnection logic here
    """
    
    # Mock streaming for demonstration
    print("Mock tip floor streaming started...")
    
    while True:
        await asyncio.sleep(60)  # Update every minute
        
        # Generate mock tip floor data
        mock_tip_floor = TipFloorData(
            time=datetime.now(timezone.utc).isoformat(),
            landed_tips_25th_percentile=0.0011,
            landed_tips_50th_percentile=0.005000001,
            landed_tips_75th_percentile=0.01555,
            landed_tips_95th_percentile=0.09339195639999975,
            landed_tips_99th_percentile=0.4846427910400001,
            ema_landed_tips_50th_percentile=0.005989477267191758,
        )
        
        print(f"Mock tip floor update: {asdict(mock_tip_floor)}")
        await process_tip_floor_update(mock_tip_floor)
        
        if callback:
            await callback(mock_tip_floor)

# Process tip floor updates
async def process_tip_floor_update(tip_floor: TipFloorData) -> None:
    """Process incoming tip floor data"""
    # Update global strategy
    global_tip_strategy.update_from_tip_floor(tip_floor)
    
    # Log the update
    logging.info(f"Updated tip strategy at {tip_floor.time}")
    logging.info(f"  Conservative: {global_tip_strategy.conservative_tip} lamports")
    logging.info(f"  Normal: {global_tip_strategy.normal_tip} lamports")
    logging.info(f"  Aggressive: {global_tip_strategy.aggressive_tip} lamports")
    logging.info(f"  Priority: {global_tip_strategy.priority_tip} lamports")
    
    # Store historical data
    await store_tip_floor_data(tip_floor)
    
    # Trigger any pending transactions
    await trigger_pending_transactions()

# Historical data management
class TipFloorHistory:
    def __init__(self, max_size: int = 1000):
        self.data: List[TipFloorData] = []
        self.max_size = max_size
    
    def add(self, tip_floor: TipFloorData):
        """Add tip floor data to history"""
        if len(self.data) >= self.max_size:
            self.data.pop(0)  # Remove oldest
        self.data.append(tip_floor)
    
    def get_trend(self, percentile: str = "50th", window: int = 10) -> float:
        """Calculate tip trend over time window"""
        if len(self.data) < 2:
            return 0.0
        
        recent_data = self.data[-window:] if len(self.data) >= window else self.data
        
        if len(recent_data) < 2:
            return 0.0
        
        start_value = getattr(recent_data[0], f"landed_tips_{percentile}_percentile")
        end_value = getattr(recent_data[-1], f"landed_tips_{percentile}_percentile")
        
        return end_value - start_value
    
    def get_average(self, percentile: str = "50th", window: int = 10) -> float:
        """Get average tip over time window"""
        if not self.data:
            return 0.0
        
        recent_data = self.data[-window:] if len(self.data) >= window else self.data
        values = [getattr(d, f"landed_tips_{percentile}_percentile") for d in recent_data]
        
        return sum(values) / len(values)

# Global history tracker
tip_floor_history = TipFloorHistory()

# Smart tip calculation with trend analysis
async def get_smart_tip(
    base_priority: str = "normal",
    consider_trend: bool = True,
) -> int:
    """Calculate smart tip amount based on current data and trends"""
    base_tip = global_tip_strategy.get_tip_for_priority(base_priority)
    
    if not consider_trend or len(tip_floor_history.data) < 2:
        return base_tip
    
    # Analyze trend
    trend = tip_floor_history.get_trend("50th", window=5)
    
    # Adjust tip based on trend
    if trend > 0.001:  # Tips increasing
        adjustment_factor = 1.2
        print(f"Tips trending up (+{trend:.6f}), increasing tip by 20%")
    elif trend < -0.001:  # Tips decreasing
        adjustment_factor = 0.9
        print(f"Tips trending down ({trend:.6f}), decreasing tip by 10%")
    else:
        adjustment_factor = 1.0
        print(f"Tips stable ({trend:.6f}), no adjustment")
    
    smart_tip = int(base_tip * adjustment_factor)
    return max(smart_tip, 100_000)  # Minimum tip of 0.0001 SOL

# Store tip floor data
async def store_tip_floor_data(tip_floor: TipFloorData) -> None:
    """Store tip floor data for analysis"""
    # Add to history
    tip_floor_history.add(tip_floor)
    
    # Optionally save to file
    filename = f"tip_data_{datetime.now().strftime('%Y%m%d')}.jsonl"
    
    with open(filename, "a") as f:
        json.dump(asdict(tip_floor), f)
        f.write("\n")

# Trigger pending transactions
async def trigger_pending_transactions() -> None:
    """Check and trigger any pending transactions with updated tips"""
    print("Checking for pending transactions to trigger...")
    # Implementation would check your pending transaction queue
    # and submit them with updated tip amounts

Usage Example

async def tip_floor_example():
    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client
    
    # Custom callback for tip floor updates
    async def on_tip_floor_update(tip_floor: TipFloorData):
        print(f"Custom handler: Received update at {tip_floor.time}")
        
        # Example: Trigger high-priority transactions when tips are low
        if tip_floor.landed_tips_50th_percentile < 0.002:  # Less than 0.002 SOL
            print("Tips are low - good time for high-priority transactions!")
            # await submit_priority_transactions()
    
    # Start streaming in background
    stream_task = asyncio.create_task(
        stream_tip_floor(
            # nextblock_client,
            update_frequency="1m",
            callback=on_tip_floor_update
        )
    )
    
    # Example usage of dynamic tips
    await asyncio.sleep(5)  # Wait for initial data
    
    # Get current optimal tips
    conservative_tip = global_tip_strategy.get_tip_for_priority("conservative")
    normal_tip = global_tip_strategy.get_tip_for_priority("normal")
    aggressive_tip = global_tip_strategy.get_tip_for_priority("aggressive")
    
    print(f"Current optimal tips:")
    print(f"  Conservative: {conservative_tip} lamports")
    print(f"  Normal: {normal_tip} lamports")
    print(f"  Aggressive: {aggressive_tip} lamports")
    
    # Get smart tip with trend analysis
    smart_tip = await get_smart_tip("normal", consider_trend=True)
    print(f"  Smart tip: {smart_tip} lamports")
    
    # Keep streaming
    try:
        await stream_task
    except KeyboardInterrupt:
        stream_task.cancel()
        print("Tip floor streaming stopped")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(tip_floor_example())

Last updated