From 228f9d131bd7c36c55bea185dbbba360c5454a89 Mon Sep 17 00:00:00 2001 From: joecqupt Date: Fri, 17 Jan 2025 23:00:55 +0800 Subject: [PATCH 1/3] Fix ConsumeDriver running status --- .../commons/datacarrier/consumer/ConsumeDriver.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java index a6d808fead..dcf1dee807 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java @@ -27,7 +27,7 @@ * Pool of consumers

Created by wusheng on 2016/10/25. */ public class ConsumeDriver implements IDriver { - private boolean running; + private volatile boolean running; private ConsumerThread[] consumerThreads; private Channels channels; private ReentrantLock lock; @@ -88,6 +88,9 @@ public void begin(Channels channels) { } lock.lock(); try { + if (running){ + return; + } this.allocateBuffer2Thread(); for (ConsumerThread consumerThread : consumerThreads) { consumerThread.start(); @@ -124,8 +127,14 @@ private void allocateBuffer2Thread() { @Override public void close(Channels channels) { + if (!running) { + return; + } lock.lock(); try { + if (!running) { + return; + } this.running = false; for (ConsumerThread consumerThread : consumerThreads) { consumerThread.shutdown(); From f26df34f605c766f7940a3f78b781c8dac9af2e4 Mon Sep 17 00:00:00 2001 From: joecqupt Date: Sat, 18 Jan 2025 22:58:43 +0800 Subject: [PATCH 2/3] update changes.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 4fd1859d4c..2a797ed4cc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -28,6 +28,7 @@ Release Notes. * Add Caffeine plugin as optional. * Add Undertow 2.1.7.final+ worker thread pool metrics. * Support for tracking in spring gateway versions 4.1.2 and above. +* Fix `ConsumeDriver` running status concurrency issues. All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1) From 21dad8c3ba5d111c3e11480b297bca1940e504e5 Mon Sep 17 00:00:00 2001 From: joecqupt Date: Sun, 19 Jan 2025 12:59:08 +0800 Subject: [PATCH 3/3] fix format --- .../apm/commons/datacarrier/consumer/ConsumeDriver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java index dcf1dee807..853c7fc3bc 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java @@ -88,7 +88,7 @@ public void begin(Channels channels) { } lock.lock(); try { - if (running){ + if (running) { return; } this.allocateBuffer2Thread();