// readstat/rs_data.rs

1//! Data reading and Arrow [`RecordBatch`](arrow_array::RecordBatch) conversion.
2//!
3//! [`ReadStatData`] coordinates the FFI parsing of row values from a `.sas7bdat` file,
4//! accumulating them directly into typed Arrow builders via the `handle_value`
5//! callback, then finishing them into an Arrow `RecordBatch` for downstream writing.
6//! Supports streaming chunks with configurable row offsets and progress tracking.
7
8use arrow::datatypes::Schema;
9use arrow_array::{
10    ArrayRef, RecordBatch,
11    builder::{
12        Date32Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, StringBuilder,
13        Time32SecondBuilder, Time64MicrosecondBuilder, TimestampMicrosecondBuilder,
14        TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
15    },
16};
17use log::debug;
18use std::{
19    collections::BTreeMap,
20    ffi::CString,
21    os::raw::c_void,
22    sync::{Arc, atomic::AtomicUsize},
23};
24
25use crate::{
26    cb,
27    err::{ReadStatError, check_c_error},
28    progress::ProgressCallback,
29    rs_buffer_io::ReadStatBufferCtx,
30    rs_metadata::{ReadStatMetadata, ReadStatVarMetadata},
31    rs_parser::ReadStatParser,
32    rs_path::ReadStatPath,
33    rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass},
34};
35
36/// A typed Arrow array builder for a single column.
37///
38/// Each variant wraps the corresponding Arrow builder, pre-sized with capacity
39/// hints from the metadata (row count, string `storage_width`). Values are
40/// appended directly during FFI callbacks, eliminating intermediate allocations.
/// A typed Arrow array builder for a single column.
///
/// Each variant wraps the corresponding Arrow builder, pre-sized with capacity
/// hints from the metadata (row count, string `storage_width`). Values are
/// appended directly during FFI callbacks, eliminating intermediate allocations.
///
/// The variant is chosen once per column in `from_metadata`; thereafter the
/// value callback appends into the matching builder by column index.
pub(crate) enum ColumnBuilder {
    /// UTF-8 string column.
    Str(StringBuilder),
    /// 16-bit signed integer column (covers both SAS Int8 and Int16).
    Int16(Int16Builder),
    /// 32-bit signed integer column.
    Int32(Int32Builder),
    /// 32-bit floating point column.
    Float32(Float32Builder),
    /// 64-bit floating point column (also the fallback for plain numerics).
    Float64(Float64Builder),
    /// Date column (days since Unix epoch).
    Date32(Date32Builder),
    /// Timestamp with second precision.
    TimestampSecond(TimestampSecondBuilder),
    /// Timestamp with millisecond precision.
    TimestampMillisecond(TimestampMillisecondBuilder),
    /// Timestamp with microsecond precision.
    TimestampMicrosecond(TimestampMicrosecondBuilder),
    /// Timestamp with nanosecond precision.
    TimestampNanosecond(TimestampNanosecondBuilder),
    /// Time of day with second precision.
    Time32Second(Time32SecondBuilder),
    /// Time of day with microsecond precision.
    Time64Microsecond(Time64MicrosecondBuilder),
}
67
68impl ColumnBuilder {
69    /// Returns a mutable reference to the inner [`StringBuilder`].
70    ///
71    /// # Panics
72    /// Panics if `self` is not `ColumnBuilder::Str`.
73    pub(crate) fn as_string_mut(&mut self) -> &mut StringBuilder {
74        match self {
75            Self::Str(b) => b,
76            _ => panic!("ColumnBuilder::as_string_mut called on non-string builder"),
77        }
78    }
79
80    /// Appends a null value, regardless of the underlying builder type.
81    pub(crate) fn append_null(&mut self) {
82        match self {
83            Self::Str(b) => b.append_null(),
84            Self::Int16(b) => b.append_null(),
85            Self::Int32(b) => b.append_null(),
86            Self::Float32(b) => b.append_null(),
87            Self::Float64(b) => b.append_null(),
88            Self::Date32(b) => b.append_null(),
89            Self::TimestampSecond(b) => b.append_null(),
90            Self::TimestampMillisecond(b) => b.append_null(),
91            Self::TimestampMicrosecond(b) => b.append_null(),
92            Self::TimestampNanosecond(b) => b.append_null(),
93            Self::Time32Second(b) => b.append_null(),
94            Self::Time64Microsecond(b) => b.append_null(),
95        }
96    }
97
98    /// Finishes the builder and returns the completed Arrow array.
99    pub(crate) fn finish(&mut self) -> ArrayRef {
100        match self {
101            Self::Str(b) => Arc::new(b.finish()),
102            Self::Int16(b) => Arc::new(b.finish()),
103            Self::Int32(b) => Arc::new(b.finish()),
104            Self::Float32(b) => Arc::new(b.finish()),
105            Self::Float64(b) => Arc::new(b.finish()),
106            Self::Date32(b) => Arc::new(b.finish()),
107            Self::TimestampSecond(b) => Arc::new(b.finish()),
108            Self::TimestampMillisecond(b) => Arc::new(b.finish()),
109            Self::TimestampMicrosecond(b) => Arc::new(b.finish()),
110            Self::TimestampNanosecond(b) => Arc::new(b.finish()),
111            Self::Time32Second(b) => Arc::new(b.finish()),
112            Self::Time64Microsecond(b) => Arc::new(b.finish()),
113        }
114    }
115
116    /// Creates a typed builder matching the variable's metadata.
117    ///
118    /// Uses `var_type`, `var_type_class`, and `var_format_class` to select the
119    /// correct builder variant, and pre-sizes it with `capacity` rows.
120    /// For string columns, `storage_width` provides a byte-level capacity hint.
121    fn from_metadata(vm: &ReadStatVarMetadata, capacity: usize) -> Self {
122        match vm.var_type_class {
123            ReadStatVarTypeClass::String => Self::Str(StringBuilder::with_capacity(
124                capacity,
125                capacity * vm.storage_width,
126            )),
127            ReadStatVarTypeClass::Numeric => {
128                match vm.var_format_class {
129                    Some(ReadStatVarFormatClass::Date) => {
130                        Self::Date32(Date32Builder::with_capacity(capacity))
131                    }
132                    Some(ReadStatVarFormatClass::DateTime) => {
133                        Self::TimestampSecond(TimestampSecondBuilder::with_capacity(capacity))
134                    }
135                    Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
136                        Self::TimestampMillisecond(TimestampMillisecondBuilder::with_capacity(
137                            capacity,
138                        ))
139                    }
140                    Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
141                        Self::TimestampMicrosecond(TimestampMicrosecondBuilder::with_capacity(
142                            capacity,
143                        ))
144                    }
145                    Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
146                        Self::TimestampNanosecond(TimestampNanosecondBuilder::with_capacity(
147                            capacity,
148                        ))
149                    }
150                    Some(ReadStatVarFormatClass::Time) => {
151                        Self::Time32Second(Time32SecondBuilder::with_capacity(capacity))
152                    }
153                    Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
154                        Self::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(capacity))
155                    }
156                    None => {
157                        // Plain numeric — dispatch by storage type
158                        match vm.var_type {
159                            ReadStatVarType::Int8 | ReadStatVarType::Int16 => {
160                                Self::Int16(Int16Builder::with_capacity(capacity))
161                            }
162                            ReadStatVarType::Int32 => {
163                                Self::Int32(Int32Builder::with_capacity(capacity))
164                            }
165                            ReadStatVarType::Float => {
166                                Self::Float32(Float32Builder::with_capacity(capacity))
167                            }
168                            _ => Self::Float64(Float64Builder::with_capacity(capacity)),
169                        }
170                    }
171                }
172            }
173        }
174    }
175}
176
177/// Holds parsed row data from a `.sas7bdat` file and converts it to Arrow format.
178///
179/// Each instance processes one streaming chunk of rows. Values are appended
180/// directly into typed Arrow `ColumnBuilder`s during the `handle_value`
181/// callback, then finished into an Arrow [`RecordBatch`] via `cols_to_batch`.
/// Holds parsed row data from a `.sas7bdat` file and converts it to Arrow format.
///
/// Each instance processes one streaming chunk of rows. Values are appended
/// directly into typed Arrow `ColumnBuilder`s during the `handle_value`
/// callback, then finished into an Arrow [`RecordBatch`] via `cols_to_batch`.
pub struct ReadStatData {
    /// Number of variables (columns) in the dataset.
    pub var_count: i32,
    /// Per-variable metadata, keyed by variable index.
    /// Wrapped in `Arc` so parallel chunks share the same metadata without deep cloning.
    pub vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
    /// Typed Arrow builders — one per variable, pre-sized with capacity hints.
    pub(crate) builders: Vec<ColumnBuilder>,
    /// Arrow schema for the dataset.
    /// Wrapped in `Arc` for cheap sharing across parallel chunks.
    pub schema: Arc<Schema>,
    /// The Arrow `RecordBatch` produced after parsing, if available.
    /// Remains `None` until `cols_to_batch` has run successfully.
    pub batch: Option<RecordBatch>,
    /// Number of rows to process in this chunk.
    pub chunk_rows_to_process: usize,
    /// Starting row offset for this chunk.
    pub chunk_row_start: usize,
    /// Ending row offset (exclusive) for this chunk.
    pub chunk_row_end: usize,
    /// Number of rows actually processed so far in this chunk.
    pub chunk_rows_processed: usize,
    /// Total rows to process across all chunks.
    pub total_rows_to_process: usize,
    /// Shared atomic counter of total rows processed across all chunks.
    pub total_rows_processed: Option<Arc<AtomicUsize>>,
    /// Optional progress callback for visual feedback during parsing.
    pub progress: Option<Arc<dyn ProgressCallback>>,
    /// Whether progress display is disabled.
    pub no_progress: bool,
    /// Errors collected during value parsing callbacks.
    /// NOTE(review): presumably accumulated here because FFI callbacks cannot
    /// propagate Rust errors directly — confirm against `cb::handle_value`.
    pub errors: Vec<String>,
    /// Optional mapping: original var index -> filtered column index.
    /// Wrapped in `Arc` so parallel chunks share the same filter without deep cloning.
    pub column_filter: Option<Arc<BTreeMap<i32, i32>>>,
    /// Total variable count in the unfiltered dataset.
    /// Used for row-boundary detection in `handle_value` when filtering is active.
    /// Defaults to `var_count` when no filter is set.
    pub total_var_count: i32,
}
221
222impl Default for ReadStatData {
223    fn default() -> Self {
224        Self::new()
225    }
226}
227
228impl ReadStatData {
229    /// Creates a new `ReadStatData` with default (empty) values.
230    pub fn new() -> Self {
231        Self {
232            // metadata
233            var_count: 0,
234            vars: Arc::new(BTreeMap::new()),
235            // data
236            builders: Vec::new(),
237            schema: Arc::new(Schema::empty()),
238            // record batch
239            batch: None,
240            chunk_rows_to_process: 0,
241            chunk_rows_processed: 0,
242            chunk_row_start: 0,
243            chunk_row_end: 0,
244            // total rows
245            total_rows_to_process: 0,
246            total_rows_processed: None,
247            // progress
248            progress: None,
249            no_progress: false,
250            // errors
251            errors: Vec::new(),
252            // column filtering
253            column_filter: None,
254            total_var_count: 0,
255        }
256    }
257
258    /// Allocates typed Arrow builders with capacity for `chunk_rows_to_process`.
259    ///
260    /// Each builder's type is determined by the variable metadata. String builders
261    /// are additionally pre-sized with `storage_width * chunk_rows` bytes.
262    #[must_use]
263    pub fn allocate_builders(self) -> Self {
264        let capacity = self.chunk_rows_to_process;
265        let builders: Vec<ColumnBuilder> = self
266            .vars
267            .values()
268            .map(|vm| ColumnBuilder::from_metadata(vm, capacity))
269            .collect();
270        Self { builders, ..self }
271    }
272
273    /// Finishes all builders and assembles the Arrow [`RecordBatch`].
274    ///
275    /// Each builder produces its final array via `finish()`, which is an O(1)
276    /// operation (no data copying). The heavy work was already done during
277    /// `handle_value` when values were appended directly into the builders.
278    pub(crate) fn cols_to_batch(&mut self) -> Result<(), ReadStatError> {
279        let arrays: Vec<ArrayRef> = self
280            .builders
281            .iter_mut()
282            .map(ColumnBuilder::finish)
283            .collect();
284
285        self.batch = Some(RecordBatch::try_new(self.schema.clone(), arrays)?);
286
287        Ok(())
288    }
289
290    /// Parses row data from the file and converts it to an Arrow [`RecordBatch`].
291    ///
292    /// # Errors
293    ///
294    /// Returns [`ReadStatError`] if FFI parsing or Arrow conversion fails.
295    pub fn read_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
296        // parse data and if successful then convert cols into a record batch
297        self.parse_data(rsp)?;
298        self.cols_to_batch()?;
299        Ok(())
300    }
301
302    /// Parses row data from an in-memory byte slice and converts it to an Arrow [`RecordBatch`].
303    ///
304    /// Equivalent to [`read_data`](ReadStatData::read_data) but reads from a `&[u8]`
305    /// buffer instead of a file path.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`ReadStatError`] if FFI parsing or Arrow conversion fails.
310    pub fn read_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
311        self.parse_data_from_bytes(bytes)?;
312        self.cols_to_batch()?;
313        Ok(())
314    }
315
316    /// Parses row data from a memory-mapped `.sas7bdat` file and converts it to an Arrow [`RecordBatch`].
317    ///
318    /// Opens the file at `path` and memory-maps it, avoiding explicit read syscalls.
319    /// Especially beneficial for large files and repeated chunk reads against the
320    /// same file, as the OS manages page caching automatically.
321    ///
322    /// # Safety
323    ///
324    /// Memory mapping is safe as long as the file is not modified or truncated by
325    /// another process while the map is active.
326    ///
327    /// # Errors
328    ///
329    /// Returns [`ReadStatError`] if the file cannot be opened, mapped, or parsed.
330    #[cfg(not(target_arch = "wasm32"))]
331    pub fn read_data_from_mmap(&mut self, path: &std::path::Path) -> Result<(), ReadStatError> {
332        let file = std::fs::File::open(path)?;
333        let mmap = unsafe { memmap2::Mmap::map(&file)? };
334        self.read_data_from_bytes(&mmap)
335    }
336
337    /// Parses row data from the file via FFI callbacks (without Arrow conversion).
338    #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
339    pub(crate) fn parse_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
340        // path as pointer
341        debug!("Path as C string is {:?}", rsp.cstring_path);
342        let ppath = rsp.cstring_path.as_ptr();
343
344        // Notify progress callback
345        if let Some(progress) = &self.progress {
346            progress.inc(self.chunk_rows_to_process as u64);
347            progress.parsing_started(&rsp.path.to_string_lossy());
348        }
349
350        // initialize context
351        let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
352
353        // initialize error
354        let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
355        debug!("Initially, error ==> {error:#?}");
356
357        // setup parser
358        // once call parse_sas7bdat, iteration begins
359        let error = ReadStatParser::new()
360            // do not set metadata handler nor variable handler as already processed
361            .set_value_handler(Some(cb::handle_value))?
362            .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
363            .set_row_offset(Some(self.chunk_row_start.try_into()?))?
364            .parse_sas7bdat(ppath, ctx);
365
366        check_c_error(error as i32)?;
367        Ok(())
368    }
369
370    #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
371    fn parse_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
372        let mut buffer_ctx = ReadStatBufferCtx::new(bytes);
373
374        // initialize context
375        let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
376
377        // initialize error
378        let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
379        debug!("Initially, error ==> {error:#?}");
380
381        // Dummy path — custom I/O handlers ignore it
382        let dummy_path = CString::new("").expect("empty string is valid C string");
383
384        // setup parser with buffer I/O
385        let error = buffer_ctx
386            .configure_parser(
387                ReadStatParser::new()
388                    .set_value_handler(Some(cb::handle_value))?
389                    .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
390                    .set_row_offset(Some(self.chunk_row_start.try_into()?))?,
391            )?
392            .parse_sas7bdat(dummy_path.as_ptr(), ctx);
393
394        check_c_error(error as i32)?;
395        Ok(())
396    }
397
398    /// Initializes this instance with metadata and chunk boundaries, allocating builders.
399    ///
400    /// Wraps `vars` and `schema` in `Arc` internally. For the parallel read path,
401    /// prefer [`init_shared`](ReadStatData::init_shared) which accepts pre-wrapped
402    /// `Arc`s to avoid repeated deep clones.
403    #[must_use]
404    pub fn init(self, md: ReadStatMetadata, row_start: u32, row_end: u32) -> Self {
405        self.set_metadata(md)
406            .set_chunk_counts(row_start, row_end)
407            .allocate_builders()
408    }
409
410    /// Initializes this instance with pre-shared metadata and chunk boundaries.
411    ///
412    /// Accepts `Arc`-wrapped `vars` and `schema` for cheap cloning in parallel loops.
413    /// Each call only increments reference counts (atomic +1) instead of deep-cloning
414    /// the entire metadata tree.
415    #[must_use]
416    pub fn init_shared(
417        self,
418        var_count: i32,
419        vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
420        schema: Arc<Schema>,
421        row_start: u32,
422        row_end: u32,
423    ) -> Self {
424        let total_var_count = if self.total_var_count != 0 {
425            self.total_var_count
426        } else {
427            var_count
428        };
429        Self {
430            var_count,
431            vars,
432            schema,
433            total_var_count,
434            ..self
435        }
436        .set_chunk_counts(row_start, row_end)
437        .allocate_builders()
438    }
439
440    #[allow(clippy::cast_possible_truncation)]
441    fn set_chunk_counts(self, row_start: u32, row_end: u32) -> Self {
442        let chunk_rows_to_process = (row_end - row_start) as usize;
443        let chunk_row_start = row_start as usize;
444        let chunk_row_end = row_end as usize;
445        let chunk_rows_processed = 0_usize;
446
447        Self {
448            chunk_rows_to_process,
449            chunk_row_start,
450            chunk_row_end,
451            chunk_rows_processed,
452            ..self
453        }
454    }
455
456    fn set_metadata(self, md: ReadStatMetadata) -> Self {
457        let var_count = md.var_count;
458        let vars = Arc::new(md.vars);
459        let schema = Arc::new(md.schema);
460        // Only set total_var_count from metadata if not already set by set_column_filter
461        let total_var_count = if self.total_var_count != 0 {
462            self.total_var_count
463        } else {
464            var_count
465        };
466        Self {
467            var_count,
468            vars,
469            schema,
470            total_var_count,
471            ..self
472        }
473    }
474
475    /// Disables or enables the progress bar display.
476    #[must_use]
477    pub fn set_no_progress(self, no_progress: bool) -> Self {
478        Self {
479            no_progress,
480            ..self
481        }
482    }
483
484    /// Sets the total number of rows to process across all chunks.
485    #[must_use]
486    pub fn set_total_rows_to_process(self, total_rows_to_process: usize) -> Self {
487        Self {
488            total_rows_to_process,
489            ..self
490        }
491    }
492
493    /// Sets the shared atomic counter for tracking rows processed across chunks.
494    #[must_use]
495    pub fn set_total_rows_processed(self, total_rows_processed: Arc<AtomicUsize>) -> Self {
496        Self {
497            total_rows_processed: Some(total_rows_processed),
498            ..self
499        }
500    }
501
502    /// Sets the column filter and original (unfiltered) variable count.
503    ///
504    /// Accepts an `Arc`-wrapped filter for cheap sharing across parallel chunks.
505    /// Must be called **before** [`init`](ReadStatData::init) so that
506    /// `total_var_count` is preserved when `set_metadata` runs.
507    #[must_use]
508    pub fn set_column_filter(
509        self,
510        filter: Option<Arc<BTreeMap<i32, i32>>>,
511        total_var_count: i32,
512    ) -> Self {
513        Self {
514            column_filter: filter,
515            total_var_count,
516            ..self
517        }
518    }
519
520    /// Attaches a progress callback for feedback during parsing.
521    ///
522    /// The callback receives progress increments and parsing status updates.
523    /// See [`ProgressCallback`] for the required interface.
524    #[must_use]
525    pub fn set_progress(self, progress: Arc<dyn ProgressCallback>) -> Self {
526        Self {
527            progress: Some(progress),
528            ..self
529        }
530    }
531}