1 change: 1 addition & 0 deletions rust/ballista/rust/core/Cargo.toml
@@ -34,6 +34,7 @@ async-trait = "0.1.36"
futures = "0.3"
log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.8"
tokio = "1.0"
tonic = "0.4"
3 changes: 2 additions & 1 deletion rust/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -23,6 +23,7 @@ use arrow::array::{
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use serde::Serialize;
use uuid::Uuid;

use super::protobuf;
@@ -67,7 +68,7 @@ pub struct PartitionLocation {
}

/// Meta-data for an executor, used when fetching shuffle partitions from other executors
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ExecutorMeta {
pub id: String,
pub host: String,
7 changes: 6 additions & 1 deletion rust/ballista/rust/scheduler/Cargo.toml
@@ -38,14 +38,19 @@ configure_me = "0.4.0"
env_logger = "0.8"
etcd-client = { version = "0.6", optional = true }
futures = "0.3"
http = "0.2"
http-body = "0.4"
hyper = "0.14.4"
log = "0.4"
parse_arg = "0.1.3"
prost = "0.7"
rand = "0.8"
serde = {version = "1", features = ["derive"]}
sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["full"] }
tonic = "0.4"
tower = { version = "0.4" }
warp = "0.3"

arrow = { git = "https://github.com/apache/arrow", rev="46161d2" }
datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" }
11 changes: 11 additions & 0 deletions rust/ballista/rust/scheduler/README.md
@@ -30,3 +30,14 @@ $ RUST_LOG=info cargo run --release
```

By default, the scheduler will bind to `localhost` and listen on port `50051`.

## Connecting to the Scheduler

The scheduler also exposes a REST API, selected through HTTP content negotiation (the `Accept` header).
For example, to list the executors currently connected to the scheduler (assuming the default configuration):

```bash
curl --request GET \
  --url http://localhost:50050/executors \
  --header 'Accept: application/json'
```
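If one or more executors have registered, the response is a JSON array of their metadata, roughly of the form `[{"id":"<executor-id>","host":"localhost","port":50051}]` (the values shown here are illustrative only).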
40 changes: 40 additions & 0 deletions rust/ballista/rust/scheduler/src/api/handlers.rs
@@ -0,0 +1,40 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::SchedulerServer;
use ballista_core::serde::protobuf::{
    scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams,
    GetExecutorMetadataResult,
};
use ballista_core::serde::scheduler::ExecutorMeta;
use tonic::{Request, Response};
use warp::Rejection;

pub(crate) async fn list_executors_data(
    data_server: SchedulerServer,
) -> Result<impl warp::Reply, Rejection> {
    let data: Result<Response<GetExecutorMetadataResult>, tonic::Status> = data_server
        .get_executors_metadata(Request::new(GetExecutorMetadataParams {}))
        .await;
    let result = data.unwrap();
    let res: &GetExecutorMetadataResult = result.get_ref();
    let vec: &Vec<ExecutorMetadata> = &res.metadata;
    let metadata: Vec<ExecutorMeta> = vec
        .iter()
        .map(|v: &ExecutorMetadata| ExecutorMeta {
            host: v.host.clone(),
            port: v.port as u16,
            id: v.id.clone(),
        })
        .collect();
    Ok(warp::reply::json(&metadata))
}
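As context for the handler above: `warp::reply::json` needs a `serde::Serialize` payload, which is why the protobuf `ExecutorMetadata` values are first mapped into the `ExecutorMeta` struct (which now derives `Serialize`). The following is only an illustrative sketch, not part of this change, and assumes `serde_json` were available as a (dev-)dependency; it shows roughly what one serialized entry looks like:

```rust
use ballista_core::serde::scheduler::ExecutorMeta;

// Illustrative only: serialize a single, hypothetical executor entry.
fn example_serialization() -> serde_json::Result<String> {
    let meta = ExecutorMeta {
        id: "executor-1".to_string(), // hypothetical values
        host: "localhost".to_string(),
        port: 50051,
    };
    // Produces JSON along the lines of:
    // {"id":"executor-1","host":"localhost","port":50051}
    serde_json::to_string(&meta)
}
```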
85 changes: 85 additions & 0 deletions rust/ballista/rust/scheduler/src/api/mod.rs
@@ -0,0 +1,85 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod handlers;

use crate::SchedulerServer;
use anyhow::Result;
use std::{
    pin::Pin,
    task::{Context as TaskContext, Poll},
};
use warp::filters::BoxedFilter;
use warp::{Buf, Filter, Reply};

pub enum EitherBody<A, B> {
    Left(A),
    Right(B),
}

pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type HttpBody = dyn http_body::Body<Data = dyn Buf, Error = Error> + 'static;

impl<A, B> http_body::Body for EitherBody<A, B>
where
    A: http_body::Body + Send + Unpin,
    B: http_body::Body<Data = A::Data> + Send + Unpin,
    A::Error: Into<Error>,
    B::Error: Into<Error>,
{
    type Data = A::Data;
    type Error = Error;

    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        match self.get_mut() {
            EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
            EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
        }
    }

    fn poll_trailers(
        self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
        match self.get_mut() {
            EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
            EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
        }
    }

    fn is_end_stream(&self) -> bool {
        match self {
            EitherBody::Left(b) => b.is_end_stream(),
            EitherBody::Right(b) => b.is_end_stream(),
        }
    }
}

fn map_option_err<T, U: Into<Error>>(err: Option<Result<T, U>>) -> Option<Result<T, Error>> {
    err.map(|e| e.map_err(Into::into))
}

fn with_data_server(
    db: SchedulerServer,
) -> impl Filter<Extract = (SchedulerServer,), Error = std::convert::Infallible> + Clone {
    warp::any().map(move || db.clone())
}

pub fn get_routes(scheduler_server: SchedulerServer) -> BoxedFilter<(impl Reply,)> {
    let routes = warp::path("executors")
        .and(with_data_server(scheduler_server))
        .and_then(handlers::list_executors_data);
    routes.boxed()
}
2 changes: 2 additions & 0 deletions rust/ballista/rust/scheduler/src/lib.rs
@@ -17,6 +17,7 @@

//! Support for distributed schedulers, such as Kubernetes

pub mod api;
pub mod planner;
pub mod state;

@@ -68,6 +69,7 @@ use self::state::{ConfigBackendClient, SchedulerState};
use datafusion::physical_plan::parquet::ParquetExec;
use std::time::Instant;

#[derive(Clone)]
pub struct SchedulerServer {
state: SchedulerState,
namespace: String,
43 changes: 36 additions & 7 deletions rust/ballista/rust/scheduler/src/main.rs
@@ -17,9 +17,14 @@

//! Ballista Rust scheduler binary.

use anyhow::{Context, Result};
use futures::future::{self, Either, TryFutureExt};
use hyper::{service::make_service_fn, Server};
use std::convert::Infallible;
use std::{net::SocketAddr, sync::Arc};
use tonic::transport::Server as TonicServer;
use tower::Service;

use anyhow::{Context, Result};
use ballista_core::BALLISTA_VERSION;
use ballista_core::{
    print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
@@ -29,9 +34,9 @@ use ballista_scheduler::state::EtcdClient;
#[cfg(feature = "sled")]
use ballista_scheduler::state::StandaloneClient;
use ballista_scheduler::{state::ConfigBackendClient, ConfigBackend, SchedulerServer};
use ballista_scheduler::api::{get_routes, EitherBody, Error};

use log::info;
use tonic::transport::Server;

#[macro_use]
extern crate configure_me;
@@ -56,11 +61,35 @@ async fn start_server(
        "Ballista v{} Scheduler listening on {:?}",
        BALLISTA_VERSION, addr
    );
    let server =
        SchedulerGrpcServer::new(SchedulerServer::new(config_backend, namespace));
    Ok(Server::builder()
        .add_service(server)
        .serve(addr)
    Ok(Server::bind(&addr)
        .serve(make_service_fn(move |_| {
            let scheduler_server = SchedulerServer::new(config_backend.clone(), namespace.clone());
            let scheduler_grpc_server = SchedulerGrpcServer::new(scheduler_server.clone());

            let mut tonic = TonicServer::builder()
                .add_service(scheduler_grpc_server)
                .into_service();
            let mut warp = warp::service(get_routes(scheduler_server));

            future::ok::<_, Infallible>(tower::service_fn(
                move |req: hyper::Request<hyper::Body>| {
                    let header = req.headers().get(hyper::header::ACCEPT);
                    if header.is_some() && header.unwrap().eq("application/json") {
                        return Either::Left(
                            warp.call(req)
                                .map_ok(|res| res.map(EitherBody::Left))
                                .map_err(Error::from),
                        );
                    }
                    Either::Right(
                        tonic
                            .call(req)
                            .map_ok(|res| res.map(EitherBody::Right))
                            .map_err(Error::from),
                    )
                },
            ))
Comment on lines +65 to +91 (Contributor):

Sorry I'm super late to this PR. Does this change mean that a SchedulerGrpcServer is being built for every request?

        }))
        .await
        .context("Could not start grpc server")?)
}
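A note on the dispatch inside the `serve` closure above: requests whose `Accept` header is exactly `application/json` are handled by the warp REST routes, and every other request (including gRPC traffic) falls through to the tonic service, so the REST and gRPC endpoints share a single listening port.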