[GOBBLIN-2245] Independent Dynamic Scaling for different Activities in Temporal WorkFlow #4159
- unit tests
1196a45 to 82b4dc0
Codecov Report
✅ All modified and coverable lines are covered by tests.

| Metric | master | #4159 | +/- |
|---|---|---|---|
| Coverage | 49.12% | 51.71% | +2.59% |
| Complexity | 10253 | 7751 | -2502 |
| Files | 1924 | 1411 | -513 |
| Lines | 75350 | 53504 | -21846 |
| Branches | 8361 | 5882 | -2479 |
| Hits | 37012 | 27670 | -9342 |
| Misses | 35039 | 23492 | -11547 |
| Partials | 3299 | 2342 | -957 |

Pull request overview
This PR introduces independent dynamic scaling for different workflow stages in Gobblin's Temporal integration by enabling separate memory configuration and dedicated task queues for work execution activities.
Key Changes:
- Created dedicated execution task queue and ExecutionWorker class to isolate work execution from discovery/commit phases
- Added WorkflowStage enum for stage-specific task queue routing with configurable memory allocation
- Modified ProcessWorkUnitsWorkflowImpl to route NestingExecWorkflow to execution queue when dynamic scaling is enabled
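
The stage-based routing listed above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Stage`, `StageRouting`, every `example.*` key, and the default queue name `GobblinTemporalTaskQueue` are assumptions made for the example. Only the stage names and the execution queue default `GobblinTemporalExecutionTaskQueue` come from this page.

```java
import java.util.Properties;

/** Demo entry point: prints the queue chosen for each stage. */
final class StageRoutingDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(StageRouting.DYNAMIC_SCALING_ENABLED_KEY, "true");
    for (Stage stage : Stage.values()) {
      System.out.println(stage + " -> " + StageRouting.taskQueueFor(stage, props));
    }
  }
}

/** Hypothetical stand-in for the WorkflowStage enum described in the overview. */
enum Stage { WORK_DISCOVERY, WORK_EXECUTION, COMMIT }

final class StageRouting {
  // Assumed key names and defaults, chosen for illustration only.
  static final String DYNAMIC_SCALING_ENABLED_KEY = "example.dynamic.scaling.enabled";
  static final String DEFAULT_QUEUE_KEY = "example.task.queue";
  static final String EXECUTION_QUEUE_KEY = "example.execution.task.queue";
  static final String DEFAULT_QUEUE = "GobblinTemporalTaskQueue";
  static final String DEFAULT_EXECUTION_QUEUE = "GobblinTemporalExecutionTaskQueue";

  private StageRouting() {}

  /** Only WORK_EXECUTION moves to the dedicated queue, and only when scaling is enabled. */
  static String taskQueueFor(Stage stage, Properties props) {
    boolean dynamicScaling =
        Boolean.parseBoolean(props.getProperty(DYNAMIC_SCALING_ENABLED_KEY, "false"));
    if (dynamicScaling && stage == Stage.WORK_EXECUTION) {
      return props.getProperty(EXECUTION_QUEUE_KEY, DEFAULT_EXECUTION_QUEUE);
    }
    return props.getProperty(DEFAULT_QUEUE_KEY, DEFAULT_QUEUE);
  }
}
```

With dynamic scaling disabled, every stage in this sketch resolves to the default queue, which keeps single-queue deployments unchanged.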
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| GobblinTemporalConfigurationKeys.java | Added execution worker class constant, execution task queue configuration keys, and stage-specific memory configuration |
| ExecutionWorker.java | New specialized worker for work execution stage with dedicated task queue and concurrency settings |
| WorkflowStage.java | New enum defining workflow stages (WORK_DISCOVERY, WORK_EXECUTION, COMMIT) with stage-specific task queue routing |
| AbstractRecommendScalingForWorkUnitsImpl.java | Enhanced to create profile overlays with ExecutionWorker class and optional memory configuration for scaled containers |
| ProcessWorkUnitsWorkflowImpl.java | Modified to route child workflows to appropriate task queues based on dynamic scaling configuration |
| GobblinTemporalTaskRunner.java | Added initialization logic to start ExecutionWorker in initial container when dynamic scaling is enabled |
| AbstractTemporalWorker.java | Refactored to make config field protected and extracted getTaskQueue() method for subclass customization |
| ActivityType.java | Added overloaded buildActivityOptions() method accepting taskQueue parameter |
| Test files | Comprehensive unit tests for workflow routing, worker configuration, and scaling behavior |
| application.conf | Minor whitespace cleanup |

        GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS
    ));

    return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);

The check for overlayPairs.isEmpty() on line 100 will always be false because the ExecutionWorker class is unconditionally added to overlayPairs on line 95. This means ProfileOverlay.unchanged() will never be returned. Consider removing this check or restructuring the logic since overlayPairs will always contain at least one element.

Suggested change:

    - return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);
    + return new ProfileOverlay.Adding(overlayPairs);
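
The reviewer's point can be reproduced in isolation. The sketch below is an assumption-laden stand-in, not the code in AbstractRecommendScalingForWorkUnitsImpl: `OverlaySketch`, the `example.*` keys, and the use of a plain map in place of the overlay pair list are all hypothetical. It only shows that a collection with one unconditional entry can never be empty, whether or not the optional memory setting is present.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Demo entry point: prints the pairs built with and without a memory override. */
final class OverlayDemo {
  public static void main(String[] args) {
    System.out.println(OverlaySketch.buildPairs(OptionalInt.empty()));
    System.out.println(OverlaySketch.buildPairs(OptionalInt.of(8192)));
  }
}

final class OverlaySketch {
  // Hypothetical keys and values, for illustration only.
  static final String WORKER_CLASS_KEY = "example.worker.class";
  static final String MEMORY_MB_KEY = "example.execution.memory.mb";
  static final String EXECUTION_WORKER_CLASS = "example.ExecutionWorker";

  private OverlaySketch() {}

  static Map<String, String> buildPairs(OptionalInt memoryMb) {
    Map<String, String> pairs = new LinkedHashMap<>();
    // Added unconditionally, so the result always has at least one entry.
    pairs.put(WORKER_CLASS_KEY, EXECUTION_WORKER_CLASS);
    // Added only when a memory override is configured.
    memoryMb.ifPresent(mb -> pairs.put(MEMORY_MB_KEY, Integer.toString(mb)));
    return pairs;
  }
}
```

Because the first entry is always present, an `isEmpty()` branch on the result is dead code in this shape, which matches the suggested simplification.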

    // Verify
    Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE,
        "WORK_COMMIT should use default task queue");

The comment mentions "WORK_COMMIT" but the actual enum value being tested is "COMMIT". Update the comment to match the enum value for consistency.

    public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
      ActivityOptions.Builder builder = ActivityOptions.newBuilder()
          .setStartToCloseTimeout(getStartToCloseTimeout(props))
          .setRetryOptions(buildRetryOptions(props))
          .setTaskQueue(taskQueue);

      if (setHeartbeatTimeout) {
        builder.setHeartbeatTimeout(getHeartbeatTimeout(props));
      }

      return builder.build();
    }

The newly added buildActivityOptions method with taskQueue parameter is not used anywhere in the codebase. If this method is intended for future use, consider documenting this in a code comment. Otherwise, consider removing it to avoid maintaining unused code. Alternatively, if this should be used for routing activities to the execution queue when dynamic scaling is enabled, the implementation may be incomplete.

Suggested change:

    public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
      ActivityOptions.Builder builder = ActivityOptions.newBuilder()
          .setStartToCloseTimeout(getStartToCloseTimeout(props))
          .setRetryOptions(buildRetryOptions(props))
          .setTaskQueue(taskQueue);
      if (setHeartbeatTimeout) {
        builder.setHeartbeatTimeout(getHeartbeatTimeout(props));
      }
      return builder.build();
    }

     */
    public class ExecutionWorker extends AbstractTemporalWorker {
      public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
      public int maxExecutionConcurrency;

The field maxExecutionConcurrency is declared public. Consider making it private if external access is not required, or document why the wider visibility is intentional (for example, for testing purposes).

Suggested change:

    - public int maxExecutionConcurrency;
    + private int maxExecutionConcurrency;

    @@ -134,4 +141,5 @@ public interface GobblinTemporalConfigurationKeys {
      String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
      int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;

Add a documentation comment for the WORK_EXECUTION_MEMORY_MB configuration key to explain its purpose and usage, similar to other configuration keys in this interface. The comment should clarify that this is the memory allocation in megabytes for execution worker containers when dynamic scaling is enabled.

Suggested change:

    + /**
    +  * Memory allocation (in megabytes) for execution worker containers when dynamic scaling is enabled.
    +  * This value determines the amount of memory assigned to each worker container during execution.
    +  */

conf/yarn/application.conf (outdated)

    # job history store ( WARN [GobblinYarnAppLauncher] NOT starting the admin UI because the job execution info server is NOT enabled )
    job.execinfo.server.enabled=false
    - job.history.store.enabled=false
    + job.history.store.enabled=false
    \ No newline at end of file

nit: remove the whitespace change

    public ExecutionWorker(Config config, WorkflowClient workflowClient) {
      super(config, workflowClient);
      this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,

Can this config be process-workunit-specific instead of using a generic config?

Yeah, we can probably do that in another PR.

We should add this in the same PR. It's just one more config in GobblinTemporalConfigurationKeys, and the default value can be the same:

    String TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER = PREFIX + "num.threads.per.execution.worker";
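
One way the requested execution-specific setting could behave is sketched below. This is a hedged illustration using `java.util.Properties` rather than the project's Config type. The `example.temporal.` prefix, the default of 15, and the fallback to the generic key are assumptions; the reviewer only asked for a new key with the same default value.

```java
import java.util.Properties;

/** Demo entry point: shows the execution-specific key overriding the generic one. */
final class ConcurrencyConfigDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConcurrencyConfig.NUM_THREADS_PER_WORKER, "10");
    System.out.println("generic only: " + ConcurrencyConfig.executionThreads(props));
    props.setProperty(ConcurrencyConfig.NUM_THREADS_PER_EXECUTION_WORKER, "40");
    System.out.println("with override: " + ConcurrencyConfig.executionThreads(props));
  }
}

final class ConcurrencyConfig {
  // Assumed prefix and default; the real values live in GobblinTemporalConfigurationKeys.
  static final String PREFIX = "example.temporal.";
  static final String NUM_THREADS_PER_WORKER = PREFIX + "num.threads.per.worker";
  static final String NUM_THREADS_PER_EXECUTION_WORKER = PREFIX + "num.threads.per.execution.worker";
  static final int DEFAULT_NUM_THREADS_PER_WORKER = 15;

  private ConcurrencyConfig() {}

  /** Resolution order: execution-specific key, then generic key, then the shared default. */
  static int executionThreads(Properties props) {
    String specific = props.getProperty(NUM_THREADS_PER_EXECUTION_WORKER);
    if (specific != null) {
      return Integer.parseInt(specific.trim());
    }
    String generic = props.getProperty(NUM_THREADS_PER_WORKER);
    return generic != null ? Integer.parseInt(generic.trim()) : DEFAULT_NUM_THREADS_PER_WORKER;
  }
}
```

Falling back to the generic key means existing deployments keep their current concurrency until the new key is set explicitly.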

      executionWorker.start();
      workers.add(executionWorker);
      logger.info("Worker started for class: {}", GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS);
    }

Have we tested the scenario with 1 container?

        .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, config))
        .build();

    // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow parent

Where are we setting the default queue for CommitActivity? We don't want the commit activity in the process queue, right?

Are we saying that we have only set the task queue for the child workflows/activities of the ProcessWorkUnitsWorkflow parent, so the CommitStep would fall back to the default task queue?

    protected WorkerOptions createWorkerOptions() {
      return WorkerOptions.newBuilder()
          .setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
          .setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)

Do you think it makes sense to have different configs for this? Currently this is driven by TEMPORAL_NUM_THREADS_PER_WORKER, but suppose I want to reduce the number of threads in the worker without reducing the execution local activity concurrency.

Yeah, good point. I am not sure about this; maybe it can be discussed and handled in another PR.
This PR only handles independent memory configuration; the other configs remain the same.

Let's add this in the same PR too, so that we have configs to play around with and tune if needed.

    /**
     * Package-private for testing purposes.
     */
    int getMaxExecutionConcurrency() {
      return maxExecutionConcurrency;
    }

You can use lombok @Getter(AccessLevel.PROTECTED) for this

     */
    private ExecutionWorker createMockWorker(Config config) throws Exception {
      ExecutionWorker worker = Mockito.mock(ExecutionWorker.class, Mockito.CALLS_REAL_METHODS);

If we are writing a unit test for ExecutionWorker, we shouldn't mock that class itself. You can mock the Config and WorkflowClient passed into the constructor instead.
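
The testing approach suggested here, constructing the real class and stubbing only its collaborators, might look like the following sketch. It uses hand-written stubs instead of Mockito so that it stays self-contained. `SketchConfig`, `SketchClient`, `SketchExecutionWorker`, the key name, and the default of 15 are hypothetical stand-ins for Config, WorkflowClient, and ExecutionWorker.

```java
/** Demo entry point: builds the real worker with stubbed collaborators. */
final class RealObjectTestDemo {
  public static void main(String[] args) {
    SketchConfig stubConfig =
        (key, defaultValue) -> SketchExecutionWorker.THREADS_KEY.equals(key) ? 32 : defaultValue;
    SketchClient stubClient = new SketchClient() {};
    SketchExecutionWorker worker = new SketchExecutionWorker(stubConfig, stubClient);
    System.out.println("maxExecutionConcurrency = " + worker.getMaxExecutionConcurrency());
  }
}

/** Hypothetical stand-in for the configuration object passed to the worker. */
interface SketchConfig {
  int getInt(String key, int defaultValue);
}

/** Hypothetical stand-in for the workflow client; unused by this sketch. */
interface SketchClient {}

/** The class under test is instantiated for real, so its constructor logic is exercised. */
class SketchExecutionWorker {
  static final String THREADS_KEY = "example.num.threads.per.worker"; // assumed key
  static final int DEFAULT_THREADS = 15;                              // assumed default

  private final int maxExecutionConcurrency;

  SketchExecutionWorker(SketchConfig config, SketchClient client) {
    this.maxExecutionConcurrency = config.getInt(THREADS_KEY, DEFAULT_THREADS);
  }

  int getMaxExecutionConcurrency() {
    return maxExecutionConcurrency;
  }
}
```

The benefit over mocking the class under test is that the constructor runs as written, so a regression in how it reads configuration would fail the test.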

    /**
     * Helper to invoke the protected getWorkflowImplClasses method using reflection.
     */

Protected members should be directly accessible if the main class and the test class are in the same package. Can you check this again? I don't think reflection is needed.
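
The language rule behind this comment is that `protected` in Java also grants access to every class in the same package, not only to subclasses. The sketch below shows this with two unrelated classes in one package; `SketchBaseWorker`, `SamePackageCaller`, and the returned name are hypothetical and stand in for the worker class and its test.

```java
/** Demo entry point: calls a protected method from an unrelated class in the same package. */
final class ProtectedAccessDemo {
  public static void main(String[] args) {
    System.out.println(new SamePackageCaller().describe(new SketchBaseWorker()));
  }
}

/** Hypothetical stand-in for a worker exposing a protected method. */
class SketchBaseWorker {
  protected String[] getWorkflowImplNames() {
    return new String[] {"NestingExecWorkflowImpl"};
  }
}

/** Plays the role of a test class living in the same package as the worker. */
class SamePackageCaller {
  String describe(SketchBaseWorker worker) {
    // Compiles without reflection: protected members are visible package-wide.
    return String.join(",", worker.getWorkflowImplNames());
  }
}
```

This only holds when the test class is declared in the same package as the class under test; from a different package, a non-subclass would still need another route.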
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
This PR introduces a separate memory configuration and dedicated task queue for the work execution phase in Temporal workflows to support better scalability and resource utilization.
Key Changes:
- A dedicated execution task queue (EXECUTION_TASK_QUEUE) to isolate work execution activities from the work discovery and commit phases
- WorkflowStage enum for stage-specific task queue routing
- NestingExecWorkflow (work execution) routes to the execution queue while other stages use the default queue
Configuration:
- GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE - Execution task queue name (default: "GobblinTemporalExecutionTaskQueue")
- GobblinTemporalConfigurationKeys.EXECUTION_WORKER_MEMORY_MBS - Memory allocation for execution workers
Benefits:
Tests
All tests use reflection and mocking to test actual method behavior without requiring full Temporal infrastructure.
Commits