Integration with Pekko Streams

Parquet4s has an integration module that allows you to read and write Parquet files using Pekko Streams. Just add the following dependencies to your build:

"com.github.mjakubowski84" %% "parquet4s-pekko" % "2.17.0"
"org.apache.hadoop" % "hadoop-client" % yourHadoopVersion

ParquetStreams has a single Source for reading a single file or a directory (which can be partitioned), a Sink for writing a single file, and a sophisticated Flow for performing complex writes.

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source
import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetStreams, ParquetWriter, Path}
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetWriter => HadoopParquetWriter}
import org.apache.hadoop.conf.Configuration

import scala.concurrent.duration._

case class User(userId: String, name: String, created: java.sql.Timestamp)

implicit val system: ActorSystem = ActorSystem()

val users: Source[User, NotUsed] = ???

val conf: Configuration = ??? // Set Hadoop configuration programmatically

// Please check all the available configuration options!
val writeOptions = ParquetWriter.Options(
  writeMode = Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY,
  hadoopConf = conf // optional hadoopConf
)

// Writes a single file.
users.runWith(
  ParquetStreams
    .toParquetSingleFile
    .of[User]
    .options(writeOptions)
    .write(Path("file:///data/users/user-303.parquet"))
)
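
// A variation of the write above that reacts to stream completion. A minimal
// sketch, assuming the sink materializes a completion future (Future[Done] in
// the standard Pekko Streams fashion):
import system.dispatcher
users
  .runWith(
    ParquetStreams
      .toParquetSingleFile
      .of[User]
      .options(writeOptions)
      .write(Path("file:///data/users/user-303.parquet"))
  )
  .onComplete(result => println(s"Single-file write finished: $result"))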

// Tailored for writing indefinite streams.
// Writes a file when the chunk reaches the size limit or when the defined time period elapses.
// Can also partition files (see the sketch below)!
// Check all the parameters and example usage in the project sources.
users.via(
  ParquetStreams
    .viaParquet
    .of[User]
    .maxCount(writeOptions.rowGroupSize)
    .maxDuration(30.seconds)
    .options(writeOptions)
    .write(Path("file:///data/users"))
).runForeach(user => println(s"Just wrote user ${user.userId}..."))
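
// Partitioning sketch: partitionBy is part of the viaParquet builder. Using the
// string field `name` as the partition column is only an illustrative assumption;
// partition values become part of the directory structure (name=<value>), so
// string-valued columns are expected, and the partition column is typically not
// written into the files themselves.
import com.github.mjakubowski84.parquet4s.Col
users.via(
  ParquetStreams
    .viaParquet
    .of[User]
    .partitionBy(Col("name"))
    .maxCount(writeOptions.rowGroupSize)
    .maxDuration(30.seconds)
    .options(writeOptions)
    .write(Path("file:///data/users"))
).runForeach(user => println(s"Just wrote user ${user.userId} under partition name=${user.name}..."))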
  
// Reads a single file, all files from a directory, or a partitioned directory.
// Allows reading multiple files in parallel for speed.
// Please also have a look at the rest of the parameters, e.g. filtering (see the sketch below).
ParquetStreams
  .fromParquet
  .as[User]
  .options(ParquetReader.Options(hadoopConf = conf))
  .parallelism(n = 4)
  .read(Path("file:///data/users"))
  .runForeach(println)
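
// Filtering sketch: the reader builder accepts a filter that is pushed down to
// Parquet row groups and partition directories. The predicate on userId below is
// only an illustrative assumption about your data (Col is imported above).
ParquetStreams
  .fromParquet
  .as[User]
  .filter(Col("userId") === "user-303")
  .options(ParquetReader.Options(hadoopConf = conf))
  .read(Path("file:///data/users"))
  .runForeach(println)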

// (Experimental API) Writes a single file using a custom ParquetWriter.
class UserParquetWriterBuilder(path: Path) extends HadoopParquetWriter.Builder[User, UserParquetWriterBuilder](path.toHadoop) {
  override def self() = this
  override def getWriteSupport(conf: Configuration) = ??? // provide your own WriteSupport[User] here
}
users.runWith(
  ParquetStreams
    .toParquetSingleFile
    .custom[User, UserParquetWriterBuilder](new UserParquetWriterBuilder(Path("file:///data/users/custom.parquet")))
    .options(writeOptions)
    .write
)

Please check the examples to learn more.