Skip to content
3 changes: 2 additions & 1 deletion src/connectors/kafka/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl ParseableSinkProcessor {
vec![log_source_entry],
TelemetryType::default(),
tenant_id,
None,
vec![],
vec![],
)
.await?;

Expand Down
204 changes: 204 additions & 0 deletions src/handlers/http/datasets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::collections::HashSet;

use actix_web::http::StatusCode;
use actix_web::{HttpRequest, HttpResponse, web};
use serde::{Deserialize, Serialize};

use crate::utils::get_tenant_id_from_request;
use crate::{
handlers::DatasetTag,
parseable::PARSEABLE,
storage::{ObjectStorageError, StreamType},
};

/// Response item for `GET /datasets/{name}/correlated`: a dataset that shares
/// at least one tag or label with the queried dataset.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CorrelatedDataset {
    // Name of the correlated dataset (stream).
    name: String,
    // Tags present on both the target dataset and this one.
    shared_tags: Vec<DatasetTag>,
    // Labels present on both the target dataset and this one.
    shared_labels: Vec<String>,
}

/// GET /api/v1/datasets/{name}/correlated
/// Returns all datasets sharing at least one tag or label with the named dataset.
pub async fn get_correlated_datasets(
    req: HttpRequest,
    path: web::Path<String>,
) -> Result<HttpResponse, DatasetsError> {
    let dataset_name = path.into_inner();
    let tenant_id = get_tenant_id_from_request(&req);

    // Resolve the target dataset; an unknown name maps to a 404.
    let stream = PARSEABLE
        .get_stream(&dataset_name, &tenant_id)
        .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;

    let target_tags: HashSet<DatasetTag> = stream.get_dataset_tags().into_iter().collect();
    let target_labels: HashSet<String> = stream.get_dataset_labels().into_iter().collect();

    // With neither tags nor labels there is nothing to correlate against.
    if target_tags.is_empty() && target_labels.is_empty() {
        return Ok(HttpResponse::Ok().json(Vec::<CorrelatedDataset>::new()));
    }

    let mut correlated = Vec::new();
    for name in PARSEABLE.streams.list(&tenant_id) {
        // A dataset never correlates with itself.
        if name == dataset_name {
            continue;
        }
        // Streams that vanished between list() and lookup are simply skipped.
        let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) else {
            continue;
        };
        // Internal streams are an implementation detail; hide them.
        if s.get_stream_type() == StreamType::Internal {
            continue;
        }

        let s_tags: HashSet<DatasetTag> = s.get_dataset_tags().into_iter().collect();
        let s_labels: HashSet<String> = s.get_dataset_labels().into_iter().collect();

        let shared_tags: Vec<DatasetTag> = target_tags.intersection(&s_tags).copied().collect();
        let shared_labels: Vec<String> =
            target_labels.intersection(&s_labels).cloned().collect();

        if shared_tags.is_empty() && shared_labels.is_empty() {
            continue;
        }
        correlated.push(CorrelatedDataset {
            name,
            shared_tags,
            shared_labels,
        });
    }

    Ok(HttpResponse::Ok().json(correlated))
}

/// GET /api/v1/datasets/tags/{tag}
/// Returns all datasets that have the specified tag.
pub async fn get_datasets_by_tag(
    req: HttpRequest,
    path: web::Path<String>,
) -> Result<HttpResponse, DatasetsError> {
    let tenant_id = get_tenant_id_from_request(&req);
    let tag_str = path.into_inner();
    // A path segment that does not parse as a known tag is a 400.
    let tag =
        DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?;

    // Keep every non-internal stream that carries the requested tag;
    // streams that fail to resolve are treated as non-matching.
    let matching: Vec<String> = PARSEABLE
        .streams
        .list(&tenant_id)
        .into_iter()
        .filter(|name| {
            PARSEABLE
                .get_stream(name, &tenant_id)
                .map(|s| {
                    s.get_stream_type() != StreamType::Internal
                        && s.get_dataset_tags().contains(&tag)
                })
                .unwrap_or(false)
        })
        .collect();

    Ok(HttpResponse::Ok().json(matching))
}

/// Request body for `PUT /datasets/{name}`.
///
/// Each field is optional: an absent field means "leave the current value
/// unchanged", while an explicit empty list clears the tags/labels.
#[derive(Debug, Deserialize)]
pub struct PutDatasetMetadataBody {
    // Replacement tags; `None` keeps the dataset's existing tags.
    pub tags: Option<Vec<DatasetTag>>,
    // Replacement labels; `None` keeps the dataset's existing labels.
    pub labels: Option<Vec<String>>,
}

/// PUT /api/v1/datasets/{name}
/// Replaces the dataset's tags and/or labels.
/// Only fields present in the body are updated; absent fields are left unchanged.
pub async fn put_dataset_metadata(
req: HttpRequest,
path: web::Path<String>,
body: web::Json<PutDatasetMetadataBody>,
) -> Result<HttpResponse, DatasetsError> {
let dataset_name = path.into_inner();
let body = body.into_inner();
let tenant_id = get_tenant_id_from_request(&req);

let stream = PARSEABLE
.get_stream(&dataset_name, &tenant_id)
.map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;

let final_tags = match body.tags {
Some(tags) => tags
.into_iter()
.collect::<HashSet<_>>()
.into_iter()
.collect(),
None => stream.get_dataset_tags(),
};
let final_labels = match body.labels {
Some(labels) => labels
.into_iter()
.collect::<HashSet<_>>()
.into_iter()
.collect(),
None => stream.get_dataset_labels(),
};

// Update storage first, then in-memory
let storage = PARSEABLE.storage.get_object_store();
storage
.update_dataset_tags_and_labels_in_stream(
&dataset_name,
&final_tags,
&final_labels,
&tenant_id,
)
.await
.map_err(DatasetsError::Storage)?;

stream.set_dataset_tags(final_tags.clone());
stream.set_dataset_labels(final_labels.clone());

Ok(HttpResponse::Ok().json(serde_json::json!({
"tags": final_tags,
"labels": final_labels,
})))
}

/// Errors returned by the dataset metadata endpoints.
#[derive(Debug, thiserror::Error)]
pub enum DatasetsError {
    /// The named dataset does not exist for this tenant (HTTP 404).
    #[error("Dataset not found: {0}")]
    DatasetNotFound(String),
    /// The supplied path segment did not parse as a `DatasetTag` (HTTP 400).
    #[error("Invalid tag: {0}")]
    InvalidTag(String),
    /// The object store rejected the metadata update (HTTP 500).
    #[error("Storage error: {0}")]
    Storage(ObjectStorageError),
}

impl actix_web::ResponseError for DatasetsError {
fn status_code(&self) -> StatusCode {
match self {
DatasetsError::DatasetNotFound(_) => StatusCode::NOT_FOUND,
DatasetsError::InvalidTag(_) => StatusCode::BAD_REQUEST,
DatasetsError::Storage(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code()).json(serde_json::json!({
"error": self.to_string()
}))
}
}
6 changes: 4 additions & 2 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ pub async fn ingest(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
vec![],
vec![],
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -239,7 +240,8 @@ pub async fn setup_otel_stream(
vec![log_source_entry.clone()],
telemetry_type,
&tenant_id,
None,
vec![],
vec![],
)
.await?;
let mut time_partition = None;
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod about;
pub mod alerts;
pub mod cluster;
pub mod correlation;
pub mod datasets;
pub mod demo_data;
pub mod health_check;
pub mod ingest;
Expand Down
35 changes: 27 additions & 8 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,33 @@ impl Server {
}

pub fn get_prism_datasets() -> Scope {
web::scope("/datasets").route(
"",
web::post()
.to(http::prism_logstream::post_datasets)
.authorize_for_resource(Action::GetStreamInfo)
.authorize_for_resource(Action::GetStats)
.authorize_for_resource(Action::GetRetention),
)
web::scope("/datasets")
.route(
"",
web::post()
.to(http::prism_logstream::post_datasets)
.authorize_for_resource(Action::GetStreamInfo)
.authorize_for_resource(Action::GetStats)
.authorize_for_resource(Action::GetRetention),
)
.route(
"/tags/{tag}",
web::get()
.to(http::datasets::get_datasets_by_tag)
.authorize_for_resource(Action::GetStreamInfo),
)
.route(
"/{name}/correlated",
web::get()
.to(http::datasets::get_correlated_datasets)
.authorize_for_resource(Action::GetStreamInfo),
Comment on lines +211 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Route shadowing: GET /tags/correlated will never reach get_correlated_datasets for a dataset named "tags".

actix-web evaluates .route() registrations in order. /tags/{tag} (line 211) is matched before /{name}/correlated (line 217), so a request for GET /tags/correlated binds to get_datasets_by_tag with tag = "correlated" rather than get_correlated_datasets with name = "tags". In practice this only bites if a dataset is named "tags", but it is an invisible constraint on valid dataset names.

Reordering the routes so the more-specific two-segment pattern comes first does not help here because both are two-segment paths; the real fix is to use a path that avoids the structural collision, e.g. /correlated/{name} for the correlated endpoint:

🔀 Proposed fix: reorder / rename to eliminate shadowing
-            .route(
-                "/tags/{tag}",
-                web::get()
-                    .to(http::datasets::get_datasets_by_tag)
-                    .authorize_for_resource(Action::GetStreamInfo),
-            )
-            .route(
-                "/{name}/correlated",
-                web::get()
-                    .to(http::datasets::get_correlated_datasets)
-                    .authorize_for_resource(Action::GetStreamInfo),
-            )
+            .route(
+                "/correlated/{name}",
+                web::get()
+                    .to(http::datasets::get_correlated_datasets)
+                    .authorize_for_resource(Action::GetStreamInfo),
+            )
+            .route(
+                "/tags/{tag}",
+                web::get()
+                    .to(http::datasets::get_datasets_by_tag)
+                    .authorize_for_resource(Action::GetStreamInfo),
+            )

(Remember to update the corresponding path extractor in get_correlated_datasets and any client/docs that reference the old path.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/server.rs` around lines 210 - 220, The two route
registrations conflict because "/tags/{tag}" will match "/tags/correlated"
before "/{name}/correlated", causing get_correlated_datasets to be unreachable
for a dataset named "tags"; change the correlated endpoint path to avoid
structural collision (for example replace "/{name}/correlated" with
"/correlated/{name}" in the route registration where
web::get().to(http::datasets::get_correlated_datasets).authorize_for_resource(Action::GetStreamInfo)
is set) and update the path extractor in the get_correlated_datasets handler and
any clients/docs referencing the old path so they use the new
"/correlated/{name}" form.

)
.route(
"/{name}",
web::put()
.to(http::datasets::put_dataset_metadata)
.authorize_for_resource(Action::CreateStream),
)
Comment on lines 201 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for the four handler implementations and how they extract tenant_id

echo "=== Searching for handler implementations ==="

echo -e "\n--- 1. prism_logstream::post_datasets ---"
rg -n "fn post_datasets" --type rust -A 10 src/handlers/http/ | head -40

echo -e "\n--- 2. datasets::get_datasets_by_tag ---"
rg -n "fn get_datasets_by_tag" --type rust -A 10 src/handlers/http/ | head -40

echo -e "\n--- 3. datasets::get_correlated_datasets ---"
rg -n "fn get_correlated_datasets" --type rust -A 10 src/handlers/http/ | head -40

echo -e "\n--- 4. datasets::put_dataset_metadata ---"
rg -n "fn put_dataset_metadata" --type rust -A 10 src/handlers/http/ | head -40

echo -e "\n--- Check for get_tenant_id_from_request usage in datasets.rs ---"
rg -n "get_tenant_id_from_request" --type rust src/handlers/http/datasets.rs

Repository: parseablehq/parseable

Length of output: 1362


🏁 Script executed:

#!/bin/bash
# Read datasets.rs around the get_tenant_id_from_request usages
echo "=== datasets.rs: lines 20-60 (first usage) ==="
sed -n '20,60p' src/handlers/http/datasets.rs

echo -e "\n=== datasets.rs: lines 95-110 (second usage) ==="
sed -n '95,110p' src/handlers/http/datasets.rs

echo -e "\n=== datasets.rs: lines 133-150 (third usage) ==="
sed -n '133,150p' src/handlers/http/datasets.rs

echo -e "\n=== prism_logstream.rs: full post_datasets function ==="
sed -n '41,80p' src/handlers/http/prism_logstream.rs

echo -e "\n=== Check if handlers exist in datasets.rs ==="
rg -n "pub async fn (get_datasets_by_tag|get_correlated_datasets|put_dataset_metadata)" --type rust src/handlers/http/datasets.rs

Repository: parseablehq/parseable

Length of output: 3385


🏁 Script executed:

#!/bin/bash
# Check extract_session_key_from_req and understand if it provides tenant scoping

echo "=== Search for extract_session_key_from_req definition ==="
rg -n "fn extract_session_key_from_req" --type rust -A 5 src/

echo -e "\n=== Check PrismDatasetRequest and get_datasets ==="
rg -n "impl.*PrismDatasetRequest\|fn get_datasets" --type rust -A 10 src/handlers/http/prism_logstream.rs | head -50

echo -e "\n=== Check PrismDatasetRequest struct ==="
rg -n "struct PrismDatasetRequest" --type rust -A 15 src/handlers/http/prism_logstream.rs

echo -e "\n=== Verify tenant scoping context for prism endpoints ==="
rg -n "get_prism_datasets|prism_logstream" --type rust src/handlers/http/modal/server.rs | head -20

Repository: parseablehq/parseable

Length of output: 1138


🏁 Script executed:

#!/bin/bash
# Check the exact route registration and actions for post_datasets

echo "=== server.rs post_datasets route registration (lines 200-227) ==="
sed -n '200,227p' src/handlers/http/modal/server.rs

echo -e "\n=== Search for PrismDatasetRequest and get_datasets method ==="
rg -n "struct PrismDatasetRequest\|fn get_datasets" --type rust -A 20 src/handlers/http/prism_logstream.rs | head -80

echo -e "\n=== Check what get_datasets does (full method) ==="
rg -n "async fn get_datasets\|async fn get_info" --type rust -A 30 src/handlers/http/prism_logstream.rs

Repository: parseablehq/parseable

Length of output: 1332


🏁 Script executed:

#!/bin/bash
# Broader search for PrismDatasetRequest and related types

echo "=== Count lines in prism_logstream.rs ==="
wc -l src/handlers/http/prism_logstream.rs

echo -e "\n=== Search for 'Request' in prism_logstream.rs ==="
rg -n "Request\|request" --type rust src/handlers/http/prism_logstream.rs | head -20

echo -e "\n=== Full prism_logstream.rs file (if small) ==="
wc -l src/handlers/http/prism_logstream.rs && cat -n src/handlers/http/prism_logstream.rs

echo -e "\n=== Check if PrismDatasetRequest is defined elsewhere ==="
rg -n "PrismDatasetRequest" --type rust src/

Repository: parseablehq/parseable

Length of output: 2896


🏁 Script executed:

#!/bin/bash
# Check the PrismDatasetRequest.get_datasets implementation

echo "=== PrismDatasetRequest struct and impl ==="
sed -n '211,250p' src/prism/logstream/mod.rs

echo -e "\n=== Full get_datasets method ==="
rg -n "fn get_datasets" --type rust -A 50 src/prism/logstream/mod.rs | head -80

Repository: parseablehq/parseable

Length of output: 3457


Use get_tenant_id_from_request in post_datasets to enforce middleware-normalized tenant scoping.

The handler at src/handlers/http/prism_logstream.rs:41 derives tenant from BasicAuth credentials via extract_session_key_from_req, but non-ingest handlers must extract tenant from the middleware-normalized request header using get_tenant_id_from_request. The get_info handler in the same file (line 30) correctly uses this pattern; post_datasets should do the same to prevent tenant header spoofing and remain consistent with the three other dataset handlers (get_datasets_by_tag, get_correlated_datasets, put_dataset_metadata).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/server.rs` around lines 200 - 227, The post_datasets
handler currently derives tenant from BasicAuth via
extract_session_key_from_req; change it to read the tenant using
get_tenant_id_from_request (the same middleware-normalized header used by
get_info and the other dataset handlers) to enforce tenant scoping and prevent
header spoofing: locate the post_datasets function in http::prism_logstream,
remove or stop using extract_session_key_from_req for tenant resolution, call
get_tenant_id_from_request(req) (or equivalent helper used elsewhere) and pass
that tenant downstream to any dataset creation/validation logic so it matches
the behavior of get_datasets_by_tag, get_correlated_datasets, and
put_dataset_metadata.

}

pub fn get_demo_data_webscope() -> Scope {
Expand Down
33 changes: 18 additions & 15 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
*
*/

use actix_web::http::header::HeaderMap;

use crate::{
event::format::LogSource,
handlers::{
CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
TelemetryType, UPDATE_STREAM_KEY,
CUSTOM_PARTITION_KEY, DATASET_LABELS_KEY, DATASET_TAG_KEY, DATASET_TAGS_KEY, DatasetTag,
LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY,
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType, UPDATE_STREAM_KEY,
parse_dataset_labels, parse_dataset_tags,
},
storage::StreamType,
};
use actix_web::http::header::HeaderMap;
use tracing::warn;

#[derive(Debug, Default)]
pub struct PutStreamHeaders {
Expand All @@ -38,7 +39,8 @@ pub struct PutStreamHeaders {
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
pub dataset_tag: Option<DatasetTag>,
pub dataset_tags: Vec<DatasetTag>,
pub dataset_labels: Vec<String>,
}

impl From<&HeaderMap> for PutStreamHeaders {
Expand Down Expand Up @@ -72,16 +74,17 @@ impl From<&HeaderMap> for PutStreamHeaders {
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
dataset_tag: headers
.get(DATASET_TAG_KEY)
dataset_tags: headers
.get(DATASET_TAGS_KEY)
.or_else(|| headers.get(DATASET_TAG_KEY))
.and_then(|v| v.to_str().ok())
.and_then(|v| match DatasetTag::try_from(v) {
Ok(tag) => Some(tag),
Err(err) => {
warn!("Invalid dataset tag '{v}': {err}");
None
}
}),
.map(parse_dataset_tags)
.unwrap_or_default(),
dataset_labels: headers
.get(DATASET_LABELS_KEY)
.and_then(|v| v.to_str().ok())
.map(parse_dataset_labels)
.unwrap_or_default(),
}
}
}
Loading
Loading