1use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
9use log::debug;
10use num_derive::FromPrimitive;
11use serde::Serialize;
12#[cfg(any(not(target_arch = "wasm32"), test))]
13use std::fs::File;
14use std::{
15 collections::{BTreeMap, BTreeSet, HashMap},
16 ffi::{CString, c_void},
17 path::Path,
18};
19
20use crate::cb::{handle_metadata, handle_variable};
21use crate::err::{ReadStatError, check_c_error};
22use crate::rs_buffer_io::ReadStatBufferCtx;
23use crate::rs_parser::ReadStatParser;
24use crate::rs_path::ReadStatPath;
25use crate::rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass};
26
27#[derive(Clone, Debug, Serialize)]
33pub struct ReadStatMetadata {
34 pub row_count: i32,
36 pub var_count: i32,
38 pub table_name: String,
40 pub file_label: String,
42 pub file_encoding: String,
44 pub version: i32,
46 pub is64bit: i32,
48 pub creation_time: String,
50 pub modified_time: String,
52 pub compression: ReadStatCompress,
54 pub endianness: ReadStatEndian,
56 pub vars: BTreeMap<i32, ReadStatVarMetadata>,
58 #[serde(skip_serializing)]
60 pub schema: Schema,
61}
62
63impl Default for ReadStatMetadata {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl ReadStatMetadata {
70 pub fn new() -> Self {
72 Self {
73 row_count: 0,
74 var_count: 0,
75 table_name: String::new(),
76 file_label: String::new(),
77 file_encoding: String::new(),
78 version: 0,
79 is64bit: 0,
80 creation_time: String::new(),
81 modified_time: String::new(),
82 compression: ReadStatCompress::None,
83 endianness: ReadStatEndian::None,
84 vars: BTreeMap::new(),
85 schema: Schema::empty(),
86 }
87 }
88
89 fn initialize_schema(&self) -> Schema {
90 let fields: Vec<Field> = self
92 .vars
93 .values()
94 .map(|vm| {
95 let var_dt = match &vm.var_type {
96 ReadStatVarType::String
97 | ReadStatVarType::StringRef
98 | ReadStatVarType::Unknown => DataType::Utf8,
99 ReadStatVarType::Int8 | ReadStatVarType::Int16 => DataType::Int16,
100 ReadStatVarType::Int32 => DataType::Int32,
101 ReadStatVarType::Float => DataType::Float32,
102 ReadStatVarType::Double => match &vm.var_format_class {
103 Some(ReadStatVarFormatClass::Date) => DataType::Date32,
104 Some(ReadStatVarFormatClass::DateTime) => {
105 DataType::Timestamp(TimeUnit::Second, None)
106 }
107 Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
108 DataType::Timestamp(TimeUnit::Millisecond, None)
109 }
110 Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
111 DataType::Timestamp(TimeUnit::Microsecond, None)
112 }
113 Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
114 DataType::Timestamp(TimeUnit::Nanosecond, None)
115 }
116 Some(ReadStatVarFormatClass::Time) => DataType::Time32(TimeUnit::Second),
117 Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
118 DataType::Time64(TimeUnit::Microsecond)
119 }
120 None => DataType::Float64,
121 },
122 };
123
124 let mut field = Field::new(&vm.var_name, var_dt, true);
126 let mut metadata = HashMap::new();
127 if !vm.var_label.is_empty() {
128 metadata.insert("label".to_string(), vm.var_label.clone());
129 }
130 if !vm.var_format.is_empty() {
131 metadata.insert("sas_format".to_string(), vm.var_format.clone());
132 }
133 metadata.insert("storage_width".to_string(), vm.storage_width.to_string());
134 if vm.display_width != 0 {
135 metadata.insert("display_width".to_string(), vm.display_width.to_string());
136 }
137 if !metadata.is_empty() {
138 field = field.with_metadata(metadata);
139 }
140 field
141 })
142 .collect();
143
144 if self.file_label.is_empty() {
146 Schema::new(fields)
147 } else {
148 let mut schema_metadata = HashMap::new();
149 schema_metadata.insert("table_label".to_string(), self.file_label.clone());
150 Schema::new_with_metadata(fields, schema_metadata)
151 }
152 }
153
154 #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
165 pub fn read_metadata(
166 &mut self,
167 rsp: &ReadStatPath,
168 skip_row_count: bool,
169 ) -> Result<(), ReadStatError> {
170 debug!("Path as C string is {:?}", rsp.cstring_path);
171 let ppath = rsp.cstring_path.as_ptr();
172
173 let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
174
175 let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
176 debug!("Initially, error ==> {error}");
177
178 let row_limit = if skip_row_count { Some(1) } else { None };
179
180 let error = ReadStatParser::new()
181 .set_metadata_handler(Some(handle_metadata))?
182 .set_variable_handler(Some(handle_variable))?
183 .set_row_limit(row_limit)?
184 .parse_sas7bdat(ppath, ctx);
185
186 check_c_error(error as i32)?;
187
188 self.schema = self.initialize_schema();
190 Ok(())
191 }
192
193 #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
207 pub fn read_metadata_from_bytes(
208 &mut self,
209 bytes: &[u8],
210 skip_row_count: bool,
211 ) -> Result<(), ReadStatError> {
212 let mut buffer_ctx = ReadStatBufferCtx::new(bytes);
213
214 let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;
215
216 let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
217 debug!("Initially, error ==> {error}");
218
219 let row_limit = if skip_row_count { Some(1) } else { None };
220
221 let dummy_path = CString::new("").expect("empty string is valid C string");
223
224 let error = buffer_ctx
225 .configure_parser(
226 ReadStatParser::new()
227 .set_metadata_handler(Some(handle_metadata))?
228 .set_variable_handler(Some(handle_variable))?
229 .set_row_limit(row_limit)?,
230 )?
231 .parse_sas7bdat(dummy_path.as_ptr(), ctx);
232
233 check_c_error(error as i32)?;
234
235 self.schema = self.initialize_schema();
237 Ok(())
238 }
239
240 #[cfg(not(target_arch = "wasm32"))]
257 pub fn read_metadata_from_mmap(
258 &mut self,
259 path: &Path,
260 skip_row_count: bool,
261 ) -> Result<(), ReadStatError> {
262 let file = File::open(path)?;
263 let mmap = unsafe { memmap2::Mmap::map(&file)? };
264 self.read_metadata_from_bytes(&mmap, skip_row_count)
265 }
266
267 pub fn parse_columns_file(path: &Path) -> Result<Vec<String>, ReadStatError> {
276 let contents = std::fs::read_to_string(path)?;
277 let names: Vec<String> = contents
278 .lines()
279 .map(str::trim)
280 .filter(|line| !line.is_empty() && !line.starts_with('#'))
281 .map(std::string::ToString::to_string)
282 .collect();
283 Ok(names)
284 }
285
286 pub fn resolve_selected_columns(
297 &self,
298 columns: Option<Vec<String>>,
299 ) -> Result<Option<BTreeMap<i32, i32>>, ReadStatError> {
300 let Some(columns) = columns else {
301 return Ok(None);
302 };
303
304 let requested: BTreeSet<String> = columns.into_iter().collect();
306
307 let name_to_index: HashMap<&str, i32> = self
309 .vars
310 .iter()
311 .map(|(&idx, vm)| (vm.var_name.as_str(), idx))
312 .collect();
313
314 let not_found: Vec<String> = requested
316 .iter()
317 .filter(|name| !name_to_index.contains_key(name.as_str()))
318 .cloned()
319 .collect();
320
321 if !not_found.is_empty() {
322 let available: Vec<String> = self.vars.values().map(|vm| vm.var_name.clone()).collect();
323 return Err(ReadStatError::ColumnsNotFound {
324 requested: not_found,
325 available,
326 });
327 }
328
329 let mut mapping = BTreeMap::new();
332 let mut new_index = 0i32;
333 for (&orig_index, vm) in &self.vars {
334 if requested.contains(&vm.var_name) {
335 mapping.insert(orig_index, new_index);
336 new_index += 1;
337 }
338 }
339
340 Ok(Some(mapping))
341 }
342
343 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
349 pub fn filter_to_selected_columns(&self, mapping: &BTreeMap<i32, i32>) -> Self {
350 let new_vars: BTreeMap<i32, ReadStatVarMetadata> = mapping
351 .iter()
352 .filter_map(|(&orig_idx, &new_idx)| {
353 self.vars.get(&orig_idx).map(|vm| (new_idx, vm.clone()))
354 })
355 .collect();
356
357 let mut filtered = Self {
358 row_count: self.row_count,
359 var_count: mapping.len() as i32,
360 table_name: self.table_name.clone(),
361 file_label: self.file_label.clone(),
362 file_encoding: self.file_encoding.clone(),
363 version: self.version,
364 is64bit: self.is64bit,
365 creation_time: self.creation_time.clone(),
366 modified_time: self.modified_time.clone(),
367 compression: self.compression.clone(),
368 endianness: self.endianness.clone(),
369 vars: new_vars,
370 schema: Schema::empty(),
371 };
372 filtered.schema = filtered.initialize_schema();
373 filtered
374 }
375}
376
377#[derive(Clone, Debug, Default, FromPrimitive, Serialize)]
379#[allow(clippy::cast_possible_wrap)]
380pub enum ReadStatCompress {
381 #[default]
383 None = readstat_sys::readstat_compress_e_READSTAT_COMPRESS_NONE as isize,
384 Rows = readstat_sys::readstat_compress_e_READSTAT_COMPRESS_ROWS as isize,
386 Binary = readstat_sys::readstat_compress_e_READSTAT_COMPRESS_BINARY as isize,
388}
389
390#[derive(Clone, Debug, Default, FromPrimitive, Serialize)]
392#[allow(clippy::cast_possible_wrap)]
393pub enum ReadStatEndian {
394 #[default]
396 None = readstat_sys::readstat_endian_e_READSTAT_ENDIAN_NONE as isize,
397 Little = readstat_sys::readstat_endian_e_READSTAT_ENDIAN_LITTLE as isize,
399 Big = readstat_sys::readstat_endian_e_READSTAT_ENDIAN_BIG as isize,
401}
402
403#[derive(Clone, Debug, Serialize)]
405pub struct ReadStatVarMetadata {
406 pub var_name: String,
408 pub var_type: ReadStatVarType,
410 pub var_type_class: ReadStatVarTypeClass,
412 pub var_label: String,
414 pub var_format: String,
416 pub var_format_class: Option<ReadStatVarFormatClass>,
418 pub storage_width: usize,
421 pub display_width: i32,
423}
424
425impl ReadStatVarMetadata {
426 #[allow(clippy::too_many_arguments)]
428 pub fn new(
429 var_name: String,
430 var_type: ReadStatVarType,
431 var_type_class: ReadStatVarTypeClass,
432 var_label: String,
433 var_format: String,
434 var_format_class: Option<ReadStatVarFormatClass>,
435 storage_width: usize,
436 display_width: i32,
437 ) -> Self {
438 Self {
439 var_name,
440 var_type,
441 var_type_class,
442 var_label,
443 var_format,
444 var_format_class,
445 storage_width,
446 display_width,
447 }
448 }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    // ---- Fixtures ---------------------------------------------------------

    /// Builds a variable with a display width of 0 from borrowed strings.
    fn var(
        name: &str,
        var_type: ReadStatVarType,
        var_type_class: ReadStatVarTypeClass,
        label: &str,
        format: &str,
        format_class: Option<ReadStatVarFormatClass>,
        storage_width: usize,
    ) -> ReadStatVarMetadata {
        ReadStatVarMetadata::new(
            name.to_string(),
            var_type,
            var_type_class,
            label.to_string(),
            format.to_string(),
            format_class,
            storage_width,
            0,
        )
    }

    /// Builds metadata with one unlabeled `Double`/`BEST12` variable per name,
    /// keyed 0, 1, 2, ... in the order given, with the schema initialised.
    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
    fn test_metadata(var_names: &[&str]) -> ReadStatMetadata {
        let mut md = ReadStatMetadata::new();
        md.vars = var_names
            .iter()
            .enumerate()
            .map(|(i, name)| {
                let vm = var(
                    name,
                    ReadStatVarType::Double,
                    ReadStatVarTypeClass::Numeric,
                    "",
                    "BEST12",
                    None,
                    8,
                );
                (i as i32, vm)
            })
            .collect();
        md.var_count = var_names.len() as i32;
        md.schema = md.initialize_schema();
        md
    }

    /// Builds metadata holding exactly one variable at index 0. The schema is
    /// left empty so each test calls `initialize_schema` itself.
    fn single_var_metadata(vm: ReadStatVarMetadata) -> ReadStatMetadata {
        let mut md = ReadStatMetadata::new();
        md.vars.insert(0, vm);
        md.var_count = 1;
        md
    }

    /// Writes `lines` (one per line) to `cols.txt` in a fresh temporary
    /// directory. The directory guard is returned so the file outlives the call.
    fn write_columns_file(lines: &[&str]) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cols.txt");
        let mut f = File::create(&path).unwrap();
        for line in lines {
            writeln!(f, "{line}").unwrap();
        }
        (dir, path)
    }

    // ---- resolve_selected_columns ----------------------------------------

    #[test]
    fn resolve_columns_none_returns_none() {
        let md = test_metadata(&["a", "b", "c"]);
        let resolved = md.resolve_selected_columns(None).unwrap();
        assert!(resolved.is_none());
    }

    #[test]
    fn resolve_columns_valid_subset() {
        let md = test_metadata(&["a", "b", "c"]);
        let request = vec!["a".to_string(), "c".to_string()];
        let mapping = md.resolve_selected_columns(Some(request)).unwrap().unwrap();
        assert_eq!(mapping.len(), 2);
        // "a" keeps index 0; "c" moves from 2 to 1.
        assert_eq!(mapping[&0], 0);
        assert_eq!(mapping[&2], 1);
    }

    #[test]
    fn resolve_columns_invalid_name_errors() {
        let md = test_metadata(&["a", "b", "c"]);
        let request = vec!["a".to_string(), "nonexistent".to_string()];
        let err = md.resolve_selected_columns(Some(request)).unwrap_err();
        match err {
            ReadStatError::ColumnsNotFound {
                requested,
                available,
            } => {
                assert_eq!(requested, vec!["nonexistent"]);
                assert_eq!(available, vec!["a", "b", "c"]);
            }
            other => panic!("Expected ColumnsNotFound, got {other:?}"),
        }
    }

    #[test]
    fn resolve_columns_all_columns() {
        let md = test_metadata(&["x", "y", "z"]);
        let request = vec!["x".to_string(), "y".to_string(), "z".to_string()];
        let mapping = md.resolve_selected_columns(Some(request)).unwrap().unwrap();
        assert_eq!(mapping.len(), 3);
        for idx in 0..3 {
            assert_eq!(mapping[&idx], idx);
        }
    }

    // ---- filter_to_selected_columns --------------------------------------

    #[test]
    fn filter_produces_contiguous_indices() {
        let md = test_metadata(&["a", "b", "c", "d"]);
        let request = vec!["b".to_string(), "d".to_string()];
        let mapping = md.resolve_selected_columns(Some(request)).unwrap().unwrap();
        let filtered = md.filter_to_selected_columns(&mapping);

        assert_eq!(filtered.var_count, 2);
        assert_eq!(filtered.vars[&0].var_name, "b");
        assert_eq!(filtered.vars[&1].var_name, "d");
    }

    #[test]
    fn filter_preserves_schema() {
        let md = test_metadata(&["a", "b", "c"]);
        let request = vec!["b".to_string()];
        let mapping = md.resolve_selected_columns(Some(request)).unwrap().unwrap();
        let filtered = md.filter_to_selected_columns(&mapping);

        let fields = filtered.schema.fields();
        assert_eq!(fields.len(), 1);
        assert_eq!(fields[0].name(), "b");
    }

    // ---- initialize_schema: data types -----------------------------------

    #[test]
    fn schema_string_type() {
        let md = single_var_metadata(var(
            "name",
            ReadStatVarType::String,
            ReadStatVarTypeClass::String,
            "",
            "$30",
            None,
            30,
        ));
        let schema = md.initialize_schema();
        assert_eq!(*schema.fields()[0].data_type(), DataType::Utf8);
    }

    #[test]
    fn schema_float64_type() {
        let md = single_var_metadata(var(
            "value",
            ReadStatVarType::Double,
            ReadStatVarTypeClass::Numeric,
            "",
            "BEST12",
            None,
            8,
        ));
        let schema = md.initialize_schema();
        assert_eq!(*schema.fields()[0].data_type(), DataType::Float64);
    }

    #[test]
    fn schema_date_type() {
        let md = single_var_metadata(var(
            "dt",
            ReadStatVarType::Double,
            ReadStatVarTypeClass::Numeric,
            "",
            "DATE9",
            Some(ReadStatVarFormatClass::Date),
            8,
        ));
        let schema = md.initialize_schema();
        assert_eq!(*schema.fields()[0].data_type(), DataType::Date32);
    }

    #[test]
    fn schema_datetime_type() {
        let md = single_var_metadata(var(
            "ts",
            ReadStatVarType::Double,
            ReadStatVarTypeClass::Numeric,
            "",
            "DATETIME22",
            Some(ReadStatVarFormatClass::DateTime),
            8,
        ));
        let schema = md.initialize_schema();
        assert_eq!(
            *schema.fields()[0].data_type(),
            DataType::Timestamp(TimeUnit::Second, None)
        );
    }

    #[test]
    fn schema_time_type() {
        let md = single_var_metadata(var(
            "tm",
            ReadStatVarType::Double,
            ReadStatVarTypeClass::Numeric,
            "",
            "TIME8",
            Some(ReadStatVarFormatClass::Time),
            8,
        ));
        let schema = md.initialize_schema();
        assert_eq!(
            *schema.fields()[0].data_type(),
            DataType::Time32(TimeUnit::Second)
        );
    }

    #[test]
    fn schema_int32_type() {
        let md = single_var_metadata(var(
            "count",
            ReadStatVarType::Int32,
            ReadStatVarTypeClass::Numeric,
            "",
            "",
            None,
            4,
        ));
        let schema = md.initialize_schema();
        assert_eq!(*schema.fields()[0].data_type(), DataType::Int32);
    }

    // ---- initialize_schema: metadata -------------------------------------

    #[test]
    fn schema_with_labels_metadata() {
        let mut md = single_var_metadata(var(
            "col",
            ReadStatVarType::Double,
            ReadStatVarTypeClass::Numeric,
            "My Label",
            "BEST12",
            None,
            8,
        ));
        md.file_label = "My Table".into();
        let schema = md.initialize_schema();

        // Variable label lands on the field.
        let field_meta = schema.fields()[0].metadata();
        assert_eq!(field_meta.get("label").unwrap(), "My Label");

        // File label lands on the schema.
        let schema_meta = schema.metadata();
        assert_eq!(schema_meta.get("table_label").unwrap(), "My Table");
    }

    #[test]
    fn schema_no_labels_has_format_and_width_metadata() {
        let md = single_var_metadata(var(
            "col",
            ReadStatVarType::Double,
            ReadStatVarTypeClass::Numeric,
            "",
            "BEST12",
            None,
            8,
        ));
        let schema = md.initialize_schema();

        let field_meta = schema.fields()[0].metadata();
        assert!(!field_meta.contains_key("label"));
        assert_eq!(field_meta.get("sas_format").unwrap(), "BEST12");
        assert_eq!(field_meta.get("storage_width").unwrap(), "8");
        assert!(!field_meta.contains_key("display_width"));
        assert!(schema.metadata().is_empty());
    }

    // ---- parse_columns_file ----------------------------------------------

    #[test]
    fn parse_columns_file_normal() {
        let (_dir, path) = write_columns_file(&["col_a", "col_b", "col_c"]);

        let names = ReadStatMetadata::parse_columns_file(&path).unwrap();
        assert_eq!(names, vec!["col_a", "col_b", "col_c"]);
    }

    #[test]
    fn parse_columns_file_with_comments_and_blanks() {
        let (_dir, path) = write_columns_file(&[
            "# This is a comment",
            "col_a",
            "",
            " col_b ",
            "# Another comment",
        ]);

        let names = ReadStatMetadata::parse_columns_file(&path).unwrap();
        assert_eq!(names, vec!["col_a", "col_b"]);
    }

    #[test]
    fn parse_columns_file_empty() {
        let (_dir, path) = write_columns_file(&[]);

        let names = ReadStatMetadata::parse_columns_file(&path).unwrap();
        assert!(names.is_empty());
    }

    #[test]
    fn parse_columns_file_nonexistent() {
        let missing = Path::new("/nonexistent/path/cols.txt");
        assert!(ReadStatMetadata::parse_columns_file(missing).is_err());
    }

    // ---- construction -----------------------------------------------------

    #[test]
    fn default_metadata() {
        let md = ReadStatMetadata::new();
        assert_eq!(md.row_count, 0);
        assert_eq!(md.var_count, 0);
        assert!(md.table_name.is_empty());
        assert!(md.vars.is_empty());
        assert!(md.schema.fields().is_empty());
    }
}