1use arrow::datatypes::Schema;
9use arrow_array::{
10 ArrayRef, RecordBatch,
11 builder::{
12 Date32Builder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, StringBuilder,
13 Time32SecondBuilder, Time64MicrosecondBuilder, TimestampMicrosecondBuilder,
14 TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
15 },
16};
17use log::debug;
18use std::{
19 collections::BTreeMap,
20 ffi::CString,
21 os::raw::c_void,
22 sync::{Arc, atomic::AtomicUsize},
23};
24
25use crate::{
26 cb,
27 err::{ReadStatError, check_c_error},
28 progress::ProgressCallback,
29 rs_buffer_io::ReadStatBufferCtx,
30 rs_metadata::{ReadStatMetadata, ReadStatVarMetadata},
31 rs_parser::ReadStatParser,
32 rs_path::ReadStatPath,
33 rs_var::{ReadStatVarFormatClass, ReadStatVarType, ReadStatVarTypeClass},
34};
35
/// One Arrow array builder per supported output column type.
///
/// The variant for a given column is selected from its ReadStat metadata in
/// [`ColumnBuilder::from_metadata`]; values are appended while rows stream in,
/// and the finished Arrow array is produced by [`ColumnBuilder::finish`].
pub(crate) enum ColumnBuilder {
    /// UTF-8 string column.
    Str(StringBuilder),
    /// 16-bit integer column (also receives 8-bit source values).
    Int16(Int16Builder),
    /// 32-bit integer column.
    Int32(Int32Builder),
    /// 32-bit float column.
    Float32(Float32Builder),
    /// 64-bit float column (default for otherwise-unmapped numeric types).
    Float64(Float64Builder),
    /// Date column (days since epoch).
    Date32(Date32Builder),
    /// Timestamp column, second precision.
    TimestampSecond(TimestampSecondBuilder),
    /// Timestamp column, millisecond precision.
    TimestampMillisecond(TimestampMillisecondBuilder),
    /// Timestamp column, microsecond precision.
    TimestampMicrosecond(TimestampMicrosecondBuilder),
    /// Timestamp column, nanosecond precision.
    TimestampNanosecond(TimestampNanosecondBuilder),
    /// Time-of-day column, second precision.
    Time32Second(Time32SecondBuilder),
    /// Time-of-day column, microsecond precision.
    Time64Microsecond(Time64MicrosecondBuilder),
}
67
68impl ColumnBuilder {
69 pub(crate) fn as_string_mut(&mut self) -> &mut StringBuilder {
74 match self {
75 Self::Str(b) => b,
76 _ => panic!("ColumnBuilder::as_string_mut called on non-string builder"),
77 }
78 }
79
80 pub(crate) fn append_null(&mut self) {
82 match self {
83 Self::Str(b) => b.append_null(),
84 Self::Int16(b) => b.append_null(),
85 Self::Int32(b) => b.append_null(),
86 Self::Float32(b) => b.append_null(),
87 Self::Float64(b) => b.append_null(),
88 Self::Date32(b) => b.append_null(),
89 Self::TimestampSecond(b) => b.append_null(),
90 Self::TimestampMillisecond(b) => b.append_null(),
91 Self::TimestampMicrosecond(b) => b.append_null(),
92 Self::TimestampNanosecond(b) => b.append_null(),
93 Self::Time32Second(b) => b.append_null(),
94 Self::Time64Microsecond(b) => b.append_null(),
95 }
96 }
97
98 pub(crate) fn finish(&mut self) -> ArrayRef {
100 match self {
101 Self::Str(b) => Arc::new(b.finish()),
102 Self::Int16(b) => Arc::new(b.finish()),
103 Self::Int32(b) => Arc::new(b.finish()),
104 Self::Float32(b) => Arc::new(b.finish()),
105 Self::Float64(b) => Arc::new(b.finish()),
106 Self::Date32(b) => Arc::new(b.finish()),
107 Self::TimestampSecond(b) => Arc::new(b.finish()),
108 Self::TimestampMillisecond(b) => Arc::new(b.finish()),
109 Self::TimestampMicrosecond(b) => Arc::new(b.finish()),
110 Self::TimestampNanosecond(b) => Arc::new(b.finish()),
111 Self::Time32Second(b) => Arc::new(b.finish()),
112 Self::Time64Microsecond(b) => Arc::new(b.finish()),
113 }
114 }
115
116 fn from_metadata(vm: &ReadStatVarMetadata, capacity: usize) -> Self {
122 match vm.var_type_class {
123 ReadStatVarTypeClass::String => Self::Str(StringBuilder::with_capacity(
124 capacity,
125 capacity * vm.storage_width,
126 )),
127 ReadStatVarTypeClass::Numeric => {
128 match vm.var_format_class {
129 Some(ReadStatVarFormatClass::Date) => {
130 Self::Date32(Date32Builder::with_capacity(capacity))
131 }
132 Some(ReadStatVarFormatClass::DateTime) => {
133 Self::TimestampSecond(TimestampSecondBuilder::with_capacity(capacity))
134 }
135 Some(ReadStatVarFormatClass::DateTimeWithMilliseconds) => {
136 Self::TimestampMillisecond(TimestampMillisecondBuilder::with_capacity(
137 capacity,
138 ))
139 }
140 Some(ReadStatVarFormatClass::DateTimeWithMicroseconds) => {
141 Self::TimestampMicrosecond(TimestampMicrosecondBuilder::with_capacity(
142 capacity,
143 ))
144 }
145 Some(ReadStatVarFormatClass::DateTimeWithNanoseconds) => {
146 Self::TimestampNanosecond(TimestampNanosecondBuilder::with_capacity(
147 capacity,
148 ))
149 }
150 Some(ReadStatVarFormatClass::Time) => {
151 Self::Time32Second(Time32SecondBuilder::with_capacity(capacity))
152 }
153 Some(ReadStatVarFormatClass::TimeWithMicroseconds) => {
154 Self::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(capacity))
155 }
156 None => {
157 match vm.var_type {
159 ReadStatVarType::Int8 | ReadStatVarType::Int16 => {
160 Self::Int16(Int16Builder::with_capacity(capacity))
161 }
162 ReadStatVarType::Int32 => {
163 Self::Int32(Int32Builder::with_capacity(capacity))
164 }
165 ReadStatVarType::Float => {
166 Self::Float32(Float32Builder::with_capacity(capacity))
167 }
168 _ => Self::Float64(Float64Builder::with_capacity(capacity)),
169 }
170 }
171 }
172 }
173 }
174 }
175}
176
/// Mutable parse state for one chunk of a dataset: column metadata, the Arrow
/// builders being filled by the C value callbacks, and row/progress counters.
pub struct ReadStatData {
    /// Number of variables (columns) this instance materializes.
    pub var_count: i32,
    /// Per-variable metadata keyed by variable index.
    pub vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
    /// One Arrow builder per materialized column, in `vars` iteration order.
    pub(crate) builders: Vec<ColumnBuilder>,
    /// Arrow schema the finished `batch` is assembled against.
    pub schema: Arc<Schema>,
    /// Finished record batch; `None` until `cols_to_batch` has run.
    pub batch: Option<RecordBatch>,
    /// Number of rows this chunk should parse.
    pub chunk_rows_to_process: usize,
    /// First row (inclusive) of this chunk within the file.
    pub chunk_row_start: usize,
    /// End row (exclusive) of this chunk within the file.
    pub chunk_row_end: usize,
    /// Rows parsed so far in this chunk.
    pub chunk_rows_processed: usize,
    /// Total rows expected across all chunks.
    pub total_rows_to_process: usize,
    /// Shared atomic counter of rows processed, when tracked across chunks.
    pub total_rows_processed: Option<Arc<AtomicUsize>>,
    /// Optional progress callback notified during parsing.
    pub progress: Option<Arc<dyn ProgressCallback>>,
    /// When true, progress reporting is suppressed.
    pub no_progress: bool,
    /// Non-fatal errors accumulated during parsing.
    pub errors: Vec<String>,
    // Presumably maps source column index -> output column index, with absent
    // columns skipped; verify against the value-handler callback in `cb`.
    pub column_filter: Option<Arc<BTreeMap<i32, i32>>>,
    /// Variable count of the full file, independent of any column filter.
    pub total_var_count: i32,
}
221
222impl Default for ReadStatData {
223 fn default() -> Self {
224 Self::new()
225 }
226}
227
228impl ReadStatData {
229 pub fn new() -> Self {
231 Self {
232 var_count: 0,
234 vars: Arc::new(BTreeMap::new()),
235 builders: Vec::new(),
237 schema: Arc::new(Schema::empty()),
238 batch: None,
240 chunk_rows_to_process: 0,
241 chunk_rows_processed: 0,
242 chunk_row_start: 0,
243 chunk_row_end: 0,
244 total_rows_to_process: 0,
246 total_rows_processed: None,
247 progress: None,
249 no_progress: false,
250 errors: Vec::new(),
252 column_filter: None,
254 total_var_count: 0,
255 }
256 }
257
258 #[must_use]
263 pub fn allocate_builders(self) -> Self {
264 let capacity = self.chunk_rows_to_process;
265 let builders: Vec<ColumnBuilder> = self
266 .vars
267 .values()
268 .map(|vm| ColumnBuilder::from_metadata(vm, capacity))
269 .collect();
270 Self { builders, ..self }
271 }
272
273 pub(crate) fn cols_to_batch(&mut self) -> Result<(), ReadStatError> {
279 let arrays: Vec<ArrayRef> = self
280 .builders
281 .iter_mut()
282 .map(ColumnBuilder::finish)
283 .collect();
284
285 self.batch = Some(RecordBatch::try_new(self.schema.clone(), arrays)?);
286
287 Ok(())
288 }
289
290 pub fn read_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
296 self.parse_data(rsp)?;
298 self.cols_to_batch()?;
299 Ok(())
300 }
301
302 pub fn read_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
311 self.parse_data_from_bytes(bytes)?;
312 self.cols_to_batch()?;
313 Ok(())
314 }
315
    /// Reads the dataset by memory-mapping the file at `path` and parsing the
    /// mapped bytes via [`Self::read_data_from_bytes`]. Unavailable on wasm32.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or mapped, or if
    /// parsing/batch assembly fails.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn read_data_from_mmap(&mut self, path: &std::path::Path) -> Result<(), ReadStatError> {
        let file = std::fs::File::open(path)?;
        // SAFETY: `Mmap::map` is unsound if the underlying file is truncated
        // or rewritten by another process while the map is alive; this assumes
        // the input file is not mutated concurrently — TODO confirm callers
        // uphold this invariant.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };
        self.read_data_from_bytes(&mmap)
    }
336
    /// Parses this chunk's row window from the file at `rsp` through the
    /// ReadStat C API, streaming each value into `self.builders` via the
    /// registered value-handler callback.
    ///
    /// # Errors
    ///
    /// Returns an error if the parser cannot be configured, if the row counts
    /// do not fit the C API's integer types, or if ReadStat reports a non-OK
    /// status code.
    #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
    pub(crate) fn parse_data(&mut self, rsp: &ReadStatPath) -> Result<(), ReadStatError> {
        debug!("Path as C string is {:?}", rsp.cstring_path);
        let ppath = rsp.cstring_path.as_ptr();

        if let Some(progress) = &self.progress {
            // NOTE(review): the whole chunk is added to the progress counter
            // up front, before any rows are parsed — confirm this is the
            // intended reporting granularity rather than per-row increments.
            progress.inc(self.chunk_rows_to_process as u64);
            progress.parsing_started(&rsp.path.to_string_lossy());
        }

        // Hand `self` to the C callbacks as an opaque context pointer. The
        // parse below is synchronous, so `self` outlives every callback
        // invocation that dereferences this pointer.
        let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;

        let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
        debug!("Initially, error ==> {error:#?}");

        // Configure the parser to read only this chunk's row window, then run.
        let error = ReadStatParser::new()
            .set_value_handler(Some(cb::handle_value))?
            .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
            .set_row_offset(Some(self.chunk_row_start.try_into()?))?
            .parse_sas7bdat(ppath, ctx);

        // Translate a non-OK C status code into a Rust error.
        check_c_error(error as i32)?;
        Ok(())
    }
369
    /// Parses this chunk's row window from an in-memory byte buffer, routing
    /// ReadStat's I/O through [`ReadStatBufferCtx`] instead of the filesystem.
    ///
    /// # Errors
    ///
    /// Returns an error if the parser cannot be configured, if the row counts
    /// do not fit the C API's integer types, or if ReadStat reports a non-OK
    /// status code.
    #[allow(clippy::cast_possible_wrap, clippy::ptr_as_ptr)]
    fn parse_data_from_bytes(&mut self, bytes: &[u8]) -> Result<(), ReadStatError> {
        let mut buffer_ctx = ReadStatBufferCtx::new(bytes);

        // Opaque context pointer handed to the C value callbacks; the parse
        // below is synchronous, so `self` outlives every callback call.
        let ctx = std::ptr::from_mut::<Self>(self) as *mut c_void;

        let error: readstat_sys::readstat_error_t = readstat_sys::readstat_error_e_READSTAT_OK;
        debug!("Initially, error ==> {error:#?}");

        // The parse entry point still takes a path argument even though all
        // I/O goes through the buffer callbacks; an empty C string stands in.
        let dummy_path = CString::new("").expect("empty string is valid C string");

        let error = buffer_ctx
            .configure_parser(
                ReadStatParser::new()
                    .set_value_handler(Some(cb::handle_value))?
                    .set_row_limit(Some(self.chunk_rows_to_process.try_into()?))?
                    .set_row_offset(Some(self.chunk_row_start.try_into()?))?,
            )?
            .parse_sas7bdat(dummy_path.as_ptr(), ctx);

        // Translate a non-OK C status code into a Rust error.
        check_c_error(error as i32)?;
        Ok(())
    }
397
398 #[must_use]
404 pub fn init(self, md: ReadStatMetadata, row_start: u32, row_end: u32) -> Self {
405 self.set_metadata(md)
406 .set_chunk_counts(row_start, row_end)
407 .allocate_builders()
408 }
409
410 #[must_use]
416 pub fn init_shared(
417 self,
418 var_count: i32,
419 vars: Arc<BTreeMap<i32, ReadStatVarMetadata>>,
420 schema: Arc<Schema>,
421 row_start: u32,
422 row_end: u32,
423 ) -> Self {
424 let total_var_count = if self.total_var_count != 0 {
425 self.total_var_count
426 } else {
427 var_count
428 };
429 Self {
430 var_count,
431 vars,
432 schema,
433 total_var_count,
434 ..self
435 }
436 .set_chunk_counts(row_start, row_end)
437 .allocate_builders()
438 }
439
440 #[allow(clippy::cast_possible_truncation)]
441 fn set_chunk_counts(self, row_start: u32, row_end: u32) -> Self {
442 let chunk_rows_to_process = (row_end - row_start) as usize;
443 let chunk_row_start = row_start as usize;
444 let chunk_row_end = row_end as usize;
445 let chunk_rows_processed = 0_usize;
446
447 Self {
448 chunk_rows_to_process,
449 chunk_row_start,
450 chunk_row_end,
451 chunk_rows_processed,
452 ..self
453 }
454 }
455
456 fn set_metadata(self, md: ReadStatMetadata) -> Self {
457 let var_count = md.var_count;
458 let vars = Arc::new(md.vars);
459 let schema = Arc::new(md.schema);
460 let total_var_count = if self.total_var_count != 0 {
462 self.total_var_count
463 } else {
464 var_count
465 };
466 Self {
467 var_count,
468 vars,
469 schema,
470 total_var_count,
471 ..self
472 }
473 }
474
475 #[must_use]
477 pub fn set_no_progress(self, no_progress: bool) -> Self {
478 Self {
479 no_progress,
480 ..self
481 }
482 }
483
484 #[must_use]
486 pub fn set_total_rows_to_process(self, total_rows_to_process: usize) -> Self {
487 Self {
488 total_rows_to_process,
489 ..self
490 }
491 }
492
493 #[must_use]
495 pub fn set_total_rows_processed(self, total_rows_processed: Arc<AtomicUsize>) -> Self {
496 Self {
497 total_rows_processed: Some(total_rows_processed),
498 ..self
499 }
500 }
501
502 #[must_use]
508 pub fn set_column_filter(
509 self,
510 filter: Option<Arc<BTreeMap<i32, i32>>>,
511 total_var_count: i32,
512 ) -> Self {
513 Self {
514 column_filter: filter,
515 total_var_count,
516 ..self
517 }
518 }
519
520 #[must_use]
525 pub fn set_progress(self, progress: Arc<dyn ProgressCallback>) -> Self {
526 Self {
527 progress: Some(progress),
528 ..self
529 }
530 }
531}