diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 8a29b57768fcb..f9a320921cc6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.codegen.CachePartitionsToReloadMapSerializer; import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer; import org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer; +import org.apache.ignite.internal.codegen.DataStreamerResponseSerializer; import org.apache.ignite.internal.codegen.DeploymentModeMessageSerializer; import org.apache.ignite.internal.codegen.ErrorMessageSerializer; import org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer; @@ -381,7 +382,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)59, GridCacheQueryResponse::new); factory.register((short)61, GridContinuousMessage::new); factory.register((short)62, DataStreamerRequest::new); - factory.register((short)63, DataStreamerResponse::new); + factory.register((short)63, DataStreamerResponse::new, new DataStreamerResponseSerializer()); factory.register((short)76, GridTaskResultRequest::new, new GridTaskResultRequestSerializer()); factory.register((short)77, GridTaskResultResponse::new, new GridTaskResultResponseSerializer()); factory.register((short)78, MissingMappingRequestMessage::new, new MissingMappingRequestMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 65f225b22a923..0fbe81b19c358 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -72,9 +72,6 @@ public class DataStreamProcessor extends GridProcessorAdapter { /** Marshaller. */ private final Marshaller marsh; - /** */ - private byte[] marshErrBytes; - /** * @param ctx Kernal context. */ @@ -96,9 +93,6 @@ public DataStreamProcessor(GridKernalContext ctx) { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { - marshErrBytes = U.marshal(marsh, new IgniteCheckedException("Failed to marshal response error, " + - "see node log for details.")); - flusher = U.newThread(new GridWorker(ctx.igniteInstanceName(), "grid-data-loader-flusher", log) { @Override protected void body() throws InterruptedException { while (!isCancelled()) { @@ -270,8 +264,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { topic, req.requestId(), new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + - ", req=" + req + ']'), - false); + ", req=" + req + ']')); return; } @@ -290,7 +283,7 @@ private void processRequest(final UUID nodeId, final DataStreamerRequest req) { catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); - sendResponse(nodeId, topic, req.requestId(), e, false); + sendResponse(nodeId, topic, req.requestId(), e); return; } @@ -356,7 +349,7 @@ else if (!topFut.isDone()) } if (remapErr != null) { - sendResponse(nodeId, topic, req.requestId(), remapErr, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), remapErr); return; } @@ -392,7 +385,7 @@ else if (topWaitFut == null) { try { job.call(); - sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), null); } finally { if (waitFut != null) @@ -400,7 +393,7 @@ else if (topWaitFut == null) { } } catch (Throwable e) { - sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment()); + sendResponse(nodeId, topic, req.requestId(), e); if (e instanceof Error) throw (Error)e; @@ -412,22 +405,11 @@ else if (topWaitFut == null) { * @param resTopic Response topic. * @param reqId Request ID. * @param err Error. - * @param forceLocDep Force local deployment. */ - private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, - boolean forceLocDep) { - byte[] errBytes; - - try { - errBytes = err != null ? U.marshal(marsh, err) : null; - } - catch (Exception e) { - U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); - - errBytes = marshErrBytes; - } + private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err) { + DataStreamerResponse res = new DataStreamerResponse(reqId, err); - DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep); + res.marshalError(marsh, log); try { ctx.io().sendToCustomTopic(nodeId, resTopic, res, threadIoPolicy()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index f154866fc9805..02940fc5fb3f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -2093,17 +2093,14 @@ void onResponse(DataStreamerResponse res, UUID nodeId) { Throwable err = null; - byte[] errBytes = res.errorBytes(); + try { + GridPeerDeployAware jobPda0 = jobPda; - if (errBytes != null) { - try { - GridPeerDeployAware jobPda0 = jobPda; + res.unmarshalError(ctx, U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())); - final Throwable cause = U.unmarshal( - ctx, - errBytes, - U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())); + final Throwable cause = res.error(); + if (cause != null) { final String msg = "DataStreamer request failed [node=" + nodeId + "]"; if (cause instanceof ClusterTopologyCheckedException) @@ -2113,11 +2110,11 @@ else if (X.hasCause(cause, IgniteClusterReadOnlyException.class)) else err = new IgniteCheckedException(msg, cause); } - catch (IgniteCheckedException e) { - f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); + } + catch (IgniteCheckedException e) { + f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); - return; - } + return; } f.onDone(null, err); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java index 20ac080bca01b..c5f4ac8b89996 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java @@ -17,34 +17,39 @@ package org.apache.ignite.internal.processors.datastreamer; -import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; /** * */ public class DataStreamerResponse implements Message { /** */ + @Order(value = 0, method = "requestId") private long reqId; /** */ - private byte[] errBytes; + private @Nullable Throwable err; /** */ - private boolean forceLocDep; + @Order(value = 1, method = "errorBytes") + private @Nullable byte[] errBytes; /** * @param reqId Request ID. - * @param errBytes Error bytes. - * @param forceLocDep Force local deployment. + * @param err Error. */ - public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) { + public DataStreamerResponse(long reqId, @Nullable Throwable err) { this.reqId = reqId; - this.errBytes = errBytes; - this.forceLocDep = forceLocDep; + this.err = err; } /** @@ -61,92 +66,74 @@ public long requestId() { return reqId; } + /** + * @param reqId Request ID. + */ + public void requestId(long reqId) { + this.reqId = reqId; + } + /** * @return Error bytes. */ - public byte[] errorBytes() { + public @Nullable byte[] errorBytes() { return errBytes; } /** - * @return {@code True} to force local deployment. + * @param errBytes Error bytes. */ - public boolean forceLocalDeployment() { - return forceLocDep; + public void errorBytes(@Nullable byte[] errBytes) { + this.errBytes = errBytes; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DataStreamerResponse.class, this); + /** + * @return Error. + */ + public @Nullable Throwable error() { + return err; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); + /** + * Marshal error to byte array. + * + * @param marsh Marshaller. + * @param log Ignite logger. + */ + public void marshalError(Marshaller marsh, IgniteLogger log) { + try { + errBytes = err != null ? U.marshal(marsh, err) : null; } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray(errBytes)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeBoolean(forceLocDep)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeLong(reqId)) - return false; - - writer.incrementState(); - + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e); + + try { + errBytes = U.marshal(marsh, new IgniteCheckedException("Failed to marshal response error, " + + "see node log for details.")); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex); + } } - - return true; } - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - switch (reader.state()) { - case 0: - errBytes = reader.readByteArray(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - forceLocDep = reader.readBoolean(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - reqId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); + /** + * Unmarshal error from byte array. + * + * @param ctx Kernal context. + * @param ldr Class loader. + */ + public void unmarshalError(GridKernalContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (errBytes != null && err == null) { + err = U.unmarshal(ctx, errBytes, ldr); + errBytes = null; } + } - return true; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStreamerResponse.class, this); } /** {@inheritDoc} */