Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/main/resources/db_scripts/db_script_latest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ create table "job_instance" (
"id" BIGSERIAL NOT NULL PRIMARY KEY,
"application_id" VARCHAR,
"step_id" VARCHAR,
"job_parameters" JSONB NOT NULL DEFAULT '{}'
"job_parameters" JSONB NOT NULL DEFAULT '{}',
"diagnostics" VARCHAR
);

create table "job_definition" (
Expand Down Expand Up @@ -148,7 +149,8 @@ create table archive_job_instance
references archive_dag_instance,
id bigint primary key,
application_id varchar,
step_id varchar
step_id varchar,
diagnostics varchar
);

create table archive_event
Expand Down Expand Up @@ -317,8 +319,8 @@ BEGIN
GET DIAGNOSTICS _cnt = ROW_COUNT;
RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id;

INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id)
SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id
INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics)
SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics
FROM job_instance ji
JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id
ON CONFLICT (id) DO NOTHING;
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/db_scripts/liquibase/db.changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,6 @@ databaseChangeLog:
- include:
relativeToChangelogFile: true
file: v0.5.14.remove-deprecated-columns.yml
- include:
relativeToChangelogFile: true
file: v0.5.20.add-diagnostics-field.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a free-text diagnostics column (YARN application diagnostics for
-- failed jobs) to the live and the archived job instance tables.
-- IF NOT EXISTS makes the statements idempotent, so the migration is safe
-- to re-apply manually outside of Liquibase changelog tracking.
ALTER TABLE "job_instance"
    ADD COLUMN IF NOT EXISTS "diagnostics" VARCHAR;
ALTER TABLE "archive_job_instance"
    ADD COLUMN IF NOT EXISTS "diagnostics" VARCHAR;



CREATE OR REPLACE PROCEDURE archive_dag_instances_chunk(
    IN i_min_id BIGINT,
    IN i_max_id BIGINT
)
AS $$
-------------------------------------------------------------------------------
--
-- Procedure: archive_dag_instances_chunk(2)
-- Copies dag_instances with a final status from i_min_id to i_max_id to the
-- archive_dag_instance table.
-- Along with dag_instance, referenced job_instances and events are
-- archived to the archive_job_instance and archive_event tables, respectively.
-- After a successful copy, the archived rows are deleted from the live tables.
-- This method should not be called directly. Instead, use archive_dag_instances
--
-- Parameters:
--    i_min_id - Minimum dag instance id to archive
--    i_max_id - Maximum dag instance id to archive
--
-------------------------------------------------------------------------------
DECLARE
    -- Row count of the most recently executed statement, used for progress logging only.
    _cnt INT;
BEGIN
    RAISE NOTICE '=============';
    RAISE NOTICE ' START BATCH';
    RAISE NOTICE '=============';

    -- Defensive cleanup: if a previous invocation in this session aborted
    -- between CREATE TEMPORARY TABLE and the DROP TABLE at the end, the temp
    -- table would still exist and the CREATE below would fail with
    -- "relation already exists".
    DROP TABLE IF EXISTS dag_instance_ids_to_archive;

    -- Snapshot the ids to archive so that the copy steps and the delete steps
    -- below operate on exactly the same set of rows.
    CREATE TEMPORARY TABLE dag_instance_ids_to_archive AS
    SELECT di.id
    FROM dag_instance di
    WHERE di.status NOT IN ('Running', 'InQueue')  -- only final statuses are archived
      AND di.id >= i_min_id
      AND di.id <= i_max_id;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Going to archive % dag instances from % to %', _cnt, i_min_id, i_max_id;

    -- ON CONFLICT (id) DO NOTHING makes each copy idempotent, so a re-run of a
    -- partially archived chunk does not fail on already-archived rows.
    INSERT INTO archive_dag_instance (status, workflow_id, id, started, finished, triggered_by)
    SELECT di.status, di.workflow_id, di.id, di.started, di.finished, di.triggered_by
    FROM dag_instance di
    JOIN dag_instance_ids_to_archive diita ON di.id = diita.id
    ON CONFLICT (id) DO NOTHING;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Archived % dag instances from % to %', _cnt, i_min_id, i_max_id;

    INSERT INTO archive_job_instance (job_name, job_status, executor_job_id, created, updated, "order", dag_instance_id, id, application_id, step_id, diagnostics)
    SELECT ji.job_name, ji.job_status, ji.executor_job_id, ji.created, ji.updated, ji."order", ji.dag_instance_id, ji.id, ji.application_id, ji.step_id, ji.diagnostics
    FROM job_instance ji
    JOIN dag_instance_ids_to_archive diita ON ji.dag_instance_id = diita.id
    ON CONFLICT (id) DO NOTHING;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Archived % job instances', _cnt;

    INSERT INTO archive_event (sensor_event_id, sensor_id, dag_instance_id, id, payload)
    SELECT e.sensor_event_id, e.sensor_id, e.dag_instance_id, e.id, e.payload
    FROM "event" e
    JOIN dag_instance_ids_to_archive diita ON e.dag_instance_id = diita.id
    ON CONFLICT (id) DO NOTHING;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Archived % events', _cnt;

    RAISE NOTICE 'Going to delete dag instances';

    -- Delete children (job_instance, event) before the parent dag_instance
    -- to respect foreign-key constraints.
    DELETE FROM job_instance ji
    USING dag_instance_ids_to_archive diita
    WHERE ji.dag_instance_id = diita.id;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Deleted % job instances', _cnt;

    DELETE FROM "event" e
    USING dag_instance_ids_to_archive diita
    WHERE e.dag_instance_id = diita.id;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Deleted % events', _cnt;

    DELETE FROM dag_instance di
    USING dag_instance_ids_to_archive diita
    WHERE di.id = diita.id;
    GET DIAGNOSTICS _cnt = ROW_COUNT;
    RAISE NOTICE 'Deleted % dag instances', _cnt;

    DROP TABLE dag_instance_ids_to_archive;

    RAISE NOTICE '=============';
    RAISE NOTICE ' END BATCH';
    RAISE NOTICE '=============';
END;
$$ LANGUAGE plpgsql;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Changeset adding the "diagnostics" column to job_instance and
# archive_job_instance, and updating the archiving procedure to carry the
# new column over. (YAML comments are ignored by the Liquibase parser and
# do not affect changeset checksums.)
databaseChangeLog:
  - changeSet:
      id: v0.5.20.add-diagnostics-field
      logicalFilePath: v0.5.20.add-diagnostics-field
      author: HyperdriveDevTeam@absa.africa
      context: default
      changes:
        - sqlFile:
            relativeToChangelogFile: true
            path: v0.5.20.add-diagnostics-field.sql
            # The SQL file contains a plpgsql procedure body with embedded
            # semicolons, so it must be executed as a single statement.
            splitStatements: false
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ case class JobInstance(
jobName: String,
jobParameters: JobInstanceParameters,
jobStatus: JobStatus,
diagnostics: Option[String] = None,
executorJobId: Option[String],
applicationId: Option[String],
stepId: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ trait JobInstanceTable {
def jobName: Rep[String] = column[String]("job_name")
def jobParameters: Rep[JobInstanceParameters] = column[JobInstanceParameters]("job_parameters", O.SqlType("JSONB"))
def jobStatus: Rep[JobStatus] = column[JobStatus]("job_status")
def diagnostics: Rep[Option[String]] = column[Option[String]]("diagnostics")
def executorJobId: Rep[Option[String]] = column[Option[String]]("executor_job_id")
def applicationId: Rep[Option[String]] = column[Option[String]]("application_id")
def stepId: Rep[Option[String]] = column[Option[String]]("step_id")
Expand All @@ -46,6 +47,7 @@ trait JobInstanceTable {
jobName,
jobParameters,
jobStatus,
diagnostics,
executorJobId,
applicationId,
stepId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark

import play.api.libs.json.{Json, OFormat}

case class App(id: String, name: String, state: String, finalStatus: String)
// One application entry of the YARN ResourceManager apps report
// (ws/v1/cluster/apps); `diagnostics` carries the failure detail text.
// NOTE(review): `diagnostics` is a required field here — if the RM response
// ever omits it, JSON deserialization of the whole report fails. Consider
// Option[String] with a default; TODO confirm the RM always returns it.
case class App(id: String, name: String, state: String, finalStatus: String, diagnostics: String)

// Envelope type matching the RM response shape: {"apps": {"app": [...]}}.
case class Apps(app: Seq[App])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object SparkExecutor {
case None => Seq.empty
}) match {
case Seq(first) =>
updateJob(jobInstance.copy(applicationId = Some(first.id), jobStatus = getStatus(first.finalStatus)))
updateJob(getUpdatedJobInstance(jobInstance, first))
case _
// It relies on the same value set for sparkYarnSink.submitTimeout in multi instance deployment
if jobInstance.jobStatus == JobStatuses.Submitting && jobInstance.updated
Expand All @@ -69,6 +69,23 @@ object SparkExecutor {
// Builds the ResourceManager REST URL that lists the applications tagged
// with the given executor job id (used to poll the job's YARN status).
private def getStatusUrl(executorJobId: String)(implicit sparkConfig: SparkConfig): String =
  s"${sparkConfig.hadoopResourceManagerUrlBase}/ws/v1/cluster/apps?applicationTags=$executorJobId"

/**
 * Produces an updated copy of the job instance from a YARN application report:
 * maps the final status to a job status, records the application id, bumps the
 * `updated` timestamp, and attaches the YARN diagnostics text (an empty string
 * from the RM is treated as "no diagnostics available").
 */
private def getUpdatedJobInstance(jobInstance: JobInstance, app: App): JobInstance = {
  val diagnosticsOpt = if (app.diagnostics == "") None else Some(app.diagnostics)
  jobInstance.copy(
    jobStatus = getStatus(app.finalStatus),
    applicationId = Some(app.id),
    updated = Option(LocalDateTime.now()),
    diagnostics = diagnosticsOpt
  )
}

private def getStatus(finalStatus: String): JobStatus =
finalStatus match {
case fs if fs == YarnFinalStatuses.Undefined.name => Running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class NotificationSenderImpl(
private val yarnBaseUrl = sparkConfig.hadoopResourceManagerUrlBase
private val dateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
private val messageQueue = new ConcurrentLinkedQueue[Message]
private val causedByPattern = """Caused by: (.*)\n""".r

def createNotifications(dagInstance: DagInstance, jobInstances: Seq[JobInstance])(
implicit ec: ExecutionContext
Expand Down Expand Up @@ -114,15 +115,40 @@ class NotificationSenderImpl(
"Finished" -> dagInstance.finished.map(_.format(dateTimeFormatter)).getOrElse("Couldn't get finish time"),
"Status" -> dagInstance.status.name
)
jobInstances
val failedJob = jobInstances
.sortBy(_.order)(Ordering.Int.reverse)
.find(_.jobStatus.isFailed)
.map(_.applicationId.map { appId =>
val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId"
messageMap += ("Failed application" -> applicationUrl)
})
failedJob.map(_.applicationId.map { appId =>
val applicationUrl = s"${yarnBaseUrl.stripSuffix("/")}/cluster/app/$appId"
messageMap += ("Failed application" -> applicationUrl)
})

messageMap += ("Notification rule ID" -> notificationRule.id.toString)
val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) + "\n\n" + footer

val diagnosticsOpt = failedJob.flatMap(_.diagnostics)
val causes = diagnosticsOpt
.map { diagnostics =>
causedByPattern
// Review comment (Collaborator): I think this one is overkill. I would just
// add the whole text within diagnosticsOpt.

.findAllMatchIn(diagnostics)
.map(_.group(1))
.toSeq
.map("- " + _)
.reduce(_ + "\n" + _)
}
.map("Causes:\n" + _ + "\n\n")
.getOrElse("")

val stackTrace = diagnosticsOpt
.map { diagnostics =>
s"Stack trace:\n$diagnostics\n\n"
}
.getOrElse("")

val message = messageMap.map { case (key, value) => s"$key: $value" }.reduce(_ + "\n" + _) +
"\n\n" +
causes +
stackTrace +
footer
Message(notificationRule.recipients, subject, message, 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,75 @@ class NotificationSenderTest extends FlatSpec with MockitoSugar with Matchers wi
messagesCaptor.getValue should include(s"Failed application: $clusterBaseUrl/cluster/app/application_9876_4567")
}

// Verifies that a failure notification email contains both the extracted
// "Caused by:" summary lines and the full YARN diagnostics stack trace.
it should "print the error message if diagnostics are available" in {
  // given: a failed dag instance with fixed start/finish times so the
  // expected message can be asserted verbatim
  val di = createDagInstance().copy(
    status = DagInstanceStatuses.Failed,
    started = LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(12, 30)),
    finished = Some(LocalDateTime.of(LocalDate.of(2020, 3, 2), LocalTime.of(14, 30)))
  )

  // Realistic YARN diagnostics: three "Caused by:" lines interleaved with
  // stack frames — all three must surface in the "Causes" section.
  val diagnostics =
    """User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION
      |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
      |=== Streaming Query ===
      |Caused by: org.apache.spark.SparkException: Job aborted.
      |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
      |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
      |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
      |""".stripMargin
  val ji =
    createJobInstance().copy(
      jobStatus = JobStatuses.Failed,
      applicationId = Some("application_1234_4567"),
      order = 1,
      diagnostics = Some(diagnostics)
    )

  val nr1 = createNotificationRule().copy(id = 1, recipients = Seq("abc@def.com", "xyz@def.com"))
  val w = createWorkflow()

  when(notificationRuleService.getMatchingNotificationRules(eqTo(di.workflowId), eqTo(di.status))(any()))
    .thenReturn(Future(Some(Seq(nr1), w)))

  // when
  await(underTest.createNotifications(di, Seq(ji)))
  underTest.sendNotifications()

  // then: header fields, extracted causes, full stack trace, then the footer
  val expectedMessage =
    """Environment: TEST
      |Project: project
      |Workflow Name: workflow
      |Started: 2020-03-02T12:30:00
      |Finished: 2020-03-02T14:30:00
      |Status: Failed
      |Failed application: http://localhost:8088/cluster/app/application_1234_4567
      |Notification rule ID: 1
      |
      |Causes:
      |- org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
      |- org.apache.spark.SparkException: Job aborted.
      |- java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
      |
      |Stack trace:
      |User class threw exception: za.co.absa.hyperdrive.shared.exceptions.IngestionException: PROBABLY FAILED INGESTION
      |Caused by: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
      |=== Streaming Query ===
      |Caused by: org.apache.spark.SparkException: Job aborted.
      |at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
      |Caused by: java.lang.IllegalStateException: 29.compact doesn't exist when compacting batch 39
      |at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
      |
      |
      |This message has been generated automatically. Please don't reply to it.
      |
      |HyperdriveDevTeam""".stripMargin
  val messagesCaptor: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
  verify(emailService).sendMessageToBccRecipients(any(), any(), any(), messagesCaptor.capture())
  messagesCaptor.getValue shouldBe expectedMessage
}

it should "retry sending the message at most maxRetries times" in {
// given
val maxRetries = 5
Expand Down