readstat/rs_data.rs
1//! Data reading and Arrow [`RecordBatch`](arrow_array::RecordBatch) conversion.
2//!
3//! [`ReadStatData`] coordinates the FFI parsing of row values from a `.sas7bdat` file,
4//! accumulating them directly into typed Arrow builders via the `handle_value`
5//! callback, then finishing them into an Arrow `RecordBatch` for downstream writing.
6//! Supports streaming chunks with configurable row offsets and progress tracking.
7
8use arrow::datatypes::Schema;
9use arrow_array::{
10 ArrayRef, RecordBatch,
11 builder::{
12 Date32Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, StringBuilder,
13 Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
14 Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
15 TimestampNanosecondBuilder, TimestampSecondBuilder,
16 },
17};
18use log::debug;
19use std::{
20 collections::BTreeMap,
21 ffi::CString,
22 os::raw::c_void,
23 sync::{Arc, atomic::AtomicUsize},
24};
25
26use crate::{
27 cb,
28 err::{ReadStatError, check_c_error},
29 progress::ProgressCallback,
30 rs_buffer_io::ReadStatBufferCtx,
31 rs_metadata::{ReadStatMetadata, ReadStatVarMetadata},
32 rs_parser::ReadStatParser,
33 rs_path::ReadStatPath,
34 rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass},
35};
36
/// Ceiling on the number of rows reserved up front in each Arrow builder.
///
/// Row counts are read from the file header and cannot be trusted, so the
/// initial capacity hint is capped at this value rather than following the
/// header blindly. Builders still grow on demand, so chunks with more rows
/// than this are handled correctly; they simply do not get the whole
/// reservation in a single step. The value sits far above the default 10k-row
/// streaming chunk.
const MAX_PREALLOC_ROWS: usize = 1_000_000;
44
45/// A typed Arrow array builder for a single column.
46///
47/// Each variant wraps the corresponding Arrow builder, pre-sized with capacity
48/// hints from the metadata (row count, string `storage_width`). Values are
49/// appended directly during FFI callbacks, eliminating intermediate allocations.
50pub(crate) enum ColumnBuilder {
51 /// UTF-8 string column.
52 Str(StringBuilder),
53 /// 16-bit signed integer column (covers both SAS Int8 and Int16).
54 Int16(Int16Builder),
55 /// 32-bit signed integer column.
56 Int32(Int32Builder),
57 /// 32-bit floating point column.
58 Float32(Float32Builder),
59 /// 64-bit floating point column.
60 Float64(Float64Builder),
61 /// Date column (days since Unix epoch).
62 Date32(Date32Builder),
63 /// Timestamp with second precision.
64 TimestampSecond(TimestampSecondBuilder),
65 /// Timestamp with millisecond precision.
66 TimestampMillisecond(TimestampMillisecondBuilder),
67 /// Timestamp with microsecond precision.
68 TimestampMicrosecond(TimestampMicrosecondBuilder),
69 /// Timestamp with nanosecond precision.
70 TimestampNanosecond(TimestampNanosecondBuilder),
71 /// Time of day with second precision.
72 Time32Second(Time32SecondBuilder),
73 /// Time of day with millisecond precision.
74 Time32Millisecond(Time32MillisecondBuilder),
75 /// Time of day with microsecond precision.
76 Time64Microsecond(Time64MicrosecondBuilder),
77 /// Time of day with nanosecond precision.
78 Time64Nanosecond(Time64NanosecondBuilder),
79}
80
81impl ColumnBuilder {
82 /// Appends a null value, regardless of the underlying builder type.
83 pub(crate) fn append_null(&mut self) {
84 match self {
85 Self::Str(b) => b.append_null(),
86 Self::Int16(b) => b.append_null(),
87 Self::Int32(b) => b.append_null(),
88 Self::Float32(b) => b.append_null(),
89 Self::Float64(b) => b.append_null(),
90 Self::Date32(b) => b.append_null(),
91 Self::TimestampSecond(b) => b.append_null(),
92 Self::TimestampMillisecond(b) => b.append_null(),
93 Self::TimestampMicrosecond(b) => b.append_null(),
94 Self::TimestampNanosecond(b) => b.append_null(),
95 Self::Time32Second(b) => b.append_null(),
96 Self::Time32Millisecond(b) => b.append_null(),
97 Self::Time64Microsecond(b) => b.append_null(),
98 Self::Time64Nanosecond(b) => b.append_null(),
99 }
100 }
101
102 /// Finishes the builder and returns the completed Arrow array.
103 pub(crate) fn finish(&mut self) -> ArrayRef {
104 match self {
105 Self::Str(b) => Arc::new(b.finish()),
106 Self::Int16(b) => Arc::new(b.finish()),
107 Self::Int32(b) => Arc::new(b.finish()),
108 Self::Float32(b) => Arc::new(b.finish()),
109 Self::Float64(b) => Arc::new(b.finish()),
110 Self::Date32(b) => Arc::new(b.finish()),
111 Self::TimestampSecond(b) => Arc::new(b.finish()),
112 Self::TimestampMillisecond(b) => Arc::new(b.finish()),
113 Self::TimestampMicrosecond(b) => Arc::new(b.finish()),
114 Self::TimestampNanosecond(b) => Arc::new(b.finish()),
115 Self::Time32Second(b) => Arc::new(b.finish()),
116 Self::Time32Millisecond(b) => Arc::new(b.finish()),
117 Self::Time64Microsecond(b) => Arc::new(b.finish()),
118 Self::Time64Nanosecond(b) => Arc::new(b.finish()),
119 }
120 }
121
122 /// Creates a typed builder matching the variable's metadata.
123 ///
124 /// Uses `var_type`, `var_type_class`, and `var_format_class` to select the
125 /// correct builder variant, and pre-sizes it with `capacity` rows.
126 /// For string columns, `storage_width` provides a byte-level capacity hint.
127 fn from_metadata(vm: &ReadStatVarMetadata, capacity: usize) -> Self {
128 match vm.var_type_class {
129 ReadStatVarTypeClass::String => Self::Str(StringBuilder::with_capacity(
130 capacity,
131 // saturating_mul: storage_width is an untrusted file-header
132 // field, so guard the byte hint against usize overflow.
133 capacity.saturating_mul(vm.storage_width),
134 )),
135 ReadStatVarTypeClass::Numeric => {
136 match vm.var_format_class {
137 Some(ReadStatVarFormatClass::Date) => {
138 Self::Date32(Date32Builder::with_capacity(capacity))
139 }
140 Some(ReadStatVarFormatClass::DateTime) => {
141 Self::TimestampSecond(TimestampSecondBuilder::with_capacity(capacity))
142 }
143 Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
144 Self::TimestampMillisecond(TimestampMillisecondBuilder::with_capacity(
145 capacity,
146 ))
147 }
148 Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
149 Self::TimestampMicrosecond(TimestampMicrosecondBuilder::with_capacity(
150 capacity,
151 ))
152 }
153 Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
154 Self::TimestampNanosecond(TimestampNanosecondBuilder::with_capacity(
155 capacity,
156 ))
157 }
158 Some(ReadStatVarFormatClass::Time) => {
159 Self::Time32Second(Time32SecondBuilder::with_capacity(capacity))
160 }
161 Some(ReadStatVarFormatClass::TimeWithMilliseconds) => {
162 Self::Time32Millisecond(Time32MillisecondBuilder::with_capacity(capacity))
163 }
164 Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
165 Self::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(capacity))
166 }
167 Some(ReadStatVarFormatClass::TimeWithNanoseconds) => {
168 Self::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity))
169 }
170 None => {
171 // Plain numeric — dispatch by storage type
172 match vm.var_type {
173 ReadStatVarType::Int8 | ReadStatVarType::Int16 => {
174 Self::Int16(Int16Builder::with_capacity(capacity))
175 }
176 ReadStatVarType::Int32 => {
177 Self::Int32(Int32Builder::with_capacity(capacity))
178 }
179 ReadStatVarType::Float => {
180 Self::Float32(Float32Builder::with_capacity(capacity))
181 }
182 _ => Self::Float64(Float64Builder::with_capacity(capacity)),
183 }
184 }
185 }
186 }
187 }
188 }
189}
190
191/// Holds parsed row data from a `.sas7bdat` file and converts it to Arrow format.
192///
193/// Each instance processes one streaming chunk of rows. Values are appended
194/// directly into typed Arrow `ColumnBuilder`s during the `handle_value`
195/// callback, then finished into an Arrow [`RecordBatch`] via `cols_to_batch`.
196pub struct ReadStatData {
197 /// Number of variables (columns) in the dataset.
198 pub var_count: i32,
199 /// Per-variable metadata, keyed by variable index.
200 /// Wrapped in `Arc` so parallel chunks share the same metadata without deep cloning.
201 pub vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
202 /// Typed Arrow builders — one per variable, pre-sized with capacity hints.
203 pub(crate) builders: Vec<ColumnBuilder>,
204 /// Arrow schema for the dataset.
205 /// Wrapped in `Arc` for cheap sharing across parallel chunks.
206 pub schema: Arc<Schema>,
207 /// The Arrow `RecordBatch` produced after parsing, if available.
208 pub batch: Option<RecordBatch>,
209 /// Number of rows to process in this chunk.
210 pub chunk_rows_to_process: usize,
211 /// Starting row offset for this chunk.
212 pub(crate) chunk_row_start: usize,
213 /// Ending row offset (exclusive) for this chunk.
214 pub(crate) chunk_row_end: usize,
215 /// Number of rows actually processed so far in this chunk.
216 pub(crate) chunk_rows_processed: usize,
217 /// Shared atomic counter of total rows processed across all chunks.
218 pub(crate) total_rows_processed: Option<Arc<AtomicUsize>>,
219 /// Optional progress callback for visual feedback during parsing.
220 pub(crate) progress: Option<Arc<dyn ProgressCallback>>,
221 /// A typed error raised by a value callback that aborted parsing.
222 ///
223 /// Set by `handle_value` (e.g. on date/time overflow or a builder/value
224 /// type mismatch) and surfaced by the parse routines in preference to the
225 /// generic `USER_ABORT` the C library reports for any callback abort.
226 pub(crate) abort_error: Option<ReadStatError>,
227 /// Optional mapping: original var index -> filtered column index.
228 /// Wrapped in `Arc` so parallel chunks share the same filter without deep cloning.
229 pub(crate) column_filter: Option<Arc<BTreeMap<i32, i32>>>,
230 /// Total variable count in the unfiltered dataset.
231 /// Used for row-boundary detection in `handle_value` when filtering is active.
232 /// Defaults to `var_count` when no filter is set.
233 pub(crate) total_var_count: i32,
234}
235
236impl Default for ReadStatData {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
242impl ReadStatData {
243 /// Creates a new `ReadStatData` with default (empty) values.
244 pub fn new() -> Self {
245 Self {
246 // metadata
247 var_count: 0,
248 vars: Arc::new(BTreeMap::new()),
249 // data
250 builders: Vec::new(),
251 schema: Arc::new(Schema::empty()),
252 // record batch
253 batch: None,
254 chunk_rows_to_process: 0,
255 chunk_rows_processed: 0,
256 chunk_row_start: 0,
257 chunk_row_end: 0,
258 // total rows
259 total_rows_processed: None,
260 // progress
261 progress: None,
262 // errors
263 abort_error: None,
264 // column filtering
265 column_filter: None,
266 total_var_count: 0,
267 }
268 }
269
270 /// Allocates typed Arrow builders with capacity for `chunk_rows_to_process`.
271 ///
272 /// Each builder's type is determined by the variable metadata. String builders
273 /// are additionally pre-sized with `storage_width * chunk_rows` bytes.
274 ///
275 /// The capacity hint is clamped to [`MAX_PREALLOC_ROWS`] because both the row
276 /// count and per-string `storage_width` originate from untrusted file headers;
277 /// a crafted file claiming billions of rows would otherwise trigger a multi-GB
278 /// up-front allocation (or a multiply overflow) before a single row is parsed.
279 /// Builders grow on demand, so clamping costs honest files nothing.
280 #[must_use]
281 pub fn allocate_builders(self) -> Self {
282 let capacity = self.chunk_rows_to_process.min(MAX_PREALLOC_ROWS);
283 let builders: Vec<ColumnBuilder> = self
284 .vars
285 .values()
286 .map(|vm| ColumnBuilder::from_metadata(vm, capacity))
287 .collect();
288 Self { builders, ..self }
289 }
290
291 /// Finishes all builders and assembles the Arrow [`RecordBatch`].
292 ///
293 /// Each builder produces its final array via `finish()`, which is an O(1)
294 /// operation (no data copying). The heavy work was already done during
295 /// `handle_value` when values were appended directly into the builders.
296 pub(crate) fn cols_to_batch(&mut self) -> Result<(), ReadStatError> {
297 let arrays: Vec<ArrayRef> = self
298 .builders
299 .iter_mut()
300 .map(ColumnBuilder::finish)
301 .collect();
302
303 self.batch = Some(RecordBatch::try_new(self.schema.clone(), arrays)?);
304
305 Ok(())
306 }
307
308 /// Records that a value was observed for `var_index` during parsing.
309 ///
310 /// When `var_index` is the dataset's final variable, the cell marks the end
311 /// of a row, so the per-chunk and shared row counters are advanced. Boundary
312 /// detection uses `total_var_count` (the *unfiltered* variable count) so it
313 /// stays correct even when a column filter skips trailing columns.
314 ///
315 /// Called from the value callback for both stored and filter-skipped cells,
316 /// keeping row-boundary accounting in a single place.
317 pub(crate) fn note_value(&mut self, var_index: i32) {
318 if var_index == self.total_var_count - 1 {
319 self.chunk_rows_processed += 1;
320 if let Some(trp) = &self.total_rows_processed {
321 trp.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
322 }
323 }
324 }
325
326 /// Parses row data from the file and converts it to an Arrow [`RecordBatch`].
327 ///
328 /// # Errors
329 ///
330 /// Returns [`ReadStatError`] if FFI parsing or Arrow conversion fails.
331 pub fn read_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
332 // parse data and if successful then convert cols into a record batch
333 self.parse_data(rsp)?;
334 self.cols_to_batch()?;
335 Ok(())
336 }
337
338 /// Parses row data from an in-memory byte slice and converts it to an Arrow [`RecordBatch`].
339 ///
340 /// Equivalent to [`read_data`](ReadStatData::read_data) but reads from a `&[u8]`
341 /// buffer instead of a file path.
342 ///
343 /// # Errors
344 ///
345 /// Returns [`ReadStatError`] if FFI parsing or Arrow conversion fails.
346 pub fn read_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
347 self.parse_data_from_bytes(bytes)?;
348 self.cols_to_batch()?;
349 Ok(())
350 }
351
352 /// Parses row data from a memory-mapped `.sas7bdat` file and converts it to an Arrow [`RecordBatch`].
353 ///
354 /// Opens the file at `path` and memory-maps it, avoiding explicit read syscalls.
355 /// Especially beneficial for large files and repeated chunk reads against the
356 /// same file, as the OS manages page caching automatically.
357 ///
358 /// # Safety
359 ///
360 /// Memory mapping is safe as long as the file is not modified or truncated by
361 /// another process while the map is active.
362 ///
363 /// # Errors
364 ///
365 /// Returns [`ReadStatError`] if the file cannot be opened, mapped, or parsed.
366 #[cfg(not(target_arch = "wasm32"))]
367 pub fn read_data_from_mmap(&mut self, path: &std::path::Path) -> Result<(), ReadStatError> {
368 let file = std::fs::File::open(path)?;
369 let mmap = unsafe { memmap2::Mmap::map(&file)? };
370 self.read_data_from_bytes(&mmap)
371 }
372
373 /// Parses row data from the file via FFI callbacks (without Arrow conversion).
374 #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
375 pub(crate) fn parse_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
376 // path as pointer
377 debug!("Path as C string is {:?}", rsp.cstring_path);
378 let ppath = rsp.cstring_path.as_ptr();
379
380 // initialize context
381 let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
382
383 // initialize error
384 let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
385 debug!("Initially, error ==> {error:#?}");
386
387 // setup parser
388 // once call parse_sas7bdat, iteration begins
389 let error = ReadStatParser::new()?
390 // do not set metadata handler nor variable handler as already processed
391 .set_value_handler(Some(cb::handle_value))?
392 .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
393 .set_row_offset(Some(self.chunk_row_start.try_into()?))?
394 .parse_sas7bdat(ppath, ctx);
395
396 // A value callback may have aborted with a specific, typed error; prefer
397 // it over the generic `USER_ABORT` the C library reports for any abort.
398 if let Some(e) = self.abort_error.take() {
399 return Err(e);
400 }
401 check_c_error(error as i32)?;
402
403 // Advance the progress bar by the rows just parsed. Doing this *after*
404 // the chunk completes (rather than before) keeps the displayed position
405 // in step with work actually done — under `--parallel` a pre-parse
406 // increment made the bar jump straight to 100%.
407 if let Some(progress) = &self.progress {
408 progress.inc(self.chunk_rows_to_process as u64);
409 }
410
411 Ok(())
412 }
413
414 #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
415 fn parse_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
416 let mut buffer_ctx = ReadStatBufferCtx::new(bytes);
417
418 // initialize context
419 let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
420
421 // initialize error
422 let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
423 debug!("Initially, error ==> {error:#?}");
424
425 // Dummy path — custom I/O handlers ignore it
426 let dummy_path = CString::new("").expect("empty string is valid C string");
427
428 // setup parser with buffer I/O
429 let error = buffer_ctx
430 .configure_parser(
431 ReadStatParser::new()?
432 .set_value_handler(Some(cb::handle_value))?
433 .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
434 .set_row_offset(Some(self.chunk_row_start.try_into()?))?,
435 )?
436 .parse_sas7bdat(dummy_path.as_ptr(), ctx);
437
438 // A value callback may have aborted with a specific, typed error; prefer
439 // it over the generic `USER_ABORT` the C library reports for any abort.
440 if let Some(e) = self.abort_error.take() {
441 return Err(e);
442 }
443 check_c_error(error as i32)?;
444 Ok(())
445 }
446
447 /// Initializes this instance with metadata and chunk boundaries, allocating builders.
448 ///
449 /// Wraps `vars` and `schema` in `Arc` internally. For the parallel read path,
450 /// prefer [`init_shared`](ReadStatData::init_shared) which accepts pre-wrapped
451 /// `Arc`s to avoid repeated deep clones.
452 #[must_use]
453 pub fn init(self, md: ReadStatMetadata, row_start: u32, row_end: u32) -> Self {
454 self.set_metadata(md)
455 .set_chunk_counts(row_start, row_end)
456 .allocate_builders()
457 }
458
459 /// Initializes this instance with a column filter applied, in one step.
460 ///
461 /// Combines [`set_column_filter`](ReadStatData::set_column_filter) and
462 /// [`init`](ReadStatData::init) in the correct order so callers cannot
463 /// accidentally invoke them the wrong way around (which would clobber the
464 /// original variable count needed for row-boundary detection).
465 ///
466 /// `md` must be the **original, unfiltered** metadata and `mapping` the
467 /// result of [`ReadStatMetadata::resolve_selected_columns`]. The filtered
468 /// metadata and the original variable count are derived internally.
469 ///
470 /// ```no_run
471 /// use readstat::{ReadStatPath, ReadStatMetadata, ReadStatData};
472 ///
473 /// # fn main() -> Result<(), readstat::ReadStatError> {
474 /// let rsp = ReadStatPath::new("data.sas7bdat")?;
475 /// let mut md = ReadStatMetadata::new();
476 /// md.read_metadata(&rsp, false)?;
477 ///
478 /// if let Some(mapping) = md.resolve_selected_columns(Some(vec!["name".into(), "age".into()]))? {
479 /// let row_count = u32::try_from(md.row_count)?;
480 /// let mut d = ReadStatData::new().init_filtered(md, &mapping, 0, row_count);
481 /// d.read_data(&rsp)?;
482 /// }
483 /// # Ok(())
484 /// # }
485 /// ```
486 #[must_use]
487 pub fn init_filtered(
488 self,
489 md: ReadStatMetadata,
490 mapping: &BTreeMap<i32, i32>,
491 row_start: u32,
492 row_end: u32,
493 ) -> Self {
494 let original_var_count = md.var_count;
495 let filtered = md.filter_to_selected_columns(mapping);
496 self.set_column_filter(Some(Arc::new(mapping.clone())), original_var_count)
497 .init(filtered, row_start, row_end)
498 }
499
500 /// Initializes this instance with pre-shared metadata and chunk boundaries.
501 ///
502 /// Accepts `Arc`-wrapped `vars` and `schema` for cheap cloning in parallel loops.
503 /// Each call only increments reference counts (atomic +1) instead of deep-cloning
504 /// the entire metadata tree.
505 #[must_use]
506 pub fn init_shared(
507 self,
508 var_count: i32,
509 vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
510 schema: Arc<Schema>,
511 row_start: u32,
512 row_end: u32,
513 ) -> Self {
514 let total_var_count = if self.total_var_count != 0 {
515 self.total_var_count
516 } else {
517 var_count
518 };
519 Self {
520 var_count,
521 vars,
522 schema,
523 total_var_count,
524 ..self
525 }
526 .set_chunk_counts(row_start, row_end)
527 .allocate_builders()
528 }
529
530 #[allow(clippy::cast_possible_truncation)]
531 fn set_chunk_counts(self, row_start: u32, row_end: u32) -> Self {
532 // saturating_sub: guard against a caller passing row_end < row_start,
533 // which would underflow-panic in debug and wrap to ~4 billion in
534 // release (then feed an enormous builder pre-allocation).
535 let chunk_rows_to_process = row_end.saturating_sub(row_start) as usize;
536 let chunk_row_start = row_start as usize;
537 let chunk_row_end = row_end as usize;
538 let chunk_rows_processed = 0_usize;
539
540 Self {
541 chunk_rows_to_process,
542 chunk_row_start,
543 chunk_row_end,
544 chunk_rows_processed,
545 ..self
546 }
547 }
548
549 fn set_metadata(self, md: ReadStatMetadata) -> Self {
550 let var_count = md.var_count;
551 let vars = Arc::new(md.vars);
552 let schema = Arc::new(md.schema);
553 // Only set total_var_count from metadata if not already set by set_column_filter
554 let total_var_count = if self.total_var_count != 0 {
555 self.total_var_count
556 } else {
557 var_count
558 };
559 Self {
560 var_count,
561 vars,
562 schema,
563 total_var_count,
564 ..self
565 }
566 }
567
568 /// Sets the shared atomic counter for tracking rows processed across chunks.
569 #[must_use]
570 pub fn set_total_rows_processed(self, total_rows_processed: Arc<AtomicUsize>) -> Self {
571 Self {
572 total_rows_processed: Some(total_rows_processed),
573 ..self
574 }
575 }
576
577 /// Sets the column filter and original (unfiltered) variable count.
578 ///
579 /// Accepts an `Arc`-wrapped filter for cheap sharing across parallel chunks.
580 /// Must be called **before** [`init`](ReadStatData::init) so that
581 /// `total_var_count` is preserved when `set_metadata` runs.
582 #[must_use]
583 pub fn set_column_filter(
584 self,
585 filter: Option<Arc<BTreeMap<i32, i32>>>,
586 total_var_count: i32,
587 ) -> Self {
588 Self {
589 column_filter: filter,
590 total_var_count,
591 ..self
592 }
593 }
594
595 /// Attaches a progress callback for feedback during parsing.
596 ///
597 /// The callback receives progress increments and parsing status updates.
598 /// See [`ProgressCallback`] for the required interface.
599 #[must_use]
600 pub fn set_progress(self, progress: Arc<dyn ProgressCallback>) -> Self {
601 Self {
602 progress: Some(progress),
603 ..self
604 }
605 }
606}