40 changes: 35 additions & 5 deletions graph/src/components/store/mod.rs
@@ -928,6 +928,13 @@ pub struct VersionStats {
pub ratio: f64,
/// The last block to which this table was pruned
pub last_pruned_block: Option<BlockNumber>,
/// Histogram of the upper bounds of the block ranges in
/// this table. Each histogram bucket contains roughly the same number
/// of rows; values might be repeated to achieve that. The vector is
/// empty if the table hasn't been analyzed, if the subgraph is stored
/// in Postgres version 16 or lower, or if the table doesn't have a
/// block_range column.
pub block_range_upper: Vec<BlockNumber>,
}

/// What phase of pruning we are working on
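
For orientation, a hedged sketch of where such a histogram can come from: Postgres 17 adds a `range_bounds_histogram` column to the `pg_stats` view, and the upper bounds of its entries are what `block_range_upper` would hold. The query, the cast through text, and the `block_range_upper` helper below are illustrative assumptions, not the code this PR adds:

use diesel::prelude::*;
use diesel::sql_types::{Integer, Nullable, Text};

#[derive(QueryableByName)]
struct UpperBound {
    #[diesel(sql_type = Nullable<Integer>)]
    upper_bound: Option<i32>,
}

/// Fetch the upper bounds of the histogram that Postgres 17 keeps for a
/// range column. Unbounded uppers (the `[x,)` ranges of current entity
/// versions) come back as NULL and are dropped.
fn block_range_upper(
    conn: &mut PgConnection,
    namespace: &str,
    table: &str,
) -> QueryResult<Vec<i32>> {
    // `range_bounds_histogram` is an `anyarray`; cast it through text to a
    // concrete array type before unnesting so SQL can take `upper()` of it.
    diesel::sql_query(
        "select upper(bound) as upper_bound \
           from (select unnest(range_bounds_histogram::text::int4range[]) as bound \
                   from pg_stats \
                  where schemaname = $1 and tablename = $2 \
                    and attname = 'block_range') hist",
    )
    .bind::<Text, _>(namespace)
    .bind::<Text, _>(table)
    .get_results::<UpperBound>(conn)
    .map(|rows| rows.into_iter().filter_map(|r| r.upper_bound).collect())
}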
@@ -1091,11 +1098,22 @@ impl PruneRequest {
return None;
}

// Estimate how much data we will throw away; we assume that
// entity versions are distributed evenly across all blocks so
// that `history_pct` will tell us how much of that data pruning
// will remove.
let removal_ratio = self.history_pct(stats) * (1.0 - stats.ratio);
let removal_ratio = if stats.block_range_upper.is_empty()
|| ENV_VARS.store.prune_disable_range_bound_estimation
{
// Estimate how much data we will throw away; we assume that
// entity versions are distributed evenly across all blocks so
// that `history_pct` will tell us how much of that data pruning
// will remove.
self.history_pct(stats) * (1.0 - stats.ratio)
} else {
// This estimate is more accurate than the one above since it
// does not assume anything about the distribution of entities
// and versions but uses the estimates from Postgres statistics.
// Of course, we can only use it if we have statistics.
self.remove_pct_from_bounds(stats)
};

if removal_ratio >= self.rebuild_threshold {
Some(PruningStrategy::Rebuild)
} else if removal_ratio >= self.delete_threshold {
@@ -1120,6 +1138,18 @@
1.0 - self.history_blocks as f64 / total_blocks as f64
}
}

/// Return the fraction of entities that we will remove according to the
/// histogram bounds in `stats`. That fraction is estimated as the
/// fraction of histogram buckets that end at or before `self.earliest_block`.
fn remove_pct_from_bounds(&self, stats: &VersionStats) -> f64 {
stats
.block_range_upper
.iter()
.filter(|b| **b <= self.earliest_block)
.count() as f64
/ stats.block_range_upper.len() as f64
}
}

/// Represents an item retrieved from an
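As a quick sanity check on the arithmetic, a hypothetical worked example (numbers invented): with a ten-bucket equal-frequency histogram, an `earliest_block` of 500 lies at or past the end of four buckets, so roughly 40% of the table's rows are estimated to be removable:

#[test]
fn remove_pct_from_bounds_example() {
    // Ten equal-frequency buckets; four end at or before block 500.
    let block_range_upper = vec![100, 250, 400, 500, 700, 900, 1200, 1500, 2000, 2500];
    let earliest_block = 500;
    let removable = block_range_upper
        .iter()
        .filter(|b| **b <= earliest_block)
        .count() as f64
        / block_range_upper.len() as f64;
    // Roughly 40% of the table's rows would be removed by pruning.
    assert!((removable - 0.4).abs() < f64::EPSILON);
}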
10 changes: 10 additions & 0 deletions graph/src/env/store.rs
@@ -114,6 +114,10 @@ pub struct EnvVarsStore {
/// For how many prune runs per deployment to keep status information.
/// Set by `GRAPH_STORE_HISTORY_KEEP_STATUS`. The default is 5
pub prune_keep_history: usize,
/// Temporary switch to disable range-bound estimation for pruning.
/// Set by `GRAPH_STORE_PRUNE_DISABLE_RANGE_BOUND_ESTIMATION`;
/// defaults to false. Remove after 2025-07-15.
pub prune_disable_range_bound_estimation: bool,
/// How long to accumulate changes into a batch before a write has to
/// happen. Set by the environment variable
/// `GRAPH_STORE_WRITE_BATCH_DURATION` in seconds. The default is 300s.
@@ -188,6 +192,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
delete_threshold: x.delete_threshold.0,
history_slack_factor: x.history_slack_factor.0,
prune_keep_history: x.prune_keep_status,
prune_disable_range_bound_estimation: x.prune_disable_range_bound_estimation,
write_batch_duration: Duration::from_secs(x.write_batch_duration_in_secs),
write_batch_size: x.write_batch_size * 1_000,
create_gin_indexes: x.create_gin_indexes,
@@ -263,6 +268,11 @@ pub struct InnerStore {
history_slack_factor: HistorySlackF64,
#[envconfig(from = "GRAPH_STORE_HISTORY_KEEP_STATUS", default = "5")]
prune_keep_status: usize,
#[envconfig(
from = "GRAPH_STORE_PRUNE_DISABLE_RANGE_BOUND_ESTIMATION",
default = "false"
)]
prune_disable_range_bound_estimation: bool,
#[envconfig(from = "GRAPH_STORE_WRITE_BATCH_DURATION", default = "300")]
write_batch_duration_in_secs: u64,
#[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")]
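As a usage note: operators who need the old behavior can set `GRAPH_STORE_PRUNE_DISABLE_RANGE_BOUND_ESTIMATION=true` in graph-node's environment, which makes pruning fall back to the uniform-distribution estimate even when histogram statistics are available.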
7 changes: 5 additions & 2 deletions node/src/manager/commands/stats.rs
@@ -10,6 +10,7 @@ use diesel::PgConnection;
use graph::components::store::DeploymentLocator;
use graph::components::store::VersionStats;
use graph::prelude::anyhow;
use graph::prelude::CheapClone as _;
use graph_store_postgres::command_support::catalog as store_catalog;
use graph_store_postgres::command_support::catalog::Site;
use graph_store_postgres::ConnectionPool;
@@ -20,7 +21,7 @@ use graph_store_postgres::PRIMARY_SHARD;
fn site_and_conn(
pools: HashMap<Shard, ConnectionPool>,
search: &DeploymentSearch,
) -> Result<(Site, PooledConnection<ConnectionManager<PgConnection>>), anyhow::Error> {
) -> Result<(Arc<Site>, PooledConnection<ConnectionManager<PgConnection>>), anyhow::Error> {
let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap();
let locator = search.locate_unique(primary_pool)?;

@@ -30,6 +31,7 @@ fn site_and_conn(
let site = conn
.locate_site(locator)?
.ok_or_else(|| anyhow!("deployment `{}` does not exist", search))?;
let site = Arc::new(site);

let conn = pools.get(&site.shard).unwrap().get()?;

@@ -96,7 +98,8 @@ pub fn show(
) -> Result<(), anyhow::Error> {
let (site, mut conn) = site_and_conn(pools, search)?;

let stats = store_catalog::stats(&mut conn, &site)?;
let catalog = store_catalog::Catalog::load(&mut conn, site.cheap_clone(), false, vec![])?;
let stats = catalog.stats(&mut conn)?;

let account_like = store_catalog::account_like(&mut conn, &site)?;

Expand Down