Skip to content

Commit 3230bb6

Browse files
authored
Merge pull request #31 from picklerick2349/master
Adding support for verifying Tower BFT consensus
2 parents e4c72a6 + 3b63e56 commit 3230bb6

4 files changed

Lines changed: 305 additions & 19 deletions

File tree

tinydancer/src/consensus.rs

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster};
2+
use crate::sampler::{ArchiveConfig, SlotSubscribeResponse};
3+
use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred};
4+
use anyhow::anyhow;
5+
use async_trait::async_trait;
6+
use crossbeam::channel::{Receiver, Sender};
7+
use futures::Sink;
8+
use itertools::Itertools;
9+
use rand::distributions::Uniform;
10+
use rand::prelude::*;
11+
use rayon::prelude::*;
12+
use reqwest::Request;
13+
use rocksdb::{ColumnFamily, Options as RocksOptions, DB};
14+
use serde::de::DeserializeOwned;
15+
use solana_ledger::shred::{ShredId, ShredType};
16+
use solana_ledger::{
17+
ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
18+
blockstore::Blockstore,
19+
// blockstore_db::columns::ShredCode,
20+
shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE},
21+
};
22+
use solana_sdk::hash::hashv;
23+
use solana_sdk::{
24+
clock::Slot,
25+
genesis_config::ClusterType,
26+
hash::{Hash, HASH_BYTES},
27+
packet::PACKET_DATA_SIZE,
28+
pubkey::{Pubkey, PUBKEY_BYTES},
29+
signature::{Signable, Signature, Signer, SIGNATURE_BYTES},
30+
signer::keypair::Keypair,
31+
timing::{duration_as_ms, timestamp},
32+
};
33+
use std::str::FromStr;
34+
use std::sync::atomic::{AtomicU32, Ordering};
35+
use std::sync::{Arc, Mutex, MutexGuard};
36+
use std::{error::Error, ops::Add};
37+
use std::{
38+
net::{SocketAddr, UdpSocket},
39+
thread::Builder,
40+
};
41+
use tiny_logger::logs::{debug, error, info};
42+
use tokio::{
43+
sync::mpsc::UnboundedSender,
44+
task::{JoinError, JoinHandle},
45+
sync::Mutex as TokioMutex,
46+
};
47+
use tungstenite::{connect, Message};
48+
use url::Url;
49+
use serde_derive::Deserialize;
50+
use serde_derive::Serialize;
51+
52+
/// Background service that verifies Tower BFT consensus for incoming slots.
pub struct ConsensusService {
    // Slot indices tracked by the service (initialized empty; not mutated in this file).
    consensus_indices: Vec<u64>,
    // Handle to the spawned task that drives the subscribe/verify loops.
    consensus_handler: JoinHandle<()>,
}
56+
57+
/// Configuration handed to `ConsensusService::new`.
pub struct ConsensusServiceConfig {
    /// Cluster whose RPC endpoint is used for the websocket subscription and RPC calls.
    pub cluster: Cluster,
    /// Shred-archive settings (shared with the sampler; unused in this file).
    pub archive_config: ArchiveConfig,
    /// Handle to the local RocksDB instance (unused in this file).
    pub instance: Arc<rocksdb::DB>,
    /// Shared client status; the consensus loops mark it Active/Crashed.
    pub status_consensus: Arc<TokioMutex<ClientStatus>>,
    // NOTE(review): presumably the number of samples per slot — not read
    // anywhere in this file; verify against the caller.
    pub sample_qty: usize,
}
64+
65+
/// Block-commitment payload returned by the `getBlockCommitment` RPC method.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcBlockCommitment<T> {
    /// Stake (in lamports) per lockout depth; `None` when the node has no
    /// commitment data for the slot.
    pub commitment: Option<T>,
    /// Total active stake used as the denominator for the vote threshold.
    pub total_stake: u64,
}
71+
72+
/// Full JSON-RPC envelope for a `getBlockCommitment` response.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetCommittmentResponse {
    /// JSON-RPC protocol version (e.g. "2.0").
    pub jsonrpc: String,
    /// The commitment array plus total stake for the requested slot.
    pub result: RpcBlockCommitment<BlockCommitmentArray>,
    /// Request id echoed back by the server.
    pub id: i64,
}

/// Maximum Tower BFT lockout history depth.
pub const MAX_LOCKOUT_HISTORY: usize = 31;
/// Stake per lockout depth, one bucket per depth plus the max-lockout bucket.
pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1];

/// Supermajority threshold: a slot is considered verified when voted
/// stake / total stake exceeds 2/3.
pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
84+
85+
#[async_trait]
86+
impl ClientService<ConsensusServiceConfig> for ConsensusService {
87+
type ServiceError = tokio::task::JoinError;
88+
89+
fn new(config: ConsensusServiceConfig) -> Self {
90+
let consensus_handler = tokio::spawn(async move {
91+
let rpc_url = endpoint(config.cluster);
92+
let pub_sub = convert_to_websocket!(rpc_url);
93+
94+
let mut threads = Vec::default();
95+
96+
let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::<u64>();
97+
98+
let status_arc = config.status_consensus.clone();
99+
100+
// waits on new slots => triggers slot_verify_loop
101+
threads.push(tokio::spawn(slot_update_loop(
102+
slot_update_tx,
103+
pub_sub,
104+
config.status_consensus,
105+
)));
106+
107+
// verify slot votes
108+
threads.push(tokio::spawn(slot_verify_loop(
109+
slot_update_rx,
110+
rpc_url,
111+
status_arc,
112+
)));
113+
114+
115+
for thread in threads {
116+
thread.await;
117+
}
118+
});
119+
120+
Self {
121+
consensus_handler,
122+
consensus_indices: Vec::default(),
123+
}
124+
}
125+
126+
async fn join(self) -> std::result::Result<(), Self::ServiceError> {
127+
self.consensus_handler.await
128+
}
129+
}
130+
131+
pub async fn slot_update_loop(
132+
slot_update_tx: Sender<u64>,
133+
pub_sub: String,
134+
status_sampler: Arc<TokioMutex<ClientStatus>>,
135+
) -> anyhow::Result<()> {
136+
let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) {
137+
Ok((socket, _response)) => Some((socket, _response)),
138+
Err(_) => {
139+
let mut status = status_sampler.lock().await;
140+
*status = ClientStatus::Crashed(String::from("Client can't connect to socket"));
141+
None
142+
}
143+
};
144+
145+
if result.is_none() {
146+
return Err(anyhow!(""));
147+
}
148+
149+
let (mut socket, _response) = result.unwrap();
150+
151+
socket.write_message(Message::Text(
152+
r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(),
153+
))?;
154+
155+
loop {
156+
match socket.read_message() {
157+
Ok(msg) => {
158+
let res = serde_json::from_str::<SlotSubscribeResponse>(msg.to_string().as_str());
159+
160+
// info!("res: {:?}", msg.to_string().as_str());
161+
if let Ok(res) = res {
162+
match slot_update_tx.send(res.params.result.root as u64) {
163+
Ok(_) => {
164+
info!("slot updated: {:?}", res.params.result.root);
165+
}
166+
Err(e) => {
167+
info!("error here: {:?} {:?}", e, res.params.result.root as u64);
168+
continue; // @TODO: we should add retries here incase send fails for some reason
169+
}
170+
}
171+
}
172+
}
173+
Err(e) => info!("err: {:?}", e),
174+
}
175+
}
176+
}
177+
178+
// verifies the total vote on the slot > 2/3
179+
fn verify_slot(slot_commitment: RpcBlockCommitment<BlockCommitmentArray>) -> bool {
180+
let commitment_array = &slot_commitment.commitment;
181+
let total_stake = &slot_commitment.total_stake;
182+
let sum: u64 = commitment_array.iter().flatten().sum();
183+
184+
if (sum as f64 / *total_stake as f64) > VOTE_THRESHOLD_SIZE {
185+
true
186+
} else {
187+
false
188+
}
189+
}
190+
191+
pub async fn slot_verify_loop(
192+
slot_update_rx: Receiver<u64>,
193+
endpoint: String,
194+
status_sampler: Arc<tokio::sync::Mutex<ClientStatus>>,
195+
) -> anyhow::Result<()> {
196+
loop {
197+
let mut status = status_sampler.lock().await;
198+
if let ClientStatus::Crashed(_) = &*status {
199+
return Err(anyhow!("Client crashed"));
200+
} else {
201+
*status = ClientStatus::Active(String::from(
202+
"Monitoring Tinydancer: Verifying consensus",
203+
));
204+
}
205+
if let Ok(slot) = slot_update_rx.recv() {
206+
let slot_commitment_result = request_slot_voting(slot, &endpoint).await;
207+
208+
if let Err(e) = slot_commitment_result {
209+
println!("Error {}", e);
210+
info!("{}", e);
211+
continue;
212+
}
213+
214+
let slot_commitment = slot_commitment_result.unwrap();
215+
216+
let verified = verify_slot(slot_commitment.result);
217+
218+
if verified {
219+
info!("slot {:?} verified ", slot);
220+
} else {
221+
info!("slot {:?} failed to verified ", slot);
222+
info!("sample INVALID for slot : {:?}", slot);
223+
}
224+
}
225+
}
226+
}
227+
228+
pub async fn request_slot_voting(
229+
slot: u64,
230+
endpoint: &String,
231+
) -> Result<GetCommittmentResponse, serde_json::Error> {
232+
233+
let request = serde_json::json!({
234+
"jsonrpc": "2.0",
235+
"id": 1,
236+
"method": "getBlockCommitment",
237+
"params": [
238+
slot
239+
]
240+
})
241+
.to_string();
242+
243+
let res = send_rpc_call!(endpoint, request);
244+
245+
serde_json::from_str::<GetCommittmentResponse>(&res)
246+
}

tinydancer/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ mod macros;
4848
use colored::Colorize;
4949
mod rpc_wrapper;
5050
mod sampler;
51+
mod consensus;
5152
mod ui;
5253

5354
use anyhow::{anyhow, Result};
@@ -85,6 +86,10 @@ pub enum Commands {
8586
/// Duration after which shreds will be purged
8687
#[clap(required = false, default_value_t = 10000000)]
8788
shred_archive_duration: u64,
89+
90+
/// Run the node in consensus mode
91+
#[clap(long, short)]
92+
consensus_mode: bool,
8893
},
8994
/// Verify the samples for a single slot
9095
Verify {
@@ -144,6 +149,7 @@ async fn main() -> Result<()> {
144149
archive_path,
145150
shred_archive_duration,
146151
tui_monitor,
152+
consensus_mode
147153
} => {
148154
let config_file =
149155
get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?;
@@ -152,6 +158,7 @@ async fn main() -> Result<()> {
152158
rpc_endpoint: get_cluster(config_file.cluster),
153159
sample_qty,
154160
tui_monitor,
161+
consensus_mode,
155162
log_path: config_file.log_path,
156163
archive_config: {
157164
archive_path

tinydancer/src/sampler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ pub async fn request_shreds(
155155
serde_json::from_str::<GetShredResponse>(&res)
156156
}
157157

158-
async fn slot_update_loop(
158+
pub async fn slot_update_loop(
159159
slot_update_tx: Sender<u64>,
160160
pub_sub: String,
161161
status_sampler: Arc<Mutex<ClientStatus>>,
@@ -535,6 +535,7 @@ pub struct GetShredResponse {
535535
pub result: GetShredResult,
536536
pub id: i64,
537537
}
538+
538539
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
539540
#[serde(rename_all = "camelCase")]
540541
pub struct GetShredResult {

0 commit comments

Comments
 (0)