Skip to content

Commit 2aa1685

Browse files
committed
fix: keep prefix naming, remove empty path handling, sort test results, remove TTL from tests
1 parent c9c0607 commit 2aa1685

2 files changed

Lines changed: 44 additions & 50 deletions

File tree

datafusion/datasource/src/url.rs

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -239,23 +239,23 @@ impl ListingTableUrl {
239239
&'a self,
240240
ctx: &'a dyn Session,
241241
store: &'a dyn ObjectStore,
242-
partition_prefix: Option<Path>,
242+
prefix: Option<Path>,
243243
file_extension: &'a str,
244244
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
245245
let exec_options = &ctx.config_options().execution;
246246
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
247247

248248
// Build full_prefix for non-cached path and head() calls
249-
let full_prefix = if let Some(ref prefix) = partition_prefix {
250-
let mut p = self.prefix.parts().collect::<Vec<_>>();
251-
p.extend(prefix.parts());
252-
Path::from_iter(p.into_iter())
249+
let full_prefix = if let Some(ref p) = prefix {
250+
let mut parts = self.prefix.parts().collect::<Vec<_>>();
251+
parts.extend(p.parts());
252+
Path::from_iter(parts.into_iter())
253253
} else {
254254
self.prefix.clone()
255255
};
256256

257257
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
258-
list_with_cache(ctx, store, &self.prefix, partition_prefix.as_ref()).await?
258+
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
259259
} else {
260260
match store.head(&full_prefix).await {
261261
Ok(meta) => futures::stream::once(async { Ok(meta) })
@@ -264,8 +264,7 @@ impl ListingTableUrl {
264264
// If the head command fails, it is likely that object doesn't exist.
265265
// Retry as though it were a prefix (aka a collection)
266266
Err(object_store::Error::NotFound { .. }) => {
267-
list_with_cache(ctx, store, &self.prefix, partition_prefix.as_ref())
268-
.await?
267+
list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
269268
}
270269
Err(e) => return Err(e.into()),
271270
}
@@ -332,31 +331,31 @@ impl ListingTableUrl {
332331
/// * `ctx` - The session context
333332
/// * `store` - The object store to list from
334333
/// * `table_base_path` - The table's base path (the stable cache key)
335-
/// * `partition_prefix` - Optional partition prefix relative to table base
334+
/// * `prefix` - Optional prefix relative to table base for filtering results
336335
///
337336
/// # Cache Behavior:
338-
/// The cache key is always `table_base_path`. When a partition-specific listing
339-
/// is requested via `partition_prefix`, the cache:
337+
/// The cache key is always `table_base_path`. When a prefix-filtered listing
338+
/// is requested via `prefix`, the cache:
340339
/// - Looks up `table_base_path` in the cache
341-
/// - Filters results to match `table_base_path/partition_prefix`
340+
/// - Filters results to match `table_base_path/prefix`
342341
/// - Returns filtered results without a storage call
343342
///
344343
/// On cache miss, the full table is always listed and cached, ensuring
345-
/// subsequent partition queries can be served from cache.
344+
/// subsequent prefix queries can be served from cache.
346345
async fn list_with_cache<'b>(
347346
ctx: &'b dyn Session,
348347
store: &'b dyn ObjectStore,
349348
table_base_path: &Path,
350-
partition_prefix: Option<&Path>,
349+
prefix: Option<&Path>,
351350
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
352-
// Build the full listing path (table_base + partition_prefix)
353-
let full_prefix = match partition_prefix {
354-
Some(prefix) if !prefix.as_ref().is_empty() => {
351+
// Build the full listing path (table_base + prefix)
352+
let full_prefix = match prefix {
353+
Some(p) => {
355354
let mut parts: Vec<_> = table_base_path.parts().collect();
356-
parts.extend(prefix.parts());
355+
parts.extend(p.parts());
357356
Path::from_iter(parts)
358357
}
359-
_ => table_base_path.clone(),
358+
None => table_base_path.clone(),
360359
};
361360

362361
match ctx.runtime_env().cache_manager.get_list_files_cache() {
@@ -365,8 +364,8 @@ async fn list_with_cache<'b>(
365364
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
366365
.boxed()),
367366
Some(cache) => {
368-
// Convert partition_prefix to Option<Path> for cache lookup
369-
let prefix_filter = partition_prefix.cloned();
367+
// Clone the borrowed prefix (Option<&Path>) into an owned Option<Path> for the cache lookup
368+
let prefix_filter = prefix.cloned();
370369

371370
// Try cache lookup with optional prefix filter
372371
let vec = if let Some(res) =
@@ -376,15 +375,15 @@ async fn list_with_cache<'b>(
376375
res.as_ref().clone()
377376
} else {
378377
// Cache miss - always list and cache the full table
379-
// This ensures we have complete data for future partition queries
378+
// This ensures we have complete data for future prefix queries
380379
let vec = store
381380
.list(Some(table_base_path))
382381
.try_collect::<Vec<ObjectMeta>>()
383382
.await?;
384383
cache.put(table_base_path, Arc::new(vec.clone()));
385384

386385
// If a prefix filter was requested, apply it to the results
387-
if partition_prefix.is_some() {
386+
if prefix.is_some() {
388387
let full_prefix_str = full_prefix.as_ref();
389388
vec.into_iter()
390389
.filter(|meta| {
@@ -806,11 +805,11 @@ mod tests {
806805
/// Tests that the cached code path produces identical results to the non-cached path.
807806
///
808807
/// This is critical: the cache is a transparent optimization, so both paths
809-
/// MUST return the same files in the same order.
808+
/// MUST return the same files. Note: order is not guaranteed by ObjectStore::list,
809+
/// so we sort results before comparison.
810810
#[tokio::test]
811811
async fn test_cache_path_equivalence() -> Result<()> {
812812
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
813-
use std::time::Duration;
814813

815814
let store = MockObjectStore {
816815
in_mem: object_store::memory::InMemory::new(),
@@ -827,10 +826,9 @@ mod tests {
827826
// Session WITHOUT cache
828827
let session_no_cache = MockSession::new();
829828

830-
// Session WITH cache - use RuntimeEnvBuilder with cache limits
829+
// Session WITH cache - use RuntimeEnvBuilder with cache limit (no TTL needed for this test)
831830
let runtime_with_cache = RuntimeEnvBuilder::new()
832831
.with_object_list_cache_limit(1024 * 1024) // 1MB limit
833-
.with_object_list_cache_ttl(Duration::from_secs(300))
834832
.build_arc()?;
835833
let session_with_cache = MockSessionWithRuntime::new(runtime_with_cache);
836834

@@ -862,18 +860,19 @@ mod tests {
862860
for (url_str, prefix, description) in test_cases {
863861
let url = ListingTableUrl::parse(url_str)?;
864862

865-
// Get results WITHOUT cache
866-
let results_no_cache: Vec<String> = url
863+
// Get results WITHOUT cache (sorted for comparison)
864+
let mut results_no_cache: Vec<String> = url
867865
.list_prefixed_files(&session_no_cache, &store, prefix.clone(), "parquet")
868866
.await?
869867
.try_collect::<Vec<_>>()
870868
.await?
871869
.into_iter()
872870
.map(|m| m.location.to_string())
873871
.collect();
872+
results_no_cache.sort();
874873

875-
// Get results WITH cache (first call - cache miss)
876-
let results_with_cache_miss: Vec<String> = url
874+
// Get results WITH cache (first call - cache miss, sorted for comparison)
875+
let mut results_with_cache_miss: Vec<String> = url
877876
.list_prefixed_files(
878877
&session_with_cache,
879878
&store,
@@ -886,18 +885,20 @@ mod tests {
886885
.into_iter()
887886
.map(|m| m.location.to_string())
888887
.collect();
888+
results_with_cache_miss.sort();
889889

890-
// Get results WITH cache (second call - cache hit)
891-
let results_with_cache_hit: Vec<String> = url
890+
// Get results WITH cache (second call - cache hit, sorted for comparison)
891+
let mut results_with_cache_hit: Vec<String> = url
892892
.list_prefixed_files(&session_with_cache, &store, prefix, "parquet")
893893
.await?
894894
.try_collect::<Vec<_>>()
895895
.await?
896896
.into_iter()
897897
.map(|m| m.location.to_string())
898898
.collect();
899+
results_with_cache_hit.sort();
899900

900-
// All three should be identical
901+
// All three should contain the same files
901902
assert_eq!(
902903
results_no_cache, results_with_cache_miss,
903904
"Cache miss path should match non-cached path for: {description}"
@@ -911,11 +912,10 @@ mod tests {
911912
Ok(())
912913
}
913914

914-
/// Tests that partition queries can be served from a cached full-table listing
915+
/// Tests that prefix queries can be served from a cached full-table listing
915916
#[tokio::test]
916917
async fn test_cache_serves_partition_from_full_listing() -> Result<()> {
917918
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
918-
use std::time::Duration;
919919

920920
let store = MockObjectStore {
921921
in_mem: object_store::memory::InMemory::new(),
@@ -927,10 +927,9 @@ mod tests {
927927
create_file(&store, "/sales/region=US/q2.parquet").await;
928928
create_file(&store, "/sales/region=EU/q1.parquet").await;
929929

930-
// Create session with cache - use RuntimeEnvBuilder with cache limits
930+
// Create session with cache (no TTL needed for this test)
931931
let runtime = RuntimeEnvBuilder::new()
932932
.with_object_list_cache_limit(1024 * 1024) // 1MB limit
933-
.with_object_list_cache_ttl(Duration::from_secs(300))
934933
.build_arc()?;
935934
let session = MockSessionWithRuntime::new(runtime);
936935

@@ -947,8 +946,8 @@ mod tests {
947946
.collect();
948947
assert_eq!(full_results.len(), 3);
949948

950-
// Second: query partition (should be served from cache)
951-
let us_results: Vec<String> = url
949+
// Second: query with prefix (should be served from cache)
950+
let mut us_results: Vec<String> = url
952951
.list_prefixed_files(
953952
&session,
954953
&store,
@@ -961,13 +960,14 @@ mod tests {
961960
.into_iter()
962961
.map(|m| m.location.to_string())
963962
.collect();
963+
us_results.sort();
964964

965965
assert_eq!(
966966
us_results,
967967
vec!["sales/region=US/q1.parquet", "sales/region=US/q2.parquet"]
968968
);
969969

970-
// Third: different partition (also from cache)
970+
// Third: different prefix (also from cache)
971971
let eu_results: Vec<String> = url
972972
.list_prefixed_files(
973973
&session,

datafusion/execution/src/cache/list_files_cache.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,15 +216,9 @@ impl DefaultListFilesCacheState {
216216
};
217217

218218
// Build the full prefix path: table_base/prefix
219-
let full_prefix = if table_base.as_ref().is_empty() {
220-
prefix.clone()
221-
} else if prefix.as_ref().is_empty() {
222-
table_base.clone()
223-
} else {
224-
let mut parts: Vec<_> = table_base.parts().collect();
225-
parts.extend(prefix.parts());
226-
Path::from_iter(parts)
227-
};
219+
let mut parts: Vec<_> = table_base.parts().collect();
220+
parts.extend(prefix.parts());
221+
let full_prefix = Path::from_iter(parts);
228222
let full_prefix_str = full_prefix.as_ref();
229223

230224
// Filter files to only those matching the prefix

0 commit comments

Comments
 (0)