Contributor Author

I had to make both this class and specific methods public because it is currently package-private to org.apache.pekko.io and the Pekko Stream implementations are not in that package.

I marked the class as InternalStableApi; let me know if it should be something else.
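
For reference, the call pattern I have in mind from the stream side is roughly the following (a sketch only, not the actual call site; the buffer size is illustrative):

```scala
import java.nio.ByteBuffer

import org.apache.pekko.io.ByteBufferCleaner

val buffer = ByteBuffer.allocateDirect(65536)
try {
  // ... fill and drain the direct buffer ...
} finally {
  // Free the native memory eagerly instead of waiting for GC,
  // but only when the Unsafe-based cleaner is actually available.
  if (ByteBufferCleaner.isSupported()) ByteBufferCleaner.clean(buffer)
}
```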

@@ -23,6 +23,7 @@
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import org.apache.pekko.annotation.InternalStableApi;

/**
* Cleans a direct {@link ByteBuffer}. Without manual intervention, direct ByteBuffers will be
@@ -36,7 +37,8 @@
*
* <p>See <a href=https://bugs.openjdk.java.net/browse/JDK-4724038>JDK-4724038</a>
*/
final class ByteBufferCleaner {
@InternalStableApi
Member

This annotation is for APIs that need to remain binary compatible because other, independently-released Pekko components depend on them.

As you're using this API within Pekko Core, and we expect all Pekko Core artifact versions to be consistent, the annotation is not needed for this use.

Are you planning to use this API in other Pekko modules? If not then we can remove this annotation here. If so then we should add a note to the scaladoc pointing to where it's (going to be) used.

Contributor Author

> Are you planning to use this API in other Pekko modules? If not then we can remove this annotation here. If so then we should add a note to the scaladoc pointing to where it's (going to be) used.

As you allude to, the reason I added @InternalStableApi is that I can imagine cases where other Pekko libraries (e.g. pekko-http or even pekko-connectors) might want to use ByteBufferCleaner, and this would obviously need to be available in the Pekko 2.0.0 release to fulfil that.

Also, this API is quite stable, which is another reason I added @InternalStableApi, but yes, in general it was a forward-thinking rationale.

Member

OK. I'm not super fond of committing ourselves to @InternalStableApi "just in case" without a specific plan for where to use it. We should also seriously consider using the implementation from commons-io, definitely in satellite projects, but also here. The use of sun.misc.Unsafe is icky as well.

I'm OK with this not blocking the merge though.

Member

  • this code doesn't fail if Unsafe is not accessible; it accepts that GC will eventually clean up, but it is better for users if this Unsafe code cleans the instance earlier
  • it's one of those areas where the Java team keeps telling us to stop using Unsafe, but in this case they have not yet given an alternative

public final class ByteBufferCleaner {

// adapted from
// https://github.com/apache/commons-io/blob/441115a4b5cd63ae808dd4c40fc238cb52c8048f/src/main/java/org/apache/commons/io/input/ByteBufferCleaner.java
@@ -75,7 +77,7 @@ public void clean(final ByteBuffer buffer) throws Throwable {
* @param buffer to release.
* @throws IllegalStateException on internal failure.
*/
static void clean(final ByteBuffer buffer) {
public static void clean(final ByteBuffer buffer) {
try {
INSTANCE.clean(buffer);
} catch (final Throwable t) {
@@ -116,7 +118,7 @@ private static Cleaner getCleaner() {
*
* @return {@code true} if cleaning is supported, {@code false} otherwise.
*/
static boolean isSupported() {
public static boolean isSupported() {
return INSTANCE != null;
}
}
30 changes: 30 additions & 0 deletions docs/src/main/paradox/stream/operators/Compression/zstd.md
@@ -0,0 +1,30 @@
# Compression.zstd

Creates a flow that zstd-compresses a stream of ByteStrings.

@ref[Compression operators](../index.md#compression-operators)

## Signature

@apidoc[Compression.zstd](stream.*.Compression$) { scala="#zstd:org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstd()" }

## Description

Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
will SYNC_FLUSH after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
Comment on lines +13 to +14
Member

This also means that if the way the incoming data is cut up into ByteStrings is not deterministic, the compressed stream will not be deterministic, right? I think that is acceptable, but would be worth calling out.

Contributor Author

Actually it's the opposite: since a sync flush happens on every stream element, there is a direct one-to-one relationship, i.e. every element always has the exact same data compressed (there is actually a test that validates this).

Member

But that is not the case "if the way the incoming data is cut up into ByteStrings is not deterministic", right? E.g. when the data is coming from the network?

(this is not a fault of the stream compression, but a truth that would be good to highlight)

Contributor Author

> But that is not the case "if the way the incoming data is cut up into ByteStrings is not deterministic", right? E.g. when the data is coming from the network?

Yes, true. I believe that comment is strictly talking about the source right before the data goes into compression, not the entire "stack". All the comment is trying to convey is that if autoFlush is true, any buffers will get processed and cleared for each incoming element, so if you have "abc" as an incoming element, you will always get the compressed "abc" as a result, and not, say, "a" and then "bc" as a further element or some variation.

The network can always split up data as it wants, but that's an orthogonal concern.
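
To make that concrete, a rough sketch (the element values are only illustrative):

```scala
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

// With the per-element sync flush, ByteString("abc") always yields the same
// compressed chunk for that element, regardless of what follows it.
val elementWise = Source(List(ByteString("abc"), ByteString("def")))
  .via(Compression.zstd)

// If upstream instead delivered ByteString("a") and ByteString("bcdef"), the individual
// compressed chunks (and hence the concatenated compressed bytes) would differ,
// even though decompressing either stream still yields "abcdef".
val differentChunking = Source(List(ByteString("a"), ByteString("bcdef")))
  .via(Compression.zstd)
```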

Member

That is correct. I wasn't trying to say the original comment is wrong (it isn't); I meant to say that it would be valuable to highlight that the way the incoming data is split up influences the result of the compression.

Contributor Author

Got it, I'll clarify this better!

coming out of the flow can be fully decompressed without waiting for additional data. This may
come at a compression performance cost for very small chunks. Note that, as a consequence, how the
incoming data is chunked into ByteStrings influences the compressed output (though not the decompressed result).

Use the overloaded method to control the compression level.
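
## Example

A minimal sketch of compressing a stream of `ByteString` elements (the actor system name and element values are illustrative):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem("zstd-compress-example")

// Each incoming ByteString is SYNC_FLUSHed, so each element maps to its own compressed chunk.
val compressed =
  Source(List(ByteString("Hello, "), ByteString("World!")))
    .via(Compression.zstd)
    .runFold(ByteString.empty)(_ ++ _)
```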

## Reactive Streams semantics

@@@div { .callout }

**emits** when the compression algorithm produces output for the received `ByteString`

**backpressures** when downstream backpressures

**completes** when upstream completes

@@@
@@ -0,0 +1,26 @@
# Compression.zstdDecompress

Creates a flow that zstd-decompresses a stream of ByteStrings.

@ref[Compression operators](../index.md#compression-operators)

## Signature

@apidoc[Compression.zstdDecompress](stream.*.Compression$) { scala="#zstdDecompress(maxBytesPerChunk:Int):org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstdDecompress(int)" }

## Description

Creates a flow that zstd-decompresses a stream of ByteStrings. If the input is truncated, uses an invalid
compression method, or is otherwise invalid (fails CRC checks), this operator fails with a `com.github.luben.zstd.ZstdIOException`.
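
## Example

A minimal sketch of decompressing bytes previously produced by `Compression.zstd` (the actor system name and the `compressed` parameter are illustrative):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Compression, Source }
import org.apache.pekko.util.ByteString

implicit val system: ActorSystem = ActorSystem("zstd-decompress-example")

// `compressed` is assumed to be a ByteString produced by Compression.zstd.
def decompress(compressed: ByteString) =
  Source.single(compressed)
    .via(Compression.zstdDecompress())
    .runFold(ByteString.empty)(_ ++ _)
```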

## Reactive Streams semantics

@@@div { .callout }

**emits** when the decompression algorithm produces output for the received `ByteString` (the emitted `ByteString` is of `maxBytesPerChunk` maximum length)

**backpressures** when downstream backpressures

**completes** when upstream completes

@@@
3 changes: 2 additions & 1 deletion project/Dependencies.scala
@@ -352,7 +352,8 @@ object Dependencies {

// pekko stream

lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
lazy val stream =
l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest)
Member

Can we make the zstd dependency optional or provided?

Contributor Author
@mdedetrich Oct 28, 2025

Making it optional/provided is not going to give us any benefits, as we would need to force users to redundantly provide a zstd implementation (which is going to be "com.github.luben" % "zstd-jni" % "1.5.7-6" anyway).

To do this properly, we would need to make a zstd-agnostic API and then provide concrete implementations in a different artifact, and considering that there is only a single candidate for a proper implementation (as discussed at #2404 (comment)), we wouldn't gain any benefit out of it.

Hence zstd is being treated the same way as lz4Java (for the same reasons): we have a simple direct dependency.

Member

How about having a separate pekko-streams-zstd or pekko-streams-compression so that the pekko-streams core lib doesn't need this dependency? ZIO have done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.

Contributor Author

On another note, there is a silver lining here: since zstd-jni uses JNI, the platform-dependent C shared objects are contained within the jar artifact. The "com.github.luben" % "zstd-jni" % "1.5.7-6" artifact contains the C shared objects for all of the platforms (see https://github.com/luben/zstd-jni?tab=readme-ov-file#binary-releases for which ones are supported), but there are platform-dependent jars if you only care about a single platform.

This does give some credence to using provided, as a user can provide their own zstd-jni artifact; however, there are still some things to note here. If we end up using provided, then we force all Pekko end users to add a zstd-jni artifact to their dependencies (with "com.github.luben" % "zstd-jni" % "1.5.7-6" being the documented default), as it's not going to be included in the dependency resolution.
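
Concretely, if we did go with provided, every downstream build would need something along these lines (the version is just the documented default mentioned above):

```scala
// build.sbt of an end-user project, only needed if pekko-stream marked zstd-jni as "provided"
libraryDependencies += "com.github.luben" % "zstd-jni" % "1.5.7-6"
```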

Contributor Author
@mdedetrich Oct 28, 2025

> How about having a separate pekko-streams-zstd or pekko-streams-compression so that the pekko-streams core lib doesn't need this dependency? ZIO have done something similar, where a small number of compression operators are in zio-streams but others have their own libs, e.g. zio-streams-compress-zstd.

You could make the same argument as to why lz4Java isn't its own artifact, or any of the other compression algorithms. Also, due to the way the current API is designed, all of the compression algorithms are contained within the pekko-streams Compression object (which we could monkey-patch to add more methods in a separate artifact, but that's starting to look ugly already). On top of that, when adding zstd compression to pekko-http request/response headers (see apache/pekko-http#860), we would likewise have to create yet another artifact for pekko-http.

I understand the sentiment, but honestly this is creating a huge amount of ceremony for no real benefit, and the only reason we are having this discussion is how far behind Java is in adding what are de facto standard compression algorithms to the stdlib (which zstd has been for at least half a decade now). The only reason I can see to use separate artifacts would be to support the Pekko 1.x series, but zstd-jni only supports JDK 11 or higher, and this would then need to be advertised to end users.

zio-streams-compress is also not the best comparison, since if you look at https://zio.dev/zio-streams-compress/ you will see they also have separate artifacts for gzip and lz4, whereas for us these compression algorithms are contained within the core pekko-streams artifact. ZIO appears to have a different design ethos here, where they err on the side of hyper-modularity and put everything in its own artifact, whereas Pekko is on the other side.

Put differently, the entire point of the Compression object for scaladsl/javadsl is to contain the standard compression/decompression algorithms, and zstd is one of those. While having to use zstd-jni is not ideal, to the end user there is no difference, as it already supports almost every architecture that would be used in the wild.


lazy val streamTestkit = l ++= Seq(
TestDependencies.scalatest,
@@ -15,11 +15,11 @@ package org.apache.pekko.stream.io.compression

import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
import java.util.concurrent.ThreadLocalRandom
import java.util.zip.DataFormatException

import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.control.NoStackTrace

import org.apache.pekko
@@ -31,7 +31,8 @@ import pekko.util.ByteString
import org.scalatest.Inspectors
import org.scalatest.wordspec.AnyWordSpec

abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSupport with Inspectors {
abstract class CoderSpec[CorruptInputException: ClassTag](codecName: String) extends AnyWordSpec with CodecSpecSupport
with Inspectors {
import CompressionTestingTools._

protected def newCompressor(): Compressor
@@ -85,7 +86,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
"throw an error on corrupt input" in {
(the[RuntimeException] thrownBy {
ourDecode(corruptContent)
}).ultimateCause should be(a[DataFormatException])
}).ultimateCause should be(a[CorruptInputException])
}
}

@@ -21,7 +21,7 @@ import pekko.stream.impl.io.compression.{ Compressor, DeflateCompressor }
import pekko.stream.scaladsl.{ Compression, Flow }
import pekko.util.ByteString

class DeflateSpec extends CoderSpec("deflate") {
class DeflateSpec extends CoderSpec[DataFormatException]("deflate") {
import CompressionTestingTools._

protected def newCompressor(): Compressor = new DeflateCompressor
@@ -15,14 +15,14 @@ package org.apache.pekko.stream.io.compression

import java.io.{ InputStream, OutputStream }
import java.nio.charset.StandardCharsets
import java.util.zip.{ GZIPInputStream, GZIPOutputStream, ZipException }
import java.util.zip.{ DataFormatException, GZIPInputStream, GZIPOutputStream, ZipException }

import org.apache.pekko
import pekko.stream.impl.io.compression.{ Compressor, GzipCompressor }
import pekko.stream.scaladsl.{ Compression, Flow }
import pekko.util.ByteString

class GzipSpec extends CoderSpec("gzip") {
class GzipSpec extends CoderSpec[DataFormatException]("gzip") {
import CompressionTestingTools._

protected def newCompressor(): Compressor = new GzipCompressor
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.io.compression

import org.apache.pekko
import pekko.stream.scaladsl.{ Compression, Flow }
import pekko.util.ByteString

class ZstdAutoFlushSpec extends ZstdSpec {
override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
Compression.zstd(Compression.ZstdDefaultCompressionLevel, dictionary = None, autoFlush = false)

override protected val autoFlush: Boolean = false

}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.io.compression

import com.github.luben.zstd.{ ZstdIOException, ZstdInputStream, ZstdOutputStream }

import org.apache.pekko
import pekko.stream.impl.io.compression.{ Compressor, ZstdCompressor }
import pekko.stream.scaladsl.{ Compression, Flow }
import pekko.util.ByteString

import java.io.{ InputStream, OutputStream }

class ZstdSpec extends CoderSpec[ZstdIOException]("zstd") {
import CompressionTestingTools._

override protected def newCompressor(): Compressor = new ZstdCompressor

override protected def encoderFlow: Flow[ByteString, ByteString, Any] = Compression.zstd

override protected def decoderFlow(maxBytesPerChunk: Int): Flow[ByteString, ByteString, Any] =
Compression.zstdDecompress(maxBytesPerChunk)

override protected def newDecodedInputStream(underlying: InputStream): InputStream =
new ZstdInputStream(underlying)

override protected def newEncodedOutputStream(underlying: OutputStream): OutputStream =
new ZstdOutputStream(underlying)

override def extraTests(): Unit = {
"decode concatenated compressions" in {
ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!")
}
}
}
@@ -16,8 +16,7 @@ package org.apache.pekko.stream.scaladsl
import java.nio.charset.StandardCharsets

import org.apache.pekko
import pekko.stream.impl.io.compression.DeflateCompressor
import pekko.stream.impl.io.compression.GzipCompressor
import pekko.stream.impl.io.compression.{ DeflateCompressor, GzipCompressor, ZstdCompressor }
import pekko.stream.testkit.StreamSpec
import pekko.util.ByteString

@@ -27,6 +26,8 @@ class CompressionSpec extends StreamSpec {

def deflate(s: String): ByteString = new DeflateCompressor().compressAndFinish(ByteString(s))

def zstd(s: String): ByteString = new ZstdCompressor().compressAndFinish(ByteString(s))

val data = "hello world"

"Gzip decompression" must {
@@ -47,4 +48,14 @@
res.futureValue should ===(data)
}
}

"Zstd decompression" must {
"be able to decompress a zstd stream" in {
val source =
Source.single(zstd(data)).via(Compression.zstdDecompress()).map(_.decodeString(StandardCharsets.UTF_8))

val res = source.runFold("")(_ + _)
res.futureValue should ===(data)
}
}
}
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.impl.io.compression

import java.nio.ByteBuffer
import com.github.luben.zstd.{ ZstdDictCompress, ZstdDirectBufferCompressingStreamNoFinalizer }
import org.apache.pekko
import org.apache.pekko.stream.scaladsl.Compression
import pekko.annotation.InternalApi
import pekko.util.ByteString

/** INTERNAL API */
@InternalApi private[pekko] class ZstdCompressor(
Member

Mark it as not thread-safe.

Contributor Author

You mean with a comment, or is there some annotation I am not aware of?

compressionLevel: Int =
Compression.ZstdDefaultCompressionLevel, dictionary: Option[ZstdDictionaryImpl] = None) extends Compressor {

private val targetBuffer = ByteBuffer.allocateDirect(65536)
private val compressingStream = new ZstdDirectBufferCompressingStreamNoFinalizer(targetBuffer, compressionLevel)

dictionary.foreach { dict =>
Member

(nit: I find match almost always easier to read compared to foreach on Option)

(dict.level, dict.length, dict.offset) match {
case (None, None, None) =>
compressingStream.setDict(dict.dictionary)
case (Some(dictLevel), None, None) =>
compressingStream.setDict(new ZstdDictCompress(dict.dictionary, dictLevel))
case (Some(dictLevel), Some(dictLength), Some(dictOffset)) =>
compressingStream.setDict(new ZstdDictCompress(dict.dictionary, dictLevel, dictLength, dictOffset))
case _ =>
throw new IllegalArgumentException("Invalid combination of ZstdDictionary parameters")
}
}

override def compress(input: ByteString): ByteString = {
val inputBB = ByteBuffer.allocateDirect(input.size)
Member

There is a ByteBufferPool; can we use that here?

Contributor Author

Possibly? I just read the implementation now, and it appears that the main use case of ByteBufferPool is when you need a pool of buffers, whereas in our case we only have a static, set number of direct byte buffers, and they have different sizes.

However, from reading the ByteBufferPool code I did find another improvement: we can use ByteBufferCleaner.clean(byteBuffer) to clean up the byte buffers in a timely manner (will implement this).

inputBB.put(input.toArrayUnsafe())
inputBB.flip()
compressingStream.compress(inputBB)
val result = ByteString.fromByteBuffer(targetBuffer)
Member

I miss Netty's ByteBuf here: when we release the ByteBuf, it returns to the pool, which avoids fragmentation.

Contributor Author

I will rewrite the implementation to use the Pekko DirectByteBufferPool.

Member

Are you still planning on this?

targetBuffer.flip()
Comment on lines +53 to +54
Copilot AI Oct 29, 2025

The order of operations is incorrect. ByteString.fromByteBuffer reads from the current position to the limit, but targetBuffer hasn't been flipped yet. The flip should occur before calling fromByteBuffer. This should be: targetBuffer.flip() followed by val result = ByteString.fromByteBuffer(targetBuffer).

Suggested change:
- val result = ByteString.fromByteBuffer(targetBuffer)
- targetBuffer.flip()
+ targetBuffer.flip()
+ val result = ByteString.fromByteBuffer(targetBuffer)
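
For context, the write/flip/read cycle this refers to looks like the following (a generic ByteBuffer sketch, not code from this PR):

```scala
import java.nio.ByteBuffer

val buf = ByteBuffer.allocateDirect(16)
buf.put("abc".getBytes("UTF-8")) // position = 3, limit = 16 (capacity)
buf.flip()                       // position = 0, limit = 3: the written bytes are now readable
val out = new Array[Byte](buf.remaining())
buf.get(out)                     // reads exactly what was written
buf.clear()                      // position = 0, limit = capacity: ready for the next write
```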
result
}

override def flush(): ByteString = {
targetBuffer.flip()
val result = ByteString.fromByteBuffer(targetBuffer)
targetBuffer.clear()
compressingStream.flush()
Comment on lines +60 to +62
Copilot AI Oct 29, 2025

The flush operation calls compressingStream.flush() after extracting the result from the buffer. The flush result is discarded. The correct order should be: flip buffer, flush stream (which may write more data to buffer), extract result, then clear buffer.

Suggested change:
- val result = ByteString.fromByteBuffer(targetBuffer)
- targetBuffer.clear()
- compressingStream.flush()
+ compressingStream.flush()
+ val result = ByteString.fromByteBuffer(targetBuffer)
+ targetBuffer.clear()
Comment on lines +59 to +62
Copilot AI Oct 29, 2025

The compressingStream.flush() is called after extracting the result and clearing the buffer. This means any data written by flush() to targetBuffer will be lost. The flush should be called before flipping the buffer, and the buffer should be cleared after creating the result ByteString, similar to the pattern in compressAndFlush.

Suggested change:
- targetBuffer.flip()
- val result = ByteString.fromByteBuffer(targetBuffer)
- targetBuffer.clear()
- compressingStream.flush()
+ compressingStream.flush()
+ targetBuffer.flip()
+ val result = ByteString.fromByteBuffer(targetBuffer)
+ targetBuffer.clear()
result
}

override def finish(): ByteString = {
compressingStream.close()
targetBuffer.flip()
val arr = Array.ofDim[Byte](targetBuffer.limit())
targetBuffer.get(arr)
val result = ByteString.fromArrayUnsafe(arr)
result
}

override def compressAndFlush(input: ByteString): ByteString = {
val inputBB = ByteBuffer.allocateDirect(input.size)
inputBB.put(input.toArrayUnsafe())
inputBB.flip()
compressingStream.compress(inputBB)
compressingStream.flush()
targetBuffer.flip()

val arr = new Array[Byte](targetBuffer.limit())
targetBuffer.get(arr)
targetBuffer.clear()
ByteString.fromArrayUnsafe(arr)
}

override def compressAndFinish(input: ByteString): ByteString = {
val inputBB = ByteBuffer.allocateDirect(input.size)
inputBB.put(input.toArrayUnsafe())
inputBB.flip()
compressingStream.compress(inputBB)
compressingStream.close()
targetBuffer.flip()

val arr = new Array[Byte](targetBuffer.limit())
targetBuffer.get(arr)
ByteString.fromArrayUnsafe(arr)
}

override def close(): Unit = compressingStream.close()
}