Skip to content

Commit fce0516

Browse files
committed
server: Add initial MetaStore
This PR adds a new internal metadata store for namespace configuration. This internally uses a normal namespace so that the replica also get's synchronized. This is just initial work there are a few more issues to solve regarding replica's getting updated configs and some internal hardening that needs to be done. Tracking issue for follow up work #768. Closes #501
1 parent f57bf3c commit fce0516

12 files changed

Lines changed: 480 additions & 114 deletions

File tree

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,7 @@
1-
use parking_lot::Mutex;
1+
use crate::LIBSQL_PAGE_SIZE;
22
use serde::{Deserialize, Serialize};
3-
use std::path::{Path, PathBuf};
4-
use std::sync::Arc;
5-
use std::{fs, io};
63
use url::Url;
74

8-
use crate::error::Error;
9-
use crate::{Result, LIBSQL_PAGE_SIZE};
10-
11-
#[derive(Debug)]
12-
pub struct DatabaseConfigStore {
13-
config_path: PathBuf,
14-
tmp_config_path: PathBuf,
15-
config: Mutex<Arc<DatabaseConfig>>,
16-
}
17-
185
#[derive(Debug, Clone, Serialize, Deserialize)]
196
pub struct DatabaseConfig {
207
#[serde(default)]
@@ -49,43 +36,3 @@ impl Default for DatabaseConfig {
4936
}
5037
}
5138
}
52-
53-
impl DatabaseConfigStore {
54-
pub fn load(db_path: &Path) -> Result<Self> {
55-
let config_path = db_path.join("config.json");
56-
let tmp_config_path = db_path.join("config.json~");
57-
58-
let config = match fs::read(&config_path) {
59-
Ok(data) => serde_json::from_slice(&data)?,
60-
Err(err) if err.kind() == io::ErrorKind::NotFound => DatabaseConfig::default(),
61-
Err(err) => return Err(Error::IOError(err)),
62-
};
63-
64-
Ok(Self {
65-
config_path,
66-
tmp_config_path,
67-
config: Mutex::new(Arc::new(config)),
68-
})
69-
}
70-
71-
#[cfg(test)]
72-
pub fn new_test() -> Self {
73-
Self {
74-
config_path: "".into(),
75-
tmp_config_path: "".into(),
76-
config: Mutex::new(Arc::new(DatabaseConfig::default())),
77-
}
78-
}
79-
80-
pub fn get(&self) -> Arc<DatabaseConfig> {
81-
self.config.lock().clone()
82-
}
83-
84-
pub fn store(&self, config: DatabaseConfig) -> Result<()> {
85-
let data = serde_json::to_vec_pretty(&config)?;
86-
fs::write(&self.tmp_config_path, data)?;
87-
fs::rename(&self.tmp_config_path, &self.config_path)?;
88-
*self.config.lock() = Arc::new(config);
89-
Ok(())
90-
}
91-
}

libsql-server/src/connection/libsql.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@ use crate::metrics::{
1818
DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, READ_QUERY_COUNT, VACUUM_COUNT, WAL_CHECKPOINT_COUNT,
1919
WRITE_QUERY_COUNT, WRITE_TXN_DURATION,
2020
};
21+
use crate::namespace::meta_store::MetaStoreHandle;
2122
use crate::query::Query;
2223
use crate::query_analysis::{StmtKind, TxnStatus};
2324
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
2425
use crate::replication::FrameNo;
2526
use crate::stats::Stats;
2627
use crate::Result;
2728

28-
use super::config::DatabaseConfigStore;
2929
use super::program::{Cond, DescribeCol, DescribeParam, DescribeResponse};
3030
use super::{MakeConnection, Program, Step, TXN_TIMEOUT};
3131

3232
pub struct MakeLibSqlConn<T: WalManager> {
3333
db_path: PathBuf,
3434
wal_manager: T,
3535
stats: Arc<Stats>,
36-
config_store: Arc<DatabaseConfigStore>,
36+
config_store: MetaStoreHandle,
3737
extensions: Arc<[PathBuf]>,
3838
max_response_size: u64,
3939
max_total_response_size: u64,
@@ -55,7 +55,7 @@ where
5555
db_path: PathBuf,
5656
wal_manager: T,
5757
stats: Arc<Stats>,
58-
config_store: Arc<DatabaseConfigStore>,
58+
config_store: MetaStoreHandle,
5959
extensions: Arc<[PathBuf]>,
6060
max_response_size: u64,
6161
max_total_response_size: u64,
@@ -241,7 +241,7 @@ where
241241
extensions: Arc<[PathBuf]>,
242242
wal_manager: T,
243243
stats: Arc<Stats>,
244-
config_store: Arc<DatabaseConfigStore>,
244+
config_store: MetaStoreHandle,
245245
builder_config: QueryBuilderConfig,
246246
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
247247
state: Arc<TxnState<W>>,
@@ -292,7 +292,7 @@ impl LibSqlConnection<libsql_sys::wal::Sqlite3Wal> {
292292
Arc::new([]),
293293
libsql_sys::wal::Sqlite3WalManager::new(),
294294
Default::default(),
295-
DatabaseConfigStore::new_test().into(),
295+
MetaStoreHandle::new_test().into(),
296296
QueryBuilderConfig::default(),
297297
rcv,
298298
Default::default(),
@@ -308,7 +308,7 @@ impl LibSqlConnection<libsql_sys::wal::Sqlite3Wal> {
308308
struct Connection<T> {
309309
conn: libsql_sys::Connection<T>,
310310
stats: Arc<Stats>,
311-
config_store: Arc<DatabaseConfigStore>,
311+
config_store: MetaStoreHandle,
312312
builder_config: QueryBuilderConfig,
313313
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
314314
// must be dropped after the connection because the connection refers to it
@@ -487,7 +487,7 @@ impl<W: Wal> Connection<W> {
487487
extensions: Arc<[PathBuf]>,
488488
wal_manager: T,
489489
stats: Arc<Stats>,
490-
config_store: Arc<DatabaseConfigStore>,
490+
config_store: MetaStoreHandle,
491491
builder_config: QueryBuilderConfig,
492492
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
493493
state: Arc<TxnState<W>>,
@@ -674,6 +674,7 @@ impl<W: Wal> Connection<W> {
674674

675675
let start = Instant::now();
676676
let config = self.config_store.get();
677+
677678
let blocked = match query.stmt.kind {
678679
StmtKind::Read | StmtKind::TxnBegin | StmtKind::Other => config.block_reads,
679680
StmtKind::Write => config.block_reads || config.block_writes,
@@ -987,7 +988,7 @@ mod test {
987988
let conn = Connection {
988989
conn: libsql_sys::Connection::test(),
989990
stats: Arc::new(Stats::default()),
990-
config_store: Arc::new(DatabaseConfigStore::new_test()),
991+
config_store: MetaStoreHandle::new_test(),
991992
builder_config: QueryBuilderConfig::default(),
992993
current_frame_no_receiver: watch::channel(None).1,
993994
state: Default::default(),
@@ -1019,7 +1020,7 @@ mod test {
10191020
tmp.path().into(),
10201021
Sqlite3WalManager::new(),
10211022
Default::default(),
1022-
Arc::new(DatabaseConfigStore::load(tmp.path()).unwrap()),
1023+
MetaStoreHandle::load(tmp.path()).unwrap(),
10231024
Arc::new([]),
10241025
100000000,
10251026
100000000,
@@ -1060,7 +1061,7 @@ mod test {
10601061
tmp.path().into(),
10611062
Sqlite3WalManager::new(),
10621063
Default::default(),
1063-
Arc::new(DatabaseConfigStore::load(tmp.path()).unwrap()),
1064+
MetaStoreHandle::load(tmp.path()).unwrap(),
10641065
Arc::new([]),
10651066
100000000,
10661067
100000000,
@@ -1102,7 +1103,7 @@ mod test {
11021103
tmp.path().into(),
11031104
Sqlite3WalManager::new(),
11041105
Default::default(),
1105-
Arc::new(DatabaseConfigStore::load(tmp.path()).unwrap()),
1106+
MetaStoreHandle::load(tmp.path()).unwrap(),
11061107
Arc::new([]),
11071108
100000000,
11081109
100000000,
@@ -1180,7 +1181,7 @@ mod test {
11801181
tmp.path().into(),
11811182
Sqlite3WalManager::new(),
11821183
Default::default(),
1183-
Arc::new(DatabaseConfigStore::load(tmp.path()).unwrap()),
1184+
MetaStoreHandle::load(tmp.path()).unwrap(),
11841185
Arc::new([]),
11851186
100000000,
11861187
100000000,
@@ -1245,7 +1246,7 @@ mod test {
12451246
}
12461247
};
12471248

1248-
tokio::time::timeout(Duration::from_secs(30), join_all)
1249+
tokio::time::timeout(Duration::from_secs(60), join_all)
12491250
.await
12501251
.expect("timed out running connections");
12511252
}

libsql-server/src/connection/write_proxy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ use crate::auth::Authenticated;
2020
use crate::connection::program::{DescribeCol, DescribeParam};
2121
use crate::error::Error;
2222
use crate::metrics::{REPLICA_LOCAL_EXEC_MISPREDICT, REPLICA_LOCAL_PROGRAM_EXEC};
23+
use crate::namespace::meta_store::MetaStoreHandle;
2324
use crate::namespace::NamespaceName;
2425
use crate::query_analysis::TxnStatus;
2526
use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder};
2627
use crate::replication::FrameNo;
2728
use crate::stats::Stats;
2829
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};
2930

30-
use super::config::DatabaseConfigStore;
3131
use super::libsql::{LibSqlConnection, MakeLibSqlConn};
3232
use super::program::DescribeResponse;
3333
use super::Connection;
@@ -54,7 +54,7 @@ impl MakeWriteProxyConn {
5454
channel: Channel,
5555
uri: tonic::transport::Uri,
5656
stats: Arc<Stats>,
57-
config_store: Arc<DatabaseConfigStore>,
57+
config_store: MetaStoreHandle,
5858
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
5959
max_response_size: u64,
6060
max_total_response_size: u64,

libsql-server/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ pub enum Error {
9292
UrlParseError(#[from] url::ParseError),
9393
#[error("Namespace store has shutdown")]
9494
NamespaceStoreShutdown,
95+
#[error("Unable to update metastore: {0}")]
96+
MetaStoreUpdateFailure(Box<dyn std::error::Error + Send + Sync>),
9597
}
9698

9799
trait ResponseError: std::error::Error {
@@ -150,6 +152,7 @@ impl IntoResponse for Error {
150152
PrimaryStreamInterupted => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
151153
UrlParseError(_) => self.format_err(StatusCode::BAD_REQUEST),
152154
NamespaceStoreShutdown => self.format_err(StatusCode::SERVICE_UNAVAILABLE),
155+
MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
153156
}
154157
}
155158
}

libsql-server/src/http/admin/mod.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use axum::routing::delete;
44
use axum::Json;
55
use chrono::NaiveDateTime;
66
use futures::TryStreamExt;
7-
use hyper::Body;
7+
use hyper::{Body, Request};
88
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
99
use parking_lot::Mutex;
1010
use serde::{Deserialize, Serialize};
@@ -15,6 +15,7 @@ use std::sync::Arc;
1515
use std::time::Duration;
1616
use tokio::sync::Notify;
1717
use tokio_util::io::ReaderStream;
18+
use tower_http::trace::DefaultOnResponse;
1819
use url::Url;
1920

2021
use crate::database::Database;
@@ -95,6 +96,12 @@ where
9596
None
9697
};
9798

99+
fn trace_request<B>(req: &Request<B>, span: &tracing::Span) {
100+
let _s = span.enter();
101+
102+
tracing::debug!("{} {} {:?}", req.method(), req.uri(), req.headers());
103+
}
104+
98105
use axum::routing::{get, post};
99106
let metrics = Metrics {
100107
handle: prom_handle,
@@ -126,7 +133,16 @@ where
126133
connector,
127134
user_http_server,
128135
metrics,
129-
}));
136+
}))
137+
.layer(
138+
tower_http::trace::TraceLayer::new_for_http()
139+
.on_request(trace_request)
140+
.on_response(
141+
DefaultOnResponse::new()
142+
.level(tracing::Level::DEBUG)
143+
.latency_unit(tower_http::LatencyUnit::Micros),
144+
),
145+
);
130146

131147
hyper::server::Server::builder(acceptor)
132148
.serve(router.into_make_service())
@@ -225,7 +241,7 @@ async fn handle_post_config<M: MakeNamespace, C>(
225241
config.heartbeat_url = Some(Url::parse(&url)?);
226242
}
227243

228-
store.store(config)?;
244+
store.store(config).await?;
229245

230246
Ok(())
231247
}
@@ -268,7 +284,7 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
268284
if let Some(url) = req.heartbeat_url {
269285
config.heartbeat_url = Some(Url::parse(&url)?)
270286
}
271-
store.store(config)?;
287+
store.store(config).await?;
272288

273289
Ok(())
274290
}
@@ -296,8 +312,7 @@ async fn handle_fork_namespace<M: MakeNamespace, C>(
296312
let mut to_config = (*to_store.get()).clone();
297313
to_config.max_db_pages = from_config.max_db_pages;
298314
to_config.heartbeat_url = from_config.heartbeat_url.clone();
299-
to_config.bottomless_db_id = from_config.bottomless_db_id.clone();
300-
to_store.store(to_config)?;
315+
to_store.store(to_config).await?;
301316
Ok(())
302317
}
303318

libsql-server/src/lib.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,8 +516,18 @@ where
516516
checkpoint_interval: self.db_config.checkpoint_interval,
517517
disable_namespace: self.disable_namespaces,
518518
};
519+
520+
let meta_store_path = conf.base_path.join("metastore");
521+
519522
let factory = PrimaryNamespaceMaker::new(conf);
520-
let namespaces = NamespaceStore::new(factory, false, self.db_config.snapshot_at_shutdown);
523+
524+
let namespaces = NamespaceStore::new(
525+
factory,
526+
false,
527+
self.db_config.snapshot_at_shutdown,
528+
meta_store_path,
529+
)
530+
.await?;
521531

522532
// eagerly load the default namespace when namespaces are disabled
523533
if self.disable_namespaces {
@@ -604,8 +614,11 @@ impl<C: Connector> Replica<C> {
604614
max_response_size: self.db_config.max_response_size,
605615
max_total_response_size: self.db_config.max_total_response_size,
606616
};
617+
618+
let meta_store_path = conf.base_path.join("metastore");
619+
607620
let factory = ReplicaNamespaceMaker::new(conf);
608-
let namespaces = NamespaceStore::new(factory, true, false);
621+
let namespaces = NamespaceStore::new(factory, true, false, meta_store_path).await?;
609622
let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone());
610623
let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone());
611624

libsql-server/src/namespace/fork.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::replication::primary::frame_stream::FrameStream;
1616
use crate::replication::{LogReadError, ReplicationLogger};
1717
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};
1818

19+
use super::meta_store::MetaStore;
1920
use super::{MakeNamespace, NamespaceBottomlessDbId, NamespaceName, RestoreOption};
2021

2122
type Result<T> = crate::Result<T, ForkError>;
@@ -58,6 +59,7 @@ pub struct ForkTask<'a> {
5859
pub make_namespace: &'a dyn MakeNamespace<Database = PrimaryDatabase>,
5960
pub restore_to: Option<PointInTimeRestore>,
6061
pub bottomless_db_id: NamespaceBottomlessDbId,
62+
pub meta_store: &'a MetaStore,
6163
}
6264

6365
pub struct PointInTimeRestore {
@@ -112,6 +114,7 @@ impl ForkTask<'_> {
112114
// PrimaryNamespaceMaker::create ignores
113115
// reset_cb param
114116
Box::new(|_op| {}),
117+
&self.meta_store,
115118
)
116119
.await
117120
.map_err(|e| ForkError::CreateNamespace(Box::new(e)))

0 commit comments

Comments
 (0)