Submit Batched Transactions

Submit 2-4 transactions as an atomic bundle to NextBlock using Python. Batched transactions are processed as Jito bundles - either all succeed or none do.

Example

import asyncio
import base64
import random
from typing import List, Optional, Tuple
from dataclasses import dataclass

from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized

# NextBlock tip wallets
NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]

# Transaction bundle builder
class TransactionBundle:
    def __init__(self):
        self.transactions: List[Transaction] = []
        self.max_size = 4
        self.min_size = 2
    
    def add_transaction(self, transaction: Transaction) -> 'TransactionBundle':
        """Add transaction to bundle"""
        if len(self.transactions) >= self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        
        self.transactions.append(transaction)
        return self
    
    def validate(self) -> bool:
        """Validate bundle before submission"""
        if len(self.transactions) < self.min_size:
            raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
        
        if len(self.transactions) > self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        
        # Check for duplicate signatures
        signatures = set()
        for tx in self.transactions:
            sig = str(tx.signatures[0])
            if sig in signatures:
                raise ValueError("Duplicate transaction signatures in bundle")
            signatures.add(sig)
        
        return True
    
    def to_base64_transactions(self) -> List[str]:
        """Convert all transactions to base64 strings"""
        self.validate()
        
        base64_transactions = []
        for tx in self.transactions:
            serialized_tx = bytes(tx)
            base64_tx = base64.b64encode(serialized_tx).decode('utf-8')
            base64_transactions.append(base64_tx)
        
        return base64_transactions
    
    def get_signatures(self) -> List[str]:
        """Get all transaction signatures"""
        return [str(tx.signatures[0]) for tx in self.transactions]

# Build single transaction with tip
async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
    # Random tip wallet for load balancing
    tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))
    
    # Create tip instruction (should be first)
    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=tip_wallet,
            lamports=tip_amount
        )
    )
    
    # Combine all instructions
    all_instructions = [tip_instruction] + instructions
    
    # Create and sign transaction
    message = MessageV0.try_compile(
        payer=payer.pubkey(),
        instructions=all_instructions,
        address_lookup_table_accounts=[],
        recent_blockhash=recent_blockhash,
    )
    
    transaction = Transaction.new_unsigned(message)
    transaction.sign([payer], recent_blockhash)
    
    return transaction

# Submit batched transactions
async def submit_batched_transactions(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    transaction_specs: List[Tuple[List, int]],  # [(instructions, tip_amount), ...]
) -> str:
    """Submit multiple transactions as an atomic bundle"""
    
    # Get recent blockhash (same for all transactions in bundle)
    response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = response.value.blockhash
    
    # Build transaction bundle
    bundle = TransactionBundle()
    
    for instructions, tip_amount in transaction_specs:
        transaction = await build_transaction_with_tip(
            signer,
            recent_blockhash,
            tip_amount,
            instructions
        )
        bundle.add_transaction(transaction)
    
    # Get base64 transactions for submission
    base64_transactions = bundle.to_base64_transactions()
    signatures = bundle.get_signatures()
    
    # Log transaction signatures
    for i, sig in enumerate(signatures):
        print(f"Transaction {i+1} signature: {sig}")
    
    # Submit bundle to NextBlock
    """ Uncomment when you have the generated gRPC client
    entries = [
        PostSubmitRequestEntry(
            transaction=TransactionMessage(content=base64_tx)
        )
        for base64_tx in base64_transactions
    ]
    
    request = PostSubmitBatchRequest(entries=entries)
    response = await nextblock_client.post_submit_batch_v2(request)
    
    print(f"Batch submitted successfully!")
    print(f"Bundle signature: {response.signature}")
    
    return response.signature
    """
    
    # Mock response for demonstration
    bundle_signature = f"bundle-{len(bundle.transactions)}-transactions"
    print(f"Batch of {len(bundle.transactions)} transactions built successfully!")
    return bundle_signature

# Build common transaction patterns
async def build_arbitrage_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    dex_a_address: Pubkey,
    dex_b_address: Pubkey,
    trade_amount: int,
) -> str:
    """Build arbitrage bundle for cross-DEX trading"""
    
    # Transaction 1: Buy on DEX A
    buy_instructions = [
        # Add your DEX-specific buy instructions here
        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=dex_a_address,
            lamports=trade_amount
        ))
    ]
    
    # Transaction 2: Sell on DEX B
    sell_instructions = [
        # Add your DEX-specific sell instructions here
        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=dex_b_address,
            lamports=trade_amount
        ))
    ]
    
    transaction_specs = [
        (buy_instructions, 2_000_000),   # Higher tip for arbitrage
        (sell_instructions, 2_000_000),  # Higher tip for arbitrage
    ]
    
    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs
    )

# Build complex DeFi operation bundle
async def build_defi_operation_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
) -> str:
    """Build complex DeFi operation bundle"""
    
    # Transaction 1: Setup - Create token accounts
    setup_instructions = [
        # Add token account creation instructions
        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("<token-program-address>"),
            lamports=1_000_000  # Rent exemption
        ))
    ]
    
    # Transaction 2: Main operation - Execute swap
    swap_instructions = [
        # Add swap instructions (Jupiter, Raydium, etc.)
        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("<swap-program-address>"),
            lamports=0  # No SOL transfer for swap
        ))
    ]
    
    # Transaction 3: Cleanup - Stake or provide liquidity
    stake_instructions = [
        # Add staking/liquidity instructions
        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("<stake-pool-address>"),
            lamports=0
        ))
    ]
    
    transaction_specs = [
        (setup_instructions, 500_000),    # Setup tip
        (swap_instructions, 1_500_000),   # Main operation tip
        (stake_instructions, 750_000),    # Cleanup tip
    ]
    
    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs
    )

Advanced Bundle Management

Usage Examples

Best Practices

  1. Bundle size limits: Keep bundles between 2-4 transactions for optimal success rates

  2. Transaction ordering: Setup → Main operations → Cleanup

  3. Progressive tipping: Use higher tips for more critical transactions

  4. Error handling: Validate bundles before submission

  5. Performance monitoring: Track bundle success rates and timing

  6. Conditional execution: Filter transactions based on current conditions

  7. Tip optimization: Adjust tips based on transaction importance and network conditions

Last updated