Tip Floor Stream
Stream real-time tip floor data from NextBlock to optimize transaction tips dynamically using Python.
import asyncio
import json
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
import logging
# Tip floor data structure
@dataclass
class TipFloorData:
time: str
landed_tips_25th_percentile: float
landed_tips_50th_percentile: float
landed_tips_75th_percentile: float
landed_tips_95th_percentile: float
landed_tips_99th_percentile: float
ema_landed_tips_50th_percentile: float
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TipFloorData':
return cls(**data)
def to_lamports(self, percentile: str) -> int:
"""Convert SOL amounts to lamports"""
sol_amount = getattr(self, f"landed_tips_{percentile}_percentile")
return int(sol_amount * 1_000_000_000)
# Tip strategy management
@dataclass
class TipStrategy:
conservative_tip: int = 500_000 # 25th percentile
normal_tip: int = 1_000_000 # 50th percentile
aggressive_tip: int = 2_000_000 # 75th percentile
priority_tip: int = 5_000_000 # 95th percentile
last_updated: datetime = None
def update_from_tip_floor(self, tip_floor: TipFloorData):
"""Update strategy based on tip floor data"""
self.conservative_tip = tip_floor.to_lamports("25th")
self.normal_tip = tip_floor.to_lamports("50th")
self.aggressive_tip = tip_floor.to_lamports("75th")
self.priority_tip = tip_floor.to_lamports("95th")
self.last_updated = datetime.now(timezone.utc)
def get_tip_for_priority(self, priority: str) -> int:
"""Get tip amount for given priority level"""
return {
"conservative": self.conservative_tip,
"normal": self.normal_tip,
"aggressive": self.aggressive_tip,
"priority": self.priority_tip,
}.get(priority, self.normal_tip)
# Global tip strategy instance
global_tip_strategy = TipStrategy()
# Stream tip floor data
async def stream_tip_floor(
# nextblock_client, # Your generated gRPC client
update_frequency: str = "1m",
callback: Optional[callable] = None,
) -> None:
"""Stream tip floor updates from NextBlock"""
print(f"Starting tip floor stream with frequency: {update_frequency}")
""" Uncomment when you have the generated gRPC client
try:
request = TipFloorStreamRequest(update_frequency=update_frequency)
stream = nextblock_client.stream_tip_floor(request)
print("Streaming tip floor data:")
async for tip_floor_response in stream:
try:
# Convert protobuf response to TipFloorData
tip_floor = TipFloorData(
time=tip_floor_response.time,
landed_tips_25th_percentile=tip_floor_response.landed_tips_25th_percentile,
landed_tips_50th_percentile=tip_floor_response.landed_tips_50th_percentile,
landed_tips_75th_percentile=tip_floor_response.landed_tips_75th_percentile,
landed_tips_95th_percentile=tip_floor_response.landed_tips_95th_percentile,
landed_tips_99th_percentile=tip_floor_response.landed_tips_99th_percentile,
ema_landed_tips_50th_percentile=tip_floor_response.ema_landed_tips_50th_percentile,
)
print(f"Received tip floor update:")
print(f" Time: {tip_floor.time}")
print(f" 25th percentile: {tip_floor.landed_tips_25th_percentile:.6f} SOL")
print(f" 50th percentile: {tip_floor.landed_tips_50th_percentile:.6f} SOL")
print(f" 75th percentile: {tip_floor.landed_tips_75th_percentile:.6f} SOL")
print(f" 95th percentile: {tip_floor.landed_tips_95th_percentile:.6f} SOL")
print(f" EMA 50th percentile: {tip_floor.ema_landed_tips_50th_percentile:.6f} SOL")
print(" ---")
# Update global tip strategy
await process_tip_floor_update(tip_floor)
# Call custom callback if provided
if callback:
await callback(tip_floor)
except Exception as e:
logging.error(f"Error processing tip floor update: {e}")
except Exception as e:
logging.error(f"Tip floor stream error: {e}")
# Implement reconnection logic here
"""
# Mock streaming for demonstration
print("Mock tip floor streaming started...")
while True:
await asyncio.sleep(60) # Update every minute
# Generate mock tip floor data
mock_tip_floor = TipFloorData(
time=datetime.now(timezone.utc).isoformat(),
landed_tips_25th_percentile=0.0011,
landed_tips_50th_percentile=0.005000001,
landed_tips_75th_percentile=0.01555,
landed_tips_95th_percentile=0.09339195639999975,
landed_tips_99th_percentile=0.4846427910400001,
ema_landed_tips_50th_percentile=0.005989477267191758,
)
print(f"Mock tip floor update: {asdict(mock_tip_floor)}")
await process_tip_floor_update(mock_tip_floor)
if callback:
await callback(mock_tip_floor)
# Process tip floor updates
async def process_tip_floor_update(tip_floor: TipFloorData) -> None:
"""Process incoming tip floor data"""
# Update global strategy
global_tip_strategy.update_from_tip_floor(tip_floor)
# Log the update
logging.info(f"Updated tip strategy at {tip_floor.time}")
logging.info(f" Conservative: {global_tip_strategy.conservative_tip} lamports")
logging.info(f" Normal: {global_tip_strategy.normal_tip} lamports")
logging.info(f" Aggressive: {global_tip_strategy.aggressive_tip} lamports")
logging.info(f" Priority: {global_tip_strategy.priority_tip} lamports")
# Store historical data
await store_tip_floor_data(tip_floor)
# Trigger any pending transactions
await trigger_pending_transactions()
# Historical data management
class TipFloorHistory:
def __init__(self, max_size: int = 1000):
self.data: List[TipFloorData] = []
self.max_size = max_size
def add(self, tip_floor: TipFloorData):
"""Add tip floor data to history"""
if len(self.data) >= self.max_size:
self.data.pop(0) # Remove oldest
self.data.append(tip_floor)
def get_trend(self, percentile: str = "50th", window: int = 10) -> float:
"""Calculate tip trend over time window"""
if len(self.data) < 2:
return 0.0
recent_data = self.data[-window:] if len(self.data) >= window else self.data
if len(recent_data) < 2:
return 0.0
start_value = getattr(recent_data[0], f"landed_tips_{percentile}_percentile")
end_value = getattr(recent_data[-1], f"landed_tips_{percentile}_percentile")
return end_value - start_value
def get_average(self, percentile: str = "50th", window: int = 10) -> float:
"""Get average tip over time window"""
if not self.data:
return 0.0
recent_data = self.data[-window:] if len(self.data) >= window else self.data
values = [getattr(d, f"landed_tips_{percentile}_percentile") for d in recent_data]
return sum(values) / len(values)
# Global history tracker
tip_floor_history = TipFloorHistory()
# Smart tip calculation with trend analysis
async def get_smart_tip(
base_priority: str = "normal",
consider_trend: bool = True,
) -> int:
"""Calculate smart tip amount based on current data and trends"""
base_tip = global_tip_strategy.get_tip_for_priority(base_priority)
if not consider_trend or len(tip_floor_history.data) < 2:
return base_tip
# Analyze trend
trend = tip_floor_history.get_trend("50th", window=5)
# Adjust tip based on trend
if trend > 0.001: # Tips increasing
adjustment_factor = 1.2
print(f"Tips trending up (+{trend:.6f}), increasing tip by 20%")
elif trend < -0.001: # Tips decreasing
adjustment_factor = 0.9
print(f"Tips trending down ({trend:.6f}), decreasing tip by 10%")
else:
adjustment_factor = 1.0
print(f"Tips stable ({trend:.6f}), no adjustment")
smart_tip = int(base_tip * adjustment_factor)
return max(smart_tip, 100_000) # Minimum tip of 0.0001 SOL
# Store tip floor data
async def store_tip_floor_data(tip_floor: TipFloorData) -> None:
"""Store tip floor data for analysis"""
# Add to history
tip_floor_history.add(tip_floor)
# Optionally save to file
filename = f"tip_data_{datetime.now().strftime('%Y%m%d')}.jsonl"
with open(filename, "a") as f:
json.dump(asdict(tip_floor), f)
f.write("\n")
# Trigger pending transactions
async def trigger_pending_transactions() -> None:
"""Check and trigger any pending transactions with updated tips"""
print("Checking for pending transactions to trigger...")
# Implementation would check your pending transaction queue
# and submit them with updated tip amounts
Usage Example
async def tip_floor_example():
# Connect to NextBlock (see connection.md)
# config = NextBlockConfig.from_env()
# async with NextBlockConnectionManager(config) as manager:
# nextblock_client = manager.client
# Custom callback for tip floor updates
async def on_tip_floor_update(tip_floor: TipFloorData):
print(f"Custom handler: Received update at {tip_floor.time}")
# Example: Trigger high-priority transactions when tips are low
if tip_floor.landed_tips_50th_percentile < 0.002: # Less than 0.002 SOL
print("Tips are low - good time for high-priority transactions!")
# await submit_priority_transactions()
# Start streaming in background
stream_task = asyncio.create_task(
stream_tip_floor(
# nextblock_client,
update_frequency="1m",
callback=on_tip_floor_update
)
)
# Example usage of dynamic tips
await asyncio.sleep(5) # Wait for initial data
# Get current optimal tips
conservative_tip = global_tip_strategy.get_tip_for_priority("conservative")
normal_tip = global_tip_strategy.get_tip_for_priority("normal")
aggressive_tip = global_tip_strategy.get_tip_for_priority("aggressive")
print(f"Current optimal tips:")
print(f" Conservative: {conservative_tip} lamports")
print(f" Normal: {normal_tip} lamports")
print(f" Aggressive: {aggressive_tip} lamports")
# Get smart tip with trend analysis
smart_tip = await get_smart_tip("normal", consider_trend=True)
print(f" Smart tip: {smart_tip} lamports")
# Keep streaming
try:
await stream_task
except KeyboardInterrupt:
stream_task.cancel()
print("Tip floor streaming stopped")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(tip_floor_example())
Last updated