Skip to main content

readstat/
run.rs

1//! CLI dispatch logic for the readstat binary.
2
3use colored::Colorize;
4use crossbeam::channel::bounded;
5use indicatif::{ProgressBar, ProgressStyle};
6use log::debug;
7use path_abs::{PathAbs, PathInfo};
8use rayon::prelude::*;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::thread;
12
13use readstat::{
14    OutFormat, ProgressCallback, ReadStatData, ReadStatError, ReadStatMetadata, ReadStatPath,
15    ReadStatWriter, WriteConfig, build_offsets,
16};
17
18use crate::cli::{ReadStatCli, ReadStatCliCommands, Reader};
19
/// Rows parsed per chunk in streaming mode when no explicit chunk size is given.
const STREAM_ROWS: u32 = 10_000;

/// Depth of the bounded channel connecting the reader thread to the writer.
///
/// The same number is reused as the group size for bounded-batch parallel
/// writes.
const CHANNEL_CAPACITY: usize = 10;
26
27/// Writes a valid empty output file (header-only CSV, empty Parquet/Feather/
28/// NDJSON) when the input contributed zero rows. Without this, a zero-row
29/// input would produce no output file at all despite a success exit code.
30fn write_empty_output(
31    var_count: i32,
32    vars: Arc<std::collections::BTreeMap<i32, readstat::ReadStatVarMetadata>>,
33    schema: Arc<arrow_schema::Schema>,
34    wc: &WriteConfig,
35    input_path: &std::path::Path,
36) -> Result<(), ReadStatError> {
37    let mut d = ReadStatData::new().init_shared(var_count, vars, schema.clone(), 0, 0);
38    d.batch = Some(arrow_array::RecordBatch::new_empty(schema));
39    let mut wtr = ReadStatWriter::new();
40    wtr.write(&d, wc)?;
41    let rows = wtr.finish(&d, wc)?;
42    print_write_summary(rows, input_path, wc.out_path());
43    Ok(())
44}
45
46/// Prints the "wrote N rows" summary. The library no longer prints this; the
47/// CLI owns all user-facing output.
48fn print_write_summary(rows: usize, in_path: &std::path::Path, out_path: Option<&std::path::Path>) {
49    let in_f = in_path
50        .file_name()
51        .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());
52    let out_f = out_path
53        .and_then(std::path::Path::file_name)
54        .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());
55    println!(
56        "In total, wrote {} rows from file {in_f} into {out_f}",
57        format_with_commas(rows)
58    );
59}
60
/// Renders `n` in decimal with a comma between every group of three digits,
/// counted from the right (e.g. 1081 -> "1,081"). Values below 1000 come back
/// unchanged.
fn format_with_commas(n: usize) -> String {
    let digits = n.to_string();

    // `rchunks` groups from the right, so only the leading group can be short;
    // reversing restores most-significant-first order.
    let groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(|group| std::str::from_utf8(group).expect("decimal digits are ASCII"))
        .collect();

    groups.join(",")
}
78
79/// Determine stream row count based on reader type.
80fn resolve_stream_rows(reader: Option<Reader>, stream_rows: Option<u32>, total_rows: u32) -> u32 {
81    match reader {
82        Some(Reader::Stream) | None => stream_rows.unwrap_or(STREAM_ROWS),
83        Some(Reader::Mem) => total_rows,
84    }
85}
86
87/// [`ProgressCallback`] implementation backed by an `indicatif::ProgressBar`.
88struct IndicatifProgress {
89    pb: ProgressBar,
90}
91
92impl ProgressCallback for IndicatifProgress {
93    fn inc(&self, n: u64) {
94        self.pb.inc(n);
95    }
96
97    fn parsing_started(&self, path: &str) {
98        // Keep the {pos}/{len} row bar (configured in `create_progress`) and
99        // just animate its spinner for liveness while a chunk is parsing — the
100        // previous implementation swapped in a message-only spinner, so the row
101        // bar never appeared. Set the message to the file being parsed.
102        self.pb
103            .set_message(format!("Parsing sas7bdat data from file {path}"));
104        self.pb
105            .enable_steady_tick(std::time::Duration::from_millis(120));
106    }
107}
108
109/// Create a progress bar if progress is enabled.
110fn create_progress(
111    no_progress: bool,
112    total_rows: u32,
113) -> Result<Option<Arc<IndicatifProgress>>, ReadStatError> {
114    if no_progress {
115        return Ok(None);
116    }
117    let pb = ProgressBar::new(u64::from(total_rows));
118    pb.set_style(
119        ProgressStyle::default_bar()
120            .template(
121                "[{spinner:.green} {elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} rows {msg}",
122            )
123            .map_err(|e| ReadStatError::Other(format!("Progress bar template error: {e}")))?
124            .progress_chars("##-"),
125    );
126    Ok(Some(Arc::new(IndicatifProgress { pb })))
127}
128
129/// Resolve column names from `--columns` or `--columns-file` CLI options.
130fn resolve_columns(
131    columns: Option<Vec<String>>,
132    columns_file: Option<PathBuf>,
133) -> Result<Option<Vec<String>>, ReadStatError> {
134    if let Some(path) = columns_file {
135        let names = ReadStatMetadata::parse_columns_file(&path)?;
136        if names.is_empty() {
137            // An empty columns file is almost certainly a mistake; selecting ALL
138            // columns silently would mask it. Surface it as an error instead.
139            Err(ReadStatError::EmptyColumnsFile(path))
140        } else {
141            Ok(Some(names))
142        }
143    } else {
144        Ok(columns)
145    }
146}
147
/// Works out the SQL query text from `--sql` / `--sql-file`.
///
/// A query file, when given, takes precedence over the inline query.
///
/// # Errors
///
/// Propagates any failure to read the query file.
#[cfg(feature = "sql")]
fn resolve_sql(
    sql: Option<String>,
    sql_file: Option<PathBuf>,
) -> Result<Option<String>, ReadStatError> {
    let Some(path) = sql_file else {
        return Ok(sql);
    };

    let query = readstat::read_sql_file(&path)?;
    Ok(Some(query))
}
160
/// Derives the SQL table name from the input file's stem (e.g. "cars" for
/// "cars.sas7bdat").
///
/// Falls back to "data" when the path has no stem or the stem is not valid
/// UTF-8.
#[cfg(feature = "sql")]
fn table_name_from_path(path: &std::path::Path) -> String {
    let stem = path.file_stem().and_then(std::ffi::OsStr::to_str);
    String::from(stem.unwrap_or("data"))
}
169
170/// Executes the CLI command specified by the parsed [`ReadStatCli`] arguments.
171///
172/// This is the main entry point for the CLI binary, dispatching to the
173/// `metadata`, `preview`, or `data` subcommand.
174pub fn run(rs: ReadStatCli) -> Result<(), ReadStatError> {
175    // Default to showing warnings (e.g. "file will be overwritten") rather than
176    // env_logger's stock `error`-only filter, under which library `warn!`s were
177    // invisible. `RUST_LOG` still overrides this.
178    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
179
180    match rs.command {
181        cmd @ ReadStatCliCommands::Metadata { .. } => run_metadata(cmd),
182        cmd @ ReadStatCliCommands::Preview { .. } => run_preview(cmd),
183        cmd @ ReadStatCliCommands::Data { .. } => run_data(cmd),
184    }
185}
186
187/// Handle the `metadata` subcommand: read and display SAS file metadata.
188fn run_metadata(cmd: ReadStatCliCommands) -> Result<(), ReadStatError> {
189    let ReadStatCliCommands::Metadata {
190        input: in_path,
191        as_json,
192        skip_row_count,
193    } = cmd
194    else {
195        unreachable!()
196    };
197    let sas_path = PathAbs::new(in_path)?.as_path().to_path_buf();
198    debug!(
199        "Retrieving metadata from the file {}",
200        &sas_path.to_string_lossy()
201    );
202
203    let rsp = ReadStatPath::new(sas_path)?;
204    let mut md = ReadStatMetadata::new();
205    md.read_metadata(&rsp, skip_row_count)?;
206    println!("{}", ReadStatWriter::metadata_to_string(&md, &rsp, as_json)?);
207    Ok(())
208}
209
210/// Handle the `preview` subcommand: read a limited number of rows and write to stdout as CSV.
211#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
212fn run_preview(cmd: ReadStatCliCommands) -> Result<(), ReadStatError> {
213    let ReadStatCliCommands::Preview {
214        input,
215        rows,
216        reader,
217        stream_rows,
218        no_progress,
219        columns,
220        columns_file,
221        #[cfg(feature = "sql")]
222        sql,
223        #[cfg(feature = "sql")]
224        sql_file,
225    } = cmd
226    else {
227        unreachable!()
228    };
229
230    #[cfg(feature = "sql")]
231    let sql_query = resolve_sql(sql, sql_file)?;
232
233    let sas_path = PathAbs::new(input)?.as_path().to_path_buf();
234    debug!(
235        "Generating data preview from the file {}",
236        &sas_path.to_string_lossy()
237    );
238
239    let rsp = ReadStatPath::new(sas_path)?;
240    let mut md = ReadStatMetadata::new();
241    md.read_metadata(&rsp, false)?;
242
243    // Resolve column selection
244    let col_names = resolve_columns(columns, columns_file)?;
245    let column_filter = md.resolve_selected_columns(col_names)?;
246    let original_var_count = md.var_count;
247    if let Some(ref mapping) = column_filter {
248        md = md.filter_to_selected_columns(mapping);
249    }
250
251    let column_filter = column_filter.map(Arc::new);
252    let total_rows_to_process = std::cmp::min(rows, md.row_count as u32);
253    let total_rows_to_stream = resolve_stream_rows(reader, stream_rows, total_rows_to_process);
254    let total_rows_processed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
255    let progress = create_progress(no_progress, total_rows_to_process)?;
256
257    let offsets = build_offsets(total_rows_to_process, total_rows_to_stream);
258    let offsets_pairs = offsets.windows(2);
259
260    let var_count = md.var_count;
261    let vars_shared = Arc::new(md.vars);
262    let schema_shared = Arc::new(md.schema);
263
264    // Signal "parsing started" once (the library no longer does this per-chunk).
265    if let Some(ref p) = progress {
266        p.parsing_started(&rsp.path.to_string_lossy());
267    }
268
269    // Read all chunks into batches
270    let mut all_batches: Vec<arrow_array::RecordBatch> = Vec::new();
271    for w in offsets_pairs {
272        let row_start = w[0];
273        let row_end = w[1];
274
275        let mut d = ReadStatData::new()
276            .set_column_filter(column_filter.clone(), original_var_count)
277            .set_total_rows_processed(total_rows_processed.clone())
278            .init_shared(
279                var_count,
280                vars_shared.clone(),
281                schema_shared.clone(),
282                row_start,
283                row_end,
284            );
285
286        if let Some(ref p) = progress {
287            d = d.set_progress(p.clone() as Arc<dyn ProgressCallback>);
288        }
289
290        d.read_data(&rsp)?;
291
292        if let Some(batch) = d.batch {
293            all_batches.push(batch);
294        }
295    }
296
297    if let Some(p) = progress {
298        p.pb.finish_with_message("Done");
299    }
300
301    // Apply SQL query if provided
302    #[cfg(feature = "sql")]
303    let all_batches = if let Some(ref query) = sql_query {
304        let table_name = table_name_from_path(&rsp.path);
305        readstat::execute_sql(all_batches, schema_shared.clone(), &table_name, query)?
306    } else {
307        all_batches
308    };
309
310    // Write all batches to stdout as CSV
311    #[cfg(feature = "csv")]
312    {
313        let stdout = std::io::stdout();
314        let mut csv_writer = arrow_csv::WriterBuilder::new()
315            .with_header(true)
316            .build(stdout);
317        for batch in &all_batches {
318            csv_writer.write(batch)?;
319        }
320    }
321    #[cfg(not(feature = "csv"))]
322    {
323        let _ = all_batches;
324        return Err(ReadStatError::Other(
325            "CSV feature is required for preview output".to_string(),
326        ));
327    }
328    #[cfg(feature = "csv")]
329    Ok(())
330}
331
332/// Handle the `data` subcommand: read SAS data and write to an output file.
333#[allow(
334    clippy::too_many_lines,
335    clippy::cast_sign_loss,
336    clippy::cast_possible_truncation
337)]
338fn run_data(cmd: ReadStatCliCommands) -> Result<(), ReadStatError> {
339    let ReadStatCliCommands::Data {
340        input,
341        output,
342        format,
343        rows,
344        reader,
345        stream_rows,
346        no_progress,
347        overwrite,
348        parallel,
349        parallel_write,
350        #[cfg(feature = "parquet")]
351        parallel_write_buffer_mb,
352        #[cfg(not(feature = "parquet"))]
353            parallel_write_buffer_mb: _,
354        compression,
355        compression_level,
356        columns,
357        columns_file,
358        #[cfg(feature = "sql")]
359        sql,
360        #[cfg(feature = "sql")]
361        sql_file,
362    } = cmd
363    else {
364        unreachable!()
365    };
366
367    #[cfg(feature = "sql")]
368    let sql_query = resolve_sql(sql, sql_file)?;
369
370    let sas_path = PathAbs::new(input)?.as_path().to_path_buf();
371    debug!(
372        "Generating data from the file {}",
373        &sas_path.to_string_lossy()
374    );
375
376    let rsp = ReadStatPath::new(sas_path)?;
377    let wc = WriteConfig::new(
378        output,
379        format.map(Into::into),
380        overwrite,
381        compression.map(Into::into),
382        compression_level,
383    )?;
384
385    let mut md = ReadStatMetadata::new();
386    md.read_metadata(&rsp, false)?;
387
388    // If no output path then only read metadata; otherwise read data
389    match wc.out_path() {
390        None => {
391            // A SQL query with no destination would be silently discarded —
392            // surface it as an error rather than quietly falling through to the
393            // metadata-only display.
394            #[cfg(feature = "sql")]
395            if sql_query.is_some() {
396                return Err(ReadStatError::Other(
397                    "--sql/--sql-file requires --output: the query result needs a destination file"
398                        .to_string(),
399                ));
400            }
401
402            println!(
403                "{}: a value was not provided for the parameter {}, thus displaying metadata only\n",
404                "Warning".bright_yellow(),
405                "--output".bright_cyan()
406            );
407
408            // Column selection does not apply to a metadata-only display, so
409            // reuse the metadata already read above rather than parsing again.
410            println!("{}", ReadStatWriter::metadata_to_string(&md, &rsp, false)?);
411            Ok(())
412        }
413        Some(p) => {
414            println!(
415                "Writing parsed data to file {}",
416                p.to_string_lossy().bright_yellow()
417            );
418
419            // Resolve column selection (only meaningful when writing data).
420            let col_names = resolve_columns(columns, columns_file)?;
421            let column_filter = md.resolve_selected_columns(col_names)?;
422            let original_var_count = md.var_count;
423            if let Some(ref mapping) = column_filter {
424                md = md.filter_to_selected_columns(mapping);
425            }
426            let column_filter = column_filter.map(Arc::new);
427
428            // Determine row count
429            let total_rows_to_process = if let Some(r) = rows {
430                std::cmp::min(r, md.row_count as u32)
431            } else {
432                md.row_count as u32
433            };
434
435            let total_rows_to_stream =
436                resolve_stream_rows(reader, stream_rows, total_rows_to_process);
437            let total_rows_processed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
438            let progress = create_progress(no_progress, total_rows_to_process)?;
439
440            let offsets = build_offsets(total_rows_to_process, total_rows_to_stream);
441
442            let use_parallel_writes =
443                parallel && parallel_write && matches!(wc.format(), OutFormat::Parquet);
444
445            let input_path = rsp.path.clone();
446
447            let var_count = md.var_count;
448            let vars_shared = Arc::new(md.vars);
449            let schema_shared = Arc::new(md.schema);
450
451            // Computed before `rsp` moves into the reader thread below.
452            #[cfg(feature = "sql")]
453            let sql_table_name = table_name_from_path(&rsp.path);
454
455            let (s, r) = bounded(CHANNEL_CAPACITY);
456            let progress_thread = progress.clone();
457            let wc_thread = wc.clone();
458
459            // Arc handles for the writer side (the originals move into the
460            // reader thread); used to produce a valid empty output file when
461            // the input has zero rows.
462            let vars_writer = vars_shared.clone();
463            let schema_writer = schema_shared.clone();
464
465            // Signal "parsing started" exactly once (the library no longer does
466            // this per-chunk). Must happen before `rsp` moves into the reader
467            // thread below.
468            if let Some(ref p) = progress {
469                p.parsing_started(&rsp.path.to_string_lossy());
470            }
471
472            // Spawn the reader thread: it parses chunks and sends them down the
473            // channel. `rsp` and the shared metadata move into it here, so all
474            // uses of `rsp.path` above must already have happened.
475            let reader_handle = spawn_reader(
476                ReaderConfig {
477                    rsp,
478                    offsets,
479                    parallel,
480                    column_filter,
481                    original_var_count,
482                    total_rows_processed,
483                    var_count,
484                    vars: vars_shared,
485                    schema: schema_shared,
486                    progress: progress_thread,
487                    wc: wc_thread,
488                },
489                s,
490            );
491
492            // Everything the write strategies share. `ctx` owns the channel
493            // receiver and the reader-thread handle, so it moves into whichever
494            // strategy runs; each one drains the channel, joins the reader, and
495            // finalizes its output.
496            let ctx = WriteContext {
497                rx: r,
498                reader: reader_handle,
499                wc,
500                input_path,
501                var_count,
502                vars: vars_writer,
503                schema: schema_writer,
504            };
505
506            #[cfg(feature = "sql")]
507            let has_sql = sql_query.is_some();
508            #[cfg(not(feature = "sql"))]
509            let has_sql = false;
510
511            if has_sql {
512                #[cfg(feature = "sql")]
513                {
514                    let query = sql_query
515                        .as_ref()
516                        .expect("sql_query must be set when has_sql is true");
517                    write_with_sql(ctx, query, &sql_table_name)?;
518                }
519            } else if use_parallel_writes {
520                #[cfg(feature = "parquet")]
521                {
522                    let buffer_size_bytes = (parallel_write_buffer_mb * 1024 * 1024) as usize;
523                    write_parallel_parquet(ctx, buffer_size_bytes)?;
524                }
525                #[cfg(not(feature = "parquet"))]
526                {
527                    return Err(ReadStatError::Other(
528                        "Parallel writes require the parquet feature".to_string(),
529                    ));
530                }
531            } else {
532                write_sequential(ctx)?;
533            }
534
535            if let Some(p) = progress {
536                p.pb.finish_with_message("Done");
537            }
538
539            Ok(())
540        }
541    }
542}
543
544/// Inputs to the reader thread spawned by [`run_data`].
545///
546/// Bundles the parse configuration so [`spawn_reader`] takes a single named
547/// value rather than a long positional argument list.
548struct ReaderConfig {
549    /// Validated input path; moves into the reader thread.
550    rsp: ReadStatPath,
551    /// Chunk boundaries from [`build_offsets`]; consumed as `windows(2)` pairs.
552    offsets: Vec<u32>,
553    /// Whether to parse chunks concurrently on the rayon pool.
554    parallel: bool,
555    /// Optional original-index → filtered-index column mapping.
556    column_filter: Option<Arc<std::collections::BTreeMap<i32, i32>>>,
557    /// Unfiltered variable count, for row-boundary detection under filtering.
558    original_var_count: i32,
559    /// Shared counter of rows processed across all chunks.
560    total_rows_processed: Arc<std::sync::atomic::AtomicUsize>,
561    /// (Possibly filtered) variable count.
562    var_count: i32,
563    /// Shared variable metadata.
564    vars: Arc<std::collections::BTreeMap<i32, readstat::ReadStatVarMetadata>>,
565    /// Shared Arrow schema.
566    schema: Arc<arrow_schema::Schema>,
567    /// Optional progress callback.
568    progress: Option<Arc<IndicatifProgress>>,
569    /// Output configuration, sent alongside each chunk for the writer.
570    wc: WriteConfig,
571}
572
573/// Spawns the reader thread that parses row chunks and sends them to `sender`.
574///
575/// Any chunk error is returned from the thread so it propagates to the exit
576/// code — chunks must never be silently dropped, as that would corrupt the
577/// output. The returned handle is joined (via [`join_reader`]) by whichever
578/// write strategy drains the channel.
579fn spawn_reader(
580    cfg: ReaderConfig,
581    sender: crossbeam::channel::Sender<(ReadStatData, WriteConfig, usize)>,
582) -> thread::JoinHandle<Result<(), ReadStatError>> {
583    let ReaderConfig {
584        rsp,
585        offsets,
586        parallel,
587        column_filter,
588        original_var_count,
589        total_rows_processed,
590        var_count,
591        vars,
592        schema,
593        progress,
594        wc,
595    } = cfg;
596
597    thread::spawn(move || -> Result<(), ReadStatError> {
598        let offsets_pairs: Vec<_> = offsets.windows(2).collect();
599        let pairs_cnt = offsets_pairs.len();
600
601        let parse_chunk = |w: &[u32]| -> Result<ReadStatData, ReadStatError> {
602            let row_start = w[0];
603            let row_end = w[1];
604
605            let mut d = ReadStatData::new()
606                .set_column_filter(column_filter.clone(), original_var_count)
607                .set_total_rows_processed(total_rows_processed.clone())
608                .init_shared(
609                    var_count,
610                    vars.clone(),
611                    schema.clone(),
612                    row_start,
613                    row_end,
614                );
615
616            if let Some(ref p) = progress {
617                d = d.set_progress(p.clone() as Arc<dyn ProgressCallback>);
618            }
619
620            d.read_data(&rsp)?;
621
622            Ok(d)
623        };
624
625        let send_err = || {
626            ReadStatError::Other(
627                "Error when attempting to send read data for writing".to_string(),
628            )
629        };
630
631        if parallel {
632            // Parse chunks concurrently on the global rayon pool. This buffers
633            // all chunks before sending — output order must be preserved for
634            // the writer, so --parallel trades memory for parse speed.
635            let results: Vec<Result<ReadStatData, ReadStatError>> =
636                offsets_pairs.par_iter().map(|w| parse_chunk(w)).collect();
637
638            for result in results {
639                let d = result?;
640                sender
641                    .send((d, wc.clone(), pairs_cnt))
642                    .map_err(|_| send_err())?;
643            }
644        } else {
645            // Default streaming mode: parse and send one chunk at a time. The
646            // bounded channel provides backpressure, so memory stays at
647            // ~CHANNEL_CAPACITY chunks regardless of file size.
648            for w in &offsets_pairs {
649                let d = parse_chunk(w)?;
650                sender
651                    .send((d, wc.clone(), pairs_cnt))
652                    .map_err(|_| send_err())?;
653            }
654        }
655
656        Ok(())
657    })
658}
659
660/// State shared by every write strategy in [`run_data`].
661///
662/// Owns the channel receiver and the reader-thread handle so it can move into
663/// whichever strategy runs. The metadata fields (`var_count`, `vars`, `schema`)
664/// and `input_path` are used to emit a valid empty file when the input has zero
665/// rows; the SQL path ignores them.
666struct WriteContext {
667    /// Receiver of parsed chunks from the reader thread.
668    rx: crossbeam::channel::Receiver<(ReadStatData, WriteConfig, usize)>,
669    /// Handle to the reader thread, joined before output is finalized.
670    reader: thread::JoinHandle<Result<(), ReadStatError>>,
671    /// Output configuration (path, format, compression).
672    wc: WriteConfig,
673    /// Input file path, for the write summary.
674    input_path: PathBuf,
675    /// Variable count, for emitting an empty file on zero rows.
676    var_count: i32,
677    /// Variable metadata, for emitting an empty file on zero rows.
678    vars: Arc<std::collections::BTreeMap<i32, readstat::ReadStatVarMetadata>>,
679    /// Arrow schema, for emitting an empty file on zero rows.
680    schema: Arc<arrow_schema::Schema>,
681}
682
683/// Joins the reader thread, surfacing either its internal error or a panic.
684///
685/// Must be called after the channel drains and BEFORE finalizing output:
686/// writing a Parquet/Feather footer over missing chunks would produce a
687/// silently-corrupt file with exit code 0.
688fn join_reader(
689    handle: thread::JoinHandle<Result<(), ReadStatError>>,
690) -> Result<(), ReadStatError> {
691    match handle.join() {
692        Ok(res) => res,
693        Err(_) => Err(ReadStatError::Other("Reader thread panicked".to_string())),
694    }
695}
696
697/// Default write path: consume chunks in order, streaming each to the format
698/// writer, then finalize. Memory stays bounded because only the most recent
699/// chunk is retained — kept solely so `finish` can report the row total.
700fn write_sequential(ctx: WriteContext) -> Result<(), ReadStatError> {
701    let WriteContext {
702        rx,
703        reader,
704        wc,
705        input_path,
706        var_count,
707        vars,
708        schema,
709    } = ctx;
710
711    let mut wtr = ReadStatWriter::new();
712
713    // Each chunk replaces `last`, dropping the previous chunk's RecordBatch
714    // memory; `last` is kept so `finish` can report the row total after the
715    // channel drains.
716    let mut last: Option<(ReadStatData, WriteConfig)> = None;
717    for (d, chunk_wc, _pairs_cnt) in rx.iter() {
718        wtr.write(&d, &chunk_wc)?;
719        last = Some((d, chunk_wc));
720    }
721
722    // Check the reader result before finalizing the output file.
723    join_reader(reader)?;
724
725    match last {
726        Some((d, chunk_wc)) => {
727            let rows = wtr.finish(&d, &chunk_wc)?;
728            print_write_summary(rows, &input_path, chunk_wc.out_path());
729        }
730        None => {
731            // Zero rows: still produce a valid header-only/empty file.
732            write_empty_output(var_count, vars, schema, &wc, &input_path)?;
733        }
734    }
735
736    Ok(())
737}
738
/// Parallel Parquet write path (only for `--parallel --parallel-write` with
/// Parquet output): write each buffered group of chunks to temp files
/// concurrently, then merge the temp files into the final output.
///
/// Chunks are pulled from the channel in groups of up to `CHANNEL_CAPACITY`.
/// A chunk without a batch produces no temp file and is left out of the
/// merge, so the merge step only ever receives paths that were actually
/// written. If nothing was written at all, an empty but valid Parquet file is
/// produced instead.
///
/// `buffer_size_bytes` is forwarded unchanged to the per-chunk Parquet writer.
///
/// # Errors
///
/// Fails if no output path is configured, if the staging directory cannot be
/// created, if any temp file or the merge fails, or if the reader thread
/// returned an error or panicked.
#[cfg(feature = "parquet")]
fn write_parallel_parquet(ctx: WriteContext, buffer_size_bytes: usize) -> Result<(), ReadStatError> {
    let WriteContext {
        rx,
        reader,
        wc,
        input_path,
        var_count,
        vars,
        schema,
    } = ctx;

    let out_path = wc.out_path().map(std::path::Path::to_path_buf);
    let compression = wc.compression();
    let compression_level = wc.compression_level();

    // Stage next to the output file; fall back to the current directory when
    // the output path's parent cannot be determined.
    let temp_dir = if let Some(out_path) = &out_path {
        match out_path.parent() {
            Ok(parent) => parent.to_path_buf(),
            Err(_) => std::env::current_dir()?,
        }
    } else {
        return Err(ReadStatError::Other(
            "No output path specified for parallel write".to_string(),
        ));
    };

    // Stage temp files inside a uniquely-named RAII directory alongside the
    // output. The random suffix prevents two concurrent runs in the same
    // directory from clobbering each other's temp files, and `TempDir`'s Drop
    // removes the directory (and any leftover temp files) even if we bail out
    // early via `?` before the merge.
    let staging = tempfile::Builder::new()
        .prefix(".readstat-parquet-")
        .tempdir_in(&temp_dir)?;

    let mut all_temp_files: Vec<PathBuf> = Vec::new();
    let mut merged_schema: Option<Arc<arrow_schema::Schema>> = None;
    let mut batch_idx: usize = 0;

    loop {
        // Pull up to CHANNEL_CAPACITY chunks; an empty group means the channel
        // has closed and drained.
        let mut batch_group: Vec<(ReadStatData, WriteConfig, usize)> =
            Vec::with_capacity(CHANNEL_CAPACITY);
        for item in &rx {
            batch_group.push(item);
            if batch_group.len() >= CHANNEL_CAPACITY {
                break;
            }
        }

        if batch_group.is_empty() {
            break;
        }

        // The first chunk's schema is used for every temp file and the merge.
        if merged_schema.is_none() {
            merged_schema = Some(batch_group[0].0.schema.clone());
        }
        let schema_ref = merged_schema
            .as_ref()
            .expect("schema must be set after first batch group");

        // Write the group's chunks concurrently. `None` marks a chunk that had
        // no batch and therefore produced no file.
        let written: Vec<Option<PathBuf>> = batch_group
            .par_iter()
            .enumerate()
            .map(|(i, (d, _wc, _))| -> Result<Option<PathBuf>, ReadStatError> {
                let Some(batch) = &d.batch else {
                    return Ok(None);
                };

                let temp_file = staging
                    .path()
                    .join(format!("part_{}.parquet", batch_idx + i));

                ReadStatWriter::write_batch_to_parquet(
                    batch,
                    schema_ref,
                    &temp_file,
                    compression,
                    compression_level,
                    buffer_size_bytes,
                )?;

                Ok(Some(temp_file))
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Advance by the full group size so part numbers stay unique even when
        // some chunks were skipped.
        batch_idx += batch_group.len();
        // batch_group is implicitly dropped here at the end of the loop body,
        // freeing ReadStatData/RecordBatch memory before the next iteration
        all_temp_files.extend(written.into_iter().flatten());
    }

    // Check the reader result before producing final output.
    join_reader(reader)?;

    // Merge all temp files into final output
    if all_temp_files.is_empty() {
        // Zero rows: still produce a valid (empty) Parquet file.
        write_empty_output(var_count, vars, schema, &wc, &input_path)?;
    } else if let Some(out_path) = &out_path {
        // NOTE(review): unlike the sequential and empty-output paths, this
        // branch does not call `print_write_summary` — confirm that is intended.
        ReadStatWriter::merge_parquet_files(
            &all_temp_files,
            out_path,
            merged_schema
                .as_ref()
                .expect("schema must be set when temp files exist"),
            compression,
            compression_level,
        )?;
    }

    Ok(())
}
853
/// SQL write strategy: gather every batch from the reader, run `query` over
/// them with the input registered as `table_name`, and write the result set
/// to the output file.
///
/// All batches are held in memory at once, because the query needs the whole
/// input. Nothing is written when no output path is configured.
///
/// # Errors
///
/// Propagates reader-thread failures, query execution errors and write errors.
#[cfg(feature = "sql")]
fn write_with_sql(ctx: WriteContext, query: &str, table_name: &str) -> Result<(), ReadStatError> {
    let WriteContext {
        rx, reader, wc, schema, ..
    } = ctx;

    // Drain the channel, keeping only chunks that actually carry a batch.
    let all_batches: Vec<_> = rx
        .iter()
        .filter_map(|(chunk, _wc, _)| chunk.batch)
        .collect();

    // Make sure the reader finished cleanly before querying partial data.
    join_reader(reader)?;

    let results = readstat::execute_sql(all_batches, schema, table_name, query)?;

    let Some(out_path) = wc.out_path() else {
        return Ok(());
    };
    readstat::write_sql_results(
        &results,
        out_path,
        wc.format(),
        wc.compression(),
        wc.compression_level(),
    )?;

    Ok(())
}