# Submit Batched Transactions

Submit 2-4 transactions as an atomic bundle to NextBlock using Python. Batched transactions are processed as Jito bundles - either all succeed or none do.

This example shows bundle construction and the request shape you send to NextBlock. Replace the placeholder generated client types with the client generated from `nextblock-proto`.

## Example

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import base64
import random
from typing import List, Optional, Tuple
from dataclasses import dataclass

from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized

<strong># NextBlock tip wallets
</strong>NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]

<strong># Transaction bundle builder
</strong>class TransactionBundle:
    def __init__(self):
        self.transactions: List[Transaction] = []
        self.max_size = 4
        self.min_size = 2
    
    def add_transaction(self, transaction: Transaction) -> 'TransactionBundle':
<strong>        """Add transaction to bundle"""
</strong>        if len(self.transactions) >= self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        
        self.transactions.append(transaction)
        return self
    
    def validate(self) -> bool:
<strong>        """Validate bundle before submission"""
</strong>        if len(self.transactions) &#x3C; self.min_size:
            raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
        
        if len(self.transactions) > self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        
<strong>        # Check for duplicate signatures
</strong>        signatures = set()
        for tx in self.transactions:
            sig = str(tx.signatures[0])
            if sig in signatures:
                raise ValueError("Duplicate transaction signatures in bundle")
            signatures.add(sig)
        
        return True
    
    def to_base64_transactions(self) -> List[str]:
<strong>        """Convert all transactions to base64 strings"""
</strong>        self.validate()
        
        base64_transactions = []
        for tx in self.transactions:
            serialized_tx = bytes(tx)
            base64_tx = base64.b64encode(serialized_tx).decode('utf-8')
            base64_transactions.append(base64_tx)
        
        return base64_transactions
    
    def get_signatures(self) -> List[str]:
<strong>        """Get all transaction signatures"""
</strong>        return [str(tx.signatures[0]) for tx in self.transactions]

<strong># Build single transaction with tip
</strong>async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
<strong>    # Random tip wallet for load balancing
</strong>    tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))
    
<strong>    # Create tip instruction (should be first)
</strong>    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=tip_wallet,
            lamports=tip_amount
        )
    )
    
<strong>    # Combine all instructions
</strong>    all_instructions = [tip_instruction] + instructions
    
<strong>    # Create and sign transaction
</strong>    message = MessageV0.try_compile(
        payer=payer.pubkey(),
        instructions=all_instructions,
        address_lookup_table_accounts=[],
        recent_blockhash=recent_blockhash,
    )
    
    transaction = Transaction.new_unsigned(message)
    transaction.sign([payer], recent_blockhash)
    
    return transaction

<strong># Submit batched transactions
</strong>async def submit_batched_transactions(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    transaction_specs: List[Tuple[List, int]],  # [(instructions, tip_amount), ...]
) -> str:
<strong>    """Submit multiple transactions as an atomic bundle"""
</strong>    
<strong>    # Get recent blockhash (same for all transactions in bundle)
</strong>    response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = response.value.blockhash
    
<strong>    # Build transaction bundle
</strong>    bundle = TransactionBundle()
    
    for instructions, tip_amount in transaction_specs:
        transaction = await build_transaction_with_tip(
            signer,
            recent_blockhash,
            tip_amount,
            instructions
        )
        bundle.add_transaction(transaction)
    
<strong>    # Get base64 transactions for submission
</strong>    base64_transactions = bundle.to_base64_transactions()
    signatures = bundle.get_signatures()
    
<strong>    # Log transaction signatures
</strong>    for i, sig in enumerate(signatures):
        print(f"Transaction {i+1} signature: {sig}")
    
<strong>    # Submit bundle to NextBlock
</strong>    """ Uncomment when you have the generated gRPC client
    entries = [
        PostSubmitRequestEntry(
            transaction=TransactionMessage(content=base64_tx)
        )
        for base64_tx in base64_transactions
    ]
    
    request = PostSubmitBatchRequest(entries=entries)
    response = await nextblock_client.post_submit_batch_v2(request)
    
    print(f"Batch submitted successfully!")
    print(f"Bundle signature: {response.signature}")
    
    return response.signature
    """
    
<strong>    # Until your generated client is wired in, return a placeholder bundle signature
</strong>    bundle_signature = ""
    print(f"Local bundle prepared with {len(bundle.transactions)} transactions.")
    return bundle_signature

<strong># Build common transaction patterns
</strong>async def build_arbitrage_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    dex_a_address: Pubkey,
    dex_b_address: Pubkey,
    trade_amount: int,
) -> str:
<strong>    """Build arbitrage bundle for cross-DEX trading"""
</strong>    
<strong>    # Transaction 1: Buy on DEX A
</strong>    buy_instructions = [
<strong>        # Add your DEX-specific buy instructions here
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=dex_a_address,
            lamports=trade_amount
        ))
    ]
    
<strong>    # Transaction 2: Sell on DEX B
</strong>    sell_instructions = [
<strong>        # Add your DEX-specific sell instructions here
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=dex_b_address,
            lamports=trade_amount
        ))
    ]
    
    transaction_specs = [
        (buy_instructions, 2_000_000),   # Higher tip for arbitrage
        (sell_instructions, 2_000_000),  # Higher tip for arbitrage
    ]
    
    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs
    )

<strong># Build complex DeFi operation bundle
</strong>async def build_defi_operation_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
) -> str:
<strong>    """Build complex DeFi operation bundle"""
</strong>    
<strong>    # Transaction 1: Setup - Create token accounts
</strong>    setup_instructions = [
<strong>        # Add token account creation instructions
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("&#x3C;token-program-address>"),
            lamports=1_000_000  # Rent exemption
        ))
    ]
    
<strong>    # Transaction 2: Main operation - Execute swap
</strong>    swap_instructions = [
<strong>        # Add swap instructions (Jupiter, Raydium, etc.)
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("&#x3C;swap-program-address>"),
            lamports=0  # No SOL transfer for swap
        ))
    ]
    
<strong>    # Transaction 3: Cleanup - Stake or provide liquidity
</strong>    stake_instructions = [
<strong>        # Add staking/liquidity instructions
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("&#x3C;stake-pool-address>"),
            lamports=0
        ))
    ]
    
    transaction_specs = [
        (setup_instructions, 500_000),    # Setup tip
        (swap_instructions, 1_500_000),   # Main operation tip
        (stake_instructions, 750_000),    # Cleanup tip
    ]
    
    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs
    )
</code></pre>

## Advanced Bundle Management

<pre class="language-python"><code class="lang-python"><strong># Bundle with conditional execution
</strong>class ConditionalBundle(TransactionBundle):
    def __init__(self):
        super().__init__()
        self.conditions: List[callable] = []
    
    def add_conditional_transaction(
        self, 
        transaction: Transaction, 
        condition: callable
    ) -> 'ConditionalBundle':
<strong>        """Add transaction that only executes if condition is met"""
</strong>        self.add_transaction(transaction)
        self.conditions.append(condition)
        return self
    
    async def evaluate_conditions(self, rpc_client: AsyncClient) -> List[bool]:
<strong>        """Evaluate all conditions before submission"""
</strong>        results = []
        for condition in self.conditions:
            try:
                result = await condition(rpc_client) if asyncio.iscoroutinefunction(condition) else condition()
                results.append(bool(result))
            except Exception as e:
                print(f"Condition evaluation failed: {e}")
                results.append(False)
        return results
    
    def filter_by_conditions(self, condition_results: List[bool]) -> 'TransactionBundle':
<strong>        """Create new bundle with only transactions that meet conditions"""
</strong>        filtered_bundle = TransactionBundle()
        
        for i, (transaction, condition_met) in enumerate(zip(self.transactions, condition_results)):
            if condition_met:
                filtered_bundle.add_transaction(transaction)
            else:
                print(f"Transaction {i+1} filtered out due to condition")
        
        return filtered_bundle

<strong># Bundle performance optimizer
</strong>class BundleOptimizer:
    def __init__(self):
        self.tip_multipliers = {
            "setup": 0.5,      # Lower priority
            "main": 1.5,       # Higher priority
            "cleanup": 0.75,   # Medium priority
            "arbitrage": 2.0,  # Highest priority
        }
    
    def optimize_tips(
        self, 
        base_tip: int, 
        transaction_types: List[str]
    ) -> List[int]:
<strong>        """Optimize tip amounts based on transaction types"""
</strong>        optimized_tips = []
        
        for tx_type in transaction_types:
            multiplier = self.tip_multipliers.get(tx_type, 1.0)
            optimized_tip = int(base_tip * multiplier)
            optimized_tips.append(optimized_tip)
        
        return optimized_tips
    
    def reorder_transactions(
        self, 
        transactions: List[Transaction], 
        transaction_types: List[str]
    ) -> Tuple[List[Transaction], List[str]]:
<strong>        """Reorder transactions for optimal execution"""
</strong><strong>        # Priority order: setup -> main -> cleanup
</strong>        priority_order = {"setup": 1, "main": 2, "cleanup": 3, "arbitrage": 0}
        
<strong>        # Create pairs and sort by priority
</strong>        tx_pairs = list(zip(transactions, transaction_types))
        tx_pairs.sort(key=lambda x: priority_order.get(x[1], 2))
        
<strong>        # Unzip the sorted pairs
</strong>        sorted_transactions, sorted_types = zip(*tx_pairs)
        return list(sorted_transactions), list(sorted_types)

<strong># Bundle status monitoring
</strong>@dataclass
class BundleStatus:
    bundle_id: str
    transaction_count: int
    submitted_at: float
    signatures: List[str]
    status: str = "pending"  # pending, confirmed, failed
    
    def is_complete(self) -> bool:
        return self.status in ["confirmed", "failed"]

class BundleTracker:
    def __init__(self):
        self.bundles: Dict[str, BundleStatus] = {}
    
    def track_bundle(self, bundle_status: BundleStatus):
<strong>        """Start tracking a bundle"""
</strong>        self.bundles[bundle_status.bundle_id] = bundle_status
    
    async def check_bundle_status(
        self, 
        bundle_id: str, 
        rpc_client: AsyncClient
    ) -> Optional[BundleStatus]:
<strong>        """Check the status of a tracked bundle"""
</strong>        if bundle_id not in self.bundles:
            return None
        
        bundle_status = self.bundles[bundle_id]
        
<strong>        # Check if all transactions are confirmed
</strong>        confirmed_count = 0
        for signature in bundle_status.signatures:
            try:
<strong>                # Check transaction status
</strong>                # response = await rpc_client.get_signature_status(signature)
                # if response.value and response.value.confirmation_status:
                #     confirmed_count += 1
                confirmed_count += 1  # Mock confirmation
            except Exception as e:
                print(f"Failed to check signature {signature}: {e}")
        
<strong>        # Update bundle status
</strong>        if confirmed_count == bundle_status.transaction_count:
            bundle_status.status = "confirmed"
        elif time.time() - bundle_status.submitted_at > 60:  # Timeout after 60 seconds
            bundle_status.status = "failed"
        
        return bundle_status
</code></pre>

## Usage Examples

<pre class="language-python"><code class="lang-python"><strong>async def main():
</strong><strong>    # Initialize clients
</strong>    rpc_client = AsyncClient("https://api.mainnet-beta.solana.com")
    signer = Keypair()  # Use your actual keypair
    
<strong>    # Connect to NextBlock (see connection.md)
</strong>    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client
    
    try:
<strong>        # Example 1: Basic batch submission
</strong>        transaction_specs = [
<strong>            # Setup transaction
</strong>            ([transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=Pubkey.from_string("&#x3C;recipient1>"),
                lamports=100_000
            ))], 500_000),  # 0.0005 SOL tip
            
<strong>            # Main transaction
</strong>            ([transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=Pubkey.from_string("&#x3C;recipient2>"),
                lamports=200_000
            ))], 1_000_000),  # 0.001 SOL tip
            
<strong>            # Cleanup transaction
</strong>            ([transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=Pubkey.from_string("&#x3C;recipient3>"),
                lamports=50_000
            ))], 500_000),  # 0.0005 SOL tip
        ]
        
        bundle_signature = await submit_batched_transactions(
            # nextblock_client,
            rpc_client,
            signer,
            transaction_specs
        )
        print(f"Basic batch: {bundle_signature}")
        
<strong>        # Example 2: Arbitrage bundle
</strong>        arb_signature = await build_arbitrage_bundle(
            # nextblock_client,
            rpc_client,
            signer,
            Pubkey.from_string("&#x3C;dex-a-address>"),
            Pubkey.from_string("&#x3C;dex-b-address>"),
            1_000_000  # 0.001 SOL trade
        )
        print(f"Arbitrage bundle: {arb_signature}")
        
<strong>        # Example 3: Optimized DeFi bundle
</strong>        defi_signature = await build_defi_operation_bundle(
            # nextblock_client,
            rpc_client,
            signer
        )
        print(f"DeFi bundle: {defi_signature}")
        
<strong>        # Example 4: Bundle with optimization
</strong>        optimizer = BundleOptimizer()
        base_tip = 1_000_000
        transaction_types = ["setup", "main", "cleanup"]
        optimized_tips = optimizer.optimize_tips(base_tip, transaction_types)
        
        print(f"Optimized tips: {optimized_tips}")
        
    finally:
        await rpc_client.close()

if __name__ == "__main__":
    asyncio.run(main())
</code></pre>

## Best Practices

1. **Bundle size limits**: Keep bundles between 2-4 transactions for optimal success rates
2. **Transaction ordering**: Setup → Main operations → Cleanup
3. **Progressive tipping**: Use higher tips for more critical transactions
4. **Error handling**: Validate bundles before submission
5. **Performance monitoring**: Track bundle success rates and timing
6. **Conditional execution**: Filter transactions based on current conditions
7. **Tip optimization**: Adjust tips based on transaction importance and network conditions
