Decouple "job runner" from BaseJob ORM model #30255
cc: @vincbeck
This seems to be old and unused/unusable test class.
Hello everyone, I've been working on this on and off for quite a while as I attempted to make things work for AIP-44. I tried a few different approaches and did not like the previous ones. I think this one hits the sweet spot: it untangles the (somewhat) intertwined BaseJob dependencies while decoupling task running from the ORM models.

A little context: LocalTaskJob, as currently implemented, inherits from BaseJob (as do the other jobs). This is in fact a polymorphic dependency: all of the jobs are stored in the same 'BaseJob' table. This is (and always has been) a little problematic, because the "Job" objects inherit from the ORM object and there is an assumption that they are DB-related; on the other hand, they also have the "running" logic implemented in them.

I attempted it in various ways, but I had three goals:
a) the resulting architecture will decouple the ORM object from the logic, so that we can use the serialized Pydantic objects introduced in #29776 instead (basically, we should be able to pass around and use either BaseJob or BaseJobPydantic);
b) it should involve as little logic change as possible (shuffling objects around and calling different objects was most of what I wanted to change), so that it is easy to review and reason about;
c) the resulting architecture will make sense.

Result

I think I finally achieved all three goals. Summary of what has been done here:
Current state

It looks like a huge change, but if you look closely, most of the changes are in tests, adapting them to the new object hierarchy. So I hope the review will not be that difficult. I still have a few (heartbeat) tests not passing and I am working on those (likely something is missing in heartbeat processing), but other than those, I think everything else is in place.

Future

Now, this is NOT YET an AIP-44-compatible change. This refactor is just the basic decoupling. We will need to implement several follow-up changes after this one is merged:
Follow-ups

I am not sure about this, but these changes also make something else possible: they allow us to limit how long connections are kept open by running tasks. Previously we kept them open the whole time the task was running, which was somewhat odd, since in most cases we only needed them for initial setup, heartbeats, and saving the job state on completion. Once the other changes are completed, we could optimize that away and (mimicking what the Internal API will be doing) have the running task establish sessions/connections only for short periods. I hope we can get there.

Looking forward to comments and feedback.

BTW. I think it's impossible to split this PR into smaller ones :(

BTW2. DON'T be scared by the size of the change. It's not as big as it seems; it just needed a lot of changes in tests. The "code" changes:

So it is not that big.
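To illustrate the short-lived-connection idea above, here is a minimal sketch with no real database. `ConnectionTracker`, `FakeSession` and `run_task` are hypothetical names, not Airflow APIs. The task opens a session only for setup, for each heartbeat, and for the final state save, so at most one session is open at a time and none is held while the task's own work runs.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeSession:
    """Hypothetical stand-in for a DB session; records what it was used for."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    def execute(self, what: str) -> None:
        self.log.append(what)


class ConnectionTracker:
    """Counts how many sessions are open at any moment (illustration only)."""

    def __init__(self) -> None:
        self.open_now = 0
        self.max_open = 0
        self.log: List[str] = []

    @contextmanager
    def session(self) -> Iterator[FakeSession]:
        self.open_now += 1
        self.max_open = max(self.max_open, self.open_now)
        try:
            yield FakeSession(self.log)
        finally:
            # The "connection" is released as soon as the short operation ends.
            self.open_now -= 1


def run_task(tracker: ConnectionTracker, work_steps: int) -> None:
    # Initial setup uses a short-lived session.
    with tracker.session() as s:
        s.execute("setup")
    for _ in range(work_steps):
        # The task's own work runs with no session open...
        assert tracker.open_now == 0
        # ...and each heartbeat opens a session only for its own duration.
        with tracker.session() as s:
            s.execute("heartbeat")
    # Saving the final state uses another short-lived session.
    with tracker.session() as s:
        s.execute("save_state")
```

The trade-off is more connection setups in exchange for far fewer idle connections held by long-running tasks; a connection pool would normally absorb most of that setup cost.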
vincbeck left a comment
Nice decoupling! I really like that. That would make things easier for AIP-44. I only have 2 nits, the decoupling + the code make sense to me.
PS: +1 on the "it is easier to review than it seems"
All resolved. Pushed a new version rebased on top of the typing change #30503.
I think it should be ready for the next pass.
Yep. Came out green.
# we cannot (for now) define job in _job_runner nicely due to circular references of
# job and job runner, so we have to use getattr, but we might address it in the future
# change when decoupling these two even more
if getattr(self._job_runner, "job", None) is not None:
    perform_heartbeat(self._job_runner.job, only_if_necessary=False)
I have a refactoring idea but that can wait until after this is merged and doesn’t need to block your other PRs.
I think we will not need it in the end: when we complete the refactoring (the final state is #30376), this line is gone, job is defined individually in each *JobRunner, and we always know which runner we are accessing, so I think there is no need to add the type guard.
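A small sketch contrasting the interim `getattr` guard in the snippet under review with the final state described in the reply, where each concrete runner declares its own `job`. `Job`, `LegacyRunner` and the `heartbeat_*` helpers are hypothetical, and `perform_heartbeat` here is a stand-in that only records the call (the real one does much more).

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List


@dataclass
class Job:
    """Hypothetical minimal job record used only for this illustration."""

    job_type: str
    heartbeats: List[bool] = field(default_factory=list)


def perform_heartbeat(job: Job, only_if_necessary: bool) -> None:
    # Stand-in: just records that a heartbeat was requested.
    job.heartbeats.append(only_if_necessary)


class BaseJobRunner:
    """Interim state: the base runner does not declare ``job``."""


class LegacyRunner(BaseJobRunner):
    """A runner without a ``job`` attribute, as the guard must tolerate."""


class LocalTaskJobRunner(BaseJobRunner):
    def __init__(self, job: Job) -> None:
        # Final state: each concrete runner declares its own typed ``job``.
        self.job: Job = job


def heartbeat_interim(runner: BaseJobRunner) -> bool:
    # Mirrors the getattr guard: not every runner is known to have ``job``.
    job = getattr(runner, "job", None)
    if job is not None:
        perform_heartbeat(job, only_if_necessary=False)
        return True
    return False


def heartbeat_final(runner: LocalTaskJobRunner) -> None:
    # With a concrete runner type, ``runner.job`` is known to exist,
    # so neither getattr nor a type guard is needed.
    perform_heartbeat(runner.job, only_if_necessary=False)
```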
Wooohooooo! Now time for the next stages :)
Originally, the BaseJob ORM model was extended and polymorphism was used to tie different execution logic to different job types. This has proven difficult to handle during the AIP-44 (Internal API) implementation, because LocalTaskJob, DagProcessorJob and TriggererJob will not use the ORM BaseJob model; they should use BaseJobPydantic instead. To make this possible, we introduce a new type of object, BaseJobRunner, and make BaseJob use the runners instead.
This way, the BaseJobRunners hold the logic of each job, while a single, non-polymorphic BaseJob keeps the records in the database. As a follow-up, this will allow us to completely decouple the job database operations and move them to the internal_api component when db-less mode is enabled.
Closes: #30294
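A minimal sketch of the composition described above, under simplifying assumptions: `BaseJob` and `BaseJobPydantic` here are plain dataclasses standing in for the real SQLAlchemy and Pydantic models, and `JobRecord`, `run_job` and `TriggererJobRunner._execute` are hypothetical names chosen for illustration, not Airflow's actual API. The logic lives in the runner while the record only carries state, so the same runner works with either record type.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Protocol


class JobRecord(Protocol):
    """Hypothetical structural type: what the driver needs from a job record."""

    job_type: str
    state: Optional[str]


@dataclass
class BaseJob:
    """Stand-in for the single, non-polymorphic ORM row (no SQLAlchemy here)."""

    job_type: str
    state: Optional[str] = None


@dataclass
class BaseJobPydantic:
    """Stand-in for the serialized job object used over the Internal API."""

    job_type: str
    state: Optional[str] = None


class BaseJobRunner(ABC):
    """Holds the execution logic; job records stay free of it."""

    job_type = "BaseJob"

    @abstractmethod
    def _execute(self) -> int:
        """Run the job-specific logic and return an exit code."""


class TriggererJobRunner(BaseJobRunner):
    job_type = "TriggererJob"

    def _execute(self) -> int:
        # Real trigger handling would live here; this sketch just succeeds.
        return 0


def run_job(job: JobRecord, runner: BaseJobRunner) -> int:
    # State bookkeeping touches only the record; behaviour comes from the
    # runner, so an ORM row and a serialized object are interchangeable.
    job.state = "running"
    try:
        result = runner._execute()
    except Exception:
        job.state = "failed"
        raise
    job.state = "success"
    return result
```

Because `run_job` only reads and writes attributes declared by `JobRecord`, swapping the ORM-backed record for the serialized one requires no change to any runner.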