Conversation

@mdedetrich
Contributor

@mdedetrich mdedetrich commented Oct 28, 2025

Resolves: #2404

This PR adds a zstd compression/decompression stream/flow using zstd-jni, this project was chosen because its the only one that has high performance (it uses the reference zstd implementation via JNI) and also supports at least JDK 17 (its published with JDK 11).

The PR still needs to be completed (documentation needs to be added along with MiMa exclusions), but I am creating it now with the necessary bare bones so that people can comment on whether it is on the right track. Tests have been added (there is already a base testing framework for pekko-streams compression flows, CoderSpec).

The implementation of ZstdCompressor/ZstdDecompressor uses ZstdDirectBufferCompressingStreamNoFinalizer/ZstdDirectBufferDecompressingStreamNoFinalizer, as these are the abstractions provided by zstd-jni for streaming compression; the NoFinalizer variants just mean that you need to explicitly shut down the resource (which is what we want, since Pekko Streams handles resource cleanup). These abstractions need a direct ByteBuffer to shuffle data across the JNI boundary so that the C implementation can work directly on the memory; the zstd-jni tests were the basis for the implementation.
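To illustrate the shape of that direct-buffer handling, here is a minimal, hedged sketch of per-element compression with zstd-jni, pieced together from the snippets quoted later in this thread; the buffer size, compression level, and the flush/flip ordering are assumptions rather than the PR's exact code:

import java.nio.ByteBuffer
import com.github.luben.zstd.ZstdDirectBufferCompressingStreamNoFinalizer
import org.apache.pekko.util.ByteString

// Reused across elements: a direct target buffer plus the zstd-jni compressing stream writing into it.
val targetBuffer = ByteBuffer.allocateDirect(65536)
val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, 3) // level 3 assumed

def compressAndFlush(input: ByteString): ByteString = {
  // Copy the element into a direct buffer so the native zstd code can read it across the JNI boundary.
  val inputBB = ByteBuffer.allocateDirect(input.size)
  inputBB.put(input.toArrayUnsafe())
  inputBB.flip()
  compressingStream.compress(inputBB)
  compressingStream.flush() // flush first, so buffered compressed bytes land in targetBuffer
  targetBuffer.flip()
  val result = ByteString.fromByteBuffer(targetBuffer)
  targetBuffer.clear()
  result
}

Note that allocating a direct buffer per element is exactly the cost discussed further down in this thread; a real implementation would reuse a fixed input buffer as well.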

Some extra notes

  • CoderSpec had to be modified because the test which expects an exception to be thrown on corrupt input was hardcoded to DataFormatException, whereas zstd throws its own bespoke exception on corrupt input
  • ZstdCompressor implements the Compressor abstraction, which does a lot of heavy lifting (especially when it comes to tests). However, ZstdDecompressor intentionally does not implement DeflateDecompressorBase, as that design is heavily tied to Java's deflate/compression APIs; instead we use a SimpleLinearGraphStage[ByteString] backed by ZstdDirectBufferDecompressingStreamNoFinalizer
  • The current API also allows you to specify a dictionary when doing compression. Note that to do this, you need to pass a com.github.luben.zstd.ZstdDictCompress data structure, which is tied to the zstd-jni implementation. There is an argument for creating our own Pekko equivalent of ZstdDictCompress which would internally map to a com.github.luben.zstd.ZstdDictCompress; doing so would allow us to swap to a different zstd implementation in the future without breaking the API (see the sketch after this list).
    • This is the only part of the API (aside from the com.github.luben.zstd.ZstdIOException exception that is thrown on corrupt input) that is tied to the zstd-jni implementation
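For what such a Pekko-owned wrapper could look like, here is a hypothetical sketch; the ZstdDictionary name and its API are invented for illustration, only the wrapped ZstdDictCompress constructor comes from zstd-jni:

import com.github.luben.zstd.ZstdDictCompress

// Hypothetical Pekko-side dictionary type: the public API would only expose ZstdDictionary,
// while the zstd-jni data structure stays an internal detail that could be swapped out later.
final class ZstdDictionary private (val underlying: ZstdDictCompress) // would be package-private in real code

object ZstdDictionary {
  def apply(dictionary: Array[Byte], level: Int): ZstdDictionary =
    new ZstdDictionary(new ZstdDictCompress(dictionary, level))
}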

@mdedetrich mdedetrich marked this pull request as draft October 28, 2025 12:50

lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
lazy val stream =
l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest)
Member

can we make the zstd dependency optional or provided?

Contributor Author
@mdedetrich mdedetrich Oct 28, 2025

Making it optional/provided is not going to give us any benefit, as we would need to force users to redundantly provide a zstd implementation (which is going to be "com.github.luben" % "zstd-jni" % "1.5.7-6" anyway).

To do this properly, we would need to make a zstd-agnostic API and then provide concrete implementations in a different artifact, and considering that there is only a single candidate for a proper implementation (as discussed at #2404 (comment)), we wouldn't gain any benefit out of it.

Hence zstd is being treated the same way as lz4Java is (for the same reasons), where we have a simple direct dependency.

Member

How about having a separate pekko-streams-zstd or pekko-streams-compression so that the pekko-streams core lib doesn't need this dependency? ZIO has done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.

Contributor Author

On another note there is a silver lining here: since zstd-jni uses JNI, the C shared objects, which are platform dependent, are contained within the jar artifact. The "com.github.luben" % "zstd-jni" % "1.5.7-6" artifact contains the C shared objects for all of the platforms (see https://github.com/luben/zstd-jni?tab=readme-ov-file#binary-releases for which ones are supported), but there are platform-dependent jars if you only care about a single platform.

This does give some credence to using provided, as a user can provide their own zstd-jni artifact. However, there are still some things to note here: if we end up using provided then we force all Pekko end users to add a zstd-jni artifact to their dependencies (with "com.github.luben" % "zstd-jni" % "1.5.7-6" being the documented default), as it is not going to be included in the dependency resolution.
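As a hedged illustration of what that would mean for a user who only cares about one platform, an sbt declaration along these lines should work; the classifier name is an assumption based on the platform-specific jars zstd-jni publishes, so check the binary-releases page above for the exact names:

// build.sbt sketch: depend on a single-platform zstd-jni jar instead of the multi-platform fat jar.
// "linux_amd64" is an assumed example classifier; other platforms have their own classifiers.
libraryDependencies += ("com.github.luben" % "zstd-jni" % "1.5.7-6").classifier("linux_amd64")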

Contributor Author
@mdedetrich mdedetrich Oct 28, 2025

How about having a separate pekko-streams-zstd or pekko-streams-compression so that the pekko-streams core lib doesn't need this dependency? ZIO has done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.

You could make the same argument as to why lz4Java isn't its own artifact, or any of the other compression algorithms. Also, due to the way the current API is designed, all of the compression algorithms are contained within the pekko-streams Compression object (which we could monkey patch from a separate artifact to add more methods, but that is already starting to look ugly). On top of that, when adding zstd compression to pekko-http request/response headers (see apache/pekko-http#860), we would likewise have to create yet another artifact for pekko-http.

zio-streams-compress is also not the best comparison, since if you look at https://zio.dev/zio-streams-compress/ you will see they also have separate artifacts for gzip and lz4, whereas for us these compression algorithms are contained within the core pekko-streams artifact. ZIO appears to have a different design ethos here, where they err on the side of hyper-modularity and put everything in its own artifact, whereas Pekko is on the other side.

I understand the sentiment, but honestly this is creating a huge amount of ceremony for no real benefit, and the only reason we are having this discussion is because of how far behind Java is in adding what are de facto standard compression algorithms to the stdlib (which zstd has been for at least half a decade now). The only reason I can see to use separate artifacts would be to support the Pekko 1.x series, but zstd-jni only supports JDK 11 or higher and this would then need to be advertised to end users.

Put differently, the entire point of the Compression object for scaladsl/javadsl is to contain the standard compression/decompression algorithms, and zstd is one of them. While having to use zstd-jni is not ideal, to the end user there is no difference, as it already supports almost every architecture that would be used in the wild.
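To make the placement argument concrete, here is a hedged usage sketch showing zstd next to the existing algorithms on the Compression object; the method names follow this PR's diff snippets, while the level value and defaults are assumptions:

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem("compression-demo")

val payload = Source.single(ByteString("some payload"))

// gzip already lives on the core Compression object...
val gzipped = payload.via(Compression.gzip).runReduce(_ ++ _)

// ...and with this PR zstd sits right next to it (level 3 is only an assumed value here).
val roundTripped =
  payload
    .via(Compression.zstd(3))
    .via(Compression.zstdDecompress())
    .runReduce(_ ++ _) // Future[ByteString] that should equal ByteString("some payload")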

@mdedetrich
Contributor Author

Since the core tests have passed (the GitHub action says failed, but that's due to missing paradox documentation) I have added other reviewers.

@mdedetrich mdedetrich added this to the 2.0.0-M1 milestone Oct 28, 2025
@mdedetrich
Contributor Author

@He-Pin This cannot be backported as the zstd-jni library is built against JDK 11 and Pekko 1.x series requires every dependency to be built against JDK 1.8 or lower

@He-Pin
Member

He-Pin commented Oct 29, 2025

@mdedetrich Not sure if it's better to be a dedicated module, I will review it this weekend.

@He-Pin He-Pin requested a review from Copilot October 29, 2025 08:18
Contributor
Copilot AI left a comment

Pull Request Overview

This PR adds Zstandard (zstd) compression and decompression support to Apache Pekko streams by integrating the zstd-jni library. This enhancement provides an additional compression algorithm alongside existing gzip and deflate options.

  • Implements ZstdCompressor and ZstdDecompressor classes for zstd compression/decompression
  • Adds public API methods zstd() and zstdDecompress() to both Scala and Java DSLs
  • Includes comprehensive test coverage with a new ZstdSpec test class

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.

Summary per file

File - Description
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala - Adds zstd compression and decompression API methods with optional compression level and dictionary support
stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala - Provides Java-compatible wrappers for zstd compression methods
stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdCompressor.scala - Implements the Compressor interface using zstd-jni library
stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/ZstdDecompressor.scala - Implements zstd decompression stage using direct ByteBuffers
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/CompressionSpec.scala - Adds basic zstd decompression test
stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/ZstdSpec.scala - Provides comprehensive zstd codec testing
stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala - Updates test base class to support different exception types per codec
stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala - Updates to specify DataFormatException type parameter
stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala - Updates to specify DataFormatException type parameter
project/Dependencies.scala - Adds zstd-jni 1.5.7-6 dependency
.scala-steward.conf - Pins zstd-jni to version 1.x for backwards compatibility

Comment on lines +50 to +62
val result = ByteString.fromByteBuffer(targetBuffer)
targetBuffer.clear()
compressingStream.flush()
Copilot AI Oct 29, 2025

The flush operation calls compressingStream.flush() after extracting the result from the buffer. The flush result is discarded. The correct order should be: flip buffer, flush stream (which may write more data to buffer), extract result, then clear buffer.

Suggested change
val result = ByteString.fromByteBuffer(targetBuffer)
targetBuffer.clear()
compressingStream.flush()
compressingStream.flush()
val result = ByteString.fromByteBuffer(targetBuffer)
targetBuffer.clear()

* @since 2.0.0
*/
def zstd(level: Int, dictionary: Option[ZstdDictCompress] = None): Flow[ByteString, ByteString, NotUsed] = {
require(level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel())
Copilot AI Oct 29, 2025

The require statement lacks a descriptive error message. Add a message explaining the valid compression level range, e.g., require(level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel(), s"Compression level must be between ${Zstd.minCompressionLevel()} and ${Zstd.maxCompressionLevel()}, but was $level").

Suggested change
require(level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel())
require(
level <= Zstd.maxCompressionLevel() && level >= Zstd.minCompressionLevel(),
s"Compression level must be between ${Zstd.minCompressionLevel()} and ${Zstd.maxCompressionLevel()}, but was $level"
)

@mdedetrich mdedetrich force-pushed the add-zstd-compression-decompression branch 3 times, most recently from 423d50e to 085bb42 on October 29, 2025 08:28
@mdedetrich mdedetrich marked this pull request as ready for review October 29, 2025 08:30
@mdedetrich
Contributor Author

mdedetrich commented Oct 29, 2025

I have finished the docs for paradox (hence completing the PR) and also fixed some minor things, and resolved all comments from the Copilot review (including ones which were outright wrong).

There is still an open question about whether we should make our own equivalent of the com.github.luben.zstd.ZstdDictCompress data structure so that the API is abstracted away from the implementation.

@mdedetrich
Contributor Author

@mdedetrich Not sure if it's better to be a dedicated module, I will review it this weekend.

There is a discussion on this topic at #2409 (comment). I personally think making this an extra module adds a lot of ceremony (as we would also have to add another module in pekko-http) for no real benefit to end users, but if a lot of people feel strongly about it I can reconsider.

* @since 2.0.0
*/
def zstdDecompress(maxBytesPerChunk: Int = MaxBytesPerChunkDefault): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(new ZstdDecompressor(maxBytesPerChunk)).named("zstdDecompress")
Member

Can we make this live inside ZstdCompression?

Member

I mean, what if the user wants to use it in Android?

Contributor Author

Why? It's sitting in the same spot as all the other compression flows like deflate or gzip. If we decide to put this in another artifact then sure, but that is another debate.

Either we treat zstd as a standard, widely adopted industry compression algorithm (which it is), in which case this PR has it in the appropriate place, or we put it in another artifact, and if so we should have that discussion then.

Contributor

One difference is that zlib is distributed differently and already part of the JDK while zstd-jni might bring extra weight that not everyone is interested in. The full jar including binaries for all platforms is >7MB. Whether that is significant or not is a different question but it goes somewhat against the usual idea to keep pekko-stream reasonably minimal and add stuff like this to the connectors (which probably does not work if we want to depend on it from http). So, I'd also slightly favor having an extra module at least in pekko core.

Contributor Author

One difference is that zlib is distributed differently and already part of the JDK while zstd-jni might bring extra weight that not everyone is interested in. The full jar including binaries for all platforms is >7MB. Whether that is significant or not is a different question but it goes somewhat against the usual idea to keep pekko-stream reasonably minimal and add stuff like this to the connectors (which probably does not work if we want to depend on it from http). So, I'd also slightly favor having an extra module at least in pekko core.

What would you recommend here, because there are many ways to go about this? For example, it would be possible to have the zstd/zstdDecompress method in the Compression object so that the API is clean but then the user would have to provide their own artifact (of which we can provide a default that uses zstd-jni).

Or should the artifact just contain everything related to zstd, and should the artifact be a pekko-streams-compress-extra (which would initially contain zstd) or pekko-streams-zstd?

One thing to note is that when doing this, I also have apache/pekko-http#860 in mind, and I wanted to add zstd to the standard spot where all of our content encoders are, i.e. https://github.com/apache/pekko-http/blob/4833a8e42f946682a72a72a0f3bee4c4d662b9a6/http-core/src/main/scala/org/apache/pekko/http/scaladsl/model/headers/HttpEncoding.scala#L63-L76

dictionary.foreach(compressingStream.setDict)

override def compress(input: ByteString): ByteString = {
val inputBB = ByteBuffer.allocateDirect(input.size)
Member

There is a ByteBufferPool, can we use that here?

Contributor Author

Possibly? I just read the implementation now and it appears that the main use case of ByteBufferPool is when you need a pool of buffers, whereas in our case we only have a fixed number of direct byte buffers and they have different sizes.

However, from reading the ByteBufferPool code I did find another improvement: we can use ByteBufferCleaner.clean(byteBuffer) to clean up the byte buffers in a timely manner (I will implement this).
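For context, the direct-buffer cleaning trick referenced here (and in the ByteBufferCleaner discussion further down, cf. JDK-4724038) boils down to something like the following hedged sketch; the helper name and structure are illustrative, and only sun.misc.Unsafe.invokeCleaner is a real JDK API:

import java.nio.ByteBuffer

object DirectBufferCleaner { // illustrative name, not the class added in this PR
  private val unsafe: sun.misc.Unsafe = {
    val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
    field.setAccessible(true)
    field.get(null).asInstanceOf[sun.misc.Unsafe]
  }

  // Frees the native memory behind a direct buffer eagerly instead of waiting for GC.
  def clean(buffer: ByteBuffer): Unit =
    if (buffer.isDirect) unsafe.invokeCleaner(buffer)
}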

import java.util.OptionalInt;

/**
* @since 2.0.0
Member

Add java/scala doc

@mdedetrich mdedetrich force-pushed the add-zstd-compression-decompression branch 4 times, most recently from e26dc99 to 86c5088 on November 11, 2025 14:57
@mdedetrich
Contributor Author

I have updated the PR: I have made the various zstd constants (i.e. min level, max level, default block size, etc.) part of the Compression object, which makes the Pekko API fully separate from the underlying zstd-jni implementation.

I have also rebased against main to use the latest configurable autoFlush changes added as part of #2467.

@mdedetrich mdedetrich force-pushed the add-zstd-compression-decompression branch from 86c5088 to a1d4caa on November 11, 2025 15:03
Member
@raboof raboof left a comment

I have not fully digested the PR but on first read it looks reasonable. Noted a couple of small things that are potential leftovers/nits.

Comment on lines +13 to +14
Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
will SYNC_FLUSH after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
Member

This also means that if the way the incoming data is cut up into ByteStrings is not deterministic, the compressed stream will not be deterministic, right? I think that is acceptable, but would be worth calling out.

Contributor Author

Actually it's the opposite: since a sync flush happens on every stream element, there is a direct one-to-one relationship, with every element having the exact same data compressed (there is actually a test that validates this).

Member

But that is not the case "if the way the incoming data is cut up into ByteStrings is not deterministic", right? E.g when the data is coming from the network?

(this is not a fault of the stream compression, but a truth that would be good to highlight)

Contributor Author

But that is not the case "if the way the incoming data is cut up into ByteStrings is not deterministic", right? E.g when the data is coming from the network?

Yes, true. I believe that comment is strictly talking about the source right before it goes into compression, and not the entire "stack". All the comment is trying to convey is that if autoFlush is true, any buffers will get processed and cleared for each incoming element, so if you have "abc" as an incoming element, you will always get the compressed "abc" as a result, and not, say, "a" and then "bc" as a further element or some variation.

The network can always split up data as it wants, but that's an orthogonal concern.

Member

That is correct. I wasn't trying to say the original comment is wrong (it isn't), I meant to say that it would be valuable to highlight that the way the incoming data is split up influences the result of the compression.

Contributor Author

Got it, I'll clarify this better!
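To make the point of this thread concrete, here is a hedged sketch using the API added in this PR; the zstd(level) signature is taken from the diff snippets, and level 3 is just an assumed value:

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem("zstd-chunking-demo")

// Same logical payload, chunked differently before it reaches the compressor.
val oneChunk = Source.single(ByteString("abc"))
val twoChunks = Source(List(ByteString("a"), ByteString("bc")))

// Because the compressor flushes per element, the concatenated compressed bytes of these two
// streams will generally differ, even though both decompress back to "abc".
val compressedOne = oneChunk.via(Compression.zstd(3)).runReduce(_ ++ _)
val compressedTwo = twoChunks.via(Compression.zstd(3)).runReduce(_ ++ _)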

* <p>See <a href=https://bugs.openjdk.java.net/browse/JDK-4724038>JDK-4724038</a>
*/
final class ByteBufferCleaner {
@InternalStableApi
Member

This annotation is for APIs that need to remain binary compatible because other, independently-released Pekko components depend on them.

As you're using this API within Pekko Core, and we expect all Pekko Core artifact versions to be consistent, the annotation is not needed for this use.

Are you planning to use this API in other Pekko modules? If not then we can remove this annotation here. If so then we should add a note to the scaladoc pointing to where it's (going to be) used.

Contributor Author

Are you planning to use this API in other Pekko modules? If not then we can remove this annotation here. If so then we should add a note to the scaladoc pointing to where it's (going to be) used.

As you allude to, the reason why I added @InternalStableApi is that I can imagine cases where other Pekko libraries (e.g. pekko-http or even pekko-connectors) might want to use ByteBufferCleaner, and obviously it would need to be available in the Pekko 2.0.0 release to fulfill this.

Also this API is quite stable, hence why I added @InternalStableApi, but yes, in general it was forward-thinking rationale.

Member

OK. I'm not super fond of committing ourselves to @InternalStableApi "just in case" without a specific plan on where to use it. Also we should seriously consider using the implementation from commons-io - definitely in satellite projects, but also here. The use of sun.misc.Unsafe is icky as well.

I'm OK with this not blocking the merge though.

Member

  • this code doesn't fail if Unsafe is not accessible - it accepts that GC will eventually clean but it is better for users if this Unsafe code cleans the instance earlier
  • it's one of those areas where the Java team keep telling us to stop using Unsafe but in this case, they have not yet given an alternative

inputBB.put(input.toArrayUnsafe())
inputBB.flip()
compressingStream.compress(inputBB)
val result = ByteString.fromByteBuffer(targetBuffer)
Member

Are you still planning on this?


override def compressAndFlush(input: ByteString): ByteString = {
val inputBB = ByteBuffer.allocateDirect(input.size)
inputBB.put(input.toArray)
Member

the recoding is now complete, right? This line still seems to be here, should it still be changed to Unsafe?

@raboof
Member

raboof commented Nov 24, 2025

Needs a rebase for the merge conflict and the fix for the #2476 failure

private val targetBuffer = ByteBuffer.allocateDirect(65536)
private val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, compressionLevel)

dictionary.foreach { dict =>
Member

(nit: I find match almost always easier to read compared to foreach on Option)

@mdedetrich
Contributor Author

I have not fully digested the PR but on first read it looks reasonable. Noted a couple of small things that are potential leftovers/nits.

So I still want to get this into 2.0.0-M1, but I haven't had time to do the buffer implementation I was planning. For now I am thinking of doing the simple, dumb solution so that we at least have an implementation with a stable API, which unlocks zstd compression for HTTP responses in pekko-http, and I can always do a better version for a future milestone release of 2.0.0.

@He-Pin
Member

He-Pin commented Nov 26, 2025

@mdedetrich I think you can mark it as experimental and then polish it in a later release.

@mdedetrich mdedetrich force-pushed the add-zstd-compression-decompression branch 3 times, most recently from 0989b9d to 46da6fb on November 30, 2025 09:00
@raboof
Member

raboof commented Dec 1, 2025

@mdedetrich are you still planning further changes? I see you pushed some changes yesterday but AFAICT there's still some open review comments

@mdedetrich
Contributor Author

@mdedetrich are you still planning further changes? I see you pushed some changes yesterday but AFAICT there's still some open review comments

Yes, I have a couple of tests to fix so that they pass, and then I will push changes.

@mdedetrich mdedetrich force-pushed the add-zstd-compression-decompression branch from 46da6fb to 52cd78a on December 1, 2025 13:03
@mdedetrich mdedetrich force-pushed the add-zstd-compression-decompression branch from 52cd78a to 955e9f7 on December 1, 2025 13:03
@mdedetrich
Contributor Author

@raboof So I tried to get my fancy buffering implementation done, but I am running into a very weird issue where even if I change the current implementation to something like this

override def compressAndFlush(input: ByteString): ByteString =
  ByteString.fromArrayUnsafe(Zstd.compress(input.toArrayUnsafe))

The tests start failing because onUpstreamFinish never ends up being executed.

So I think we should opt for the simple option for now, which is going to be the current implementation but avoiding the creation of a direct byte buffer on each incoming element (instead using a pool); doing the changes now.

@mdedetrich
Contributor Author

mdedetrich commented Dec 1, 2025

@raboof @pjfanning @He-Pin So here is a status update: I actually found a bug in the ZstdDecompressor which I am now trying to fix. The bug is quite simple; it happens here, when the source buffer is larger than the ByteBuffer, causing a basic overflow. This bug didn't show up in tests because the individual ByteString elements in the tests always happen to be smaller than maxBytesPerChunk, but if you set maxBytesPerChunk to a small value such as 8 then it immediately shows.

To solve this, I wanted to create an ultra-simple implementation which would essentially just be ByteString.fromArrayUnsafe(Zstd.decompress(input.toArrayUnsafe)), but this is not possible because Zstd.decompress actually needs to know the original size of the decompressed element (and we have no knowledge of this in a stream).

Due to this I have to shuffle data from the input ByteString to that direct ByteBuffer while taking into account the fact that the input ByteString can be of any size. There is a naive implementation that transfers one byte at a time (which is really slow), so instead the implementation copies as much as it can depending on the amount of free space in the output buffer, but understandably this is a more complicated solution. Note this solution is similar to what the zstd-jni tests also do.
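A hedged, simplified sketch of that slicing idea (not the PR's actual code; the feedInSlices helper and the drain callback are made up for illustration, the point is only that each copy is bounded by the free space of a fixed, reused direct buffer):

import java.nio.ByteBuffer
import org.apache.pekko.util.ByteString

// Copy an arbitrarily sized ByteString into a fixed-size direct buffer in slices, handing the
// buffer to a consumer (e.g. a zstd-jni (de)compressing stream) after each slice. Assumes the
// consumer makes progress on every call so the loop terminates.
def feedInSlices(input: ByteString, source: ByteBuffer)(drain: ByteBuffer => Unit): Unit = {
  var remaining = input
  while (remaining.nonEmpty) {
    val slice = remaining.take(source.remaining()) // only as much as currently fits
    source.put(slice.toArrayUnsafe())
    remaining = remaining.drop(slice.length)
    source.flip()
    drain(source)    // consumer reads from position to limit
    source.compact() // keep any bytes the consumer did not consume yet
  }
}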

The advantage here is that this is the exact same logic that is needed to avoid the ByteBuffer.allocateDirect(input.size) in ZstdCompressor in the current PR implementation, which we need to remove anyway because of how slow ByteBuffer.allocateDirect(input.size) is (and that is the only change needed on the compressor side).

tl;dr Almost at the finish line!

@He-Pin
Member

He-Pin commented Dec 2, 2025

netty/netty#15861

@mdedetrich There is a related pr in netty too.

@mdedetrich
Contributor Author

mdedetrich commented Dec 3, 2025

@raboof @pjfanning Apologies for missing the Pekko 2.0.0-M1 release train with this; I am still working on it, but it's taking longer than expected. I am actually currently implementing this work as a test in zstd-jni to make my life easier and also with the hope of getting feedback from the author that it is on the right track.

Essentially the problem is that there isn't any real prior art for this: all of the current zstd streaming tests in zstd-jni either involve creating a direct ByteBuffer the same size as the input data (which we obviously don't want to do in pekko-streams, as these allocations are extremely expensive), have an input source that is already a direct ByteBuffer (i.e. java.nio), or, in the case of decompression, use methods which require the size of the original uncompressed input (which we cannot do in pekko-streams); i.e. those tests still do streaming decompression of a file, but since they are dealing with a file they easily have access to the original file size.

I am slightly annoyed that I didn't make the 2.0.0-M1 release, but as long as we are happy with using a nightly in pekko-http to implement the HTTP request/response zstd work, in the end it shouldn't be that problematic. You could also argue it's better this way, as the Pekko 2.0.0 milestone was primarily about updating all of the core libraries/JVM versions plus removing deprecated code and bookkeeping, and we can have 2.0.0-M2 be about major features.

@mdedetrich
Contributor Author

mdedetrich commented Dec 3, 2025

Relevant issue: luben/zstd-jni#324; I made a comment there.

@mdedetrich
Contributor Author

mdedetrich commented Dec 4, 2025

So upstream has commented at luben/zstd-jni#324 (comment); it may make more sense to add this functionality (specifically ZstdCompressCtx.compressByteBufferStream() and ZstdDecompressCtx.decompressByteBufferStream()) to zstd-jni proper, as it would help others and in turn make the pekko-streams implementation far more trivial.
