Quick start

SBT

libraryDependencies ++= Seq(
  "com.github.mjakubowski84" %% "parquet4s-core" % "2.21.0",
  "org.apache.hadoop" % "hadoop-client" % yourHadoopVersion
)

Mill

def ivyDeps = Agg(
  ivy"com.github.mjakubowski84::parquet4s-core:2.21.0",
  ivy"org.apache.hadoop:hadoop-client:$yourHadoopVersion"
)
import com.github.mjakubowski84.parquet4s.{ ParquetReader, ParquetWriter, Path }

case class User(userId: String, name: String, created: java.sql.Timestamp)

val users: Iterable[User] = Seq(
  User("1", "parquet", new java.sql.Timestamp(1L))
)
val path = Path("path/to/local/file.parquet")

// writing
ParquetWriter.of[User].writeAndClose(path, users)

// reading
val parquetIterable = ParquetReader.as[User].read(path)
try {
  parquetIterable.foreach(println)
} finally parquetIterable.close()

AWS S3

Parquet4s works with AWS S3 and many other distributed storage types.

In order to connect to AWS S3 you need to define one more dependency:

"org.apache.hadoop" % "hadoop-aws" % yourHadoopVersion

Next, the most common way is to define following environmental variables:

export AWS_ACCESS_KEY_ID=my.aws.key
export AWS_SECRET_ACCESS_KEY=my.secret.key

Please refer to documentation of Hadoop AWS for more information on how to authenticate with S3.

You may need to set some configuration properties to access your storage, e.g. fs.s3a.path.style.access. Please follow documentation of Hadoop AWS for more details and troubleshooting.

Moreover, you refer to Parquet4s’ integration test that proves that integration with S3 works.

Passing Hadoop Configs Programmatically

File system configs for S3, GCS, Hadoop, etc. can also be set programmatically to the ParquetReader and ParquetWriter by passing the Configuration to the ParqetReader.Options and ParquetWriter.Options case classes.

import com.github.mjakubowski84.parquet4s.{ ParquetReader, ParquetWriter, Path }
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.hadoop.conf.Configuration

case class User(userId: String, name: String, created: java.sql.Timestamp)

val users: Iterable[User] = Seq(
  User("1", "parquet", new java.sql.Timestamp(1L))
)

val writerOptions = ParquetWriter.Options(
  compressionCodecName = CompressionCodecName.SNAPPY,
  hadoopConf = new Configuration()
)

ParquetWriter
  .of[User]
  .options(writerOptions)
  .writeAndClose(Path("path/to/local/file.parquet"), users)