1use colored::Colorize;
4use crossbeam::channel::bounded;
5use indicatif::{ProgressBar, ProgressStyle};
6use log::debug;
7use path_abs::{PathAbs, PathInfo};
8use rayon::prelude::*;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::thread;
12
13use readstat::{
14 OutFormat, ProgressCallback, ReadStatData, ReadStatError, ReadStatMetadata, ReadStatPath,
15 ReadStatWriter, WriteConfig, build_offsets,
16};
17
18use crate::cli::{ReadStatCli, ReadStatCliCommands, Reader};
19
/// Default number of rows per streamed chunk, used when the caller does not
/// supply an explicit `stream_rows` value.
const STREAM_ROWS: u32 = 10_000;

/// Capacity of the bounded channel between the reader thread and the writer;
/// also the maximum number of chunks grouped into one parallel Parquet write round.
const CHANNEL_CAPACITY: usize = 10;
26
27fn write_empty_output(
31 var_count: i32,
32 vars: Arc<std::collections::BTreeMap<i32, readstat::ReadStatVarMetadata>>,
33 schema: Arc<arrow_schema::Schema>,
34 wc: &WriteConfig,
35 input_path: &std::path::Path,
36) -> Result<(), ReadStatError> {
37 let mut d = ReadStatData::new().init_shared(var_count, vars, schema.clone(), 0, 0);
38 d.batch = Some(arrow_array::RecordBatch::new_empty(schema));
39 let mut wtr = ReadStatWriter::new();
40 wtr.write(&d, wc)?;
41 let rows = wtr.finish(&d, wc)?;
42 print_write_summary(rows, input_path, wc.out_path());
43 Ok(())
44}
45
46fn print_write_summary(rows: usize, in_path: &std::path::Path, out_path: Option<&std::path::Path>) {
49 let in_f = in_path
50 .file_name()
51 .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());
52 let out_f = out_path
53 .and_then(std::path::Path::file_name)
54 .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());
55 println!(
56 "In total, wrote {} rows from file {in_f} into {out_f}",
57 format_with_commas(rows)
58 );
59}
60
/// Formats `n` in decimal with a comma between every group of three digits,
/// counted from the right (e.g. `1234567` becomes `1,234,567`).
fn format_with_commas(n: usize) -> String {
    let digits = n.to_string();
    let total = digits.len();
    if total <= 3 {
        return digits;
    }

    // The leading group holds whatever is left over after full groups of three.
    let lead = match total % 3 {
        0 => 3,
        partial => partial,
    };

    let mut grouped = String::with_capacity(total + total / 3);
    grouped.push_str(&digits[..lead]);

    // The remainder is a whole number of three-digit groups (ASCII only).
    let mut rest = &digits[lead..];
    while !rest.is_empty() {
        let (group, tail) = rest.split_at(3);
        grouped.push(',');
        grouped.push_str(group);
        rest = tail;
    }
    grouped
}
78
79fn resolve_stream_rows(reader: Option<Reader>, stream_rows: Option<u32>, total_rows: u32) -> u32 {
81 match reader {
82 Some(Reader::Stream) | None => stream_rows.unwrap_or(STREAM_ROWS),
83 Some(Reader::Mem) => total_rows,
84 }
85}
86
87struct IndicatifProgress {
89 pb: ProgressBar,
90}
91
92impl ProgressCallback for IndicatifProgress {
93 fn inc(&self, n: u64) {
94 self.pb.inc(n);
95 }
96
97 fn parsing_started(&self, path: &str) {
98 self.pb
103 .set_message(format!("Parsing sas7bdat data from file {path}"));
104 self.pb
105 .enable_steady_tick(std::time::Duration::from_millis(120));
106 }
107}
108
109fn create_progress(
111 no_progress: bool,
112 total_rows: u32,
113) -> Result<Option<Arc<IndicatifProgress>>, ReadStatError> {
114 if no_progress {
115 return Ok(None);
116 }
117 let pb = ProgressBar::new(u64::from(total_rows));
118 pb.set_style(
119 ProgressStyle::default_bar()
120 .template(
121 "[{spinner:.green} {elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} rows {msg}",
122 )
123 .map_err(|e| ReadStatError::Other(format!("Progress bar template error: {e}")))?
124 .progress_chars("##-"),
125 );
126 Ok(Some(Arc::new(IndicatifProgress { pb })))
127}
128
129fn resolve_columns(
131 columns: Option<Vec<String>>,
132 columns_file: Option<PathBuf>,
133) -> Result<Option<Vec<String>>, ReadStatError> {
134 if let Some(path) = columns_file {
135 let names = ReadStatMetadata::parse_columns_file(&path)?;
136 if names.is_empty() {
137 Err(ReadStatError::EmptyColumnsFile(path))
140 } else {
141 Ok(Some(names))
142 }
143 } else {
144 Ok(columns)
145 }
146}
147
/// Determines the SQL query text.
///
/// A SQL file, when given, takes precedence over the inline `sql` string;
/// otherwise `sql` is returned unchanged.
///
/// # Errors
///
/// Propagates any failure from `readstat::read_sql_file`.
#[cfg(feature = "sql")]
fn resolve_sql(
    sql: Option<String>,
    sql_file: Option<PathBuf>,
) -> Result<Option<String>, ReadStatError> {
    match sql_file {
        Some(path) => {
            let query = readstat::read_sql_file(&path)?;
            Ok(Some(query))
        }
        None => Ok(sql),
    }
}
160
161#[cfg(feature = "sql")]
/// Derives a SQL table name from the file stem of `path`.
///
/// Falls back to `"data"` when the path has no file stem or the stem is not
/// valid UTF-8.
fn table_name_from_path(path: &std::path::Path) -> String {
    match path.file_stem().and_then(std::ffi::OsStr::to_str) {
        Some(stem) => stem.to_owned(),
        None => String::from("data"),
    }
}
169
170pub fn run(rs: ReadStatCli) -> Result<(), ReadStatError> {
175 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
179
180 match rs.command {
181 cmd @ ReadStatCliCommands::Metadata { .. } => run_metadata(cmd),
182 cmd @ ReadStatCliCommands::Preview { .. } => run_preview(cmd),
183 cmd @ ReadStatCliCommands::Data { .. } => run_data(cmd),
184 }
185}
186
187fn run_metadata(cmd: ReadStatCliCommands) -> Result<(), ReadStatError> {
189 let ReadStatCliCommands::Metadata {
190 input: in_path,
191 as_json,
192 skip_row_count,
193 } = cmd
194 else {
195 unreachable!()
196 };
197 let sas_path = PathAbs::new(in_path)?.as_path().to_path_buf();
198 debug!(
199 "Retrieving metadata from the file {}",
200 &sas_path.to_string_lossy()
201 );
202
203 let rsp = ReadStatPath::new(sas_path)?;
204 let mut md = ReadStatMetadata::new();
205 md.read_metadata(&rsp, skip_row_count)?;
206 println!("{}", ReadStatWriter::metadata_to_string(&md, &rsp, as_json)?);
207 Ok(())
208}
209
210#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
212fn run_preview(cmd: ReadStatCliCommands) -> Result<(), ReadStatError> {
213 let ReadStatCliCommands::Preview {
214 input,
215 rows,
216 reader,
217 stream_rows,
218 no_progress,
219 columns,
220 columns_file,
221 #[cfg(feature = "sql")]
222 sql,
223 #[cfg(feature = "sql")]
224 sql_file,
225 } = cmd
226 else {
227 unreachable!()
228 };
229
230 #[cfg(feature = "sql")]
231 let sql_query = resolve_sql(sql, sql_file)?;
232
233 let sas_path = PathAbs::new(input)?.as_path().to_path_buf();
234 debug!(
235 "Generating data preview from the file {}",
236 &sas_path.to_string_lossy()
237 );
238
239 let rsp = ReadStatPath::new(sas_path)?;
240 let mut md = ReadStatMetadata::new();
241 md.read_metadata(&rsp, false)?;
242
243 let col_names = resolve_columns(columns, columns_file)?;
245 let column_filter = md.resolve_selected_columns(col_names)?;
246 let original_var_count = md.var_count;
247 if let Some(ref mapping) = column_filter {
248 md = md.filter_to_selected_columns(mapping);
249 }
250
251 let column_filter = column_filter.map(Arc::new);
252 let total_rows_to_process = std::cmp::min(rows, md.row_count as u32);
253 let total_rows_to_stream = resolve_stream_rows(reader, stream_rows, total_rows_to_process);
254 let total_rows_processed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
255 let progress = create_progress(no_progress, total_rows_to_process)?;
256
257 let offsets = build_offsets(total_rows_to_process, total_rows_to_stream);
258 let offsets_pairs = offsets.windows(2);
259
260 let var_count = md.var_count;
261 let vars_shared = Arc::new(md.vars);
262 let schema_shared = Arc::new(md.schema);
263
264 if let Some(ref p) = progress {
266 p.parsing_started(&rsp.path.to_string_lossy());
267 }
268
269 let mut all_batches: Vec<arrow_array::RecordBatch> = Vec::new();
271 for w in offsets_pairs {
272 let row_start = w[0];
273 let row_end = w[1];
274
275 let mut d = ReadStatData::new()
276 .set_column_filter(column_filter.clone(), original_var_count)
277 .set_total_rows_processed(total_rows_processed.clone())
278 .init_shared(
279 var_count,
280 vars_shared.clone(),
281 schema_shared.clone(),
282 row_start,
283 row_end,
284 );
285
286 if let Some(ref p) = progress {
287 d = d.set_progress(p.clone() as Arc<dyn ProgressCallback>);
288 }
289
290 d.read_data(&rsp)?;
291
292 if let Some(batch) = d.batch {
293 all_batches.push(batch);
294 }
295 }
296
297 if let Some(p) = progress {
298 p.pb.finish_with_message("Done");
299 }
300
301 #[cfg(feature = "sql")]
303 let all_batches = if let Some(ref query) = sql_query {
304 let table_name = table_name_from_path(&rsp.path);
305 readstat::execute_sql(all_batches, schema_shared.clone(), &table_name, query)?
306 } else {
307 all_batches
308 };
309
310 #[cfg(feature = "csv")]
312 {
313 let stdout = std::io::stdout();
314 let mut csv_writer = arrow_csv::WriterBuilder::new()
315 .with_header(true)
316 .build(stdout);
317 for batch in &all_batches {
318 csv_writer.write(batch)?;
319 }
320 }
321 #[cfg(not(feature = "csv"))]
322 {
323 let _ = all_batches;
324 return Err(ReadStatError::Other(
325 "CSV feature is required for preview output".to_string(),
326 ));
327 }
328 #[cfg(feature = "csv")]
329 Ok(())
330}
331
332#[allow(
334 clippy::too_many_lines,
335 clippy::cast_sign_loss,
336 clippy::cast_possible_truncation
337)]
338fn run_data(cmd: ReadStatCliCommands) -> Result<(), ReadStatError> {
339 let ReadStatCliCommands::Data {
340 input,
341 output,
342 format,
343 rows,
344 reader,
345 stream_rows,
346 no_progress,
347 overwrite,
348 parallel,
349 parallel_write,
350 #[cfg(feature = "parquet")]
351 parallel_write_buffer_mb,
352 #[cfg(not(feature = "parquet"))]
353 parallel_write_buffer_mb: _,
354 compression,
355 compression_level,
356 columns,
357 columns_file,
358 #[cfg(feature = "sql")]
359 sql,
360 #[cfg(feature = "sql")]
361 sql_file,
362 } = cmd
363 else {
364 unreachable!()
365 };
366
367 #[cfg(feature = "sql")]
368 let sql_query = resolve_sql(sql, sql_file)?;
369
370 let sas_path = PathAbs::new(input)?.as_path().to_path_buf();
371 debug!(
372 "Generating data from the file {}",
373 &sas_path.to_string_lossy()
374 );
375
376 let rsp = ReadStatPath::new(sas_path)?;
377 let wc = WriteConfig::new(
378 output,
379 format.map(Into::into),
380 overwrite,
381 compression.map(Into::into),
382 compression_level,
383 )?;
384
385 let mut md = ReadStatMetadata::new();
386 md.read_metadata(&rsp, false)?;
387
388 match wc.out_path() {
390 None => {
391 #[cfg(feature = "sql")]
395 if sql_query.is_some() {
396 return Err(ReadStatError::Other(
397 "--sql/--sql-file requires --output: the query result needs a destination file"
398 .to_string(),
399 ));
400 }
401
402 println!(
403 "{}: a value was not provided for the parameter {}, thus displaying metadata only\n",
404 "Warning".bright_yellow(),
405 "--output".bright_cyan()
406 );
407
408 println!("{}", ReadStatWriter::metadata_to_string(&md, &rsp, false)?);
411 Ok(())
412 }
413 Some(p) => {
414 println!(
415 "Writing parsed data to file {}",
416 p.to_string_lossy().bright_yellow()
417 );
418
419 let col_names = resolve_columns(columns, columns_file)?;
421 let column_filter = md.resolve_selected_columns(col_names)?;
422 let original_var_count = md.var_count;
423 if let Some(ref mapping) = column_filter {
424 md = md.filter_to_selected_columns(mapping);
425 }
426 let column_filter = column_filter.map(Arc::new);
427
428 let total_rows_to_process = if let Some(r) = rows {
430 std::cmp::min(r, md.row_count as u32)
431 } else {
432 md.row_count as u32
433 };
434
435 let total_rows_to_stream =
436 resolve_stream_rows(reader, stream_rows, total_rows_to_process);
437 let total_rows_processed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
438 let progress = create_progress(no_progress, total_rows_to_process)?;
439
440 let offsets = build_offsets(total_rows_to_process, total_rows_to_stream);
441
442 let use_parallel_writes =
443 parallel && parallel_write && matches!(wc.format(), OutFormat::Parquet);
444
445 let input_path = rsp.path.clone();
446
447 let var_count = md.var_count;
448 let vars_shared = Arc::new(md.vars);
449 let schema_shared = Arc::new(md.schema);
450
451 #[cfg(feature = "sql")]
453 let sql_table_name = table_name_from_path(&rsp.path);
454
455 let (s, r) = bounded(CHANNEL_CAPACITY);
456 let progress_thread = progress.clone();
457 let wc_thread = wc.clone();
458
459 let vars_writer = vars_shared.clone();
463 let schema_writer = schema_shared.clone();
464
465 if let Some(ref p) = progress {
469 p.parsing_started(&rsp.path.to_string_lossy());
470 }
471
472 let reader_handle = spawn_reader(
476 ReaderConfig {
477 rsp,
478 offsets,
479 parallel,
480 column_filter,
481 original_var_count,
482 total_rows_processed,
483 var_count,
484 vars: vars_shared,
485 schema: schema_shared,
486 progress: progress_thread,
487 wc: wc_thread,
488 },
489 s,
490 );
491
492 let ctx = WriteContext {
497 rx: r,
498 reader: reader_handle,
499 wc,
500 input_path,
501 var_count,
502 vars: vars_writer,
503 schema: schema_writer,
504 };
505
506 #[cfg(feature = "sql")]
507 let has_sql = sql_query.is_some();
508 #[cfg(not(feature = "sql"))]
509 let has_sql = false;
510
511 if has_sql {
512 #[cfg(feature = "sql")]
513 {
514 let query = sql_query
515 .as_ref()
516 .expect("sql_query must be set when has_sql is true");
517 write_with_sql(ctx, query, &sql_table_name)?;
518 }
519 } else if use_parallel_writes {
520 #[cfg(feature = "parquet")]
521 {
522 let buffer_size_bytes = (parallel_write_buffer_mb * 1024 * 1024) as usize;
523 write_parallel_parquet(ctx, buffer_size_bytes)?;
524 }
525 #[cfg(not(feature = "parquet"))]
526 {
527 return Err(ReadStatError::Other(
528 "Parallel writes require the parquet feature".to_string(),
529 ));
530 }
531 } else {
532 write_sequential(ctx)?;
533 }
534
535 if let Some(p) = progress {
536 p.pb.finish_with_message("Done");
537 }
538
539 Ok(())
540 }
541 }
542}
543
544struct ReaderConfig {
549 rsp: ReadStatPath,
551 offsets: Vec<u32>,
553 parallel: bool,
555 column_filter: Option<Arc<std::collections::BTreeMap<i32, i32>>>,
557 original_var_count: i32,
559 total_rows_processed: Arc<std::sync::atomic::AtomicUsize>,
561 var_count: i32,
563 vars: Arc<std::collections::BTreeMap<i32, readstat::ReadStatVarMetadata>>,
565 schema: Arc<arrow_schema::Schema>,
567 progress: Option<Arc<IndicatifProgress>>,
569 wc: WriteConfig,
571}
572
573fn spawn_reader(
580 cfg: ReaderConfig,
581 sender: crossbeam::channel::Sender<(ReadStatData, WriteConfig, usize)>,
582) -> thread::JoinHandle<Result<(), ReadStatError>> {
583 let ReaderConfig {
584 rsp,
585 offsets,
586 parallel,
587 column_filter,
588 original_var_count,
589 total_rows_processed,
590 var_count,
591 vars,
592 schema,
593 progress,
594 wc,
595 } = cfg;
596
597 thread::spawn(move || -> Result<(), ReadStatError> {
598 let offsets_pairs: Vec<_> = offsets.windows(2).collect();
599 let pairs_cnt = offsets_pairs.len();
600
601 let parse_chunk = |w: &[u32]| -> Result<ReadStatData, ReadStatError> {
602 let row_start = w[0];
603 let row_end = w[1];
604
605 let mut d = ReadStatData::new()
606 .set_column_filter(column_filter.clone(), original_var_count)
607 .set_total_rows_processed(total_rows_processed.clone())
608 .init_shared(
609 var_count,
610 vars.clone(),
611 schema.clone(),
612 row_start,
613 row_end,
614 );
615
616 if let Some(ref p) = progress {
617 d = d.set_progress(p.clone() as Arc<dyn ProgressCallback>);
618 }
619
620 d.read_data(&rsp)?;
621
622 Ok(d)
623 };
624
625 let send_err = || {
626 ReadStatError::Other(
627 "Error when attempting to send read data for writing".to_string(),
628 )
629 };
630
631 if parallel {
632 let results: Vec<Result<ReadStatData, ReadStatError>> =
636 offsets_pairs.par_iter().map(|w| parse_chunk(w)).collect();
637
638 for result in results {
639 let d = result?;
640 sender
641 .send((d, wc.clone(), pairs_cnt))
642 .map_err(|_| send_err())?;
643 }
644 } else {
645 for w in &offsets_pairs {
649 let d = parse_chunk(w)?;
650 sender
651 .send((d, wc.clone(), pairs_cnt))
652 .map_err(|_| send_err())?;
653 }
654 }
655
656 Ok(())
657 })
658}
659
660struct WriteContext {
667 rx: crossbeam::channel::Receiver<(ReadStatData, WriteConfig, usize)>,
669 reader: thread::JoinHandle<Result<(), ReadStatError>>,
671 wc: WriteConfig,
673 input_path: PathBuf,
675 var_count: i32,
677 vars: Arc<std::collections::BTreeMap<i32, readstat::ReadStatVarMetadata>>,
679 schema: Arc<arrow_schema::Schema>,
681}
682
683fn join_reader(
689 handle: thread::JoinHandle<Result<(), ReadStatError>>,
690) -> Result<(), ReadStatError> {
691 match handle.join() {
692 Ok(res) => res,
693 Err(_) => Err(ReadStatError::Other("Reader thread panicked".to_string())),
694 }
695}
696
697fn write_sequential(ctx: WriteContext) -> Result<(), ReadStatError> {
701 let WriteContext {
702 rx,
703 reader,
704 wc,
705 input_path,
706 var_count,
707 vars,
708 schema,
709 } = ctx;
710
711 let mut wtr = ReadStatWriter::new();
712
713 let mut last: Option<(ReadStatData, WriteConfig)> = None;
717 for (d, chunk_wc, _pairs_cnt) in rx.iter() {
718 wtr.write(&d, &chunk_wc)?;
719 last = Some((d, chunk_wc));
720 }
721
722 join_reader(reader)?;
724
725 match last {
726 Some((d, chunk_wc)) => {
727 let rows = wtr.finish(&d, &chunk_wc)?;
728 print_write_summary(rows, &input_path, chunk_wc.out_path());
729 }
730 None => {
731 write_empty_output(var_count, vars, schema, &wc, &input_path)?;
733 }
734 }
735
736 Ok(())
737}
738
/// Writes chunks as temporary Parquet part files in parallel, then merges
/// them into the final output file.
///
/// Chunks are pulled from the channel in groups of up to `CHANNEL_CAPACITY`;
/// each group is written concurrently with rayon into a staging directory
/// created next to the output file. Once the channel is drained and the reader
/// joined, the part files are merged into the output path. If no chunk arrived
/// at all, an empty output is written instead.
///
/// # Errors
///
/// Returns `ReadStatError::Other` when the write config has no output path,
/// and propagates I/O errors (current directory lookup, staging directory
/// creation), part-file write errors, the reader thread's error, and merge
/// errors.
#[cfg(feature = "parquet")]
fn write_parallel_parquet(ctx: WriteContext, buffer_size_bytes: usize) -> Result<(), ReadStatError> {
    let WriteContext {
        rx,
        reader,
        wc,
        input_path,
        var_count,
        vars,
        schema,
    } = ctx;

    let Some(out_path) = wc.out_path().map(std::path::Path::to_path_buf) else {
        return Err(ReadStatError::Other(
            "No output path specified for parallel write".to_string(),
        ));
    };
    let compression = wc.compression();
    let compression_level = wc.compression_level();

    // `Path::parent` returns an `Option`, not a `Result`. A bare file name has
    // an empty parent, which is treated like "no parent": stage in the current
    // directory.
    let temp_dir = match out_path.parent().filter(|p| !p.as_os_str().is_empty()) {
        Some(parent) => parent.to_path_buf(),
        None => std::env::current_dir()?,
    };

    // Staging directory for the part files, created alongside the output.
    let staging = tempfile::Builder::new()
        .prefix(".readstat-parquet-")
        .tempdir_in(&temp_dir)?;

    let mut all_temp_files: Vec<PathBuf> = Vec::new();
    let mut merged_schema: Option<Arc<arrow_schema::Schema>> = None;
    let mut batch_idx: usize = 0;

    loop {
        // Blocks until a full group has arrived or the reader has hung up.
        let batch_group: Vec<(ReadStatData, WriteConfig, usize)> =
            rx.iter().take(CHANNEL_CAPACITY).collect();

        if batch_group.is_empty() {
            break;
        }

        // The schema of the very first chunk is used for every part file and
        // for the final merge.
        let schema_ref: &Arc<arrow_schema::Schema> =
            merged_schema.get_or_insert_with(|| batch_group[0].0.schema.clone());

        let temp_files: Vec<PathBuf> = batch_group
            .par_iter()
            .enumerate()
            .map(|(i, (d, _wc, _))| -> Result<PathBuf, ReadStatError> {
                let temp_file = staging
                    .path()
                    .join(format!("part_{}.parquet", batch_idx + i));

                // NOTE(review): a chunk without a batch still records its path
                // although no file is written — confirm that
                // `merge_parquet_files` tolerates this.
                if let Some(batch) = &d.batch {
                    ReadStatWriter::write_batch_to_parquet(
                        batch,
                        schema_ref,
                        &temp_file,
                        compression,
                        compression_level,
                        buffer_size_bytes,
                    )?;
                }

                Ok(temp_file)
            })
            .collect::<Result<Vec<_>, _>>()?;

        batch_idx += batch_group.len();
        all_temp_files.extend(temp_files);
    }

    join_reader(reader)?;

    if all_temp_files.is_empty() {
        write_empty_output(var_count, vars, schema, &wc, &input_path)?;
    } else {
        ReadStatWriter::merge_parquet_files(
            &all_temp_files,
            &out_path,
            merged_schema
                .as_ref()
                .expect("schema must be set when temp files exist"),
            compression,
            compression_level,
        )?;
    }

    Ok(())
}
853
/// Collects every batch from the reader thread, runs `query` against them
/// under `table_name`, and writes the query result to the configured output.
///
/// Nothing is written when the write config has no output path.
///
/// # Errors
///
/// Propagates the reader thread's error, SQL execution errors, and
/// result-writing errors.
#[cfg(feature = "sql")]
fn write_with_sql(ctx: WriteContext, query: &str, table_name: &str) -> Result<(), ReadStatError> {
    let WriteContext {
        rx, reader, wc, schema, ..
    } = ctx;

    // Drain the channel, keeping only chunks that actually carry a batch.
    let all_batches: Vec<_> = rx
        .iter()
        .filter_map(|(data, _wc, _)| data.batch)
        .collect();

    join_reader(reader)?;

    let results = readstat::execute_sql(all_batches, schema, table_name, query)?;

    let Some(out_path) = wc.out_path() else {
        return Ok(());
    };
    readstat::write_sql_results(
        &results,
        out_path,
        wc.format(),
        wc.compression(),
        wc.compression_level(),
    )?;

    Ok(())
}