Add zstd compression and decompression for pekko streams #2409
base: main
Conversation
    - lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
    + lazy val stream =
    +   l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest)
can we make the zstd dependency optional or provided?
Making it optional/provided is not going to give us any benefits, as we would need to force users to redundantly provide a zstd implementation (which is going to be "com.github.luben" % "zstd-jni" % "1.5.7-6" anyway).
To do this properly, we would need to make a zstd-agnostic API and then provide concrete implementations in a different artifact, and considering that there is only a single candidate for a proper implementation (as discussed at #2404 (comment)), we wouldn't gain any benefit out of it.
Hence zstd is being treated the same way as lz4Java is (for the same reasons), where we have a simple direct dependency.
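To make the alternative being discussed concrete, this is roughly what the Provided variant would look like in an sbt build (a sketch only; the coordinates mirror the diff above):

```scala
// Hypothetical sbt setting: zstd-jni marked Provided, so end users would have to supply it themselves.
libraryDependencies += "com.github.luben" % "zstd-jni" % "1.5.7-6" % Provided
```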
How about having a separate pekko-streams-zstd or pekko-streams-compression so that the pekko-streams core lib doesn't need this dependency? ZIO has done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.
On another note, there is a silver lining here: since zstd-jni uses JNI, the platform-dependent C shared objects are contained within the jar artifact. The "com.github.luben" % "zstd-jni" % "1.5.7-6" artifact contains the C shared objects for all of the platforms (see https://github.com/luben/zstd-jni?tab=readme-ov-file#binary-releases for which ones are supported), but there are platform-dependent jars if you only care about a single platform.
This does give some credence to using provided, as a user can provide their own zstd-jni artifact. However, there are still some things to note: if we end up using provided then we force all pekko end users to add a zstd-jni artifact to their dependencies (with "com.github.luben" % "zstd-jni" % "1.5.7-6" being the documented default), as it's not going to be included in dependency resolution.
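For illustration, picking up a single-platform jar would look roughly like this in sbt (the classifier string linux_amd64 is an assumption based on the binary-releases page linked above, not something from this PR):

```scala
// Hypothetical: depend only on the Linux x86_64 native build of zstd-jni via a classified jar.
libraryDependencies += "com.github.luben" % "zstd-jni" % "1.5.7-6" classifier "linux_amd64"
```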
> How about having a separate pekko-streams-zstd or pekko-streams-compression so that the pekko-streams core lib doesn't need this dependency? ZIO has done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.
You could make the same argument as to why lz4Java isn't its own artifact, or any of the other compression algorithms. Also, due to the way the current API is designed, all of the compression algorithms are contained within the pekko-streams Compression object (which we could monkey patch to add more methods from a separate artifact, but that's starting to look ugly already). On top of that, when adding zstd compression to pekko-http request/response headers (see apache/pekko-http#860), we would likewise have to create yet another artifact for pekko-http.
zio-streams-compress is also not the best to compare to: if you look at https://zio.dev/zio-streams-compress/ you will see they also have separate artifacts for gzip and lz4, which for us are contained within the pekko-streams core artifact. zio appears to have a different design ethos here, where they err on the side of hyper modularity and put everything in its own artifact, whereas pekko is on the other side.
I understand the sentiment, but honestly this is creating a huge amount of ceremony for no real benefit, and the only reason we are having this discussion is because of how far behind Java is in adding what are de facto standard compression algorithms to the stdlib (of which zstd has been one for at least half a decade now). The only reason I can see to use separate artifacts would be to support the Pekko 1.x series, but zstd-jni only supports JDK 11 or higher and this would then need to be advertised to end users.
Put differently, the entire point of the Compression object for scaladsl/javadsl is to contain the standard compression/decompression algorithms, and zstd is one of those. While having to use zstd-jni is not ideal, to the end user there is no difference, as it already supports almost every architecture that would be used in the wild.
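For a concrete end-user view, here is a minimal round-trip sketch using the operators added in this PR. The zstd/zstdDecompress names and the required level parameter come from the diffs quoted in this conversation; level 3 is passed explicitly and everything else is ordinary stream plumbing, so treat this as illustrative rather than final API:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Sink, Source }
import org.apache.pekko.util.ByteString

object ZstdRoundTrip extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-example")
  import system.dispatcher

  // Compress and immediately decompress a stream of ByteStrings.
  Source(List(ByteString("hello "), ByteString("world")))
    .via(Compression.zstd(3)) // level passed explicitly; the signature quoted in this PR requires one
    .via(Compression.zstdDecompress())
    .runWith(Sink.fold(ByteString.empty)(_ ++ _))
    .foreach { result =>
      println(result.utf8String) // prints "hello world"
      system.terminate()
    }
}
```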
Since the core tests have passed (the GitHub action says failed but that's due to the missing Paradox documentation) I have added other reviewers.
@He-Pin This cannot be backported, as the zstd-jni library is built against JDK 11 and the Pekko 1.x series requires every dependency to be built against JDK 1.8 or lower.
@mdedetrich Not sure if it's better as a dedicated module; I will review it this weekend.
Pull Request Overview
This PR adds Zstandard (zstd) compression and decompression support to Apache Pekko streams by integrating the zstd-jni library. This enhancement provides an additional compression algorithm alongside existing gzip and deflate options.
- Implements ZstdCompressor and ZstdDecompressor classes for zstd compression/decompression
- Adds public API methods zstd() and zstdDecompress() to both Scala and Java DSLs
- Includes comprehensive test coverage with a new ZstdSpec test class
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.
Summary per file:
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala | Adds zstd compression and decompression API methods with optional compression level and dictionary support |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala | Provides Java-compatible wrappers for zstd compression methods |
| stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdCompressor.scala | Implements the Compressor interface using zstd-jni library |
| stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDecompressor.scala | Implements zstd decompression stage using direct ByteBuffers |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala | Adds basic zstd decompression test |
| stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdSpec.scala | Provides comprehensive zstd codec testing |
| stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala | Updates test base class to support different exception types per codec |
| stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala | Updates to specify DataFormatException type parameter |
| stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala | Updates to specify DataFormatException type parameter |
| project/Dependencies.scala | Adds zstd-jni 1.5.7-6 dependency |
| .scala-steward.conf | Pins zstd-jni to version 1.x for backwards compatibility |
    val result = ByteString.fromByteBuffer(targetBuffer)
    targetBuffer.clear()
    compressingStream.flush()
Copilot AI (Oct 29, 2025):
The flush operation calls compressingStream.flush() after extracting the result from the buffer. The flush result is discarded. The correct order should be: flip buffer, flush stream (which may write more data to buffer), extract result, then clear buffer.
Suggested change:

    - val result = ByteString.fromByteBuffer(targetBuffer)
    - targetBuffer.clear()
    - compressingStream.flush()
    + compressingStream.flush()
    + val result = ByteString.fromByteBuffer(targetBuffer)
    + targetBuffer.clear()
     * @since 2.0.0
     */
    def zstd(level: Int, dictionary: Option[ZstdDictCompress] = None): Flow[ByteString, ByteString, NotUsed] = {
      require(level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel())
Copilot AI (Oct 29, 2025):
The require statement lacks a descriptive error message. Add a message explaining the valid compression level range, e.g., require(level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel(), s"Compression level must be between ${Zstd.minCompressionLevel()} and ${Zstd.maxCompressionLevel()}, but was $level").
Suggested change:

    - require(level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel())
    + require(
    +   level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel(),
    +   s"Compression level must be between ${Zstd.minCompressionLevel()} and ${Zstd.maxCompressionLevel()}, but was $level")
Force-pushed 423d50e to 085bb42.
I have finished the docs for Paradox (hence completing the PR) and also fixed some minor things, plus resolved all comments from the Copilot review (including ones which were outright wrong). There is still an open question about whether we should make our own …
There is a discussion on this topic at #2409 (comment); I personally think making this an extra module adds a lot of ceremony (as we would also have to add another module in pekko-http) for no real benefit to end users, but if a lot of people feel strongly about it I can reconsider.
     * @since 2.0.0
     */
    def zstdDecompress(maxBytesPerChunk: Int = MaxBytesPerChunkDefault): Flow[ByteString, ByteString, NotUsed] =
      Flow[ByteString].via(new ZstdDecompressor(maxBytesPerChunk)).named("zstdDecompress")
Can we make this live inside ZstdCompression?
I mean, what if the user wants to use it on Android?
Why? It's sitting in the same spot that all other compression flows like deflate or gzip do. If we decide to put this in another artifact then sure, but that is another debate.
Either we treat zstd as a standard, widely adopted industry compression algorithm (which it is), in which case this PR has it in the appropriate place, or we put it in another artifact, and if so we should have that discussion then.
One difference is that zlib is distributed differently and already part of the JDK while zstd-jni might bring extra weight that not everyone is interested in. The full jar including binaries for all platforms is >7MB. Whether that is significant or not is a different question but it goes somewhat against the usual idea to keep pekko-stream reasonably minimal and add stuff like this to the connectors (which probably does not work if we want to depend on it from http). So, I'd also slightly favor having an extra module at least in pekko core.
> One difference is that zlib is distributed differently and already part of the JDK while zstd-jni might bring extra weight that not everyone is interested in. The full jar including binaries for all platforms is >7MB. Whether that is significant or not is a different question but it goes somewhat against the usual idea to keep pekko-stream reasonably minimal and add stuff like this to the connectors (which probably does not work if we want to depend on it from http). So, I'd also slightly favor having an extra module at least in pekko core.
What would you recommend here? There are many ways to go about this. For example, it would be possible to have the zstd/zstdDecompress methods in the Compression object so that the API is clean, but then the user would have to provide their own artifact (of which we can provide a default that uses zstd-jni).
Or should the artifact just contain everything related to zstd, and should it be a pekko-streams-compress-extra (which initially contains only zstd) or pekko-streams-zstd?
One thing to note is that when doing this, I also have apache/pekko-http#860 in mind, and I wanted to add zstd to the standard spot where all of our content encoders are, i.e. https://github.com/apache/pekko-http/blob/4833a8e42f946682a72a72a0f3bee4c4d662b9a6/http-core/src/main/scala/org/apache/pekko/http/scaladsl/model/headers/HttpEncoding.scala#L63-L76
    dictionary.foreach(compressingStream.setDict)

    override def compress(input: ByteString): ByteString = {
      val inputBB = ByteBuffer.allocateDirect(input.size)
There is a ByteBufferPool, can we use that here?
Possibly? I just read the implementation now and it appears that the main use case of ByteBufferPool is when you need a pool of buffers, whereas in our case we only have a fixed, static set of direct byte buffers and they have different sizes.
However, from reading the ByteBufferPool code I did find another improvement: we can use ByteBufferCleaner.clean(byteBuffer) to clean up the byte buffers in a timely manner (will implement this).
    import java.util.OptionalInt;

    /**
     * @since 2.0.0
Add java/scala doc
Force-pushed e26dc99 to 86c5088.
I have updated the PR: I have made the various zstd constants (i.e. min level, max level, default block size, etc.) be part of the … I have also rebased against main to use the latest configurable …
Force-pushed 86c5088 to a1d4caa.
raboof left a comment:
I have not fully digested the PR but on first read it looks reasonable. Noted a couple of small things that are potential leftovers/nits.
    Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
    will SYNC_FLUSH after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
This also means that if the way the incoming data is cut up into ByteStrings is not deterministic, the compressed stream will not be deterministic, right? I think that is acceptable, but would be worth calling out.
Actually it's the opposite: since a sync flush happens on every stream element, there is a direct one-to-one relationship, with every element having the exact same data compressed (there is actually a test that validates this).
But that is not the case "if the way the incoming data is cut up into ByteStrings is not deterministic", right? E.g. when the data is coming from the network?
(this is not a fault of the stream compression, but a truth that would be good to highlight)
> But that is not the case "if the way the incoming data is cut up into ByteStrings is not deterministic", right? E.g. when the data is coming from the network?
Yes, true. I believe that comment is strictly talking about what happens at the source right before going into compression, and not the entire "stack". All the comment is trying to convey is that if autoFlush is true, any buffers will get processed and cleared for each incoming element, so if you have "abc" as an incoming element, you will always get the compressed "abc" as a result, and not, let's say, "a" and then "bc" as a further element or some variation.
The network can always split up data as it wants, but that's an orthogonal concern.
That is correct. I wasn't trying to say the original comment is wrong (it isn't), I meant to say that it would be valuable to highlight that the way the incoming data is split up influences the result of the compression.
Got it, I'll clarify this better!
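To illustrate the point being clarified, here is a small hypothetical driver (not part of the PR) that feeds the same bytes with two different framings; because the compressor flushes per element, the compressed outputs are expected to differ even though both decompress to the same data. The explicit level 3 is an assumption following the signature quoted earlier:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Sink, Source }
import org.apache.pekko.util.ByteString

object FramingMatters extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-framing")

  // Compress a list of chunks and concatenate the compressed output.
  def compressed(chunks: List[ByteString]): ByteString =
    Await.result(
      Source(chunks).via(Compression.zstd(3)).runWith(Sink.fold(ByteString.empty)(_ ++ _)),
      3.seconds)

  // Same payload, cut into ByteStrings in two different ways.
  val asOne = compressed(List(ByteString("abc")))
  val asTwo = compressed(List(ByteString("a"), ByteString("bc")))

  // Because the compressor SYNC_FLUSHes per element, the compressed bytes differ...
  println(asOne == asTwo) // expected: false
  // ...even though both decompress back to "abc".
  system.terminate()
}
```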
     * <p>See <a href=https://bugs.openjdk.java.net/browse/JDK-4724038>JDK-4724038</a>
     */
    final class ByteBufferCleaner {
      @InternalStableApi
This annotation is for APIs that need to remain binary compatible because other, independently-released Pekko components depend on them.
As you're using this API within Pekko Core, and we expect all Pekko Core artifact versions to be consistent, the annotation is not needed for this use.
Are you planning to use this API in other Pekko modules? If not then we can remove this annotation here. If so then we should add a note to the scaladoc pointing to where it's (going to be) used.
> Are you planning to use this API in other Pekko modules? If not then we can remove this annotation here. If so then we should add a note to the scaladoc pointing to where it's (going to be) used.
As you allude to, the reason why I added @InternalStableApi is that I can imagine cases where other Pekko libraries (i.e. pekko-http or even pekko-connectors) might want to use ByteBufferCleaner, and obviously this would need to be available in the Pekko 2.0.0 release to fulfil this.
Also this API is quite stable, hence why I added @InternalStableApi, but yes, in general it was a forward-thinking rationale.
OK. I'm not super fond of committing ourselves to @InternalStableApi "just in case" without a specific plan on where to use it. Also we should seriously consider using the implementation from commons-io - definitely in satellite projects, but also here. The use of sun.misc.Unsafe is icky as well.
I'm OK with this not blocking the merge though.
- this code doesn't fail if Unsafe is not accessible; it accepts that GC will eventually clean the buffer, but it is better for users if this Unsafe code cleans the instance earlier (a minimal sketch of this fallback behaviour follows below)
- it's one of those areas where the Java team keeps telling us to stop using Unsafe, but in this case they have not yet given an alternative
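A minimal, self-contained sketch of that "clean eagerly if we can, otherwise let GC do it" behaviour, assuming the JDK 9+ sun.misc.Unsafe.invokeCleaner entry point; this is an illustration, not the PR's actual ByteBufferCleaner code:

```scala
import java.nio.ByteBuffer

object DirectBufferCleanup {
  // Looked up once; None when Unsafe (or invokeCleaner) is not accessible on this JVM.
  private val invokeCleaner: Option[(AnyRef, java.lang.reflect.Method)] =
    try {
      val unsafeClass = Class.forName("sun.misc.Unsafe")
      val theUnsafeField = unsafeClass.getDeclaredField("theUnsafe")
      theUnsafeField.setAccessible(true)
      val unsafe = theUnsafeField.get(null)
      Some((unsafe, unsafeClass.getMethod("invokeCleaner", classOf[ByteBuffer])))
    } catch {
      case _: Throwable => None
    }

  /** Frees a direct buffer eagerly when possible; otherwise a no-op and the GC reclaims it later. */
  def clean(buffer: ByteBuffer): Unit =
    if (buffer.isDirect) invokeCleaner.foreach { case (unsafe, method) => method.invoke(unsafe, buffer) }
}
```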
    inputBB.put(input.toArrayUnsafe())
    inputBB.flip()
    compressingStream.compress(inputBB)
    val result = ByteString.fromByteBuffer(targetBuffer)
Are you still planning on this?
    override def compressAndFlush(input: ByteString): ByteString = {
      val inputBB = ByteBuffer.allocateDirect(input.size)
      inputBB.put(input.toArray)
The recoding is now complete, right? This line still seems to be here; should it still be changed to toArrayUnsafe?
Needs a rebase for the merge conflict and the fix for the #2476 failure.
    private val targetBuffer = ByteBuffer.allocateDirect(65536)
    private val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, compressionLevel)

    dictionary.foreach { dict =>
(nit: I find match almost always easier to read compared to foreach on Option)
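For what the nit amounts to in code, here are the two styles side by side (a self-contained sketch mirroring the quoted setDict hookup; only the style differs, the behaviour is the same):

```scala
import com.github.luben.zstd.{ ZstdDictCompress, ZstdDirectBufferCompressingStreamNoFinalizer }

object DictStyles {
  // foreach style (as in the quoted diff)
  def configureWithForeach(
      dictionary: Option[ZstdDictCompress],
      stream: ZstdDirectBufferCompressingStreamNoFinalizer): Unit =
    dictionary.foreach(stream.setDict)

  // match style (the reviewer's preference)
  def configureWithMatch(
      dictionary: Option[ZstdDictCompress],
      stream: ZstdDirectBufferCompressingStreamNoFinalizer): Unit =
    dictionary match {
      case Some(dict) => stream.setDict(dict)
      case None       => () // no dictionary configured
    }
}
```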
So I still want to get this into 2.0.0-M1, but I haven't had time for the buffer implementation I was planning to do. For now I am thinking of doing the simple, dumb solution so that we at least have an implementation with a stable API, which unlocks zstd compression for HTTP responses in pekko-http, and I can always do a better version for a future milestone release of 2.0.0.
@mdedetrich I think you can mark it as experimental and then polish it in a later release.
Force-pushed 0989b9d to 46da6fb.
@mdedetrich are you still planning further changes? I see you pushed some changes yesterday but AFAICT there are still some open review comments.
Yes, I have a couple of tests to fix so they pass, and then I will push changes.
Force-pushed 46da6fb to 52cd78a.
Force-pushed 52cd78a to 955e9f7.
@raboof So I tried to get my fancy buffering implementation done, but I am coming across a very weird issue where even if I change the current implementation to something like this:

    override def compressAndFlush(input: ByteString): ByteString =
      ByteString.fromArrayUnsafe(Zstd.compress(input.toArrayUnsafe))

the tests start failing because … So I think we should opt for the simple option for now, which is going to be the current implementation but avoiding creating a direct byte buffer on each incoming element (instead using a pool); doing the changes now.
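As a sketch of what "instead using a pool" could boil down to for a single compressor instance (illustrative only, with assumed names and sizing policy, not the final implementation):

```scala
import java.nio.ByteBuffer

import org.apache.pekko.util.ByteString

// Illustrative helper: one direct buffer per compressor, regrown only when an element
// is larger than anything seen before, instead of allocateDirect on every element.
final class ReusableDirectBuffer(initialCapacity: Int = 64 * 1024) {
  private var buffer: ByteBuffer = ByteBuffer.allocateDirect(initialCapacity)

  /** Copies the element into the (possibly regrown) direct buffer and flips it for reading. */
  def fill(input: ByteString): ByteBuffer = {
    if (buffer.capacity() < input.size) buffer = ByteBuffer.allocateDirect(input.size)
    buffer.clear()
    buffer.put(input.toArrayUnsafe())
    buffer.flip()
    buffer
  }
}
```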
@raboof @pjfanning @He-Pin So here is a status update: I actually found a bug in the … To solve this, I wanted to create an ultra simple implementation which would essentially just be … Due to this I have to shuffle data from the input … The advantage here is that this is the exact same logic that is needed to avoid the … tl;dr: Almost at the finish line!
@mdedetrich There is a related PR in Netty too.
@raboof @pjfanning Apologies for missing this for the Pekko 2.0.0-M1 release train, I am still working on this but it's taking longer than expected. I am actually currently implementing this work as a test in zstd-jni to make my life easier and also with the hope of getting feedback from the author that it is on the right track. Essentially the problem is that there isn't any real prior art for this; all of the current zstd streaming tests in zstd-jni either involve creating a direct … I am slightly annoyed that I didn't make the 2.0.0-M1 release, but as long as we are happy with using a nightly in pekko-http to implement the HTTP request/response zstd work, in the end it shouldn't be that problematic. You could also argue it's better this way, as the pekko 2.0.0 milestone was primarily about updating all of the core libraries/JVM versions plus removing deprecated code and bookkeeping, and we can have 2.0.0-M2 be about major features.
Relevant issue: luben/zstd-jni#324; I made a comment there.
So upstream has commented at luben/zstd-jni#324 (comment); it may make more sense to add this functionality (specifically …
Resolves: #2404
This PR adds a zstd compression/decompression stream/flow using zstd-jni. This project was chosen because it's the only one that has high performance (it uses the reference zstd implementation via JNI) and also supports at least JDK 17 (it's published with JDK 11).
The PR still needs to be completed (documentation needs to be added along with MiMa exclusions), but I am creating a PR now with the necessary barebones so that people can comment on whether the PR is on the right track. Tests have been added (there is already a base testing framework for pekko-streams compression flows, CoderSpec).
The implementation of ZstdCompressor/ZstdDecompressor uses ZstdDirectBufferCompressingStreamNoFinalizer/ZstdDirectBufferDecompressingStreamNoFinalizer, as these are the abstractions provided by zstd-jni to do streaming compression; note that the NoFinalizer versions just mean that you need to explicitly shut down the resource (which is what we want, since Pekko Streams handles resource cleaning). These compression abstractions need to use a direct ByteBuffer in order to handle the shuffling of data across the JNI boundary so that the C implementation can do its work directly in memory; the zstd-jni tests were the basis used to write the implementation.
Some extra notes:
- CoderSpec had to be modified, as the test which catches the exception thrown on corrupt input was hardcoded to DataFormatException, whereas zstd throws its own bespoke exception on corrupt input.
- ZstdCompressor implements the Compressor abstraction which does a lot of heavy lifting (especially when it comes to tests); however ZstdDecompressor intentionally does not implement DeflateDecompressorBase, as that design is heavily tied to Java's deflate/compression APIs. Instead we use a SimpleLinearGraphStage[ByteString] backed by ZstdDirectBufferDecompressingStreamNoFinalizer.
- The dictionary support exposes the com.github.luben.zstd.ZstdDictCompress data structure, which is tied to the implementation of zstd-jni. There is an argument to create our own pekko equivalent of ZstdDictCompress which would internally map to a com.github.luben.zstd.ZstdDictCompress; doing so would allow us to swap to a different implementation of zstd in the future without breaking the API.
- A similar consideration applies to the com.github.luben.zstd.ZstdIOException exception that is thrown on corrupt input, which is also tied to the zstd-jni implementation.