Submit Batched Transactions
Submit 2 to 4 transactions as an atomic bundle to NextBlock using Python. Batched transactions are processed as Jito bundles: either every transaction in the bundle succeeds or none do.
Example
import asyncio
import base64
import random
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized
from solders.hash import Hash
from solders.keypair import Keypair
from solders.message import MessageV0
from solders.pubkey import Pubkey
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
# NextBlock tip wallets, stored as address strings. build_transaction_with_tip
# picks one at random (random.choice) and parses it with Pubkey.from_string
# to use as the destination of the tip transfer.
NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]
# Transaction bundle builder
class TransactionBundle:
    """Ordered collection of transactions that are submitted together.

    The bundle accepts at most ``max_size`` transactions on insertion and
    requires at least ``min_size`` of them to pass validation.
    """

    def __init__(self):
        self.transactions: List[Transaction] = []
        self.max_size = 4
        self.min_size = 2

    def add_transaction(self, transaction: Transaction) -> 'TransactionBundle':
        """Append ``transaction`` and return the bundle so calls can be chained.

        Raises:
            ValueError: If the bundle already holds ``max_size`` transactions.
        """
        has_room = len(self.transactions) < self.max_size
        if not has_room:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        self.transactions.append(transaction)
        return self

    def validate(self) -> bool:
        """Check size limits and first-signature uniqueness.

        Returns:
            True when the bundle is valid.

        Raises:
            ValueError: If the bundle is too small, too large, or two
                transactions share the same first signature.
        """
        count = len(self.transactions)
        if count < self.min_size:
            raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
        if count > self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")

        # Each transaction is identified by the string form of its first signature.
        seen = set()
        for entry in self.transactions:
            primary = str(entry.signatures[0])
            size_before = len(seen)
            seen.add(primary)
            if len(seen) == size_before:
                raise ValueError("Duplicate transaction signatures in bundle")
        return True

    def to_base64_transactions(self) -> List[str]:
        """Validate the bundle, then return each transaction's bytes as base64 text."""
        self.validate()
        return [
            base64.b64encode(bytes(entry)).decode('utf-8')
            for entry in self.transactions
        ]

    def get_signatures(self) -> List[str]:
        """Return the first signature of every transaction, as strings, in order."""
        collected = []
        for entry in self.transactions:
            collected.append(str(entry.signatures[0]))
        return collected
# Build single transaction with tip
async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
    """Build and sign a legacy transaction whose first instruction tips NextBlock.

    Args:
        payer: Fee payer and the only signer; also the source account of the
            tip transfer.
        recent_blockhash: Blockhash the transaction is signed against.
        tip_amount: Tip in lamports, sent to a randomly chosen entry of
            ``NEXTBLOCK_TIP_WALLETS``.
        instructions: Caller instructions, placed after the tip transfer.

    Returns:
        The signed transaction.
    """
    # Random tip wallet for load balancing
    tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))

    # Create tip instruction (should be first)
    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=tip_wallet,
            lamports=tip_amount,
        )
    )

    # ``Transaction`` is the legacy format and cannot wrap a ``MessageV0``.
    # new_signed_with_payer compiles a legacy message from the instructions
    # and signs it in one step, so the declared return type stays accurate.
    # NOTE(review): only ``payer`` signs; instructions that need additional
    # signers are not supported by this helper.
    return Transaction.new_signed_with_payer(
        [tip_instruction, *instructions],
        payer.pubkey(),
        [payer],
        recent_blockhash,
    )
# Submit batched transactions
async def submit_batched_transactions(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    transaction_specs: List[Tuple[List, int]],  # [(instructions, tip_amount), ...]
) -> str:
    """Build, sign and submit several transactions as one atomic bundle.

    Args:
        rpc_client: RPC client used to fetch the latest blockhash.
        signer: Keypair that pays for and signs every transaction.
        transaction_specs: One ``(instructions, tip_amount)`` pair per
            transaction, in bundle order.

    Returns:
        The bundle signature. While the gRPC submission below is commented
        out this is a mock string of the form ``bundle-<n>-transactions``.

    Raises:
        ValueError: If the number of specs is outside the bundle size limits,
            or two built transactions share a signature.
    """
    bundle = TransactionBundle()

    # Fail fast: reject a wrongly sized batch before spending an RPC round
    # trip and signing transactions that can never be submitted.
    spec_count = len(transaction_specs)
    if spec_count < bundle.min_size:
        raise ValueError(f"Bundle must contain at least {bundle.min_size} transactions")
    if spec_count > bundle.max_size:
        raise ValueError(f"Bundle cannot contain more than {bundle.max_size} transactions")

    # Get recent blockhash (same for all transactions in bundle)
    response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = response.value.blockhash

    # Build transaction bundle
    for instructions, tip_amount in transaction_specs:
        transaction = await build_transaction_with_tip(
            signer,
            recent_blockhash,
            tip_amount,
            instructions,
        )
        bundle.add_transaction(transaction)

    # Get base64 transactions for submission. This also validates the bundle,
    # so it must run even while the real submission is commented out.
    base64_transactions = bundle.to_base64_transactions()
    signatures = bundle.get_signatures()

    # Log transaction signatures
    for i, sig in enumerate(signatures):
        print(f"Transaction {i+1} signature: {sig}")

    # Submit bundle to NextBlock.
    # Uncomment when you have the generated gRPC client:
    #
    # entries = [
    #     PostSubmitRequestEntry(
    #         transaction=TransactionMessage(content=base64_tx)
    #     )
    #     for base64_tx in base64_transactions
    # ]
    # request = PostSubmitBatchRequest(entries=entries)
    # response = await nextblock_client.post_submit_batch_v2(request)
    # print(f"Batch submitted successfully!")
    # print(f"Bundle signature: {response.signature}")
    # return response.signature

    # Mock response for demonstration
    bundle_signature = f"bundle-{len(bundle.transactions)}-transactions"
    print(f"Batch of {len(bundle.transactions)} transactions built successfully!")
    return bundle_signature
# Build common transaction patterns
async def build_arbitrage_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    dex_a_address: Pubkey,
    dex_b_address: Pubkey,
    trade_amount: int,
) -> str:
    """Submit a two-transaction bundle: one transfer to DEX A, one to DEX B.

    Both transactions move ``trade_amount`` lamports from ``signer`` and carry
    the same tip. The transfers are placeholders for real buy/sell
    instructions.

    Returns:
        Whatever ``submit_batched_transactions`` returns for the bundle.
    """
    arbitrage_tip = 2_000_000  # Higher tip for arbitrage

    # Leg 1 buys on DEX A, leg 2 sells on DEX B.
    # Add your DEX-specific buy/sell instructions here in place of the transfers.
    legs = []
    for venue in (dex_a_address, dex_b_address):
        leg_instructions = [
            transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=venue,
                lamports=trade_amount,
            ))
        ]
        legs.append((leg_instructions, arbitrage_tip))

    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        legs,
    )
# Build complex DeFi operation bundle
async def build_defi_operation_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
) -> str:
    """Submit a three-transaction setup / swap / stake bundle.

    Every stage is a placeholder transfer from ``signer``; replace the
    address placeholders and transfers with real instructions before use.

    Returns:
        Whatever ``submit_batched_transactions`` returns for the bundle.
    """
    # (destination placeholder, lamports transferred, tip) per stage, in order.
    stages = (
        # Transaction 1: Setup - Create token accounts
        # Add token account creation instructions
        ("<token-program-address>", 1_000_000, 500_000),  # Rent exemption; setup tip
        # Transaction 2: Main operation - Execute swap
        # Add swap instructions (Jupiter, Raydium, etc.)
        ("<swap-program-address>", 0, 1_500_000),  # No SOL transfer for swap; main operation tip
        # Transaction 3: Cleanup - Stake or provide liquidity
        # Add staking/liquidity instructions
        ("<stake-pool-address>", 0, 750_000),  # Cleanup tip
    )

    transaction_specs = [
        (
            [
                transfer(TransferParams(
                    from_pubkey=signer.pubkey(),
                    to_pubkey=Pubkey.from_string(destination),
                    lamports=lamports,
                ))
            ],
            tip,
        )
        for destination, lamports, tip in stages
    ]

    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs,
    )
Advanced Bundle Management
# Bundle with conditional execution
class ConditionalBundle(TransactionBundle):
    """Bundle in which every transaction carries a pre-submission condition.

    ``conditions[i]`` always belongs to ``transactions[i]``. Transactions
    added without an explicit condition get one that is always met, so the
    two lists cannot drift out of step.
    """

    def __init__(self):
        super().__init__()
        self.conditions: List[Callable[..., object]] = []

    def add_transaction(self, transaction: Transaction) -> 'ConditionalBundle':
        """Add an unconditional transaction (its condition is always met).

        Raises:
            ValueError: If the bundle is already full.
        """
        super().add_transaction(transaction)
        # Keep conditions index-aligned with transactions.
        self.conditions.append(lambda: True)
        return self

    def add_conditional_transaction(
        self,
        transaction: Transaction,
        condition: Callable[..., object],
    ) -> 'ConditionalBundle':
        """Add a transaction that is kept only if ``condition`` is met.

        Args:
            transaction: Transaction to add.
            condition: Coroutine function called as ``condition(rpc_client)``,
                or a plain callable called with no arguments.

        Raises:
            ValueError: If the bundle is already full.
        """
        # Call the base implementation directly so no default condition is
        # registered; the condition is only recorded if the insert succeeded.
        super().add_transaction(transaction)
        self.conditions.append(condition)
        return self

    async def evaluate_conditions(self, rpc_client: AsyncClient) -> List[bool]:
        """Evaluate every condition, in order, before submission.

        A condition that raises is reported and counted as not met.

        Returns:
            One boolean per condition, aligned with ``transactions``.
        """
        results = []
        for condition in self.conditions:
            try:
                if asyncio.iscoroutinefunction(condition):
                    outcome = await condition(rpc_client)
                else:
                    outcome = condition()
                    # A plain callable (lambda, partial) may still hand back a
                    # coroutine; await it instead of treating it as truthy.
                    if asyncio.iscoroutine(outcome):
                        outcome = await outcome
                results.append(bool(outcome))
            except Exception as e:
                # Best effort: a failing check must not abort the whole bundle.
                print(f"Condition evaluation failed: {e}")
                results.append(False)
        return results

    def filter_by_conditions(self, condition_results: List[bool]) -> 'TransactionBundle':
        """Create a new bundle with only the transactions whose condition was met.

        Args:
            condition_results: One boolean per transaction, as returned by
                ``evaluate_conditions``.

        Raises:
            ValueError: If ``condition_results`` does not have exactly one
                entry per transaction.
        """
        if len(condition_results) != len(self.transactions):
            # zip() would silently drop the unmatched tail.
            raise ValueError(
                f"Expected {len(self.transactions)} condition results, "
                f"got {len(condition_results)}"
            )
        filtered_bundle = TransactionBundle()
        for i, (transaction, condition_met) in enumerate(zip(self.transactions, condition_results)):
            if condition_met:
                filtered_bundle.add_transaction(transaction)
            else:
                print(f"Transaction {i+1} filtered out due to condition")
        return filtered_bundle
# Bundle performance optimizer
class BundleOptimizer:
    """Scale tips and order transactions according to their type label."""

    def __init__(self):
        # Multiplier applied to the base tip for each known transaction type.
        self.tip_multipliers = {
            "setup": 0.5,      # Lower priority
            "main": 1.5,       # Higher priority
            "cleanup": 0.75,   # Medium priority
            "arbitrage": 2.0,  # Highest priority
        }

    def optimize_tips(
        self,
        base_tip: int,
        transaction_types: List[str]
    ) -> List[int]:
        """Return one tip per transaction type.

        Each tip is ``base_tip`` times the type's multiplier, truncated to an
        int. Unknown types use a multiplier of 1.0.
        """
        return [
            int(base_tip * self.tip_multipliers.get(tx_type, 1.0))
            for tx_type in transaction_types
        ]

    def reorder_transactions(
        self,
        transactions: List[Transaction],
        transaction_types: List[str]
    ) -> Tuple[List[Transaction], List[str]]:
        """Reorder transactions for optimal execution.

        Args:
            transactions: Transactions to order.
            transaction_types: Type label of each transaction, same length
                and order as ``transactions``.

        Returns:
            ``(sorted_transactions, sorted_types)``. Both lists are empty
            when the input is empty.

        Raises:
            ValueError: If the two inputs differ in length.
        """
        if len(transactions) != len(transaction_types):
            # zip() would silently drop the unmatched tail.
            raise ValueError(
                "transactions and transaction_types must have the same length"
            )

        # Priority order: arbitrage -> setup -> main -> cleanup.
        # Unknown types rank alongside "main".
        priority_order = {"arbitrage": 0, "setup": 1, "main": 2, "cleanup": 3}

        # sorted() is stable, so transactions of equal priority keep their
        # relative order.
        ranked = sorted(
            zip(transactions, transaction_types),
            key=lambda pair: priority_order.get(pair[1], 2),
        )

        # Two comprehensions instead of zip(*ranked): unpacking an empty
        # zip raises ValueError.
        sorted_transactions = [transaction for transaction, _ in ranked]
        sorted_types = [tx_type for _, tx_type in ranked]
        return sorted_transactions, sorted_types
# Bundle status monitoring
@dataclass
class BundleStatus:
    """Tracking record for one submitted bundle."""

    bundle_id: str
    transaction_count: int
    # Submission time; BundleTracker compares it against time.time().
    submitted_at: float
    signatures: List[str]
    status: str = "pending"  # pending, confirmed, failed

    def is_complete(self) -> bool:
        """Return True once the status is terminal ("confirmed" or "failed")."""
        terminal_states = ("confirmed", "failed")
        return self.status in terminal_states
class BundleTracker:
    """In-memory registry of submitted bundles, keyed by bundle id."""

    def __init__(self):
        self.bundles: Dict[str, BundleStatus] = {}

    def track_bundle(self, bundle_status: BundleStatus):
        """Start tracking a bundle, replacing any entry with the same id."""
        self.bundles[bundle_status.bundle_id] = bundle_status

    async def check_bundle_status(
        self,
        bundle_id: str,
        rpc_client: AsyncClient,
        timeout_seconds: float = 60,
    ) -> Optional[BundleStatus]:
        """Refresh and return the status of a tracked bundle.

        Args:
            bundle_id: Id the bundle was registered under.
            rpc_client: RPC client for signature lookups (unused while the
                lookup below is mocked).
            timeout_seconds: Age after which a bundle that is not fully
                confirmed is marked "failed".

        Returns:
            The updated ``BundleStatus``, or None if the id is not tracked.
        """
        bundle_status = self.bundles.get(bundle_id)
        if bundle_status is None:
            return None

        # Check if all transactions are confirmed
        confirmed_count = 0
        for signature in bundle_status.signatures:
            try:
                # Check transaction status
                # response = await rpc_client.get_signature_status(signature)
                # if response.value and response.value.confirmation_status:
                #     confirmed_count += 1
                confirmed_count += 1  # Mock confirmation
            except Exception as e:
                # Only reachable once the real RPC lookup above is enabled.
                print(f"Failed to check signature {signature}: {e}")

        # Update bundle status
        if confirmed_count == bundle_status.transaction_count:
            bundle_status.status = "confirmed"
        elif time.time() - bundle_status.submitted_at > timeout_seconds:
            bundle_status.status = "failed"
        return bundle_status
Usage Examples
async def main():
    """Run the usage examples: basic batch, arbitrage, DeFi bundle, tip optimization."""
    # Initialize clients
    rpc_client = AsyncClient("https://api.mainnet-beta.solana.com")
    signer = Keypair()  # Use your actual keypair

    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client

    try:
        # Example 1: Basic batch submission
        # (recipient placeholder, lamports transferred, tip) per transaction.
        basic_plan = (
            ("<recipient1>", 100_000, 500_000),    # Setup transaction, 0.0005 SOL tip
            ("<recipient2>", 200_000, 1_000_000),  # Main transaction, 0.001 SOL tip
            ("<recipient3>", 50_000, 500_000),     # Cleanup transaction, 0.0005 SOL tip
        )
        transaction_specs = [
            (
                [
                    transfer(TransferParams(
                        from_pubkey=signer.pubkey(),
                        to_pubkey=Pubkey.from_string(recipient),
                        lamports=lamports,
                    ))
                ],
                tip,
            )
            for recipient, lamports, tip in basic_plan
        ]
        bundle_signature = await submit_batched_transactions(
            # nextblock_client,
            rpc_client,
            signer,
            transaction_specs,
        )
        print(f"Basic batch: {bundle_signature}")

        # Example 2: Arbitrage bundle
        dex_a = Pubkey.from_string("<dex-a-address>")
        dex_b = Pubkey.from_string("<dex-b-address>")
        arb_signature = await build_arbitrage_bundle(
            # nextblock_client,
            rpc_client,
            signer,
            dex_a,
            dex_b,
            1_000_000,  # 0.001 SOL trade
        )
        print(f"Arbitrage bundle: {arb_signature}")

        # Example 3: Optimized DeFi bundle
        defi_signature = await build_defi_operation_bundle(
            # nextblock_client,
            rpc_client,
            signer,
        )
        print(f"DeFi bundle: {defi_signature}")

        # Example 4: Bundle with optimization
        optimized_tips = BundleOptimizer().optimize_tips(
            1_000_000,
            ["setup", "main", "cleanup"],
        )
        print(f"Optimized tips: {optimized_tips}")
    finally:
        # Always release the RPC connection, even if an example fails.
        await rpc_client.close()


if __name__ == "__main__":
    asyncio.run(main())
Best Practices
Bundle size limits: A bundle must contain 2 to 4 transactions; anything outside that range is rejected by validation before submission
Transaction ordering: Setup → Main operations → Cleanup
Progressive tipping: Use higher tips for more critical transactions
Error handling: Validate bundles before submission
Performance monitoring: Track bundle success rates and timing
Conditional execution: Filter transactions based on current conditions
Tip optimization: Adjust tips based on transaction importance and network conditions
Last updated