# Tip Floor Stream
import asyncio
import json
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
import logging
# Tip floor data structure
@dataclass
class TipFloorData:
    """One tip-floor snapshot: landed-tip percentiles expressed in SOL.

    Attributes:
        time: Timestamp string for the snapshot, as supplied by the source.
        landed_tips_25th_percentile: 25th percentile of landed tips, in SOL.
        landed_tips_50th_percentile: 50th percentile of landed tips, in SOL.
        landed_tips_75th_percentile: 75th percentile of landed tips, in SOL.
        landed_tips_95th_percentile: 95th percentile of landed tips, in SOL.
        landed_tips_99th_percentile: 99th percentile of landed tips, in SOL.
        ema_landed_tips_50th_percentile: EMA of the 50th percentile
            (presumably an exponential moving average; confirm with the
            data provider).
    """

    time: str
    landed_tips_25th_percentile: float
    landed_tips_50th_percentile: float
    landed_tips_75th_percentile: float
    landed_tips_95th_percentile: float
    landed_tips_99th_percentile: float
    ema_landed_tips_50th_percentile: float

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'TipFloorData':
        """Build an instance from a mapping keyed by field name.

        Keys that are not declared fields are ignored, so a payload carrying
        extra attributes does not break parsing.

        Raises:
            TypeError: If a declared field is missing from ``data``.
        """
        # Local import: the module header only imports `dataclass` and `asdict`.
        from dataclasses import fields

        known = {spec.name for spec in fields(cls)}
        return cls(**{key: value for key, value in data.items() if key in known})

    def to_lamports(self, percentile: str) -> int:
        """Convert the SOL amount of one percentile field to lamports.

        Args:
            percentile: Percentile label used in the field name, e.g. ``"50th"``.

        Returns:
            The amount in lamports (1 SOL = 1_000_000_000 lamports).

        Raises:
            AttributeError: If no ``landed_tips_<percentile>_percentile``
                field exists.
        """
        sol_amount = getattr(self, f"landed_tips_{percentile}_percentile")
        # Round instead of truncating: binary floats can land a hair below the
        # exact product, and int() would then drop a whole lamport.
        return round(sol_amount * 1_000_000_000)
# Tip strategy management
@dataclass
class TipStrategy:
    """Tip amounts, in lamports, for each named priority level.

    The defaults are static starting values; ``update_from_tip_floor``
    replaces them with values derived from a tip-floor snapshot.
    """

    conservative_tip: int = 500_000  # 25th percentile
    normal_tip: int = 1_000_000  # 50th percentile
    aggressive_tip: int = 2_000_000  # 75th percentile
    priority_tip: int = 5_000_000  # 95th percentile
    # UTC time of the most recent update; None until the first update happens.
    last_updated: Optional[datetime] = None

    def update_from_tip_floor(self, tip_floor: "TipFloorData") -> None:
        """Overwrite every tier from ``tip_floor`` and stamp the update time.

        Args:
            tip_floor: Snapshot whose ``to_lamports`` method supplies the
                25th/50th/75th/95th percentile amounts.
        """
        self.conservative_tip = tip_floor.to_lamports("25th")
        self.normal_tip = tip_floor.to_lamports("50th")
        self.aggressive_tip = tip_floor.to_lamports("75th")
        self.priority_tip = tip_floor.to_lamports("95th")
        self.last_updated = datetime.now(timezone.utc)

    def get_tip_for_priority(self, priority: str) -> int:
        """Return the tip in lamports for ``priority``.

        Recognised levels are ``"conservative"``, ``"normal"``,
        ``"aggressive"`` and ``"priority"``; any other value falls back to
        the normal tip.
        """
        tiers = {
            "conservative": self.conservative_tip,
            "normal": self.normal_tip,
            "aggressive": self.aggressive_tip,
            "priority": self.priority_tip,
        }
        return tiers.get(priority, self.normal_tip)
# Global tip strategy instance: module-level state shared by the functions in
# this file. process_tip_floor_update overwrites it in place and get_smart_tip
# reads the current tip amounts from it.
global_tip_strategy: TipStrategy = TipStrategy()
# Stream tip floor data
async def stream_tip_floor(
    # nextblock_client,  # Your generated gRPC client
    update_frequency: str = "1m",
    callback: Optional[callable] = None,
) -> None:
    """Stream tip floor updates from NextBlock.

    The gRPC implementation is kept below as a disabled reference block.
    Until it is enabled, this coroutine emits one fixed mock ``TipFloorData``
    sample every 60 seconds and never returns on its own; cancel the task to
    stop it.

    Args:
        update_frequency: Requested update interval, e.g. ``"1m"``. It is
            printed at start-up and used by the disabled gRPC request; the
            mock loop always sleeps 60 seconds regardless of this value.
        callback: Optional async callable awaited with each ``TipFloorData``
            after the update has been processed.
    """
    print(f"Starting tip floor stream with frequency: {update_frequency}")

    """ Uncomment when you have the generated gRPC client
    try:
        request = TipFloorStreamRequest(update_frequency=update_frequency)
        stream = nextblock_client.stream_tip_floor(request)
        print("Streaming tip floor data:")
        async for tip_floor_response in stream:
            try:
                # Convert protobuf response to TipFloorData
                tip_floor = TipFloorData(
                    time=tip_floor_response.time,
                    landed_tips_25th_percentile=tip_floor_response.landed_tips_25th_percentile,
                    landed_tips_50th_percentile=tip_floor_response.landed_tips_50th_percentile,
                    landed_tips_75th_percentile=tip_floor_response.landed_tips_75th_percentile,
                    landed_tips_95th_percentile=tip_floor_response.landed_tips_95th_percentile,
                    landed_tips_99th_percentile=tip_floor_response.landed_tips_99th_percentile,
                    ema_landed_tips_50th_percentile=tip_floor_response.ema_landed_tips_50th_percentile,
                )
                print(f"Received tip floor update:")
                print(f" Time: {tip_floor.time}")
                print(f" 25th percentile: {tip_floor.landed_tips_25th_percentile:.6f} SOL")
                print(f" 50th percentile: {tip_floor.landed_tips_50th_percentile:.6f} SOL")
                print(f" 75th percentile: {tip_floor.landed_tips_75th_percentile:.6f} SOL")
                print(f" 95th percentile: {tip_floor.landed_tips_95th_percentile:.6f} SOL")
                print(f" EMA 50th percentile: {tip_floor.ema_landed_tips_50th_percentile:.6f} SOL")
                print(" ---")
                # Update global tip strategy
                await process_tip_floor_update(tip_floor)
                # Call custom callback if provided
                if callback:
                    await callback(tip_floor)
            except Exception as e:
                logging.error(f"Error processing tip floor update: {e}")
    except Exception as e:
        logging.error(f"Tip floor stream error: {e}")
        # Implement reconnection logic here
    """

    # Mock streaming for demonstration
    print("Mock tip floor streaming started...")
    while True:
        await asyncio.sleep(60)  # Update every minute

        # Generate mock tip floor data
        mock_tip_floor = TipFloorData(
            time=datetime.now(timezone.utc).isoformat(),
            landed_tips_25th_percentile=0.0011,
            landed_tips_50th_percentile=0.005000001,
            landed_tips_75th_percentile=0.01555,
            landed_tips_95th_percentile=0.09339195639999975,
            landed_tips_99th_percentile=0.4846427910400001,
            ema_landed_tips_50th_percentile=0.005989477267191758,
        )
        print(f"Mock tip floor update: {asdict(mock_tip_floor)}")

        # Mirror the gRPC reference above: a failure while handling a single
        # update is logged and the stream keeps running instead of dying.
        try:
            await process_tip_floor_update(mock_tip_floor)
            if callback:
                await callback(mock_tip_floor)
        except asyncio.CancelledError:
            # Always let task cancellation through. On Python 3.7 this is an
            # Exception subclass and would otherwise be swallowed below.
            raise
        except Exception as e:
            logging.error(f"Error processing tip floor update: {e}")
# Process tip floor updates
async def process_tip_floor_update(tip_floor: TipFloorData) -> None:
    """Apply one tip-floor snapshot to the module-level state.

    In order: refresh ``global_tip_strategy`` from the snapshot, log the
    resulting lamport amounts at INFO level, store the snapshot through
    ``store_tip_floor_data``, then call ``trigger_pending_transactions``.

    Args:
        tip_floor: The snapshot that was just received.
    """
    # Refresh the shared strategy first so the log lines show the new values.
    global_tip_strategy.update_from_tip_floor(tip_floor)

    report = (
        f"Updated tip strategy at {tip_floor.time}",
        f" Conservative: {global_tip_strategy.conservative_tip} lamports",
        f" Normal: {global_tip_strategy.normal_tip} lamports",
        f" Aggressive: {global_tip_strategy.aggressive_tip} lamports",
        f" Priority: {global_tip_strategy.priority_tip} lamports",
    )
    for entry in report:
        logging.info(entry)

    # Record the snapshot, then give waiting transactions a chance to go out.
    await store_tip_floor_data(tip_floor)
    await trigger_pending_transactions()
# Historical data management
class TipFloorHistory:
    """Bounded, oldest-first list of tip-floor snapshots with simple statistics.

    Attributes:
        data: Stored snapshots, oldest first.
        max_size: Maximum number of snapshots kept; older entries are dropped.
    """

    def __init__(self, max_size: int = 1000):
        self.data: List["TipFloorData"] = []
        self.max_size = max_size

    def add(self, tip_floor: "TipFloorData") -> None:
        """Append a snapshot and drop the oldest entries beyond ``max_size``.

        A ``max_size`` of zero or less keeps the history empty instead of
        raising when popping from an empty list.
        """
        self.data.append(tip_floor)
        overflow = len(self.data) - max(self.max_size, 0)
        if overflow > 0:
            # Trim in place so existing references to ``self.data`` stay valid.
            del self.data[:overflow]

    def _recent(self, window: int) -> List["TipFloorData"]:
        """Return up to ``window`` newest snapshots; empty if ``window`` <= 0."""
        if window <= 0:
            # Guard explicitly: ``data[-0:]`` would return the whole list and
            # a negative window would slice from the wrong end.
            return []
        return self.data[-window:]

    def get_trend(self, percentile: str = "50th", window: int = 10) -> float:
        """Return newest minus oldest value of a percentile within the window.

        Args:
            percentile: Percentile label used in the field name, e.g. ``"50th"``.
            window: Number of most recent snapshots to consider.

        Returns:
            The difference in the stored unit, or ``0.0`` when fewer than two
            snapshots fall inside the window.
        """
        recent_data = self._recent(window)
        if len(recent_data) < 2:
            return 0.0
        field_name = f"landed_tips_{percentile}_percentile"
        return getattr(recent_data[-1], field_name) - getattr(recent_data[0], field_name)

    def get_average(self, percentile: str = "50th", window: int = 10) -> float:
        """Return the mean of a percentile over the most recent snapshots.

        Args:
            percentile: Percentile label used in the field name, e.g. ``"50th"``.
            window: Number of most recent snapshots to consider.

        Returns:
            The arithmetic mean, or ``0.0`` when the window holds no snapshots.
        """
        recent_data = self._recent(window)
        if not recent_data:
            return 0.0
        field_name = f"landed_tips_{percentile}_percentile"
        values = [getattr(entry, field_name) for entry in recent_data]
        return sum(values) / len(values)
# Global history tracker: module-level TipFloorHistory with the default
# max_size. store_tip_floor_data appends to it and get_smart_tip reads its
# trend.
tip_floor_history: TipFloorHistory = TipFloorHistory()
# Smart tip calculation with trend analysis
async def get_smart_tip(
    base_priority: str = "normal",
    consider_trend: bool = True,
) -> int:
    """Return a tip in lamports, optionally nudged by the recent tip trend.

    The base amount comes from ``global_tip_strategy``. When trend analysis
    is enabled and the history holds at least two snapshots, the 50th
    percentile trend over the last five snapshots scales the base: +20% if it
    rose by more than 0.001, -10% if it fell by more than 0.001, unchanged
    otherwise. The decision is printed to stdout.

    Args:
        base_priority: Priority level passed to ``get_tip_for_priority``.
        consider_trend: Set to False to return the base tip untouched.

    Returns:
        The tip in lamports. The 100_000-lamport minimum is applied only on
        the trend-adjusted path; the early return hands back the base tip
        as is.
    """
    base_tip = global_tip_strategy.get_tip_for_priority(base_priority)

    # Without trend analysis, or without enough history, the base tip stands.
    if not (consider_trend and len(tip_floor_history.data) >= 2):
        return base_tip

    trend = tip_floor_history.get_trend("50th", window=5)

    if trend > 0.001:
        # Tips increasing
        print(f"Tips trending up (+{trend:.6f}), increasing tip by 20%")
        adjustment_factor = 1.2
    elif trend < -0.001:
        # Tips decreasing
        print(f"Tips trending down ({trend:.6f}), decreasing tip by 10%")
        adjustment_factor = 0.9
    else:
        print(f"Tips stable ({trend:.6f}), no adjustment")
        adjustment_factor = 1.0

    adjusted_tip = int(base_tip * adjustment_factor)
    # Minimum tip of 0.0001 SOL
    return adjusted_tip if adjusted_tip > 100_000 else 100_000
# Store tip floor data
async def store_tip_floor_data(tip_floor: TipFloorData) -> None:
    """Record a snapshot in memory and append it to the day's JSONL file.

    The snapshot is added to ``tip_floor_history`` and then written as one
    JSON object per line to ``tip_data_<YYYYMMDD>.jsonl`` in the current
    working directory.

    Args:
        tip_floor: The snapshot to persist.
    """
    # In-memory history first, so it is updated even if the file write fails.
    tip_floor_history.add(tip_floor)

    # NOTE(review): datetime.now() is naive local time, so the file date can
    # differ from a UTC snapshot timestamp around midnight. Confirm intent.
    filename = f"tip_data_{datetime.now().strftime('%Y%m%d')}.jsonl"

    # NOTE(review): this is a blocking file write inside a coroutine.
    with open(filename, "a") as out:
        out.write(json.dumps(asdict(tip_floor)))
        out.write("\n")
# Trigger pending transactions
async def trigger_pending_transactions() -> None:
    """Announce a pending-transaction check on stdout.

    This is a stub: it only prints a status line. A real implementation
    would check the pending transaction queue and submit the queued
    transactions with updated tip amounts.
    """
    status_message = "Checking for pending transactions to trigger..."
    print(status_message)
    # Implementation would check your pending transaction queue
    # and submit them with updated tip amounts


# Usage Example
# Last updated