readstat/
rs_write.rs

1//! Output writers for converting Arrow [`RecordBatch`](arrow_array::RecordBatch) data
2//! to CSV, Feather (Arrow IPC), NDJSON, or Parquet format.
3//!
4//! [`ReadStatWriter`] manages the lifecycle of format-specific writers, handling
5//! streaming writes across multiple batches. It also supports metadata output
6//! (pretty-printed or JSON) and parallel Parquet writes via temporary files.
7
8#[cfg(feature = "parquet")]
9use arrow_array::RecordBatch;
10#[cfg(feature = "csv")]
11use arrow_csv::WriterBuilder as CsvWriterBuilder;
12#[cfg(feature = "feather")]
13use arrow_ipc::writer::FileWriter as IpcFileWriter;
14#[cfg(feature = "ndjson")]
15use arrow_json::LineDelimitedWriter as JsonLineDelimitedWriter;
16#[cfg(feature = "parquet")]
17use arrow_schema::Schema;
18#[cfg(feature = "parquet")]
19use parquet::{
20    arrow::ArrowWriter as ParquetArrowWriter, basic::Compression as ParquetCompressionCodec,
21    file::properties::WriterProperties,
22};
23#[cfg(feature = "parquet")]
24use std::fs;
25#[cfg(any(
26    feature = "csv",
27    feature = "feather",
28    feature = "ndjson",
29    feature = "parquet"
30))]
31use std::fs::{File, OpenOptions};
32#[cfg(any(
33    feature = "csv",
34    feature = "feather",
35    feature = "ndjson",
36    feature = "parquet"
37))]
38use std::io::BufWriter;
39#[cfg(feature = "csv")]
40use std::io::stdout;
41#[cfg(feature = "parquet")]
42use std::io::{Seek, SeekFrom};
43#[cfg(any(
44    feature = "csv",
45    feature = "feather",
46    feature = "ndjson",
47    feature = "parquet"
48))]
49use std::path::{Path, PathBuf};
50#[cfg(feature = "parquet")]
51use std::sync::Arc;
52#[cfg(feature = "parquet")]
53use tempfile::SpooledTempFile;
54
55use crate::err::ReadStatError;
56use crate::rs_data::ReadStatData;
57use crate::rs_metadata::ReadStatMetadata;
58use crate::rs_path::ReadStatPath;
59#[cfg(any(
60    feature = "csv",
61    feature = "feather",
62    feature = "ndjson",
63    feature = "parquet"
64))]
65use crate::rs_write_config::OutFormat;
66#[cfg(feature = "parquet")]
67use crate::rs_write_config::ParquetCompression;
68use crate::rs_write_config::WriteConfig;
69
/// Internal wrapper around the Parquet Arrow writer, allowing ownership transfer on close.
///
/// `ParquetArrowWriter::close` consumes the writer, so it is held in an
/// `Option` and `take()`n out at finalization time; the slot is `None`
/// once the file has been closed.
#[cfg(feature = "parquet")]
pub(crate) struct ReadStatParquetWriter {
    // `Some` while the output is open; `None` after close.
    wtr: Option<ParquetArrowWriter<BufWriter<std::fs::File>>>,
}

#[cfg(feature = "parquet")]
impl ReadStatParquetWriter {
    /// Wraps a freshly created Parquet writer in the take-on-close holder.
    fn new(wtr: ParquetArrowWriter<BufWriter<std::fs::File>>) -> Self {
        Self { wtr: Some(wtr) }
    }
}
82
/// Format-specific writer variant, created lazily on first write.
///
/// Every variant is gated on its corresponding cargo feature, so the enum may
/// be empty when no output features are enabled.
pub(crate) enum ReadStatWriterFormat {
    /// CSV writer to a file. Holds the raw buffered file because the
    /// arrow-csv writer is rebuilt per batch (the header flag changes
    /// between the first and subsequent batches).
    #[cfg(feature = "csv")]
    Csv(BufWriter<std::fs::File>),
    /// CSV writer to stdout (used for preview mode without an output file).
    #[cfg(feature = "csv")]
    CsvStdout(std::io::Stdout),
    /// Feather (Arrow IPC) writer.
    #[cfg(feature = "feather")]
    Feather(IpcFileWriter<BufWriter<std::fs::File>>),
    /// Newline-delimited JSON writer. Like CSV, holds the raw buffered file;
    /// the arrow-json writer is constructed per batch.
    #[cfg(feature = "ndjson")]
    Ndjson(BufWriter<std::fs::File>),
    /// Parquet writer (wrapped so it can be consumed on close).
    #[cfg(feature = "parquet")]
    Parquet(ReadStatParquetWriter),
}
101
/// Manages writing Arrow [`RecordBatch`] data to the configured output format.
///
/// Supports streaming writes across multiple batches. The writer is created lazily
/// on the first call to [`write`](ReadStatWriter::write) and finalized via
/// [`finish`](ReadStatWriter::finish).
///
/// `Default` yields the same state as [`new`](ReadStatWriter::new): no writer,
/// nothing written.
#[derive(Default)]
pub struct ReadStatWriter {
    /// The format-specific writer, created on first write.
    pub(crate) wtr: Option<ReadStatWriterFormat>,
    /// Whether the CSV header row has been written.
    pub(crate) wrote_header: bool,
    /// Whether any data has been written (controls file creation vs. append).
    pub(crate) wrote_start: bool,
}
116
117impl ReadStatWriter {
118    /// Creates a new `ReadStatWriter` with no active writer.
119    #[must_use]
120    pub const fn new() -> Self {
121        Self {
122            wtr: None,
123            wrote_header: false,
124            wrote_start: false,
125        }
126    }
127
128    /// Opens an output file: creates or truncates on first write, appends on subsequent writes.
129    #[cfg(any(
130        feature = "csv",
131        feature = "feather",
132        feature = "ndjson",
133        feature = "parquet"
134    ))]
135    fn open_output(&self, path: &Path) -> Result<File, ReadStatError> {
136        let f = if self.wrote_start {
137            OpenOptions::new().create(true).append(true).open(path)?
138        } else {
139            OpenOptions::new()
140                .write(true)
141                .create(true)
142                .truncate(true)
143                .open(path)?
144        };
145        Ok(f)
146    }
147
148    /// Write a single batch to a Parquet file (for parallel writes).
149    /// Uses `SpooledTempFile` to keep data in memory until `buffer_size_bytes` threshold.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if compression configuration is invalid, writing fails,
154    /// or the output file cannot be created.
155    #[cfg(feature = "parquet")]
156    pub fn write_batch_to_parquet(
157        batch: &RecordBatch,
158        schema: &Schema,
159        output_path: &Path,
160        compression: Option<ParquetCompression>,
161        compression_level: Option<u32>,
162        buffer_size_bytes: usize,
163    ) -> Result<(), ReadStatError> {
164        // Create a SpooledTempFile that keeps data in memory until buffer_size_bytes
165        let mut spooled_file = SpooledTempFile::new(buffer_size_bytes);
166
167        let compression_codec = Self::resolve_compression(compression, compression_level)?;
168
169        let props = WriterProperties::builder()
170            .set_compression(compression_codec)
171            .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
172            .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0)
173            .build();
174
175        // Write to SpooledTempFile (in memory until threshold, then spills to temp disk file)
176        let mut wtr =
177            ParquetArrowWriter::try_new(&mut spooled_file, Arc::new(schema.clone()), Some(props))?;
178
179        wtr.write(batch)?;
180        wtr.close()?;
181
182        // Now copy from SpooledTempFile to the actual output file
183        spooled_file.seek(SeekFrom::Start(0))?;
184        let mut output_file = OpenOptions::new()
185            .write(true)
186            .create(true)
187            .truncate(true)
188            .open(output_path)?;
189        std::io::copy(&mut spooled_file, &mut output_file)?;
190
191        Ok(())
192    }
193
194    /// Merge multiple Parquet files into one by reading and rewriting all batches.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if any temp file cannot be read, the output file cannot
199    /// be created, or writing fails.
200    #[cfg(feature = "parquet")]
201    pub fn merge_parquet_files(
202        temp_files: &[PathBuf],
203        output_path: &Path,
204        schema: &Schema,
205        compression: Option<ParquetCompression>,
206        compression_level: Option<u32>,
207    ) -> Result<(), ReadStatError> {
208        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
209
210        let f = OpenOptions::new()
211            .write(true)
212            .create(true)
213            .truncate(true)
214            .open(output_path)?;
215
216        let compression_codec = Self::resolve_compression(compression, compression_level)?;
217
218        let props = WriterProperties::builder()
219            .set_compression(compression_codec)
220            .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
221            .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0)
222            .build();
223
224        let mut writer =
225            ParquetArrowWriter::try_new(BufWriter::new(f), Arc::new(schema.clone()), Some(props))?;
226
227        // Read each temp file and write its batches to the final file
228        for temp_file in temp_files {
229            let file = File::open(temp_file)?;
230            let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
231            let reader = builder.build()?;
232
233            for batch in reader {
234                writer.write(&batch?)?;
235            }
236
237            // Clean up temp file
238            fs::remove_file(temp_file)?;
239        }
240
241        writer.close()?;
242        Ok(())
243    }
244
    /// Maps the user-facing compression settings to a Parquet codec.
    ///
    /// Thin wrapper over `crate::rs_write_config::resolve_parquet_compression`,
    /// kept as an associated function so writer code and tests can reach it
    /// via `Self`.
    #[cfg(feature = "parquet")]
    fn resolve_compression(
        compression: Option<ParquetCompression>,
        compression_level: Option<u32>,
    ) -> Result<ParquetCompressionCodec, ReadStatError> {
        crate::rs_write_config::resolve_parquet_compression(compression, compression_level)
    }
252
253    /// Finalizes the writer, flushing any remaining data and printing a summary.
254    ///
255    /// `in_path` is used for display messages showing the source file name.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if the underlying writer fails to flush or close,
260    /// or if the output format is not enabled.
261    #[allow(unused_variables)]
262    pub fn finish(
263        &mut self,
264        d: &ReadStatData,
265        wc: &WriteConfig,
266        in_path: &std::path::Path,
267    ) -> Result<(), ReadStatError> {
268        match wc.format {
269            #[cfg(feature = "csv")]
270            OutFormat::Csv => {
271                self.print_finish_message(d, wc, in_path);
272                Ok(())
273            }
274            #[cfg(feature = "feather")]
275            OutFormat::Feather => {
276                self.finish_feather()?;
277                self.print_finish_message(d, wc, in_path);
278                Ok(())
279            }
280            #[cfg(feature = "ndjson")]
281            OutFormat::Ndjson => {
282                self.print_finish_message(d, wc, in_path);
283                Ok(())
284            }
285            #[cfg(feature = "parquet")]
286            OutFormat::Parquet => {
287                self.finish_parquet()?;
288                self.print_finish_message(d, wc, in_path);
289                Ok(())
290            }
291            #[allow(unreachable_patterns)]
292            _ => Err(ReadStatError::Other(format!(
293                "Output format {:?} is not enabled. Enable the corresponding feature flag.",
294                wc.format
295            ))),
296        }
297    }
298
299    #[cfg(any(
300        feature = "csv",
301        feature = "feather",
302        feature = "ndjson",
303        feature = "parquet"
304    ))]
305    #[allow(clippy::unused_self)]
306    fn print_finish_message(&self, d: &ReadStatData, wc: &WriteConfig, in_path: &std::path::Path) {
307        let rows = d
308            .total_rows_processed
309            .as_ref()
310            .map_or(0, |trp| trp.load(std::sync::atomic::Ordering::SeqCst));
311
312        let in_f = in_path
313            .file_name()
314            .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());
315
316        let out_f = wc
317            .out_path
318            .as_ref()
319            .and_then(|p| p.file_name())
320            .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());
321
322        let rows_formatted = format_with_commas(rows);
323        println!("In total, wrote {rows_formatted} rows from file {in_f} into {out_f}");
324    }
325
326    #[cfg(feature = "feather")]
327    fn finish_feather(&mut self) -> Result<(), ReadStatError> {
328        if let Some(ReadStatWriterFormat::Feather(wtr)) = &mut self.wtr {
329            wtr.finish()?;
330            Ok(())
331        } else {
332            Err(ReadStatError::Other(
333                "Error writing feather as associated writer is not for the feather format"
334                    .to_string(),
335            ))
336        }
337    }
338
339    #[cfg(feature = "parquet")]
340    fn finish_parquet(&mut self) -> Result<(), ReadStatError> {
341        if let Some(ReadStatWriterFormat::Parquet(pwtr)) = &mut self.wtr {
342            if let Some(wtr) = pwtr.wtr.take() {
343                wtr.close()?;
344            }
345            Ok(())
346        } else {
347            Err(ReadStatError::Other(
348                "Error writing parquet as associated writer is not for the parquet format"
349                    .to_string(),
350            ))
351        }
352    }
353
354    /// Writes a single batch of data in the format determined by `wc`.
355    ///
356    /// Handles writer initialization on first call and CSV header writing.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the output file cannot be opened, writing fails,
361    /// or the output format is not enabled.
362    #[allow(unused_variables)]
363    pub fn write(&mut self, d: &ReadStatData, wc: &WriteConfig) -> Result<(), ReadStatError> {
364        match wc.format {
365            #[cfg(feature = "csv")]
366            OutFormat::Csv => {
367                if wc.out_path.is_none() {
368                    if self.wrote_header {
369                        self.write_data_to_stdout(d)
370                    } else {
371                        self.write_header_to_stdout(d)?;
372                        self.write_data_to_stdout(d)
373                    }
374                } else {
375                    self.write_data_to_csv(d, wc)
376                }
377            }
378            #[cfg(feature = "feather")]
379            OutFormat::Feather => self.write_data_to_feather(d, wc),
380            #[cfg(feature = "ndjson")]
381            OutFormat::Ndjson => self.write_data_to_ndjson(d, wc),
382            #[cfg(feature = "parquet")]
383            OutFormat::Parquet => self.write_data_to_parquet(d, wc),
384            #[allow(unreachable_patterns)]
385            _ => Err(ReadStatError::Other(format!(
386                "Output format {:?} is not enabled. Enable the corresponding feature flag.",
387                wc.format
388            ))),
389        }
390    }
391
392    #[cfg(feature = "csv")]
393    fn write_data_to_csv(
394        &mut self,
395        d: &ReadStatData,
396        wc: &WriteConfig,
397    ) -> Result<(), ReadStatError> {
398        if let Some(p) = &wc.out_path {
399            let f = self.open_output(p)?;
400
401            // setup writer with BufWriter for better performance
402            if !self.wrote_start {
403                self.wtr = Some(ReadStatWriterFormat::Csv(BufWriter::new(f)));
404            }
405
406            // write
407            if let Some(ReadStatWriterFormat::Csv(f)) = &mut self.wtr {
408                if let Some(batch) = &d.batch {
409                    let include_header = !self.wrote_header;
410                    let mut writer = CsvWriterBuilder::new().with_header(include_header).build(f);
411                    writer.write(batch)?;
412                    self.wrote_header = true;
413                }
414
415                self.wrote_start = true;
416                Ok(())
417            } else {
418                Err(ReadStatError::Other(
419                    "Error writing csv as associated writer is not for the csv format".to_string(),
420                ))
421            }
422        } else {
423            Err(ReadStatError::Other(
424                "Error writing csv as output path is set to None".to_string(),
425            ))
426        }
427    }
428
429    #[cfg(feature = "feather")]
430    fn write_data_to_feather(
431        &mut self,
432        d: &ReadStatData,
433        wc: &WriteConfig,
434    ) -> Result<(), ReadStatError> {
435        if let Some(p) = &wc.out_path {
436            let f = self.open_output(p)?;
437
438            // setup writer with BufWriter for better performance
439            if !self.wrote_start {
440                let wtr = IpcFileWriter::try_new(BufWriter::new(f), &d.schema)?;
441                self.wtr = Some(ReadStatWriterFormat::Feather(wtr));
442            }
443
444            // write
445            if let Some(ReadStatWriterFormat::Feather(wtr)) = &mut self.wtr {
446                if let Some(batch) = &d.batch {
447                    wtr.write(batch)?;
448                }
449
450                self.wrote_start = true;
451
452                Ok(())
453            } else {
454                Err(ReadStatError::Other(
455                    "Error writing feather as associated writer is not for the feather format"
456                        .to_string(),
457                ))
458            }
459        } else {
460            Err(ReadStatError::Other(
461                "Error writing feather file as output path is set to None".to_string(),
462            ))
463        }
464    }
465
466    #[cfg(feature = "ndjson")]
467    fn write_data_to_ndjson(
468        &mut self,
469        d: &ReadStatData,
470        wc: &WriteConfig,
471    ) -> Result<(), ReadStatError> {
472        if let Some(p) = &wc.out_path {
473            let f = self.open_output(p)?;
474
475            // setup writer with BufWriter for better performance
476            if !self.wrote_start {
477                self.wtr = Some(ReadStatWriterFormat::Ndjson(BufWriter::new(f)));
478            }
479
480            // write
481            if let Some(ReadStatWriterFormat::Ndjson(f)) = &mut self.wtr {
482                if let Some(batch) = &d.batch {
483                    let mut writer = JsonLineDelimitedWriter::new(f);
484                    writer.write(batch)?;
485                    writer.finish()?;
486                }
487
488                self.wrote_start = true;
489
490                Ok(())
491            } else {
492                Err(ReadStatError::Other(
493                    "Error writing ndjson as associated writer is not for the ndjson format"
494                        .to_string(),
495                ))
496            }
497        } else {
498            Err(ReadStatError::Other(
499                "Error writing ndjson file as output path is set to None".to_string(),
500            ))
501        }
502    }
503
504    #[cfg(feature = "parquet")]
505    fn write_data_to_parquet(
506        &mut self,
507        d: &ReadStatData,
508        wc: &WriteConfig,
509    ) -> Result<(), ReadStatError> {
510        if let Some(p) = &wc.out_path {
511            let f = self.open_output(p)?;
512
513            // setup writer
514            if !self.wrote_start {
515                let compression_codec =
516                    Self::resolve_compression(wc.compression, wc.compression_level)?;
517
518                let props = WriterProperties::builder()
519                    .set_compression(compression_codec)
520                    .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
521                    .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0)
522                    .build();
523
524                let wtr =
525                    ParquetArrowWriter::try_new(BufWriter::new(f), d.schema.clone(), Some(props))?;
526
527                self.wtr = Some(ReadStatWriterFormat::Parquet(ReadStatParquetWriter::new(
528                    wtr,
529                )));
530            }
531
532            // write
533            if let Some(ReadStatWriterFormat::Parquet(pwtr)) = &mut self.wtr {
534                if let Some(batch) = &d.batch
535                    && let Some(ref mut wtr) = pwtr.wtr
536                {
537                    wtr.write(batch)?;
538                }
539
540                self.wrote_start = true;
541
542                Ok(())
543            } else {
544                Err(ReadStatError::Other(
545                    "Error writing parquet as associated writer is not for the parquet format"
546                        .to_string(),
547                ))
548            }
549        } else {
550            Err(ReadStatError::Other(
551                "Error writing parquet file as output path is set to None".to_string(),
552            ))
553        }
554    }
555
556    #[cfg(feature = "csv")]
557    fn write_data_to_stdout(&mut self, d: &ReadStatData) -> Result<(), ReadStatError> {
558        // writer setup
559        if !self.wrote_start {
560            self.wtr = Some(ReadStatWriterFormat::CsvStdout(stdout()));
561        }
562
563        // write
564        if let Some(ReadStatWriterFormat::CsvStdout(f)) = &mut self.wtr {
565            if let Some(batch) = &d.batch {
566                let mut writer = CsvWriterBuilder::new().with_header(false).build(f);
567                writer.write(batch)?;
568            }
569
570            self.wrote_start = true;
571
572            Ok(())
573        } else {
574            Err(ReadStatError::Other(
575                "Error writing to csv as associated writer is not for the csv format".to_string(),
576            ))
577        }
578    }
579
580    #[cfg(feature = "csv")]
581    #[allow(clippy::unnecessary_wraps)]
582    fn write_header_to_stdout(&mut self, d: &ReadStatData) -> Result<(), ReadStatError> {
583        let vars: Vec<String> = d.vars.values().map(|m| m.var_name.clone()).collect();
584
585        println!("{}", vars.join(","));
586
587        self.wrote_header = true;
588
589        Ok(())
590    }
591
592    /// Writes metadata to stdout (pretty-printed) or as JSON.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if JSON serialization fails.
597    pub fn write_metadata(
598        &self,
599        md: &ReadStatMetadata,
600        rsp: &ReadStatPath,
601        as_json: bool,
602    ) -> Result<(), ReadStatError> {
603        if as_json {
604            self.write_metadata_to_json(md)
605        } else {
606            self.write_metadata_to_stdout(md, rsp)
607        }
608    }
609
610    /// Serializes metadata as pretty-printed JSON and writes to stdout.
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if JSON serialization fails.
615    pub fn write_metadata_to_json(&self, md: &ReadStatMetadata) -> Result<(), ReadStatError> {
616        let s = serde_json::to_string_pretty(md)?;
617        println!("{s}");
618        Ok(())
619    }
620
    /// Writes metadata to stdout in a human-readable format.
    ///
    /// Prints file-level attributes (row/variable counts, table name and label,
    /// encoding, version, bitness, timestamps, compression, byte order)
    /// followed by one line per variable describing its types and formats.
    ///
    /// # Errors
    ///
    /// This method currently always succeeds but returns `Result` for
    /// consistency with other writer methods.
    #[allow(clippy::cast_sign_loss, clippy::unnecessary_wraps)]
    pub fn write_metadata_to_stdout(
        &self,
        md: &ReadStatMetadata,
        rsp: &ReadStatPath,
    ) -> Result<(), ReadStatError> {
        use crate::rs_var::ReadStatVarFormatClass;

        println!("Metadata for the file {}\n", rsp.path.to_string_lossy());
        println!("Row count: {}", md.row_count);
        println!("Variable count: {}", md.var_count);
        println!("Table name: {}", md.table_name);
        println!("Table label: {}", md.file_label);
        println!("File encoding: {}", md.file_encoding);
        println!("Format version: {}", md.version);
        println!(
            "Bitness: {}",
            if md.is64bit == 0 { "32-bit" } else { "64-bit" }
        );
        println!("Creation time: {}", md.creation_time);
        println!("Modified time: {}", md.modified_time);
        println!("Compression: {:#?}", md.compression);
        println!("Byte order: {:#?}", md.endianness);
        println!("Variable names:");
        for (k, v) in &md.vars {
            // Collapse the detailed format-class variants into three
            // user-facing labels; a missing format class prints as "".
            let format_class = v.var_format_class.as_ref().map_or("", |f| match f {
                ReadStatVarFormatClass::Date => "Date",
                ReadStatVarFormatClass::DateTime
                | ReadStatVarFormatClass::DateTimeWithMilliseconds
                | ReadStatVarFormatClass::DateTimeWithMicroseconds
                | ReadStatVarFormatClass::DateTimeWithNanoseconds => "DateTime",
                ReadStatVarFormatClass::Time | ReadStatVarFormatClass::TimeWithMicroseconds => {
                    "Time"
                }
            });
            // `k` is used as a positional index into the Arrow schema fields;
            // assumes `md.vars` keys align with schema field order — TODO confirm.
            let data_type = md.schema.fields[*k as usize].data_type();
            // NOTE(review): "logical" and "physical" both print the same
            // `data_type` value — confirm whether a distinct physical type
            // was intended here.
            println!(
                "{k}: {} {{ type class: {:#?}, type: {:#?}, label: {}, format class: {format_class}, format: {}, arrow logical data type: {data_type:#?}, arrow physical data type: {data_type:#?} }}",
                v.var_name, v.var_type_class, v.var_type, v.var_label, v.var_format,
            );
        }

        Ok(())
    }
671}
672
/// Serialize a [`RecordBatch`](arrow_array::RecordBatch) to CSV bytes (with header).
///
/// # Errors
///
/// Returns an error if CSV writing fails.
#[cfg(feature = "csv")]
pub fn write_batch_to_csv_bytes(
    batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>, ReadStatError> {
    let mut bytes = Vec::new();
    {
        // Scope the writer so its mutable borrow of `bytes` ends before return.
        let mut wtr = CsvWriterBuilder::new().with_header(true).build(&mut bytes);
        wtr.write(batch)?;
    }
    Ok(bytes)
}
688
/// Serialize a [`RecordBatch`](arrow_array::RecordBatch) to NDJSON bytes.
///
/// # Errors
///
/// Returns an error if JSON writing fails.
#[cfg(feature = "ndjson")]
pub fn write_batch_to_ndjson_bytes(
    batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>, ReadStatError> {
    let mut bytes = Vec::new();
    {
        // Scope the writer so its mutable borrow of `bytes` ends before return.
        let mut wtr = JsonLineDelimitedWriter::new(&mut bytes);
        wtr.write(batch)?;
        wtr.finish()?;
    }
    Ok(bytes)
}
704
/// Serialize a [`RecordBatch`](arrow_array::RecordBatch) to Parquet bytes with Snappy compression.
///
/// # Errors
///
/// Returns an error if Parquet writing fails.
#[cfg(feature = "parquet")]
pub fn write_batch_to_parquet_bytes(batch: &RecordBatch) -> Result<Vec<u8>, ReadStatError> {
    let props = WriterProperties::builder()
        .set_compression(ParquetCompressionCodec::SNAPPY)
        .build();

    let mut bytes = Vec::new();
    let mut wtr = ParquetArrowWriter::try_new(&mut bytes, batch.schema(), Some(props))?;
    wtr.write(batch)?;
    // `close` writes the Parquet footer and releases the borrow on `bytes`.
    wtr.close()?;
    Ok(bytes)
}
721
/// Serialize a [`RecordBatch`](arrow_array::RecordBatch) to Feather (Arrow IPC) bytes.
///
/// # Errors
///
/// Returns an error if Feather/IPC writing fails.
#[cfg(feature = "feather")]
pub fn write_batch_to_feather_bytes(
    batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>, ReadStatError> {
    let mut bytes = Vec::new();
    {
        // Scope the writer so its mutable borrow of `bytes` ends before return.
        let mut wtr = IpcFileWriter::try_new(&mut bytes, &batch.schema())?;
        wtr.write(batch)?;
        wtr.finish()?;
    }
    Ok(bytes)
}
737
/// Formats a number with comma thousands separators (e.g. 1081 -> "1,081").
#[cfg(any(
    feature = "csv",
    feature = "feather",
    feature = "ndjson",
    feature = "parquet"
))]
fn format_with_commas(n: usize) -> String {
    let digits = n.to_string();
    if digits.len() <= 3 {
        return digits;
    }
    // Split the digit string into groups of three from the right, then
    // stitch the groups back together with commas.
    let mut groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .map(|chunk| std::str::from_utf8(chunk).expect("decimal digits are ASCII"))
        .collect();
    groups.reverse();
    groups.join(",")
}
761
762#[cfg(test)]
763mod tests {
764    use super::*;
765
766    // --- resolve_compression ---
767
768    #[test]
769    fn resolve_compression_none_defaults_to_snappy() {
770        let codec = ReadStatWriter::resolve_compression(None, None).unwrap();
771        assert!(matches!(codec, ParquetCompressionCodec::SNAPPY));
772    }
773
774    #[test]
775    fn resolve_compression_uncompressed() {
776        let codec =
777            ReadStatWriter::resolve_compression(Some(ParquetCompression::Uncompressed), None)
778                .unwrap();
779        assert!(matches!(codec, ParquetCompressionCodec::UNCOMPRESSED));
780    }
781
782    #[test]
783    fn resolve_compression_snappy() {
784        let codec =
785            ReadStatWriter::resolve_compression(Some(ParquetCompression::Snappy), None).unwrap();
786        assert!(matches!(codec, ParquetCompressionCodec::SNAPPY));
787    }
788
789    #[test]
790    fn resolve_compression_lz4raw() {
791        let codec =
792            ReadStatWriter::resolve_compression(Some(ParquetCompression::Lz4Raw), None).unwrap();
793        assert!(matches!(codec, ParquetCompressionCodec::LZ4_RAW));
794    }
795
796    #[test]
797    fn resolve_compression_gzip_default() {
798        let codec =
799            ReadStatWriter::resolve_compression(Some(ParquetCompression::Gzip), None).unwrap();
800        assert!(matches!(codec, ParquetCompressionCodec::GZIP(_)));
801    }
802
803    #[test]
804    fn resolve_compression_gzip_with_level() {
805        let codec =
806            ReadStatWriter::resolve_compression(Some(ParquetCompression::Gzip), Some(5)).unwrap();
807        assert!(matches!(codec, ParquetCompressionCodec::GZIP(_)));
808    }
809
810    #[test]
811    fn resolve_compression_brotli_default() {
812        let codec =
813            ReadStatWriter::resolve_compression(Some(ParquetCompression::Brotli), None).unwrap();
814        assert!(matches!(codec, ParquetCompressionCodec::BROTLI(_)));
815    }
816
817    #[test]
818    fn resolve_compression_brotli_with_level() {
819        let codec =
820            ReadStatWriter::resolve_compression(Some(ParquetCompression::Brotli), Some(8)).unwrap();
821        assert!(matches!(codec, ParquetCompressionCodec::BROTLI(_)));
822    }
823
824    #[test]
825    fn resolve_compression_zstd_default() {
826        let codec =
827            ReadStatWriter::resolve_compression(Some(ParquetCompression::Zstd), None).unwrap();
828        assert!(matches!(codec, ParquetCompressionCodec::ZSTD(_)));
829    }
830
831    #[test]
832    fn resolve_compression_zstd_with_level() {
833        let codec =
834            ReadStatWriter::resolve_compression(Some(ParquetCompression::Zstd), Some(15)).unwrap();
835        assert!(matches!(codec, ParquetCompressionCodec::ZSTD(_)));
836    }
837
838    // --- ReadStatWriter::new ---
839
840    #[test]
841    fn new_writer_defaults() {
842        let wtr = ReadStatWriter::new();
843        assert!(wtr.wtr.is_none());
844        assert!(!wtr.wrote_header);
845        assert!(!wtr.wrote_start);
846    }
847}