Submit Batched Transactions

Submit 2-4 transactions as an atomic bundle to NextBlock using Python. Batched transactions are processed as Jito bundles - either all succeed or none do.

Example

import asyncio
import base64
import inspect
import random
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized
from solders.hash import Hash
from solders.keypair import Keypair
from solders.message import MessageV0
from solders.pubkey import Pubkey
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction

# NextBlock tip wallets, stored as address strings.
# build_transaction_with_tip() picks one at random (random.choice) and parses
# it with Pubkey.from_string() to get the recipient of the tip transfer.
NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]

# Transaction bundle builder
class TransactionBundle:
    """Ordered collection of transactions that is submitted as one batch.

    A bundle passes validation only while it holds between ``min_size`` and
    ``max_size`` transactions whose first signatures are all distinct.
    """

    def __init__(self):
        self.transactions: List[Transaction] = []
        self.max_size = 4
        self.min_size = 2

    def add_transaction(self, transaction: Transaction) -> 'TransactionBundle':
        """Append ``transaction`` and return the bundle so calls can be chained.

        Raises:
            ValueError: if the bundle already holds ``max_size`` transactions.
        """
        has_room = len(self.transactions) < self.max_size
        if not has_room:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")

        self.transactions.append(transaction)
        return self

    def validate(self) -> bool:
        """Check size limits and signature uniqueness.

        Returns:
            ``True`` when every check passes.

        Raises:
            ValueError: if the bundle is too small, too large, or two
                transactions share the same first signature.
        """
        count = len(self.transactions)
        if count < self.min_size:
            raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
        if count > self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")

        # The first signature of each transaction serves as its identity
        seen = set()
        for candidate in self.transactions:
            primary_signature = str(candidate.signatures[0])
            if primary_signature in seen:
                raise ValueError("Duplicate transaction signatures in bundle")
            seen.add(primary_signature)

        return True

    def to_base64_transactions(self) -> List[str]:
        """Validate the bundle, then return each transaction's bytes as base64 text."""
        self.validate()
        return [
            base64.b64encode(bytes(candidate)).decode('utf-8')
            for candidate in self.transactions
        ]

    def get_signatures(self) -> List[str]:
        """Return the first signature of every transaction, in bundle order."""
        collected = []
        for candidate in self.transactions:
            collected.append(str(candidate.signatures[0]))
        return collected

# Build single transaction with tip
async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
    """Build and sign a legacy transaction whose first instruction tips NextBlock.

    Args:
        payer: Fee payer. It also funds the tip and is the only keypair used
            to sign, so ``instructions`` must not require other signers.
        recent_blockhash: Blockhash the transaction is signed against.
        tip_amount: Amount transferred to a randomly chosen tip wallet.
        instructions: Caller's instructions, placed after the tip transfer.

    Returns:
        The signed transaction.
    """
    # Random tip wallet for load balancing
    tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))

    # Create tip instruction (should be first)
    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=tip_wallet,
            lamports=tip_amount,
        )
    )

    # Unpacking accepts any iterable of instructions, not only a list.
    all_instructions = [tip_instruction, *instructions]

    # `Transaction` is the legacy transaction type: it wraps a legacy message,
    # not a `MessageV0` (that pairing needs `VersionedTransaction`). No address
    # lookup tables are used here, so build and sign a legacy transaction
    # directly from the instruction list.
    return Transaction.new_signed_with_payer(
        all_instructions,
        payer.pubkey(),
        [payer],
        recent_blockhash,
    )

# Submit batched transactions
async def submit_batched_transactions(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    transaction_specs: List[Tuple[List, int]],  # [(instructions, tip_amount), ...]
) -> str:
    """Build one tipped transaction per spec and batch them into a bundle.

    Every transaction is signed by ``signer`` against the same blockhash,
    fetched once with ``Finalized`` commitment. The actual gRPC submission is
    left commented out below, so this currently builds and validates the
    bundle, prints each signature and returns a placeholder string.
    """
    # One blockhash shared by every transaction in the bundle
    blockhash_response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    shared_blockhash = blockhash_response.value.blockhash

    bundle = TransactionBundle()
    for spec_instructions, spec_tip in transaction_specs:
        bundle.add_transaction(
            await build_transaction_with_tip(
                signer,
                shared_blockhash,
                spec_tip,
                spec_instructions,
            )
        )

    # Serialising also validates the bundle (size limits, duplicate signatures)
    base64_transactions = bundle.to_base64_transactions()

    # Log transaction signatures
    for position, signature in enumerate(bundle.get_signatures(), start=1):
        print(f"Transaction {position} signature: {signature}")

    # Submit bundle to NextBlock
    """ Uncomment when you have the generated gRPC client
    entries = [
        PostSubmitRequestEntry(
            transaction=TransactionMessage(content=base64_tx)
        )
        for base64_tx in base64_transactions
    ]
    
    request = PostSubmitBatchRequest(entries=entries)
    response = await nextblock_client.post_submit_batch_v2(request)
    
    print(f"Batch submitted successfully!")
    print(f"Bundle signature: {response.signature}")
    
    return response.signature
    """

    # Mock response for demonstration
    transaction_count = len(bundle.transactions)
    print(f"Batch of {transaction_count} transactions built successfully!")
    return f"bundle-{transaction_count}-transactions"

# Build common transaction patterns
async def build_arbitrage_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    dex_a_address: Pubkey,
    dex_b_address: Pubkey,
    trade_amount: int,
) -> str:
    """Submit a two-transaction bundle: a leg for DEX A followed by one for DEX B.

    Plain transfers of ``trade_amount`` to each address stand in for the real
    DEX-specific buy and sell instructions.
    """
    arbitrage_tip = 2_000_000  # Higher tip for arbitrage

    def placeholder_leg(destination: Pubkey) -> List:
        # Add your DEX-specific instructions here
        return [
            transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=destination,
                lamports=trade_amount,
            ))
        ]

    # Transaction 1: Buy on DEX A, Transaction 2: Sell on DEX B
    legs = [
        (placeholder_leg(destination), arbitrage_tip)
        for destination in (dex_a_address, dex_b_address)
    ]

    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        legs,
    )

# Build complex DeFi operation bundle
async def build_defi_operation_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
) -> str:
    """Submit a three-step setup / swap / stake bundle built from placeholders.

    Each step is a plain transfer to a placeholder address; replace the
    address strings and instructions with real ones before running.
    """
    payer = signer.pubkey()

    # (placeholder destination, lamports moved, tip) per step, in execution order
    steps = [
        # Transaction 1: Setup - create token accounts (lamports = rent exemption)
        ("<token-program-address>", 1_000_000, 500_000),
        # Transaction 2: Main operation - execute swap (Jupiter, Raydium, etc.),
        # no SOL transfer for swap
        ("<swap-program-address>", 0, 1_500_000),
        # Transaction 3: Cleanup - stake or provide liquidity
        ("<stake-pool-address>", 0, 750_000),
    ]

    transaction_specs = []
    for destination, lamports, tip in steps:
        placeholder = transfer(TransferParams(
            from_pubkey=payer,
            to_pubkey=Pubkey.from_string(destination),
            lamports=lamports,
        ))
        transaction_specs.append(([placeholder], tip))

    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs,
    )

Advanced Bundle Management

# Bundle with conditional execution
class ConditionalBundle(TransactionBundle):
    """Bundle whose transactions can each be gated by a pre-submission check.

    ``self.conditions`` is kept index-aligned with ``self.transactions``.
    Transactions added without a condition get a ``None`` entry, which always
    evaluates to ``True``.
    """

    def __init__(self):
        super().__init__()
        self.conditions: List[Optional[Callable[..., Any]]] = []

    def add_transaction(self, transaction: Transaction) -> 'ConditionalBundle':
        """Add an unconditional transaction.

        Overridden so a ``None`` placeholder is recorded; otherwise mixing
        conditional and unconditional adds would pair conditions with the
        wrong transactions when filtering.

        Raises:
            ValueError: if the bundle already holds ``max_size`` transactions.
        """
        super().add_transaction(transaction)
        self.conditions.append(None)
        return self

    def add_conditional_transaction(
        self,
        transaction: Transaction,
        condition: Callable[..., Any],
    ) -> 'ConditionalBundle':
        """Add a transaction that is kept only if ``condition`` evaluates truthy.

        ``condition`` is either a coroutine function taking the RPC client, or
        a zero-argument callable (which may itself return an awaitable).

        Raises:
            ValueError: if the bundle already holds ``max_size`` transactions.
        """
        # Go through the base class so the override above does not also
        # append a placeholder for this transaction.
        super().add_transaction(transaction)
        self.conditions.append(condition)
        return self

    async def evaluate_conditions(self, rpc_client: AsyncClient) -> List[bool]:
        """Evaluate every condition and return one bool per transaction.

        A condition that raises is reported via ``print`` and counted as
        ``False`` rather than aborting the evaluation.
        """
        results = []
        for condition in self.conditions:
            if condition is None:
                results.append(True)
                continue
            try:
                if asyncio.iscoroutinefunction(condition):
                    outcome = await condition(rpc_client)
                else:
                    outcome = condition()
                    # Lambdas, partials and callable objects can return an
                    # awaitable without being coroutine functions. An
                    # un-awaited coroutine is always truthy, so await it.
                    if inspect.isawaitable(outcome):
                        outcome = await outcome
                results.append(bool(outcome))
            except Exception as e:
                print(f"Condition evaluation failed: {e}")
                results.append(False)
        return results

    def filter_by_conditions(self, condition_results: List[bool]) -> 'TransactionBundle':
        """Return a new plain bundle holding only the transactions whose result is truthy.

        Raises:
            ValueError: if ``condition_results`` does not have exactly one
                entry per transaction (``zip`` would otherwise silently drop
                the unmatched transactions).
        """
        if len(condition_results) != len(self.transactions):
            raise ValueError(
                f"Expected {len(self.transactions)} condition results, "
                f"got {len(condition_results)}"
            )

        filtered_bundle = TransactionBundle()
        for position, (transaction, condition_met) in enumerate(
            zip(self.transactions, condition_results), start=1
        ):
            if condition_met:
                filtered_bundle.add_transaction(transaction)
            else:
                print(f"Transaction {position} filtered out due to condition")

        return filtered_bundle

# Bundle performance optimizer
class BundleOptimizer:
    """Scale tips and order transactions according to a per-transaction type label."""

    def __init__(self):
        # Factor applied to the base tip for each known transaction type
        self.tip_multipliers = {
            "setup": 0.5,      # Lower priority
            "main": 1.5,       # Higher priority
            "cleanup": 0.75,   # Medium priority
            "arbitrage": 2.0,  # Highest priority
        }

    def optimize_tips(
        self,
        base_tip: int,
        transaction_types: List[str]
    ) -> List[int]:
        """Return one tip per entry of ``transaction_types``.

        Each tip is ``base_tip`` times the type's multiplier, truncated to an
        int. Types without a configured multiplier use 1.0.
        """
        return [
            int(base_tip * self.tip_multipliers.get(tx_type, 1.0))
            for tx_type in transaction_types
        ]

    def reorder_transactions(
        self,
        transactions: List[Transaction],
        transaction_types: List[str]
    ) -> Tuple[List[Transaction], List[str]]:
        """Sort transactions by type: arbitrage, setup, main, cleanup.

        Unknown types sort alongside "main". The sort is stable, so
        transactions of equal priority keep their relative order.

        Returns:
            ``(transactions, types)`` as two new lists in the sorted order.

        Raises:
            ValueError: if the two input lists differ in length (pairing them
                would otherwise silently drop the unmatched entries).
        """
        if len(transactions) != len(transaction_types):
            raise ValueError(
                f"Got {len(transactions)} transactions but "
                f"{len(transaction_types)} transaction types"
            )
        # zip(*[]) yields nothing to unpack, so handle the empty case up front
        if not transactions:
            return [], []

        priority_order = {"arbitrage": 0, "setup": 1, "main": 2, "cleanup": 3}
        tx_pairs = sorted(
            zip(transactions, transaction_types),
            key=lambda pair: priority_order.get(pair[1], 2),
        )

        sorted_transactions, sorted_types = zip(*tx_pairs)
        return list(sorted_transactions), list(sorted_types)

# Bundle status monitoring
@dataclass
class BundleStatus:
    """Record describing one submitted bundle and its current state."""

    bundle_id: str
    transaction_count: int
    submitted_at: float  # compared against time.time() by BundleTracker
    signatures: List[str]
    status: str = "pending"  # one of: pending, confirmed, failed

    def is_complete(self) -> bool:
        """Return True once the status is "confirmed" or "failed"."""
        return self.status == "confirmed" or self.status == "failed"

class BundleTracker:
    """In-memory registry of submitted bundles, keyed by bundle id."""

    def __init__(self, timeout_seconds: float = 60.0):
        """Create an empty tracker.

        Args:
            timeout_seconds: Age, measured from ``submitted_at``, after which
                a bundle that is still unconfirmed is marked "failed".
        """
        self.bundles: Dict[str, BundleStatus] = {}
        self.timeout_seconds = timeout_seconds

    def track_bundle(self, bundle_status: BundleStatus):
        """Start tracking a bundle (replaces any entry with the same id)."""
        self.bundles[bundle_status.bundle_id] = bundle_status

    async def check_bundle_status(
        self,
        bundle_id: str,
        rpc_client: AsyncClient
    ) -> Optional[BundleStatus]:
        """Refresh and return the status of a tracked bundle.

        Returns:
            The updated ``BundleStatus``, or ``None`` if ``bundle_id`` is not
            being tracked.
        """
        bundle_status = self.bundles.get(bundle_id)
        if bundle_status is None:
            return None

        # Check if all transactions are confirmed
        confirmed_count = 0
        for signature in bundle_status.signatures:
            try:
                # Check transaction status
                # response = await rpc_client.get_signature_status(signature)
                # if response.value and response.value.confirmation_status:
                #     confirmed_count += 1
                confirmed_count += 1  # Mock confirmation
            except Exception as e:
                print(f"Failed to check signature {signature}: {e}")

        # Update bundle status
        if confirmed_count == bundle_status.transaction_count:
            bundle_status.status = "confirmed"
        elif time.time() - bundle_status.submitted_at > self.timeout_seconds:
            bundle_status.status = "failed"

        return bundle_status

Usage Examples

async def main():
    """Run the example flows using a mainnet RPC client and a new ``Keypair()``.

    The recipient and DEX address strings are placeholders and must be
    replaced with real addresses before this can run.
    """
    # Initialize clients
    rpc_client = AsyncClient("https://api.mainnet-beta.solana.com")
    signer = Keypair()  # Use your actual keypair

    # Connect to NextBlock (see connection.md)
    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client

    def transfer_spec(recipient: str, lamports: int, tip: int):
        # One (instructions, tip_amount) entry holding a single transfer
        instruction = transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string(recipient),
            lamports=lamports,
        ))
        return ([instruction], tip)

    try:
        # Example 1: Basic batch submission
        basic_specs = [
            transfer_spec("<recipient1>", 100_000, 500_000),    # Setup, 0.0005 SOL tip
            transfer_spec("<recipient2>", 200_000, 1_000_000),  # Main, 0.001 SOL tip
            transfer_spec("<recipient3>", 50_000, 500_000),     # Cleanup, 0.0005 SOL tip
        ]
        bundle_signature = await submit_batched_transactions(
            # nextblock_client,
            rpc_client,
            signer,
            basic_specs,
        )
        print(f"Basic batch: {bundle_signature}")

        # Example 2: Arbitrage bundle
        dex_a = Pubkey.from_string("<dex-a-address>")
        dex_b = Pubkey.from_string("<dex-b-address>")
        arb_signature = await build_arbitrage_bundle(
            # nextblock_client,
            rpc_client,
            signer,
            dex_a,
            dex_b,
            1_000_000,  # 0.001 SOL trade
        )
        print(f"Arbitrage bundle: {arb_signature}")

        # Example 3: Optimized DeFi bundle
        defi_signature = await build_defi_operation_bundle(
            # nextblock_client,
            rpc_client,
            signer,
        )
        print(f"DeFi bundle: {defi_signature}")

        # Example 4: Bundle with optimization
        optimized_tips = BundleOptimizer().optimize_tips(
            1_000_000,
            ["setup", "main", "cleanup"],
        )
        print(f"Optimized tips: {optimized_tips}")

    finally:
        await rpc_client.close()

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

  1. Bundle size limits: Keep bundles between 2 and 4 transactions for optimal success rates; `TransactionBundle.validate()` rejects bundles outside this range

  2. Transaction ordering: Setup → Main operations → Cleanup

  3. Progressive tipping: Use higher tips for more critical transactions

  4. Error handling: Validate bundles before submission

  5. Performance monitoring: Track bundle success rates and timing

  6. Conditional execution: Filter transactions based on current conditions

  7. Tip optimization: Adjust tips based on transaction importance and network conditions

Last updated