# Submit Batched Transactions

Submit 2 to 4 transactions as an atomic bundle to NextBlock using Python. Batched transactions are processed as Jito bundles: either every transaction in the bundle succeeds or none does.

This example shows how to construct a bundle and the shape of the request you send to NextBlock. Replace the placeholder client types with the ones generated from `nextblock-proto`.
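
As a rough illustration of the request shape, the sketch below wraps base64-encoded transactions in stand-in dataclasses. The class names `TransactionMessage`, `PostSubmitRequestEntry`, and `PostSubmitBatchRequest` mirror the names used in the example on this page, but the dataclasses themselves are hypothetical placeholders, and `build_batch_request` is a helper invented for this sketch. The real types come from the client generated from `nextblock-proto` and may differ.

```python
import base64
from dataclasses import dataclass, field
from typing import List


# Hypothetical stand-ins that only mirror the names used on this page;
# the real classes come from the client generated from nextblock-proto.
@dataclass
class TransactionMessage:
    content: str  # base64-encoded serialized transaction


@dataclass
class PostSubmitRequestEntry:
    transaction: TransactionMessage


@dataclass
class PostSubmitBatchRequest:
    entries: List[PostSubmitRequestEntry] = field(default_factory=list)


def build_batch_request(serialized_transactions: List[bytes]) -> PostSubmitBatchRequest:
    """Wrap 2 to 4 serialized transactions into one batch request."""
    if not 2 <= len(serialized_transactions) <= 4:
        raise ValueError("A bundle must contain between 2 and 4 transactions")
    entries = [
        PostSubmitRequestEntry(
            transaction=TransactionMessage(
                content=base64.b64encode(raw).decode("utf-8")
            )
        )
        for raw in serialized_transactions
    ]
    return PostSubmitBatchRequest(entries=entries)


# Placeholder bytes stand in for signed, serialized transactions.
request = build_batch_request([b"signed-tx-1", b"signed-tx-2"])
print(len(request.entries))  # 2
```

Each entry carries exactly one transaction, and the order of `entries` follows the order in which the transactions were supplied.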

## Example

<pre class="language-python"><code class="lang-python"><strong>import asyncio
</strong>import base64
import random
import time
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass

from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized

<strong># NextBlock tip wallets
</strong>NEXTBLOCK_TIP_WALLETS = [
    "NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
    "nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
    "NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
    "NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
    "NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
    "NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
    "neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
    "nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]

<strong># Transaction bundle builder
</strong>class TransactionBundle:
    def __init__(self):
        self.transactions: List[Transaction] = []
        self.max_size = 4
        self.min_size = 2
    
    def add_transaction(self, transaction: Transaction) -> 'TransactionBundle':
<strong>        """Add transaction to bundle"""
</strong>        if len(self.transactions) >= self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        
        self.transactions.append(transaction)
        return self
    
    def validate(self) -> bool:
<strong>        """Validate bundle before submission"""
</strong>        if len(self.transactions) &#x3C; self.min_size:
            raise ValueError(f"Bundle must contain at least {self.min_size} transactions")
        
        if len(self.transactions) > self.max_size:
            raise ValueError(f"Bundle cannot contain more than {self.max_size} transactions")
        
<strong>        # Check for duplicate signatures
</strong>        signatures = set()
        for tx in self.transactions:
            sig = str(tx.signatures[0])
            if sig in signatures:
                raise ValueError("Duplicate transaction signatures in bundle")
            signatures.add(sig)
        
        return True
    
    def to_base64_transactions(self) -> List[str]:
<strong>        """Convert all transactions to base64 strings"""
</strong>        self.validate()
        
        base64_transactions = []
        for tx in self.transactions:
            serialized_tx = bytes(tx)
            base64_tx = base64.b64encode(serialized_tx).decode('utf-8')
            base64_transactions.append(base64_tx)
        
        return base64_transactions
    
    def get_signatures(self) -> List[str]:
<strong>        """Get all transaction signatures"""
</strong>        return [str(tx.signatures[0]) for tx in self.transactions]

<strong># Build single transaction with tip
</strong>async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
<strong>    # Random tip wallet for load balancing
</strong>    tip_wallet = Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))
    
<strong>    # Create tip instruction (should be first)
</strong>    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=tip_wallet,
            lamports=tip_amount
        )
    )
    
<strong>    # Combine all instructions
</strong>    all_instructions = [tip_instruction] + instructions
    
<strong>    # Create and sign a legacy transaction
</strong>    # (Transaction.new_unsigned() takes a legacy Message, not a MessageV0)
    transaction = Transaction.new_signed_with_payer(
        all_instructions,
        payer.pubkey(),
        [payer],
        recent_blockhash,
    )
    
    return transaction

<strong># Submit batched transactions
</strong>async def submit_batched_transactions(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    transaction_specs: List[Tuple[List, int]],  # [(instructions, tip_amount), ...]
) -> str:
<strong>    """Submit multiple transactions as an atomic bundle"""
</strong>    
<strong>    # Get recent blockhash (same for all transactions in bundle)
</strong>    response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = response.value.blockhash
    
<strong>    # Build transaction bundle
</strong>    bundle = TransactionBundle()
    
    for instructions, tip_amount in transaction_specs:
        transaction = await build_transaction_with_tip(
            signer,
            recent_blockhash,
            tip_amount,
            instructions
        )
        bundle.add_transaction(transaction)
    
<strong>    # Get base64 transactions for submission
</strong>    base64_transactions = bundle.to_base64_transactions()
    signatures = bundle.get_signatures()
    
<strong>    # Log transaction signatures
</strong>    for i, sig in enumerate(signatures):
        print(f"Transaction {i+1} signature: {sig}")
    
<strong>    # Submit bundle to NextBlock
</strong>    """ Uncomment when you have the generated gRPC client
    entries = [
        PostSubmitRequestEntry(
            transaction=TransactionMessage(content=base64_tx)
        )
        for base64_tx in base64_transactions
    ]
    
    request = PostSubmitBatchRequest(entries=entries)
    response = await nextblock_client.post_submit_batch_v2(request)
    
    print(f"Batch submitted successfully!")
    print(f"Bundle signature: {response.signature}")
    
    return response.signature
    """
    
<strong>    # Until your generated client is wired in, return a placeholder bundle signature
</strong>    bundle_signature = ""
    print(f"Local bundle prepared with {len(bundle.transactions)} transactions.")
    return bundle_signature

<strong># Build common transaction patterns
</strong>async def build_arbitrage_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    dex_a_address: Pubkey,
    dex_b_address: Pubkey,
    trade_amount: int,
) -> str:
<strong>    """Build arbitrage bundle for cross-DEX trading"""
</strong>    
<strong>    # Transaction 1: Buy on DEX A
</strong>    buy_instructions = [
<strong>        # Add your DEX-specific buy instructions here
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=dex_a_address,
            lamports=trade_amount
        ))
    ]
    
<strong>    # Transaction 2: Sell on DEX B
</strong>    sell_instructions = [
<strong>        # Add your DEX-specific sell instructions here
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=dex_b_address,
            lamports=trade_amount
        ))
    ]
    
    transaction_specs = [
        (buy_instructions, 2_000_000),   # Higher tip for arbitrage
        (sell_instructions, 2_000_000),  # Higher tip for arbitrage
    ]
    
    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs
    )

<strong># Build complex DeFi operation bundle
</strong>async def build_defi_operation_bundle(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
) -> str:
<strong>    """Build complex DeFi operation bundle"""
</strong>    
<strong>    # Transaction 1: Setup - Create token accounts
</strong>    setup_instructions = [
<strong>        # Add token account creation instructions
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("&#x3C;token-program-address>"),
            lamports=1_000_000  # Rent exemption
        ))
    ]
    
<strong>    # Transaction 2: Main operation - Execute swap
</strong>    swap_instructions = [
<strong>        # Add swap instructions (Jupiter, Raydium, etc.)
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("&#x3C;swap-program-address>"),
            lamports=0  # No SOL transfer for swap
        ))
    ]
    
<strong>    # Transaction 3: Cleanup - Stake or provide liquidity
</strong>    stake_instructions = [
<strong>        # Add staking/liquidity instructions
</strong>        transfer(TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=Pubkey.from_string("&#x3C;stake-pool-address>"),
            lamports=0
        ))
    ]
    
    transaction_specs = [
        (setup_instructions, 500_000),    # Setup tip
        (swap_instructions, 1_500_000),   # Main operation tip
        (stake_instructions, 750_000),    # Cleanup tip
    ]
    
    return await submit_batched_transactions(
        # nextblock_client,
        rpc_client,
        signer,
        transaction_specs
    )
</code></pre>
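
Because `build_transaction_with_tip` adds a tip transfer to every transaction, the tip cost of a bundle is the sum of its per-transaction tips. The sketch below works that out for the three tips used in `build_defi_operation_bundle`. The helper names are illustrative, and the total covers tips only; it does not include network fees.

```python
from typing import Iterable

LAMPORTS_PER_SOL = 1_000_000_000  # 1 SOL = 10^9 lamports


def lamports_to_sol(lamports: int) -> float:
    """Convert a lamport amount to SOL."""
    return lamports / LAMPORTS_PER_SOL


def total_tip_lamports(tip_amounts: Iterable[int]) -> int:
    """Sum the tips attached to each transaction in a bundle."""
    return sum(tip_amounts)


# Tips from the DeFi example: setup, main operation, cleanup.
defi_tips = [500_000, 1_500_000, 750_000]
total = total_tip_lamports(defi_tips)

print(total)  # 2750000
print(f"{lamports_to_sol(total):.5f} SOL")
```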

## Advanced Bundle Management

<pre class="language-python"><code class="lang-python"><strong># Bundle with conditional execution
</strong>class ConditionalBundle(TransactionBundle):
    def __init__(self):
        super().__init__()
        self.conditions: List[callable] = []
    
    def add_conditional_transaction(
        self, 
        transaction: Transaction, 
        condition: callable
    ) -> 'ConditionalBundle':
<strong>        """Add transaction that only executes if condition is met"""
</strong>        self.add_transaction(transaction)
        self.conditions.append(condition)
        return self
    
    async def evaluate_conditions(self, rpc_client: AsyncClient) -> List[bool]:
<strong>        """Evaluate all conditions before submission"""
</strong>        results = []
        for condition in self.conditions:
            try:
                result = await condition(rpc_client) if asyncio.iscoroutinefunction(condition) else condition()
                results.append(bool(result))
            except Exception as e:
                print(f"Condition evaluation failed: {e}")
                results.append(False)
        return results
    
    def filter_by_conditions(self, condition_results: List[bool]) -> 'TransactionBundle':
<strong>        """Create new bundle with only transactions that meet conditions"""
</strong>        filtered_bundle = TransactionBundle()
        
        for i, (transaction, condition_met) in enumerate(zip(self.transactions, condition_results)):
            if condition_met:
                filtered_bundle.add_transaction(transaction)
            else:
                print(f"Transaction {i+1} filtered out due to condition")
        
        return filtered_bundle

<strong># Bundle performance optimizer
</strong>class BundleOptimizer:
    def __init__(self):
        self.tip_multipliers = {
            "setup": 0.5,      # Lower priority
            "main": 1.5,       # Higher priority
            "cleanup": 0.75,   # Medium priority
            "arbitrage": 2.0,  # Highest priority
        }
    
    def optimize_tips(
        self, 
        base_tip: int, 
        transaction_types: List[str]
    ) -> List[int]:
<strong>        """Optimize tip amounts based on transaction types"""
</strong>        optimized_tips = []
        
        for tx_type in transaction_types:
            multiplier = self.tip_multipliers.get(tx_type, 1.0)
            optimized_tip = int(base_tip * multiplier)
            optimized_tips.append(optimized_tip)
        
        return optimized_tips
    
    def reorder_transactions(
        self, 
        transactions: List[Transaction], 
        transaction_types: List[str]
    ) -> Tuple[List[Transaction], List[str]]:
<strong>        """Reorder transactions for optimal execution"""
</strong><strong>        # Priority order: arbitrage -> setup -> main -> cleanup (unknown types sort with main)
</strong>        priority_order = {"setup": 1, "main": 2, "cleanup": 3, "arbitrage": 0}
        
<strong>        # Create pairs and sort by priority
</strong>        tx_pairs = list(zip(transactions, transaction_types))
        tx_pairs.sort(key=lambda x: priority_order.get(x[1], 2))
        
<strong>        # Unzip the sorted pairs (an empty list cannot be unpacked, so return early)
</strong>        if not tx_pairs:
            return [], []
        sorted_transactions, sorted_types = zip(*tx_pairs)
        return list(sorted_transactions), list(sorted_types)

<strong># Bundle status monitoring
</strong>@dataclass
class BundleStatus:
    bundle_id: str
    transaction_count: int
    submitted_at: float
    signatures: List[str]
    status: str = "pending"  # pending, confirmed, failed
    
    def is_complete(self) -> bool:
        return self.status in ["confirmed", "failed"]

class BundleTracker:
    def __init__(self):
        self.bundles: Dict[str, BundleStatus] = {}
    
    def track_bundle(self, bundle_status: BundleStatus):
<strong>        """Start tracking a bundle"""
</strong>        self.bundles[bundle_status.bundle_id] = bundle_status
    
    async def check_bundle_status(
        self, 
        bundle_id: str, 
        rpc_client: AsyncClient
    ) -> Optional[BundleStatus]:
<strong>        """Check the status of a tracked bundle"""
</strong>        if bundle_id not in self.bundles:
            return None
        
        bundle_status = self.bundles[bundle_id]
        
<strong>        # Check if all transactions are confirmed
</strong>        confirmed_count = 0
        for signature in bundle_status.signatures:
            try:
<strong>                # Check transaction status
</strong>                # Requires: from solders.signature import Signature
                # response = await rpc_client.get_signature_statuses(
                #     [Signature.from_string(signature)]
                # )
                # if response.value[0] and response.value[0].confirmation_status:
                #     confirmed_count += 1
                confirmed_count += 1  # Mock confirmation
            except Exception as e:
                print(f"Failed to check signature {signature}: {e}")
        
<strong>        # Update bundle status
</strong>        if confirmed_count == bundle_status.transaction_count:
            bundle_status.status = "confirmed"
        elif time.time() - bundle_status.submitted_at > 60:  # Timeout after 60 seconds
            bundle_status.status = "failed"
        
        return bundle_status
</code></pre>
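
`check_bundle_status` performs a single check, so a caller usually needs a loop around it. The following is a minimal polling sketch under stated assumptions: `wait_for_bundle`, its parameters, and the `fake_check` status source are all hypothetical. In real use, `check_status` would wrap a call to the tracker and return its `status` string.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_for_bundle(
    check_status: Callable[[], Awaitable[str]],
    timeout_seconds: float = 60.0,
    poll_interval: float = 2.0,
) -> str:
    """Poll until the status is 'confirmed' or 'failed', or the timeout elapses."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = await check_status()
        if status in ("confirmed", "failed"):
            return status
        if time.monotonic() >= deadline:
            return "failed"  # like the tracker, treat a timeout as failure
        await asyncio.sleep(poll_interval)


async def demo() -> str:
    # Fake status source for illustration: pending twice, then confirmed.
    statuses = iter(["pending", "pending", "confirmed"])

    async def fake_check() -> str:
        return next(statuses)

    return await wait_for_bundle(fake_check, timeout_seconds=5.0, poll_interval=0.01)


result = asyncio.run(demo())
print(result)  # confirmed
```

The sketch uses `time.monotonic()` for the deadline so that system clock adjustments do not shorten or extend the wait.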

## Usage Examples

<pre class="language-python"><code class="lang-python"><strong>async def main():
</strong><strong>    # Initialize clients
</strong>    rpc_client = AsyncClient("https://api.mainnet-beta.solana.com")
    signer = Keypair()  # Generates a new, unfunded keypair; load your own funded keypair instead
    
<strong>    # Connect to NextBlock (see connection.md)
</strong>    # config = NextBlockConfig.from_env()
    # async with NextBlockConnectionManager(config) as manager:
    #     nextblock_client = manager.client
    
    try:
<strong>        # Example 1: Basic batch submission
</strong>        transaction_specs = [
<strong>            # Setup transaction
</strong>            ([transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=Pubkey.from_string("&#x3C;recipient1>"),
                lamports=100_000
            ))], 500_000),  # 0.0005 SOL tip
            
<strong>            # Main transaction
</strong>            ([transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=Pubkey.from_string("&#x3C;recipient2>"),
                lamports=200_000
            ))], 1_000_000),  # 0.001 SOL tip
            
<strong>            # Cleanup transaction
</strong>            ([transfer(TransferParams(
                from_pubkey=signer.pubkey(),
                to_pubkey=Pubkey.from_string("&#x3C;recipient3>"),
                lamports=50_000
            ))], 500_000),  # 0.0005 SOL tip
        ]
        
        bundle_signature = await submit_batched_transactions(
            # nextblock_client,
            rpc_client,
            signer,
            transaction_specs
        )
        print(f"Basic batch: {bundle_signature}")
        
<strong>        # Example 2: Arbitrage bundle
</strong>        arb_signature = await build_arbitrage_bundle(
            # nextblock_client,
            rpc_client,
            signer,
            Pubkey.from_string("&#x3C;dex-a-address>"),
            Pubkey.from_string("&#x3C;dex-b-address>"),
            1_000_000  # 0.001 SOL trade
        )
        print(f"Arbitrage bundle: {arb_signature}")
        
<strong>        # Example 3: DeFi operation bundle
</strong>        defi_signature = await build_defi_operation_bundle(
            # nextblock_client,
            rpc_client,
            signer
        )
        print(f"DeFi bundle: {defi_signature}")
        
<strong>        # Example 4: Bundle with optimization
</strong>        optimizer = BundleOptimizer()
        base_tip = 1_000_000
        transaction_types = ["setup", "main", "cleanup"]
        optimized_tips = optimizer.optimize_tips(base_tip, transaction_types)
        
        print(f"Optimized tips: {optimized_tips}")
        
    finally:
        await rpc_client.close()

if __name__ == "__main__":
    asyncio.run(main())
</code></pre>
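
To make the outcome of Example 4 concrete, here is a standalone sketch that applies the same multipliers and priority order as `BundleOptimizer`, with the values copied from this page. `plan_bundle` is a hypothetical helper that combines tip scaling and ordering in one step; it is not part of the classes above.

```python
from typing import List, Tuple

# Values copied from BundleOptimizer on this page.
TIP_MULTIPLIERS = {"setup": 0.5, "main": 1.5, "cleanup": 0.75, "arbitrage": 2.0}
PRIORITY_ORDER = {"arbitrage": 0, "setup": 1, "main": 2, "cleanup": 3}


def plan_bundle(base_tip: int, transaction_types: List[str]) -> List[Tuple[str, int]]:
    """Return (type, tip) pairs in execution order.

    Unknown types get a 1.0x multiplier and sort alongside 'main'.
    """
    planned = [
        (tx_type, int(base_tip * TIP_MULTIPLIERS.get(tx_type, 1.0)))
        for tx_type in transaction_types
    ]
    # sorted() is stable, so entries with equal priority keep their input order.
    return sorted(planned, key=lambda pair: PRIORITY_ORDER.get(pair[0], 2))


plan = plan_bundle(1_000_000, ["cleanup", "main", "setup"])
print(plan)  # [('setup', 500000), ('main', 1500000), ('cleanup', 750000)]
```

With a base tip of 1,000,000 lamports, the setup, main, and cleanup transactions receive 500,000, 1,500,000, and 750,000 lamports respectively.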

## Best Practices

1. **Bundle size limits**: Keep bundles to between 2 and 4 transactions for optimal success rates
2. **Transaction ordering**: Setup → Main operations → Cleanup
3. **Progressive tipping**: Use higher tips for more critical transactions
4. **Error handling**: Validate bundles before submission
5. **Performance monitoring**: Track bundle success rates and timing
6. **Conditional execution**: Filter transactions based on current conditions
7. **Tip optimization**: Adjust tips based on transaction importance and network conditions
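
One way to approach the performance monitoring practice is to record each bundle's outcome and latency, then derive simple aggregates. The sketch below is a minimal in-memory version. `BundleMetrics` is a hypothetical class, and the recorded values are made-up sample data rather than measurements.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class BundleMetrics:
    outcomes: List[bool] = field(default_factory=list)
    latencies: List[float] = field(default_factory=list)

    def record(self, confirmed: bool, latency_seconds: float) -> None:
        """Store the outcome and latency of one submitted bundle."""
        self.outcomes.append(confirmed)
        self.latencies.append(latency_seconds)

    def success_rate(self) -> float:
        """Fraction of recorded bundles that were confirmed (0.0 if none recorded)."""
        return sum(self.outcomes) / len(self.outcomes) if self.outcomes else 0.0

    def average_latency(self) -> float:
        """Mean latency in seconds across all recorded bundles (0.0 if none recorded)."""
        return sum(self.latencies) / len(self.latencies) if self.latencies else 0.0


# Illustrative sample values only.
metrics = BundleMetrics()
metrics.record(True, 1.5)
metrics.record(False, 60.0)
metrics.record(True, 2.5)
metrics.record(True, 2.0)

print(metrics.success_rate())     # 0.75
print(metrics.average_latency())  # 16.5
```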

