Skip to main content

readstat/
rs_data.rs

1//! Data reading and Arrow [`RecordBatch`](arrow_array::RecordBatch) conversion.
2//!
3//! [`ReadStatData`] coordinates the FFI parsing of row values from a `.sas7bdat` file,
4//! accumulating them directly into typed Arrow builders via the `handle_value`
5//! callback, then finishing them into an Arrow `RecordBatch` for downstream writing.
6//! Supports streaming chunks with configurable row offsets and progress tracking.
7
8use arrow::datatypes::Schema;
9use arrow_array::{
10    ArrayRef, RecordBatch,
11    builder::{
12        Date32Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, StringBuilder,
13        Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
14        Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
15        TimestampNanosecondBuilder, TimestampSecondBuilder,
16    },
17};
18use log::debug;
19use std::{
20    collections::BTreeMap,
21    ffi::CString,
22    os::raw::c_void,
23    sync::{Arc, atomic::AtomicUsize},
24};
25
26use crate::{
27    cb,
28    err::{ReadStatError, check_c_error},
29    progress::ProgressCallback,
30    rs_buffer_io::ReadStatBufferCtx,
31    rs_metadata::{ReadStatMetadata, ReadStatVarMetadata},
32    rs_parser::ReadStatParser,
33    rs_path::ReadStatPath,
34    rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass},
35};
36
/// Ceiling on the number of rows for which Arrow builders reserve space up front.
///
/// The row count is read from a file header and cannot be trusted, so the
/// initial reservation never exceeds this many rows. Builders keep growing as
/// rows arrive, so an honest file with more rows still parses; it just
/// reallocates along the way. One million rows sits well above the default
/// 10k-row streaming chunk while bounding the cost of an empty reservation.
const MAX_PREALLOC_ROWS: usize = 1_000_000;
44
45/// A typed Arrow array builder for a single column.
46///
47/// Each variant wraps the corresponding Arrow builder, pre-sized with capacity
48/// hints from the metadata (row count, string `storage_width`). Values are
49/// appended directly during FFI callbacks, eliminating intermediate allocations.
50pub(crate) enum ColumnBuilder {
51    /// UTF-8 string column.
52    Str(StringBuilder),
53    /// 16-bit signed integer column (covers both SAS Int8 and Int16).
54    Int16(Int16Builder),
55    /// 32-bit signed integer column.
56    Int32(Int32Builder),
57    /// 32-bit floating point column.
58    Float32(Float32Builder),
59    /// 64-bit floating point column.
60    Float64(Float64Builder),
61    /// Date column (days since Unix epoch).
62    Date32(Date32Builder),
63    /// Timestamp with second precision.
64    TimestampSecond(TimestampSecondBuilder),
65    /// Timestamp with millisecond precision.
66    TimestampMillisecond(TimestampMillisecondBuilder),
67    /// Timestamp with microsecond precision.
68    TimestampMicrosecond(TimestampMicrosecondBuilder),
69    /// Timestamp with nanosecond precision.
70    TimestampNanosecond(TimestampNanosecondBuilder),
71    /// Time of day with second precision.
72    Time32Second(Time32SecondBuilder),
73    /// Time of day with millisecond precision.
74    Time32Millisecond(Time32MillisecondBuilder),
75    /// Time of day with microsecond precision.
76    Time64Microsecond(Time64MicrosecondBuilder),
77    /// Time of day with nanosecond precision.
78    Time64Nanosecond(Time64NanosecondBuilder),
79}
80
81impl ColumnBuilder {
82    /// Appends a null value, regardless of the underlying builder type.
83    pub(crate) fn append_null(&mut self) {
84        match self {
85            Self::Str(b) => b.append_null(),
86            Self::Int16(b) => b.append_null(),
87            Self::Int32(b) => b.append_null(),
88            Self::Float32(b) => b.append_null(),
89            Self::Float64(b) => b.append_null(),
90            Self::Date32(b) => b.append_null(),
91            Self::TimestampSecond(b) => b.append_null(),
92            Self::TimestampMillisecond(b) => b.append_null(),
93            Self::TimestampMicrosecond(b) => b.append_null(),
94            Self::TimestampNanosecond(b) => b.append_null(),
95            Self::Time32Second(b) => b.append_null(),
96            Self::Time32Millisecond(b) => b.append_null(),
97            Self::Time64Microsecond(b) => b.append_null(),
98            Self::Time64Nanosecond(b) => b.append_null(),
99        }
100    }
101
102    /// Finishes the builder and returns the completed Arrow array.
103    pub(crate) fn finish(&mut self) -> ArrayRef {
104        match self {
105            Self::Str(b) => Arc::new(b.finish()),
106            Self::Int16(b) => Arc::new(b.finish()),
107            Self::Int32(b) => Arc::new(b.finish()),
108            Self::Float32(b) => Arc::new(b.finish()),
109            Self::Float64(b) => Arc::new(b.finish()),
110            Self::Date32(b) => Arc::new(b.finish()),
111            Self::TimestampSecond(b) => Arc::new(b.finish()),
112            Self::TimestampMillisecond(b) => Arc::new(b.finish()),
113            Self::TimestampMicrosecond(b) => Arc::new(b.finish()),
114            Self::TimestampNanosecond(b) => Arc::new(b.finish()),
115            Self::Time32Second(b) => Arc::new(b.finish()),
116            Self::Time32Millisecond(b) => Arc::new(b.finish()),
117            Self::Time64Microsecond(b) => Arc::new(b.finish()),
118            Self::Time64Nanosecond(b) => Arc::new(b.finish()),
119        }
120    }
121
122    /// Creates a typed builder matching the variable's metadata.
123    ///
124    /// Uses `var_type`, `var_type_class`, and `var_format_class` to select the
125    /// correct builder variant, and pre-sizes it with `capacity` rows.
126    /// For string columns, `storage_width` provides a byte-level capacity hint.
127    fn from_metadata(vm: &ReadStatVarMetadata, capacity: usize) -> Self {
128        match vm.var_type_class {
129            ReadStatVarTypeClass::String => Self::Str(StringBuilder::with_capacity(
130                capacity,
131                // saturating_mul: storage_width is an untrusted file-header
132                // field, so guard the byte hint against usize overflow.
133                capacity.saturating_mul(vm.storage_width),
134            )),
135            ReadStatVarTypeClass::Numeric => {
136                match vm.var_format_class {
137                    Some(ReadStatVarFormatClass::Date) => {
138                        Self::Date32(Date32Builder::with_capacity(capacity))
139                    }
140                    Some(ReadStatVarFormatClass::DateTime) => {
141                        Self::TimestampSecond(TimestampSecondBuilder::with_capacity(capacity))
142                    }
143                    Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
144                        Self::TimestampMillisecond(TimestampMillisecondBuilder::with_capacity(
145                            capacity,
146                        ))
147                    }
148                    Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
149                        Self::TimestampMicrosecond(TimestampMicrosecondBuilder::with_capacity(
150                            capacity,
151                        ))
152                    }
153                    Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
154                        Self::TimestampNanosecond(TimestampNanosecondBuilder::with_capacity(
155                            capacity,
156                        ))
157                    }
158                    Some(ReadStatVarFormatClass::Time) => {
159                        Self::Time32Second(Time32SecondBuilder::with_capacity(capacity))
160                    }
161                    Some(ReadStatVarFormatClass::TimeWithMilliseconds) => {
162                        Self::Time32Millisecond(Time32MillisecondBuilder::with_capacity(capacity))
163                    }
164                    Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
165                        Self::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(capacity))
166                    }
167                    Some(ReadStatVarFormatClass::TimeWithNanoseconds) => {
168                        Self::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity))
169                    }
170                    None => {
171                        // Plain numeric — dispatch by storage type
172                        match vm.var_type {
173                            ReadStatVarType::Int8 | ReadStatVarType::Int16 => {
174                                Self::Int16(Int16Builder::with_capacity(capacity))
175                            }
176                            ReadStatVarType::Int32 => {
177                                Self::Int32(Int32Builder::with_capacity(capacity))
178                            }
179                            ReadStatVarType::Float => {
180                                Self::Float32(Float32Builder::with_capacity(capacity))
181                            }
182                            _ => Self::Float64(Float64Builder::with_capacity(capacity)),
183                        }
184                    }
185                }
186            }
187        }
188    }
189}
190
191/// Holds parsed row data from a `.sas7bdat` file and converts it to Arrow format.
192///
193/// Each instance processes one streaming chunk of rows. Values are appended
194/// directly into typed Arrow `ColumnBuilder`s during the `handle_value`
195/// callback, then finished into an Arrow [`RecordBatch`] via `cols_to_batch`.
196pub struct ReadStatData {
197    /// Number of variables (columns) in the dataset.
198    pub var_count: i32,
199    /// Per-variable metadata, keyed by variable index.
200    /// Wrapped in `Arc` so parallel chunks share the same metadata without deep cloning.
201    pub vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
202    /// Typed Arrow builders — one per variable, pre-sized with capacity hints.
203    pub(crate) builders: Vec<ColumnBuilder>,
204    /// Arrow schema for the dataset.
205    /// Wrapped in `Arc` for cheap sharing across parallel chunks.
206    pub schema: Arc<Schema>,
207    /// The Arrow `RecordBatch` produced after parsing, if available.
208    pub batch: Option<RecordBatch>,
209    /// Number of rows to process in this chunk.
210    pub chunk_rows_to_process: usize,
211    /// Starting row offset for this chunk.
212    pub(crate) chunk_row_start: usize,
213    /// Ending row offset (exclusive) for this chunk.
214    pub(crate) chunk_row_end: usize,
215    /// Number of rows actually processed so far in this chunk.
216    pub(crate) chunk_rows_processed: usize,
217    /// Shared atomic counter of total rows processed across all chunks.
218    pub(crate) total_rows_processed: Option<Arc<AtomicUsize>>,
219    /// Optional progress callback for visual feedback during parsing.
220    pub(crate) progress: Option<Arc<dyn ProgressCallback>>,
221    /// A typed error raised by a value callback that aborted parsing.
222    ///
223    /// Set by `handle_value` (e.g. on date/time overflow or a builder/value
224    /// type mismatch) and surfaced by the parse routines in preference to the
225    /// generic `USER_ABORT` the C library reports for any callback abort.
226    pub(crate) abort_error: Option<ReadStatError>,
227    /// Optional mapping: original var index -> filtered column index.
228    /// Wrapped in `Arc` so parallel chunks share the same filter without deep cloning.
229    pub(crate) column_filter: Option<Arc<BTreeMap<i32, i32>>>,
230    /// Total variable count in the unfiltered dataset.
231    /// Used for row-boundary detection in `handle_value` when filtering is active.
232    /// Defaults to `var_count` when no filter is set.
233    pub(crate) total_var_count: i32,
234}
235
236impl Default for ReadStatData {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
242impl ReadStatData {
243    /// Creates a new `ReadStatData` with default (empty) values.
244    pub fn new() -> Self {
245        Self {
246            // metadata
247            var_count: 0,
248            vars: Arc::new(BTreeMap::new()),
249            // data
250            builders: Vec::new(),
251            schema: Arc::new(Schema::empty()),
252            // record batch
253            batch: None,
254            chunk_rows_to_process: 0,
255            chunk_rows_processed: 0,
256            chunk_row_start: 0,
257            chunk_row_end: 0,
258            // total rows
259            total_rows_processed: None,
260            // progress
261            progress: None,
262            // errors
263            abort_error: None,
264            // column filtering
265            column_filter: None,
266            total_var_count: 0,
267        }
268    }
269
270    /// Allocates typed Arrow builders with capacity for `chunk_rows_to_process`.
271    ///
272    /// Each builder's type is determined by the variable metadata. String builders
273    /// are additionally pre-sized with `storage_width * chunk_rows` bytes.
274    ///
275    /// The capacity hint is clamped to [`MAX_PREALLOC_ROWS`] because both the row
276    /// count and per-string `storage_width` originate from untrusted file headers;
277    /// a crafted file claiming billions of rows would otherwise trigger a multi-GB
278    /// up-front allocation (or a multiply overflow) before a single row is parsed.
279    /// Builders grow on demand, so clamping costs honest files nothing.
280    #[must_use]
281    pub fn allocate_builders(self) -> Self {
282        let capacity = self.chunk_rows_to_process.min(MAX_PREALLOC_ROWS);
283        let builders: Vec<ColumnBuilder> = self
284            .vars
285            .values()
286            .map(|vm| ColumnBuilder::from_metadata(vm, capacity))
287            .collect();
288        Self { builders, ..self }
289    }
290
291    /// Finishes all builders and assembles the Arrow [`RecordBatch`].
292    ///
293    /// Each builder produces its final array via `finish()`, which is an O(1)
294    /// operation (no data copying). The heavy work was already done during
295    /// `handle_value` when values were appended directly into the builders.
296    pub(crate) fn cols_to_batch(&mut self) -> Result<(), ReadStatError> {
297        let arrays: Vec<ArrayRef> = self
298            .builders
299            .iter_mut()
300            .map(ColumnBuilder::finish)
301            .collect();
302
303        self.batch = Some(RecordBatch::try_new(self.schema.clone(), arrays)?);
304
305        Ok(())
306    }
307
308    /// Records that a value was observed for `var_index` during parsing.
309    ///
310    /// When `var_index` is the dataset's final variable, the cell marks the end
311    /// of a row, so the per-chunk and shared row counters are advanced. Boundary
312    /// detection uses `total_var_count` (the *unfiltered* variable count) so it
313    /// stays correct even when a column filter skips trailing columns.
314    ///
315    /// Called from the value callback for both stored and filter-skipped cells,
316    /// keeping row-boundary accounting in a single place.
317    pub(crate) fn note_value(&mut self, var_index: i32) {
318        if var_index == self.total_var_count - 1 {
319            self.chunk_rows_processed += 1;
320            if let Some(trp) = &self.total_rows_processed {
321                trp.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
322            }
323        }
324    }
325
326    /// Parses row data from the file and converts it to an Arrow [`RecordBatch`].
327    ///
328    /// # Errors
329    ///
330    /// Returns [`ReadStatError`] if FFI parsing or Arrow conversion fails.
331    pub fn read_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
332        // parse data and if successful then convert cols into a record batch
333        self.parse_data(rsp)?;
334        self.cols_to_batch()?;
335        Ok(())
336    }
337
338    /// Parses row data from an in-memory byte slice and converts it to an Arrow [`RecordBatch`].
339    ///
340    /// Equivalent to [`read_data`](ReadStatData::read_data) but reads from a `&[u8]`
341    /// buffer instead of a file path.
342    ///
343    /// # Errors
344    ///
345    /// Returns [`ReadStatError`] if FFI parsing or Arrow conversion fails.
346    pub fn read_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
347        self.parse_data_from_bytes(bytes)?;
348        self.cols_to_batch()?;
349        Ok(())
350    }
351
352    /// Parses row data from a memory-mapped `.sas7bdat` file and converts it to an Arrow [`RecordBatch`].
353    ///
354    /// Opens the file at `path` and memory-maps it, avoiding explicit read syscalls.
355    /// Especially beneficial for large files and repeated chunk reads against the
356    /// same file, as the OS manages page caching automatically.
357    ///
358    /// # Safety
359    ///
360    /// Memory mapping is safe as long as the file is not modified or truncated by
361    /// another process while the map is active.
362    ///
363    /// # Errors
364    ///
365    /// Returns [`ReadStatError`] if the file cannot be opened, mapped, or parsed.
366    #[cfg(not(target_arch = "wasm32"))]
367    pub fn read_data_from_mmap(&mut self, path: &std::path::Path) -> Result<(), ReadStatError> {
368        let file = std::fs::File::open(path)?;
369        let mmap = unsafe { memmap2::Mmap::map(&file)? };
370        self.read_data_from_bytes(&mmap)
371    }
372
373    /// Parses row data from the file via FFI callbacks (without Arrow conversion).
374    #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
375    pub(crate) fn parse_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
376        // path as pointer
377        debug!("Path as C string is {:?}", rsp.cstring_path);
378        let ppath = rsp.cstring_path.as_ptr();
379
380        // initialize context
381        let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
382
383        // initialize error
384        let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
385        debug!("Initially, error ==> {error:#?}");
386
387        // setup parser
388        // once call parse_sas7bdat, iteration begins
389        let error = ReadStatParser::new()?
390            // do not set metadata handler nor variable handler as already processed
391            .set_value_handler(Some(cb::handle_value))?
392            .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
393            .set_row_offset(Some(self.chunk_row_start.try_into()?))?
394            .parse_sas7bdat(ppath, ctx);
395
396        // A value callback may have aborted with a specific, typed error; prefer
397        // it over the generic `USER_ABORT` the C library reports for any abort.
398        if let Some(e) = self.abort_error.take() {
399            return Err(e);
400        }
401        check_c_error(error as i32)?;
402
403        // Advance the progress bar by the rows just parsed. Doing this *after*
404        // the chunk completes (rather than before) keeps the displayed position
405        // in step with work actually done — under `--parallel` a pre-parse
406        // increment made the bar jump straight to 100%.
407        if let Some(progress) = &self.progress {
408            progress.inc(self.chunk_rows_to_process as u64);
409        }
410
411        Ok(())
412    }
413
414    #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
415    fn parse_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
416        let mut buffer_ctx = ReadStatBufferCtx::new(bytes);
417
418        // initialize context
419        let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
420
421        // initialize error
422        let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
423        debug!("Initially, error ==> {error:#?}");
424
425        // Dummy path — custom I/O handlers ignore it
426        let dummy_path = CString::new("").expect("empty string is valid C string");
427
428        // setup parser with buffer I/O
429        let error = buffer_ctx
430            .configure_parser(
431                ReadStatParser::new()?
432                    .set_value_handler(Some(cb::handle_value))?
433                    .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
434                    .set_row_offset(Some(self.chunk_row_start.try_into()?))?,
435            )?
436            .parse_sas7bdat(dummy_path.as_ptr(), ctx);
437
438        // A value callback may have aborted with a specific, typed error; prefer
439        // it over the generic `USER_ABORT` the C library reports for any abort.
440        if let Some(e) = self.abort_error.take() {
441            return Err(e);
442        }
443        check_c_error(error as i32)?;
444        Ok(())
445    }
446
447    /// Initializes this instance with metadata and chunk boundaries, allocating builders.
448    ///
449    /// Wraps `vars` and `schema` in `Arc` internally. For the parallel read path,
450    /// prefer [`init_shared`](ReadStatData::init_shared) which accepts pre-wrapped
451    /// `Arc`s to avoid repeated deep clones.
452    #[must_use]
453    pub fn init(self, md: ReadStatMetadata, row_start: u32, row_end: u32) -> Self {
454        self.set_metadata(md)
455            .set_chunk_counts(row_start, row_end)
456            .allocate_builders()
457    }
458
459    /// Initializes this instance with a column filter applied, in one step.
460    ///
461    /// Combines [`set_column_filter`](ReadStatData::set_column_filter) and
462    /// [`init`](ReadStatData::init) in the correct order so callers cannot
463    /// accidentally invoke them the wrong way around (which would clobber the
464    /// original variable count needed for row-boundary detection).
465    ///
466    /// `md` must be the **original, unfiltered** metadata and `mapping` the
467    /// result of [`ReadStatMetadata::resolve_selected_columns`]. The filtered
468    /// metadata and the original variable count are derived internally.
469    ///
470    /// ```no_run
471    /// use readstat::{ReadStatPath, ReadStatMetadata, ReadStatData};
472    ///
473    /// # fn main() -> Result<(), readstat::ReadStatError> {
474    /// let rsp = ReadStatPath::new("data.sas7bdat")?;
475    /// let mut md = ReadStatMetadata::new();
476    /// md.read_metadata(&rsp, false)?;
477    ///
478    /// if let Some(mapping) = md.resolve_selected_columns(Some(vec!["name".into(), "age".into()]))? {
479    ///     let row_count = u32::try_from(md.row_count)?;
480    ///     let mut d = ReadStatData::new().init_filtered(md, &mapping, 0, row_count);
481    ///     d.read_data(&rsp)?;
482    /// }
483    /// # Ok(())
484    /// # }
485    /// ```
486    #[must_use]
487    pub fn init_filtered(
488        self,
489        md: ReadStatMetadata,
490        mapping: &BTreeMap<i32, i32>,
491        row_start: u32,
492        row_end: u32,
493    ) -> Self {
494        let original_var_count = md.var_count;
495        let filtered = md.filter_to_selected_columns(mapping);
496        self.set_column_filter(Some(Arc::new(mapping.clone())), original_var_count)
497            .init(filtered, row_start, row_end)
498    }
499
500    /// Initializes this instance with pre-shared metadata and chunk boundaries.
501    ///
502    /// Accepts `Arc`-wrapped `vars` and `schema` for cheap cloning in parallel loops.
503    /// Each call only increments reference counts (atomic +1) instead of deep-cloning
504    /// the entire metadata tree.
505    #[must_use]
506    pub fn init_shared(
507        self,
508        var_count: i32,
509        vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
510        schema: Arc<Schema>,
511        row_start: u32,
512        row_end: u32,
513    ) -> Self {
514        let total_var_count = if self.total_var_count != 0 {
515            self.total_var_count
516        } else {
517            var_count
518        };
519        Self {
520            var_count,
521            vars,
522            schema,
523            total_var_count,
524            ..self
525        }
526        .set_chunk_counts(row_start, row_end)
527        .allocate_builders()
528    }
529
530    #[allow(clippy::cast_possible_truncation)]
531    fn set_chunk_counts(self, row_start: u32, row_end: u32) -> Self {
532        // saturating_sub: guard against a caller passing row_end < row_start,
533        // which would underflow-panic in debug and wrap to ~4 billion in
534        // release (then feed an enormous builder pre-allocation).
535        let chunk_rows_to_process = row_end.saturating_sub(row_start) as usize;
536        let chunk_row_start = row_start as usize;
537        let chunk_row_end = row_end as usize;
538        let chunk_rows_processed = 0_usize;
539
540        Self {
541            chunk_rows_to_process,
542            chunk_row_start,
543            chunk_row_end,
544            chunk_rows_processed,
545            ..self
546        }
547    }
548
549    fn set_metadata(self, md: ReadStatMetadata) -> Self {
550        let var_count = md.var_count;
551        let vars = Arc::new(md.vars);
552        let schema = Arc::new(md.schema);
553        // Only set total_var_count from metadata if not already set by set_column_filter
554        let total_var_count = if self.total_var_count != 0 {
555            self.total_var_count
556        } else {
557            var_count
558        };
559        Self {
560            var_count,
561            vars,
562            schema,
563            total_var_count,
564            ..self
565        }
566    }
567
568    /// Sets the shared atomic counter for tracking rows processed across chunks.
569    #[must_use]
570    pub fn set_total_rows_processed(self, total_rows_processed: Arc<AtomicUsize>) -> Self {
571        Self {
572            total_rows_processed: Some(total_rows_processed),
573            ..self
574        }
575    }
576
577    /// Sets the column filter and original (unfiltered) variable count.
578    ///
579    /// Accepts an `Arc`-wrapped filter for cheap sharing across parallel chunks.
580    /// Must be called **before** [`init`](ReadStatData::init) so that
581    /// `total_var_count` is preserved when `set_metadata` runs.
582    #[must_use]
583    pub fn set_column_filter(
584        self,
585        filter: Option<Arc<BTreeMap<i32, i32>>>,
586        total_var_count: i32,
587    ) -> Self {
588        Self {
589            column_filter: filter,
590            total_var_count,
591            ..self
592        }
593    }
594
595    /// Attaches a progress callback for feedback during parsing.
596    ///
597    /// The callback receives progress increments and parsing status updates.
598    /// See [`ProgressCallback`] for the required interface.
599    #[must_use]
600    pub fn set_progress(self, progress: Arc<dyn ProgressCallback>) -> Self {
601        Self {
602            progress: Some(progress),
603            ..self
604        }
605    }
606}