1use chrono::DateTime;
10use log::debug;
11use num_traits::FromPrimitive;
12use std::os::raw::{c_char, c_int, c_void};
13
14use crate::{
15 common::ptr_to_string,
16 formats,
17 rs_data::{ColumnBuilder, ReadStatData},
18 rs_metadata::{ReadStatCompress, ReadStatEndian, ReadStatMetadata, ReadStatVarMetadata},
19 rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass},
20};
21
/// Status codes the callbacks in this file hand back to the ReadStat C library.
///
/// Each callback ends by casting one of these variants to `c_int`, so the
/// discriminants are written out explicitly; they are the same values the
/// compiler assigns implicitly (0, 1, 2).
#[repr(C)]
#[derive(Debug)]
#[allow(dead_code, non_camel_case_types)]
enum ReadStatHandler {
    /// Returned by every callback here on its success path.
    READSTAT_HANDLER_OK = 0,
    /// Returned by `handle_value` after it has recorded an error message.
    READSTAT_HANDLER_ABORT = 1,
    /// Not returned anywhere in this file.
    /// NOTE(review): presumably tells ReadStat to skip the current variable —
    /// confirm against the ReadStat documentation.
    READSTAT_HANDLER_SKIP_VARIABLE = 2,
}
35
36#[allow(
50 clippy::cast_possible_truncation,
51 clippy::cast_sign_loss,
52 clippy::cast_possible_wrap
53)]
54pub(crate) extern "C" fn handle_metadata(
55 metadata: *mut readstat_sys::readstat_metadata_t,
56 ctx: *mut c_void,
57) -> c_int {
58 let m = unsafe { &mut *ctx.cast::<ReadStatMetadata>() };
60
61 let rc: c_int = unsafe { readstat_sys::readstat_get_row_count(metadata) };
63 let vc: c_int = unsafe { readstat_sys::readstat_get_var_count(metadata) };
64 let table_name = unsafe { ptr_to_string(readstat_sys::readstat_get_table_name(metadata)) };
65 let file_label = unsafe { ptr_to_string(readstat_sys::readstat_get_file_label(metadata)) };
66 let file_encoding =
67 unsafe { ptr_to_string(readstat_sys::readstat_get_file_encoding(metadata)) };
68 let version: c_int = unsafe { readstat_sys::readstat_get_file_format_version(metadata) };
69 let is64bit = unsafe { readstat_sys::readstat_get_file_format_is_64bit(metadata) };
70 let ct = DateTime::from_timestamp(
71 unsafe { readstat_sys::readstat_get_creation_time(metadata) },
72 0,
73 )
74 .unwrap_or_default()
75 .format("%Y-%m-%d %H:%M:%S")
76 .to_string();
77 let mt = DateTime::from_timestamp(
78 unsafe { readstat_sys::readstat_get_modified_time(metadata) },
79 0,
80 )
81 .unwrap_or_default()
82 .format("%Y-%m-%d %H:%M:%S")
83 .to_string();
84
85 #[allow(clippy::useless_conversion)]
86 let compression =
87 FromPrimitive::from_i32(unsafe { readstat_sys::readstat_get_compression(metadata) } as i32)
88 .unwrap_or(ReadStatCompress::None);
89
90 #[allow(clippy::useless_conversion)]
91 let endianness =
92 FromPrimitive::from_i32(unsafe { readstat_sys::readstat_get_endianness(metadata) } as i32)
93 .unwrap_or(ReadStatEndian::None);
94
95 debug!("row_count is {rc}");
96 debug!("var_count is {vc}");
97 debug!("table_name is {table_name}");
98 debug!("file_label is {file_label}");
99 debug!("file_encoding is {file_encoding}");
100 debug!("version is {version}");
101 debug!("is64bit is {is64bit}");
102 debug!("creation_time is {ct}");
103 debug!("modified_time is {mt}");
104 debug!("compression is {compression:#?}");
105 debug!("endianness is {endianness:#?}");
106
107 m.row_count = rc;
109 m.var_count = vc;
110 m.table_name = table_name;
111 m.file_label = file_label;
112 m.file_encoding = file_encoding;
113 m.version = version;
114 m.is64bit = is64bit;
115 m.creation_time = ct;
116 m.modified_time = mt;
117 m.compression = compression;
118 m.endianness = endianness;
119
120 debug!("metadata struct is {m:#?}");
121
122 ReadStatHandler::READSTAT_HANDLER_OK as c_int
123}
124
125#[allow(
137 clippy::cast_possible_truncation,
138 clippy::cast_sign_loss,
139 clippy::cast_possible_wrap
140)]
141pub(crate) extern "C" fn handle_variable(
142 index: c_int,
143 variable: *mut readstat_sys::readstat_variable_t,
144 #[allow(unused_variables)] val_labels: *const c_char,
145 ctx: *mut c_void,
146) -> c_int {
147 let m = unsafe { &mut *ctx.cast::<ReadStatMetadata>() };
149
150 #[allow(clippy::useless_conversion)]
152 let var_type =
153 FromPrimitive::from_i32(
154 unsafe { readstat_sys::readstat_variable_get_type(variable) } as i32,
155 )
156 .unwrap_or(ReadStatVarType::Unknown);
157
158 #[allow(clippy::useless_conversion)]
159 let var_type_class =
160 FromPrimitive::from_i32(
161 unsafe { readstat_sys::readstat_variable_get_type_class(variable) } as i32,
162 )
163 .unwrap_or(ReadStatVarTypeClass::Numeric);
164
165 let var_name = unsafe { ptr_to_string(readstat_sys::readstat_variable_get_name(variable)) };
166 let var_label = unsafe { ptr_to_string(readstat_sys::readstat_variable_get_label(variable)) };
167 let var_format = unsafe { ptr_to_string(readstat_sys::readstat_variable_get_format(variable)) };
168 let var_format_class = formats::match_var_format(&var_format);
169 let storage_width =
170 unsafe { readstat_sys::readstat_variable_get_storage_width(variable) } as usize;
171 let display_width =
172 unsafe { readstat_sys::readstat_variable_get_display_width(variable) } as i32;
173
174 debug!("var_type is {var_type:#?}");
175 debug!("var_type_class is {var_type_class:#?}");
176 debug!("var_name is {var_name}");
177 debug!("var_label is {var_label}");
178 debug!("var_format is {var_format}");
179 debug!("var_format_class is {var_format_class:#?}");
180 debug!("storage_width is {storage_width}");
181 debug!("display_width is {display_width}");
182
183 m.vars.insert(
185 index,
186 ReadStatVarMetadata::new(
187 var_name,
188 var_type,
189 var_type_class,
190 var_label,
191 var_format,
192 var_format_class,
193 storage_width,
194 display_width,
195 ),
196 );
197
198 ReadStatHandler::READSTAT_HANDLER_OK as c_int
199}
200
/// Number of days subtracted from date values before they are appended as
/// `Date32`.
///
/// 3653 days is exactly the span from 1960-01-01 to 1970-01-01 (ten years
/// including three leap days).
/// NOTE(review): presumably this converts from a 1960-based source epoch to
/// the Unix epoch — confirm against the file format documentation.
const DAY_SHIFT: i32 = 3653;

/// The same offset as [`DAY_SHIFT`] expressed in seconds (315_619_200),
/// subtracted from datetime values before they are appended as timestamps.
const SEC_SHIFT: i64 = DAY_SHIFT as i64 * 86_400;
205
/// Scale factor for the rounding helpers: the fractional part is rounded to
/// the nearest multiple of `1 / ROUND_SCALE`, i.e. to 14 decimal places.
const ROUND_SCALE: f64 = 1e14;

/// Rounds the fractional part of `v` to 14 decimal places.
///
/// The integer and fractional parts are handled separately so that only the
/// fraction is multiplied by [`ROUND_SCALE`]. NaN and infinities are
/// returned unchanged.
#[inline]
fn round_decimal_f64(v: f64) -> f64 {
    if v.is_finite() {
        let whole = v.trunc();
        let fraction = (v.fract() * ROUND_SCALE).round() / ROUND_SCALE;
        whole + fraction
    } else {
        v
    }
}

/// `f32` counterpart of [`round_decimal_f64`].
///
/// The value is widened to `f64`, rounded there, and narrowed back to `f32`.
/// NaN and infinities are returned unchanged without the round trip.
#[inline]
#[allow(clippy::cast_possible_truncation)]
fn round_decimal_f32(v: f32) -> f32 {
    if v.is_finite() {
        // A finite f32 widens to a finite f64, so the f64 routine performs
        // the trunc/fract/round sequence on it.
        round_decimal_f64(f64::from(v)) as f32
    } else {
        v
    }
}
243
244#[allow(
258 clippy::too_many_lines,
259 clippy::cast_possible_truncation,
260 clippy::cast_sign_loss,
261 clippy::cast_precision_loss
262)]
263pub(crate) extern "C" fn handle_value(
264 obs_index: c_int,
265 variable: *mut readstat_sys::readstat_variable_t,
266 value: readstat_sys::readstat_value_t,
267 ctx: *mut c_void,
268) -> c_int {
269 let d = unsafe { &mut *ctx.cast::<ReadStatData>() };
271
272 let var_index: c_int = unsafe { readstat_sys::readstat_variable_get_index(variable) };
274 let value_type: readstat_sys::readstat_type_t =
275 unsafe { readstat_sys::readstat_value_type(value) };
276 let is_missing: c_int = unsafe { readstat_sys::readstat_value_is_system_missing(value) };
277
278 debug!("chunk_rows_to_process is {}", d.chunk_rows_to_process);
279 debug!("chunk_row_start is {}", d.chunk_row_start);
280 debug!("chunk_row_end is {}", d.chunk_row_end);
281 debug!("chunk_rows_processed is {}", d.chunk_rows_processed);
282 debug!("var_count is {}", d.var_count);
283 debug!("obs_index is {obs_index}");
284 debug!("var_index is {var_index}");
285 debug!("value_type is {value_type:#?}");
286 debug!("is_missing is {is_missing}");
287
288 let col_index = if let Some(ref filter) = d.column_filter {
290 if let Some(&mapped) = filter.get(&var_index) {
291 mapped
292 } else {
293 if var_index == (d.total_var_count - 1) {
295 d.chunk_rows_processed += 1;
296 if let Some(trp) = &d.total_rows_processed {
297 trp.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
298 }
299 }
300 return ReadStatHandler::READSTAT_HANDLER_OK as c_int;
301 }
302 } else {
303 var_index
304 };
305
306 let builder = &mut d.builders[col_index as usize];
308
309 match value_type {
310 readstat_sys::readstat_type_e_READSTAT_TYPE_STRING
311 | readstat_sys::readstat_type_e_READSTAT_TYPE_STRING_REF => {
312 let sb = builder.as_string_mut();
313 if is_missing == 1 {
314 sb.append_null();
315 } else {
316 let ptr = unsafe { readstat_sys::readstat_string_value(value) };
317 if ptr.is_null() {
318 sb.append_null();
319 } else {
320 let cstr = unsafe { std::ffi::CStr::from_ptr(ptr) };
321 if let Ok(s) = cstr.to_str() {
323 sb.append_value(s);
324 } else {
325 let s = String::from_utf8_lossy(cstr.to_bytes());
327 sb.append_value(s.as_ref());
328 }
329 }
330 }
331 }
332 readstat_sys::readstat_type_e_READSTAT_TYPE_INT8 => {
333 if is_missing == 1 {
334 builder.append_null();
335 } else {
336 let v = unsafe { readstat_sys::readstat_int8_value(value) };
337 debug!("value is {v:#?}");
338 if let ColumnBuilder::Int16(b) = builder {
340 b.append_value(i16::from(v));
341 }
342 }
343 }
344 readstat_sys::readstat_type_e_READSTAT_TYPE_INT16 => {
345 if is_missing == 1 {
346 builder.append_null();
347 } else {
348 let v = unsafe { readstat_sys::readstat_int16_value(value) };
349 debug!("value is {v:#?}");
350 if let ColumnBuilder::Int16(b) = builder {
351 b.append_value(v);
352 }
353 }
354 }
355 readstat_sys::readstat_type_e_READSTAT_TYPE_INT32 => {
356 if is_missing == 1 {
357 builder.append_null();
358 } else {
359 let v = unsafe { readstat_sys::readstat_int32_value(value) };
360 debug!("value is {v:#?}");
361 if let ColumnBuilder::Int32(b) = builder {
362 b.append_value(v);
363 }
364 }
365 }
366 readstat_sys::readstat_type_e_READSTAT_TYPE_FLOAT => {
367 if is_missing == 1 {
368 builder.append_null();
369 } else {
370 let raw = unsafe { readstat_sys::readstat_float_value(value) };
371 debug!("value (before parsing) is {raw:#?}");
372 let val = round_decimal_f32(raw);
373 debug!("value (after parsing) is {val:#?}");
374 if let ColumnBuilder::Float32(b) = builder {
375 b.append_value(val);
376 }
377 }
378 }
379 readstat_sys::readstat_type_e_READSTAT_TYPE_DOUBLE => {
380 let var_format_class = d.vars.get(&col_index).and_then(|vm| vm.var_format_class);
381
382 if is_missing == 1 {
383 builder.append_null();
384 } else {
385 let raw = unsafe { readstat_sys::readstat_double_value(value) };
386 debug!("value (before parsing) is {raw:#?}");
387 let val = round_decimal_f64(raw);
388 debug!("value (after parsing) is {val:#?}");
389
390 match var_format_class {
391 None => {
392 if let ColumnBuilder::Float64(b) = builder {
393 b.append_value(val);
394 }
395 }
396 Some(ReadStatVarFormatClass::Date) => {
397 if let ColumnBuilder::Date32(b) = builder {
398 if let Some(shifted) = (val as i32).checked_sub(DAY_SHIFT) {
399 b.append_value(shifted);
400 } else {
401 d.errors.push("Date overflow".to_string());
402 return ReadStatHandler::READSTAT_HANDLER_ABORT as c_int;
403 }
404 }
405 }
406 Some(ReadStatVarFormatClass::DateTime) => {
407 if let ColumnBuilder::TimestampSecond(b) = builder {
408 if let Some(shifted) = (val as i64).checked_sub(SEC_SHIFT) {
409 b.append_value(shifted);
410 } else {
411 d.errors.push("DateTime overflow".to_string());
412 return ReadStatHandler::READSTAT_HANDLER_ABORT as c_int;
413 }
414 }
415 }
416 Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
417 if let ColumnBuilder::TimestampMillisecond(b) = builder {
418 b.append_value(((val - SEC_SHIFT as f64) * 1000.0) as i64);
419 }
420 }
421 Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
422 if let ColumnBuilder::TimestampMicrosecond(b) = builder {
423 b.append_value(((val - SEC_SHIFT as f64) * 1_000_000.0) as i64);
424 }
425 }
426 Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
427 if let ColumnBuilder::TimestampNanosecond(b) = builder {
428 b.append_value(((val - SEC_SHIFT as f64) * 1_000_000_000.0) as i64);
429 }
430 }
431 Some(ReadStatVarFormatClass::Time) => {
432 if let ColumnBuilder::Time32Second(b) = builder {
433 b.append_value(val as i32);
434 }
435 }
436 Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
437 if let ColumnBuilder::Time64Microsecond(b) = builder {
438 b.append_value((val * 1_000_000.0) as i64);
439 }
440 }
441 }
442 }
443 }
444 _ => unreachable!(),
445 }
446
447 if var_index == (d.total_var_count - 1) {
449 d.chunk_rows_processed += 1;
450 if let Some(trp) = &d.total_rows_processed {
451 trp.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
452 }
453 }
454
455 ReadStatHandler::READSTAT_HANDLER_OK as c_int
456}