Add zstd compression and decompression for pekko streams #2409
File: `ByteBufferCleaner.java`

```diff
@@ -23,6 +23,7 @@
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import org.apache.pekko.annotation.InternalStableApi;

 /**
  * Cleans a direct {@link ByteBuffer}. Without manual intervention, direct ByteBuffers will be
@@ -36,7 +37,8 @@
  *
  * <p>See <a href=https://bugs.openjdk.java.net/browse/JDK-4724038>JDK-4724038</a>
  */
-final class ByteBufferCleaner {
+@InternalStableApi
+public final class ByteBufferCleaner {
```
Contributor (Author)

I had to make both this class and specific methods public because currently it's package private to `org.apache.pekko.io` and the pekko stream implementations are not in that package. I marked the class as `@InternalStableApi`; let me know if it should be something else.

Member

This annotation is for APIs that need to remain binary compatible because other, independently released Pekko components depend on them. As you're using this API within Pekko Core, and we expect all Pekko Core artifact versions to be consistent, the annotation is not needed for this use. Are you planning to use this API in other Pekko modules? If not, then we can remove this annotation here. If so, then we should add a note to the scaladoc pointing to where it's (going to be) used.

Contributor (Author)

As you allude to, the reason why I added `@InternalStableApi` is that other Pekko modules are expected to use this API. Also, this API is quite stable, hence why I added `@InternalStableApi` rather than `@InternalApi`.

Member

OK. I'm not super fond of committing ourselves to `@InternalStableApi` here. I'm OK with this not blocking the merge, though.
```diff
 
   // adapted from
   // https://github.com/apache/commons-io/blob/441115a4b5cd63ae808dd4c40fc238cb52c8048f/src/main/java/org/apache/commons/io/input/ByteBufferCleaner.java
@@ -75,7 +77,7 @@ public void clean(final ByteBuffer buffer) throws Throwable {
    * @param buffer to release.
    * @throws IllegalStateException on internal failure.
    */
-  static void clean(final ByteBuffer buffer) {
+  public static void clean(final ByteBuffer buffer) {
     try {
       INSTANCE.clean(buffer);
     } catch (final Throwable t) {
@@ -116,7 +118,7 @@ private static Cleaner getCleaner() {
    *
    * @return {@code true} if cleaning is supported, {@code false} otherwise.
    */
-  static boolean isSupported() {
+  public static boolean isSupported() {
     return INSTANCE != null;
   }
 }
```
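For orientation, here is a minimal sketch of what the newly public API enables from Scala (the `org.apache.pekko.io` package is assumed from the review thread above; cleaning may be unsupported on some JVMs, hence the guard):

```scala
import java.nio.ByteBuffer

// Assumed location of the class, per the review thread above.
import org.apache.pekko.io.ByteBufferCleaner

object CleanerExample extends App {
  // Direct buffers normally hold their native memory until the GC collects
  // them; the cleaner releases that memory eagerly instead.
  val buffer = ByteBuffer.allocateDirect(4096)
  buffer.put("hello".getBytes("UTF-8"))

  if (ByteBufferCleaner.isSupported())
    ByteBufferCleaner.clean(buffer) // the buffer must not be used after this point
}
```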
File: `zstd.md` (new file)

```markdown
# Compression.zstd

Creates a flow that zstd-compresses a stream of ByteStrings.

@ref[Compression operators](../index.md#compression-operators)

## Signature

@apidoc[Compression.zstd](stream.*.Compression$) { scala="#zstd:org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstd()" }

## Description

Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
will SYNC_FLUSH after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
coming out of the flow can be fully decompressed without waiting for additional data. This may
come at a compression performance cost for very small chunks.

Use the overload method to control the compression level.

## Reactive Streams semantics

@@@div { .callout }

**emits** when the compression algorithm produces output for the received `ByteString`

**backpressures** when downstream backpressures

**completes** when upstream completes

@@@
```
Comment on lines +13 to +14:

Member

This also means that if the way the incoming data is cut up into `ByteString`s changes, the compressed output changes as well.

Contributor (Author)

Actually it's the opposite: since a sync flush happens on every stream element, there is a direct one-to-one relationship, with every element having the exact same data compressed (there is actually a test that validates this).

Member

But that is not the case if the way the incoming data is cut up into `ByteString`s changes (this is not a fault of the stream compression, but a truth that would be good to highlight).

Contributor (Author)

Yes, true. I believe that comment is strictly talking about the source as it is right before going into compression, not the entire "stack". All the comment is trying to convey is that each element is flushed, so every `ByteString` coming out of the flow can be fully decompressed without waiting for additional data. The network can always split up data as it wants, but that's an orthogonal concern.

Member

That is correct. I wasn't trying to say the original comment is wrong (it isn't), I meant to say that it would be valuable to highlight that the way the incoming data is split up influences the result of the compression.

Contributor (Author)

Got it, I'll clarify this better!
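To make the chunking point concrete, a small sketch using the `Compression.zstd` flow from this PR (the `ActorSystem` setup is just for the example):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Sink, Source }
import org.apache.pekko.util.ByteString

object ChunkingExample extends App {
  implicit val system: ActorSystem = ActorSystem("chunking-example")

  // The same bytes, cut into elements differently. Because the compressor
  // flushes after every element, the two compressed streams need not be
  // byte-identical, yet both decompress back to exactly "abcd".
  val oneChunk =
    Source.single(ByteString("abcd")).via(Compression.zstd).runWith(Sink.seq)
  val twoChunks =
    Source(List(ByteString("ab"), ByteString("cd"))).via(Compression.zstd).runWith(Sink.seq)
}
```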
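For reference, a minimal usage sketch of the operator documented above (based only on the signatures visible in this PR):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Sink, Source }
import org.apache.pekko.util.ByteString

object ZstdCompressExample extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-example")

  // Each emitted element can be decompressed on its own, because the
  // compressor flushes after every incoming ByteString.
  Source(List(ByteString("Hello, "), ByteString("zstd!")))
    .via(Compression.zstd)
    .runWith(Sink.foreach(chunk => println(s"compressed chunk: ${chunk.length} bytes")))
}
```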
File: `zstdDecompress.md` (new file)

```markdown
# Compression.zstdDecompress

Creates a flow that zstd-decompresses a stream of ByteStrings.

@ref[Compression operators](../index.md#compression-operators)

## Signature

@apidoc[Compression.zstdDecompress](stream.*.Compression$) { scala="#zstdDecompress(maxBytesPerChunk:Int):org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstdDecompress(int)" }

## Description

Creates a flow that zstd-decompresses a stream of ByteStrings. If the input is truncated, uses an
invalid compression method, or is invalid (failed CRC checks), this operator fails with a `com.github.luben.zstd.ZstdIOException`.

## Reactive Streams semantics

@@@div { .callout }

**emits** when the decompression algorithm produces output for the received `ByteString` (the emitted `ByteString` is of `maxBytesPerChunk` maximum length)

**backpressures** when downstream backpressures

**completes** when upstream completes

@@@
```
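And a round-trip sketch combining both operators (again based only on the signatures visible in this PR):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

object ZstdRoundTripExample extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-roundtrip")

  // Compress, then decompress; the decompressed output is re-chunked into
  // ByteStrings of at most maxBytesPerChunk bytes.
  val roundTrip =
    Source.single(ByteString("some payload"))
      .via(Compression.zstd)
      .via(Compression.zstdDecompress(maxBytesPerChunk = 1024))
      .runFold(ByteString.empty)(_ ++ _) // completes with ByteString("some payload")
}
```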
File: `Dependencies.scala`

```diff
@@ -352,7 +352,8 @@ object Dependencies {

   // pekko stream

-  lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
+  lazy val stream =
+    l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest)

   lazy val streamTestkit = l ++= Seq(
     TestDependencies.scalatest,
```
Member

Can we make the zstd dependency optional or provided?

Contributor (Author)

Making it optional/provided is not going to give us any benefits, as we would need to force users to redundantly provide a zstd implementation (which is going to be zstd-jni anyway). To do this properly, we would need to make a zstd-agnostic API and then provide concrete implementations in a different artifact, and considering that there is only a single candidate for a proper implementation (as discussed at #2404 (comment)) we wouldn't gain any benefit out of it. Hence zstd is being treated the same way as lz4Java (for the same reasons): we have a simple direct dependency.

Member

How about having a separate pekko-streams-zstd or pekko-streams-compression, so that the pekko-streams core lib doesn't need this dependency? ZIO have done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.

Contributor (Author)

On another note, there is a silver lining here: since zstd-jni uses JNI, the C shared objects, which are platform dependent, are contained within the jar artifact. This does give some credence to using provided, as a user can provide their own zstd-jni artifact, though there are still some things to note if we end up using `provided`.

Contributor (Author)

You could make the same argument as to why lz4Java isn't its own artifact, or any of the other compression algorithms. Also, due to the way the current API is designed, all of the compression algorithms are exposed through the pekko-streams `Compression` API.

I understand the sentiment, but honestly this is creating a huge amount of ceremony for no real benefit, and the only reason we are having this discussion is how far behind Java is in adding what are de facto standard compression algorithms to the stdlib (of which zstd has been one for at least half a decade now). The only reason I can see to use separate artifacts would be to support the Pekko 1.x series, but zstd-jni only supports JDK 11 or higher, and this would then need to be advertised to end users.
Put differently, the entire point of shipping these operators in pekko-stream is that they work out of the box.
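To illustrate the provided/classifier point above, a hypothetical build.sbt fragment showing how an application could supply its own zstd-jni (the version strings and classifier are illustrative, not prescriptive):

```scala
// build.sbt (sketch; "1.x.y" is a placeholder pekko-stream version)
libraryDependencies ++= Seq(
  // after this PR, pekko-stream pulls in zstd-jni transitively; exclude it...
  ("org.apache.pekko" %% "pekko-stream" % "1.x.y").exclude("com.github.luben", "zstd-jni"),
  // ...and supply a platform-specific classifier jar that ships only one native library
  "com.github.luben" % "zstd-jni" % "1.5.7-6" classifier "linux_amd64"
)
```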
File: `ZstdAutoFlushSpec.scala` (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pekko.stream.io.compression

import org.apache.pekko
import pekko.stream.scaladsl.{ Compression, Flow }
import pekko.util.ByteString

class ZstdAutoFlushSpec extends ZstdSpec {
  override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
    Compression.zstd(Compression.ZstdDefaultCompressionLevel, dictionary = None, autoFlush = false)

  override protected val autoFlush: Boolean = false

}
```
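The overload exercised by this spec also shows how a user could trade per-element flushing for better compression of small chunks (a sketch based on the signature above):

```scala
import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
import org.apache.pekko.util.ByteString

object BulkZstdExample {
  // With autoFlush = false the compressor no longer flushes after every
  // element, so individual output elements are not guaranteed to be
  // independently decompressible.
  val bulkZstd: Flow[ByteString, ByteString, Any] =
    Compression.zstd(Compression.ZstdDefaultCompressionLevel, dictionary = None, autoFlush = false)
}
```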
File: `ZstdSpec.scala` (new file)

```scala
/* ... Apache License header, identical to the one above ... */

package org.apache.pekko.stream.io.compression

import com.github.luben.zstd.{ ZstdIOException, ZstdInputStream, ZstdOutputStream }

import org.apache.pekko
import pekko.stream.impl.io.compression.{ Compressor, ZstdCompressor }
import pekko.stream.scaladsl.{ Compression, Flow }
import pekko.util.ByteString

import java.io.{ InputStream, OutputStream }

class ZstdSpec extends CoderSpec[ZstdIOException]("zstd") {
  import CompressionTestingTools._

  override protected def newCompressor(): Compressor = new ZstdCompressor

  override protected def encoderFlow: Flow[ByteString, ByteString, Any] = Compression.zstd

  override protected def decoderFlow(maxBytesPerChunk: Int): Flow[ByteString, ByteString, Any] =
    Compression.zstdDecompress(maxBytesPerChunk)

  override protected def newDecodedInputStream(underlying: InputStream): InputStream =
    new ZstdInputStream(underlying)

  override protected def newEncodedOutputStream(underlying: OutputStream): OutputStream =
    new ZstdOutputStream(underlying)

  override def extraTests(): Unit = {
    "decode concatenated compressions" in {
      ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!")
    }
  }
}
```
File: `ZstdCompressor.scala` (new file; 103 lines, shown in part)

```scala
/* ... Apache License header, identical to the one above ... */

package org.apache.pekko.stream.impl.io.compression

import java.nio.ByteBuffer
import com.github.luben.zstd.{ ZstdDictCompress, ZstdDirectBufferCompressingStreamNoFinalizer }
import org.apache.pekko
import org.apache.pekko.stream.scaladsl.Compression
import pekko.annotation.InternalApi
import pekko.util.ByteString

/** INTERNAL API */
@InternalApi private[pekko] class ZstdCompressor(
```
Member

Mark it as not threadsafe.

Contributor (Author)

You mean with a comment, or is there some annotation I am not aware of?
```scala
    compressionLevel: Int =
      Compression.ZstdDefaultCompressionLevel, dictionary: Option[ZstdDictionaryImpl] = None) extends Compressor {

  private val targetBuffer = ByteBuffer.allocateDirect(65536)
  private val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, compressionLevel)

  dictionary.foreach { dict =>
```
Member

(nit: I find …)
```scala
    (dict.level, dict.length, dict.offset) match {
      case (None, None, None) =>
        compressingStream.setDict(dict.dictionary)
      case (Some(dictLevel), None, None) =>
        compressingStream.setDict(new ZstdDictCompress(dict.dictionary, dictLevel))
      case (Some(dictLevel), Some(dictLength), Some(dictOffset)) =>
        compressingStream.setDict(new ZstdDictCompress(dict.dictionary, dictLevel, dictLength, dictOffset))
      case _ =>
        throw new IllegalArgumentException("Invalid combination of ZstdDictionary parameters")
    }
  }

  override def compress(input: ByteString): ByteString = {
    val inputBB = ByteBuffer.allocateDirect(input.size)
```
Member

There is a ByteBufferPool, can we use that here?

Contributor (Author)

Possibly? I just read the implementation now, and it appears that the main use case of … However, from reading the …
```scala
    inputBB.put(input.toArrayUnsafe())
    inputBB.flip()
    compressingStream.compress(inputBB)
    val result = ByteString.fromByteBuffer(targetBuffer)
```
Member

I miss Netty's ByteBuf here; when we release the ByteBuf, it returns to the pool, which avoids fragmentation.

Contributor (Author)

I will rewrite the implementation to use the pekko `ByteBufferPool`.

Member

Are you still planning on this?
```scala
    targetBuffer.flip()
    // ... (rest of the 103-line file is not shown in this view)
```

Comment on lines +53 to +54:

Suggested change:
```diff
-    val result = ByteString.fromByteBuffer(targetBuffer)
-    targetBuffer.flip()
+    targetBuffer.flip()
+    val result = ByteString.fromByteBuffer(targetBuffer)
```
Copilot AI (Oct 29, 2025)

The flush operation calls compressingStream.flush() after extracting the result from the buffer, and the flush result is discarded. The correct order should be: flip the buffer, flush the stream (which may write more data to the buffer), extract the result, then clear the buffer.

Suggested change:
```diff
-    val result = ByteString.fromByteBuffer(targetBuffer)
-    targetBuffer.clear()
-    compressingStream.flush()
+    compressingStream.flush()
+    val result = ByteString.fromByteBuffer(targetBuffer)
+    targetBuffer.clear()
```
Copilot AI (Oct 29, 2025)

compressingStream.flush() is called after extracting the result and clearing the buffer, which means any data written by flush() to targetBuffer will be lost. The flush should be called before flipping the buffer, and the buffer should be cleared after creating the result ByteString, similar to the pattern in compressAndFlush.

Suggested change:
```diff
-    targetBuffer.flip()
-    val result = ByteString.fromByteBuffer(targetBuffer)
-    targetBuffer.clear()
-    compressingStream.flush()
+    compressingStream.flush()
+    targetBuffer.flip()
+    val result = ByteString.fromByteBuffer(targetBuffer)
+    targetBuffer.clear()
```
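Taken together, the suggestions describe the following ordering for the flush path (a sketch only — the method name is hypothetical, with `compressingStream` and `targetBuffer` as defined in the class above):

```scala
// Drain buffered compressor state in the order the review suggests.
def flush(): ByteString = {
  compressingStream.flush() // may append more compressed bytes to targetBuffer
  targetBuffer.flip() // switch targetBuffer from writing to reading
  val result = ByteString.fromByteBuffer(targetBuffer) // copy out everything written so far
  targetBuffer.clear() // reset the buffer for the next compress/flush round
  result
}
```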