Rust Example
Complete Rust implementation for streaming Solana transactions from NextBlock's TX Stream API using Tonic.
The example is hosted at github.com/nextblock-ag/nextblock-stream-examples
Prerequisites
Add these dependencies to your Cargo.toml
:
[package]
name = "rust-example"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.99"
bincode = "1.3.3"
bs58 = "0.5"
prost = "0.13"
prost-types = "0.13"
protobuf = "3.7.2"
rand = "0.8"
solana-sdk = "3.0.0"
time = "0.3"
tokio = { version = "1.39", features = ["rt-multi-thread", "macros"] }
tonic = { version = "0.12", features = ["gzip"] }
[build-dependencies]
tonic-build = "0.12"
Proto build script build.rs
use std::{env, path::PathBuf};
fn main() {
let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap());
let repo_root = manifest_dir
.parent()
.expect("rust-example should have a parent directory")
.to_path_buf();
let proto = repo_root.join("stream.proto");
if !proto.exists() {
panic!("Proto not found at {}", proto.display());
}
let out_dir = manifest_dir.join("src/protos");
println!("cargo:rerun-if-changed={}", proto.display());
println!("cargo:rerun-if-changed={}", repo_root.display());
std::fs::create_dir_all(&out_dir).expect("create src/protos");
tonic_build::configure()
.build_client(true)
.build_server(false)
.out_dir(out_dir)
.compile_protos(
&[proto.to_string_lossy().to_string()],
&[repo_root.to_string_lossy().to_string()],
)
.expect("Failed to compile protos");
}
Example
use std::{
str::FromStr,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use rand::Rng;
use solana_sdk::{
pubkey::Pubkey, signature::Keypair, signer::Signer, transaction::VersionedTransaction,
};
use anyhow::{Context, Result, bail};
use tonic::{
Request,
transport::{Channel, Endpoint},
};
use crate::stream::{NextStreamSubscription, next_stream_service_client::NextStreamServiceClient};
#[path = "protos/stream.rs"]
pub mod stream;
/*
the auth message needs to be built clients-side.
it is a pipe-separated string made of:
1. domain that's being connected to (e.g. fra.stream.nextblock.io)
2. the publickey that sent the fee to strmuYvHKeA1qvHqooUpwUk2BFwaAmMbK9WXY9mh2GJ
3. a random nonce
4. the current unix timestamp
it is then signed by the supplied publickey.
*/
fn build_auth_message(domain: &str, pubkey: &Pubkey) -> String {
let nonce: u64 = rand::thread_rng().r#gen();
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
format!("{}|{}|{}|{}", domain, pubkey.to_string(), nonce, ts)
}
async fn make_insecure_channel(domain: &str) -> Result<Channel> {
let ep = Endpoint::from_shared(format!("http://{}", domain))?
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_while_idle(false);
Ok(ep.connect().await?)
}
#[tokio::main]
async fn main() -> Result<()> {
let domain = "fra.stream.nextblock.io:22221";
let private_key_b58 = "YOUR_PRIVATE_KEY";
if private_key_b58.is_empty() {
bail!("Set `private_key_b58` to your base58-encoded Solana private key.");
}
let authentication_keypair = Keypair::from_base58_string(private_key_b58);
let authentication_pubkey = authentication_keypair.pubkey();
let authentication_message = build_auth_message(domain, &authentication_pubkey);
let authentication_signature =
authentication_keypair.sign_message(authentication_message.as_bytes());
let channel = make_insecure_channel(domain).await?;
let mut client = NextStreamServiceClient::new(channel);
let req = NextStreamSubscription {
authentication_publickey: authentication_pubkey.to_string(),
authentication_message: authentication_message,
authentication_signature: authentication_signature.to_string(),
accounts: vec![
Pubkey::from_str("675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8")
.unwrap()
.to_string(),
],
};
let mut stream = client
.subscribe_next_stream(Request::new(req))
.await?
.into_inner();
while let Some(msg) = stream.message().await.context("recv")? {
if let Some(packet) = msg.packet {
let tx: VersionedTransaction = bincode::deserialize(&packet.transaction)?;
let first_sig = tx
.signatures
.get(0)
.map(|s| s.to_string())
.unwrap_or_else(|| "<no signatures>".to_string());
println!("got new sig {} on slot {}", first_sig, packet.slot);
}
}
Ok(())
}
Usage Examples
# Log signature + slot for all filtered transactions
cargo run
Available Endpoints
Choose the endpoint closest to your location:
Frankfurt:
fra.stream.nextblock.io:22221
Amsterdam:
amsterdam.stream.nextblock.io:22221
London:
london.stream.nextblock.io:22221
Singapore:
singapore.stream.nextblock.io:22221
Tokyo:
tokyo.stream.nextblock.io:22221
New York:
ny.stream.nextblock.io:22221
Salt Lake City:
slc.stream.nextblock.io:22221
Best Practices
Use insecure connections: Better performance with NextBlock's optimized network
Filter efficiently: Only subscribe to programs you need to reduce bandwidth
Handle reconnections: Implement automatic reconnection logic
Process quickly: Avoid blocking the stream receiver
Monitor performance: Track processing latency and throughput
Implement graceful shutdown: Handle interrupt signals properly
Use multiple endpoints: Implement redundancy for critical applications
Last updated