Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions prqlc/prqlc/src/semantic/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,277 @@ pub(super) mod test {
)
.unwrap());
}

// Helper function to verify basic lineage structure after append
fn verify_append_lineage_basics(
final_lineage: &crate::ir::pl::Lineage,
expected_inputs: &[&str],
) {
let input_names: Vec<&str> = final_lineage
.inputs
.iter()
.map(|i| i.name.as_str())
.collect();

for expected_input in expected_inputs {
assert!(input_names.contains(expected_input));
assert!(final_lineage.find_input_by_name(expected_input).is_some());
}

assert!(!final_lineage.columns.is_empty());
for col in &final_lineage.columns {
match col {
crate::ir::pl::LineageColumn::Single {
name, target_id, ..
} => {
assert!(target_id > &0);
assert!(name.is_some());
}
crate::ir::pl::LineageColumn::All { .. } => {}
}
}
}

// Helper function to find source frames by input name
fn find_source_frames<'a>(
fc: &'a crate::semantic::reporting::FrameCollector,
top_input_name: &str,
bottom_input_name: &str,
) -> (
Option<&'a crate::ir::pl::Lineage>,
Option<&'a crate::ir::pl::Lineage>,
) {
let mut top_frame = None;
let mut bottom_frame = None;

for (_span, frame) in &fc.frames {
if frame.inputs.len() == 1 {
let input_name = &frame.inputs[0].name;
if input_name == top_input_name && top_frame.is_none() {
top_frame = Some(frame);
} else if input_name == bottom_input_name && bottom_frame.is_none() {
bottom_frame = Some(frame);
}
}
}

(top_frame, bottom_frame)
}

// Helper function to verify column-level lineage for Single columns
fn verify_single_column_lineage(
final_lineage: &crate::ir::pl::Lineage,
fc: &crate::semantic::reporting::FrameCollector,
top_frame: &crate::ir::pl::Lineage,
bottom_frame: &crate::ir::pl::Lineage,
) {
assert_eq!(final_lineage.columns.len(), top_frame.columns.len());
assert_eq!(final_lineage.columns.len(), bottom_frame.columns.len());

for ((union_col, top_col), bottom_col) in final_lineage
.columns
.iter()
.zip(top_frame.columns.iter())
.zip(bottom_frame.columns.iter())
{
if let (
crate::ir::pl::LineageColumn::Single { .. },
crate::ir::pl::LineageColumn::Single {
name: top_name,
target_id: top_target_id,
..
},
crate::ir::pl::LineageColumn::Single {
name: bottom_name,
target_id: bottom_target_id,
..
},
) = (union_col, top_col, bottom_col)
{
if let (Some(top_name), Some(bottom_name)) = (top_name, bottom_name) {
assert_eq!(top_name.name, bottom_name.name);
}

assert!(fc.nodes.iter().any(|n| n.id == *top_target_id));
assert!(fc.nodes.iter().any(|n| n.id == *bottom_target_id));
}
}

for col in &final_lineage.columns {
if let crate::ir::pl::LineageColumn::Single { target_id, .. } = col {
assert!(fc.nodes.iter().any(|n| n.id == *target_id));
}
}
}

// Helper function to verify expression graph contains all expected nodes
fn verify_expression_graph_nodes(
fc: &crate::semantic::reporting::FrameCollector,
final_lineage: &crate::ir::pl::Lineage,
top_frame: &crate::ir::pl::Lineage,
bottom_frame: &crate::ir::pl::Lineage,
) {
for input in &final_lineage.inputs {
assert!(fc.nodes.iter().any(|n| n.id == input.id));
}

let top_col_target_ids: Vec<usize> = top_frame
.columns
.iter()
.filter_map(|c| match c {
crate::ir::pl::LineageColumn::Single { target_id, .. } => Some(*target_id),
_ => None,
})
.collect();

let bottom_col_target_ids: Vec<usize> = bottom_frame
.columns
.iter()
.filter_map(|c| match c {
crate::ir::pl::LineageColumn::Single { target_id, .. } => Some(*target_id),
_ => None,
})
.collect();

for target_id in &top_col_target_ids {
assert!(fc.nodes.iter().any(|n| n.id == *target_id));
}

for target_id in &bottom_col_target_ids {
assert!(fc.nodes.iter().any(|n| n.id == *target_id));
}
}

#[test]
fn test_append_union_different_tables() {
// This test verifies that lineage tracking for append/union operations
// correctly tracks inputs from both tables and shows column-level lineage.
use crate::internal::pl_to_lineage;

let query = r#"
from employees
select { name, salary }
append (
from managers
select { name, salary }
)
"#;

let pl = crate::prql_to_pl(query).unwrap();
let fc = pl_to_lineage(pl).unwrap();
let final_lineage = &fc.frames.last().unwrap().1;

assert_yaml_snapshot!(final_lineage);

verify_append_lineage_basics(final_lineage, &["employees", "managers"]);

let (top_frame, bottom_frame) = find_source_frames(&fc, "employees", "managers");
let top_frame = top_frame.unwrap();
let bottom_frame = bottom_frame.unwrap();

verify_single_column_lineage(final_lineage, &fc, top_frame, bottom_frame);

let employees_input = final_lineage.find_input_by_name("employees").unwrap();
let managers_input = final_lineage.find_input_by_name("managers").unwrap();

assert!(final_lineage
.inputs
.iter()
.any(|inp| inp.id == employees_input.id));
assert!(final_lineage
.inputs
.iter()
.any(|inp| inp.id == managers_input.id));

verify_expression_graph_nodes(&fc, final_lineage, top_frame, bottom_frame);
}

#[test]
fn test_append_union_same_table_with_exclude() {
// This test attempts to exercise the All columns path by unioning
// the same table with itself using select with exclude.
use crate::internal::pl_to_lineage;

let query = r#"
from employees
select !{name}
append (
from employees
select !{salary}
)
"#;

let pl = crate::prql_to_pl(query).unwrap();
let fc = pl_to_lineage(pl).unwrap();
let final_lineage = &fc.frames.last().unwrap().1;

verify_append_lineage_basics(final_lineage, &["employees"]);
}

#[test]
fn test_append_union_all_columns_same_input() {
// This test exercises the All columns path with same input_id (lines 765-766)
// to ensure code coverage for merging except sets when both All columns
// come from the same input.
use crate::ir::pl::{
Expr, ExprKind, Lineage, LineageColumn, LineageInput, TransformCall, TransformKind,
};
use std::collections::HashSet;

let input = LineageInput {
id: 100,
name: "employees".to_string(),
table: crate::ir::pl::Ident {
path: vec!["default_db".to_string()],
name: "employees".to_string(),
},
};

let mut top_lineage = Lineage::default();
top_lineage.inputs.push(input.clone());
top_lineage.columns.push(LineageColumn::All {
input_id: 100,
except: {
let mut set = HashSet::new();
set.insert("name".to_string());
set
},
});

let mut bottom_lineage = Lineage::default();
bottom_lineage.inputs.push(input.clone());
bottom_lineage.columns.push(LineageColumn::All {
input_id: 100,
except: {
let mut set = HashSet::new();
set.insert("salary".to_string());
set
},
});

let mut top_expr = Expr::new(ExprKind::Ident(crate::ir::pl::Ident::from_name("top")));
top_expr.lineage = Some(top_lineage);

let mut bottom_expr = Expr::new(ExprKind::Ident(crate::ir::pl::Ident::from_name("bottom")));
bottom_expr.lineage = Some(bottom_lineage);

let transform_call = TransformCall {
kind: Box::new(TransformKind::Append(Box::new(bottom_expr))),
input: Box::new(top_expr),
partition: None,
frame: crate::ir::pl::WindowFrame::default(),
sort: Vec::new(),
};

let result = transform_call.infer_lineage().unwrap();

match &result.columns[0] {
LineageColumn::All { input_id, except } => {
assert_eq!(*input_id, 100);
assert!(except.contains("name"));
assert!(except.contains("salary"));
}
_ => panic!("Expected All column"),
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
source: prqlc/prqlc/src/semantic/resolver/mod.rs
assertion_line: 234
expression: final_lineage
---
columns:
- Single:
name:
- employees
- name
target_id: 131
target_name: ~
- Single:
name:
- employees
- salary
target_id: 132
target_name: ~
inputs:
- id: 129
name: employees
table:
- default_db
- employees
- id: 118
name: managers
table:
- default_db
- managers
56 changes: 40 additions & 16 deletions prqlc/prqlc/src/semantic/resolver/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,12 +736,37 @@ fn append(mut top: Lineage, bottom: Lineage) -> Result<Lineage, Error> {
));
}

// TODO: I'm not sure what to use as input_name and expr_id...
// Merge inputs from both relations so lineage can track both sources
// This is similar to how `join` handles inputs
top.inputs.extend(bottom.inputs);

// Merge columns positionally
// In a union, columns are aligned by position, so column 0 from top
// and column 0 from bottom become one output column
let mut columns = Vec::with_capacity(top.columns.len());
for (t, b) in zip(top.columns, bottom.columns) {
columns.push(match (t, b) {
(LineageColumn::All { input_id, except }, LineageColumn::All { .. }) => {
LineageColumn::All { input_id, except }
// For All columns, keep the top's input_id but merge except sets
(
LineageColumn::All {
input_id: input_id_t,
except: except_t,
},
LineageColumn::All {
input_id: _input_id_b,
except: except_b,
},
) => {
// Merge except sets from both tables
// This preserves exclusion information even when input_ids differ
// (e.g., "from employees select !{name}" append "from managers select !{salary}")
let mut except = except_t;
except.extend(except_b);

LineageColumn::All {
input_id: input_id_t,
except,
}
}
(
LineageColumn::Single {
Expand All @@ -750,24 +775,23 @@ fn append(mut top: Lineage, bottom: Lineage) -> Result<Lineage, Error> {
target_name,
},
LineageColumn::Single { name: name_b, .. },
) => match (name_t, name_b) {
(None, None) => {
let name = None;
LineageColumn::Single {
name,
) => {
// For Single columns in a union, we keep the top's target_id
// Both inputs are now tracked in top.inputs, so lineage can
// reference either source even though the column only points to one
match (name_t, name_b) {
(None, None) => LineageColumn::Single {
name: None,
target_id,
target_name,
}
}
(None, Some(name)) | (Some(name), _) => {
let name = Some(name);
LineageColumn::Single {
name,
},
(None, Some(name)) | (Some(name), _) => LineageColumn::Single {
name: Some(name),
target_id,
target_name,
}
},
}
},
}
(t, b) => return Err(Error::new_simple(format!(
"cannot match columns `{t:?}` and `{b:?}`"
))
Expand Down
Loading