Submit Single Transaction
Example
import asyncio
import base64
import random
from typing import List, Optional
from dataclasses import dataclass
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.system_program import TransferParams, transfer
from solders.transaction import Transaction
from solders.message import MessageV0
from solders.hash import Hash
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Finalized
# NextBlock tip wallet addresses, stored as strings.
# get_random_nextblock_tip_wallet() picks one entry at random per call and
# parses it with Pubkey.from_string, which spreads tips across these wallets
# (load balancing).
NEXTBLOCK_TIP_WALLETS = [
"NEXTbLoCkB51HpLBLojQfpyVAMorm3zzKg7w9NFdqid",
"nextBLoCkPMgmG8ZgJtABeScP35qLa2AMCNKntAP7Xc",
"NextbLoCkVtMGcV47JzewQdvBpLqT9TxQFozQkN98pE",
"NexTbLoCkWykbLuB1NkjXgFWkX9oAtcoagQegygXXA2",
"NeXTBLoCKs9F1y5PJS9CKrFNNLU1keHW71rfh7KgA1X",
"NexTBLockJYZ7QD7p2byrUa6df8ndV2WSd8GkbWqfbb",
"neXtBLock1LeC67jYd1QdAa32kbVeubsfPNTJC1V5At",
"nEXTBLockYgngeRmRrjDV31mGSekVPqZoMGhQEZtPVG",
]
def get_random_nextblock_tip_wallet() -> Pubkey:
    """Return one NextBlock tip wallet, chosen at random, as a ``Pubkey``.

    The address string is drawn from ``NEXTBLOCK_TIP_WALLETS`` with
    ``random.choice`` and parsed with ``Pubkey.from_string``; picking a
    different wallet per call is what provides the load balancing.
    """
    return Pubkey.from_string(random.choice(NEXTBLOCK_TIP_WALLETS))
@dataclass
class SubmissionOptions:
    """Boolean switches describing how a transaction should be submitted.

    Each flag is meant to be copied onto the matching field of the NextBlock
    submit request (``skip_preflight`` maps to ``skip_pre_flight``); that
    request is only sketched in commented-out code in this example, so the
    flags currently have no effect. Their exact server-side meaning is
    defined by the NextBlock API -- confirm against its documentation.
    """

    # Only this flag defaults to True; every other switch is off by default.
    skip_preflight: bool = True
    front_running_protection: bool = False
    revert_on_fail: bool = False
    disable_retries: bool = False
    snipe_transaction: bool = False
async def build_transaction_with_tip(
    payer: Keypair,
    recent_blockhash: Hash,
    tip_amount: int,
    instructions: List,
) -> Transaction:
    """Build and sign a transaction whose first instruction tips NextBlock.

    A system-program transfer of ``tip_amount`` lamports from ``payer`` to a
    randomly chosen NextBlock tip wallet is placed ahead of the caller's
    instructions, then the whole list is compiled and signed by ``payer``.

    The function performs no I/O; it stays ``async`` only so that existing
    callers can keep awaiting it.

    Args:
        payer: Keypair that pays the fee, funds the tip and signs.
        recent_blockhash: Blockhash the transaction is bound to.
        tip_amount: Tip size in lamports.
        instructions: Caller instructions to run after the tip transfer.

    Returns:
        The signed legacy ``Transaction``.
    """
    tip_instruction = transfer(
        TransferParams(
            from_pubkey=payer.pubkey(),
            to_pubkey=get_random_nextblock_tip_wallet(),
            lamports=tip_amount,
        )
    )

    # The tip transfer goes first, ahead of the caller's instructions.
    all_instructions = [tip_instruction, *instructions]

    # ``Transaction`` is the legacy transaction type and is built from a
    # legacy message. The earlier version compiled a ``MessageV0`` and handed
    # it to ``Transaction.new_unsigned``, which does not accept versioned
    # messages. No address lookup tables are used here, so a legacy
    # transaction carries exactly the same instructions.
    return Transaction.new_signed_with_payer(
        all_instructions,
        payer.pubkey(),
        [payer],
        recent_blockhash,
    )
async def submit_single_transaction(
    # nextblock_client,  # Your generated gRPC client
    rpc_client: AsyncClient,
    signer: Keypair,
    recipient: Pubkey,
    transfer_amount: int,
    tip_amount: int,
    options: Optional[SubmissionOptions] = None,
) -> str:
    """Build a tipped SOL transfer and (once wired up) submit it to NextBlock.

    The latest finalized blockhash is fetched from ``rpc_client``, a transfer
    of ``transfer_amount`` lamports from ``signer`` to ``recipient`` is
    wrapped together with a NextBlock tip, and the signed transaction is
    base64-encoded for submission. The actual gRPC submission is left as
    commented-out code, so this example currently only builds the transaction
    and returns its first signature.

    Args:
        rpc_client: Async Solana RPC client used to fetch the blockhash.
        signer: Keypair that funds the transfer and the tip and signs.
        recipient: Destination of the transfer.
        transfer_amount: Transfer size in lamports.
        tip_amount: NextBlock tip size in lamports.
        options: Submission flags; defaults to ``SubmissionOptions()``.
            Only consumed by the commented-out submit request.

    Returns:
        The transaction's first signature rendered as a string.
    """
    if options is None:
        options = SubmissionOptions()

    # Get recent blockhash
    blockhash_response = await rpc_client.get_latest_blockhash(commitment=Finalized)
    recent_blockhash = blockhash_response.value.blockhash

    # Create transfer instruction
    transfer_instruction = transfer(
        TransferParams(
            from_pubkey=signer.pubkey(),
            to_pubkey=recipient,
            lamports=transfer_amount,
        )
    )

    # Build transaction with tip
    transaction = await build_transaction_with_tip(
        signer,
        recent_blockhash,
        tip_amount,
        [transfer_instruction],
    )

    # Convert to base64 for submission. Unused until the submit request below
    # is enabled; kept so that enabling it is a pure uncomment.
    base64_tx = base64.b64encode(bytes(transaction)).decode("utf-8")

    # Submit to NextBlock -- uncomment when you have the generated gRPC client:
    #
    # request = PostSubmitRequest(
    #     transaction=TransactionMessage(content=base64_tx),
    #     skip_pre_flight=options.skip_preflight,
    #     snipe_transaction=options.snipe_transaction,
    #     front_running_protection=options.front_running_protection,
    #     disable_retries=options.disable_retries,
    #     revert_on_fail=options.revert_on_fail,
    # )
    # response = await nextblock_client.post_submit_v2(request)
    # print("Transaction submitted successfully!")
    # print(f"Signature: {response.signature}")
    # print(f"UUID: {response.uuid}")
    # return response.signature

    # Mock response for demonstration
    signature = str(transaction.signatures[0])
    print(f"Transaction built successfully: {signature}")
    return signature
async def submit_with_optimal_tip(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    recipient: Pubkey,
    transfer_amount: int,
    priority_level: str = "normal",
) -> str:
    """Submit a single transfer using a tip chosen from a fixed priority table.

    ``priority_level`` selects one of the preset tip sizes below; any value
    that is not a key of the table falls back to the "normal" tip. The
    transfer itself is delegated to ``submit_single_transaction`` with default
    submission options.

    Returns:
        Whatever ``submit_single_transaction`` returns (the signature string).
    """
    # Preset tips in lamports, keyed by priority level.
    lamports_by_priority = {
        "conservative": 500_000,  # 0.0005 SOL
        "normal": 1_000_000,  # 0.001 SOL
        "aggressive": 2_000_000,  # 0.002 SOL
        "priority": 5_000_000,  # 0.005 SOL
    }

    if priority_level in lamports_by_priority:
        chosen_tip = lamports_by_priority[priority_level]
    else:
        chosen_tip = lamports_by_priority["normal"]

    signature = await submit_single_transaction(
        # nextblock_client,
        rpc_client,
        signer,
        recipient,
        transfer_amount,
        chosen_tip,
    )
    return signature
async def submit_multiple_single_transactions(
    # nextblock_client,
    rpc_client: AsyncClient,
    signer: Keypair,
    transactions_data: List[tuple],  # [(recipient, amount, tip), ...]
) -> List[str]:
    """Submit multiple single transactions concurrently.

    One ``submit_single_transaction`` call is started per entry of
    ``transactions_data`` and all of them are awaited together. A failure of
    one submission does not abort the others: it is printed and skipped.

    Args:
        rpc_client: Async Solana RPC client shared by all submissions.
        signer: Keypair that funds and signs every transaction.
        transactions_data: ``(recipient, transfer_amount, tip_amount)``
            tuples, one per transaction.

    Returns:
        Signatures of the submissions that succeeded, in input order.
    """
    pending = [
        submit_single_transaction(
            # nextblock_client,
            rpc_client,
            signer,
            recipient,
            transfer_amount,
            tip_amount,
        )
        for recipient, transfer_amount, tip_amount in transactions_data
    ]

    # return_exceptions=True hands failures back as values instead of raising,
    # so every submission gets a chance to finish.
    results = await asyncio.gather(*pending, return_exceptions=True)

    successful_signatures = []
    for position, result in enumerate(results, start=1):
        # gather can also return BaseException subclasses such as
        # asyncio.CancelledError; checking only for Exception would let those
        # slip into the list of "signatures".
        if isinstance(result, BaseException):
            print(f"Transaction {position} failed: {result}")
        else:
            successful_signatures.append(result)
            print(f"Transaction {position} successful: {result}")
    return successful_signatures
Usage Examples
Best Practices
Last updated