Submit Single Transaction
Submit individual transactions to NextBlock using Python with proper tipping and error handling.
Example
import asyncio
import base64
import random
from typing import List, Optional
from dataclasses import dataclass
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized
# NextBlock tip wallets for load balancing
NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]


def get_random_nextblock_tip_wallet() -> Pubkey:
    """Return one of the NextBlock tip wallets, chosen uniformly at random.

    Picking a different wallet per call spreads tips across all addresses
    in ``NEXTBLOCK_TIP_WALLETS`` (load balancing).
    """
    return Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))
# Transaction submission parameters
@dataclass
class SubmissionOptions:
    """Per-submission flags sent along with a transaction to NextBlock.

    Each field is copied onto the submit request when the gRPC submission
    code is enabled. Only ``skip_preflight`` is switched on by default.
    """

    # Enabled by default; all remaining flags are opt-in.
    skip_preflight: bool = True
    front_running_protection: bool = False
    revert_on_fail: bool = False
    disable_retries: bool = False
    snipe_transaction: bool = False
# Build transaction with tip
async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
    """Build and sign a transaction whose first instruction is a NextBlock tip.

    Args:
        payer: Fee payer; also the source of the tip and the only signer.
        recent_blockhash: Blockhash the transaction is signed against.
        tip_amount: Tip in lamports, sent to a randomly chosen tip wallet.
        instructions: Caller instructions, appended after the tip. Any
            iterable is accepted; it is not modified.

    Returns:
        A legacy ``Transaction`` signed by ``payer``.
    """
    # Create tip instruction (should be first)
    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=get_random_nextblock_tip_wallet(),
            lamports=tip_amount,
        )
    )

    # Unpacking (rather than list concatenation) lets callers pass a tuple
    # or other iterable and never touches their list.
    all_instructions = [tip_instruction, *instructions]

    # The legacy ``Transaction`` type wraps a legacy ``Message``; handing it
    # a ``MessageV0`` is a type error. No address lookup tables are used
    # here, so build and sign a legacy transaction in one step instead.
    return Transaction.new_signed_with_payer(
        all_instructions,
        payer.pubkey(),
        [payer],
        recent_blockhash,
    )
# Submit single transaction to NextBlock
async def submit_single_transaction(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    recipient: Pubkey,
    transfer_amount: int,
    tip_amount: int,
    options: Optional[SubmissionOptions] = None,
) -> str:
    """Build a tipped SOL transfer and submit it to NextBlock.

    As written, the gRPC submission is commented out: the function builds
    and signs the transaction, prints its signature, and returns it without
    sending anything.

    Args:
        rpc_client: RPC client used to fetch the latest blockhash.
        signer: Keypair that pays for, signs, and funds the transfer and tip.
        recipient: Destination of the transfer.
        transfer_amount: Lamports to send to ``recipient``.
        tip_amount: Lamports to tip NextBlock.
        options: Submission flags; defaults to ``SubmissionOptions()``.

    Returns:
        The transaction's first signature, as a string.
    """
    if options is None:
        options = SubmissionOptions()

    # Get recent blockhash
    response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = response.value.blockhash

    # Create transfer instruction
    transfer_instruction = transfer(
        TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=recipient,
            lamports=transfer_amount,
        )
    )

    # Build transaction with tip
    transaction = await build_transaction_with_tip(
        signer,
        recent_blockhash,
        tip_amount,
        [transfer_instruction],
    )

    # Convert to base64 for submission. Only the commented-out request
    # below consumes ``base64_tx`` and ``options``.
    serialized_tx = bytes(transaction)
    base64_tx = base64.b64encode(serialized_tx).decode('utf-8')

    # Submit to NextBlock.
    # Uncomment when you have the generated gRPC client (and remove the
    # mock response below):
    #
    # request = PostSubmitRequest(
    #     transaction=TransactionMessage(content=base64_tx),
    #     skip_pre_flight=options.skip_preflight,
    #     snipe_transaction=options.snipe_transaction,
    #     front_running_protection=options.front_running_protection,
    #     disable_retries=options.disable_retries,
    #     revert_on_fail=options.revert_on_fail,
    # )
    # response = await nextblock_client.post_submit_v2(request)
    # print(f"Transaction submitted successfully!")
    # print(f"Signature: {response.signature}")
    # print(f"UUID: {response.uuid}")
    # return response.signature

    # Mock response for demonstration
    signature = str(transaction.signatures[0])
    print(f"Transaction built successfully: {signature}")
    return signature
# Submit with optimal tip calculation
async def submit_with_optimal_tip(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    recipient: Pubkey,
    transfer_amount: int,
    priority_level: str = "normal",
) -> str:
    """Submit a transfer using a preset tip for the given priority level.

    Args:
        rpc_client: RPC client passed through to the submission.
        signer: Keypair that signs and funds the transaction.
        recipient: Destination of the transfer.
        transfer_amount: Lamports to send to ``recipient``.
        priority_level: One of ``"conservative"``, ``"normal"``,
            ``"aggressive"`` or ``"priority"``. Any other value uses the
            ``"normal"`` tip.

    Returns:
        The signature returned by ``submit_single_transaction``.
    """
    # Preset tips in lamports, keyed by priority level.
    tip_by_priority = {
        "conservative": 500_000,  # 0.0005 SOL
        "normal": 1_000_000,  # 0.001 SOL
        "aggressive": 2_000_000,  # 0.002 SOL
        "priority": 5_000_000,  # 0.005 SOL
    }

    if priority_level in tip_by_priority:
        chosen_tip = tip_by_priority[priority_level]
    else:
        # Unknown levels fall back to the "normal" preset.
        chosen_tip = tip_by_priority["normal"]

    signature = await submit_single_transaction(
        # nextblock_client,
        rpc_client,
        signer,
        recipient,
        transfer_amount,
        chosen_tip,
    )
    return signature
# Batch multiple single transactions
async def submit_multiple_single_transactions(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    transactions_data: List[tuple],  # [(recipient, amount, tip), ...]
) -> List[str]:
    """Submit multiple single transactions concurrently.

    Args:
        rpc_client: RPC client shared by every submission.
        signer: Keypair that signs and funds every transaction.
        transactions_data: ``(recipient, transfer_amount, tip_amount)``
            tuples, one per transaction.

    Returns:
        Signatures of the submissions that succeeded, in input order.
        Failures are printed and left out; they are not raised.
    """
    pending = [
        submit_single_transaction(
            # nextblock_client,
            rpc_client,
            signer,
            recipient,
            transfer_amount,
            tip_amount,
        )
        for recipient, transfer_amount, tip_amount in transactions_data
    ]

    # Execute all transactions concurrently; failures come back as values.
    results = await asyncio.gather(*pending, return_exceptions=True)

    # Handle results
    successful_signatures = []
    for position, result in enumerate(results, start=1):
        # BaseException, not Exception: a cancelled submission is returned
        # as asyncio.CancelledError, which does not derive from Exception
        # and would otherwise be reported as a successful signature.
        if isinstance(result, BaseException):
            print(f"Transaction {position} failed: {result}")
        else:
            successful_signatures.append(result)
            print(f"Transaction {position} successful: {result}")
    return successful_signatures
Advanced Features
import time
from typing import Dict, Any
import backoff
# Transaction with retry logic
@backoff.on_exception(
    backoff.expo,
    Exception,
    max_tries=3,
    base=2,
    max_value=30,
)
async def submit_transaction_with_retry(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    recipient: Pubkey,
    transfer_amount: int,
    tip_amount: int,
) -> str:
    """Submit a single transaction, retrying on failure.

    The ``backoff`` decorator re-invokes this coroutine whenever it raises
    any ``Exception``, using exponential waits (``base=2``, capped at
    ``max_value=30``) and giving up after ``max_tries=3``.

    NOTE(review): every attempt calls ``submit_single_transaction`` again,
    which fetches a new blockhash and re-signs. Confirm that retrying after
    an ambiguous failure cannot land the same transfer twice.

    Returns:
        The signature returned by ``submit_single_transaction``.
    """
    return await submit_single_transaction(
        # nextblock_client,
        rpc_client=rpc_client,
        signer=signer,
        recipient=recipient,
        transfer_amount=transfer_amount,
        tip_amount=tip_amount,
    )
# Transaction builder with custom instructions
class TransactionBuilder:
    """Fluent builder for a tipped transaction with custom instructions.

    Instructions are collected with ``add_instruction``, the tip is set
    with ``set_tip_amount``, and ``build_and_submit`` signs the result with
    ``payer``. Both setters return the builder so calls can be chained.
    """

    def __init__(self, payer: Keypair):
        self.payer = payer
        self.instructions = []
        self.tip_amount = 1_000_000  # Default tip (0.001 SOL)

    def add_instruction(self, instruction) -> "TransactionBuilder":
        """Add custom instruction to transaction"""
        self.instructions.append(instruction)
        return self

    def set_tip_amount(self, amount: int) -> "TransactionBuilder":
        """Set tip amount in lamports.

        Raises:
            ValueError: If ``amount`` is negative. The previous tip is kept.
        """
        # Fail here rather than later while building the transfer
        # instruction, where the cause is harder to trace.
        if amount < 0:
            raise ValueError("Tip amount must be non-negative")
        self.tip_amount = amount
        return self

    async def build_and_submit(
        self,
        # nextblock_client,
        rpc_client: AsyncClient,
        options: Optional[SubmissionOptions] = None,
    ) -> str:
        """Build and submit the transaction.

        As written, submission is a placeholder: the transaction is built
        and signed, its signature is printed and returned, and ``options``
        is not yet used.

        Raises:
            ValueError: If no instructions have been added.
        """
        if not self.instructions:
            raise ValueError("No instructions added to transaction")

        # Get recent blockhash
        response = await rpc_client.get_latest_blockhash(commitment=Finalized)
        recent_blockhash = response.value.blockhash

        # Build transaction with tip
        transaction = await build_transaction_with_tip(
            self.payer,
            recent_blockhash,
            self.tip_amount,
            self.instructions,
        )

        # Convert and submit. ``base64_tx`` is the payload for the real
        # submit request once it is wired in.
        serialized_tx = bytes(transaction)
        base64_tx = base64.b64encode(serialized_tx).decode('utf-8')

        # Submit logic here (similar to above)
        signature = str(transaction.signatures[0])
        print(f"Custom transaction submitted: {signature}")
        return signature
# Performance monitoring
class TransactionMetrics:
    """Counters and timing for transaction submissions.

    Call ``start_submission`` before an attempt and exactly one of
    ``record_success`` / ``record_failure`` after it. Only one attempt is
    timed at a time: starting a second one before the first is recorded
    overwrites the first one's start time.
    """

    def __init__(self):
        self.submissions = 0
        self.successes = 0
        self.failures = 0
        self.total_time = 0.0  # Seconds, summed over successes and failures
        self.start_time = None  # Start of the in-flight attempt, if any

    def start_submission(self):
        """Count a new attempt and start its timer."""
        self.submissions += 1
        # perf_counter is unaffected by system clock adjustments, unlike
        # wall-clock time, so elapsed values cannot come out negative.
        self.start_time = time.perf_counter()

    def _stop_timer(self):
        """Add the in-flight attempt's elapsed time to the total, if any."""
        # Compare against None explicitly: a start value of 0.0 is valid.
        if self.start_time is not None:
            self.total_time += time.perf_counter() - self.start_time
        self.start_time = None

    def record_success(self):
        """Record the in-flight attempt as successful."""
        self._stop_timer()
        self.successes += 1

    def record_failure(self):
        """Record the in-flight attempt as failed."""
        self._stop_timer()
        self.failures += 1

    def get_stats(self) -> Dict[str, Any]:
        """Return counters, success rate (percent) and mean attempt time.

        ``average_time`` is seconds per completed attempt. ``total_time``
        includes failed attempts, so it is divided by successes plus
        failures rather than by successes alone.
        """
        completed = self.successes + self.failures
        return {
            "total_submissions": self.submissions,
            "successes": self.successes,
            "failures": self.failures,
            "success_rate": self.successes / max(self.submissions, 1) * 100,
            "average_time": self.total_time / max(completed, 1),
        }
# Monitored transaction submission
async def submit_with_monitoring(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    recipient: Pubkey,
    transfer_amount: int,
    tip_amount: int,
    metrics: TransactionMetrics,
) -> Optional[str]:
    """Submit one transaction and record the outcome in ``metrics``.

    Args:
        rpc_client: RPC client passed through to the submission.
        signer: Keypair that signs and funds the transaction.
        recipient: Destination of the transfer.
        transfer_amount: Lamports to send to ``recipient``.
        tip_amount: Lamports to tip NextBlock.
        metrics: Collector updated with a success or a failure.

    Returns:
        The signature on success, or ``None`` if the submission raised.
        The exception is printed, not re-raised.
    """
    metrics.start_submission()

    outcome: Optional[str] = None
    try:
        outcome = await submit_single_transaction(
            # nextblock_client,
            rpc_client,
            signer,
            recipient,
            transfer_amount,
            tip_amount,
        )
        metrics.record_success()
    except Exception as exc:
        # Swallowed on purpose so a batch of monitored submissions can
        # continue after one failure.
        metrics.record_failure()
        print(f"Transaction failed: {exc}")
        outcome = None
    return outcome
Usage Examples
async def main():
    """Run each submission example in turn against a mainnet RPC endpoint.

    ``Keypair()`` and ``"<recipient-public-key>"`` are placeholders:
    replace them with a funded keypair and a real recipient address before
    running.
    """
    # Initialize clients
    rpc_client = AsyncClient("https://api.mainnet-beta.solana.com")

    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client

    # Everything after client creation sits inside the try so the client is
    # closed even if setup fails (e.g. the recipient string does not parse).
    try:
        signer = Keypair()  # Use your actual keypair
        recipient = Pubkey.from_string("<recipient-public-key>")

        # Example 1: Basic transaction submission
        signature1 = await submit_single_transaction(
            # nextblock_client,
            rpc_client,
            signer,
            recipient,
            10_000,  # Transfer 10,000 lamports
            1_000_000,  # Tip 1,000,000 lamports (0.001 SOL)
        )
        print(f"Basic transaction: {signature1}")

        # Example 2: Transaction with optimal tip
        signature2 = await submit_with_optimal_tip(
            # nextblock_client,
            rpc_client,
            signer,
            recipient,
            20_000,  # Transfer 20,000 lamports
            "aggressive",  # Use aggressive tip strategy
        )
        print(f"Optimally tipped transaction: {signature2}")

        # Example 3: Custom transaction builder
        builder = TransactionBuilder(signer)
        signature3 = await (
            builder
            .add_instruction(transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=recipient,
                lamports=30_000,
            )))
            .set_tip_amount(1_500_000)
            .build_and_submit(
                # nextblock_client,
                rpc_client
            )
        )
        print(f"Custom built transaction: {signature3}")

        # Example 4: Multiple transactions
        transactions_data = [
            (recipient, 5_000, 500_000),
            (recipient, 7_500, 750_000),
            (recipient, 12_500, 1_250_000),
        ]
        signatures = await submit_multiple_single_transactions(
            # nextblock_client,
            rpc_client,
            signer,
            transactions_data,
        )
        print(f"Multiple transactions: {signatures}")

        # Example 5: Transaction with monitoring
        metrics = TransactionMetrics()
        for i in range(5):
            await submit_with_monitoring(
                # nextblock_client,
                rpc_client,
                signer,
                recipient,
                1_000 * (i + 1),  # Varying amounts
                500_000,  # Fixed tip
                metrics,
            )
        print(f"Performance metrics: {metrics.get_stats()}")
    finally:
        await rpc_client.close()


# Run the examples
if __name__ == "__main__":
    asyncio.run(main())
Best Practices
Always include tips: NextBlock prioritizes transactions with appropriate tips
Use random tip wallets: Distribute load across multiple tip addresses
Monitor tip floors: Adjust tips based on current network conditions
Handle errors gracefully: Implement retry logic with exponential backoff
Validate inputs: Always validate public keys and amounts before submission
Use async/await: Leverage Python's asyncio for better performance
Monitor performance: Track success rates and response times
Choose appropriate RPC endpoints: Use reliable RPC providers for blockhash retrieval
Last updated