Rust Example

Complete Rust implementation for streaming Solana transactions from NextBlock's TX Stream API using Tonic.

The example is hosted at github.com/nextblock-ag/nextblock-stream-examples

Prerequisites

Add these dependencies to your Cargo.toml:

[package]
name = "rust-example"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = "1.0.99"
bincode = "1.3.3"
bs58 = "0.5"
prost = "0.13"
prost-types = "0.13"
protobuf = "3.7.2"
rand = "0.8"
solana-sdk = "3.0.0"
time = "0.3"
tokio = { version = "1.39", features = ["rt-multi-thread", "macros"] }
tonic = { version = "0.12", features = ["gzip"] }

[build-dependencies]
tonic-build = "0.12"

Proto build script build.rs

use std::{env, path::PathBuf};

/// Build script: compiles `stream.proto` (expected one directory above this
/// crate) into Tonic client code written to `src/protos`.
///
/// # Panics
///
/// Panics if `CARGO_MANIFEST_DIR` is unset, the crate directory has no parent,
/// the proto file is missing, the output directory cannot be created, or proto
/// compilation fails. Panicking is how a build script reports failure to cargo.
fn main() {
    let manifest_dir = PathBuf::from(
        env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is set by cargo"),
    );

    // The proto lives in the directory that contains this crate.
    let repo_root = manifest_dir
        .parent()
        .expect("rust-example should have a parent directory");

    let proto = repo_root.join("stream.proto");
    if !proto.exists() {
        panic!("Proto not found at {}", proto.display());
    }

    let out_dir = manifest_dir.join("src/protos");

    // Watch only the proto file. Watching `repo_root` would make cargo scan a
    // directory that contains this crate (including the generated `src/protos`
    // and, by default, `target/`), so the script would rerun on nearly every build.
    println!("cargo:rerun-if-changed={}", proto.display());

    std::fs::create_dir_all(&out_dir).expect("create src/protos");

    // Paths are passed as-is rather than through `to_string_lossy`, which would
    // silently corrupt any non-UTF-8 path component.
    tonic_build::configure()
        .build_client(true)
        .build_server(false)
        .out_dir(&out_dir)
        .compile_protos(&[&proto], &[repo_root])
        .expect("Failed to compile protos");
}

Example

use std::{
    str::FromStr,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use rand::Rng;
use solana_sdk::{
    pubkey::Pubkey, signature::Keypair, signer::Signer, transaction::VersionedTransaction,
};

use anyhow::{Context, Result, bail};
use tonic::{
    Request,
    transport::{Channel, Endpoint},
};

use crate::stream::{NextStreamSubscription, next_stream_service_client::NextStreamServiceClient};

#[path = "protos/stream.rs"]
pub mod stream;

/*
The auth message needs to be built client-side.
It is a pipe-separated string made of:
1. the domain that is being connected to (e.g. fra.stream.nextblock.io)
2. the public key that sent the fee to strmuYvHKeA1qvHqooUpwUk2BFwaAmMbK9WXY9mh2GJ
3. a random nonce
4. the current unix timestamp

It is then signed with the private key that belongs to the supplied public key.
*/

/// Builds the pipe-separated authentication message
/// `domain|pubkey|nonce|timestamp`.
///
/// The nonce is a freshly generated random `u64`; the timestamp is the current
/// system time in whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn build_auth_message(domain: &str, pubkey: &Pubkey) -> String {
    let unix_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    // `gen` is a reserved keyword in edition 2024, hence the raw identifier.
    let nonce = rand::thread_rng().r#gen::<u64>();

    format!("{domain}|{pubkey}|{nonce}|{unix_secs}")
}

/// Opens a plaintext (`http://`, no TLS) gRPC channel to `domain`.
///
/// The endpoint is configured with a 5-second HTTP/2 keep-alive interval, and
/// keep-alive while idle is disabled.
///
/// # Errors
///
/// Returns an error if `domain` does not form a valid URI or if the
/// connection attempt fails.
async fn make_insecure_channel(domain: &str) -> Result<Channel> {
    let uri = format!("http://{domain}");
    let endpoint = Endpoint::from_shared(uri)?
        .http2_keep_alive_interval(Duration::from_secs(5))
        .keep_alive_while_idle(false);

    let channel = endpoint.connect().await?;
    Ok(channel)
}

/// Connects to the NextBlock TX stream, authenticates with a signed message,
/// subscribes to transactions touching the listed accounts, and prints the
/// first signature and slot of every packet received.
///
/// # Errors
///
/// Returns an error if the private key has not been configured, the channel
/// cannot be established, the subscription is rejected, receiving from the
/// stream fails, or a received transaction cannot be deserialized.
#[tokio::main]
async fn main() -> Result<()> {
    // Value shipped with the example; it must be replaced before running.
    const PLACEHOLDER_KEY: &str = "YOUR_PRIVATE_KEY";

    let domain = "fra.stream.nextblock.io:22221";
    let private_key_b58 = "YOUR_PRIVATE_KEY";
    // Reject the unedited placeholder as well as an empty string: the
    // placeholder is not a valid base58 key, so letting it through would fail
    // at key decoding instead of producing this actionable message.
    if private_key_b58.is_empty() || private_key_b58 == PLACEHOLDER_KEY {
        bail!("Set `private_key_b58` to your base58-encoded Solana private key.");
    }

    let authentication_keypair = Keypair::from_base58_string(private_key_b58);
    let authentication_pubkey = authentication_keypair.pubkey();

    // The message is signed with the keypair so the server can check it
    // against the public key sent alongside it.
    let authentication_message = build_auth_message(domain, &authentication_pubkey);
    let authentication_signature =
        authentication_keypair.sign_message(authentication_message.as_bytes());

    let channel = make_insecure_channel(domain)
        .await
        .with_context(|| format!("connect to {domain}"))?;
    let mut client = NextStreamServiceClient::new(channel);

    let req = NextStreamSubscription {
        authentication_publickey: authentication_pubkey.to_string(),
        authentication_message,
        authentication_signature: authentication_signature.to_string(),
        accounts: vec![
            // Round-trip through `Pubkey` so a malformed address is caught
            // locally before the request is sent.
            Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8")
                .expect("hard-coded account address is a valid pubkey")
                .to_string(),
        ],
    };

    let mut stream = client
        .subscribe_next_stream(Request::new(req))
        .await
        .context("subscribe")?
        .into_inner();

    while let Some(msg) = stream.message().await.context("recv")? {
        // Messages without a packet are skipped.
        let Some(packet) = msg.packet else {
            continue;
        };

        let tx: VersionedTransaction =
            bincode::deserialize(&packet.transaction).context("deserialize transaction")?;
        let first_sig = tx
            .signatures
            .first()
            .map(|s| s.to_string())
            .unwrap_or_else(|| "<no signatures>".to_string());
        println!("got new sig {} on slot {}", first_sig, packet.slot);
    }
    Ok(())
}

Usage Examples

# Log signature + slot for all filtered transactions
cargo run

Available Endpoints

Choose the endpoint closest to your location:

  • Frankfurt: fra.stream.nextblock.io:22221

  • Amsterdam: amsterdam.stream.nextblock.io:22221

  • London: london.stream.nextblock.io:22221

  • Singapore: singapore.stream.nextblock.io:22221

  • Tokyo: tokyo.stream.nextblock.io:22221

  • New York: ny.stream.nextblock.io:22221

  • Salt Lake City: slc.stream.nextblock.io:22221

Best Practices

  1. Use insecure connections: Better performance with NextBlock's optimized network

  2. Filter efficiently: Only subscribe to programs you need to reduce bandwidth

  3. Handle reconnections: Implement automatic reconnection logic

  4. Process quickly: Avoid blocking the stream receiver

  5. Monitor performance: Track processing latency and throughput

  6. Implement graceful shutdown: Handle interrupt signals properly

  7. Use multiple endpoints: Implement redundancy for critical applications

Last updated