Submit Batched Transactions
Submit 2–4 transactions as an atomic bundle to NextBlock using Python. Batched transactions are processed as Jito bundles: either all of them succeed or none do.
Example
import asyncio
import base64
import random
from typing import List, Optional, Tuple
from dataclasses import dataclass
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized
# NextBlock tip wallets (base58 address strings).
# build_transaction_with_tip picks one entry at random for each
# transaction's tip transfer.
NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]
# Transaction bundle builder
class TransactionBundle:
    """Collect signed transactions and validate them as a single bundle.

    A bundle is valid when it holds between ``min_size`` and ``max_size``
    transactions (2 and 4) and the first signature of every transaction is
    present and unique within the bundle.
    """

    def __init__(self):
        self.transactions: List["Transaction"] = []
        self.max_size = 4
        self.min_size = 2

    @staticmethod
    def _primary_signature(transaction: "Transaction") -> str:
        """Return the first signature of ``transaction`` as a string.

        Raises:
            ValueError: If the transaction carries no signatures. Indexing
                an empty signature list would otherwise surface as an
                unrelated ``IndexError``.
        """
        signatures = transaction.signatures
        if not signatures:
            raise ValueError("Transaction in bundle has no signatures")
        return str(signatures[0])

    def add_transaction(self, transaction: "Transaction") -> "TransactionBundle":
        """Append ``transaction`` to the bundle and return ``self`` for chaining.

        Raises:
            ValueError: If the bundle already holds ``max_size`` transactions.
        """
        if len(self.transactions) >= self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        self.transactions.append(transaction)
        return self

    def validate(self) -> bool:
        """Check size limits and signature uniqueness; return ``True`` if valid.

        Raises:
            ValueError: If the bundle is too small, too large, contains a
                transaction without signatures, or contains two transactions
                with the same first signature.
        """
        count = len(self.transactions)
        if count < self.min_size:
            raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
        if count > self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")

        # A set collapses repeated signatures, so a shorter set means duplicates.
        signatures = self.get_signatures()
        if len(set(signatures)) != len(signatures):
            raise ValueError("Duplicate transaction signatures in bundle")
        return True

    def to_base64_transactions(self) -> List[str]:
        """Validate the bundle, then return each transaction base64-encoded.

        Raises:
            ValueError: If :meth:`validate` rejects the bundle.
        """
        self.validate()
        return [
            base64.b64encode(bytes(tx)).decode('utf-8')
            for tx in self.transactions
        ]

    def get_signatures(self) -> List[str]:
        """Return the first signature of every transaction, in bundle order.

        Raises:
            ValueError: If any transaction carries no signatures.
        """
        return [self._primary_signature(tx) for tx in self.transactions]
# Build single transaction with tip
async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
    """Build and sign a transaction whose first instruction tips NextBlock.

    Args:
        payer: Keypair that pays the fee, funds the tip and signs.
        recent_blockhash: Blockhash the transaction is signed against.
        tip_amount: Tip in lamports, transferred to a tip wallet.
        instructions: Caller instructions, placed after the tip transfer.

    Returns:
        A signed legacy ``Transaction`` with the tip transfer first.
    """
    # Random tip wallet for load balancing
    tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))

    # Create tip instruction (should be first)
    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=tip_wallet,
            lamports=tip_amount,
        )
    )

    # Unpacking accepts any instruction sequence (list, tuple, ...).
    all_instructions = [tip_instruction, *instructions]

    # The legacy Transaction type is built from a legacy message; a MessageV0
    # belongs to VersionedTransaction and cannot be passed to
    # Transaction.new_unsigned. new_signed_with_payer compiles the legacy
    # message and signs it in one step.
    return Transaction.new_signed_with_payer(
        all_instructions,
        payer.pubkey(),
        [payer],
        recent_blockhash,
    )
# Submit batched transactions
async def submit_batched_transactions(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    transaction_specs: List[Tuple[List, int]],  # [(instructions, tip_amount), ...]
) -> str:
    """Build one tipped transaction per spec and submit them as a bundle.

    Args:
        rpc_client: RPC client used to fetch the latest blockhash.
        signer: Keypair that pays for and signs every transaction.
        transaction_specs: ``(instructions, tip_amount)`` pairs, one per
            transaction, in bundle order.

    Returns:
        The bundle identifier. Until the gRPC submission below is enabled
        this is a mock string derived from the bundle size.

    Raises:
        ValueError: If ``TransactionBundle`` rejects the bundle (size limits
            or duplicate signatures).
    """
    # A single blockhash is fetched once and shared by the whole bundle.
    blockhash_response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = blockhash_response.value.blockhash

    # Build the transactions sequentially so bundle order follows spec order.
    bundle = TransactionBundle()
    for spec_instructions, spec_tip in transaction_specs:
        bundle.add_transaction(
            await build_transaction_with_tip(
                payer=signer,
                recent_blockhash=recent_blockhash,
                tip_amount=spec_tip,
                instructions=spec_instructions,
            )
        )

    # Serializing also validates the bundle; the payload is consumed by the
    # gRPC submission below once it is enabled.
    base64_transactions = bundle.to_base64_transactions()

    # Log transaction signatures
    for i, sig in enumerate(bundle.get_signatures()):
        print(f"Transaction {i+1} signature: {sig}")

    # Submit bundle to NextBlock
    # Uncomment when you have the generated gRPC client:
    #
    # entries = [
    #     PostSubmitRequestEntry(
    #         transaction=TransactionMessage(content=base64_tx)
    #     )
    #     for base64_tx in base64_transactions
    # ]
    # request = PostSubmitBatchRequest(entries=entries)
    # response = await nextblock_client.post_submit_batch_v2(request)
    # print(f"Batch submitted successfully!")
    # print(f"Bundle signature: {response.signature}")
    # return response.signature

    # Mock response for demonstration
    bundle_signature = f"bundle-{len(bundle.transactions)}-transactions"
    print(f"Batch of {len(bundle.transactions)} transactions built successfully!")
    return bundle_signature
# Build common transaction patterns
async def build_arbitrage_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    dex_a_address: Pubkey,
    dex_b_address: Pubkey,
    trade_amount: int,
    tip_amount: int = 2_000_000,
) -> str:
    """Build and submit a two-transaction arbitrage bundle (buy, then sell).

    The transfers below are stand-ins for real DEX instructions: each moves
    ``trade_amount`` lamports from the signer to the given address.

    Args:
        rpc_client: RPC client forwarded to ``submit_batched_transactions``.
        signer: Keypair that funds and signs both transactions.
        dex_a_address: Destination of the "buy" leg.
        dex_b_address: Destination of the "sell" leg.
        trade_amount: Lamports moved in each leg.
        tip_amount: Tip in lamports attached to each transaction. Defaults
            to 2_000_000 so existing callers keep the previous behaviour.

    Returns:
        The bundle identifier returned by ``submit_batched_transactions``.
    """
    # Transaction 1: Buy on DEX A
    buy_instructions = [
        # Add your DEX-specific buy instructions here
        transfer(
            TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=dex_a_address,
                lamports=trade_amount,
            )
        ),
    ]

    # Transaction 2: Sell on DEX B
    sell_instructions = [
        # Add your DEX-specific sell instructions here
        transfer(
            TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=dex_b_address,
                lamports=trade_amount,
            )
        ),
    ]

    # Both legs carry the same tip.
    transaction_specs = [
        (buy_instructions, tip_amount),
        (sell_instructions, tip_amount),
    ]

    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs,
    )
# Build complex DeFi operation bundle
async def build_defi_operation_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
) -> str:
    """Build and submit a three-step bundle: setup, swap, then stake.

    Every step is a placeholder transfer from the signer. The
    angle-bracket address strings are template placeholders and must be
    replaced with real addresses before this function is run.

    Args:
        rpc_client: RPC client forwarded to ``submit_batched_transactions``.
        signer: Keypair that funds and signs every transaction.

    Returns:
        The bundle identifier returned by ``submit_batched_transactions``.
    """
    source = signer.pubkey()

    def placeholder_transfer(destination: str, lamports: int):
        # Stand-in for a real program instruction: a plain SOL transfer.
        return transfer(
            TransferParams(
                from_pubkey=source,
                to_pubkey=Pubkey.from_string(destination),
                lamports=lamports,
            )
        )

    # Transaction 1: Setup - Create token accounts
    # (replace with token account creation instructions; 1_000_000 lamports
    # stands in for rent exemption)
    setup_instructions = [placeholder_transfer("<token-program-address>", 1_000_000)]

    # Transaction 2: Main operation - Execute swap
    # (replace with swap instructions for Jupiter, Raydium, etc.; no SOL moves)
    swap_instructions = [placeholder_transfer("<swap-program-address>", 0)]

    # Transaction 3: Cleanup - Stake or provide liquidity
    # (replace with staking/liquidity instructions)
    stake_instructions = [placeholder_transfer("<stake-pool-address>", 0)]

    # Tips per step: setup, main operation, cleanup.
    step_tips = (500_000, 1_500_000, 750_000)
    step_instructions = (setup_instructions, swap_instructions, stake_instructions)
    transaction_specs = list(zip(step_instructions, step_tips))

    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs,
    )
# Advanced Bundle Management
Usage Examples
Best Practices
Bundle size limits: Keep bundles between 2 and 4 transactions for optimal success rates
Transaction ordering: Setup → Main operations → Cleanup
Progressive tipping: Use higher tips for more critical transactions
Error handling: Validate bundles before submission
Performance monitoring: Track bundle success rates and timing
Conditional execution: Filter transactions based on current conditions
Tip optimization: Adjust tips based on transaction importance and network conditions
Last updated