diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index ab30caeda75..585df5945f1 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -928,6 +928,13 @@ pub struct VersionStats { pub ratio: f64, /// The last block to which this table was pruned pub last_pruned_block: Option, + /// Histograms for the upper bounds of the block ranges in + /// this table. Each histogram bucket contains roughly the same number + /// of rows; values might be repeated to achieve that. The vectors are + /// empty if the table hasn't been analyzed, the subgraph is stored in + /// Postgres version 16 or lower, or if the table doesn't have a + /// block_range column. + pub block_range_upper: Vec, } /// What phase of pruning we are working on @@ -1091,11 +1098,22 @@ impl PruneRequest { return None; } - // Estimate how much data we will throw away; we assume that - // entity versions are distributed evenly across all blocks so - // that `history_pct` will tell us how much of that data pruning - // will remove. - let removal_ratio = self.history_pct(stats) * (1.0 - stats.ratio); + let removal_ratio = if stats.block_range_upper.is_empty() + || ENV_VARS.store.prune_disable_range_bound_estimation + { + // Estimate how much data we will throw away; we assume that + // entity versions are distributed evenly across all blocks so + // that `history_pct` will tell us how much of that data pruning + // will remove. + self.history_pct(stats) * (1.0 - stats.ratio) + } else { + // This estimate is more accurate than the one above since it + // does not assume anything about the distribution of entities + // and versions but uses the estimates from Postgres statistics. + // Of course, we can only use it if we have statistics + self.remove_pct_from_bounds(stats) + }; + if removal_ratio >= self.rebuild_threshold { Some(PruningStrategy::Rebuild) } else if removal_ratio >= self.delete_threshold { @@ -1120,6 +1138,18 @@ impl PruneRequest { 1.0 - self.history_blocks as f64 / total_blocks as f64 } } + + /// Return the fraction of entities that we will remove according to the + /// histogram bounds in `stats`. That fraction can be estimated as the + /// fraction of histogram buckets that end before `self.earliest_block` + fn remove_pct_from_bounds(&self, stats: &VersionStats) -> f64 { + stats + .block_range_upper + .iter() + .filter(|b| **b <= self.earliest_block) + .count() as f64 + / stats.block_range_upper.len() as f64 + } } /// Represents an item retrieved from an diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 1c768f45bed..7d52d514f1b 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -114,6 +114,10 @@ pub struct EnvVarsStore { /// For how many prune runs per deployment to keep status information. /// Set by `GRAPH_STORE_HISTORY_KEEP_STATUS`. The default is 5 pub prune_keep_history: usize, + /// Temporary switch to disable range bound estimation for pruning. + /// Set by `GRAPH_STORE_PRUNE_DISABLE_RANGE_BOUND_ESTIMATION`. + /// Defaults to false. Remove after 2025-07-15 + pub prune_disable_range_bound_estimation: bool, /// How long to accumulate changes into a batch before a write has to /// happen. Set by the environment variable /// `GRAPH_STORE_WRITE_BATCH_DURATION` in seconds. The default is 300s. @@ -188,6 +192,7 @@ impl TryFrom for EnvVarsStore { delete_threshold: x.delete_threshold.0, history_slack_factor: x.history_slack_factor.0, prune_keep_history: x.prune_keep_status, + prune_disable_range_bound_estimation: x.prune_disable_range_bound_estimation, write_batch_duration: Duration::from_secs(x.write_batch_duration_in_secs), write_batch_size: x.write_batch_size * 1_000, create_gin_indexes: x.create_gin_indexes, @@ -263,6 +268,11 @@ pub struct InnerStore { history_slack_factor: HistorySlackF64, #[envconfig(from = "GRAPH_STORE_HISTORY_KEEP_STATUS", default = "5")] prune_keep_status: usize, + #[envconfig( + from = "GRAPH_STORE_PRUNE_DISABLE_RANGE_BOUND_ESTIMATION", + default = "false" + )] + prune_disable_range_bound_estimation: bool, #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_DURATION", default = "300")] write_batch_duration_in_secs: u64, #[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")] diff --git a/node/src/manager/commands/stats.rs b/node/src/manager/commands/stats.rs index abb02fdb77c..8200703c180 100644 --- a/node/src/manager/commands/stats.rs +++ b/node/src/manager/commands/stats.rs @@ -10,6 +10,7 @@ use diesel::PgConnection; use graph::components::store::DeploymentLocator; use graph::components::store::VersionStats; use graph::prelude::anyhow; +use graph::prelude::CheapClone as _; use graph_store_postgres::command_support::catalog as store_catalog; use graph_store_postgres::command_support::catalog::Site; use graph_store_postgres::ConnectionPool; @@ -20,7 +21,7 @@ use graph_store_postgres::PRIMARY_SHARD; fn site_and_conn( pools: HashMap, search: &DeploymentSearch, -) -> Result<(Site, PooledConnection>), anyhow::Error> { +) -> Result<(Arc, PooledConnection>), anyhow::Error> { let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap(); let locator = search.locate_unique(primary_pool)?; @@ -30,6 +31,7 @@ fn site_and_conn( let site = conn .locate_site(locator)? .ok_or_else(|| anyhow!("deployment `{}` does not exist", search))?; + let site = Arc::new(site); let conn = pools.get(&site.shard).unwrap().get()?; @@ -96,7 +98,8 @@ pub fn show( ) -> Result<(), anyhow::Error> { let (site, mut conn) = site_and_conn(pools, search)?; - let stats = store_catalog::stats(&mut conn, &site)?; + let catalog = store_catalog::Catalog::load(&mut conn, site.cheap_clone(), false, vec![])?; + let stats = catalog.stats(&mut conn)?; let account_like = store_catalog::account_like(&mut conn, &site)?; diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs index a4ed8fa55d3..6b7f184cab2 100644 --- a/store/postgres/src/catalog.rs +++ b/store/postgres/src/catalog.rs @@ -19,10 +19,11 @@ use std::time::Duration; use graph::prelude::anyhow::anyhow; use graph::{ data::subgraph::schema::POI_TABLE, - prelude::{lazy_static, StoreError}, + prelude::{lazy_static, StoreError, BLOCK_NUMBER_MAX}, }; use crate::{ + block_range::BLOCK_RANGE_COLUMN, pool::ForeignServer, primary::{Namespace, Site, NAMESPACE_PUBLIC}, relational::SqlName, @@ -186,6 +187,11 @@ pub struct Catalog { /// Whether the database supports `int4_minmax_multi_ops` etc. /// See the [Postgres docs](https://www.postgresql.org/docs/15/brin-builtin-opclasses.html) has_minmax_multi_ops: bool, + + /// Whether the column `pg_stats.range_bounds_histogram` introduced in + /// Postgres 17 exists. See the [Postgres + /// docs](https://www.postgresql.org/docs/17/view-pg-stats.html) + pg_stats_has_range_bounds_histogram: bool, } impl Catalog { @@ -199,6 +205,7 @@ impl Catalog { let text_columns = get_text_columns(conn, &site.namespace)?; let use_poi = supports_proof_of_indexing(conn, &site.namespace)?; let has_minmax_multi_ops = has_minmax_multi_ops(conn)?; + let pg_stats_has_range_bounds_histogram = pg_stats_has_range_bounds_histogram(conn)?; Ok(Catalog { site, @@ -207,6 +214,7 @@ impl Catalog { use_bytea_prefix, entities_with_causality_region: entities_with_causality_region.into_iter().collect(), has_minmax_multi_ops, + pg_stats_has_range_bounds_histogram, }) } @@ -217,6 +225,7 @@ impl Catalog { entities_with_causality_region: BTreeSet, ) -> Result { let has_minmax_multi_ops = has_minmax_multi_ops(conn)?; + let pg_stats_has_range_bounds_histogram = pg_stats_has_range_bounds_histogram(conn)?; Ok(Catalog { site, @@ -228,6 +237,7 @@ impl Catalog { use_bytea_prefix: true, entities_with_causality_region, has_minmax_multi_ops, + pg_stats_has_range_bounds_histogram, }) } @@ -245,6 +255,7 @@ impl Catalog { use_bytea_prefix: true, entities_with_causality_region, has_minmax_multi_ops: false, + pg_stats_has_range_bounds_histogram: false, }) } @@ -269,6 +280,123 @@ impl Catalog { MINMAX_OPS } } + + pub fn stats(&self, conn: &mut PgConnection) -> Result, StoreError> { + #[derive(Queryable, QueryableByName)] + pub struct DbStats { + #[diesel(sql_type = BigInt)] + pub entities: i64, + #[diesel(sql_type = BigInt)] + pub versions: i64, + #[diesel(sql_type = Text)] + pub tablename: String, + /// The ratio `entities / versions` + #[diesel(sql_type = Double)] + pub ratio: f64, + #[diesel(sql_type = Nullable)] + pub last_pruned_block: Option, + } + + impl From for VersionStats { + fn from(s: DbStats) -> Self { + VersionStats { + entities: s.entities, + versions: s.versions, + tablename: s.tablename, + ratio: s.ratio, + last_pruned_block: s.last_pruned_block, + block_range_upper: vec![], + } + } + } + + #[derive(Queryable, QueryableByName)] + struct RangeHistogram { + #[diesel(sql_type = Text)] + tablename: String, + #[diesel(sql_type = Array)] + upper: Vec, + } + + fn block_range_histogram( + conn: &mut PgConnection, + namespace: &Namespace, + ) -> Result, StoreError> { + let query = format!( + "select tablename, \ + array_agg(coalesce(upper(block_range), {BLOCK_NUMBER_MAX})) upper \ + from (select tablename, + unnest(range_bounds_histogram::text::int4range[]) block_range + from pg_stats where schemaname = $1 and attname = '{BLOCK_RANGE_COLUMN}') a + group by tablename + order by tablename" + ); + let result = sql_query(query) + .bind::(namespace.as_str()) + .get_results::(conn)?; + Ok(result) + } + + // Get an estimate of number of rows (pg_class.reltuples) and number of + // distinct entities (based on the planners idea of how many distinct + // values there are in the `id` column) See the [Postgres + // docs](https://www.postgresql.org/docs/current/view-pg-stats.html) for + // the precise meaning of n_distinct + let query = "select case when s.n_distinct < 0 then (- s.n_distinct * c.reltuples)::int8 + else s.n_distinct::int8 + end as entities, + c.reltuples::int8 as versions, + c.relname as tablename, + case when c.reltuples = 0 then 0::float8 + when s.n_distinct < 0 then (-s.n_distinct)::float8 + else greatest(s.n_distinct, 1)::float8 / c.reltuples::float8 + end as ratio, + ts.last_pruned_block + from pg_namespace n, pg_class c, pg_stats s + left outer join subgraphs.table_stats ts + on (ts.table_name = s.tablename + and ts.deployment = $1) + where n.nspname = $2 + and c.relnamespace = n.oid + and s.schemaname = n.nspname + and s.attname = 'id' + and c.relname = s.tablename + order by c.relname" + .to_string(); + + let stats = sql_query(query) + .bind::(self.site.id) + .bind::(self.site.namespace.as_str()) + .load::(conn) + .map_err(StoreError::from)?; + + let mut range_histogram = if self.pg_stats_has_range_bounds_histogram { + block_range_histogram(conn, &self.site.namespace)? + } else { + vec![] + }; + + let stats = stats + .into_iter() + .map(|s| { + let pos = range_histogram + .iter() + .position(|h| h.tablename == s.tablename); + let mut upper = pos + .map(|pos| range_histogram.swap_remove(pos)) + .map(|h| h.upper) + .unwrap_or(vec![]); + // Since lower and upper are supposed to be histograms, we + // sort them + upper.sort_unstable(); + let mut vs = VersionStats::from(s); + vs.block_range_upper = upper; + vs + }) + .collect::>(); + + Ok(stats) + } } fn get_text_columns( @@ -764,70 +892,6 @@ pub(crate) fn drop_index( Ok(()) } -pub fn stats(conn: &mut PgConnection, site: &Site) -> Result, StoreError> { - #[derive(Queryable, QueryableByName)] - pub struct DbStats { - #[diesel(sql_type = BigInt)] - pub entities: i64, - #[diesel(sql_type = BigInt)] - pub versions: i64, - #[diesel(sql_type = Text)] - pub tablename: String, - /// The ratio `entities / versions` - #[diesel(sql_type = Double)] - pub ratio: f64, - #[diesel(sql_type = Nullable)] - pub last_pruned_block: Option, - } - - impl From for VersionStats { - fn from(s: DbStats) -> Self { - VersionStats { - entities: s.entities, - versions: s.versions, - tablename: s.tablename, - ratio: s.ratio, - last_pruned_block: s.last_pruned_block, - } - } - } - - // Get an estimate of number of rows (pg_class.reltuples) and number of - // distinct entities (based on the planners idea of how many distinct - // values there are in the `id` column) See the [Postgres - // docs](https://www.postgresql.org/docs/current/view-pg-stats.html) for - // the precise meaning of n_distinct - let query = "select case when s.n_distinct < 0 then (- s.n_distinct * c.reltuples)::int8 - else s.n_distinct::int8 - end as entities, - c.reltuples::int8 as versions, - c.relname as tablename, - case when c.reltuples = 0 then 0::float8 - when s.n_distinct < 0 then (-s.n_distinct)::float8 - else greatest(s.n_distinct, 1)::float8 / c.reltuples::float8 - end as ratio, - ts.last_pruned_block - from pg_namespace n, pg_class c, pg_stats s - left outer join subgraphs.table_stats ts - on (ts.table_name = s.tablename - and ts.deployment = $1) - where n.nspname = $2 - and c.relnamespace = n.oid - and s.schemaname = n.nspname - and s.attname = 'id' - and c.relname = s.tablename - order by c.relname" - .to_string(); - - let stats = sql_query(query) - .bind::(site.id) - .bind::(site.namespace.as_str()) - .load::(conn) - .map_err(StoreError::from)?; - - Ok(stats.into_iter().map(|s| s.into()).collect()) -} - /// Return by how much the slowest replica connected to the database `conn` /// is lagging. The returned value has millisecond precision. If the /// database has no replicas, return `0` @@ -975,6 +1039,28 @@ fn has_minmax_multi_ops(conn: &mut PgConnection) -> Result { Ok(sql_query(QUERY).get_result::(conn)?.has_ops) } +/// Check whether the database for `conn` has the column +/// `pg_stats.range_bounds_histogram` introduced in Postgres 17 +fn pg_stats_has_range_bounds_histogram(conn: &mut PgConnection) -> Result { + #[derive(Queryable, QueryableByName)] + struct HasIt { + #[diesel(sql_type = Bool)] + has_it: bool, + } + + let query = " + select exists (\ + select 1 \ + from information_schema.columns \ + where table_name = 'pg_stats' \ + and table_schema = 'pg_catalog' \ + and column_name = 'range_bounds_histogram') as has_it"; + sql_query(query) + .get_result::(conn) + .map(|h| h.has_it) + .map_err(StoreError::from) +} + pub(crate) fn histogram_bounds( conn: &mut PgConnection, namespace: &Namespace, diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index e5f71068abe..a05fc40d36f 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -73,7 +73,7 @@ pub use self::subgraph_store::{unused, DeploymentPlacer, Shard, SubgraphStore, P pub mod command_support { pub mod catalog { pub use crate::block_store::primary as block_store; - pub use crate::catalog::{account_like, stats}; + pub use crate::catalog::{account_like, Catalog}; pub use crate::copy::{copy_state, copy_table_state}; pub use crate::primary::{ active_copies, deployment_schemas, ens_names, subgraph, subgraph_deployment_assignment, diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 6e3af20179d..ed3bf09913b 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -267,7 +267,7 @@ impl Layout { reporter.finish_analyze_table(table.name.as_str()); cancel.check_cancel()?; } - let stats = catalog::stats(conn, &self.site)?; + let stats = self.catalog.stats(conn)?; let analyzed: Vec<_> = tables.iter().map(|table| table.name.as_str()).collect(); reporter.finish_analyze(&stats, &analyzed); diff --git a/store/test-store/tests/postgres/graft.rs b/store/test-store/tests/postgres/graft.rs index d9da064ff66..6c7b4e28f55 100644 --- a/store/test-store/tests/postgres/graft.rs +++ b/store/test-store/tests/postgres/graft.rs @@ -648,6 +648,7 @@ fn prune() { tablename: USER.to_ascii_lowercase(), ratio: 3.0 / 5.0, last_pruned_block: None, + block_range_upper: vec![], }; assert_eq!( Some(strategy),