diff --git a/Cargo.lock b/Cargo.lock index c4fe169fa46..661f73a750d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -454,6 +454,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "backon" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd0b50b1b78dbadd44ab18b3c794e496f3a139abb9fbc27d9c94c4eebbb96496" +dependencies = [ + "fastrand", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -766,6 +775,20 @@ dependencies = [ "unreachable", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util 0.7.11", +] + [[package]] name = "console" version = "0.15.11" @@ -1654,9 +1677,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1669,9 +1692,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1679,15 +1702,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1696,15 +1719,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1713,15 +1736,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-timer" @@ -1731,9 +1754,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures 0.1.31", "futures-channel", @@ -1880,7 +1903,7 @@ dependencies = [ "envconfig", "ethabi", "futures 0.1.31", - "futures 0.3.30", + "futures 0.3.31", "graph_derive", "graphql-parser", "hex", @@ -1906,6 +1929,7 @@ dependencies = [ "prost", "prost-types", "rand 0.9.1", + "redis", "regex", "reqwest", "semver", @@ -2280,7 +2304,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" dependencies = [ - "combine", + "combine 3.8.1", "thiserror 1.0.61", ] @@ -3036,7 +3060,7 @@ version = "18.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb" dependencies = [ - "futures 0.3.30", + "futures 0.3.31", "futures-executor", "futures-util", "log", @@ -3472,7 +3496,7 @@ dependencies = [ "bytes", "chrono", "form_urlencoded", - "futures 0.3.30", + "futures 0.3.31", "http 1.3.1", "http-body-util", "humantime", @@ -4187,6 +4211,31 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bc1ea653e0b2e097db3ebb5b7f678be339620b8041f66b30a308c1d45d36a7f" +dependencies = [ + "arc-swap", + "backon", + "bytes", + "cfg-if 1.0.0", + "combine 4.6.7", + "futures-channel", + "futures-util", + "itoa", + "num-bigint 0.4.6", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-util 0.7.11", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -4704,6 +4753,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -4862,7 +4917,7 @@ checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" dependencies = [ "base64 0.13.1", "bytes", - "futures 0.3.30", + "futures 0.3.31", "httparse", "log", "rand 0.8.5", @@ -6472,7 +6527,7 @@ dependencies = [ "derive_more 0.99.19", "ethabi", "ethereum-types", - "futures 0.3.30", + "futures 0.3.31", "futures-timer", "headers", "hex", @@ -6846,7 +6901,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "deadpool", - "futures 0.3.30", + "futures 0.3.31", "http 1.3.1", "http-body-util", "hyper 1.6.0", diff --git a/Cargo.toml b/Cargo.toml index 6a0a8a955fd..9953b433fe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ itertools = "0.13.0" lazy_static = "1.5.0" prost = "0.13" prost-types = "0.13" +redis = { version = "0.31.0", features = ["aio", "connection-manager", "tokio-comp"] } regex = "1.5.4" reqwest = "0.12.15" serde = { version = "1.0.126", features = ["rc"] } diff --git a/docs/environment-variables.md b/docs/environment-variables.md index e7ce5b028c8..747364dd0c4 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -91,6 +91,12 @@ those. file not found or logical issue working as a safety mechanism to prevent infinite spamming of IPFS servers and network congestion (default: 100 000). +- `GRAPH_IPFS_CACHE_LOCATION`: When set, files retrieved from IPFS will be + cached in that location; future accesses to the same file will be served + from cache rather than IPFS. This can either be a URL starting with + `redis://`, in which case there must be a Redis instance running at that + URL, or an absolute file system path which must be a directory writable + by the `graph-node` process (experimental) ## GraphQL diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 452ede55ddc..496c36dd62e 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -43,6 +43,7 @@ num-bigint = { version = "=0.2.6", features = ["serde"] } num-integer = { version = "=0.1.46" } num-traits = "=0.2.19" rand.workspace = true +redis = { workspace = true } regex = "1.5.4" semver = { version = "1.0.23", features = ["serde"] } serde = { workspace = true } @@ -85,7 +86,7 @@ tonic = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } -futures03 = { version = "0.3.1", package = "futures", features = ["compat"] } +futures03 = { version = "0.3.31", package = "futures", features = ["compat"] } wasmparser = "0.118.1" thiserror = "2.0.12" parking_lot = "0.12.3" diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 9ecf4ff02e3..9f7ded84a67 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::sync::Mutex; use std::time::Duration; use anyhow::anyhow; @@ -9,7 +8,6 @@ use derivative::Derivative; use futures03::compat::Stream01CompatExt; use futures03::stream::StreamExt; use futures03::stream::TryStreamExt; -use lru_time_cache::LruCache; use serde_json::Value; use crate::derive::CheapClone; @@ -30,13 +28,9 @@ pub struct IpfsResolver { #[derivative(Debug = "ignore")] client: Arc, - #[derivative(Debug = "ignore")] - cache: Arc>>>, - timeout: Duration, max_file_size: usize, max_map_file_size: usize, - max_cache_file_size: usize, /// When set to `true`, it means infinite retries, ignoring the timeout setting. retry: bool, @@ -48,13 +42,9 @@ impl IpfsResolver { Self { client, - cache: Arc::new(Mutex::new(LruCache::with_capacity( - env.max_ipfs_cache_size as usize, - ))), timeout: env.ipfs_timeout, max_file_size: env.max_ipfs_file_bytes, max_map_file_size: env.max_ipfs_map_file_size, - max_cache_file_size: env.max_ipfs_cache_file_size, retry: false, } } @@ -74,18 +64,10 @@ impl LinkResolverTrait for IpfsResolver { Box::new(s) } - async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { + async fn cat(&self, _logger: &Logger, link: &Link) -> Result, Error> { let path = ContentPath::new(&link.link)?; let timeout = self.timeout; let max_file_size = self.max_file_size; - let max_cache_file_size = self.max_cache_file_size; - - if let Some(data) = self.cache.lock().unwrap().get(&path) { - trace!(logger, "IPFS cat cache hit"; "hash" => path.to_string()); - return Ok(data.to_owned()); - } - - trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string()); let (timeout, retry_policy) = if self.retry { (None, RetryPolicy::NonDeterministic) @@ -100,21 +82,6 @@ impl LinkResolverTrait for IpfsResolver { .await? .to_vec(); - if data.len() <= max_cache_file_size { - let mut cache = self.cache.lock().unwrap(); - - if !cache.contains_key(&path) { - cache.insert(path.clone(), data.clone()); - } - } else { - debug!( - logger, - "IPFS file too large for cache"; - "path" => path.to_string(), - "size" => data.len(), - ); - } - Ok(data) } diff --git a/graph/src/env/mappings.rs b/graph/src/env/mappings.rs index 99f97d0c8f2..da177644adb 100644 --- a/graph/src/env/mappings.rs +++ b/graph/src/env/mappings.rs @@ -1,7 +1,9 @@ use std::fmt; +use std::path::PathBuf; -use super::*; +use anyhow::anyhow; +use super::*; #[derive(Clone)] pub struct EnvVarsMapping { /// Forces the cache eviction policy to take its own memory overhead into account. @@ -58,6 +60,9 @@ pub struct EnvVarsMapping { /// Set by the environment variable `GRAPH_IPFS_MAX_ATTEMPTS`. Defaults to 100000. pub ipfs_max_attempts: usize, + /// Set by the flag `GRAPH_IPFS_CACHE_LOCATION`. + pub ipfs_cache_location: Option, + /// Set by the flag `GRAPH_ALLOW_NON_DETERMINISTIC_IPFS`. Off by /// default. pub allow_non_deterministic_ipfs: bool, @@ -82,9 +87,17 @@ impl fmt::Debug for EnvVarsMapping { } } -impl From for EnvVarsMapping { - fn from(x: InnerMappingHandlers) -> Self { - Self { +impl TryFrom for EnvVarsMapping { + type Error = anyhow::Error; + + fn try_from(x: InnerMappingHandlers) -> Result { + let ipfs_cache_location = x + .ipfs_cache_location + .map(PathBuf::from) + .map(validate_ipfs_cache_location) + .transpose()?; + + let vars = Self { entity_cache_dead_weight: x.entity_cache_dead_weight.0, entity_cache_size: x.entity_cache_size_in_kb * 1000, @@ -99,10 +112,12 @@ impl From for EnvVarsMapping { max_ipfs_file_bytes: x.max_ipfs_file_bytes.0, ipfs_request_limit: x.ipfs_request_limit, ipfs_max_attempts: x.ipfs_max_attempts, + ipfs_cache_location: ipfs_cache_location, allow_non_deterministic_ipfs: x.allow_non_deterministic_ipfs.0, disable_declared_calls: x.disable_declared_calls.0, store_errors_are_nondeterministic: x.store_errors_are_nondeterministic.0, - } + }; + Ok(vars) } } @@ -134,6 +149,8 @@ pub struct InnerMappingHandlers { ipfs_request_limit: u16, #[envconfig(from = "GRAPH_IPFS_MAX_ATTEMPTS", default = "100000")] ipfs_max_attempts: usize, + #[envconfig(from = "GRAPH_IPFS_CACHE_LOCATION")] + ipfs_cache_location: Option, #[envconfig(from = "GRAPH_ALLOW_NON_DETERMINISTIC_IPFS", default = "false")] allow_non_deterministic_ipfs: EnvVarBoolean, #[envconfig(from = "GRAPH_DISABLE_DECLARED_CALLS", default = "false")] @@ -141,3 +158,36 @@ pub struct InnerMappingHandlers { #[envconfig(from = "GRAPH_STORE_ERRORS_ARE_NON_DETERMINISTIC", default = "false")] store_errors_are_nondeterministic: EnvVarBoolean, } + +fn validate_ipfs_cache_location(path: PathBuf) -> Result { + if path.starts_with("redis://") { + // We validate this later when we set up the Redis client + return Ok(path); + } + let path = path.canonicalize().map_err(|e| { + anyhow!( + "GRAPH_IPFS_CACHE_LOCATION {} is invalid: {e}", + path.display() + ) + })?; + if !path.is_absolute() { + return Err(anyhow::anyhow!( + "GRAPH_IPFS_CACHE_LOCATION must be an absolute path: {}", + path.display() + )); + } + if !path.is_dir() { + return Err(anyhow::anyhow!( + "GRAPH_IPFS_CACHE_LOCATION must be a directory: {}", + path.display() + )); + } + let metadata = path.metadata()?; + if metadata.permissions().readonly() { + return Err(anyhow::anyhow!( + "GRAPH_IPFS_CACHE_LOCATION must be a writable directory: {}", + path.display() + )); + } + Ok(path) +} diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index b734302e2cf..8ff3335cfd5 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -268,7 +268,7 @@ impl EnvVars { pub fn from_env() -> Result { let inner = Inner::init_from_env()?; let graphql = InnerGraphQl::init_from_env()?.into(); - let mapping_handlers = InnerMappingHandlers::init_from_env()?.into(); + let mapping_handlers = InnerMappingHandlers::init_from_env()?.try_into()?; let store = InnerStore::init_from_env()?.try_into()?; let ipfs_request_timeout = match inner.ipfs_request_timeout { Some(timeout) => Duration::from_secs(timeout), diff --git a/graph/src/ipfs/cache.rs b/graph/src/ipfs/cache.rs new file mode 100644 index 00000000000..4c15e2cbc3d --- /dev/null +++ b/graph/src/ipfs/cache.rs @@ -0,0 +1,291 @@ +use std::{ + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use graph_derive::CheapClone; +use lru_time_cache::LruCache; +use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; +use redis::{ + aio::{ConnectionManager, ConnectionManagerConfig}, + AsyncCommands as _, RedisResult, Value, +}; +use slog::{debug, info, warn, Logger}; +use tokio::sync::Mutex as AsyncMutex; + +use crate::{env::ENV_VARS, prelude::CheapClone}; + +use super::{ + ContentPath, IpfsClient, IpfsError, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy, +}; + +struct RedisClient { + mgr: AsyncMutex, +} + +impl RedisClient { + async fn new(logger: &Logger, path: &str) -> RedisResult { + let env = &ENV_VARS.mappings; + let client = redis::Client::open(path)?; + let cfg = ConnectionManagerConfig::default() + .set_connection_timeout(env.ipfs_timeout) + .set_response_timeout(env.ipfs_timeout); + info!(logger, "Connecting to Redis for IPFS caching"; "url" => path); + // Try to connect once synchronously to check if the server is reachable. + let _ = client.get_connection()?; + let mgr = AsyncMutex::new(client.get_connection_manager_with_config(cfg).await?); + info!(logger, "Connected to Redis for IPFS caching"; "url" => path); + Ok(RedisClient { mgr }) + } + + async fn get(&self, path: &ContentPath) -> IpfsResult { + let mut mgr = self.mgr.lock().await; + + let key = Self::key(path); + let data: Vec = mgr + .get(&key) + .await + .map_err(|e| IpfsError::InvalidCacheConfig { + source: anyhow!("Failed to get IPFS object {key} from Redis cache: {e}"), + })?; + Ok(data.into()) + } + + async fn put(&self, path: &ContentPath, data: &Bytes) -> IpfsResult<()> { + let mut mgr = self.mgr.lock().await; + + let key = Self::key(path); + mgr.set(&key, data.as_ref()) + .await + .map(|_: Value| ()) + .map_err(|e| IpfsError::InvalidCacheConfig { + source: anyhow!("Failed to put IPFS object {key} in Redis cache: {e}"), + })?; + Ok(()) + } + + fn key(path: &ContentPath) -> String { + format!("ipfs:{path}") + } +} + +#[derive(Clone, CheapClone)] +enum Cache { + Memory { + cache: Arc>>, + max_entry_size: usize, + }, + Disk { + store: Arc, + }, + Redis { + client: Arc, + }, +} + +fn log_object_store_err(logger: &Logger, e: &object_store::Error, log_not_found: bool) { + if log_not_found || !matches!(e, object_store::Error::NotFound { .. }) { + warn!( + logger, + "Failed to get IPFS object from disk cache; fetching from IPFS"; + "error" => e.to_string(), + ); + } +} + +fn log_redis_err(logger: &Logger, e: &IpfsError) { + warn!( + logger, + "Failed to get IPFS object from Redis cache; fetching from IPFS"; + "error" => e.to_string(), + ); +} + +impl Cache { + async fn new( + logger: &Logger, + capacity: usize, + max_entry_size: usize, + path: Option, + ) -> IpfsResult { + match path { + Some(path) if path.starts_with("redis://") => { + let path = path.to_string_lossy(); + let client = RedisClient::new(logger, path.as_ref()) + .await + .map(Arc::new) + .map_err(|e| IpfsError::InvalidCacheConfig { + source: anyhow!("Failed to create IPFS Redis cache at {path}: {e}"), + })?; + Ok(Cache::Redis { client }) + } + Some(path) => { + let fs = LocalFileSystem::new_with_prefix(&path).map_err(|e| { + IpfsError::InvalidCacheConfig { + source: anyhow!( + "Failed to create IPFS file based cache at {}: {}", + path.display(), + e + ), + } + })?; + debug!(logger, "Using IPFS file based cache"; "path" => path.display()); + Ok(Cache::Disk { + store: Arc::new(fs), + }) + } + None => { + debug!(logger, "Using IPFS in-memory cache"; "capacity" => capacity, "max_entry_size" => max_entry_size); + Ok(Self::Memory { + cache: Arc::new(Mutex::new(LruCache::with_capacity(capacity))), + max_entry_size, + }) + } + } + } + + async fn find(&self, logger: &Logger, path: &ContentPath) -> Option { + match self { + Cache::Memory { + cache, + max_entry_size: _, + } => cache.lock().unwrap().get(path).cloned(), + Cache::Disk { store } => { + let log_err = |e: &object_store::Error| log_object_store_err(logger, e, false); + + let path = Self::disk_path(path); + let object = store.get(&path).await.inspect_err(log_err).ok()?; + let data = object.bytes().await.inspect_err(log_err).ok()?; + Some(data) + } + Cache::Redis { client } => client + .get(path) + .await + .inspect_err(|e| log_redis_err(logger, e)) + .ok() + .and_then(|data| if data.is_empty() { None } else { Some(data) }), + } + } + + async fn insert(&self, logger: &Logger, path: ContentPath, data: Bytes) { + match self { + Cache::Memory { max_entry_size, .. } if data.len() > *max_entry_size => { + return; + } + Cache::Memory { cache, .. } => { + let mut cache = cache.lock().unwrap(); + + if !cache.contains_key(&path) { + cache.insert(path.clone(), data.clone()); + } + } + Cache::Disk { store } => { + let log_err = |e: &object_store::Error| log_object_store_err(logger, e, true); + let path = Self::disk_path(&path); + store + .put(&path, data.into()) + .await + .inspect_err(log_err) + .ok(); + } + Cache::Redis { client } => { + if let Err(e) = client.put(&path, &data).await { + log_redis_err(logger, &e); + } + } + } + } + + /// The path where we cache content on disk + fn disk_path(path: &ContentPath) -> Path { + Path::from(path.to_string()) + } +} + +/// An IPFS client that caches the results of `cat` and `get_block` calls in +/// memory or on disk, depending on settings in the environment. +/// +/// The cache is used to avoid repeated calls to the IPFS API for the same +/// content. +pub struct CachingClient { + client: Arc, + cache: Cache, +} + +impl CachingClient { + pub async fn new(client: Arc) -> IpfsResult { + let env = &ENV_VARS.mappings; + + let cache = Cache::new( + client.logger(), + env.max_ipfs_cache_size as usize, + env.max_ipfs_cache_file_size, + env.ipfs_cache_location.clone(), + ) + .await?; + Ok(CachingClient { client, cache }) + } + + async fn with_cache(&self, path: &ContentPath, f: F) -> IpfsResult + where + F: AsyncFnOnce() -> IpfsResult, + { + if let Some(data) = self.cache.find(self.logger(), path).await { + return Ok(data); + } + + let data = f().await?; + self.cache + .insert(self.logger(), path.clone(), data.clone()) + .await; + Ok(data) + } +} + +#[async_trait] +impl IpfsClient for CachingClient { + fn logger(&self) -> &Logger { + self.client.logger() + } + + async fn call(self: Arc, req: IpfsRequest) -> IpfsResult { + self.client.cheap_clone().call(req).await + } + + async fn cat( + self: Arc, + path: &ContentPath, + max_size: usize, + timeout: Option, + retry_policy: RetryPolicy, + ) -> IpfsResult { + self.with_cache(path, async || { + { + self.client + .cheap_clone() + .cat(path, max_size, timeout, retry_policy) + .await + } + }) + .await + } + + async fn get_block( + self: Arc, + path: &ContentPath, + timeout: Option, + retry_policy: RetryPolicy, + ) -> IpfsResult { + self.with_cache(path, async || { + self.client + .cheap_clone() + .get_block(path, timeout, retry_policy) + .await + }) + .await + } +} diff --git a/graph/src/ipfs/error.rs b/graph/src/ipfs/error.rs index 9cb956bbccb..1722b02f467 100644 --- a/graph/src/ipfs/error.rs +++ b/graph/src/ipfs/error.rs @@ -49,6 +49,9 @@ pub enum IpfsError { #[error(transparent)] RequestFailed(RequestError), + + #[error("Invalid cache configuration: {source}")] + InvalidCacheConfig { source: anyhow::Error }, } #[derive(Debug, Error)] @@ -91,6 +94,7 @@ impl IpfsError { Self::RequestTimeout { .. } => false, Self::DeterministicFailure { .. } => true, Self::RequestFailed(_) => false, + Self::InvalidCacheConfig { .. } => true, } } } diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index ec100b425a3..d2ac9f0c8b1 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -34,7 +34,7 @@ pub struct IpfsGatewayClient { impl IpfsGatewayClient { /// Creates a new [IpfsGatewayClient] with the specified server address. /// Verifies that the server is responding to IPFS gateway requests. - pub async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { + pub(crate) async fn new(server_address: impl AsRef, logger: &Logger) -> IpfsResult { let client = Self::new_unchecked(server_address, logger)?; client diff --git a/graph/src/ipfs/mod.rs b/graph/src/ipfs/mod.rs index f2131916e6d..3a5fe211d26 100644 --- a/graph/src/ipfs/mod.rs +++ b/graph/src/ipfs/mod.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::anyhow; +use cache::CachingClient; use futures03::future::BoxFuture; use futures03::stream::FuturesUnordered; use futures03::stream::StreamExt; @@ -9,6 +10,7 @@ use slog::Logger; use crate::util::security::SafeDisplay; +mod cache; mod client; mod content_path; mod error; @@ -39,6 +41,8 @@ pub type IpfsResult = Result; /// If multiple IPFS server addresses are specified, an IPFS client pool is created internally /// and for each IPFS request, the fastest client that can provide the content is /// automatically selected and the response is streamed from that client. +/// +/// All clients are set up to cache results pub async fn new_ipfs_client( server_addresses: I, logger: &Logger, @@ -58,7 +62,9 @@ where SafeDisplay(server_address) ); - clients.push(use_first_valid_api(server_address, logger).await?); + let client = use_first_valid_api(server_address, logger).await?; + let client = Arc::new(CachingClient::new(client).await?); + clients.push(client); } match clients.len() {