Migration from 1.x
Records
In 1.x ParquetRecord
and its implementations were mutable Iterable
s.
In 2.x those classes are immutable. That is, any modification on a record returns a new instance.
In 1.x when reading using generic records RowParquetRecord
had no entries for missing optional fields.
In 2.x each field is represented in RowParquerRecord
, the order is kept, and NullValue
represents missing data.
Type classes
SkippingParquetSchemaResolver
and SkippingParquetRecordEncoder
are removed in 2.x and their logic is merged into regular ParquetSchemaResolver
and ParquetRecordEncoder
.
PartitionLens
is removed in 2.x. Its functionality is now available in the API of RowParquetRecord
.
ValueCodec
now composes ValueDecoder
and ValueEncoder
which allows writing custom encoders or decoders without implementing both.
Core API changes
Reading
Was:
ParquetReader
.read[Data](
path = "path/to/file",
options = ParquetReader.Options(),
filter = Col("id") > 100
)
Is:
ParquetReader
.as[Data]
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.read(Path("path/to/file"))
Reading with projection
Was:
ParquetReader
.withProjection[Data]
.read(
path = "path/to/file",
options = ParquetReader.Options(),
filter = Col("id") > 100
)
Is:
ParquetReader
.projectedAs[Data]
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.read(Path("path/to/file"))
Reading generic records
Was:
ParquetReader
.read[RowParquetRecord](
path = "path/to/file",
options = ParquetReader.Options(),
filter = Col("id") > 100
)
Is:
ParquetReader
.generic
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.read(Path("path/to/file"))
Or:
ParquetReader
.as[RowParquetRecord]
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.read(Path("path/to/file"))
Writing
Was:
ParquetWriter.writeAndClose(
path = "path/to/file",
data = data,
options = ParquetWriter.Options()
)
Is:
ParquetWriter
.of[Data]
.options(ParquetWriter.Options())
.writeAndClose(Path("path/to/file"), data)
Writing generic records
Was:
implicit val schema: MessageType = ???
ParquetWriter.writeAndClose(
path = "path/to/file",
data = records,
options = ParquetWriter.Options()
)
Is:
ParquetWriter
.generic(schema)
.options(ParquetWriter.Options())
.writeAndClose(Path("path/to/file"), records)
Or:
implicit val schema: MessageType = ???
ParquetWriter
.of[RowParquetRecord]
.options(ParquetWriter.Options())
.writeAndClose(Path("path/to/file"), records)
Stats
Was:
Stats(
path = "path/to/file",
options = ParquetReader.Options(),
filter = Col("id") > 100
)
Is:
Stats
.builder
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.stats(Path("path/to/file"))
Akka API changes
Changes related to generic records are the same as in the core library.
Deprecated API of fromParquet
, toParquetSequentialWithFileSplit
, toParquetParallelUnordered
and toParquetIndefinite
is removed in 2.x.
Reading
Was:
ParquetStreams
.fromParquet[Data]
.withOptions(ParquetReader.Options())
.withFilter(Col("id") > 100)
.withProjection
.read("path/to/file")
Is:
ParquetStreams
.fromParquet
.projectedAs[Data]
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.read(Path("path/to/file"))
Note that type class SkippingParquetSchemaResolver
that was required for projection is now replaced by regular ParquetSchemaResolver
.
Writing single file
Was:
ParquetStreams
.toParquetSingleFile(
path = "path/to/file",
options = ParquetWriter.Options()
)
Is:
ParquetStreams
.toParquetSingleFile
.of[Data]
.options(ParquetWriter.Options())
.write(Path("path/to/file"))
Advanced writing
Was:
ParquetStreams
.viaParquet[User]("path/to/directory")
.withMaxCount(1024 * 1024)
.withMaxDuration(30.seconds)
.withWriteOptions(ParquetWriter.Options())
.withPartitionBy("col1", "col2")
.build()
Is:
ParquetStreams
.viaParquet
.of[Data]
.maxCount(1024 * 1024)
.maxDuration(30.seconds)
.options(ParquetWriter.Options())
.partitionBy(Col("col1"), Col("col2"))
.write(Path("path/to/directory"))
In 1.x rotation was executed when maxCount
was reached and when maxDuration
expired.
In 2.x rotation is executed when maxCount
is reached or when maxDuration
expires. The counter and timer are reset after each rotation.
In 1.x all files (all partitions) were rotated at once.
In 2.x each file (each partition) is rotated individually.
In 1.x preWriteTransformation
could produce only a single record.
In 2.x preWriteTransformation
can produce a collection of records.
In 1.x postWriteHandler
allowed the implementation of custom rotation of all files.
In 2.x postWriteHandler
allows the implementation of custom rotation of individual files.
Please note the dependency to type class PartitionLens
is removed and SkippingParquetSchemaResolver
and SkippingParquetRecordEncoder
are replaced by regular ParquetSchemaResolver
and ParquetRecordEncoder
.
FS2 API changes
Changes related to generic records are the same as in the core library.
In 2.x FS2 and Cats Effect are upgraded to version 3.x.
Deprecated API of read
is removed in 2.x
Reading
Was:
parquet
.fromParquet[IO, Data]
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.projection
.read(blocker, "path/to/file")
Is:
parquet
.fromParquet[IO]
.projectedAs[Data]
.options(ParquetReader.Options())
.filter(Col("id") > 100)
.read(Path("path/to/file"))
Note that type class SkippingParquetSchemaResolver
that was required for projection is now replaced by regular ParquetSchemaResolver
.
Writing single file
Was:
parquet
.writeSingleFile[IO, Data](
blocker = blocker,
path = "path/to/file",
options = ParquetWriter.Options()
)
Is:
parquet
.writeSingleFile[IO]
.of[Data]
.options(ParquetWriter.Options())
.write(Path("path/to/file"))
Advanced writing
Was:
parquet
.viaParquet[IO, User]
.maxCount(1024 * 1024)
.maxDuration(30.seconds)
.options(ParquetWriter.Options())
.partitionBy("col1", "col2")
.write(blocker, "path/to/directory")
Is:
parquet
.viaParquet[IO]
.of[Data]
.maxCount(1024 * 1024)
.maxDuration(30.seconds)
.options(ParquetWriter.Options())
.partitionBy(Col("col1"), Col("col2"))
.write(Path("path/to/directory"))
In 1.x rotation was executed when maxCount
was reached and when maxDuration
expired.
In 2.x rotation is executed when maxCount
is reached or when maxDuration
expires. The counter and timer are reset after each rotation.
In 1.x all files (all partitions) were rotated at once.
In 2.x each file (each partition) is rotated individually.
In 1.x postWriteHandler
allowed the implementation of custom rotation of all files.
In 2.x postWriteHandler
allows the implementation of custom rotation of individual files.
Please note the dependency to type class SkippingParquetSchemaResolver
is replaced by regular ParquetSchemaResolver
.