Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 2 additions & 44 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 13 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@ members = [
"chain/*",
"graphql",
"node",
"runtime/*",
"server/*",
"store/*",
"substreams/*",
"runtime/derive",
"runtime/test",
"runtime/wasm",
"server/graphman",
"server/http",
"server/index-node",
"server/json-rpc",
"server/metrics",
"store/postgres",
"store/test-store",
"substreams/substreams-head-tracker",
"substreams/substreams-trigger-filter",
"substreams/trigger-filters",
"graph",
"tests",
"graph/derive",
Expand Down
4 changes: 2 additions & 2 deletions core/graphman/src/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph::prelude::EntityChange;
use graph::prelude::AssignmentChange;
use graph::prelude::NodeId;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub fn reassign_deployment(
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes: Vec<EntityChange> = match catalog_conn
let changes: Vec<AssignmentChange> = match catalog_conn
.assigned_node(&deployment.site)
.map_err(GraphmanError::from)?
{
Expand Down
24 changes: 5 additions & 19 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,11 @@ where
let logger = self.logger.clone();

self.subscription_manager
.subscribe(FromIterator::from_iter([SubscriptionFilter::Assignment]))
.subscribe()
.map_err(|()| anyhow!("Entity change stream failed"))
.map(|event| {
// We're only interested in the SubgraphDeploymentAssignment change; we
// know that there is at least one, as that is what we subscribed to
let filter = SubscriptionFilter::Assignment;
let assignments = event
.changes
.iter()
.filter(|change| filter.matches(change))
.map(|change| match change {
EntityChange::Data { .. } => unreachable!(),
EntityChange::Assignment {
deployment,
operation,
} => (deployment.clone(), operation.clone()),
})
.collect::<Vec<_>>();
stream::iter_ok(assignments)
let changes: Vec<_> = event.changes.iter().cloned().map(AssignmentChange::into_parts).collect();
stream::iter_ok(changes)
})
.flatten()
.and_then(
Expand All @@ -183,7 +169,7 @@ where
);

match operation {
EntityChangeOperation::Set => {
AssignmentOperation::Set => {
store
.assignment_status(&deployment)
.map_err(|e| {
Expand Down Expand Up @@ -220,7 +206,7 @@ where
}
})
}
EntityChangeOperation::Removed => {
AssignmentOperation::Removed => {
// Send remove event without checking node ID.
// If node ID does not match, then this is a no-op when handled in
// assignment provider.
Expand Down
17 changes: 4 additions & 13 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,10 @@ those.
result is checked while the response is being constructed, so that
execution does not take more memory than what is configured. The default
value for both is unlimited.
- `GRAPH_GRAPHQL_MAX_OPERATIONS_PER_CONNECTION`: maximum number of GraphQL
operations per WebSocket connection. Any operation created after the limit
will return an error to the client. Default: 1000.
- `GRAPH_GRAPHQL_HTTP_PORT` : Port for the GraphQL HTTP server
- `GRAPH_GRAPHQL_WS_PORT` : Port for the GraphQL WebSocket server
- `GRAPH_SQL_STATEMENT_TIMEOUT`: the maximum number of seconds an
individual SQL query is allowed to take during GraphQL
execution. Default: unlimited
- `GRAPH_DISABLE_SUBSCRIPTION_NOTIFICATIONS`: disables the internal
mechanism that is used to trigger updates on GraphQL subscriptions. When
this variable is set to any value, `graph-node` will still accept GraphQL
subscriptions, but they won't receive any updates.
- `ENABLE_GRAPHQL_VALIDATIONS`: enables GraphQL validations, based on the GraphQL specification.
This will validate and ensure every query executes follows the execution
rules. Default: `false`
Expand Down Expand Up @@ -185,11 +177,10 @@ those.
query, and the `query_id` of the GraphQL query that caused the SQL
query. These SQL queries are marked with `component: GraphQlRunner` There
are additional SQL queries that get logged when `sql` is given. These are
queries caused by mappings when processing blocks for a subgraph, and
queries caused by subscriptions. If `cache` is present in addition to
`gql`, also logs information for each toplevel GraphQL query field
whether that could be retrieved from cache or not. Defaults to no
logging.
queries caused by mappings when processing blocks for a subgraph. If
`cache` is present in addition to `gql`, also logs information for each
toplevel GraphQL query field whether that could be retrieved from cache
or not. Defaults to no logging.
- `GRAPH_LOG_TIME_FORMAT`: Custom log time format.Default value is `%b %d %H:%M:%S%.3f`. More information [here](https://docs.rs/chrono/latest/chrono/#formatting-and-parsing).
- `STORE_CONNECTION_POOL_SIZE`: How many simultaneous connections to allow to the store.
Due to implementation details, this value may not be strictly adhered to. Defaults to 10.
Expand Down
13 changes: 0 additions & 13 deletions graph/src/components/graphql.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use crate::data::query::QueryResults;
use crate::data::query::{Query, QueryTarget};
use crate::data::subscription::{Subscription, SubscriptionError, SubscriptionResult};
use crate::prelude::DeploymentHash;

use async_trait::async_trait;
use futures01::Future;
use std::sync::Arc;
use std::time::Duration;

/// Future for subscription results.
pub type SubscriptionResultFuture =
Box<dyn Future<Item = SubscriptionResult, Error = SubscriptionError> + Send>;

pub enum GraphQlTarget {
SubgraphName(String),
Deployment(DeploymentHash),
Expand All @@ -33,13 +27,6 @@ pub trait GraphQlRunner: Send + Sync + 'static {
max_skip: Option<u32>,
) -> QueryResults;

/// Runs a GraphQL subscription and returns a stream of results.
async fn run_subscription(
self: Arc<Self>,
subscription: Subscription,
target: QueryTarget,
) -> Result<SubscriptionResult, SubscriptionError>;

fn metrics(&self) -> Arc<dyn GraphQLMetrics>;
}

Expand Down
3 changes: 0 additions & 3 deletions graph/src/components/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
/// Component for running GraphQL queries over HTTP.
pub mod query;

/// Component for running GraphQL subscriptions over WebSockets.
pub mod subscription;

/// Component for the index node server.
pub mod index_node;

Expand Down
8 changes: 0 additions & 8 deletions graph/src/components/server/subscription.rs

This file was deleted.

Loading
Loading