From 1a319943da885cc9804957e8f5c0db1fdd784791 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 13 May 2025 14:14:49 -0700 Subject: [PATCH 1/5] store: Streamline how we get DeploymentDetail a little --- store/postgres/src/detail.rs | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index 168af5b5d51..e31a836b141 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -5,7 +5,7 @@ use diesel::dsl::sql; use diesel::prelude::{ ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, PgConnection, - QueryDsl, RunQueryDsl, + QueryDsl, RunQueryDsl, SelectableHelper as _, }; use diesel_derives::Associations; use git_testament::{git_testament, git_testament_macros}; @@ -39,11 +39,10 @@ const CARGO_PKG_VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); type Bytes = Vec; -#[derive(Queryable, QueryableByName)] +#[derive(Queryable, Selectable)] #[diesel(table_name = subgraph_deployment)] // We map all fields to make loading `Detail` with diesel easier, but we // don't need all the fields -#[allow(dead_code)] pub struct DeploymentDetail { pub id: DeploymentId, pub deployment: String, @@ -51,14 +50,10 @@ pub struct DeploymentDetail { health: HealthType, pub synced_at: Option>, pub synced_at_block_number: Option, - fatal_error: Option, - non_fatal_errors: Vec, /// The earliest block for which we have history pub earliest_block_number: i32, pub latest_ethereum_block_hash: Option, pub latest_ethereum_block_number: Option, - last_healthy_ethereum_block_hash: Option, - last_healthy_ethereum_block_number: Option, pub entity_count: BigDecimal, graft_base: Option, graft_block_hash: Option, @@ -67,10 +62,9 @@ pub struct DeploymentDetail { reorg_count: i32, current_reorg_depth: i32, max_reorg_depth: i32, - firehose_cursor: Option, } -#[derive(Queryable, QueryableByName)] +#[derive(Queryable, Selectable)] #[diesel(table_name = subgraph_error)] // We map all fields to make loading `Detail` with diesel easier, but we // don't need all the fields @@ -193,8 +187,6 @@ pub(crate) fn info_from_details( failed: _, health, synced_at, - fatal_error: _, - non_fatal_errors: _, earliest_block_number, latest_ethereum_block_hash, latest_ethereum_block_number, @@ -202,7 +194,11 @@ pub(crate) fn info_from_details( graft_base: _, graft_block_hash: _, graft_block_number: _, - .. + synced_at_block_number: _, + debug_fork: _, + reorg_count: _, + current_reorg_depth: _, + max_reorg_depth: _, } = detail; let site = sites @@ -261,12 +257,15 @@ pub(crate) fn deployment_details( ) -> Result, StoreError> { use subgraph_deployment as d; + let cols = DeploymentDetail::as_select(); + // Empty deployments means 'all of them' let details = if deployments.is_empty() { - d::table.load::(conn)? + d::table.select(cols).load::(conn)? } else { d::table .filter(d::deployment.eq_any(&deployments)) + .select(cols) .load::(conn)? 
}; Ok(details) @@ -278,8 +277,12 @@ pub(crate) fn deployment_details_for_id( deployment: &DeploymentId, ) -> Result { use subgraph_deployment as d; + + let cols = DeploymentDetail::as_select(); + d::table .filter(d::id.eq(&deployment)) + .select(cols) .first::(conn) .map_err(StoreError::from) } @@ -299,15 +302,19 @@ pub(crate) fn deployment_statuses( let details_with_fatal_error = { let join = e::table.on(e::id.nullable().eq(d::fatal_error)); + let cols = <(DeploymentDetail, Option)>::as_select(); + // Empty deployments means 'all of them' if sites.is_empty() { d::table .left_outer_join(join) + .select(cols) .load::<(DeploymentDetail, Option)>(conn)? } else { d::table .left_outer_join(join) .filter(d::id.eq_any(sites.iter().map(|site| site.id))) + .select(cols) .load::<(DeploymentDetail, Option)>(conn)? } }; @@ -480,7 +487,8 @@ pub fn deployment_entity( let detail = d::table .find(site.id) - .first::(conn)?; + .select(DeploymentDetail::as_select()) + .first::(conn)?; StoredDeploymentEntity(detail, manifest).as_subgraph_deployment(schema) } From 8aa1bd7774a54968943fb24883ae7aea88229b37 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 13 May 2025 14:24:05 -0700 Subject: [PATCH 2/5] store: Streamline how we get ErrorDetail a little --- store/postgres/src/detail.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index e31a836b141..8a76e68e1ac 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -68,9 +68,7 @@ pub struct DeploymentDetail { #[diesel(table_name = subgraph_error)] // We map all fields to make loading `Detail` with diesel easier, but we // don't need all the fields -#[allow(dead_code)] pub(crate) struct ErrorDetail { - vid: i64, pub id: String, subgraph_id: String, message: String, @@ -93,7 +91,7 @@ impl ErrorDetail { d::table .filter(d::deployment.eq(deployment_id.as_str())) .inner_join(e::table.on(e::id.nullable().eq(d::fatal_error))) - .select(e::all_columns) + .select(ErrorDetail::as_select()) .get_result(conn) .optional() .map_err(StoreError::from) @@ -105,7 +103,6 @@ impl TryFrom for SubgraphError { fn try_from(value: ErrorDetail) -> Result { let ErrorDetail { - vid: _, id: _, subgraph_id, message, @@ -327,13 +324,13 @@ pub(crate) fn deployment_statuses( if sites.is_empty() { d::table .inner_join(join) - .select((d::id, e::all_columns)) + .select((d::id, ErrorDetail::as_select())) .load::<(DeploymentId, ErrorDetail)>(conn)? } else { d::table .inner_join(join) .filter(d::id.eq_any(sites.iter().map(|site| site.id))) - .select((d::id, e::all_columns)) + .select((d::id, ErrorDetail::as_select())) .load::<(DeploymentId, ErrorDetail)>(conn)? 
} .into_iter() From e449758d58911f3c33bfc1f5ae6ec1116043e6fd Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 13 May 2025 14:25:48 -0700 Subject: [PATCH 3/5] store: Streamline how we get SubgraphManifest a little --- store/postgres/src/detail.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index 8a76e68e1ac..1f127e1e2ab 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -362,12 +362,11 @@ pub(crate) fn deployment_statuses( .collect() } -#[derive(Queryable, QueryableByName, Identifiable, Associations)] +#[derive(Queryable, Selectable, Identifiable, Associations)] #[diesel(table_name = subgraph_manifest)] #[diesel(belongs_to(GraphNodeVersion))] // We never read the id field but map it to make the interaction with Diesel // simpler -#[allow(dead_code)] struct StoredSubgraphManifest { id: i32, spec_version: String, @@ -376,12 +375,10 @@ struct StoredSubgraphManifest { features: Vec, schema: String, graph_node_version_id: Option, - use_bytea_prefix: bool, start_block_number: Option, start_block_hash: Option, raw_yaml: Option, entities_with_causality_region: Vec, - on_sync: Option, history_blocks: i32, } @@ -480,6 +477,7 @@ pub fn deployment_entity( let manifest = m::table .find(site.id) + .select(StoredSubgraphManifest::as_select()) .first::(conn)?; let detail = d::table From e1d98766724adf9089f52006df0152090b27395d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 13 May 2025 15:56:44 -0700 Subject: [PATCH 4/5] graph: Speed up some tests by using shorter timeouts in debug builds This also makes the timeouts used for IPFS requests configurable; the default of 1s in debug builds is too short for the runner tests in CI and we therefore set it to the 60s for release builds for those tests. --- .github/workflows/ci.yml | 3 ++- graph/src/env/mod.rs | 18 ++++++++++++++++++ graph/src/ipfs/gateway_client.rs | 10 ++++------ graph/src/ipfs/retry_policy.rs | 7 +------ graph/src/ipfs/rpc_client.rs | 9 +++------ 5 files changed, 28 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 24993639945..d6ec31c129e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,7 +79,8 @@ jobs: ports: - 5432:5432 env: - RUSTFLAGS: "-C link-arg=-fuse-ld=lld -D warnings" + GRAPH_IPFS_REQUEST_TIMEOUT: "60" + RUSTFLAGS: "-C link-arg=-fuse-ld=lld -D warnings --cfg test_with_ipfs" RUNNER_TESTS_WAIT_FOR_SYNC_SECS: "600" steps: - name: Tune GitHub hosted runner to reduce flakiness diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index ab3ed09df5e..6d611cf6102 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -248,6 +248,11 @@ pub struct EnvVars { /// Set by the environment variable `GRAPH_FIREHOSE_BLOCK_BATCH_SIZE`. /// The default value is 10. pub firehose_block_batch_size: usize, + /// Timeouts to use for various IPFS requests set by + /// `GRAPH_IPFS_REQUEST_TIMEOUT`. Defaults to 60 seconds for release + /// builds and one second for debug builds to speed up tests. The value + /// is in seconds. 
+ pub ipfs_request_timeout: Duration, } impl EnvVars { @@ -256,6 +261,16 @@ impl EnvVars { let graphql = InnerGraphQl::init_from_env()?.into(); let mapping_handlers = InnerMappingHandlers::init_from_env()?.into(); let store = InnerStore::init_from_env()?.try_into()?; + let ipfs_request_timeout = match inner.ipfs_request_timeout { + Some(timeout) => Duration::from_secs(timeout), + None => { + if cfg!(debug_assertions) { + Duration::from_secs(1) + } else { + Duration::from_secs(60) + } + } + }; Ok(Self { graphql, @@ -330,6 +345,7 @@ impl EnvVars { firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit, firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout, firehose_block_batch_size: inner.firehose_block_fetch_batch_size, + ipfs_request_timeout, }) } @@ -510,6 +526,8 @@ struct Inner { firehose_block_fetch_timeout: u64, #[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_BATCH_SIZE", default = "10")] firehose_block_fetch_batch_size: usize, + #[envconfig(from = "GRAPH_IPFS_REQUEST_TIMEOUT")] + ipfs_request_timeout: Option, } #[derive(Clone, Debug)] diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index 1ee36bb6609..ec100b425a3 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -1,5 +1,4 @@ use std::sync::Arc; -use std::time::Duration; use anyhow::anyhow; use async_trait::async_trait; @@ -9,6 +8,7 @@ use http::header::CACHE_CONTROL; use reqwest::StatusCode; use slog::Logger; +use crate::env::ENV_VARS; use crate::ipfs::IpfsClient; use crate::ipfs::IpfsError; use crate::ipfs::IpfsRequest; @@ -17,10 +17,6 @@ use crate::ipfs::IpfsResult; use crate::ipfs::RetryPolicy; use crate::ipfs::ServerAddress; -/// The request that verifies that the IPFS gateway is accessible is generally fast because -/// it does not involve querying the distributed network. -const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(60); - /// A client that connects to an IPFS gateway. /// /// Reference: @@ -99,7 +95,7 @@ impl IpfsGatewayClient { } }); - let ok = tokio::time::timeout(TEST_REQUEST_TIMEOUT, fut) + let ok = tokio::time::timeout(ENV_VARS.ipfs_request_timeout, fut) .await .map_err(|_| anyhow!("request timed out"))??; @@ -151,6 +147,8 @@ impl IpfsClient for IpfsGatewayClient { #[cfg(test)] mod tests { + use std::time::Duration; + use bytes::BytesMut; use futures03::TryStreamExt; use wiremock::matchers as m; diff --git a/graph/src/ipfs/retry_policy.rs b/graph/src/ipfs/retry_policy.rs index df6a7b35ef0..2e80c5e9c5d 100644 --- a/graph/src/ipfs/retry_policy.rs +++ b/graph/src/ipfs/retry_policy.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use slog::Logger; use crate::ipfs::error::IpfsError; @@ -7,9 +5,6 @@ use crate::prelude::*; use crate::util::futures::retry; use crate::util::futures::RetryConfig; -/// The default maximum delay between retries. -const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(60); - /// Describes retry behavior when IPFS requests fail. 
#[derive(Clone, Copy, Debug)]
pub enum RetryPolicy {
@@ -33,7 +28,7 @@ impl RetryPolicy {
    ) -> RetryConfig {
        retry(operation_name, logger)
            .limit(ENV_VARS.mappings.ipfs_max_attempts)
-            .max_delay(DEFAULT_MAX_DELAY)
+            .max_delay(ENV_VARS.ipfs_request_timeout)
            .when(move |result: &Result| match result {
                Ok(_) => false,
                Err(err) => match self {
diff --git a/graph/src/ipfs/rpc_client.rs b/graph/src/ipfs/rpc_client.rs
index cd35d55b0ed..16976537044 100644
--- a/graph/src/ipfs/rpc_client.rs
+++ b/graph/src/ipfs/rpc_client.rs
@@ -9,6 +9,7 @@ use reqwest::Response;
use reqwest::StatusCode;
use slog::Logger;

+use crate::env::ENV_VARS;
use crate::ipfs::IpfsClient;
use crate::ipfs::IpfsError;
use crate::ipfs::IpfsRequest;
@@ -17,10 +18,6 @@ use crate::ipfs::IpfsResult;
use crate::ipfs::RetryPolicy;
use crate::ipfs::ServerAddress;

-/// The request that verifies that the IPFS RPC API is accessible is generally fast because
-/// it does not involve querying the distributed network.
-const TEST_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
-
/// A client that connects to an IPFS RPC API.
///
/// Reference:
@@ -60,7 +57,7 @@ impl IpfsRpcClient {
            server_address: ServerAddress::new(server_address)?,
            http_client: reqwest::Client::new(),
            logger: logger.to_owned(),
-            test_request_timeout: TEST_REQUEST_TIMEOUT,
+            test_request_timeout: ENV_VARS.ipfs_request_timeout,
        })
    }

@@ -88,7 +85,7 @@ impl IpfsRpcClient {
            }
        });

-        let ok = tokio::time::timeout(TEST_REQUEST_TIMEOUT, fut)
+        let ok = tokio::time::timeout(ENV_VARS.ipfs_request_timeout, fut)
            .await
            .map_err(|_| anyhow!("request timed out"))??;

From 230198841b05de6cf0a672f8404f24c1c842942e Mon Sep 17 00:00:00 2001
From: David Lutterkort
Date: Tue, 13 May 2025 11:14:14 -0700
Subject: [PATCH 5/5] store: Split subgraph_deployment table

We split the table `subgraphs.subgraph_deployment` into two tables,
`subgraphs.head` and `subgraphs.deployment`, where the `head` table
contains only the metadata that changes on every block.

This should help in situations where the `subgraph_deployment` table gets
very bloated: the `head` table, which is the one that bloats through
frequent changes, has much smaller rows than the current
`subgraph_deployment` table. Rows in `subgraph_deployment` can grow as big
as 500k, whereas rows in the `head` table will take at most about 350
bytes. Updates will also be marginally cheaper on the `head` table since
it only has one index rather than the two that `subgraph_deployment` has.
---
 docs/implementation/metadata.md | 85 +++--
 .../manager/commands/unused_deployments.rs | 2 +-
 .../down.sql | 110 ++++++
 .../up.sql | 116 ++++++
 store/postgres/src/chain_store.rs | 8 +-
 store/postgres/src/deployment.rs | 348 ++++++++++--------
 store/postgres/src/deployment_store.rs | 23 +-
 store/postgres/src/detail.rs | 204 ++++++----
 store/postgres/src/jobs.rs | 16 +-
 store/postgres/src/pool/mod.rs | 3 +-
 store/postgres/src/primary.rs | 6 +-
 store/postgres/src/query_store.rs | 5 +-
 store/postgres/src/subgraph_store.rs | 13 +-
 13 files changed, 648 insertions(+), 291 deletions(-)
 create mode 100644 store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/down.sql
 create mode 100644 store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/up.sql

diff --git a/docs/implementation/metadata.md b/docs/implementation/metadata.md
index 37676b3137b..1cf3c189c6c 100644
--- a/docs/implementation/metadata.md
+++ b/docs/implementation/metadata.md
@@ -7,7 +7,7 @@ List of all known subgraph names.
Maintained in the primary, but there is a background job that periodically copies the table from the primary to all other shards. Those copies are used for queries when the primary is down. | Column | Type | Use | -|-------------------|--------------|-------------------------------------------| +| ----------------- | ------------ | ----------------------------------------- | | `id` | `text!` | primary key, UUID | | `name` | `text!` | user-chosen name | | `current_version` | `text` | `subgraph_version.id` for current version | @@ -18,13 +18,12 @@ List of all known subgraph names. Maintained in the primary, but there is a back The `id` is used by the hosted explorer to reference the subgraph. - ### `subgraphs.subgraph_version` Mapping of subgraph names from `subgraph` to IPFS hashes. Maintained in the primary, but there is a background job that periodically copies the table from the primary to all other shards. Those copies are used for queries when the primary is down. | Column | Type | Use | -|---------------|--------------|-------------------------| +| ------------- | ------------ | ----------------------- | | `id` | `text!` | primary key, UUID | | `subgraph` | `text!` | `subgraph.id` | | `deployment` | `text!` | IPFS hash of deployment | @@ -32,7 +31,6 @@ Mapping of subgraph names from `subgraph` to IPFS hashes. Maintained in the prim | `vid` | `int8!` | unused | | `block_range` | `int4range!` | unused | - ## Managing a deployment Directory of all deployments. Maintained in the primary, but there is a background job that periodically copies the table from the primary to all other shards. Those copies are used for queries when the primary is down. @@ -40,7 +38,7 @@ Directory of all deployments. Maintained in the primary, but there is a backgrou ### `public.deployment_schemas` | Column | Type | Use | -|--------------|----------------|----------------------------------------------| +| ------------ | -------------- | -------------------------------------------- | | `id` | `int4!` | primary key | | `subgraph` | `text!` | IPFS hash of deployment | | `name` | `text!` | name of `sgdNNN` schema | @@ -52,36 +50,53 @@ Directory of all deployments. Maintained in the primary, but there is a backgrou There can be multiple copies of the same deployment, but at most one per shard. The `active` flag indicates which of these copies will be used for queries; `graph-node` makes sure that there is always exactly one for each IPFS hash. -### `subgraphs.subgraph_deployment` +### `subgraphs.head` + +Details about a deployment that change on every block. Maintained in the +shard alongside the deployment's data in `sgdNNN`. + +| Column | Type | Use | +| ----------------- | ---------- | -------------------------------------------- | +| `id` | `integer!` | primary key, same as `deployment_schemas.id` | +| `block_hash` | `bytea` | current subgraph head | +| `block_number` | `numeric` | | +| `entity_count` | `numeric!` | total number of entities | +| `firehose_cursor` | `text` | | + +The head block pointer in `block_number` and `block_hash` is the latest +block that has been fully processed by the deployment. It will be `null` +until the deployment is fully initialized, and only set when the deployment +processes the first block. For deployments that are grafted or being copied, +the head block pointer will be `null` until the graft/copy has finished +which can take considerable time. + +### `subgraphs.deployment` Details about a deployment to track sync progress etc. 
Maintained in the shard alongside the deployment's data in `sgdNNN`. The table should only
-contain frequently changing data, but for historical reasons contains also
-static data.
-
-| Column | Type | Use |
-|--------------------------------------|------------|----------------------------------------------|
-| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
-| `deployment` | `text!` | IPFS hash |
-| `failed` | `boolean!` | |
-| `synced` | `boolean!` | |
-| `earliest_block_number` | `integer!` | earliest block for which we have data |
-| `latest_ethereum_block_hash` | `bytea` | current subgraph head |
-| `latest_ethereum_block_number` | `numeric` | |
-| `entity_count` | `numeric!` | total number of entities |
-| `graft_base` | `text` | IPFS hash of graft base |
-| `graft_block_hash` | `bytea` | graft block |
-| `graft_block_number` | `numeric` | |
-| `reorg_count` | `integer!` | |
-| `current_reorg_depth` | `integer!` | |
-| `max_reorg_depth` | `integer!` | |
-| `fatal_error` | `text` | |
-| `non_fatal_errors` | `text[]` | |
-| `health` | `health!` | |
-| `last_healthy_ethereum_block_hash` | `bytea` | |
-| `last_healthy_ethereum_block_number` | `numeric` | |
-| `firehose_cursor` | `text` | |
-| `debug_fork` | `text` | |
+contain data that changes fairly infrequently, but for historical reasons
+also contains static data.
+
+| Column | Type | Use |
+| ------------------------------------ | ------------- | ---------------------------------------------------- |
+| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
+| `subgraph` | `text!` | IPFS hash |
+| `earliest_block_number` | `integer!` | earliest block for which we have data |
+| `health` | `health!` | |
+| `failed` | `boolean!` | |
+| `fatal_error` | `text` | |
+| `non_fatal_errors` | `text[]` | |
+| `graft_base` | `text` | IPFS hash of graft base |
+| `graft_block_hash` | `bytea` | graft block |
+| `graft_block_number` | `numeric` | |
+| `reorg_count` | `integer!` | |
+| `current_reorg_depth` | `integer!` | |
+| `max_reorg_depth` | `integer!` | |
+| `last_healthy_ethereum_block_hash` | `bytea` | |
+| `last_healthy_ethereum_block_number` | `numeric` | |
+| `debug_fork` | `text` | |
+| `synced_at` | `timestamptz` | time when the deployment first reached the chain head |
+| `synced_at_block_number` | `integer` | block at which the deployment first reached the chain head |

The columns `reorg_count`, `current_reorg_depth`, and `max_reorg_depth` are
set during indexing. They are used to determine whether a reorg happened
@@ -94,7 +109,7 @@
Details about a deployment that rarely change. Maintained in the shard
alongside the deployment's data in `sgdNNN`.

| Column | Type | Use |
-|-------------------------|------------|------------------------------------------------------|
+| ----------------------- | ---------- | ---------------------------------------------------- |
| `id` | `integer!` | primary key, same as `deployment_schemas.id` |
| `spec_version` | `text!` | |
| `description` | `text` | |
@@ -115,7 +130,7 @@
but there is a background job that periodically copies the table from the
primary to all other shards.

| Column | Type | Use |
-|---------|-------|---------------------------------------------|
+| ------- | ----- | ------------------------------------------- |
| id | int4! | primary key, ref to `deployment_schemas.id` |
| node_id | text! | name of index node |
@@ -147,7 +162,7 @@
should have the 'account-like' optimization turned on.

Details about features that a deployment uses, Maintained in the primary.
| Column | Type | Use | -|----------------|-----------|-------------| +| -------------- | --------- | ----------- | | `id` | `text!` | primary key | | `spec_version` | `text!` | | | `api_version` | `text` | | diff --git a/node/src/manager/commands/unused_deployments.rs b/node/src/manager/commands/unused_deployments.rs index cc11ec2884a..e8a6e14a1da 100644 --- a/node/src/manager/commands/unused_deployments.rs +++ b/node/src/manager/commands/unused_deployments.rs @@ -64,7 +64,7 @@ pub fn record(store: Arc) -> Result<(), Error> { let recorded = store.record_unused_deployments()?; for unused in store.list_unused_deployments(unused::Filter::New)? { - if recorded.iter().any(|r| r.deployment == unused.deployment) { + if recorded.iter().any(|r| r.subgraph == unused.deployment) { add_row(&mut list, unused); } } diff --git a/store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/down.sql b/store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/down.sql new file mode 100644 index 00000000000..6fc391449bb --- /dev/null +++ b/store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/down.sql @@ -0,0 +1,110 @@ +create table subgraphs.subgraph_deployment ( + id int4 primary key, + + deployment text unique not null, + + latest_ethereum_block_hash bytea, + latest_ethereum_block_number numeric, + entity_count numeric NOT NULL, + firehose_cursor text, + + earliest_block_number integer DEFAULT 0 NOT NULL, + + graft_base text, + graft_block_hash bytea, + graft_block_number numeric, + + health text NOT NULL, + failed boolean NOT NULL, + fatal_error text, + non_fatal_errors text[] DEFAULT '{}'::text[], + + reorg_count integer DEFAULT 0 NOT NULL, + current_reorg_depth integer DEFAULT 0 NOT NULL, + max_reorg_depth integer DEFAULT 0 NOT NULL, + + last_healthy_ethereum_block_hash bytea, + last_healthy_ethereum_block_number numeric, + + debug_fork text, + + synced_at timestamp with time zone, + synced_at_block_number integer, + + constraint subgraph_deployment_health_new_check + check ((health = any (array['failed', 'healthy', 'unhealthy']))) +); + +insert into subgraphs.subgraph_deployment + (id, deployment, + latest_ethereum_block_hash, latest_ethereum_block_number, + entity_count, firehose_cursor, + earliest_block_number, + graft_base, graft_block_hash, graft_block_number, + health, failed, fatal_error, non_fatal_errors, + reorg_count, current_reorg_depth, max_reorg_depth, + last_healthy_ethereum_block_hash, last_healthy_ethereum_block_number, + debug_fork, + synced_at, synced_at_block_number) +select h.id, d.subgraph, + h.block_hash, h.block_number, + h.entity_count, h.firehose_cursor, + earliest_block_number, + graft_base, graft_block_hash, graft_block_number, + health, failed, fatal_error, non_fatal_errors, + reorg_count, current_reorg_depth, max_reorg_depth, + last_healthy_block_hash, last_healthy_block_number, + debug_fork, + synced_at, synced_at_block_number + from subgraphs.head h, subgraphs.deployment d + where h.id = d.id; + +alter table subgraphs.copy_state + drop constraint copy_state_dst_fkey, + add constraint copy_state_dst_fkey + foreign key (dst) references + subgraphs.subgraph_deployment(id) on delete cascade; + +alter table subgraphs.subgraph_error + drop constraint subgraph_error_subgraph_id_fkey, + add constraint subgraph_error_subgraph_id_fkey + foreign key (subgraph_id) references + subgraphs.subgraph_deployment(deployment) on delete cascade; + +alter table subgraphs.subgraph_manifest + drop constraint subgraph_manifest_id_fkey, + add constraint 
subgraph_manifest_new_id_fkey + foreign key (id) references + subgraphs.subgraph_deployment(id) on delete cascade; + +alter table subgraphs.table_stats + drop constraint table_stats_deployment_fkey, + add constraint table_stats_deployment_fkey + foreign key (deployment) references + subgraphs.subgraph_deployment(id) on delete cascade; + +drop view info.subgraph_info; + +create view info.subgraph_info as +select ds.id AS schema_id, + ds.name AS schema_name, + ds.subgraph, + ds.version, + s.name, + CASE + WHEN s.pending_version = v.id THEN 'pending'::text + WHEN s.current_version = v.id THEN 'current'::text + ELSE 'unused'::text + END AS status, + d.failed, + d.synced_at + from deployment_schemas ds, + subgraph_deployment d, + subgraph_version v, + subgraph s + where d.id = ds.id + and v.deployment = d.deployment + and v.subgraph = s.id; + +drop table subgraphs.deployment; +drop table subgraphs.head; diff --git a/store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/up.sql b/store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/up.sql new file mode 100644 index 00000000000..554d481d158 --- /dev/null +++ b/store/postgres/migrations/2025-05-13-173523_split_subgraph_deployment/up.sql @@ -0,0 +1,116 @@ +create table subgraphs.head ( + id int4 primary key, + entity_count int8 not null, + block_number int4, + block_hash bytea, + firehose_cursor text +); + +create table subgraphs.deployment ( + id int4 primary key, + + subgraph text unique not null, + + earliest_block_number int4 default 0 not null, + + health text not null, + failed boolean not null, + fatal_error text, + non_fatal_errors text[] default '{}'::text[], + + graft_base text, + graft_block_hash bytea, + graft_block_number int4, + + reorg_count int4 default 0 not null, + current_reorg_depth int4 default 0 not null, + max_reorg_depth int4 default 0 not null, + + last_healthy_block_hash bytea, + last_healthy_block_number int4, + + debug_fork text, + + synced_at timestamptz, + synced_at_block_number int4, + + constraint deployment_health_new_check + check ((health = any (array['failed', 'healthy', 'unhealthy']))), + constraint deployment_id + foreign key (id) references subgraphs.head(id) on delete cascade +); + +insert into subgraphs.head + (id, block_hash, block_number, entity_count, firehose_cursor) +select id, latest_ethereum_block_hash, + latest_ethereum_block_number, entity_count, firehose_cursor + from subgraphs.subgraph_deployment; + +insert into subgraphs.deployment + (id, subgraph, failed, graft_base, graft_block_hash, graft_block_number, + fatal_error, non_fatal_errors, reorg_count, current_reorg_depth, + max_reorg_depth, + last_healthy_block_hash, last_healthy_block_number, + debug_fork, earliest_block_number, + health, + synced_at, synced_at_block_number) +select + id, deployment, failed, graft_base, graft_block_hash, graft_block_number, + fatal_error, non_fatal_errors, reorg_count, current_reorg_depth, + max_reorg_depth, + last_healthy_ethereum_block_hash, last_healthy_ethereum_block_number, + debug_fork, earliest_block_number, + health, + synced_at, synced_at_block_number +from subgraphs.subgraph_deployment; + +-- Support joining with subgraph_error +create index deployment_fatal_error + on subgraphs.deployment(fatal_error); + +alter table subgraphs.copy_state + drop constraint copy_state_dst_fkey, + add constraint copy_state_dst_fkey + foreign key (dst) references subgraphs.deployment(id) on delete cascade; + +alter table subgraphs.subgraph_error + drop constraint 
subgraph_error_subgraph_id_fkey, + add constraint subgraph_error_subgraph_id_fkey + foreign key (subgraph_id) references + subgraphs.deployment(subgraph) on delete cascade; + +alter table subgraphs.subgraph_manifest + drop constraint subgraph_manifest_new_id_fkey, + add constraint subgraph_manifest_id_fkey + foreign key (id) references subgraphs.deployment(id) on delete cascade; + +alter table subgraphs.table_stats + drop constraint table_stats_deployment_fkey, + add constraint table_stats_deployment_fkey + foreign key (deployment) references subgraphs.deployment(id) + on delete cascade; + +drop view info.subgraph_info; + +drop table subgraphs.subgraph_deployment; + +create view info.subgraph_info as +select ds.id as schema_id, + ds.name as schema_name, + ds.subgraph, + ds.version, + s.name, + CASE + WHEN s.pending_version = v.id THEN 'pending' + WHEN s.current_version = v.id THEN 'current' + ELSE 'unused' + END AS status, + d.failed, + d.synced_at + from deployment_schemas ds, + subgraphs.deployment d, + subgraphs.subgraph_version v, + subgraphs.subgraph s + where d.id = ds.id + and v.deployment = d.subgraph + and v.subgraph = s.id; diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index a94c44a8870..2e2b4fdea99 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -2393,11 +2393,13 @@ impl ChainStoreTrait for ChainStore { from ethereum_networks where name = $2)), -1)::int as block from ( - select min(d.latest_ethereum_block_number) as block - from subgraphs.subgraph_deployment d, + select min(h.block_number) as block + from subgraphs.deployment d, + subgraphs.head h, subgraphs.subgraph_deployment_assignment a, deployment_schemas ds - where ds.subgraph = d.deployment + where ds.id = d.id + and h.id = d.id and a.id = d.id and not d.failed and ds.network = $2) a;"; diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index eaa3a185797..13b54b2a6b5 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -2,18 +2,17 @@ //! into these methods must be for the shard that holds the actual //! deployment data and metadata use crate::{advisory_lock, detail::GraphNodeVersion, primary::DeploymentId}; +use diesel::pg::PgConnection; use diesel::{ connection::SimpleConnection, dsl::{count, delete, insert_into, now, select, sql, update}, sql_types::{Bool, Integer}, }; -use diesel::{expression::SqlLiteral, pg::PgConnection, sql_types::Numeric}; use diesel::{ prelude::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl}, sql_query, sql_types::{Nullable, Text}, }; -use graph::semver::Version; use graph::{ blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError, @@ -21,11 +20,12 @@ use graph::{ schema::EntityType, slog::{debug, Logger}, }; +use graph::{components::store::StoreResult, semver::Version}; use graph::{ data::store::scalar::ToPrimitive, prelude::{ - anyhow, hex, web3::types::H256, BigDecimal, BlockNumber, BlockPtr, DeploymentHash, - DeploymentState, StoreError, + anyhow, hex, web3::types::H256, BlockNumber, BlockPtr, DeploymentHash, DeploymentState, + StoreError, }, schema::InputSchema, }; @@ -131,29 +131,48 @@ impl OnSync { } table! { - subgraphs.subgraph_deployment (id) { + /// Deployment metadata that changes on every block + subgraphs.head (id) { id -> Integer, - deployment -> Text, - failed -> Bool, + block_hash -> Nullable, + block_number -> Nullable, + entity_count -> Int8, + firehose_cursor -> Nullable, + } +} + +table! 
{ + /// Deployment metadata that changes less frequently + subgraphs.deployment (id) { + id -> Integer, + + /// The IPFS hash of the deployment. We would like to call this + /// 'deployment', but Diesel doesn't let us have a column with the + /// same name as the table + subgraph -> Text, + + earliest_block_number -> Integer, + health -> crate::deployment::SubgraphHealthMapping, - synced_at -> Nullable, - synced_at_block_number -> Nullable, + failed -> Bool, fatal_error -> Nullable, non_fatal_errors -> Array, - earliest_block_number -> Integer, - latest_ethereum_block_hash -> Nullable, - latest_ethereum_block_number -> Nullable, - last_healthy_ethereum_block_hash -> Nullable, - last_healthy_ethereum_block_number -> Nullable, - entity_count -> Numeric, + graft_base -> Nullable, graft_block_hash -> Nullable, - graft_block_number -> Nullable, - debug_fork -> Nullable, + graft_block_number -> Nullable, + reorg_count -> Integer, current_reorg_depth -> Integer, max_reorg_depth -> Integer, - firehose_cursor -> Nullable, + + last_healthy_ethereum_block_hash -> Nullable, + last_healthy_ethereum_block_number -> Nullable, + + debug_fork -> Nullable, + + synced_at -> Nullable, + synced_at_block_number -> Nullable, } } @@ -207,7 +226,9 @@ table! { } } -allow_tables_to_appear_in_same_query!(subgraph_deployment, subgraph_error, subgraph_manifest); +allow_tables_to_appear_in_same_query!(subgraph_error, subgraph_manifest, head, deployment); + +joinable!(head -> deployment(id)); /// Look up the graft point for the given subgraph in the database and /// return it. If `pending_only` is `true`, only return `Some(_)` if the @@ -218,15 +239,17 @@ fn graft( id: &DeploymentHash, pending_only: bool, ) -> Result, StoreError> { - use subgraph_deployment as sd; + use deployment as sd; + use head as h; let graft_query = sd::table .select((sd::graft_base, sd::graft_block_hash, sd::graft_block_number)) - .filter(sd::deployment.eq(id.as_str())); + .filter(sd::subgraph.eq(id.as_str())); // The name of the base subgraph, the hash, and block number - let graft: (Option, Option>, Option) = if pending_only { + let graft: (Option, Option>, Option) = if pending_only { graft_query - .filter(sd::latest_ethereum_block_number.is_null()) + .inner_join(h::table) + .filter(h::block_number.is_null()) .first(conn) .optional()? 
.unwrap_or((None, None, None)) @@ -286,11 +309,11 @@ pub fn debug_fork( conn: &mut PgConnection, id: &DeploymentHash, ) -> Result, StoreError> { - use subgraph_deployment as sd; + use deployment as sd; let debug_fork: Option = sd::table .select(sd::debug_fork) - .filter(sd::deployment.eq(id.as_str())) + .filter(sd::subgraph.eq(id.as_str())) .first(conn)?; match debug_fork { @@ -400,6 +423,18 @@ pub fn set_manifest_raw_yaml( .map_err(|e| e.into()) } +/// Most of the time, this will be a noop; the only time we actually modify +/// the deployment table is the first forward block after a reorg +fn reset_reorg_count(conn: &mut PgConnection, site: &Site) -> StoreResult<()> { + use deployment as d; + + update(d::table.filter(d::id.eq(site.id))) + .filter(d::current_reorg_depth.gt(0)) + .set(d::current_reorg_depth.eq(0)) + .execute(conn)?; + Ok(()) +} + pub fn transact_block( conn: &mut PgConnection, site: &Site, @@ -407,10 +442,8 @@ pub fn transact_block( firehose_cursor: &FirehoseCursor, count: i32, ) -> Result { - use subgraph_deployment as d; - - // Work around a Diesel issue with serializing BigDecimals to numeric - let number = format!("{}::numeric", ptr.number); + use deployment as d; + use head as h; let count_sql = entity_count_sql(count); @@ -419,7 +452,7 @@ pub fn transact_block( // Performance note: This costs us an extra DB query on every update. We used to put this in the // `where` clause of the `update` statement, but that caused Postgres to use bitmap scans instead // of a simple primary key lookup. So a separate query it is. - let block_ptr = block_ptr(conn, &site.deployment)?; + let block_ptr = block_ptr(conn, &site)?; if let Some(block_ptr_from) = block_ptr { if block_ptr_from.number >= ptr.number { return Err(StoreError::DuplicateBlockProcessing( @@ -429,21 +462,32 @@ pub fn transact_block( } } - let rows = update(d::table.filter(d::id.eq(site.id))) + reset_reorg_count(conn, site)?; + + let rows = update(h::table.filter(h::id.eq(site.id))) .set(( - d::latest_ethereum_block_number.eq(sql(&number)), - d::latest_ethereum_block_hash.eq(ptr.hash_slice()), - d::firehose_cursor.eq(firehose_cursor.as_ref()), - d::entity_count.eq(sql(&count_sql)), - d::current_reorg_depth.eq(0), + h::block_number.eq(ptr.number), + h::block_hash.eq(ptr.hash_slice()), + h::firehose_cursor.eq(firehose_cursor.as_ref()), + h::entity_count.eq(sql(&count_sql)), )) - .returning(d::earliest_block_number) - .get_results::(conn) + .execute(conn) .map_err(StoreError::from)?; - match rows.len() { + match rows { // Common case: A single row was updated. - 1 => Ok(rows[0]), + 1 => { + // It's not strictly necessary to load the earliest block every + // time this method is called; if these queries slow things down + // too much, we should cache the earliest block number since it + // is only needed to determine whether a pruning run should be + // kicked off + d::table + .filter(d::id.eq(site.id)) + .select(d::earliest_block_number) + .get_result::(conn) + .map_err(StoreError::from) + } // No matching rows were found. This is logically impossible, as the `block_ptr` would have // caught a non-existing deployment. 
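For orientation, the write path after the split boils down to two statements: an occasional update of `subgraphs.deployment` to reset the reorg depth, and the per-block update of the small `subgraphs.head` row. The sketch below is illustrative only and not part of the patch; it assumes the `head` and `deployment` `table!` definitions introduced above, and the function name `advance_head` is made up.

```rust
// Illustrative sketch, not part of the patch: the hot/cold split of the
// per-block write, assuming the `head` and `deployment` table! macros
// defined earlier in store/postgres/src/deployment.rs.
use diesel::pg::PgConnection;
use diesel::prelude::*;

use crate::deployment::{deployment as d, head as h};
use crate::primary::DeploymentId;

fn advance_head(
    conn: &mut PgConnection,
    id: DeploymentId,
    number: i32,
    hash: &[u8],
    cursor: &str,
) -> QueryResult<()> {
    // Cold path: only touches `deployment` on the first forward block
    // after a reorg, mirroring `reset_reorg_count` above.
    diesel::update(d::table.filter(d::id.eq(id)))
        .filter(d::current_reorg_depth.gt(0))
        .set(d::current_reorg_depth.eq(0))
        .execute(conn)?;

    // Hot path: the per-block write now only rewrites the small `head` row.
    diesel::update(h::table.filter(h::id.eq(id)))
        .set((
            h::block_number.eq(number),
            h::block_hash.eq(hash),
            h::firehose_cursor.eq(cursor),
        ))
        .execute(conn)?;
    Ok(())
}
```

Confining the every-block write to `head` is what keeps the frequently rewritten rows small, per the commit message.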
@@ -460,27 +504,21 @@ pub fn transact_block( pub fn forward_block_ptr( conn: &mut PgConnection, - id: &DeploymentHash, + site: &Site, ptr: &BlockPtr, ) -> Result<(), StoreError> { use crate::diesel::BoolExpressionMethods; - use subgraph_deployment as d; + use head as h; - // Work around a Diesel issue with serializing BigDecimals to numeric - let number = format!("{}::numeric", ptr.number); + reset_reorg_count(conn, site)?; - let row_count = update( - d::table.filter(d::deployment.eq(id.as_str())).filter( - // Asserts that the processing direction is forward. - d::latest_ethereum_block_number - .lt(sql(&number)) - .or(d::latest_ethereum_block_number.is_null()), - ), - ) + let row_count = update(h::table.filter(h::id.eq(site.id)).filter( + // Asserts that the processing direction is forward. + h::block_number.lt(ptr.number).or(h::block_number.is_null()), + )) .set(( - d::latest_ethereum_block_number.eq(sql(&number)), - d::latest_ethereum_block_hash.eq(ptr.hash_slice()), - d::current_reorg_depth.eq(0), + h::block_number.eq(ptr.number), + h::block_hash.eq(ptr.hash_slice()), )) .execute(conn) .map_err(StoreError::from)?; @@ -491,10 +529,10 @@ pub fn forward_block_ptr( // No matching rows were found. This is an error. By the filter conditions, this can only be // due to a missing deployment (which `block_ptr` catches) or duplicate block processing. - 0 => match block_ptr(conn, id)? { - Some(block_ptr_from) if block_ptr_from.number >= ptr.number => { - Err(StoreError::DuplicateBlockProcessing(id.clone(), ptr.number)) - } + 0 => match block_ptr(conn, &site)? { + Some(block_ptr_from) if block_ptr_from.number >= ptr.number => Err( + StoreError::DuplicateBlockProcessing(site.deployment.clone(), ptr.number), + ), None | Some(_) => Err(StoreError::Unknown(anyhow!( "unknown error forwarding block ptr" ))), @@ -511,11 +549,11 @@ pub fn get_subgraph_firehose_cursor( conn: &mut PgConnection, site: Arc, ) -> Result, StoreError> { - use subgraph_deployment as d; + use head as h; - let res = d::table - .filter(d::deployment.eq(site.deployment.as_str())) - .select(d::firehose_cursor) + let res = h::table + .filter(h::id.eq(site.id)) + .select(h::firehose_cursor) .first::>(conn) .map_err(StoreError::from); res @@ -523,14 +561,12 @@ pub fn get_subgraph_firehose_cursor( pub fn revert_block_ptr( conn: &mut PgConnection, - id: &DeploymentHash, + site: &Site, ptr: BlockPtr, firehose_cursor: &FirehoseCursor, ) -> Result<(), StoreError> { - use subgraph_deployment as d; - - // Work around a Diesel issue with serializing BigDecimals to numeric - let number = format!("{}::numeric", ptr.number); + use deployment as d; + use head as h; // Intention is to revert to a block lower than the reorg threshold, on the other // hand the earliest we can possibly go is genesys block, so go to genesys even @@ -538,19 +574,24 @@ pub fn revert_block_ptr( let earliest_block = i32::max(ptr.number - ENV_VARS.reorg_threshold(), 0); let affected_rows = update( d::table - .filter(d::deployment.eq(id.as_str())) + .filter(d::id.eq(site.id)) .filter(d::earliest_block_number.le(earliest_block)), ) .set(( - d::latest_ethereum_block_number.eq(sql(&number)), - d::latest_ethereum_block_hash.eq(ptr.hash_slice()), - d::firehose_cursor.eq(firehose_cursor.as_ref()), d::reorg_count.eq(d::reorg_count + 1), d::current_reorg_depth.eq(d::current_reorg_depth + 1), d::max_reorg_depth.eq(sql("greatest(current_reorg_depth + 1, max_reorg_depth)")), )) .execute(conn)?; + update(h::table.filter(h::id.eq(site.id))) + .set(( + h::block_number.eq(ptr.number), + 
h::block_hash.eq(ptr.hash_slice()), + h::firehose_cursor.eq(firehose_cursor.as_ref()), + )) + .execute(conn)?; + match affected_rows { 1 => Ok(()), 0 => Err(StoreError::Unknown(anyhow!( @@ -563,26 +604,27 @@ pub fn revert_block_ptr( } } -pub fn block_ptr( - conn: &mut PgConnection, - id: &DeploymentHash, -) -> Result, StoreError> { - use subgraph_deployment as d; +pub fn block_ptr(conn: &mut PgConnection, site: &Site) -> Result, StoreError> { + use head as h; - let (number, hash) = d::table - .filter(d::deployment.eq(id.as_str())) - .select(( - d::latest_ethereum_block_number, - d::latest_ethereum_block_hash, - )) - .first::<(Option, Option>)>(conn) + let (number, hash) = h::table + .filter(h::id.eq(site.id)) + .select((h::block_number, h::block_hash)) + .first::<(Option, Option>)>(conn) .map_err(|e| match e { - diesel::result::Error::NotFound => StoreError::DeploymentNotFound(id.to_string()), + diesel::result::Error::NotFound => { + StoreError::DeploymentNotFound(site.deployment.to_string()) + } e => e.into(), })?; - let ptr = crate::detail::block(id.as_str(), "latest_ethereum_block", hash, number)? - .map(|block| block.to_ptr()); + let ptr = crate::detail::block( + site.deployment.as_str(), + "latest_ethereum_block", + hash, + number, + )? + .map(|block| block.to_ptr()); Ok(ptr) } @@ -590,12 +632,12 @@ pub fn block_ptr( /// `latest_ethereum_block` is set already, do nothing. If it is still /// `null`, set it to `start_ethereum_block` from `subgraph_manifest` pub fn initialize_block_ptr(conn: &mut PgConnection, site: &Site) -> Result<(), StoreError> { - use subgraph_deployment as d; + use head as h; use subgraph_manifest as m; - let needs_init = d::table - .filter(d::id.eq(site.id)) - .select(d::latest_ethereum_block_hash) + let needs_init = h::table + .filter(h::id.eq(site.id)) + .select(h::block_hash) .first::>>(conn) .map_err(|e| { internal_error!( @@ -611,13 +653,8 @@ pub fn initialize_block_ptr(conn: &mut PgConnection, site: &Site) -> Result<(), .select((m::start_block_hash, m::start_block_number)) .first::<(Option>, Option)>(conn)? 
{ - let number = format!("{}::numeric", number); - - update(d::table.filter(d::id.eq(site.id))) - .set(( - d::latest_ethereum_block_hash.eq(&hash), - d::latest_ethereum_block_number.eq(sql(&number)), - )) + update(h::table.filter(h::id.eq(site.id))) + .set((h::block_hash.eq(&hash), h::block_number.eq(number))) .execute(conn) .map(|_| ()) .map_err(|e| e.into()) @@ -644,18 +681,20 @@ fn convert_to_u32(number: Option, field: &str, subgraph: &str) -> Result Result { - use subgraph_deployment as d; +pub fn state(conn: &mut PgConnection, site: &Site) -> Result { + use deployment as d; + use head as h; use subgraph_error as e; match d::table - .filter(d::deployment.eq(id.as_str())) + .inner_join(h::table) + .filter(d::id.eq(site.id)) .select(( - d::deployment, + d::subgraph, d::reorg_count, d::max_reorg_depth, - d::latest_ethereum_block_number, - d::latest_ethereum_block_hash, + h::block_number, + h::block_hash, d::earliest_block_number, d::failed, d::health, @@ -664,7 +703,7 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result, + Option, Option>, BlockNumber, bool, @@ -674,7 +713,7 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result Err(StoreError::QueryExecutionError(format!( "No data found for subgraph {}", - id + site.deployment ))), Some(( _, @@ -686,11 +725,11 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result { - let reorg_count = convert_to_u32(Some(reorg_count), "reorg_count", id.as_str())?; + let reorg_count = convert_to_u32(Some(reorg_count), "reorg_count", &site.deployment)?; let max_reorg_depth = - convert_to_u32(Some(max_reorg_depth), "max_reorg_depth", id.as_str())?; + convert_to_u32(Some(max_reorg_depth), "max_reorg_depth", &site.deployment)?; let latest_block = crate::detail::block( - id.as_str(), + &site.deployment, "latest_block", latest_block_hash, latest_block_number, @@ -699,7 +738,7 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result Result>("min(lower(block_range))")) .first::>(conn)? @@ -716,7 +755,7 @@ pub fn state(conn: &mut PgConnection, id: DeploymentHash) -> Result Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; update( d::table - .filter(d::deployment.eq(id.as_str())) + .filter(d::subgraph.eq(id.as_str())) .filter(d::synced_at.is_null()), ) .set(( @@ -750,7 +789,7 @@ pub fn set_synced( /// Returns `true` if the deployment (as identified by `site.id`) pub fn exists(conn: &mut PgConnection, site: &Site) -> Result { - use subgraph_deployment as d; + use deployment as d; let exists = d::table .filter(d::id.eq(site.id)) @@ -762,10 +801,10 @@ pub fn exists(conn: &mut PgConnection, site: &Site) -> Result /// Returns `true` if the deployment `id` exists and is synced pub fn exists_and_synced(conn: &mut PgConnection, id: &str) -> Result { - use subgraph_deployment as d; + use deployment as d; let synced = d::table - .filter(d::deployment.eq(id)) + .filter(d::subgraph.eq(id)) .select(d::synced_at.is_not_null()) .first(conn) .optional()? 
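Reads that used to come from a single `subgraph_deployment` row now join the two tables, as `state()` above does. The following sketch is illustrative only and not part of the patch; it relies on the `joinable!(head -> deployment(id))` declaration introduced earlier, and `head_and_deployment` is a made-up helper name.

```rust
// Illustrative sketch, not part of the patch: reassembling data that used
// to live in one `subgraph_deployment` row by joining `deployment` and `head`.
use diesel::pg::PgConnection;
use diesel::prelude::*;

use crate::deployment::{deployment as d, head as h};
use crate::primary::DeploymentId;

fn head_and_deployment(
    conn: &mut PgConnection,
    id: DeploymentId,
) -> QueryResult<(String, Option<i32>, i64)> {
    // IPFS hash, current head block number, and entity count for one deployment
    d::table
        .inner_join(h::table)
        .filter(d::id.eq(id))
        .select((d::subgraph, h::block_number, h::entity_count))
        .first(conn)
}
```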
@@ -867,9 +906,9 @@ pub fn update_deployment_status( fatal_error: Option, non_fatal_errors: Option>, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; - update(d::table.filter(d::deployment.eq(deployment_id.as_str()))) + update(d::table.filter(d::subgraph.eq(deployment_id.as_str()))) .set(( d::failed.eq(health.is_failed()), d::health.eq(health), @@ -927,7 +966,7 @@ fn check_health( id: &DeploymentHash, block: BlockNumber, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; let has_errors = has_deterministic_errors(conn, id, block)?; @@ -946,7 +985,7 @@ fn check_health( update( d::table - .filter(d::deployment.eq(id.as_str())) + .filter(d::subgraph.eq(id.as_str())) .filter(d::health.eq(old)), ) .set(d::health.eq(new)) @@ -959,7 +998,7 @@ pub(crate) fn health( conn: &mut PgConnection, id: DeploymentId, ) -> Result { - use subgraph_deployment as d; + use deployment as d; d::table .filter(d::id.eq(id)) @@ -997,7 +1036,7 @@ pub(crate) fn revert_subgraph_errors( id: &DeploymentHash, reverted_block: BlockNumber, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; use subgraph_error as e; let lower_geq = format!("lower({}) >= ", BLOCK_RANGE_COLUMN); @@ -1018,7 +1057,7 @@ pub(crate) fn revert_subgraph_errors( // unfail the statuses. update( d::table - .filter(d::deployment.eq(id.as_str())) + .filter(d::subgraph.eq(id.as_str())) .filter(d::failed.eq(true)) .filter(d::health.eq(SubgraphHealth::Failed)), ) @@ -1106,33 +1145,31 @@ pub fn drop_schema( } pub fn drop_metadata(conn: &mut PgConnection, site: &Site) -> Result<(), StoreError> { - use subgraph_deployment as d; + use head as h; - // We don't need to delete from subgraph_manifest or subgraph_error - // since that cascades from deleting the subgraph_deployment - delete(d::table.filter(d::id.eq(site.id))).execute(conn)?; + // We don't need to delete from `deployment`, `subgraph_manifest`, or + // `subgraph_error` since that cascades from deleting `head` + delete(h::table.filter(h::id.eq(site.id))).execute(conn)?; Ok(()) } pub fn create_deployment( conn: &mut PgConnection, site: &Site, - deployment: DeploymentCreate, + create: DeploymentCreate, exists: bool, replace: bool, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; + use head as h; use subgraph_manifest as m; fn b(ptr: &Option) -> Option<&[u8]> { ptr.as_ref().map(|ptr| ptr.hash_slice()) } - fn n(ptr: &Option) -> SqlLiteral> { - match ptr { - None => sql("null"), - Some(ptr) => sql(&format!("{}::numeric", ptr.number)), - } + fn n(ptr: &Option) -> Option { + ptr.as_ref().map(|ptr| ptr.number) } let DeploymentCreate { @@ -1152,7 +1189,7 @@ pub fn create_deployment( graft_block, debug_fork, history_blocks_override, - } = deployment; + } = create; let earliest_block_number = start_block.as_ref().map(|ptr| ptr.number).unwrap_or(0); let entities_with_causality_region = Vec::from_iter( entities_with_causality_region @@ -1160,17 +1197,22 @@ pub fn create_deployment( .map(|et| et.typename().to_owned()), ); + let head_values = ( + h::id.eq(site.id), + h::block_number.eq(sql("null")), + h::block_hash.eq(sql("null")), + h::firehose_cursor.eq(sql("null")), + h::entity_count.eq(sql("0")), + ); + let deployment_values = ( d::id.eq(site.id), - d::deployment.eq(site.deployment.as_str()), + d::subgraph.eq(site.deployment.as_str()), d::failed.eq(false), d::health.eq(SubgraphHealth::Healthy), d::fatal_error.eq::>(None), d::non_fatal_errors.eq::>(vec![]), 
d::earliest_block_number.eq(earliest_block_number), - d::latest_ethereum_block_hash.eq(sql("null")), - d::latest_ethereum_block_number.eq(sql("null")), - d::entity_count.eq(sql("0")), d::graft_base.eq(graft_base.as_ref().map(|s| s.as_str())), d::graft_block_hash.eq(b(&graft_block)), d::graft_block_number.eq(n(&graft_block)), @@ -1198,7 +1240,11 @@ pub fn create_deployment( ); if exists && replace { - update(d::table.filter(d::deployment.eq(site.deployment.as_str()))) + update(h::table.filter(h::id.eq(site.id))) + .set(head_values) + .execute(conn)?; + + update(d::table.filter(d::subgraph.eq(site.deployment.as_str()))) .set(deployment_values) .execute(conn)?; @@ -1206,6 +1252,8 @@ pub fn create_deployment( .set(manifest_values) .execute(conn)?; } else { + insert_into(h::table).values(head_values).execute(conn)?; + insert_into(d::table) .values(deployment_values) .execute(conn)?; @@ -1226,25 +1274,25 @@ pub fn update_entity_count( site: &Site, count: i32, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use head as h; if count == 0 { return Ok(()); } let count_sql = entity_count_sql(count); - update(d::table.filter(d::id.eq(site.id))) - .set(d::entity_count.eq(sql(&count_sql))) + update(h::table.filter(h::id.eq(site.id))) + .set(h::entity_count.eq(sql(&count_sql))) .execute(conn)?; Ok(()) } /// Set the deployment's entity count back to `0` pub fn clear_entity_count(conn: &mut PgConnection, site: &Site) -> Result<(), StoreError> { - use subgraph_deployment as d; + use head as h; - update(d::table.filter(d::id.eq(site.id))) - .set(d::entity_count.eq(BigDecimal::from(0))) + update(h::table.filter(h::id.eq(site.id))) + .set(h::entity_count.eq(0)) .execute(conn)?; Ok(()) } @@ -1259,7 +1307,7 @@ pub fn set_earliest_block( site: &Site, earliest_block: BlockNumber, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; update(d::table.filter(d::id.eq(site.id))) .set(d::earliest_block_number.eq(earliest_block)) @@ -1276,12 +1324,12 @@ pub fn copy_earliest_block( src: &Site, dst: &Site, ) -> Result<(), StoreError> { - use subgraph_deployment as d; + use deployment as d; let src_nsp = ForeignServer::metadata_schema_in(&src.shard, &dst.shard); let query = format!( - "(select earliest_block_number from {src_nsp}.subgraph_deployment where id = {})", + "(select earliest_block_number from {src_nsp}.deployment where id = {})", src.id ); diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d4618aef3ad..ea78d1d2e09 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -523,7 +523,7 @@ impl DeploymentStore { conn: &mut PgConnection, site: Arc, ) -> Result, StoreError> { - deployment::block_ptr(conn, &site.deployment) + deployment::block_ptr(conn, &site) } pub(crate) fn deployment_details( @@ -604,7 +604,7 @@ impl DeploymentStore { const QUERY: &str = " delete from subgraphs.dynamic_ethereum_contract_data_source; delete from subgraphs.subgraph; - delete from subgraphs.subgraph_deployment; + delete from subgraphs.head; delete from subgraphs.subgraph_deployment_assignment; delete from subgraphs.subgraph_version; delete from subgraphs.subgraph_manifest; @@ -621,7 +621,7 @@ impl DeploymentStore { pub(crate) async fn vacuum(&self) -> Result<(), StoreError> { self.with_conn(|conn, _| { - conn.batch_execute("vacuum (analyze) subgraphs.subgraph_deployment")?; + conn.batch_execute("vacuum (analyze) subgraphs.head, subgraphs.deployment")?; Ok(()) }) .await @@ -834,7 +834,7 @@ impl 
DeploymentStore { ) -> Result, CancelableError> { let layout = store.layout(&mut conn, site.clone())?; cancel.check_cancel()?; - let state = deployment::state(&mut conn, site.deployment.clone())?; + let state = deployment::state(&mut conn, &site)?; if state.latest_block.number <= req.history_blocks { // We haven't accumulated enough history yet, nothing to prune @@ -1297,12 +1297,7 @@ impl DeploymentStore { // The revert functions want the number of the first block that we need to get rid of let block = block_ptr_to.number + 1; - deployment::revert_block_ptr( - conn, - &site.deployment, - block_ptr_to, - firehose_cursor, - )?; + deployment::revert_block_ptr(conn, &site, block_ptr_to, firehose_cursor)?; // Revert the data let layout = self.layout(conn, site.clone())?; @@ -1416,11 +1411,11 @@ impl DeploymentStore { self.rewind_or_truncate_with_conn(&mut conn, site, block_ptr_to, firehose_cursor, false) } - pub(crate) async fn deployment_state_from_id( + pub(crate) async fn deployment_state( &self, - id: DeploymentHash, + site: Arc, ) -> Result { - self.with_conn(|conn, _| deployment::state(conn, id).map_err(|e| e.into())) + self.with_conn(move |conn, _| deployment::state(conn, &site).map_err(|e| e.into())) .await } @@ -1604,7 +1599,7 @@ impl DeploymentStore { // Set the block ptr to the graft point to signal that we successfully // performed the graft - crate::deployment::forward_block_ptr(conn, &dst.site.deployment, &block)?; + crate::deployment::forward_block_ptr(conn, &dst.site, &block)?; info!(logger, "Subgraph successfully initialized"; "time_ms" => start.elapsed().as_millis()); Ok(()) diff --git a/store/postgres/src/detail.rs b/store/postgres/src/detail.rs index 1f127e1e2ab..0be3909a2c9 100644 --- a/store/postgres/src/detail.rs +++ b/store/postgres/src/detail.rs @@ -12,9 +12,10 @@ use git_testament::{git_testament, git_testament_macros}; use graph::blockchain::BlockHash; use graph::data::store::scalar::ToPrimitive; use graph::data::subgraph::schema::{SubgraphError, SubgraphManifestEntity}; +use graph::prelude::BlockNumber; use graph::prelude::{ chrono::{DateTime, Utc}, - BigDecimal, BlockPtr, DeploymentHash, StoreError, SubgraphDeploymentEntity, + BlockPtr, DeploymentHash, StoreError, SubgraphDeploymentEntity, }; use graph::schema::InputSchema; use graph::{data::subgraph::status, internal_error, prelude::web3::types::H256}; @@ -24,8 +25,8 @@ use std::convert::TryFrom; use std::{ops::Bound, sync::Arc}; use crate::deployment::{ - graph_node_versions, subgraph_deployment, subgraph_error, subgraph_manifest, - SubgraphHealth as HealthType, + deployment as subgraph_deployment, graph_node_versions, head as subgraph_head, subgraph_error, + subgraph_manifest, SubgraphHealth as HealthType, }; use crate::primary::{DeploymentId, Site}; @@ -39,29 +40,100 @@ const CARGO_PKG_VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); type Bytes = Vec; -#[derive(Queryable, Selectable)] -#[diesel(table_name = subgraph_deployment)] -// We map all fields to make loading `Detail` with diesel easier, but we -// don't need all the fields pub struct DeploymentDetail { pub id: DeploymentId, - pub deployment: String, - pub failed: bool, + pub subgraph: String, + /// The earliest block for which we have history + pub earliest_block_number: i32, health: HealthType, + pub failed: bool, + graft_base: Option, + graft_block_hash: Option, + graft_block_number: Option, + reorg_count: i32, + current_reorg_depth: i32, + max_reorg_depth: i32, + debug_fork: Option, pub synced_at: Option>, pub synced_at_block_number: Option, + 
pub block_hash: Option, + pub block_number: Option, + pub entity_count: usize, +} + +#[derive(Queryable, Selectable)] +#[diesel(table_name = subgraph_deployment)] +struct Deployment { + id: DeploymentId, + subgraph: String, /// The earliest block for which we have history - pub earliest_block_number: i32, - pub latest_ethereum_block_hash: Option, - pub latest_ethereum_block_number: Option, - pub entity_count: BigDecimal, + earliest_block_number: i32, + health: HealthType, + failed: bool, graft_base: Option, graft_block_hash: Option, - graft_block_number: Option, - debug_fork: Option, + graft_block_number: Option, reorg_count: i32, current_reorg_depth: i32, max_reorg_depth: i32, + debug_fork: Option, + synced_at: Option>, + synced_at_block_number: Option, +} + +#[derive(Queryable, Selectable)] +#[diesel(table_name = subgraph_head)] +struct Head { + block_hash: Option, + block_number: Option, + entity_count: i64, +} + +impl From<(Deployment, Head)> for DeploymentDetail { + fn from((deployment, head): (Deployment, Head)) -> Self { + let Deployment { + id, + subgraph, + earliest_block_number, + health, + failed, + graft_base, + graft_block_hash, + graft_block_number, + reorg_count, + current_reorg_depth, + max_reorg_depth, + debug_fork, + synced_at, + synced_at_block_number, + } = deployment; + + let Head { + block_hash, + block_number, + entity_count, + } = head; + + Self { + id, + subgraph, + earliest_block_number, + health, + failed, + graft_base, + graft_block_hash, + graft_block_number, + reorg_count, + current_reorg_depth, + max_reorg_depth, + debug_fork, + synced_at, + synced_at_block_number, + block_hash: block_hash.clone(), + block_number: block_number.clone(), + entity_count: entity_count as usize, + } + } } #[derive(Queryable, Selectable)] @@ -89,7 +161,7 @@ impl ErrorDetail { use subgraph_error as e; d::table - .filter(d::deployment.eq(deployment_id.as_str())) + .filter(d::subgraph.eq(deployment_id.as_str())) .inner_join(e::table.on(e::id.nullable().eq(d::fatal_error))) .select(ErrorDetail::as_select()) .get_result(conn) @@ -141,23 +213,13 @@ pub(crate) fn block( id: &str, name: &str, hash: Option>, - number: Option, + number: Option, ) -> Result, StoreError> { match (hash, number) { - (Some(hash), Some(number)) => { - let number = number.to_i32().ok_or_else(|| { - internal_error!( - "the block number {} for {} in {} is not representable as an i32", - number, - name, - id - ) - })?; - Ok(Some(status::EthereumBlock::new( - BlockHash(hash.into_boxed_slice()), - number, - ))) - } + (Some(hash), Some(number)) => Ok(Some(status::EthereumBlock::new( + BlockHash(hash.into_boxed_slice()), + number, + ))), (None, None) => Ok(None), (hash, number) => Err(internal_error!( "the hash and number \ @@ -180,13 +242,13 @@ pub(crate) fn info_from_details( ) -> Result { let DeploymentDetail { id, - deployment, + subgraph, failed: _, health, synced_at, earliest_block_number, - latest_ethereum_block_hash, - latest_ethereum_block_number, + block_hash, + block_number, entity_count, graft_base: _, graft_block_hash: _, @@ -200,18 +262,13 @@ pub(crate) fn info_from_details( let site = sites .iter() - .find(|site| site.deployment.as_str() == deployment) - .ok_or_else(|| internal_error!("missing site for subgraph `{}`", deployment))?; + .find(|site| site.deployment.as_str() == subgraph) + .ok_or_else(|| internal_error!("missing site for subgraph `{}`", subgraph))?; // This needs to be filled in later since it lives in a // different shard let chain_head_block = None; - let latest_block = block( - &deployment, 
- "latest_ethereum_block", - latest_ethereum_block_hash, - latest_ethereum_block_number, - )?; + let latest_block = block(&subgraph, "latest_ethereum_block", block_hash, block_number)?; let health = health.into(); let chain = status::ChainInfo { network: site.network.clone(), @@ -222,7 +279,7 @@ pub(crate) fn info_from_details( let entity_count = entity_count.to_u64().ok_or_else(|| { internal_error!( "the entityCount for {} is not representable as a u64", - deployment + subgraph ) })?; let fatal_error = fatal.map(SubgraphError::try_from).transpose()?; @@ -234,7 +291,7 @@ pub(crate) fn info_from_details( // 'node' needs to be filled in later from a different shard Ok(status::Info { id: id.into(), - subgraph: deployment, + subgraph, synced: synced_at.is_some(), health, paused: None, @@ -253,18 +310,26 @@ pub(crate) fn deployment_details( deployments: Vec, ) -> Result, StoreError> { use subgraph_deployment as d; + use subgraph_head as h; - let cols = DeploymentDetail::as_select(); + let cols = <(Deployment, Head)>::as_select(); // Empty deployments means 'all of them' let details = if deployments.is_empty() { - d::table.select(cols).load::(conn)? + d::table + .inner_join(h::table) + .select(cols) + .load::<(Deployment, Head)>(conn)? } else { d::table - .filter(d::deployment.eq_any(&deployments)) + .inner_join(h::table) + .filter(d::subgraph.eq_any(&deployments)) .select(cols) - .load::(conn)? - }; + .load::<(Deployment, Head)>(conn)? + } + .into_iter() + .map(DeploymentDetail::from) + .collect(); Ok(details) } @@ -274,14 +339,17 @@ pub(crate) fn deployment_details_for_id( deployment: &DeploymentId, ) -> Result { use subgraph_deployment as d; + use subgraph_head as h; - let cols = DeploymentDetail::as_select(); + let cols = <(Deployment, Head)>::as_select(); d::table + .inner_join(h::table) .filter(d::id.eq(&deployment)) .select(cols) - .first::(conn) + .first::<(Deployment, Head)>(conn) .map_err(StoreError::from) + .map(DeploymentDetail::from) } pub(crate) fn deployment_statuses( @@ -290,6 +358,7 @@ pub(crate) fn deployment_statuses( ) -> Result, StoreError> { use subgraph_deployment as d; use subgraph_error as e; + use subgraph_head as h; use subgraph_manifest as sm; // First, we fetch all deployment information along with any fatal errors. @@ -299,27 +368,28 @@ pub(crate) fn deployment_statuses( let details_with_fatal_error = { let join = e::table.on(e::id.nullable().eq(d::fatal_error)); - let cols = <(DeploymentDetail, Option)>::as_select(); + let cols = <(Deployment, Head, Option)>::as_select(); // Empty deployments means 'all of them' if sites.is_empty() { d::table + .inner_join(h::table) .left_outer_join(join) .select(cols) - .load::<(DeploymentDetail, Option)>(conn)? + .load::<(Deployment, Head, Option)>(conn)? } else { d::table + .inner_join(h::table) .left_outer_join(join) .filter(d::id.eq_any(sites.iter().map(|site| site.id))) .select(cols) - .load::<(DeploymentDetail, Option)>(conn)? + .load::<(Deployment, Head, Option)>(conn)? 
         }
     };
 
     let mut non_fatal_errors = {
         #[allow(deprecated)]
-        let join =
-            e::table.on(e::id.eq(sql("any(subgraphs.subgraph_deployment.non_fatal_errors)")));
+        let join = e::table.on(e::id.eq(sql("any(subgraphs.deployment.non_fatal_errors)")));
 
         if sites.is_empty() {
             d::table
@@ -354,7 +424,8 @@ pub(crate) fn deployment_statuses(
 
     details_with_fatal_error
         .into_iter()
-        .map(|(detail, fatal)| {
+        .map(|(deployment, head, fatal)| {
+            let detail = DeploymentDetail::from((deployment, head));
             let non_fatal = non_fatal_errors.remove(&detail.id).unwrap_or_default();
             let subgraph_history_blocks = history_blocks_map.remove(&detail.id).unwrap_or_default();
             info_from_details(detail, fatal, non_fatal, sites, subgraph_history_blocks)
@@ -412,7 +483,7 @@ impl StoredDeploymentEntity {
         let (detail, manifest) = (self.0, self.1);
 
         let start_block = block(
-            &detail.deployment,
+            &detail.subgraph,
             "start_block",
             manifest.start_block_hash.clone(),
             manifest.start_block_number.map(|n| n.into()),
         )?
         .map(|block| block.to_ptr());
 
         let latest_block = block(
-            &detail.deployment,
+            &detail.subgraph,
             "latest_block",
-            detail.latest_ethereum_block_hash,
-            detail.latest_ethereum_block_number,
+            detail.block_hash,
+            detail.block_number,
         )?
         .map(|block| block.to_ptr());
 
         let graft_block = block(
-            &detail.deployment,
+            &detail.subgraph,
             "graft_block",
             detail.graft_block_hash,
             detail.graft_block_number,
@@ -473,6 +544,7 @@ pub fn deployment_entity(
     schema: &InputSchema,
 ) -> Result<SubgraphDeploymentEntity, StoreError> {
     use subgraph_deployment as d;
+    use subgraph_head as h;
     use subgraph_manifest as m;
 
     let manifest = m::table
@@ -481,9 +553,11 @@ pub fn deployment_entity(
         .first::<StoredSubgraphManifest>(conn)?;
 
     let detail = d::table
-        .find(site.id)
-        .select(DeploymentDetail::as_select())
-        .first::<DeploymentDetail>(conn)?;
+        .inner_join(h::table)
+        .filter(d::id.eq(site.id))
+        .select(<(Deployment, Head)>::as_select())
+        .first::<(Deployment, Head)>(conn)
+        .map(DeploymentDetail::from)?;
 
     StoredDeploymentEntity(detail, manifest).as_subgraph_deployment(schema)
 }
diff --git a/store/postgres/src/jobs.rs b/store/postgres/src/jobs.rs
index a150598427e..d8177667183 100644
--- a/store/postgres/src/jobs.rs
+++ b/store/postgres/src/jobs.rs
@@ -49,10 +49,11 @@ pub fn register(
     );
 }
 
-/// A job that vacuums `subgraphs.subgraph_deployment`. With a large number
-/// of subgraphs, the autovacuum daemon might not run often enough to keep
-/// this table, which is _very_ write-heavy, from getting bloated. We
-/// therefore set up a separate job that vacuums the table once a minute
+/// A job that vacuums `subgraphs.deployment` and `subgraphs.head`. With a
+/// large number of subgraphs, the autovacuum daemon might not run often
+/// enough to keep these tables, which are _very_ write-heavy, from getting
+/// bloated. We therefore set up a separate job that vacuums them once a
+/// minute
 struct VacuumDeploymentsJob {
     store: Arc<SubgraphStore>,
 }
@@ -66,16 +67,13 @@ impl VacuumDeploymentsJob {
 
 #[async_trait]
 impl Job for VacuumDeploymentsJob {
     fn name(&self) -> &str {
-        "Vacuum subgraphs.subgraph_deployment"
+        "Vacuum subgraphs.deployment and subgraphs.head"
     }
 
     async fn run(&self, logger: &Logger) {
         for res in self.store.vacuum().await {
             if let Err(e) = res {
-                error!(
-                    logger,
-                    "Vacuum of subgraphs.subgraph_deployment failed: {}", e
-                );
+                error!(logger, "Vacuum of subgraphs.deployment failed: {}", e);
             }
         }
     }
diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs
index 628a977ff9b..a94238fd62f 100644
--- a/store/postgres/src/pool/mod.rs
+++ b/store/postgres/src/pool/mod.rs
@@ -59,7 +59,8 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [
         "copy_state",
         "copy_table_state",
         "dynamic_ethereum_contract_data_source",
-        "subgraph_deployment",
+        "head",
+        "deployment",
         "subgraph_error",
         "subgraph_manifest",
         "table_stats",
diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs
index d28a9efbc07..50e358dc388 100644
--- a/store/postgres/src/primary.rs
+++ b/store/postgres/src/primary.rs
@@ -1638,10 +1638,10 @@ impl<'a> Connection<'a> {
 
         for detail in details {
             let (latest_hash, latest_number) = block(
-                &detail.deployment,
+                &detail.subgraph,
                 "latest_ethereum_block",
-                detail.latest_ethereum_block_hash.clone(),
-                detail.latest_ethereum_block_number.clone(),
+                detail.block_hash.clone(),
+                detail.block_number.clone(),
             )?
             .map(|b| b.to_ptr())
             .map(|ptr| (Some(Vec::from(ptr.hash_slice())), Some(ptr.number)))
diff --git a/store/postgres/src/query_store.rs b/store/postgres/src/query_store.rs
index fe7d084030b..ab6c43e55fd 100644
--- a/store/postgres/src/query_store.rs
+++ b/store/postgres/src/query_store.rs
@@ -117,10 +117,7 @@ impl QueryStoreTrait for QueryStore {
     }
 
     async fn deployment_state(&self) -> Result<DeploymentState, QueryExecutionError> {
-        Ok(self
-            .store
-            .deployment_state_from_id(self.site.deployment.clone())
-            .await?)
+        Ok(self.store.deployment_state(self.site.cheap_clone()).await?)
     }
 
     fn api_schema(&self) -> Result<Arc<ApiSchema>, QueryExecutionError> {
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index 1a7a917efff..5cfdcbc2f1f 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -182,11 +182,12 @@ pub mod unused {
 /// metadata is stored in tables in the `subgraphs` namespace in the same
 /// shard as the deployment data. The most important of these tables are
 ///
-/// - `subgraphs.subgraph_deployment`: the main table for deployment
-///   metadata; most importantly, it stores the pointer to the current
-///   subgraph head, i.e., the block up to which the subgraph has indexed
-///   the chain, together with other things like whether the subgraph has
-///   synced, whether it has failed and whether it encountered any errors
+/// - `subgraphs.deployment` and `subgraphs.head`: the main tables for
+///   deployment metadata; most importantly, they store the pointer to the
+///   current subgraph head, i.e., the block up to which the subgraph has
+///   indexed the chain, together with other things like whether the
+///   subgraph has synced, whether it has failed and whether it encountered
+///   any errors
 /// - `subgraphs.subgraph_manifest`: immutable information derived from the
 ///   YAML manifest for the deployment
 /// - `subgraphs.dynamic_ethereum_contract_data_source`: the data sources
@@ -1011,7 +1012,7 @@ impl SubgraphStoreInner {
         store.error_count(id)
     }
 
-    /// Vacuum the `subgraph_deployment` table in each shard
+    /// Vacuum the `head` and `deployment` tables in each shard
     pub(crate) async fn vacuum(&self) -> Vec<Result<(), StoreError>> {
         join_all(self.stores.values().map(|store| store.vacuum())).await
     }
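
Editor's note: the detail.rs changes above follow one pattern throughout -- load two
`Selectable` structs from an `inner_join` of `deployment` and `head`, then merge the
tuple into a single caller-facing struct with a `From` impl. The following is a minimal
sketch of that pattern under assumed, simplified table layouts (the table!/joinable!
definitions, column set, and `detail_for_id` name are hypothetical stand-ins, not the
real graph-node schema):

use diesel::prelude::*;

diesel::table! {
    deployment (id) {
        id -> Integer,
        subgraph -> Text,
    }
}

diesel::table! {
    head (id) {
        id -> Integer,
        block_number -> Nullable<Integer>,
        entity_count -> BigInt,
    }
}

// Assumption: `head.id` references `deployment.id`, so the default join works.
diesel::joinable!(head -> deployment (id));
diesel::allow_tables_to_appear_in_same_query!(deployment, head);

#[derive(Queryable, Selectable)]
#[diesel(table_name = deployment)]
struct Deployment {
    id: i32,
    subgraph: String,
}

#[derive(Queryable, Selectable)]
#[diesel(table_name = head)]
struct Head {
    block_number: Option<i32>,
    entity_count: i64,
}

// The combined view handed to callers, analogous to `DeploymentDetail`.
struct Detail {
    id: i32,
    subgraph: String,
    block_number: Option<i32>,
    entity_count: usize,
}

impl From<(Deployment, Head)> for Detail {
    fn from((d, h): (Deployment, Head)) -> Self {
        Detail {
            id: d.id,
            subgraph: d.subgraph,
            block_number: h.block_number,
            entity_count: h.entity_count as usize,
        }
    }
}

// Mirrors the shape of `deployment_details_for_id`: join the two tables, select
// both structs with `as_select`, and convert the tuple into the combined view.
fn detail_for_id(conn: &mut PgConnection, deployment_id: i32) -> QueryResult<Detail> {
    deployment::table
        .inner_join(head::table)
        .filter(deployment::id.eq(deployment_id))
        .select(<(Deployment, Head)>::as_select())
        .first::<(Deployment, Head)>(conn)
        .map(Detail::from)
}

Keeping the two row types separate presumably lets the frequently-updated head pointer
live in a narrow table that can be vacuumed cheaply, while the `From` impl shields
callers from the split.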