From e684ddcce4c23b8378a162aa08b6b011888c4690 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 10 Jan 2024 12:30:45 +0100 Subject: [PATCH 1/8] Add error_message field to DB --- src/main/resources/db_scripts/db_script_latest.sql | 10 ++++++---- .../resources/db_scripts/liquibase/db.changelog.yml | 3 +++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/resources/db_scripts/db_script_latest.sql b/src/main/resources/db_scripts/db_script_latest.sql index d8c18caf1..2bc65201d 100644 --- a/src/main/resources/db_scripts/db_script_latest.sql +++ b/src/main/resources/db_scripts/db_script_latest.sql @@ -34,7 +34,8 @@ create table "job_instance" ( "id" BIGSERIAL NOT NULL PRIMARY KEY, "application_id" VARCHAR, "step_id" VARCHAR, - "job_parameters" JSONB NOT NULL DEFAULT '{}' + "job_parameters" JSONB NOT NULL DEFAULT '{}', + "error_message" VARCHAR ); create table "job_definition" ( @@ -148,7 +149,8 @@ create table archive_job_instance references archive_dag_instance, id bigint primary key, application_id varchar, - step_id varchar + step_id varchar, + error_message varchar, ); create table archive_event @@ -317,8 +319,8 @@ BEGIN GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; - INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id) - SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, error_message) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.error_message FROM job_instance ji JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; diff --git a/src/main/resources/db_scripts/liquibase/db.changelog.yml b/src/main/resources/db_scripts/liquibase/db.changelog.yml index 17119d955..022a3667b 100644 --- a/src/main/resources/db_scripts/liquibase/db.changelog.yml +++ b/src/main/resources/db_scripts/liquibase/db.changelog.yml @@ -95,3 +95,6 @@ databaseChangeLog: - include: relativeToChangelogFile: true file: v0.5.14.remove-deprecated-columns.yml + - include: + relativeToChangelogFile: true + file: v0.5.20.add-error-message-field.yml From f4161efe623bf9ea68924e44271d0d7e9ae0c6c2 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 10 Jan 2024 13:22:51 +0100 Subject: [PATCH 2/8] Store diagnostics in job instance table --- .../trigger/models/JobInstance.scala | 1 + .../models/tables/JobInstanceTable.scala | 2 ++ .../executors/spark/AppsResponse.scala | 2 +- .../executors/spark/SparkExecutor.scala | 19 ++++++++++++++++++- 4 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala index a0ea1f652..dbd3b50bc 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala @@ -23,6 +23,7 @@ case class JobInstance( jobName: String, jobParameters: JobInstanceParameters, jobStatus: JobStatus, + errorMessage: Option[String], executorJobId: Option[String], applicationId: Option[String], stepId: Option[String], diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala index 40b428eaf..d5ef11841 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala @@ -30,6 +30,7 @@ trait JobInstanceTable { def jobName: Rep[String] = column[String]("job_name") def jobParameters: Rep[JobInstanceParameters] = column[JobInstanceParameters]("job_parameters", O.SqlType("JSONB")) def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status") + def errorMessage: Rep[Option[String]] = column[Option[String]]("error_message") def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id") def applicationId: Rep[Option[String]] = column[Option[String]]("application_id") def stepId: Rep[Option[String]] = column[Option[String]]("step_id") @@ -46,6 +47,7 @@ trait JobInstanceTable { jobName, jobParameters, jobStatus, + errorMessage, executorJobId, applicationId, stepId, diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala index 849f38b0d..f85d7d864 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala @@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark import play.api.libs.json.{Json, OFormat} -case class App(id: String, name: String, state: String, finalStatus: String) +case class App(id: String, name: String, state: String, finalStatus: String, diagnostics: String) case class Apps(app: Seq[App]) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala index 49840dfa0..851a1e61e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala @@ -54,7 +54,7 @@ object SparkExecutor { case None => Seq.empty }) match { case Seq(first) => - updateJob(jobInstance.copy(applicationId = Some(first.id), jobStatus = getStatus(first.finalStatus))) + updateJob(getUpdatedJobInstance(jobInstance, first)) case _ // It relies on the same value set for sparkYarnSink.submitTimeout in multi instance deployment if jobInstance.jobStatus == JobStatuses.Submitting && jobInstance.updated @@ -69,6 +69,23 @@ object SparkExecutor { private def getStatusUrl(executorJobId: String)(implicit sparkConfig: SparkConfig): String = s"${sparkConfig.hadoopResourceManagerUrlBase}/ws/v1/cluster/apps?applicationTags=$executorJobId" + private def getUpdatedJobInstance( + jobInstance: JobInstance, + app: App + ): JobInstance = { + val errorMessage = app.diagnostics match { + case "" => None + case _ => Some(app.diagnostics) + } + + jobInstance.copy( + jobStatus = getStatus(app.finalStatus), + applicationId = Some(app.id), + updated = Option(LocalDateTime.now()), + errorMessage = errorMessage + ) + } + private def getStatus(finalStatus: String): JobStatus = finalStatus match { case fs if fs == YarnFinalStatuses.Undefined.name => Running From f295c9bf96f59837fbcfe1296652fe84f12522ec Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 10 Jan 2024 13:27:35 +0100 Subject: [PATCH 3/8] Rename error_message -> diagnostics --- .../resources/db_scripts/db_script_latest.sql | 8 +- .../db_scripts/liquibase/db.changelog.yml | 2 +- .../v0.5.20.add-diagnostics-field.sql | 108 ++++++++++++++++++ .../v0.5.20.add-diagnostics-field.yml | 25 ++++ .../trigger/models/JobInstance.scala | 2 +- .../models/tables/JobInstanceTable.scala | 4 +- .../executors/spark/SparkExecutor.scala | 4 +- 7 files changed, 143 insertions(+), 10 deletions(-) create mode 100644 src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql create mode 100644 src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml diff --git a/src/main/resources/db_scripts/db_script_latest.sql b/src/main/resources/db_scripts/db_script_latest.sql index 2bc65201d..991702173 100644 --- a/src/main/resources/db_scripts/db_script_latest.sql +++ b/src/main/resources/db_scripts/db_script_latest.sql @@ -35,7 +35,7 @@ create table "job_instance" ( "application_id" VARCHAR, "step_id" VARCHAR, "job_parameters" JSONB NOT NULL DEFAULT '{}', - "error_message" VARCHAR + "diagnostics" VARCHAR ); create table "job_definition" ( @@ -150,7 +150,7 @@ create table archive_job_instance id bigint primary key, application_id varchar, step_id varchar, - error_message varchar, + diagnostics varchar, ); create table archive_event @@ -319,8 +319,8 @@ BEGIN GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; - INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, error_message) - SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.error_message + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics FROM job_instance ji JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; diff --git a/src/main/resources/db_scripts/liquibase/db.changelog.yml b/src/main/resources/db_scripts/liquibase/db.changelog.yml index 022a3667b..764544244 100644 --- a/src/main/resources/db_scripts/liquibase/db.changelog.yml +++ b/src/main/resources/db_scripts/liquibase/db.changelog.yml @@ -97,4 +97,4 @@ databaseChangeLog: file: v0.5.14.remove-deprecated-columns.yml - include: relativeToChangelogFile: true - file: v0.5.20.add-error-message-field.yml + file: v0.5.20.add-diagnostics-field.yml diff --git a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql new file mode 100644 index 000000000..0cb30bfcf --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql @@ -0,0 +1,108 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE "job_instance" + ADD COLUMN "diagnostics" VARCHAR; +ALTER TABLE "archive_job_instance" + ADD COLUMN "diagnostics" VARCHAR; + + + +CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk( + IN i_min_id BIGINT, + IN i_max_id BIGINT +) +AS $$ + ------------------------------------------------------------------------------- +-- +-- Procedure: archive_dag_instances_chunk(2) +-- Copies dag_instances with a final status from i_min_id to i_max_id to the +-- archive_dag_instance table. +-- Along with dag_instance, referenced job_instances and events are +-- archived to the archive_job_instance and archive_event tables, respectively. +-- This method should not be called directly. Instead, use archive_dag_instances +-- +-- Parameters: +-- i_min_id - Minimum dag instance id to archive +-- i_max_id - Maximum dag instance id to archive +-- +------------------------------------------------------------------------------- +DECLARE + _cnt INT; +BEGIN + RAISE NOTICE '============='; + RAISE NOTICE ' START BATCH'; + RAISE NOTICE '============='; + + CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS + SELECT di.id + FROM dag_instance di + WHERE di.status NOT IN ('Running', 'InQueue') + AND di.id >= i_min_id + AND di.id <= i_max_id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) + SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by + FROM dag_instance di + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics + FROM job_instance ji + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % job instances', _cnt; + + INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) + SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload + FROM "event" e + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % events', _cnt; + + RAISE NOTICE 'Going to delete dag instances'; + + DELETE FROM job_instance ji + USING dag_instance_ids_to_archive diita + WHERE ji.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % job instances', _cnt; + + DELETE FROM "event" e + USING dag_instance_ids_to_archive diita + WHERE e.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % events', _cnt; + + DELETE FROM dag_instance di + USING dag_instance_ids_to_archive diita + WHERE di.id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % dag instances', _cnt; + + DROP TABLE dag_instance_ids_to_archive; + + RAISE NOTICE '============='; + RAISE NOTICE ' END BATCH'; + RAISE NOTICE '============='; +END; +$$ LANGUAGE plpgsql; diff --git a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml new file mode 100644 index 000000000..dcffde3ac --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml @@ -0,0 +1,25 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +databaseChangeLog: + - changeSet: + id: v0.5.20.add-diagnostics-field + logicalFilePath: v0.5.20.add-diagnostics-field + author: HyperdriveDevTeam@absa.africa + context: default + changes: + - sqlFile: + relativeToChangelogFile: true + path: v0.5.20.add-diagnostics-field.sql diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala index dbd3b50bc..c8862b37b 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala @@ -23,7 +23,7 @@ case class JobInstance( jobName: String, jobParameters: JobInstanceParameters, jobStatus: JobStatus, - errorMessage: Option[String], + diagnostics: Option[String], executorJobId: Option[String], applicationId: Option[String], stepId: Option[String], diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala index d5ef11841..4d621270e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala @@ -30,7 +30,7 @@ trait JobInstanceTable { def jobName: Rep[String] = column[String]("job_name") def jobParameters: Rep[JobInstanceParameters] = column[JobInstanceParameters]("job_parameters", O.SqlType("JSONB")) def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status") - def errorMessage: Rep[Option[String]] = column[Option[String]]("error_message") + def diagnostics: Rep[Option[String]] = column[Option[String]]("diagnostics") def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id") def applicationId: Rep[Option[String]] = column[Option[String]]("application_id") def stepId: Rep[Option[String]] = column[Option[String]]("step_id") @@ -47,7 +47,7 @@ trait JobInstanceTable { jobName, jobParameters, jobStatus, - errorMessage, + diagnostics, executorJobId, applicationId, stepId, diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala index 851a1e61e..9aebc60a6 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala @@ -73,7 +73,7 @@ object SparkExecutor { jobInstance: JobInstance, app: App ): JobInstance = { - val errorMessage = app.diagnostics match { + val diagnostics = app.diagnostics match { case "" => None case _ => Some(app.diagnostics) } @@ -82,7 +82,7 @@ object SparkExecutor { jobStatus = getStatus(app.finalStatus), applicationId = Some(app.id), updated = Option(LocalDateTime.now()), - errorMessage = errorMessage + diagnostics = diagnostics ) } From 9cdf3628ca5566ef0fc435fb5fd41e8631ad70a9 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 10 Jan 2024 14:34:59 +0100 Subject: [PATCH 4/8] Add diagnostics to notification --- .../trigger/models/JobInstance.scala | 2 +- .../notifications/NotificationSender.scala | 23 +++++++- .../NotificationSenderTest.scala | 57 +++++++++++++++++++ 3 files changed, 78 insertions(+), 4 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala index c8862b37b..93383d236 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala @@ -23,7 +23,7 @@ case class JobInstance( jobName: String, jobParameters: JobInstanceParameters, jobStatus: JobStatus, - diagnostics: Option[String], + diagnostics: Option[String] = None, executorJobId: Option[String], applicationId: Option[String], stepId: Option[String], diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala index 1ba561490..3a4dd832c 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala @@ -52,6 +52,7 @@ class NotificationSenderImpl( private val yarnBaseUrl = sparkConfig.hadoopResourceManagerUrlBase private val dateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME private val messageQueue = new ConcurrentLinkedQueue[Message] + private val causedByPattern = """Caused by: (.*)\n""".r def createNotifications(dagInstance: DagInstance, jobInstances: Seq[JobInstance])( implicit ec: ExecutionContext @@ -114,15 +115,31 @@ class NotificationSenderImpl( "Finished" -> dagInstance.finished.map(_.format(dateTimeFormatter)).getOrElse("Couldn't get finish time"), "Status" -> dagInstance.status.name ) - jobInstances + val failedJob = jobInstances .sortBy(_.order)(Ordering.Int.reverse) .find(_.jobStatus.isFailed) - .map(_.applicationId.map { appId => + failedJob.map(_.applicationId.map { appId => val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId" messageMap += ("Failed application" -> applicationUrl) }) + val diagnosticsOpt = failedJob.flatMap(_.diagnostics) + diagnosticsOpt.map { diagnostics => + causedByPattern.findFirstMatchIn(diagnostics) match { + case Some(m) => messageMap += ("Caused by" -> m.group(1)) + case None => // do nothing + } + } messageMap += ("Notification rule ID" -> notificationRule.id.toString) - val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + "\n\n" + footer + + val diagnosticsDetail = diagnosticsOpt.map { diagnostics => + s"Job diagnostics:\n$diagnostics\n\n" + }.getOrElse("") + + val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + + "\n\n" + + diagnosticsDetail + + "\n\n" + + footer Message(notificationRule.recipients, subject, message, 1) } } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala index 331ca75ea..419e12eb3 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala @@ -131,6 +131,63 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi messagesCaptor.getValue should include(s"Failed application: $clusterBaseUrl/cluster/app/application_9876_4567") } + it should "print the error message if diagnostics are available" in { + // given + val di = createDagInstance().copy( + status = DagInstanceStatuses.Failed, + started = LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(12, 30)), + finished = Some(LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(14, 30))) + ) + + val diagnostics = + """User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: batch 20256 doesn't exist + |=== Streaming Query ===""".stripMargin + val ji = + createJobInstance().copy( + jobStatus = JobStatuses.Failed, + applicationId = Some("application_1234_4567"), + order = 1, + diagnostics = Some(diagnostics) + ) + + val nr1 = createNotificationRule().copy(id = 1, recipients = Seq("abc@def.com", "xyz@def.com")) + val w = createWorkflow() + + when(notificationRuleService.getMatchingNotificationRules(eqTo(di.workflowId), eqTo(di.status))(any())) + .thenReturn(Future(Some(Seq(nr1), w))) + + // when + await(underTest.createNotifications(di, Seq(ji))) + underTest.sendNotifications() + + // then + val expectedMessage = + """Environment: TEST + |Project: project + |Workflow Name: workflow + |Started: 2020-03-02T12:30:00 + |Finished: 2020-03-02T14:30:00 + |Status: Failed + |Failed application: http://localhost:8088/cluster/app/application_1234_4567 + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: batch 20256 doesn't exist + |Notification rule ID: 1 + | + |Job diagnostics: + |User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: batch 20256 doesn't exist + |=== Streaming Query === + | + | + | + |This message has been generated automatically. Please don't reply to it. + | + |HyperdriveDevTeam""".stripMargin + val messagesCaptor: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) + verify(emailService).sendMessageToBccRecipients(any(), any(), any(), messagesCaptor.capture()) + messagesCaptor.getValue shouldBe expectedMessage + } + it should "retry sending the message at most maxRetries times" in { // given val maxRetries = 5 From b1e73aa6588bd122a2f4facee7db2ff34e4de213 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 10 Jan 2024 16:39:33 +0100 Subject: [PATCH 5/8] List all causes --- .../v0.5.20.add-diagnostics-field.yml | 1 + .../notifications/NotificationSender.scala | 21 ++++++++-------- .../NotificationSenderTest.scala | 24 ++++++++++++++----- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml index dcffde3ac..25c367f95 100644 --- a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml +++ b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml @@ -23,3 +23,4 @@ databaseChangeLog: - sqlFile: relativeToChangelogFile: true path: v0.5.20.add-diagnostics-field.sql + splitStatements: false diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala index 3a4dd832c..0740d4362 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala @@ -122,23 +122,24 @@ class NotificationSenderImpl( val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId" messageMap += ("Failed application" -> applicationUrl) }) + + messageMap += ("Notification rule ID" -> notificationRule.id.toString) + val diagnosticsOpt = failedJob.flatMap(_.diagnostics) - diagnosticsOpt.map { diagnostics => - causedByPattern.findFirstMatchIn(diagnostics) match { - case Some(m) => messageMap += ("Caused by" -> m.group(1)) - case None => // do nothing - } + val causes = diagnosticsOpt.map { diagnostics => + causedByPattern.findAllMatchIn(diagnostics).map(_.group(1)).toSeq.reduce(_ + "\n" + _) } - messageMap += ("Notification rule ID" -> notificationRule.id.toString) + .map("Causes:\n" + _ + "\n\n") + .getOrElse("") - val diagnosticsDetail = diagnosticsOpt.map { diagnostics => - s"Job diagnostics:\n$diagnostics\n\n" + val stackTrace = diagnosticsOpt.map { diagnostics => + s"Stack trace:\n$diagnostics\n\n" }.getOrElse("") val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + "\n\n" + - diagnosticsDetail + - "\n\n" + + causes + + stackTrace + footer Message(notificationRule.recipients, subject, message, 1) } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala index 419e12eb3..c5c8c7032 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala @@ -141,8 +141,13 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi val diagnostics = """User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION - |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: batch 20256 doesn't exist - |=== Streaming Query ===""".stripMargin + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. + |=== Streaming Query === + |Caused by: org.apache.spark.SparkException: Job aborted. + |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) + |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174) + |""".stripMargin val ji = createJobInstance().copy( jobStatus = JobStatuses.Failed, @@ -170,14 +175,21 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi |Finished: 2020-03-02T14:30:00 |Status: Failed |Failed application: http://localhost:8088/cluster/app/application_1234_4567 - |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: batch 20256 doesn't exist |Notification rule ID: 1 | - |Job diagnostics: + |Causes: + |org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. + |org.apache.spark.SparkException: Job aborted. + |java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + | + |Stack trace: |User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION - |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: batch 20256 doesn't exist + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. |=== Streaming Query === - | + |Caused by: org.apache.spark.SparkException: Job aborted. + |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) + |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174) | | |This message has been generated automatically. Please don't reply to it. From cdaaefb35275427e9fa20a723dd89a29ef72b722 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 10 Jan 2024 20:12:15 +0100 Subject: [PATCH 6/8] Add bulletpoints --- .../scheduler/notifications/NotificationSender.scala | 5 ++++- .../scheduler/notifications/NotificationSenderTest.scala | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala index 0740d4362..f35ee44fa 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala @@ -127,7 +127,10 @@ class NotificationSenderImpl( val diagnosticsOpt = failedJob.flatMap(_.diagnostics) val causes = diagnosticsOpt.map { diagnostics => - causedByPattern.findAllMatchIn(diagnostics).map(_.group(1)).toSeq.reduce(_ + "\n" + _) + causedByPattern.findAllMatchIn(diagnostics).map(_.group(1)) + .toSeq + .map("- " + _) + .reduce(_ + "\n" + _) } .map("Causes:\n" + _ + "\n\n") .getOrElse("") diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala index c5c8c7032..33ce1d745 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala @@ -178,9 +178,9 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi |Notification rule ID: 1 | |Causes: - |org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. - |org.apache.spark.SparkException: Job aborted. - |java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + |- org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. + |- org.apache.spark.SparkException: Job aborted. + |- java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 | |Stack trace: |User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION From 8198457f3d17e986eeb7807f223e7d14a4c9b969 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 11 Jan 2024 11:16:45 +0100 Subject: [PATCH 7/8] Fixes --- src/main/resources/db_scripts/db_script_latest.sql | 2 +- .../liquibase/v0.5.20.add-diagnostics-field.sql | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/resources/db_scripts/db_script_latest.sql b/src/main/resources/db_scripts/db_script_latest.sql index 991702173..d3cd532f5 100644 --- a/src/main/resources/db_scripts/db_script_latest.sql +++ b/src/main/resources/db_scripts/db_script_latest.sql @@ -150,7 +150,7 @@ create table archive_job_instance id bigint primary key, application_id varchar, step_id varchar, - diagnostics varchar, + diagnostics varchar ); create table archive_event diff --git a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql index 0cb30bfcf..89402ddc9 100644 --- a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql +++ b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql @@ -25,7 +25,7 @@ CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk( IN i_max_id BIGINT ) AS $$ - ------------------------------------------------------------------------------- +------------------------------------------------------------------------------- -- -- Procedure: archive_dag_instances_chunk(2) -- Copies dag_instances with a final status from i_min_id to i_max_id to the @@ -58,7 +58,7 @@ BEGIN INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by FROM dag_instance di - JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; @@ -66,7 +66,7 @@ BEGIN INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics) SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics FROM job_instance ji - JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % job instances', _cnt; @@ -74,7 +74,7 @@ BEGIN INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload FROM "event" e - JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % events', _cnt; @@ -82,19 +82,19 @@ BEGIN RAISE NOTICE 'Going to delete dag instances'; DELETE FROM job_instance ji - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE ji.dag_instance_id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % job instances', _cnt; DELETE FROM "event" e - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE e.dag_instance_id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % events', _cnt; DELETE FROM dag_instance di - USING dag_instance_ids_to_archive diita + USING dag_instance_ids_to_archive diita WHERE di.id = diita.id; GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Deleted % dag instances', _cnt; From e9c1eebc335240500b00cee40053c6978d30d2c2 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 11 Jan 2024 11:28:29 +0100 Subject: [PATCH 8/8] Fix format --- .../notifications/NotificationSender.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala index f35ee44fa..d873cca5b 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala @@ -119,25 +119,30 @@ class NotificationSenderImpl( .sortBy(_.order)(Ordering.Int.reverse) .find(_.jobStatus.isFailed) failedJob.map(_.applicationId.map { appId => - val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId" - messageMap += ("Failed application" -> applicationUrl) - }) + val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId" + messageMap += ("Failed application" -> applicationUrl) + }) messageMap += ("Notification rule ID" -> notificationRule.id.toString) val diagnosticsOpt = failedJob.flatMap(_.diagnostics) - val causes = diagnosticsOpt.map { diagnostics => - causedByPattern.findAllMatchIn(diagnostics).map(_.group(1)) - .toSeq - .map("- " + _) - .reduce(_ + "\n" + _) - } + val causes = diagnosticsOpt + .map { diagnostics => + causedByPattern + .findAllMatchIn(diagnostics) + .map(_.group(1)) + .toSeq + .map("- " + _) + .reduce(_ + "\n" + _) + } .map("Causes:\n" + _ + "\n\n") .getOrElse("") - val stackTrace = diagnosticsOpt.map { diagnostics => - s"Stack trace:\n$diagnostics\n\n" - }.getOrElse("") + val stackTrace = diagnosticsOpt + .map { diagnostics => + s"Stack trace:\n$diagnostics\n\n" + } + .getOrElse("") val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + "\n\n" +