From 7278bc779e368b9e041e87b8d73f77b954607881 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 8 Dec 2025 15:28:39 +0500 Subject: [PATCH 1/2] IGNITE-27270 Use MessageSerializer for DataStreamerResponse --- .../communication/GridIoMessageFactory.java | 3 +- .../datastreamer/DataStreamProcessor.java | 17 ++-- .../datastreamer/DataStreamerResponse.java | 95 +++---------------- 3 files changed, 23 insertions(+), 92 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 8a29b57768fcb..f9a320921cc6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; +import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer; import org.apache.ignite.internal.codegen.DeploymentModeMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -381,7 +382,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)59, GridCacheQueryResponse::new); factory.register((short)61, GridContinuousMessage::new); factory.register((short)62, DataStreamerRequest::new); - factory.register((short)63, DataStreamerResponse::new); + factory.register((short)63, DataStreamerResponse::new, new DataStreamerResponseSerializer()); factory.register((short)76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer()); factory.register((short)77, GridTaskResultResponse::new, new GridTaskResultResponseSerializer()); factory.register((short)78, MissingMappingRequestMessage::new, new MissingMappingRequestMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 65f225b22a923..df87046505ba9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -270,8 +270,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { topic, req.requestId(), new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + - ", req=" + req + ']'), - false); + ", req=" + req + ']')); return; } @@ -290,7 +289,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); - sendResponse(nodeId, topic, req.requestId(), e, false); + sendResponse(nodeId, topic, req.requestId(), e); return; } @@ -356,7 +355,7 @@ else if (!topFut.isDone()) } if (remapErr != null) { - sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), remapErr); return; } @@ -392,7 +391,7 @@ else if (topWaitFut == null) { try { job.call(); - sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), null); } finally { if (waitFut != null) @@ -400,7 +399,7 @@ else if (topWaitFut == null) { } } catch (Throwable e) { - sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), e); if (e instanceof Error) throw (Error)e; @@ -412,10 +411,8 @@ else if (topWaitFut == null) { * @param resTopic Response topic. * @param reqId Request ID. * @param err Error. - * @param forceLocDep Force local deployment. */ - private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, - boolean forceLocDep) { + private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err) { byte[] errBytes; try { @@ -427,7 +424,7 @@ private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Th errBytes = marshErrBytes; } - DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); + DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes); try { ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java index 20ac080bca01b..e17d044a516b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java @@ -17,34 +17,29 @@ package org.apache.ignite.internal.processors.datastreamer; -import java.nio.ByteBuffer; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * */ public class DataStreamerResponse implements Message { /** */ + @Order(value = 0, method = "requestId") private long reqId; /** */ + @Order(value = 1, method = "errorBytes") private byte[] errBytes; - /** */ - private boolean forceLocDep; - /** * @param reqId Request ID. * @param errBytes Error bytes. - * @param forceLocDep Force local deployment. */ - public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) { + public DataStreamerResponse(long reqId, byte[] errBytes) { this.reqId = reqId; this.errBytes = errBytes; - this.forceLocDep = forceLocDep; } /** @@ -61,6 +56,13 @@ public long requestId() { return reqId; } + /** + * @param reqId Request ID. + */ + public void requestId(long reqId) { + this.reqId = reqId; + } + /** * @return Error bytes. */ @@ -69,10 +71,10 @@ public byte[] errorBytes() { } /** - * @return {@code True} to force local deployment. + * @param errBytes Error bytes. */ - public boolean forceLocalDeployment() { - return forceLocDep; + public void errorBytes(byte[] errBytes) { + this.errBytes = errBytes; } /** {@inheritDoc} */ @@ -80,75 +82,6 @@ public boolean forceLocalDeployment() { return S.toString(DataStreamerResponse.class, this); } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray(errBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeBoolean(forceLocDep)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong(reqId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - errBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - forceLocDep = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - reqId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public short directType() { return 63; From 5b245cd182f51712f814a26704d22692af34c01b Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Mon, 8 Dec 2025 16:31:06 +0500 Subject: [PATCH 2/2] IGNITE-27270 refactor --- .../datastreamer/DataStreamProcessor.java | 19 +----- .../datastreamer/DataStreamerImpl.java | 21 +++--- .../datastreamer/DataStreamerResponse.java | 66 +++++++++++++++++-- 3 files changed, 71 insertions(+), 35 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index df87046505ba9..0fbe81b19c358 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -72,9 +72,6 @@ public class DataStreamProcessor extends GridProcessorAdapter { /** Marshaller. */ private final Marshaller marsh; - /** */ - private byte[] marshErrBytes; - /** * @param ctx Kernal context. */ @@ -96,9 +93,6 @@ public DataStreamProcessor(GridKernalContext ctx) { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - marshErrBytes = U.marshal(marsh, new IgniteCheckedException("Failed to marshal response error, " + - "see node log for details.")); - flusher = U.newThread(new GridWorker(ctx.igniteInstanceName(), "grid-data-loader-flusher", log) { @Override protected void body() throws InterruptedException { while (!isCancelled()) { @@ -413,18 +407,9 @@ else if (topWaitFut == null) { * @param err Error. */ private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err) { - byte[] errBytes; - - try { - errBytes = err != null ? U.marshal(marsh, err) : null; - } - catch (Exception e) { - U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); - - errBytes = marshErrBytes; - } + DataStreamerResponse res = new DataStreamerResponse(reqId, err); - DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes); + res.marshalError(marsh, log); try { ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index f154866fc9805..02940fc5fb3f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -2093,17 +2093,14 @@ void onResponse(DataStreamerResponse res, UUID nodeId) { Throwable err = null; - byte[] errBytes = res.errorBytes(); + try { + GridPeerDeployAware jobPda0 = jobPda; - if (errBytes != null) { - try { - GridPeerDeployAware jobPda0 = jobPda; + res.unmarshalError(ctx, U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())); - final Throwable cause = U.unmarshal( - ctx, - errBytes, - U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())); + final Throwable cause = res.error(); + if (cause != null) { final String msg = "DataStreamer request failed [node=" + nodeId + "]"; if (cause instanceof ClusterTopologyCheckedException) @@ -2113,11 +2110,11 @@ else if (X.hasCause(cause, IgniteClusterReadOnlyException.class)) else err = new IgniteCheckedException(msg, cause); } - catch (IgniteCheckedException e) { - f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); + } + catch (IgniteCheckedException e) { + f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); - return; - } + return; } f.onDone(null, err); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java index e17d044a516b7..c5f4ac8b89996 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java @@ -17,9 +17,16 @@ package org.apache.ignite.internal.processors.datastreamer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; /** * @@ -29,17 +36,20 @@ public class DataStreamerResponse implements Message { @Order(value = 0, method = "requestId") private long reqId; + /** */ + private @Nullable Throwable err; + /** */ @Order(value = 1, method = "errorBytes") - private byte[] errBytes; + private @Nullable byte[] errBytes; /** * @param reqId Request ID. - * @param errBytes Error bytes. + * @param err Error. */ - public DataStreamerResponse(long reqId, byte[] errBytes) { + public DataStreamerResponse(long reqId, @Nullable Throwable err) { this.reqId = reqId; - this.errBytes = errBytes; + this.err = err; } /** @@ -66,17 +76,61 @@ public void requestId(long reqId) { /** * @return Error bytes. */ - public byte[] errorBytes() { + public @Nullable byte[] errorBytes() { return errBytes; } /** * @param errBytes Error bytes. */ - public void errorBytes(byte[] errBytes) { + public void errorBytes(@Nullable byte[] errBytes) { this.errBytes = errBytes; } + /** + * @return Error. + */ + public @Nullable Throwable error() { + return err; + } + + /** + * Marshal error to byte array. + * + * @param marsh Marshaller. + * @param log Ignite logger. + */ + public void marshalError(Marshaller marsh, IgniteLogger log) { + try { + errBytes = err != null ? U.marshal(marsh, err) : null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); + + try { + errBytes = U.marshal(marsh, new IgniteCheckedException("Failed to marshal response error, " + + "see node log for details.")); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } + } + } + + /** + * Unmarshal error from byte array. + * + * @param ctx Kernal context. + * @param ldr Class loader. + */ + public void unmarshalError(GridKernalContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (errBytes != null && err == null) { + err = U.unmarshal(ctx, errBytes, ldr); + + errBytes = null; + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStreamerResponse.class, this);