diff --git a/src/main/resources/db_scripts/db_script_latest.sql b/src/main/resources/db_scripts/db_script_latest.sql index d8c18caf1..d3cd532f5 100644 --- a/src/main/resources/db_scripts/db_script_latest.sql +++ b/src/main/resources/db_scripts/db_script_latest.sql @@ -34,7 +34,8 @@ create table "job_instance" ( "id" BIGSERIAL NOT NULL PRIMARY KEY, "application_id" VARCHAR, "step_id" VARCHAR, - "job_parameters" JSONB NOT NULL DEFAULT '{}' + "job_parameters" JSONB NOT NULL DEFAULT '{}', + "diagnostics" VARCHAR ); create table "job_definition" ( @@ -148,7 +149,8 @@ create table archive_job_instance references archive_dag_instance, id bigint primary key, application_id varchar, - step_id varchar + step_id varchar, + diagnostics varchar ); create table archive_event @@ -317,8 +319,8 @@ BEGIN GET DIAGNOSTICS _cnt = ROW_COUNT; RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; - INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id) - SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics FROM job_instance ji JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id ON CONFLICT (id) DO NOTHING; diff --git a/src/main/resources/db_scripts/liquibase/db.changelog.yml b/src/main/resources/db_scripts/liquibase/db.changelog.yml index 17119d955..764544244 100644 --- a/src/main/resources/db_scripts/liquibase/db.changelog.yml +++ b/src/main/resources/db_scripts/liquibase/db.changelog.yml @@ -95,3 +95,6 @@ databaseChangeLog: - include: relativeToChangelogFile: true file: v0.5.14.remove-deprecated-columns.yml + - include: + relativeToChangelogFile: true + file: v0.5.20.add-diagnostics-field.yml diff --git a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql new file mode 100644 index 000000000..89402ddc9 --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.sql @@ -0,0 +1,108 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE "job_instance" + ADD COLUMN "diagnostics" VARCHAR; +ALTER TABLE "archive_job_instance" + ADD COLUMN "diagnostics" VARCHAR; + + + +CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk( + IN i_min_id BIGINT, + IN i_max_id BIGINT +) +AS $$ +------------------------------------------------------------------------------- +-- +-- Procedure: archive_dag_instances_chunk(2) +-- Copies dag_instances with a final status from i_min_id to i_max_id to the +-- archive_dag_instance table. +-- Along with dag_instance, referenced job_instances and events are +-- archived to the archive_job_instance and archive_event tables, respectively. +-- This method should not be called directly. Instead, use archive_dag_instances +-- +-- Parameters: +-- i_min_id - Minimum dag instance id to archive +-- i_max_id - Maximum dag instance id to archive +-- +------------------------------------------------------------------------------- +DECLARE + _cnt INT; +BEGIN + RAISE NOTICE '============='; + RAISE NOTICE ' START BATCH'; + RAISE NOTICE '============='; + + CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS + SELECT di.id + FROM dag_instance di + WHERE di.status NOT IN ('Running', 'InQueue') + AND di.id >= i_min_id + AND di.id <= i_max_id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by) + SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by + FROM dag_instance di + JOIN dag_instance_ids_to_archive diita ON di.id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id; + + INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics) + SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics + FROM job_instance ji + JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % job instances', _cnt; + + INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload) + SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload + FROM "event" e + JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id + ON CONFLICT (id) DO NOTHING; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Archived % events', _cnt; + + RAISE NOTICE 'Going to delete dag instances'; + + DELETE FROM job_instance ji + USING dag_instance_ids_to_archive diita + WHERE ji.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % job instances', _cnt; + + DELETE FROM "event" e + USING dag_instance_ids_to_archive diita + WHERE e.dag_instance_id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % events', _cnt; + + DELETE FROM dag_instance di + USING dag_instance_ids_to_archive diita + WHERE di.id = diita.id; + GET DIAGNOSTICS _cnt = ROW_COUNT; + RAISE NOTICE 'Deleted % dag instances', _cnt; + + DROP TABLE dag_instance_ids_to_archive; + + RAISE NOTICE '============='; + RAISE NOTICE ' END BATCH'; + RAISE NOTICE '============='; +END; +$$ LANGUAGE plpgsql; diff --git a/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml new file mode 100644 index 000000000..25c367f95 --- /dev/null +++ b/src/main/resources/db_scripts/liquibase/v0.5.20.add-diagnostics-field.yml @@ -0,0 +1,26 @@ +# +# Copyright 2018 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +databaseChangeLog: + - changeSet: + id: v0.5.20.add-diagnostics-field + logicalFilePath: v0.5.20.add-diagnostics-field + author: HyperdriveDevTeam@absa.africa + context: default + changes: + - sqlFile: + relativeToChangelogFile: true + path: v0.5.20.add-diagnostics-field.sql + splitStatements: false diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala index a0ea1f652..93383d236 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/JobInstance.scala @@ -23,6 +23,7 @@ case class JobInstance( jobName: String, jobParameters: JobInstanceParameters, jobStatus: JobStatus, + diagnostics: Option[String] = None, executorJobId: Option[String], applicationId: Option[String], stepId: Option[String], diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala index 40b428eaf..4d621270e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JobInstanceTable.scala @@ -30,6 +30,7 @@ trait JobInstanceTable { def jobName: Rep[String] = column[String]("job_name") def jobParameters: Rep[JobInstanceParameters] = column[JobInstanceParameters]("job_parameters", O.SqlType("JSONB")) def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status") + def diagnostics: Rep[Option[String]] = column[Option[String]]("diagnostics") def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id") def applicationId: Rep[Option[String]] = column[Option[String]]("application_id") def stepId: Rep[Option[String]] = column[Option[String]]("step_id") @@ -46,6 +47,7 @@ trait JobInstanceTable { jobName, jobParameters, jobStatus, + diagnostics, executorJobId, applicationId, stepId, diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala index 849f38b0d..f85d7d864 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/AppsResponse.scala @@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark import play.api.libs.json.{Json, OFormat} -case class App(id: String, name: String, state: String, finalStatus: String) +case class App(id: String, name: String, state: String, finalStatus: String, diagnostics: String) case class Apps(app: Seq[App]) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala index 49840dfa0..9aebc60a6 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala @@ -54,7 +54,7 @@ object SparkExecutor { case None => Seq.empty }) match { case Seq(first) => - updateJob(jobInstance.copy(applicationId = Some(first.id), jobStatus = getStatus(first.finalStatus))) + updateJob(getUpdatedJobInstance(jobInstance, first)) case _ // It relies on the same value set for sparkYarnSink.submitTimeout in multi instance deployment if jobInstance.jobStatus == JobStatuses.Submitting && jobInstance.updated @@ -69,6 +69,23 @@ object SparkExecutor { private def getStatusUrl(executorJobId: String)(implicit sparkConfig: SparkConfig): String = s"${sparkConfig.hadoopResourceManagerUrlBase}/ws/v1/cluster/apps?applicationTags=$executorJobId" + private def getUpdatedJobInstance( + jobInstance: JobInstance, + app: App + ): JobInstance = { + val diagnostics = app.diagnostics match { + case "" => None + case _ => Some(app.diagnostics) + } + + jobInstance.copy( + jobStatus = getStatus(app.finalStatus), + applicationId = Some(app.id), + updated = Option(LocalDateTime.now()), + diagnostics = diagnostics + ) + } + private def getStatus(finalStatus: String): JobStatus = finalStatus match { case fs if fs == YarnFinalStatuses.Undefined.name => Running diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala index 1ba561490..d873cca5b 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala @@ -52,6 +52,7 @@ class NotificationSenderImpl( private val yarnBaseUrl = sparkConfig.hadoopResourceManagerUrlBase private val dateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME private val messageQueue = new ConcurrentLinkedQueue[Message] + private val causedByPattern = """Caused by: (.*)\n""".r def createNotifications(dagInstance: DagInstance, jobInstances: Seq[JobInstance])( implicit ec: ExecutionContext @@ -114,15 +115,40 @@ class NotificationSenderImpl( "Finished" -> dagInstance.finished.map(_.format(dateTimeFormatter)).getOrElse("Couldn't get finish time"), "Status" -> dagInstance.status.name ) - jobInstances + val failedJob = jobInstances .sortBy(_.order)(Ordering.Int.reverse) .find(_.jobStatus.isFailed) - .map(_.applicationId.map { appId => - val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId" - messageMap += ("Failed application" -> applicationUrl) - }) + failedJob.map(_.applicationId.map { appId => + val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId" + messageMap += ("Failed application" -> applicationUrl) + }) + messageMap += ("Notification rule ID" -> notificationRule.id.toString) - val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + "\n\n" + footer + + val diagnosticsOpt = failedJob.flatMap(_.diagnostics) + val causes = diagnosticsOpt + .map { diagnostics => + causedByPattern + .findAllMatchIn(diagnostics) + .map(_.group(1)) + .toSeq + .map("- " + _) + .reduce(_ + "\n" + _) + } + .map("Causes:\n" + _ + "\n\n") + .getOrElse("") + + val stackTrace = diagnosticsOpt + .map { diagnostics => + s"Stack trace:\n$diagnostics\n\n" + } + .getOrElse("") + + val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + + "\n\n" + + causes + + stackTrace + + footer Message(notificationRule.recipients, subject, message, 1) } } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala index 331ca75ea..33ce1d745 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSenderTest.scala @@ -131,6 +131,75 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi messagesCaptor.getValue should include(s"Failed application: $clusterBaseUrl/cluster/app/application_9876_4567") } + it should "print the error message if diagnostics are available" in { + // given + val di = createDagInstance().copy( + status = DagInstanceStatuses.Failed, + started = LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(12, 30)), + finished = Some(LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(14, 30))) + ) + + val diagnostics = + """User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. + |=== Streaming Query === + |Caused by: org.apache.spark.SparkException: Job aborted. + |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) + |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174) + |""".stripMargin + val ji = + createJobInstance().copy( + jobStatus = JobStatuses.Failed, + applicationId = Some("application_1234_4567"), + order = 1, + diagnostics = Some(diagnostics) + ) + + val nr1 = createNotificationRule().copy(id = 1, recipients = Seq("abc@def.com", "xyz@def.com")) + val w = createWorkflow() + + when(notificationRuleService.getMatchingNotificationRules(eqTo(di.workflowId), eqTo(di.status))(any())) + .thenReturn(Future(Some(Seq(nr1), w))) + + // when + await(underTest.createNotifications(di, Seq(ji))) + underTest.sendNotifications() + + // then + val expectedMessage = + """Environment: TEST + |Project: project + |Workflow Name: workflow + |Started: 2020-03-02T12:30:00 + |Finished: 2020-03-02T14:30:00 + |Status: Failed + |Failed application: http://localhost:8088/cluster/app/application_1234_4567 + |Notification rule ID: 1 + | + |Causes: + |- org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. + |- org.apache.spark.SparkException: Job aborted. + |- java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + | + |Stack trace: + |User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION + |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted. + |=== Streaming Query === + |Caused by: org.apache.spark.SparkException: Job aborted. + |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) + |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39 + |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174) + | + | + |This message has been generated automatically. Please don't reply to it. + | + |HyperdriveDevTeam""".stripMargin + val messagesCaptor: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) + verify(emailService).sendMessageToBccRecipients(any(), any(), any(), messagesCaptor.capture()) + messagesCaptor.getValue shouldBe expectedMessage + } + it should "retry sending the message at most maxRetries times" in { // given val maxRetries = 5