readstat/
cb.rs

1//! FFI callback functions invoked by the `ReadStat` C library during parsing.
2//!
3//! The `ReadStat` C parser uses a callback-driven architecture: as it reads a `.sas7bdat`
4//! file, it invokes registered callbacks for metadata, variables, and values. Each
5//! callback receives a raw `*mut c_void` context pointer that is cast back to the
6//! appropriate Rust struct ([`ReadStatMetadata`](crate::ReadStatMetadata) or
7//! [`ReadStatData`](crate::ReadStatData)) to accumulate parsed results.
8
9use chrono::DateTime;
10use log::debug;
11use num_traits::FromPrimitive;
12use std::os::raw::{c_char, c_int, c_void};
13
14use crate::{
15    common::ptr_to_string,
16    formats,
17    rs_data::{ColumnBuilder, ReadStatData},
18    rs_metadata::{ReadStatCompress, ReadStatEndian, ReadStatMetadata, ReadStatVarMetadata},
19    rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass},
20};
21
/// Status codes a callback hands back to the `ReadStat` C parser.
///
/// The variants follow the order of the C API's `readstat_handler_t` enum, so
/// casting a variant to `c_int` yields the numeric code the C side expects.
/// The callbacks in this file only ever return `OK` or `ABORT`;
/// `SKIP_VARIABLE` is kept so the Rust enum covers the full C contract.
// The C-style variant names are intentional, and not every variant is constructed.
#[allow(non_camel_case_types, dead_code)]
#[repr(C)]
#[derive(Debug)]
enum ReadStatHandler {
    /// Continue parsing.
    READSTAT_HANDLER_OK = 0,
    /// Stop parsing and report failure to the caller of the parser.
    READSTAT_HANDLER_ABORT = 1,
    /// Skip the current variable (unused by the callbacks in this file).
    READSTAT_HANDLER_SKIP_VARIABLE = 2,
}
35
36// C callback functions
37
38/// FFI callback that extracts file-level metadata from the `ReadStat` C parser.
39///
40/// Called once during parsing. Populates the [`ReadStatMetadata`] struct
41/// (accessed via the `ctx` pointer) with row/variable counts, encoding,
42/// timestamps, compression, and endianness.
43///
44/// # Safety
45///
46/// - `metadata` must be a valid pointer to a `readstat_metadata_t` produced by the C parser.
47/// - `ctx` must be a valid pointer to a [`ReadStatMetadata`] instance that outlives this call.
48/// - This function must only be called by the `ReadStat` C library as a registered callback.
49#[allow(
50    clippy::cast_possible_truncation,
51    clippy::cast_sign_loss,
52    clippy::cast_possible_wrap
53)]
54pub(crate) extern "C" fn handle_metadata(
55    metadata: *mut readstat_sys::readstat_metadata_t,
56    ctx: *mut c_void,
57) -> c_int {
58    // dereference ctx pointer
59    let m = unsafe { &mut *ctx.cast::<ReadStatMetadata>() };
60
61    // get metadata
62    let rc: c_int = unsafe { readstat_sys::readstat_get_row_count(metadata) };
63    let vc: c_int = unsafe { readstat_sys::readstat_get_var_count(metadata) };
64    let table_name = unsafe { ptr_to_string(readstat_sys::readstat_get_table_name(metadata)) };
65    let file_label = unsafe { ptr_to_string(readstat_sys::readstat_get_file_label(metadata)) };
66    let file_encoding =
67        unsafe { ptr_to_string(readstat_sys::readstat_get_file_encoding(metadata)) };
68    let version: c_int = unsafe { readstat_sys::readstat_get_file_format_version(metadata) };
69    let is64bit = unsafe { readstat_sys::readstat_get_file_format_is_64bit(metadata) };
70    let ct = DateTime::from_timestamp(
71        unsafe { readstat_sys::readstat_get_creation_time(metadata) },
72        0,
73    )
74    .unwrap_or_default()
75    .format("%Y-%m-%d %H:%M:%S")
76    .to_string();
77    let mt = DateTime::from_timestamp(
78        unsafe { readstat_sys::readstat_get_modified_time(metadata) },
79        0,
80    )
81    .unwrap_or_default()
82    .format("%Y-%m-%d %H:%M:%S")
83    .to_string();
84
85    #[allow(clippy::useless_conversion)]
86    let compression =
87        FromPrimitive::from_i32(unsafe { readstat_sys::readstat_get_compression(metadata) } as i32)
88            .unwrap_or(ReadStatCompress::None);
89
90    #[allow(clippy::useless_conversion)]
91    let endianness =
92        FromPrimitive::from_i32(unsafe { readstat_sys::readstat_get_endianness(metadata) } as i32)
93            .unwrap_or(ReadStatEndian::None);
94
95    debug!("row_count is {rc}");
96    debug!("var_count is {vc}");
97    debug!("table_name is {table_name}");
98    debug!("file_label is {file_label}");
99    debug!("file_encoding is {file_encoding}");
100    debug!("version is {version}");
101    debug!("is64bit is {is64bit}");
102    debug!("creation_time is {ct}");
103    debug!("modified_time is {mt}");
104    debug!("compression is {compression:#?}");
105    debug!("endianness is {endianness:#?}");
106
107    // insert into ReadStatMetadata struct
108    m.row_count = rc;
109    m.var_count = vc;
110    m.table_name = table_name;
111    m.file_label = file_label;
112    m.file_encoding = file_encoding;
113    m.version = version;
114    m.is64bit = is64bit;
115    m.creation_time = ct;
116    m.modified_time = mt;
117    m.compression = compression;
118    m.endianness = endianness;
119
120    debug!("metadata struct is {m:#?}");
121
122    ReadStatHandler::READSTAT_HANDLER_OK as c_int
123}
124
125/// FFI callback that extracts per-variable metadata from the `ReadStat` C parser.
126///
127/// Called once for each variable (column) in the dataset. Populates a
128/// [`ReadStatVarMetadata`] entry in the [`ReadStatMetadata::vars`] map
129/// with the variable's name, type, label, and SAS format classification.
130///
131/// # Safety
132///
133/// - `variable` must be a valid pointer to a `readstat_variable_t` produced by the C parser.
134/// - `ctx` must be a valid pointer to a [`ReadStatMetadata`] instance that outlives this call.
135/// - This function must only be called by the `ReadStat` C library as a registered callback.
136#[allow(
137    clippy::cast_possible_truncation,
138    clippy::cast_sign_loss,
139    clippy::cast_possible_wrap
140)]
141pub(crate) extern "C" fn handle_variable(
142    index: c_int,
143    variable: *mut readstat_sys::readstat_variable_t,
144    #[allow(unused_variables)] val_labels: *const c_char,
145    ctx: *mut c_void,
146) -> c_int {
147    // dereference ctx pointer
148    let m = unsafe { &mut *ctx.cast::<ReadStatMetadata>() };
149
150    // get variable metadata
151    #[allow(clippy::useless_conversion)]
152    let var_type =
153        FromPrimitive::from_i32(
154            unsafe { readstat_sys::readstat_variable_get_type(variable) } as i32,
155        )
156        .unwrap_or(ReadStatVarType::Unknown);
157
158    #[allow(clippy::useless_conversion)]
159    let var_type_class =
160        FromPrimitive::from_i32(
161            unsafe { readstat_sys::readstat_variable_get_type_class(variable) } as i32,
162        )
163        .unwrap_or(ReadStatVarTypeClass::Numeric);
164
165    let var_name = unsafe { ptr_to_string(readstat_sys::readstat_variable_get_name(variable)) };
166    let var_label = unsafe { ptr_to_string(readstat_sys::readstat_variable_get_label(variable)) };
167    let var_format = unsafe { ptr_to_string(readstat_sys::readstat_variable_get_format(variable)) };
168    let var_format_class = formats::match_var_format(&var_format);
169    let storage_width =
170        unsafe { readstat_sys::readstat_variable_get_storage_width(variable) } as usize;
171    let display_width =
172        unsafe { readstat_sys::readstat_variable_get_display_width(variable) } as i32;
173
174    debug!("var_type is {var_type:#?}");
175    debug!("var_type_class is {var_type_class:#?}");
176    debug!("var_name is {var_name}");
177    debug!("var_label is {var_label}");
178    debug!("var_format is {var_format}");
179    debug!("var_format_class is {var_format_class:#?}");
180    debug!("storage_width is {storage_width}");
181    debug!("display_width is {display_width}");
182
183    // insert into BTreeMap within ReadStatMetadata struct
184    m.vars.insert(
185        index,
186        ReadStatVarMetadata::new(
187            var_name,
188            var_type,
189            var_type_class,
190            var_label,
191            var_format,
192            var_format_class,
193            storage_width,
194            display_width,
195        ),
196    );
197
198    ReadStatHandler::READSTAT_HANDLER_OK as c_int
199}
200
/// Days between the SAS epoch (1960-01-01) and the Unix epoch (1970-01-01):
/// ten years, three of which (1960, 1964, 1968) are leap years.
const DAY_SHIFT: i32 = 10 * 365 + 3;
/// The same SAS-to-Unix epoch offset expressed in seconds
/// (3653 days of 86 400 seconds each).
const SEC_SHIFT: i64 = 3_653 * 86_400;
205
/// Scale factor for rounding: `10^14`, i.e. values are kept to 14 decimal places.
const ROUND_SCALE: f64 = 1e14;

/// Rounds an `f64` to 14 decimal places (see [`ROUND_SCALE`]) using pure arithmetic.
///
/// No string formatting round trip is involved. For values like 4.6 that
/// cannot be represented exactly in IEEE 754, this cleans up trailing noise
/// (e.g. `4.6000000000000005` → `4.6`).
///
/// The value is split into integer and fractional parts before scaling: large
/// SAS datetime values (~1.9e9) multiplied by 1e14 would exceed f64's exact
/// integer range (2^53) and introduce rounding errors, whereas the fractional
/// part alone always scales to a magnitude below 1e14.
///
/// Non-finite inputs (NaN, ±infinity) are returned unchanged.
#[inline]
fn round_decimal_f64(v: f64) -> f64 {
    if !v.is_finite() {
        return v;
    }
    let whole = v.trunc();
    // `fract()` is always in (-1, 1), so `frac * 1e14` stays below 1e14 < 2^53.
    let frac = v.fract();
    whole + (frac * ROUND_SCALE).round() / ROUND_SCALE
}

/// Rounds an `f32` to 14 decimal places (see [`ROUND_SCALE`]) using pure arithmetic.
///
/// The value is promoted to `f64`, rounded by [`round_decimal_f64`] so the
/// arithmetic is not limited by `f32` precision, and narrowed back to `f32`.
///
/// Non-finite inputs (NaN, ±infinity) are returned unchanged.
#[inline]
// Narrowing back to f32 is the point of this helper; the input was an f32 to begin with.
#[allow(clippy::cast_possible_truncation)]
fn round_decimal_f32(v: f32) -> f32 {
    if !v.is_finite() {
        return v;
    }
    round_decimal_f64(f64::from(v)) as f32
}
243
244/// FFI callback that extracts a single cell value during row parsing.
245///
246/// Called for every cell in every row. Appends the value directly into the
247/// appropriate typed Arrow [`ColumnBuilder`] in [`ReadStatData::builders`],
248/// eliminating intermediate `String` allocations for string columns.
249/// Tracks row completion for progress reporting.
250///
251/// # Safety
252///
253/// - `variable` must be a valid pointer to a `readstat_variable_t` produced by the C parser.
254/// - `value` must be a valid `readstat_value_t` produced by the C parser.
255/// - `ctx` must be a valid pointer to a [`ReadStatData`] instance that outlives this call.
256/// - This function must only be called by the `ReadStat` C library as a registered callback.
257#[allow(
258    clippy::too_many_lines,
259    clippy::cast_possible_truncation,
260    clippy::cast_sign_loss,
261    clippy::cast_precision_loss
262)]
263pub(crate) extern "C" fn handle_value(
264    obs_index: c_int,
265    variable: *mut readstat_sys::readstat_variable_t,
266    value: readstat_sys::readstat_value_t,
267    ctx: *mut c_void,
268) -> c_int {
269    // dereference ctx pointer
270    let d = unsafe { &mut *ctx.cast::<ReadStatData>() };
271
272    // get index, type, and missingness
273    let var_index: c_int = unsafe { readstat_sys::readstat_variable_get_index(variable) };
274    let value_type: readstat_sys::readstat_type_t =
275        unsafe { readstat_sys::readstat_value_type(value) };
276    let is_missing: c_int = unsafe { readstat_sys::readstat_value_is_system_missing(value) };
277
278    debug!("chunk_rows_to_process is {}", d.chunk_rows_to_process);
279    debug!("chunk_row_start is {}", d.chunk_row_start);
280    debug!("chunk_row_end is {}", d.chunk_row_end);
281    debug!("chunk_rows_processed is {}", d.chunk_rows_processed);
282    debug!("var_count is {}", d.var_count);
283    debug!("obs_index is {obs_index}");
284    debug!("var_index is {var_index}");
285    debug!("value_type is {value_type:#?}");
286    debug!("is_missing is {is_missing}");
287
288    // Determine the column index for storage, applying column filter if active
289    let col_index = if let Some(ref filter) = d.column_filter {
290        if let Some(&mapped) = filter.get(&var_index) {
291            mapped
292        } else {
293            // This variable is not selected; skip it but still check row boundary
294            if var_index == (d.total_var_count - 1) {
295                d.chunk_rows_processed += 1;
296                if let Some(trp) = &d.total_rows_processed {
297                    trp.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
298                }
299            }
300            return ReadStatHandler::READSTAT_HANDLER_OK as c_int;
301        }
302    } else {
303        var_index
304    };
305
306    // Append value directly into the typed Arrow builder
307    let builder = &mut d.builders[col_index as usize];
308
309    match value_type {
310        readstat_sys::readstat_type_e_READSTAT_TYPE_STRING
311        | readstat_sys::readstat_type_e_READSTAT_TYPE_STRING_REF => {
312            let sb = builder.as_string_mut();
313            if is_missing == 1 {
314                sb.append_null();
315            } else {
316                let ptr = unsafe { readstat_sys::readstat_string_value(value) };
317                if ptr.is_null() {
318                    sb.append_null();
319                } else {
320                    let cstr = unsafe { std::ffi::CStr::from_ptr(ptr) };
321                    // Fast path: valid UTF-8 (the common case for SAS data)
322                    if let Ok(s) = cstr.to_str() {
323                        sb.append_value(s);
324                    } else {
325                        // Lossy fallback for rare non-UTF-8 data
326                        let s = String::from_utf8_lossy(cstr.to_bytes());
327                        sb.append_value(s.as_ref());
328                    }
329                }
330            }
331        }
332        readstat_sys::readstat_type_e_READSTAT_TYPE_INT8 => {
333            if is_missing == 1 {
334                builder.append_null();
335            } else {
336                let v = unsafe { readstat_sys::readstat_int8_value(value) };
337                debug!("value is {v:#?}");
338                // Schema maps Int8 → Int16, so widen
339                if let ColumnBuilder::Int16(b) = builder {
340                    b.append_value(i16::from(v));
341                }
342            }
343        }
344        readstat_sys::readstat_type_e_READSTAT_TYPE_INT16 => {
345            if is_missing == 1 {
346                builder.append_null();
347            } else {
348                let v = unsafe { readstat_sys::readstat_int16_value(value) };
349                debug!("value is {v:#?}");
350                if let ColumnBuilder::Int16(b) = builder {
351                    b.append_value(v);
352                }
353            }
354        }
355        readstat_sys::readstat_type_e_READSTAT_TYPE_INT32 => {
356            if is_missing == 1 {
357                builder.append_null();
358            } else {
359                let v = unsafe { readstat_sys::readstat_int32_value(value) };
360                debug!("value is {v:#?}");
361                if let ColumnBuilder::Int32(b) = builder {
362                    b.append_value(v);
363                }
364            }
365        }
366        readstat_sys::readstat_type_e_READSTAT_TYPE_FLOAT => {
367            if is_missing == 1 {
368                builder.append_null();
369            } else {
370                let raw = unsafe { readstat_sys::readstat_float_value(value) };
371                debug!("value (before parsing) is {raw:#?}");
372                let val = round_decimal_f32(raw);
373                debug!("value (after parsing) is {val:#?}");
374                if let ColumnBuilder::Float32(b) = builder {
375                    b.append_value(val);
376                }
377            }
378        }
379        readstat_sys::readstat_type_e_READSTAT_TYPE_DOUBLE => {
380            let var_format_class = d.vars.get(&col_index).and_then(|vm| vm.var_format_class);
381
382            if is_missing == 1 {
383                builder.append_null();
384            } else {
385                let raw = unsafe { readstat_sys::readstat_double_value(value) };
386                debug!("value (before parsing) is {raw:#?}");
387                let val = round_decimal_f64(raw);
388                debug!("value (after parsing) is {val:#?}");
389
390                match var_format_class {
391                    None => {
392                        if let ColumnBuilder::Float64(b) = builder {
393                            b.append_value(val);
394                        }
395                    }
396                    Some(ReadStatVarFormatClass::Date) => {
397                        if let ColumnBuilder::Date32(b) = builder {
398                            if let Some(shifted) = (val as i32).checked_sub(DAY_SHIFT) {
399                                b.append_value(shifted);
400                            } else {
401                                d.errors.push("Date overflow".to_string());
402                                return ReadStatHandler::READSTAT_HANDLER_ABORT as c_int;
403                            }
404                        }
405                    }
406                    Some(ReadStatVarFormatClass::DateTime) => {
407                        if let ColumnBuilder::TimestampSecond(b) = builder {
408                            if let Some(shifted) = (val as i64).checked_sub(SEC_SHIFT) {
409                                b.append_value(shifted);
410                            } else {
411                                d.errors.push("DateTime overflow".to_string());
412                                return ReadStatHandler::READSTAT_HANDLER_ABORT as c_int;
413                            }
414                        }
415                    }
416                    Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
417                        if let ColumnBuilder::TimestampMillisecond(b) = builder {
418                            b.append_value(((val - SEC_SHIFT as f64) * 1000.0) as i64);
419                        }
420                    }
421                    Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
422                        if let ColumnBuilder::TimestampMicrosecond(b) = builder {
423                            b.append_value(((val - SEC_SHIFT as f64) * 1_000_000.0) as i64);
424                        }
425                    }
426                    Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
427                        if let ColumnBuilder::TimestampNanosecond(b) = builder {
428                            b.append_value(((val - SEC_SHIFT as f64) * 1_000_000_000.0) as i64);
429                        }
430                    }
431                    Some(ReadStatVarFormatClass::Time) => {
432                        if let ColumnBuilder::Time32Second(b) = builder {
433                            b.append_value(val as i32);
434                        }
435                    }
436                    Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
437                        if let ColumnBuilder::Time64Microsecond(b) = builder {
438                            b.append_value((val * 1_000_000.0) as i64);
439                        }
440                    }
441                }
442            }
443        }
444        _ => unreachable!(),
445    }
446
447    // if row is complete (use total_var_count for boundary detection)
448    if var_index == (d.total_var_count - 1) {
449        d.chunk_rows_processed += 1;
450        if let Some(trp) = &d.total_rows_processed {
451            trp.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
452        }
453    }
454
455    ReadStatHandler::READSTAT_HANDLER_OK as c_int
456}