Submit Batched Transactions
Example
import asyncio
import base64
import random
from typing import List, Optional, Tuple
from dataclasses import dataclass
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized
# NextBlock tip wallets
NEXTBLOCK_TIP_WALLETS = [
"NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
"nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
"NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
"NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
"NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
"NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
"neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
"nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]
# Transaction bundle builder
class TransactionBundle:
def __init__(self):
self.transactions: List[Transaction] = []
self.max_size = 4
self.min_size = 2
def add_transaction(self, transaction: Transaction) -> 'TransactionBundle':
"""Add transaction to bundle"""
if len(self.transactions) >= self.max_size:
raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
self.transactions.append(transaction)
return self
def validate(self) -> bool:
"""Validate bundle before submission"""
if len(self.transactions) < self.min_size:
raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
if len(self.transactions) > self.max_size:
raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
# Check for duplicate signatures
signatures = set()
for tx in self.transactions:
sig = str(tx.signatures[0])
if sig in signatures:
raise ValueError("Duplicate transaction signatures in bundle")
signatures.add(sig)
return True
def to_base64_transactions(self) -> List[str]:
"""Convert all transactions to base64 strings"""
self.validate()
base64_transactions = []
for tx in self.transactions:
serialized_tx = bytes(tx)
base64_tx = base64.b64encode(serialized_tx).decode('utf-8')
base64_transactions.append(base64_tx)
return base64_transactions
def get_signatures(self) -> List[str]:
"""Get all transaction signatures"""
return [str(tx.signatures[0]) for tx in self.transactions]
# Build single transaction with tip
async def build_transaction_with_tip(
payer: Keypair,
recent_blockhash: Hash,
tip_amount: int,
instructions: List,
) -> Transaction:
# Random tip wallet for load balancing
tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))
# Create tip instruction (should be first)
tip_instruction = transfer(
TransferParams(
from_pubkey=payer.pubkey(),
to_pubkey=tip_wallet,
lamports=tip_amount
)
)
# Combine all instructions
all_instructions = [tip_instruction] + instructions
# Create and sign transaction
message = MessageV0.try_compile(
payer=payer.pubkey(),
instructions=all_instructions,
address_lookup_table_accounts=[],
recent_blockhash=recent_blockhash,
)
transaction = Transaction.new_unsigned(message)
transaction.sign([payer], recent_blockhash)
return transaction
# Submit batched transactions
async def submit_batched_transactions(
# nextblock_client, # Your generated gRPC client
rpc_client: AsyncClient,
signer: Keypair,
transaction_specs: List[Tuple[List, int]], # [(instructions, tip_amount), ...]
) -> str:
"""Submit multiple transactions as an atomic bundle"""
# Get recent blockhash (same for all transactions in bundle)
response = await rpc_client.get_latest_blockhash(commitment=Finalized)
recent_blockhash = response.value.blockhash
# Build transaction bundle
bundle = TransactionBundle()
for instructions, tip_amount in transaction_specs:
transaction = await build_transaction_with_tip(
signer,
recent_blockhash,
tip_amount,
instructions
)
bundle.add_transaction(transaction)
# Get base64 transactions for submission
base64_transactions = bundle.to_base64_transactions()
signatures = bundle.get_signatures()
# Log transaction signatures
for i, sig in enumerate(signatures):
print(f"Transaction {i+1} signature: {sig}")
# Submit bundle to NextBlock
""" Uncomment when you have the generated gRPC client
entries = [
PostSubmitRequestEntry(
transaction=TransactionMessage(content=base64_tx)
)
for base64_tx in base64_transactions
]
request = PostSubmitBatchRequest(entries=entries)
response = await nextblock_client.post_submit_batch_v2(request)
print(f"Batch submitted successfully!")
print(f"Bundle signature: {response.signature}")
return response.signature
"""
# Mock response for demonstration
bundle_signature = f"bundle-{len(bundle.transactions)}-transactions"
print(f"Batch of {len(bundle.transactions)} transactions built successfully!")
return bundle_signature
# Build common transaction patterns
async def build_arbitrage_bundle(
# nextblock_client,
rpc_client: AsyncClient,
signer: Keypair,
dex_a_address: Pubkey,
dex_b_address: Pubkey,
trade_amount: int,
) -> str:
"""Build arbitrage bundle for cross-DEX trading"""
# Transaction 1: Buy on DEX A
buy_instructions = [
# Add your DEX-specific buy instructions here
transfer(TransferParams(
from_pubkey=signer.pubkey(),
to_pubkey=dex_a_address,
lamports=trade_amount
))
]
# Transaction 2: Sell on DEX B
sell_instructions = [
# Add your DEX-specific sell instructions here
transfer(TransferParams(
from_pubkey=signer.pubkey(),
to_pubkey=dex_b_address,
lamports=trade_amount
))
]
transaction_specs = [
(buy_instructions, 2_000_000), # Higher tip for arbitrage
(sell_instructions, 2_000_000), # Higher tip for arbitrage
]
return await submit_batched_transactions(
# nextblock_client,
rpc_client,
signer,
transaction_specs
)
# Build complex DeFi operation bundle
async def build_defi_operation_bundle(
# nextblock_client,
rpc_client: AsyncClient,
signer: Keypair,
) -> str:
"""Build complex DeFi operation bundle"""
# Transaction 1: Setup - Create token accounts
setup_instructions = [
# Add token account creation instructions
transfer(TransferParams(
from_pubkey=signer.pubkey(),
to_pubkey=Pubkey.from_string("<token-program-address>"),
lamports=1_000_000 # Rent exemption
))
]
# Transaction 2: Main operation - Execute swap
swap_instructions = [
# Add swap instructions (Jupiter, Raydium, etc.)
transfer(TransferParams(
from_pubkey=signer.pubkey(),
to_pubkey=Pubkey.from_string("<swap-program-address>"),
lamports=0 # No SOL transfer for swap
))
]
# Transaction 3: Cleanup - Stake or provide liquidity
stake_instructions = [
# Add staking/liquidity instructions
transfer(TransferParams(
from_pubkey=signer.pubkey(),
to_pubkey=Pubkey.from_string("<stake-pool-address>"),
lamports=0
))
]
transaction_specs = [
(setup_instructions, 500_000), # Setup tip
(swap_instructions, 1_500_000), # Main operation tip
(stake_instructions, 750_000), # Cleanup tip
]
return await submit_batched_transactions(
# nextblock_client,
rpc_client,
signer,
transaction_specs
)Advanced Bundle Management
Usage Examples
Best Practices
Last updated