//! Write Arrow record batches read from ReadStat data to CSV, Feather
//! (Arrow IPC), NDJSON, and Parquet outputs, and print dataset metadata.

#[cfg(feature = "parquet")]
use arrow_array::RecordBatch;
#[cfg(feature = "csv")]
use arrow_csv::WriterBuilder as CsvWriterBuilder;
#[cfg(feature = "feather")]
use arrow_ipc::writer::FileWriter as IpcFileWriter;
#[cfg(feature = "ndjson")]
use arrow_json::LineDelimitedWriter as JsonLineDelimitedWriter;
#[cfg(feature = "parquet")]
use arrow_schema::Schema;
#[cfg(feature = "parquet")]
use parquet::{
    arrow::ArrowWriter as ParquetArrowWriter, basic::Compression as ParquetCompressionCodec,
    file::properties::WriterProperties,
};
#[cfg(feature = "parquet")]
use std::fs;
#[cfg(any(
    feature = "csv",
    feature = "feather",
    feature = "ndjson",
    feature = "parquet"
))]
use std::fs::{File, OpenOptions};
#[cfg(any(
    feature = "csv",
    feature = "feather",
    feature = "ndjson",
    feature = "parquet"
))]
use std::io::BufWriter;
#[cfg(feature = "csv")]
use std::io::stdout;
#[cfg(feature = "parquet")]
use std::io::{Seek, SeekFrom};
#[cfg(any(
    feature = "csv",
    feature = "feather",
    feature = "ndjson",
    feature = "parquet"
))]
use std::path::{Path, PathBuf};
#[cfg(feature = "parquet")]
use std::sync::Arc;
#[cfg(feature = "parquet")]
use tempfile::SpooledTempFile;

use crate::err::ReadStatError;
use crate::rs_data::ReadStatData;
use crate::rs_metadata::ReadStatMetadata;
use crate::rs_path::ReadStatPath;
#[cfg(any(
    feature = "csv",
    feature = "feather",
    feature = "ndjson",
    feature = "parquet"
))]
use crate::rs_write_config::OutFormat;
#[cfg(feature = "parquet")]
use crate::rs_write_config::ParquetCompression;
use crate::rs_write_config::WriteConfig;

#[cfg(feature = "parquet")]
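/// Thin wrapper around the Arrow Parquet writer. The inner writer is held in
/// an `Option` so that `finish_parquet` can `take` it and close it exactly once.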
pub(crate) struct ReadStatParquetWriter {
    wtr: Option<ParquetArrowWriter<BufWriter<std::fs::File>>>,
}

#[cfg(feature = "parquet")]
impl ReadStatParquetWriter {
    fn new(wtr: ParquetArrowWriter<BufWriter<std::fs::File>>) -> Self {
        Self { wtr: Some(wtr) }
    }
}

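/// The active output writer, one variant per enabled output format.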
pub(crate) enum ReadStatWriterFormat {
    #[cfg(feature = "csv")]
    Csv(BufWriter<std::fs::File>),
    #[cfg(feature = "csv")]
    CsvStdout(std::io::Stdout),
    #[cfg(feature = "feather")]
    Feather(IpcFileWriter<BufWriter<std::fs::File>>),
    #[cfg(feature = "ndjson")]
    Ndjson(BufWriter<std::fs::File>),
    #[cfg(feature = "parquet")]
    Parquet(ReadStatParquetWriter),
}

#[derive(Default)]
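/// Writes Arrow record batches to the configured output format, tracking
/// whether the header and the first batch have already been written.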
pub struct ReadStatWriter {
    pub(crate) wtr: Option<ReadStatWriterFormat>,
    pub(crate) wrote_header: bool,
    pub(crate) wrote_start: bool,
}

impl ReadStatWriter {
    #[must_use]
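    /// Creates a writer with no active output and both write flags cleared.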
    pub const fn new() -> Self {
        Self {
            wtr: None,
            wrote_header: false,
            wrote_start: false,
        }
    }

    #[cfg(any(
        feature = "csv",
        feature = "feather",
        feature = "ndjson",
        feature = "parquet"
    ))]
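    /// Opens the output file: truncates on the first write and appends on
    /// subsequent writes to the same file.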
    fn open_output(&self, path: &Path) -> Result<File, ReadStatError> {
        let f = if self.wrote_start {
            OpenOptions::new().create(true).append(true).open(path)?
        } else {
            OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(path)?
        };
        Ok(f)
    }

    #[cfg(feature = "parquet")]
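    /// Writes a single record batch to a Parquet file at `output_path`.
    ///
    /// The batch is first encoded into a spooled temporary file, which stays
    /// in memory until it grows past `buffer_size_bytes` and then spills to
    /// disk; the finished bytes are then copied to `output_path` in one pass.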
    pub fn write_batch_to_parquet(
        batch: &RecordBatch,
        schema: &Schema,
        output_path: &Path,
        compression: Option<ParquetCompression>,
        compression_level: Option<u32>,
        buffer_size_bytes: usize,
    ) -> Result<(), ReadStatError> {
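        // Encode into a spooled temp file: held in memory up to
        // `buffer_size_bytes`, spilling to disk beyond that.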
        let mut spooled_file = SpooledTempFile::new(buffer_size_bytes);

        let compression_codec = Self::resolve_compression(compression, compression_level)?;

        let props = WriterProperties::builder()
            .set_compression(compression_codec)
            .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
            .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0)
            .build();

        let mut wtr =
            ParquetArrowWriter::try_new(&mut spooled_file, Arc::new(schema.clone()), Some(props))?;

        wtr.write(batch)?;
        wtr.close()?;

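        // Rewind the spooled file and copy the finished bytes to the output path.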
        spooled_file.seek(SeekFrom::Start(0))?;
        let mut output_file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(output_path)?;
        std::io::copy(&mut spooled_file, &mut output_file)?;

        Ok(())
    }

    #[cfg(feature = "parquet")]
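    /// Merges a set of temporary Parquet files into a single file at
    /// `output_path`, re-encoding every batch with the requested compression.
    /// Each temporary file is deleted as soon as its batches have been copied.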
    pub fn merge_parquet_files(
        temp_files: &[PathBuf],
        output_path: &Path,
        schema: &Schema,
        compression: Option<ParquetCompression>,
        compression_level: Option<u32>,
    ) -> Result<(), ReadStatError> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let f = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(output_path)?;

        let compression_codec = Self::resolve_compression(compression, compression_level)?;

        let props = WriterProperties::builder()
            .set_compression(compression_codec)
            .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
            .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0)
            .build();

        let mut writer =
            ParquetArrowWriter::try_new(BufWriter::new(f), Arc::new(schema.clone()), Some(props))?;

        for temp_file in temp_files {
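            // Stream every batch from this temp file into the merged writer.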
            let file = File::open(temp_file)?;
            let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
            let reader = builder.build()?;

            for batch in reader {
                writer.write(&batch?)?;
            }

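            // This temp file has been fully merged; remove it to free disk space.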
            fs::remove_file(temp_file)?;
        }

        writer.close()?;
        Ok(())
    }

    #[cfg(feature = "parquet")]
    fn resolve_compression(
        compression: Option<ParquetCompression>,
        compression_level: Option<u32>,
    ) -> Result<ParquetCompressionCodec, ReadStatError> {
        crate::rs_write_config::resolve_parquet_compression(compression, compression_level)
    }

    #[allow(unused_variables)]
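    /// Finalizes the output for the configured format, closing writers for
    /// formats that require a footer, and prints a summary message.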
    pub fn finish(
        &mut self,
        d: &ReadStatData,
        wc: &WriteConfig,
        in_path: &std::path::Path,
    ) -> Result<(), ReadStatError> {
        match wc.format {
            #[cfg(feature = "csv")]
            OutFormat::Csv => {
                self.print_finish_message(d, wc, in_path);
                Ok(())
            }
            #[cfg(feature = "feather")]
            OutFormat::Feather => {
                self.finish_feather()?;
                self.print_finish_message(d, wc, in_path);
                Ok(())
            }
            #[cfg(feature = "ndjson")]
            OutFormat::Ndjson => {
                self.print_finish_message(d, wc, in_path);
                Ok(())
            }
            #[cfg(feature = "parquet")]
            OutFormat::Parquet => {
                self.finish_parquet()?;
                self.print_finish_message(d, wc, in_path);
                Ok(())
            }
            #[allow(unreachable_patterns)]
            _ => Err(ReadStatError::Other(format!(
                "Output format {:?} is not enabled. Enable the corresponding feature flag.",
                wc.format
            ))),
        }
    }

    #[cfg(any(
        feature = "csv",
        feature = "feather",
        feature = "ndjson",
        feature = "parquet"
    ))]
    #[allow(clippy::unused_self)]
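    /// Prints a human-readable summary of how many rows were written from the
    /// input file into the output file.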
    fn print_finish_message(&self, d: &ReadStatData, wc: &WriteConfig, in_path: &std::path::Path) {
        let rows = d
            .total_rows_processed
            .as_ref()
            .map_or(0, |trp| trp.load(std::sync::atomic::Ordering::SeqCst));

        let in_f = in_path
            .file_name()
            .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());

        let out_f = wc
            .out_path
            .as_ref()
            .and_then(|p| p.file_name())
            .map_or_else(|| "___".to_string(), |f| f.to_string_lossy().to_string());

        let rows_formatted = format_with_commas(rows);
        println!("In total, wrote {rows_formatted} rows from file {in_f} into {out_f}");
    }

    #[cfg(feature = "feather")]
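    /// Writes the IPC file footer; errors if the active writer is not the
    /// Feather writer.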
    fn finish_feather(&mut self) -> Result<(), ReadStatError> {
        if let Some(ReadStatWriterFormat::Feather(wtr)) = &mut self.wtr {
            wtr.finish()?;
            Ok(())
        } else {
            Err(ReadStatError::Other(
                "Error writing feather as associated writer is not for the feather format"
                    .to_string(),
            ))
        }
    }

    #[cfg(feature = "parquet")]
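    /// Closes the Parquet writer, writing the file footer. Taking the writer
    /// out of its `Option` guarantees it is closed at most once.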
    fn finish_parquet(&mut self) -> Result<(), ReadStatError> {
        if let Some(ReadStatWriterFormat::Parquet(pwtr)) = &mut self.wtr {
            if let Some(wtr) = pwtr.wtr.take() {
                wtr.close()?;
            }
            Ok(())
        } else {
            Err(ReadStatError::Other(
                "Error writing parquet as associated writer is not for the parquet format"
                    .to_string(),
            ))
        }
    }

    #[allow(unused_variables)]
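    /// Writes the current batch in `d` to the output selected by `wc.format`.
    /// For CSV with no output path, data is streamed to stdout with the header
    /// emitted before the first batch.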
    pub fn write(&mut self, d: &ReadStatData, wc: &WriteConfig) -> Result<(), ReadStatError> {
        match wc.format {
            #[cfg(feature = "csv")]
            OutFormat::Csv => {
                if wc.out_path.is_none() {
                    if self.wrote_header {
                        self.write_data_to_stdout(d)
                    } else {
                        self.write_header_to_stdout(d)?;
                        self.write_data_to_stdout(d)
                    }
                } else {
                    self.write_data_to_csv(d, wc)
                }
            }
            #[cfg(feature = "feather")]
            OutFormat::Feather => self.write_data_to_feather(d, wc),
            #[cfg(feature = "ndjson")]
            OutFormat::Ndjson => self.write_data_to_ndjson(d, wc),
            #[cfg(feature = "parquet")]
            OutFormat::Parquet => self.write_data_to_parquet(d, wc),
            #[allow(unreachable_patterns)]
            _ => Err(ReadStatError::Other(format!(
                "Output format {:?} is not enabled. Enable the corresponding feature flag.",
                wc.format
            ))),
        }
    }

    #[cfg(feature = "csv")]
    fn write_data_to_csv(
        &mut self,
        d: &ReadStatData,
        wc: &WriteConfig,
    ) -> Result<(), ReadStatError> {
        if let Some(p) = &wc.out_path {
            let f = self.open_output(p)?;

            if !self.wrote_start {
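                // First batch for this file: set up the buffered CSV writer.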
                self.wtr = Some(ReadStatWriterFormat::Csv(BufWriter::new(f)));
            }

            if let Some(ReadStatWriterFormat::Csv(f)) = &mut self.wtr {
                if let Some(batch) = &d.batch {
                    let include_header = !self.wrote_header;
                    let mut writer = CsvWriterBuilder::new().with_header(include_header).build(f);
                    writer.write(batch)?;
                    self.wrote_header = true;
                }

                self.wrote_start = true;
                Ok(())
            } else {
                Err(ReadStatError::Other(
                    "Error writing csv as associated writer is not for the csv format".to_string(),
                ))
            }
        } else {
            Err(ReadStatError::Other(
                "Error writing csv as output path is set to None".to_string(),
            ))
        }
    }

    #[cfg(feature = "feather")]
    fn write_data_to_feather(
        &mut self,
        d: &ReadStatData,
        wc: &WriteConfig,
    ) -> Result<(), ReadStatError> {
        if let Some(p) = &wc.out_path {
            let f = self.open_output(p)?;

            if !self.wrote_start {
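                // First batch: create the IPC file writer with the source schema.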
                let wtr = IpcFileWriter::try_new(BufWriter::new(f), &d.schema)?;
                self.wtr = Some(ReadStatWriterFormat::Feather(wtr));
            }

            if let Some(ReadStatWriterFormat::Feather(wtr)) = &mut self.wtr {
                if let Some(batch) = &d.batch {
                    wtr.write(batch)?;
                }

                self.wrote_start = true;

                Ok(())
            } else {
                Err(ReadStatError::Other(
                    "Error writing feather as associated writer is not for the feather format"
                        .to_string(),
                ))
            }
        } else {
            Err(ReadStatError::Other(
                "Error writing feather file as output path is set to None".to_string(),
            ))
        }
    }

    #[cfg(feature = "ndjson")]
    fn write_data_to_ndjson(
        &mut self,
        d: &ReadStatData,
        wc: &WriteConfig,
    ) -> Result<(), ReadStatError> {
        if let Some(p) = &wc.out_path {
            let f = self.open_output(p)?;

            if !self.wrote_start {
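                // First batch: set up the buffered NDJSON writer.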
                self.wtr = Some(ReadStatWriterFormat::Ndjson(BufWriter::new(f)));
            }

            if let Some(ReadStatWriterFormat::Ndjson(f)) = &mut self.wtr {
                if let Some(batch) = &d.batch {
                    let mut writer = JsonLineDelimitedWriter::new(f);
                    writer.write(batch)?;
                    writer.finish()?;
                }

                self.wrote_start = true;

                Ok(())
            } else {
                Err(ReadStatError::Other(
                    "Error writing ndjson as associated writer is not for the ndjson format"
                        .to_string(),
                ))
            }
        } else {
            Err(ReadStatError::Other(
                "Error writing ndjson file as output path is set to None".to_string(),
            ))
        }
    }

    #[cfg(feature = "parquet")]
    fn write_data_to_parquet(
        &mut self,
        d: &ReadStatData,
        wc: &WriteConfig,
    ) -> Result<(), ReadStatError> {
        if let Some(p) = &wc.out_path {
            let f = self.open_output(p)?;

            if !self.wrote_start {
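                // First batch: resolve compression and create the Parquet writer.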
                let compression_codec =
                    Self::resolve_compression(wc.compression, wc.compression_level)?;

                let props = WriterProperties::builder()
                    .set_compression(compression_codec)
                    .set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
                    .set_writer_version(parquet::file::properties::WriterVersion::PARQUET_2_0)
                    .build();

                let wtr =
                    ParquetArrowWriter::try_new(BufWriter::new(f), d.schema.clone(), Some(props))?;

                self.wtr = Some(ReadStatWriterFormat::Parquet(ReadStatParquetWriter::new(
                    wtr,
                )));
            }

            if let Some(ReadStatWriterFormat::Parquet(pwtr)) = &mut self.wtr {
                if let Some(batch) = &d.batch
                    && let Some(ref mut wtr) = pwtr.wtr
                {
                    wtr.write(batch)?;
                }

                self.wrote_start = true;

                Ok(())
            } else {
                Err(ReadStatError::Other(
                    "Error writing parquet as associated writer is not for the parquet format"
                        .to_string(),
                ))
            }
        } else {
            Err(ReadStatError::Other(
                "Error writing parquet file as output path is set to None".to_string(),
            ))
        }
    }

    #[cfg(feature = "csv")]
    fn write_data_to_stdout(&mut self, d: &ReadStatData) -> Result<(), ReadStatError> {
        if !self.wrote_start {
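            // First batch: stream subsequent output straight to stdout.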
            self.wtr = Some(ReadStatWriterFormat::CsvStdout(stdout()));
        }

        if let Some(ReadStatWriterFormat::CsvStdout(f)) = &mut self.wtr {
            if let Some(batch) = &d.batch {
                let mut writer = CsvWriterBuilder::new().with_header(false).build(f);
                writer.write(batch)?;
            }

            self.wrote_start = true;

            Ok(())
        } else {
            Err(ReadStatError::Other(
                "Error writing to csv as associated writer is not for the csv format".to_string(),
            ))
        }
    }

    #[cfg(feature = "csv")]
    #[allow(clippy::unnecessary_wraps)]
    fn write_header_to_stdout(&mut self, d: &ReadStatData) -> Result<(), ReadStatError> {
        let vars: Vec<String> = d.vars.values().map(|m| m.var_name.clone()).collect();

        println!("{}", vars.join(","));

        self.wrote_header = true;

        Ok(())
    }

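    /// Writes file metadata either as pretty-printed JSON or as a
    /// human-readable listing on stdout, depending on `as_json`.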
    pub fn write_metadata(
        &self,
        md: &ReadStatMetadata,
        rsp: &ReadStatPath,
        as_json: bool,
    ) -> Result<(), ReadStatError> {
        if as_json {
            self.write_metadata_to_json(md)
        } else {
            self.write_metadata_to_stdout(md, rsp)
        }
    }

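    /// Serializes the metadata to pretty-printed JSON and prints it to stdout.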
    pub fn write_metadata_to_json(&self, md: &ReadStatMetadata) -> Result<(), ReadStatError> {
        let s = serde_json::to_string_pretty(md)?;
        println!("{s}");
        Ok(())
    }

    #[allow(clippy::cast_sign_loss, clippy::unnecessary_wraps)]
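    /// Prints file-level metadata followed by one line per variable, including
    /// its type, label, format class, and Arrow data type.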
    pub fn write_metadata_to_stdout(
        &self,
        md: &ReadStatMetadata,
        rsp: &ReadStatPath,
    ) -> Result<(), ReadStatError> {
        use crate::rs_var::ReadStatVarFormatClass;

        println!("Metadata for the file {}\n", rsp.path.to_string_lossy());
        println!("Row count: {}", md.row_count);
        println!("Variable count: {}", md.var_count);
        println!("Table name: {}", md.table_name);
        println!("Table label: {}", md.file_label);
        println!("File encoding: {}", md.file_encoding);
        println!("Format version: {}", md.version);
        println!(
            "Bitness: {}",
            if md.is64bit == 0 { "32-bit" } else { "64-bit" }
        );
        println!("Creation time: {}", md.creation_time);
        println!("Modified time: {}", md.modified_time);
        println!("Compression: {:#?}", md.compression);
        println!("Byte order: {:#?}", md.endianness);
        println!("Variable names:");
        for (k, v) in &md.vars {
            let format_class = v.var_format_class.as_ref().map_or("", |f| match f {
                ReadStatVarFormatClass::Date => "Date",
                ReadStatVarFormatClass::DateTime
                | ReadStatVarFormatClass::DateTimeWithMilliseconds
                | ReadStatVarFormatClass::DateTimeWithMicroseconds
                | ReadStatVarFormatClass::DateTimeWithNanoseconds => "DateTime",
                ReadStatVarFormatClass::Time | ReadStatVarFormatClass::TimeWithMicroseconds => {
                    "Time"
                }
            });
            let data_type = md.schema.fields[*k as usize].data_type();
            println!(
                "{k}: {} {{ type class: {:#?}, type: {:#?}, label: {}, format class: {format_class}, format: {}, arrow logical data type: {data_type:#?}, arrow physical data type: {data_type:#?} }}",
                v.var_name, v.var_type_class, v.var_type, v.var_label, v.var_format,
            );
        }

        Ok(())
    }
}

#[cfg(feature = "csv")]
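/// Encodes a single record batch as CSV (with header) into an in-memory byte
/// buffer.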
pub fn write_batch_to_csv_bytes(
    batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>, ReadStatError> {
    let mut buf = Vec::new();
    let mut writer = CsvWriterBuilder::new().with_header(true).build(&mut buf);
    writer.write(batch)?;
    drop(writer);
    Ok(buf)
}

#[cfg(feature = "ndjson")]
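/// Encodes a single record batch as newline-delimited JSON into an in-memory
/// byte buffer.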
pub fn write_batch_to_ndjson_bytes(
    batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>, ReadStatError> {
    let mut buf = Vec::new();
    let mut writer = JsonLineDelimitedWriter::new(&mut buf);
    writer.write(batch)?;
    writer.finish()?;
    Ok(buf)
}

#[cfg(feature = "parquet")]
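/// Encodes a single record batch as a SNAPPY-compressed Parquet file into an
/// in-memory byte buffer.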
pub fn write_batch_to_parquet_bytes(batch: &RecordBatch) -> Result<Vec<u8>, ReadStatError> {
    let mut buf = Vec::new();
    let props = WriterProperties::builder()
        .set_compression(ParquetCompressionCodec::SNAPPY)
        .build();
    let mut writer = ParquetArrowWriter::try_new(&mut buf, batch.schema(), Some(props))?;
    writer.write(batch)?;
    writer.close()?;
    Ok(buf)
}

#[cfg(feature = "feather")]
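/// Encodes a single record batch as an Arrow IPC (Feather) file into an
/// in-memory byte buffer.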
pub fn write_batch_to_feather_bytes(
    batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>, ReadStatError> {
    let mut buf = Vec::new();
    let mut writer = IpcFileWriter::try_new(&mut buf, &batch.schema())?;
    writer.write(batch)?;
    writer.finish()?;
    Ok(buf)
}

#[cfg(any(
    feature = "csv",
    feature = "feather",
    feature = "ndjson",
    feature = "parquet"
))]
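/// Formats an integer with thousands separators, e.g. `format_with_commas(1234567)`
/// returns `"1,234,567"`.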
fn format_with_commas(n: usize) -> String {
    let s = n.to_string();
    let bytes = s.as_bytes();
    let len = bytes.len();
    if len <= 3 {
        return s;
    }
    let mut result = String::with_capacity(len + len / 3);
    for (i, &b) in bytes.iter().enumerate() {
        if i > 0 && (len - i) % 3 == 0 {
            result.push(',');
        }
        result.push(b as char);
    }
    result
}

#[cfg(test)]
mod tests {
    use super::*;

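    // Compression resolution: verify the default and each codec mapping.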
    #[test]
    fn resolve_compression_none_defaults_to_snappy() {
        let codec = ReadStatWriter::resolve_compression(None, None).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::SNAPPY));
    }

    #[test]
    fn resolve_compression_uncompressed() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Uncompressed), None)
                .unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::UNCOMPRESSED));
    }

    #[test]
    fn resolve_compression_snappy() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Snappy), None).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::SNAPPY));
    }

    #[test]
    fn resolve_compression_lz4raw() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Lz4Raw), None).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::LZ4_RAW));
    }

    #[test]
    fn resolve_compression_gzip_default() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Gzip), None).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::GZIP(_)));
    }

    #[test]
    fn resolve_compression_gzip_with_level() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Gzip), Some(5)).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::GZIP(_)));
    }

    #[test]
    fn resolve_compression_brotli_default() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Brotli), None).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::BROTLI(_)));
    }

    #[test]
    fn resolve_compression_brotli_with_level() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Brotli), Some(8)).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::BROTLI(_)));
    }

    #[test]
    fn resolve_compression_zstd_default() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Zstd), None).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::ZSTD(_)));
    }

    #[test]
    fn resolve_compression_zstd_with_level() {
        let codec =
            ReadStatWriter::resolve_compression(Some(ParquetCompression::Zstd), Some(15)).unwrap();
        assert!(matches!(codec, ParquetCompressionCodec::ZSTD(_)));
    }

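    // Constructor: a fresh writer has no active output and cleared flags.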
    #[test]
    fn new_writer_defaults() {
        let wtr = ReadStatWriter::new();
        assert!(wtr.wtr.is_none());
        assert!(!wtr.wrote_header);
        assert!(!wtr.wrote_start);
    }
}