-
Notifications
You must be signed in to change notification settings - Fork 5
Description
Describe the bug
This is not a proven bug, but as you can see, the inner state of running dags of JobSchedule is using
hyperdrive-trigger/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala
Line 60 in 3a141a0
| private val runningDags = mutable.Map.empty[RunningDagsKey, Future[Unit]] |
mutable.Map which doesn't provide a guarantee, that is correctly synchronized between multiple threads. The current implementation behaves synchronously but is still shared between numerous Futures. One example is hyperdrive-trigger/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala
Lines 120 to 125 in 3a141a0
| .map { | |
| _.foreach { dag => | |
| logger.debug(s"Deploying dag = ${dag.id}") | |
| runningDags.put(RunningDagsKey(dag.id, dag.workflowId), executors.executeDag(dag)) | |
| } | |
| } |
This change also enables continuous execution of workflows and is not dependent on the heartbeat cycle, which might improve throughput and delays in the future.
Expected behavior
We have guarantees that the internal state of JobScheduler is consistent across threads.
Additional context
We should merge PR for this issue after merging additional trace or debug messages mentioned in the issue #802