Integration with FS2

FS2 integration allows you to read and write Parquet using functional streams. The functionality is the same as in the Akka / Pekko module. In order to use it, add the following dependencies:

"com.github.mjakubowski84" %% "parquet4s-fs2" % "2.20.0"
"org.apache.hadoop" % "hadoop-client" % yourHadoopVersion

The parquet object provides a single Stream for reading a single file or a directory (which can be partitioned), a Pipe for writing a single file, and a sophisticated Pipe for performing complex writes.

import cats.effect.{IO, IOApp}
import com.github.mjakubowski84.parquet4s.parquet.{fromParquet, writeSingleFile, viaParquet}
import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path}
import fs2.Stream
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetWriter => HadoopParquetWriter}
import org.apache.hadoop.conf.Configuration

import scala.concurrent.duration._

object Example extends IOApp.Simple {

  case class User(userId: String, name: String, created: java.sql.Timestamp)

  val users: Stream[IO, User] = ???

  val conf: Configuration = ??? // Set Hadoop configuration programmatically

  // Please check all the available configuration options!
  val writeOptions = ParquetWriter.Options(
    writeMode = Mode.OVERWRITE,
    compressionCodecName = CompressionCodecName.SNAPPY,
    hadoopConf = conf // optional hadoopConf
  )

  // Writes a single file.
  val writeSingleFilePipe = 
    writeSingleFile[IO]
      .of[User]
      .options(writeOptions)
      .write(Path("file:///data/users/single.parquet"))

  // (Experimental API) Writes a single file using a custom ParquetWriter. 
  class UserParquetWriterBuilder(path: Path) extends HadoopParquetWriter.Builder[User, UserParquetWriterBuilder](path.toHadoop) {
    override def self() = this
    override def getWriteSupport(conf: Configuration) = ???
  }
  val writeSingleFileCustomPipe =
    writeSingleFile[IO]
      .custom[User, UserParquetWriterBuilder](new UserParquetWriterBuilder(Path("file:///data/users/custom.parquet")))
      .options(writeOptions)
      .write

  // Tailored for writing indefinite streams.
  // Writes a file when the element count reaches the limit or when the defined time period elapses.
  // Can also partition files (see the partitioning sketch below).
  // Check all the parameters and example usage in the project sources.
  val writeRotatedPipe =
    viaParquet[IO]
      .of[User]
      .maxCount(writeOptions.rowGroupSize)
      .maxDuration(30.seconds)
      .options(writeOptions)
      .write(Path("file:///data/users"))

  // Reads a single file, all files from a directory, or a partitioned directory.
  // Allows reading multiple files in parallel for speed.
  // Please also have a look at the rest of the parameters.
  val readAllStream =
    fromParquet[IO]
      .as[User]
      .options(ParquetReader.Options(hadoopConf = conf))
      .parallelism(n = 4)
      .read(Path("file:///data/users"))
      .printlns

  def run: IO[Unit] =  
    users
      .through(writeRotatedPipe)
      .through(writeSingleFilePipe)
      .through(writeSingleFileCustomPipe)
      .append(readAllStream)
      .compile
      .drain
  
}
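
As hinted in the comments above, the rotating viaParquet pipe can also partition the written files by one or more columns. Below is a minimal sketch, not taken from the project sources: the ActivityEvent case class, its country field and the output path are hypothetical and serve only to illustrate partitioning. Files end up in country=<value> subdirectories of the target path.

import cats.effect.IO
import com.github.mjakubowski84.parquet4s.parquet.viaParquet
import com.github.mjakubowski84.parquet4s.{Col, Path}

import scala.concurrent.duration._

// Hypothetical event type; the String field `country` exists only to illustrate partitioning.
case class ActivityEvent(userId: String, country: String, created: java.sql.Timestamp)

// Writes Parquet files into country=<value> subdirectories of the target path,
// rotating files by element count and elapsed time just like in the example above.
val writePartitionedPipe =
  viaParquet[IO]
    .of[ActivityEvent]
    .partitionBy(Col("country"))
    .maxCount(1024) // arbitrary illustrative limit
    .maxDuration(1.minute)
    .write(Path("file:///data/events"))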

What differentiates FS2 from Akka / Pekko is that, for better performance, FS2 processes stream elements in chunks. Therefore, viaParquet and fromParquet have a chunkSize property that lets you define a custom chunk size. The default value is 16. Override it to set your own balance between memory consumption and performance.
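
For example, here is a minimal sketch of overriding the chunk size (the value of 64 and the paths are arbitrary; User refers to the case class defined in the example above):

import cats.effect.IO
import com.github.mjakubowski84.parquet4s.Path
import com.github.mjakubowski84.parquet4s.parquet.{fromParquet, viaParquet}

// Reads up to 64 records per chunk instead of the default 16.
val readInBiggerChunks =
  fromParquet[IO]
    .as[User]
    .chunkSize(64)
    .read(Path("file:///data/users"))

// The rotating writer exposes the same property.
val writeInBiggerChunks =
  viaParquet[IO]
    .of[User]
    .chunkSize(64)
    .write(Path("file:///data/users"))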


Please check the examples to learn more.