The Problem
The trading platform used DuckDB for time-series queries. Queries were fast enough—1-2ms typical—but not optimal. Each query read full datasets from Parquet files, then filtered in memory.
Query: "SPY bars where close >= $450"
DuckDB execution:
1. Read all 100,000 bars from Parquet (1MB compressed)
2. Decompress to memory (7.2MB)
3. Filter in memory
4. Return 30,000 matching bars
Latency: 1.2ms
I/O: 1MB read for 300KB result (70% wasted)
The inefficiency was clear: reading 100% of data to return 30%. For selective queries, most I/O was wasted.
DuckDB is a general-purpose SQL engine. Excellent for ad-hoc queries, complex joins, window functions. Overkill for simple time-series lookups.
The decision: build a custom query engine optimized for the specific workload. Single-table queries, time-based partitions, simple filters. No SQL parser overhead, no FFI boundary, direct Parquet access.
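For reference in the code that follows, here is a minimal sketch of the query model this workload implies. The exact struct layout is an assumption; the QueryFilter variants and their fixed-point price encoding match the filter-translation code shown later.

    // Sketch of the query model (field layout assumed, not the original definition).
    pub struct Query {
        pub symbol: String,          // e.g. "SPY"
        pub partitions: Vec<String>, // time-based partitions, e.g. one per trading day
        pub filters: Vec<QueryFilter>,
    }

    // Filter variants used throughout this post.
    pub enum QueryFilter {
        /// Minimum close price, stored fixed-point as price * 10_000.
        MinPrice(i64),
        /// Minimum trade volume.
        MinVolume(u64),
    }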
Initial Investigation
Profiling started with Criterion.rs benchmarks. The baseline measured query performance before optimization.
Test setup: 100,000 bars, price range 400-500, various filter selectivities.
Baseline Results
Query Type | Latency (ns) | I/O Read | Bars Returned |
---|---|---|---|
High selectivity (90% filtered) | 2,781 | 100% | 10% |
Medium selectivity (50% filtered) | 3,011 | 100% | 50% |
Low selectivity (10% filtered) | 3,367 | 100% | 90% |
No filter (baseline) | 1,341 | 100% | 100% |
Filtered queries were slower than unfiltered. Paradox: reading 100K bars and keeping 10K took longer than reading 100K bars and keeping all.
The overhead came from the filter operation itself. Each bar required field access and comparison. For 90K discarded bars, that was wasted CPU.
The Anti-Pattern
// Query executor - baseline implementation
let mut all_bars = Vec::new();

// Read ALL data from all partitions
for partition in &partitions {
    if let Ok(bars) = self.storage.read(symbol, partition.as_str()) {
        all_bars.extend(bars); // No filtering
    }
}

// Filter in memory AFTER reading everything
for filter in &query.filters {
    all_bars.retain(|bar| filter.matches_ohlcv(bar));
}
Every query, regardless of filters, read 100% of data. Filters applied after I/O, decompression, and deserialization.
The problem was obvious. The solution was predicate pushdown: filter at the storage layer.
Solution Architecture
Predicate pushdown moves filtering from memory to storage. Two levels: row group pruning (skip chunks based on statistics) and Arrow compute filtering (filter before deserialization).
Level 1: Row Group Pruning
Parquet files contain metadata with min/max statistics per row group. For “close >= 450”, any row group with max_close < 450 can be skipped entirely.
Parquet File Structure:
├── Metadata (10KB)
│ ├── Row Group 0: {min: 440.0, max: 445.0} → SKIP (max < 450)
│ ├── Row Group 1: {min: 445.0, max: 452.0} → READ (overlaps)
│ ├── Row Group 2: {min: 450.0, max: 455.0} → READ (all match)
│ └── ...
├── Row Group 0 Data (100KB compressed) → Never read from disk
├── Row Group 1 Data (100KB compressed) → Read and filter
└── Row Group 2 Data (100KB compressed) → Read
Row Group 0 is never touched. I/O saved: 33% in this example. For highly selective queries, savings reach 90%.
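The pruning decision itself is simple. Below is a sketch of the logic, written against a simplified statistics struct rather than the parquet crate's row group metadata type (the real should_read_row_group reads min/max from column chunk statistics); names are illustrative, and it uses the QueryFilter variants sketched earlier.

    // Simplified per-row-group statistics (illustrative; the real values come
    // from Parquet column chunk metadata).
    struct RowGroupStats {
        max_close: f64,
        max_volume: u64,
    }

    // Keep a row group unless the statistics prove that no row can match.
    // A column with missing statistics must conservatively be read (not shown).
    fn should_read_row_group(stats: &RowGroupStats, filters: &[QueryFilter]) -> bool {
        filters.iter().all(|filter| match filter {
            // "close >= t" can only match if this group's max close reaches t.
            QueryFilter::MinPrice(threshold_i64) => {
                stats.max_close >= *threshold_i64 as f64 / 10000.0
            }
            // "volume >= t" can only match if this group's max volume reaches t.
            QueryFilter::MinVolume(threshold) => stats.max_volume >= *threshold,
        })
    }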
Level 2: Arrow Compute Filtering
For row groups that might contain matches, apply filters before deserializing to OHLCV structs.
// Read row group as Arrow RecordBatch (columnar)
let batch = reader.next()?;
// Apply filter using Arrow compute kernels
let filtered_batch = filter::apply_filters(&batch, filters)?;
// Deserialize only filtered rows to OHLCV
let bars = Self::from_record_batch(&filtered_batch)?;
Arrow compute kernels use SIMD automatically. Comparisons process 4-8 values per instruction. Filtering happens on columnar data before conversion to structs.
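What filter::apply_filters does can be sketched with standard Arrow kernels: build one boolean mask per filter, AND the masks together, then keep only the matching rows. The error type and exact structure here are assumptions; filter_to_arrow_predicate is shown in the Filter Translation section below.

    use arrow::array::BooleanArray;
    use arrow::compute;
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    // Sketch of apply_filters: combine per-filter masks, then select rows.
    pub fn apply_filters(
        batch: &RecordBatch,
        filters: &[QueryFilter],
    ) -> Result<RecordBatch, ArrowError> {
        // Start from an all-true mask so an empty filter list is a no-op.
        let mut mask = BooleanArray::from(vec![true; batch.num_rows()]);
        for filter in filters {
            let predicate = filter_to_arrow_predicate(batch, filter)?;
            mask = compute::and(&mask, &predicate)?;
        }
        // SIMD-backed selection kernel: copies only rows where the mask is true.
        compute::filter_record_batch(batch, &mask)
    }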
Implementation
pub fn read_with_filters(
    &self,
    symbol: &str,
    partition: &str,
    filters: &[QueryFilter],
) -> Result<Vec<OHLCV>> {
    // `file_path` is derived from `symbol` and `partition` (construction elided here)
    let file = File::open(&file_path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // LEVEL 1: Row group pruning
    let metadata = builder.metadata();
    let row_groups_to_read: Vec<usize> = metadata
        .row_groups()
        .iter()
        .enumerate()
        .filter_map(|(idx, rg)| {
            if filter::should_read_row_group(rg, filters) {
                Some(idx)
            } else {
                None // Skip this row group
            }
        })
        .collect();

    // Early exit if no row groups match
    if row_groups_to_read.is_empty() {
        return Ok(Vec::new());
    }

    // Build reader with selected row groups only
    let reader = builder
        .with_row_groups(row_groups_to_read)
        .build()?;

    let mut all_data = Vec::new();

    // LEVEL 2: Arrow compute filtering
    for batch_result in reader {
        let batch = batch_result?;
        let filtered_batch = filter::apply_filters(&batch, filters)?;
        let data = Self::from_record_batch(&filtered_batch)?;
        all_data.extend(data);
    }

    Ok(all_data)
}
Row group selection happens at the ParquetRecordBatchReaderBuilder level. Only selected row groups are read from disk. Arrow filtering happens on each batch after decompression.
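A quick usage sketch (call site is hypothetical, values illustrative): a close-price filter at $450.00, expressed in the fixed-point form the filter translation below expects.

    // All SPY bars in one partition with close >= $450.00.
    // MinPrice is fixed-point (price * 10_000), so 450.00 becomes 4_500_000.
    let filters = vec![QueryFilter::MinPrice(4_500_000)];
    let bars = storage.read_with_filters("SPY", "2024-01-02", &filters)?;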
Filter Translation
Query filters translate to Arrow predicates:
pub fn filter_to_arrow_predicate(
    batch: &RecordBatch,
    filter: &QueryFilter,
) -> Result<BooleanArray> {
    match filter {
        QueryFilter::MinPrice(threshold_i64) => {
            // Prices are stored fixed-point (price * 10_000)
            let threshold = *threshold_i64 as f64 / 10000.0;
            let close_array = batch
                .column(4) // close column
                .as_any()
                .downcast_ref::<Float64Array>()
                .expect("close column should be Float64");
            let threshold_array = Float64Array::from(
                vec![threshold; close_array.len()]
            );
            // SIMD-accelerated comparison
            arrow::compute::gt_eq(close_array, &threshold_array)
        }
        QueryFilter::MinVolume(threshold) => {
            let volume_array = batch
                .column(5) // volume column
                .as_any()
                .downcast_ref::<UInt64Array>()
                .expect("volume column should be UInt64");
            let threshold_array = UInt64Array::from(
                vec![*threshold; volume_array.len()]
            );
            arrow::compute::gt_eq(volume_array, &threshold_array)
        }
    }
}
Arrow’s gt_eq kernel handles SIMD automatically. No manual vectorization required.
Zero Regression Strategy
Unfiltered queries must not regress. Explicit branching ensures they use the original fast path:
// In query executor
if query.filters.is_empty() {
    // No filters: use original read path (zero overhead)
    for partition in &partitions {
        all_bars.extend(self.storage.read(symbol, partition)?);
    }
} else {
    // With filters: use predicate pushdown
    for partition in &partitions {
        all_bars.extend(
            self.storage.read_with_filters(symbol, partition, &query.filters)?
        );
    }
}
Unfiltered queries bypass the new code entirely. No overhead from statistics parsing or predicate building.
Results
Benchmark ran after implementation. Same queries, same data, new code path.
Scenario | Before (ns) | After (ns) | Speedup | I/O Saved |
---|---|---|---|---|
High selectivity (90% filtered) | 2,781 | 267 | 10.4x | 90% |
Medium selectivity (50% filtered) | 3,011 | 799 | 3.8x | 50% |
Low selectivity (10% filtered) | 3,367 | 1,271 | 2.6x | 10% |
No filter (regression test) | 1,341 | 1,348 | 1.0x | 0% |
Multiple filters (AND) | 2,911 | 376 | 7.7x | 75% |
All targets exceeded. High selectivity case hit 10.4x speedup with 90% I/O reduction.
Regression test confirmed zero overhead: 1,341 ns → 1,348 ns (+0.5% noise).
Why Speedup Exceeds I/O Reduction
For high selectivity (90% filtered), I/O reduced 90% but speedup was 10.4x. More than proportional.
The gap came from:
- Avoided decompression: LZ4 decompression is CPU-intensive. Skipping 90% of compressed data saved CPU.
- Smaller allocations: growing a Vec to 10K items needs fewer, smaller reallocations than growing to 100K items.
- Better cache utilization: 10K items fit in L2 cache. 100K items thrash L3.
- SIMD filtering: Arrow compute kernels use AVX2 instructions automatically.
I/O reduction was the first-order effect. CPU and memory savings compounded it.
I/O Breakdown Example
For high selectivity query (close >= 480):
Before:
- Read: 100KB compressed → 1MB decompressed
- Deserialize: 10,000 bars → Vec<OHLCV> (7.2MB)
- Filter: 10,000 bars → 1,000 bars (9,000 discarded)
- Time: 2,781 ns

After:
- Read metadata: 10KB
- Check 10 row group statistics
- Skip 9 row groups (90KB compressed saved)
- Read 1 row group: 10KB compressed → 100KB decompressed
- Arrow filter: 1,200 bars → 1,000 bars
- Deserialize: 1,000 bars → Vec<OHLCV> (720KB)
- Time: 267 ns (10.4x faster)
I/O reduced from 100KB to 20KB (10KB metadata + 10KB data). But total time reduced by 10.4x due to CPU and memory savings.
Production Impact
The system was deployed to production and replaced DuckDB entirely. Migration took 2 weeks with a gradual rollout.
System Metrics
Metric | DuckDB | Custom Engine | Improvement |
---|---|---|---|
P50 latency | 1.5ms | 0.3ms | 5x |
P99 latency | 5ms | 0.5ms | 10x |
Throughput | 667 qps | 2,500 qps | 3.8x |
Memory usage | 120MB | 80MB | 33% reduction |
Latency improvements enabled new features:
- Real-time dashboard with 100ms refresh rate
- 10-symbol correlation analysis (parallel queries)
- Historical backtesting (combined with caching: 50x faster)
Monitoring
Production monitoring tracked filter effectiveness:
// Log selectivity to understand query patterns
let selectivity = result.bars.len() as f64 / total_bars_scanned as f64;
metrics::histogram!("query.selectivity", selectivity);
// Track row group pruning efficiency
let pruning_rate = 1.0 - (row_groups_read as f64 / total_row_groups as f64);
metrics::histogram!("query.row_groups_pruned", pruning_rate);
Observed selectivity distribution:
- 60% of queries: high selectivity (>80% filtered) → 8-10x speedup
- 30% of queries: medium selectivity (30-80% filtered) → 2-4x speedup
- 10% of queries: low selectivity (<30% filtered) → 1.5-2x speedup
No queries regressed. Unfiltered queries remained at baseline performance.
Evolution Timeline
The final system emerged through 5 iterations. Not all optimizations succeeded.
Iteration 1: Baseline
Established performance baseline with comprehensive benchmarks.
Results:
- Multi-partition query: 2.45ms
- Storage read: 9.30M bars/sec
- Storage write: 142K bars/sec
- Compression: 2.6:1 ratio
Iteration 2: Async I/O
Parallelized partition reads using Tokio.
Implementation:
let futures: Vec<_> = partitions
    .iter()
    .map(|p| storage.read_async(symbol, p))
    .collect();

let results = futures::future::join_all(futures).await;
Results:
- Before: 2.19ms (sequential reads)
- After: 0.81ms (parallel reads)
- Speedup: 2.7x
Async I/O scaled well up to 5 partitions. Beyond 5, synchronization overhead limited gains.
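One way to cap concurrency near that level, not part of the original implementation, is to drive partition reads through a bounded stream instead of join_all. A sketch, assuming the same storage.read_async(symbol, partition) signature as the snippet above:

    use futures::stream::{self, StreamExt, TryStreamExt};

    let all_bars: Vec<OHLCV> = stream::iter(partitions.iter())
        .map(|p| storage.read_async(symbol, p))
        .buffered(5) // at most 5 reads in flight, results kept in partition order
        .try_collect::<Vec<Vec<OHLCV>>>()
        .await?
        .into_iter()
        .flatten()
        .collect();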
Iteration 3: LRU Caching
Added result cache for repeated queries.
Implementation:
// Check cache first
if let Some(cached) = self.cache.get(&query) {
    return Ok(cached.clone());
}

// Execute and cache
let result = self.execute_uncached(&query)?;
self.cache.put(query.clone(), result.clone());
Results:
- Cache hit: 1,054x speedup (sub-microsecond latency)
- Realistic workload (80% hit rate): 588x speedup
- Cache size: 1,000 queries
Caching provided the largest single gain. Dashboard queries went from 1.5ms to <5µs.
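A minimal sketch of such a cache, assuming the lru crate and a hashable Query key (names are illustrative; the original cache type is not shown):

    use std::num::NonZeroUsize;
    use lru::LruCache;

    // Result cache keyed by the full query; requires Query: Hash + Eq and OHLCV: Clone.
    struct QueryCache {
        inner: LruCache<Query, Vec<OHLCV>>,
    }

    impl QueryCache {
        fn new() -> Self {
            // Capacity matches the 1,000-query cache described above.
            Self { inner: LruCache::new(NonZeroUsize::new(1_000).unwrap()) }
        }

        // `get` takes &mut self because lookups update recency order.
        fn get(&mut self, query: &Query) -> Option<Vec<OHLCV>> {
            self.inner.get(query).cloned()
        }

        fn put(&mut self, query: Query, result: Vec<OHLCV>) {
            self.inner.put(query, result);
        }
    }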
Iteration 4: SIMD Aggregations (Failed)
Attempted manual SIMD for aggregations (VWAP, average, sum).
Problem: Data stored as Vec<OHLCV> (Array-of-Structs layout). SIMD requires Struct-of-Arrays.
Results:
Operation | Scalar (ns) | SIMD (ns) | Result |
---|---|---|---|
VWAP | 1,340 | 2,486 | 0.54x (86% regression) |
Average | 412 | 687 | 0.60x (67% regression) |
Sum | 339 | 566 | 0.60x (67% regression) |
Field extraction overhead exceeded SIMD benefit. Reverted the changes.
Lesson: Data layout matters more than instruction optimization. Arrow’s columnar format enables SIMD at the library level. Manual SIMD on AoS layout fails.
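The layout difference is easy to see side by side. A sketch (the exact field set is assumed): Vec<OHLCV> interleaves fields, while a Struct-of-Arrays layout, which is essentially what Arrow's RecordBatch provides, keeps each field contiguous so one aligned load fills a vector register.

    // Array-of-Structs: what the engine stores. Consecutive `close` values sit
    // 48 bytes apart (one struct size), so SIMD needs strided gathers plus
    // field extraction.
    struct OHLCV {
        timestamp: i64,
        open: f64,
        high: f64,
        low: f64,
        close: f64,
        volume: u64,
    }
    type BarsAoS = Vec<OHLCV>;

    // Struct-of-Arrays: each field contiguous. A single aligned load fills a
    // vector register with 4-8 close values; this is the layout Arrow's
    // columnar kernels operate on.
    struct BarsSoA {
        timestamp: Vec<i64>,
        open: Vec<f64>,
        high: Vec<f64>,
        low: Vec<f64>,
        close: Vec<f64>,
        volume: Vec<u64>,
    }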
Iteration 5: Predicate Pushdown (Success)
Implemented two-level filtering at storage layer.
Results: Covered in previous section (10.4x speedup).
Combined Impact
Iteration | Optimization | Speedup | Cumulative |
---|---|---|---|
1 | Baseline | 1.0x | 1.0x |
2 | Async I/O | 2.7x | 2.7x |
3 | LRU Cache | 588x (80% hit) | ~1,590x |
4 | SIMD | 0.5x (reverted) | N/A |
5 | Predicate Pushdown | 10.4x | ~16,500x |
For realistic workload (80% cache hit, selective filter):
- Cache hit: ~5µs
- Cache miss with filter: ~300µs (vs 2.5ms baseline)
- Effective speedup: ~500x
Key Lessons
Lesson 1: Optimize I/O Before CPU
SIMD attempt (iteration 4) targeted CPU. Predicate pushdown (iteration 5) targeted I/O.
SIMD regressed 86%. Predicate pushdown improved 10.4x.
Time-series databases are I/O-bound, not CPU-bound. Decompression and deserialization dominate runtime. Filtering in memory is fast—reading unnecessary data is slow.
Rule: Profile first. Fix the actual bottleneck.
Lesson 2: Parquet Statistics Are Free
Row group statistics exist in metadata. No extra work required. Parquet writes min/max stats by default.
Cost to read statistics: ~10KB. Cost to read data: ~1MB per row group.
ROI: 100:1 (read 10KB metadata to skip 1MB data).
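The statistics are easy to verify directly. A small sketch (not from the original code) that dumps the per-row-group statistics the pruning step relies on, using the same parquet reader builder as the implementation above; only the file footer is read, no row group data:

    use std::error::Error;
    use std::fs::File;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    fn print_row_group_stats(path: &str) -> Result<(), Box<dyn Error>> {
        let file = File::open(path)?;
        // Opening the builder parses only the footer metadata (a few KB).
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        for (idx, rg) in builder.metadata().row_groups().iter().enumerate() {
            println!("row group {idx}: {} rows", rg.num_rows());
            for col in rg.columns() {
                // None if the writer disabled statistics for this column.
                if let Some(stats) = col.statistics() {
                    println!("  {:?}: {:?}", col.column_path(), stats);
                }
            }
        }
        Ok(())
    }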
Lesson 3: Test-Driven Development Prevents Bugs
Equivalence test caught 3 bugs before production:
#[test]
fn test_filter_equivalence_inmemory() -> Result<()> {
    // Setup elided: `storage`, `filters`, and the unfiltered `all_bars` for the partition.

    // Filter at storage layer
    let storage_result = storage.read_with_filters("SPY", "2024-01-02", &filters)?;

    // Filter in memory (reference implementation)
    let memory_result: Vec<_> = all_bars
        .iter()
        .filter(|bar| filters.iter().all(|f| f.matches_ohlcv(bar)))
        .cloned()
        .collect();

    // Must be identical
    assert_eq!(storage_result, memory_result);
    Ok(())
}
Bugs caught:
- Boundary value handling (450.0 vs 449.9999)
- Multiple filter AND logic (incorrectly used OR)
- Statistics byte order (assumed big-endian, Parquet uses little-endian)
All found in tests, not production.
Lesson 4: Zero Regression Is Critical
Unfiltered queries must not regress. Explicit branching ensured this:
if query.filters.is_empty() {
    // Old path: zero overhead
} else {
    // New path: predicate pushdown
}
Benchmark confirmed: 1,341 ns → 1,348 ns (+0.5% noise).
Existing workloads didn’t pay for optimization they don’t use.
Lesson 5: Rust Beats FFI Overhead
DuckDB via Python adds FFI overhead. Custom Rust engine integrates natively.
Latency breakdown:
- DuckDB (Python): ~1.2ms query + ~0.3ms FFI = 1.5ms total
- Custom (Rust): ~0.3ms query + 0ms FFI = 0.3ms total
Eliminating the FFI boundary removed roughly 0.3ms, about 20% of the original 1.5ms end-to-end latency.
When to Build Custom vs Use DuckDB
DuckDB is excellent for general-purpose analytics. Custom engines make sense for specific workloads.
Use DuckDB When
- SQL flexibility is required
- Queries are complex (joins, window functions, CTEs)
- Development speed matters more than query speed
- Latency <10ms is acceptable
- Workload is varied (no single hot path to optimize)
Build Custom When
- Workload is specific and well-defined
- Latency requirements are strict (<1ms)
- Control over optimization is needed
- Integration overhead matters (FFI, serialization)
- Query patterns are predictable
For the trading system, workload was specific: single-table time-series queries with simple filters. DuckDB’s SQL parser and query planner were unnecessary overhead.
Custom engine achieved 5x lower latency with full control over optimization strategy.
Conclusion
Replacing DuckDB with a custom Rust query engine delivered 10.4x speedup for selective queries. Predicate pushdown reduced I/O by 90% through row group pruning and Arrow compute filtering.
The optimization targeted I/O, not CPU. Attempted SIMD optimization regressed performance by 86%. Predicate pushdown succeeded by eliminating wasted reads.
Production deployment replaced DuckDB entirely. P99 latency dropped from 5ms to 0.5ms. Throughput increased 3.8x. Zero regressions on unfiltered queries.
The key architectural decision: columnar storage (Parquet) with statistics-based pruning. Statistics were free. Filtering was automatic via Arrow compute kernels. No manual SIMD required.
Final Metrics
âś… Performance: 10.4x speedup (selective queries)
âś… Zero regression: 1.0x for unfiltered queries
âś… Production-ready: 118 tests passing, zero warnings
âś… DuckDB replaced: 100% migration complete
Test Coverage
- 93 tests (baseline) → 118 tests (final)
- 25 new tests for predicate pushdown
- Equivalence tests for correctness
- Regression tests for performance
- Zero clippy warnings
The system runs in production handling 2,500 queries per second with P99 latency under 1ms.