Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions graph/src/ipfs/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait IpfsClient: Send + Sync + 'static {
retry_policy: RetryPolicy,
) -> IpfsResult<BoxStream<'static, IpfsResult<Bytes>>> {
let fut = retry_policy
.create("IPFS.cat_stream", self.logger())
.create(format!("IPFS.cat_stream[{}]", path), self.logger())
.no_timeout()
.run({
let path = path.to_owned();
Expand Down Expand Up @@ -67,7 +67,7 @@ pub trait IpfsClient: Send + Sync + 'static {
retry_policy: RetryPolicy,
) -> IpfsResult<Bytes> {
let fut = retry_policy
.create("IPFS.cat", self.logger())
.create(format!("IPFS.cat[{}]", path), self.logger())
.no_timeout()
.run({
let path = path.to_owned();
Expand Down Expand Up @@ -100,7 +100,7 @@ pub trait IpfsClient: Send + Sync + 'static {
retry_policy: RetryPolicy,
) -> IpfsResult<Bytes> {
let fut = retry_policy
.create("IPFS.get_block", self.logger())
.create(format!("IPFS.get_block[{}]", path), self.logger())
.no_timeout()
.run({
let path = path.to_owned();
Expand Down
82 changes: 82 additions & 0 deletions graph/src/ipfs/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,86 @@ mod tests {

assert_eq!(bytes.as_ref(), b"some data");
}

#[tokio::test]
async fn operation_names_include_cid_for_debugging() {
use slog::{o, Drain, Logger, Record};
use std::sync::{Arc, Mutex};

// Custom drain to capture log messages
struct LogCapture {
messages: Arc<Mutex<Vec<String>>>,
}

impl Drain for LogCapture {
type Ok = ();
type Err = std::io::Error;

fn log(
&self,
record: &Record,
_: &slog::OwnedKVList,
) -> std::result::Result<Self::Ok, Self::Err> {
let message = format!("{}", record.msg());
self.messages.lock().unwrap().push(message);
Ok(())
}
}

let captured_messages = Arc::new(Mutex::new(Vec::new()));
let drain = LogCapture {
messages: captured_messages.clone(),
};
let logger = Logger::root(drain.fuse(), o!());

let server = mock_server().await;
let client = Arc::new(IpfsGatewayClient::new_unchecked(server.uri(), &logger).unwrap());

// Set up mock to fail twice then succeed to trigger retry with warning logs
mock_get()
.respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR))
.up_to_n_times(2)
.expect(2)
.mount(&server)
.await;

mock_get()
.respond_with(ResponseTemplate::new(StatusCode::OK).set_body_bytes(b"data"))
.expect(1)
.mount(&server)
.await;

let path = make_path();

// This should trigger retry logs because we set up failures first
let _result = client
.cat(&path, usize::MAX, None, RetryPolicy::NonDeterministic)
.await
.unwrap();

// Check that the captured log messages include the CID
let messages = captured_messages.lock().unwrap();
let retry_messages: Vec<_> = messages
.iter()
.filter(|msg| msg.contains("Trying again after"))
.collect();

assert!(
!retry_messages.is_empty(),
"Expected retry messages but found none. All messages: {:?}",
*messages
);

// Verify that the operation name includes the CID
let expected_cid = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn";
let has_cid_in_operation = retry_messages
.iter()
.any(|msg| msg.contains(&format!("IPFS.cat[{}]", expected_cid)));

assert!(
has_cid_in_operation,
"Expected operation name to include CID [{}] in retry messages: {:?}",
expected_cid, retry_messages
);
}
}