-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Executor Synchronous callback workload #61153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Executor Synchronous callback workload #61153
Conversation
…eryExecutor Add support for the Callback workload to be run in the executors. Other executors will need to be updated before the can support the workload, but I tried to make it as non-invasive as I could.
| remaining_slots = open_slots - len(workloads_to_schedule) | ||
| if remaining_slots and self.queued_tasks: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remaining_slots is only used once. You could just put something like open_slots > len(workloads_to_schedule) in the if expression
|
|
||
| remaining_slots = open_slots - len(workloads_to_schedule) | ||
| if remaining_slots and self.queued_tasks: | ||
| sorted_tasks = sorted( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why didn't you use the existing order_queued_tasks_by_priority() method?
| try: | ||
| self._process_workloads(workload_list) | ||
| except AttributeError as e: | ||
| if any(isinstance(workload, workloads.ExecuteCallback) for workload in workload_list): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we know exactly how to check for the unsupported use case, why don't we just check before trying to call __process_workloads()? Also, we can check much earlier in the queueing of workloads because we can check the supports_callback attr?
| TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Exception | None] | ||
|
|
||
|
|
||
| def _get_executor_process_title_prefix(team_name: str | None) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These multi team related changes probably shouldn't be showing up in this diff right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I might have botched the rebase before I published this. I'll try to extricate those changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_execute_callback() is the third time duplicating that exact pattern, so I moved it to a helper.
| key = workload.callback.id | ||
| try: | ||
| _execute_callback(log, workload, team_conf) | ||
| output.put((key, TaskInstanceState.SUCCESS, None)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly just curious: We still use TaskInstanceState here even those these are callbacks?
| # Find the appropriate executor | ||
| executor = None | ||
| if executor_name: | ||
| # Find executor by name - try multiple matching strategies | ||
| for exec in self.job.executors: | ||
| # Match by class name (e.g., "CeleryExecutor") | ||
| if exec.__class__.__name__ == executor_name: | ||
| executor = exec | ||
| break | ||
| # Match by executor name attribute if available | ||
| if hasattr(exec, "name") and exec.name and str(exec.name) == executor_name: | ||
| executor = exec | ||
| break | ||
| # Match by executor name attribute if available | ||
| if hasattr(exec, "executor_name") and exec.executor_name == executor_name: | ||
| executor = exec | ||
| break | ||
|
|
||
| # Default to first executor if no specific executor found | ||
| if executor is None: | ||
| executor = self.job.executors[0] if self.job.executors else None | ||
|
|
||
| if executor is None: | ||
| self.log.warning("No executor available for callback %s", callback.id) | ||
| continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also missing multi-team logic which we need to stay up to date with at this point. It also is duplicating a lot of the work in _try_to_load_executor which is made to do exactly this kind of lookup. I think it's going to save you a bunch of effort and future maintenance to update _try_to_load_executor to support workloads generally instead of just ti (basically exactly the type of coding you did in the base executor and local executor changes).
| self.log.warning("No executor available for callback %s", callback.id) | ||
| continue | ||
|
|
||
| executor_to_callbacks[executor].append(callback) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the above, there is already a _executor_to_tis which is doing exactly this but for tis, could be generalized.
| .where(ExecutorCallback.type == CallbackType.EXECUTOR) | ||
| .where(ExecutorCallback.state == CallbackState.QUEUED) | ||
| .order_by(ExecutorCallback.priority_weight.desc()) | ||
| .limit(conf.getint("scheduler", "max_callback_workloads_per_loop", fallback=100)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and down below in the final loop over executors/workloads we're just queueing a static amount each time. But it is the schedulers responsibility now (in the world of multiple executors and now multi-team) to ensure we don't ever schedule more tasks (now, workloads) than we have executor slots for. You can see how we do this math for tasks currently here:
airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py
Lines 854 to 871 in 056e24e
| # The user can either request a certain number of tis to schedule per main scheduler loop (default | |
| # is non-zero). If that value has been set to zero, that means use the value of core.parallelism (or | |
| # however many free slots are left). core.parallelism represents the max number of running TIs per | |
| # scheduler. Historically this value was stored in the executor, who's job it was to control/enforce | |
| # it. However, with multiple executors, any of which can run up to core.parallelism TIs individually, | |
| # we need to make sure in the scheduler now that we don't schedule more than core.parallelism totally | |
| # across all executors. | |
| num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors]) | |
| parallelism = conf.getint("core", "parallelism") | |
| if self.job.max_tis_per_query == 0: | |
| max_tis = parallelism - num_occupied_slots | |
| else: | |
| max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots) | |
| if max_tis <= 0: | |
| self.log.debug("max_tis query size is less than or equal to zero. No query will be performed!") | |
| return 0 | |
| queued_tis = self._executable_task_instances_to_queued(max_tis, session=session) |
We need to ensure that math now includes callbacks because they also take up worker slots.
I think this will work for now, as long as this method is always called before the critical section. Since callbacks will increase occupied slots in the executors which should be taken into account in the critical section. BUT this code here needs to ensure it doesn't over subscribe the executors. So some similar logic to the critical section needs to be done here. E.g. we're taking a flat 100 here (by default anyway) but there may only be 20 free executor slots.
| if callback: | ||
| # Note: We receive TaskInstanceState from executor (SUCCESS/FAILED) but convert to CallbackState here. | ||
| # This is intentional - executor layer uses generic completion states, scheduler converts to proper types. | ||
| if state == TaskInstanceState.SUCCESS: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine for now, but would be cool if Callbacks were fully first class citizens in executors. Including executors reporting the right state back.
Add support for the Callback workload to be run in the executors. Other executors will need to be updated before they can support the workload, but I tried to make it as non-invasive as I could.
This is the bulk of the work required to allow synchronous callbacks to be used in DeadlineAlerts. For example, this now works in LocalExecutor:
Co-author: Builds on work handed off by @seanghaeli and research from @ramitkataria; if I did this right then they should be getting co-author credits, I think?
Was generative AI tooling used to co-author this PR?
Cline (Claude Sonnet 4.5) was used for debugging and suggesting some unit test edge cases.
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.