diff --git a/pom.xml b/pom.xml index 883ffe032..f5567274b 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 2.3.2 2.17.1 1.7.26 + 3.9.5 5.6.1 1.0.1.RELEASE 3.0.5 @@ -153,6 +154,29 @@ log4j-to-slf4j ${log4j.version} + + com.typesafe.scala-logging + scala-logging_${scala.compat.version} + ${scala-logging.version} + + + org.scalatestplus + mockito-3-4_2.12 + + + org.scalatest + scalatest_2.12 + + + org.scala-lang + scala-library + + + org.scala-lang + scala-reflect + + + com.fasterxml.jackson.module jackson-module-scala_${scala.compat.version} diff --git a/src/main/scala/org/apache/spark/launcher/NoBackendConnectionInProcessLauncher.scala b/src/main/scala/org/apache/spark/launcher/NoBackendConnectionInProcessLauncher.scala index fce63f3dc..825a4f827 100644 --- a/src/main/scala/org/apache/spark/launcher/NoBackendConnectionInProcessLauncher.scala +++ b/src/main/scala/org/apache/spark/launcher/NoBackendConnectionInProcessLauncher.scala @@ -15,11 +15,10 @@ package org.apache.spark.launcher -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging -class NoBackendConnectionInProcessLauncher extends InProcessLauncher { +class NoBackendConnectionInProcessLauncher extends InProcessLauncher with LazyLogging { - private val logger = LoggerFactory.getLogger(this.getClass) override def startApplication(listeners: SparkAppHandle.Listener*): SparkAppHandle = { import scala.collection.JavaConverters._ if (builder.isClientMode(Map[String, String]().asJava)) { diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/WebSecurityConfig.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/WebSecurityConfig.scala index 0dcea55f0..27829bab1 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/WebSecurityConfig.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/WebSecurityConfig.scala @@ -15,9 +15,7 @@ package za.co.absa.hyperdrive.trigger.api.rest -import javax.inject.Inject -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.beans.factory.BeanFactory import org.springframework.context.annotation.{Bean, Configuration} import org.springframework.http.HttpStatus @@ -49,10 +47,12 @@ import za.co.absa.hyperdrive.trigger.api.rest.auth.{ } import za.co.absa.hyperdrive.trigger.configuration.application.AuthConfig +import javax.inject.Inject +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} + @EnableWebSecurity @EnableGlobalMethodSecurity(prePostEnabled = true) -class WebSecurityConfig @Inject() (val beanFactory: BeanFactory, authConfig: AuthConfig) { - private val logger = LoggerFactory.getLogger(this.getClass) +class WebSecurityConfig @Inject() (val beanFactory: BeanFactory, authConfig: AuthConfig) extends LazyLogging { val authMechanism: String = authConfig.mechanism diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/AuthClient.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/AuthClient.scala index c9b5d366e..ea0912c91 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/AuthClient.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/AuthClient.scala @@ -15,9 +15,8 @@ package za.co.absa.hyperdrive.trigger.api.rest.client -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.springframework.http.{HttpEntity, HttpHeaders, HttpMethod, HttpStatus, ResponseEntity} +import com.typesafe.scalalogging.{AnyLogging, LazyLogging} +import org.springframework.http._ import org.springframework.security.kerberos.client.KerberosRestTemplate import org.springframework.util.LinkedMultiValueMap import org.springframework.web.client.RestTemplate @@ -71,8 +70,7 @@ sealed abstract class AuthClient( restTemplate: RestTemplate, apiCaller: ApiCaller, url: String => String -) { - protected val logger: Logger = LoggerFactory.getLogger(this.getClass) +) extends AnyLogging { @throws[UnauthorizedException] def authenticate(): HttpHeaders = @@ -118,7 +116,8 @@ class SpnegoAuthClient( restTemplate: RestTemplate, apiCaller: ApiCaller, path: String -) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") { +) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") + with LazyLogging { override protected def requestAuthentication(url: String): ResponseEntity[String] = { logger.info( s"Authenticating via SPNEGO ($url): user `${credentials.username}`, with keytab `${credentials.keytabLocation}`" @@ -132,7 +131,8 @@ class StandardAuthClient( restTemplate: RestTemplate, apiCaller: ApiCaller, path: String -) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") { +) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") + with LazyLogging { override protected def requestAuthentication(url: String): ResponseEntity[String] = { val requestParts = new LinkedMultiValueMap[String, String] requestParts.add("username", credentials.username) @@ -148,7 +148,8 @@ class StandardBase64AuthClient( restTemplate: RestTemplate, apiCaller: ApiCaller, path: String -) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") { +) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") + with LazyLogging { override protected def requestAuthentication(url: String): ResponseEntity[String] = { val headers = new HttpHeaders() headers.add("Authorization", "Basic " + credentials.base64Credentials) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/CrossHostApiCaller.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/CrossHostApiCaller.scala index 6b703ead7..f805ec44c 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/CrossHostApiCaller.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/CrossHostApiCaller.scala @@ -15,19 +15,17 @@ package za.co.absa.hyperdrive.trigger.api.rest.client +import com.typesafe.scalalogging.Logger import org.apache.commons.lang3.exception.ExceptionUtils -import org.slf4j.LoggerFactory import org.springframework.web.client import org.springframework.web.client.ResourceAccessException -import za.co.absa.hyperdrive.trigger.api.rest.client.CrossHostApiCaller.logger import scala.annotation.tailrec -import scala.util.Failure -import scala.util.Random -import scala.util.Try +import scala.util.{Failure, Random, Try} class CrossHostApiCaller private (apiBaseUrls: Vector[String], maxTryCount: Int, private var currentHostIndex: Int) extends ApiCaller { + import CrossHostApiCaller._ def baseUrlsCount: Int = apiBaseUrls.size def currentBaseUrl: String = apiBaseUrls(currentHostIndex) @@ -69,7 +67,7 @@ class CrossHostApiCaller private (apiBaseUrls: Vector[String], maxTryCount: Int, } object CrossHostApiCaller { - private val logger = LoggerFactory.getLogger(classOf[CrossHostApiCaller]) + private val logger = Logger[CrossHostApiCaller] final val DefaultUrlsRetryCount: Int = 0 diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/RestClient.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/RestClient.scala index a6d3b874f..43c339f2c 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/RestClient.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/client/RestClient.scala @@ -15,18 +15,14 @@ package za.co.absa.hyperdrive.trigger.api.rest.client -import org.slf4j.LoggerFactory -import org.springframework.http.HttpEntity -import org.springframework.http.HttpHeaders -import org.springframework.http.HttpMethod -import org.springframework.http.HttpStatus +import com.typesafe.scalalogging.LazyLogging +import org.springframework.http.{HttpEntity, HttpHeaders, HttpMethod, HttpStatus} import org.springframework.web.client.RestTemplate import scala.annotation.tailrec import scala.reflect.ClassTag -class RestClient(authClient: AuthClient, restTemplate: RestTemplate) { - private val logger = LoggerFactory.getLogger(this.getClass) +class RestClient(authClient: AuthClient, restTemplate: RestTemplate) extends LazyLogging { private var authHeaders = new HttpHeaders() diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/RestErrorHandler.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/RestErrorHandler.scala index 277bc0ed8..f85da2df0 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/RestErrorHandler.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/RestErrorHandler.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.api.rest.controllers -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.http.converter.HttpMessageNotReadableException import org.springframework.http.{HttpStatus, ResponseEntity} import org.springframework.security.access.AccessDeniedException @@ -23,8 +23,7 @@ import org.springframework.web.bind.annotation.{ExceptionHandler, RestController import org.springframework.web.context.request.WebRequest import za.co.absa.hyperdrive.trigger.models.errors.ApiException @RestControllerAdvice -class RestErrorHandler { - private val logger = LoggerFactory.getLogger(this.getClass) +class RestErrorHandler extends LazyLogging { @ExceptionHandler(Array(classOf[ApiException])) def handleApiException(ex: ApiException, request: WebRequest): ResponseEntity[Object] = { diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/WorkflowController.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/WorkflowController.scala index e38b38f52..de9ddb1f8 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/WorkflowController.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/WorkflowController.scala @@ -15,11 +15,7 @@ package za.co.absa.hyperdrive.trigger.api.rest.controllers -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} -import java.util.concurrent.CompletableFuture -import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream} -import javax.inject.Inject -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.core.io.ByteArrayResource import org.springframework.http.{HttpHeaders, MediaType, ResponseEntity} import org.springframework.web.bind.annotation._ @@ -31,6 +27,10 @@ import za.co.absa.hyperdrive.trigger.models._ import za.co.absa.hyperdrive.trigger.models.errors.{ApiException, BulkOperationError, GenericError} import za.co.absa.hyperdrive.trigger.models.search.{TableSearchRequest, TableSearchResponse} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util.concurrent.CompletableFuture +import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream} +import javax.inject.Inject import scala.collection.mutable.ArrayBuffer import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext @@ -38,8 +38,8 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.Try @RestController -class WorkflowController @Inject() (workflowService: WorkflowService, generalConfig: GeneralConfig) { - private val logger = LoggerFactory.getLogger(this.getClass) +class WorkflowController @Inject() (workflowService: WorkflowService, generalConfig: GeneralConfig) + extends LazyLogging { val environment: String = generalConfig.environment diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/health/DatabaseConnectionHealthIndicator.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/health/DatabaseConnectionHealthIndicator.scala index 632c9fbe0..9f154d1a2 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/health/DatabaseConnectionHealthIndicator.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/health/DatabaseConnectionHealthIndicator.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.api.rest.health -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.Logger import org.springframework.boot.actuate.health.{Health, HealthIndicator} import org.springframework.stereotype.Component import za.co.absa.hyperdrive.trigger.configuration.application.HealthConfig @@ -32,7 +32,7 @@ class DatabaseConnectionHealthIndicator @Inject() (val dbProvider: DatabaseProvi extends HealthIndicator with Repository { import api._ - private val log = LoggerFactory.getLogger(this.getClass) + private val log = Logger(this.getClass) val dbConnection: Duration = Duration(healthConfig.databaseConnectionTimeoutMillis, TimeUnit.MILLISECONDS) override protected def health(): Health = Try { diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala index 1baabdb4d..cebfaa5b5 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala @@ -15,11 +15,11 @@ package za.co.absa.hyperdrive.trigger.api.rest.services +import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.security.UserGroupInformation import org.json4s.jackson.Serialization import org.json4s.{Formats, NoTypeHints} -import org.slf4j.LoggerFactory import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap @@ -48,8 +48,7 @@ class HdfsParameters( @Lazy @Service -class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends CheckpointService { - private val logger = LoggerFactory.getLogger(this.getClass) +class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends CheckpointService with LazyLogging { private val offsetsDirName = "offsets" private val commitsDirName = "commits" private implicit val formats: Formats = Serialization.formats(NoTypeHints) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala index 0a7cdbd63..24ee80344 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala @@ -15,11 +15,11 @@ package za.co.absa.hyperdrive.trigger.api.rest.services +import com.typesafe.scalalogging.LazyLogging import org.apache.commons.io.IOUtils import org.apache.hadoop.fs._ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.deploy.SparkHadoopUtil -import org.slf4j.LoggerFactory import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Service @@ -39,8 +39,7 @@ trait HdfsService { @Lazy @Service -class HdfsServiceImpl extends HdfsService { - private val logger = LoggerFactory.getLogger(this.getClass) +class HdfsServiceImpl extends HdfsService with LazyLogging { private lazy val conf = SparkHadoopUtil.get.conf override def exists(path: Path)(implicit ugi: UserGroupInformation): Try[Boolean] = { Try { diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala index b8f1ee9d6..83473b101 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala @@ -15,13 +15,13 @@ package za.co.absa.hyperdrive.trigger.api.rest.services +import com.typesafe.scalalogging.LazyLogging import org.apache.commons.configuration2.builder.BasicConfigurationBuilder import org.apache.commons.configuration2.builder.fluent.Parameters import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler import org.apache.commons.configuration2.{BaseConfiguration, Configuration} import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.consumer.ConsumerConfig -import org.slf4j.LoggerFactory import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig @@ -47,8 +47,8 @@ class HyperdriveOffsetServiceImpl @Inject() (sparkConfig: SparkConfig, @Lazy checkpointService: CheckpointService, @Lazy userGroupInformationService: UserGroupInformationService, kafkaService: KafkaService -) extends HyperdriveOffsetService { - private val logger = LoggerFactory.getLogger(this.getClass) +) extends HyperdriveOffsetService + with LazyLogging { private val HyperdriveCheckpointKey = "writer.common.checkpoint.location" private val HyperdriveKafkaTopicKey = "reader.kafka.topic" private val HyperdriveKafkaBrokersKey = "reader.kafka.brokers" diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala index bbb617ba1..51a427691 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala @@ -15,10 +15,10 @@ package za.co.absa.hyperdrive.trigger.api.rest.services -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.stereotype.Service -import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus} import za.co.absa.hyperdrive.trigger.models.enums.JobTypes +import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus} import za.co.absa.hyperdrive.trigger.persistance.WorkflowRepository import scala.concurrent.{ExecutionContext, Future} @@ -37,8 +37,8 @@ class HyperdriveServiceImpl( override protected val workflowRepository: WorkflowRepository, override protected val jobTemplateService: JobTemplateService, override protected val hyperdriveOffsetService: HyperdriveOffsetService -) extends HyperdriveService { - private val logger = LoggerFactory.getLogger(this.getClass) +) extends HyperdriveService + with LazyLogging { override def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]] = { workflowRepository.getWorkflow(id).flatMap { workflow => diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala index 18428a872..5872c7ae4 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala @@ -15,9 +15,9 @@ package za.co.absa.hyperdrive.trigger.api.rest.services +import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.TopicPartition -import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import org.springframework.util.ConcurrentLruCache import za.co.absa.hyperdrive.trigger.api.rest.services.KafkaServiceImpl.{BeginningOffsets, EndOffsets, OffsetFunction} @@ -36,8 +36,7 @@ trait KafkaService { } @Service -class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService { - private val logger = LoggerFactory.getLogger(this.getClass) +class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService with LazyLogging { private val consumerUuid = randomUUID().toString private val kafkaConsumersCache = new ConcurrentLruCache[(Properties, Long), KafkaConsumer[String, String]]( generalConfig.kafkaConsumersCacheSize, diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/WorkflowService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/WorkflowService.scala index 235615ece..7748ed51b 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/WorkflowService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/WorkflowService.scala @@ -15,20 +15,13 @@ package za.co.absa.hyperdrive.trigger.api.rest.services +import com.typesafe.scalalogging.Logger import org.springframework.stereotype.Service -import za.co.absa.hyperdrive.trigger.models.{ - JobDefinition, - Project, - ProjectInfo, - Workflow, - WorkflowImportExportWrapper, - WorkflowJoined -} -import za.co.absa.hyperdrive.trigger.models.errors.{ApiException, BulkOperationError, GenericError} -import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, WorkflowRepository} -import org.slf4j.LoggerFactory import za.co.absa.hyperdrive.trigger.configuration.application.GeneralConfig +import za.co.absa.hyperdrive.trigger.models._ +import za.co.absa.hyperdrive.trigger.models.errors.{ApiException, BulkOperationError, GenericError} import za.co.absa.hyperdrive.trigger.models.search.{TableSearchRequest, TableSearchResponse} +import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, WorkflowRepository} import scala.concurrent.{ExecutionContext, Future} @@ -91,7 +84,7 @@ class WorkflowServiceImpl( ) extends WorkflowService with UserDetailsService { - private val serviceLogger = LoggerFactory.getLogger(this.getClass) + private val serviceLogger = Logger(this.getClass) def createWorkflow(workflow: WorkflowJoined)(implicit ec: ExecutionContext): Future[WorkflowJoined] = { val userName = getUserName.apply() diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/liquibase/LiquibaseConfiguration.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/liquibase/LiquibaseConfiguration.scala index 13db81b17..2a2183804 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/liquibase/LiquibaseConfiguration.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/liquibase/LiquibaseConfiguration.scala @@ -15,10 +15,9 @@ package za.co.absa.hyperdrive.trigger.configuration.liquibase +import com.typesafe.scalalogging.Logger import liquibase.integration.spring.SpringLiquibase import liquibase.{Contexts, LabelExpression, Liquibase} -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.liquibase.LiquibaseProperties import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.{Bean, Configuration} @@ -33,7 +32,7 @@ class LiquibaseConfiguration( val dbProvider: DatabaseProvider ) extends SpringLiquibase with Repository { - private val configLogger = LoggerFactory.getLogger(this.getClass) + private val configLogger = Logger(this.getClass) private val skipLiquibase: Boolean = dbConfig.skipLiquibase @Bean diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/DBErrorHandling.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/DBErrorHandling.scala index 60b3703ee..137fc24c7 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/DBErrorHandling.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/DBErrorHandling.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.persistance -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.Logger import slick.dbio.{DBIOAction, Effect, NoStream} import za.co.absa.hyperdrive.trigger.models.errors.{ApiException, GenericDatabaseError} import za.co.absa.hyperdrive.trigger.models.tables.Profile @@ -26,7 +26,7 @@ import scala.util.{Failure, Success} private[persistance] trait DBErrorHandling { this: Profile => - private val repositoryLogger = LoggerFactory.getLogger(this.getClass) + private val repositoryLogger = Logger(this.getClass) protected implicit class DBIOActionOps[T](val action: api.DBIO[T]) { def withErrorHandling(errorMessage: String)(implicit ec: ExecutionContext): DBIOAction[T, NoStream, Effect.All] = diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/WorkflowRepository.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/WorkflowRepository.scala index 27c1a0cba..fdd1b176f 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/WorkflowRepository.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/persistance/WorkflowRepository.scala @@ -15,17 +15,17 @@ package za.co.absa.hyperdrive.trigger.persistance -import java.time.LocalDateTime -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.Logger import org.springframework.stereotype +import za.co.absa.hyperdrive.trigger.models._ import za.co.absa.hyperdrive.trigger.models.enums.SchedulerInstanceStatuses import za.co.absa.hyperdrive.trigger.models.enums.SchedulerInstanceStatuses.SchedulerInstanceStatus import za.co.absa.hyperdrive.trigger.models.errors.ApiErrorTypes.OptimisticLockingErrorType import za.co.absa.hyperdrive.trigger.models.errors.{ApiException, DatabaseError, GenericDatabaseError, ValidationError} import za.co.absa.hyperdrive.trigger.models.search.{TableSearchRequest, TableSearchResponse} import za.co.absa.hyperdrive.trigger.models.tables.tableExtensions.optimisticLocking.OptimisticLockingException -import za.co.absa.hyperdrive.trigger.models.{ProjectInfo, _} +import java.time.LocalDateTime import javax.inject.Inject import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -71,7 +71,7 @@ class WorkflowRepositoryImpl @Inject() ( import api._ - private val repositoryLogger = LoggerFactory.getLogger(this.getClass) + private val repositoryLogger = Logger(this.getClass) override def insertWorkflow(workflow: WorkflowJoined, user: String)(implicit ec: ExecutionContext): Future[Long] = db.run( diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala index f288e45b3..be39eced4 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/JobScheduler.scala @@ -15,10 +15,11 @@ package za.co.absa.hyperdrive.trigger.scheduler +import com.typesafe.scalalogging.LazyLogging + import java.util.concurrent import java.util.concurrent.atomic.AtomicBoolean import javax.inject.Inject -import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import za.co.absa.hyperdrive.trigger.configuration.application.SchedulerConfig import za.co.absa.hyperdrive.trigger.persistance._ @@ -39,12 +40,10 @@ class JobScheduler @Inject() ( workflowBalancer: WorkflowBalancer, notificationSender: NotificationSender, schedulerConfig: SchedulerConfig -) { +) extends LazyLogging { case class RunningDagsKey(dagId: Long, workflowId: Long) - private val logger = LoggerFactory.getLogger(this.getClass) - private val HEART_BEAT: Int = schedulerConfig.heartBeat val NUM_OF_PAR_TASKS: Int = schedulerConfig.maxParallelJobs @@ -119,17 +118,22 @@ class JobScheduler @Inject() ( .getDagsToRun(runningDags.keys.map(_.workflowId).toSeq.distinct, emptySlotsSize, assignedWorkflowIds) .map { _.foreach { dag => - logger.debug(s"Deploying dag = ${dag.id}") + logger.debug("Deploying dag (DagId={})", dag.id) runningDags.put(RunningDagsKey(dag.id, dag.workflowId), executors.executeDag(dag)) } } private def removeFinishedDags(): Unit = if (runningEnqueue.isCompleted) { - runningDags.foreach { - case (id, fut) if fut.isCompleted => runningDags.remove(id) - case _ => () - } + val finishedDags = runningDags.flatMap { + case (key, dagCompletion) if dagCompletion.isCompleted => Some(key) + case _ => None + }.toSeq + logger.debug( + "Removing finished DAGs for workflows {}", + finishedDags.map(k => s"(DagId=${k.dagId}, WorkflowId=${k.workflowId})") + ) + runningDags --= finishedDags } private def processEvents(assignedWorkflowIds: Seq[Long]): Unit = diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/SchedulerInstanceService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/SchedulerInstanceService.scala index 74c3db21f..65cef3b64 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/SchedulerInstanceService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/SchedulerInstanceService.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.cluster -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import java.time.Duration import javax.inject.Inject @@ -36,8 +36,8 @@ trait SchedulerInstanceService { @Service class SchedulerInstanceServiceImpl @Inject() (schedulerInstanceRepository: SchedulerInstanceRepository) - extends SchedulerInstanceService { - private val logger = LoggerFactory.getLogger(this.getClass) + extends SchedulerInstanceService + with LazyLogging { override def registerNewInstance()(implicit ec: ExecutionContext): Future[Long] = schedulerInstanceRepository.insertInstance() @@ -60,7 +60,12 @@ class SchedulerInstanceServiceImpl @Inject() (schedulerInstanceRepository: Sched lagThreshold ) _ = if (deactivatedCount != 0) - logger.info(s"Deactivated $deactivatedCount instances at current heartbeat $currentHeartbeat") + logger.info( + "Deactivated {} instances at current heartbeat {} by (SchedulerId={})", + deactivatedCount, + currentHeartbeat, + instanceId + ) allInstances <- schedulerInstanceRepository.getAllInstances() } yield allInstances } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancer.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancer.scala index dfdc5db97..f8d84779c 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancer.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancer.scala @@ -15,14 +15,13 @@ package za.co.absa.hyperdrive.trigger.scheduler.cluster -import java.time.Duration +import com.typesafe.scalalogging.LazyLogging + import javax.inject.Inject -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import za.co.absa.hyperdrive.trigger.configuration.application.SchedulerConfig import za.co.absa.hyperdrive.trigger.models.enums.SchedulerInstanceStatuses.SchedulerInstanceStatus -import za.co.absa.hyperdrive.trigger.models.{SchedulerInstance, Workflow} +import za.co.absa.hyperdrive.trigger.models.Workflow import scala.concurrent.{ExecutionContext, Future} @@ -31,9 +30,8 @@ class WorkflowBalancer @Inject() ( schedulerInstanceService: SchedulerInstanceService, workflowBalancingService: WorkflowBalancingService, schedulerConfig: SchedulerConfig -) { +) extends LazyLogging { case class SchedulerIdStatus(id: Long, status: SchedulerInstanceStatus) - private val logger = LoggerFactory.getLogger(this.getClass) private var schedulerInstanceId: Option[Long] = None private var previousInstancesIdStatus: Set[SchedulerIdStatus] = Set() @@ -65,8 +63,10 @@ class WorkflowBalancer @Inject() ( workflows } - def resetSchedulerInstanceId(): Unit = + def resetSchedulerInstanceId(): Unit = { + logger.trace("Resetting scheduler instance id (SchedulerId={}) -> None", schedulerInstanceId) schedulerInstanceId = None + } private def getOrCreateInstance()(implicit ec: ExecutionContext) = schedulerInstanceId match { @@ -76,7 +76,7 @@ class WorkflowBalancer @Inject() ( .registerNewInstance() .map { id => schedulerInstanceId = Some(id) - logger.info(s"Registered new scheduler instance with id = $id") + logger.info(s"Registered new scheduler instance (SchedulerId=$id)") id } } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancingService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancingService.scala index c35f5836d..a1c33226e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancingService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/cluster/WorkflowBalancingService.scala @@ -15,8 +15,9 @@ package za.co.absa.hyperdrive.trigger.scheduler.cluster +import com.typesafe.scalalogging.LazyLogging + import javax.inject.Inject -import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.models.enums.SchedulerInstanceStatuses import za.co.absa.hyperdrive.trigger.models.{SchedulerInstance, Workflow} @@ -34,8 +35,9 @@ trait WorkflowBalancingService { } @Service -class WorkflowBalancingServiceImpl @Inject() (workflowRepository: WorkflowRepository) extends WorkflowBalancingService { - private val logger = LoggerFactory.getLogger(this.getClass) +class WorkflowBalancingServiceImpl @Inject() (workflowRepository: WorkflowRepository) + extends WorkflowBalancingService + with LazyLogging { override def getWorkflowsAssignment( runningWorkflowIds: Iterable[Long], @@ -45,16 +47,22 @@ class WorkflowBalancingServiceImpl @Inject() (workflowRepository: WorkflowReposi val activeInstances = instances.filter(_.status == SchedulerInstanceStatuses.Active) val myRank = getRank(activeInstances, myInstanceId) logger.info( - s"Rebalancing workflows on scheduler instance id = $myInstanceId, rank = $myRank," + - s" active instance ids = ${activeInstances.map(_.id).sorted}, retaining workflow ids = $runningWorkflowIds" + "Rebalancing workflows on scheduler instance (SchedulerId={}), rank = {}," + + " active instances {}, retaining workflows {}", + myInstanceId, + myRank, + activeInstances.map(_.id).sorted.map(id => s"InstanceId=$id"), + runningWorkflowIds.map(id => s"WorkflowId=$id") ) for { (releasedWorkflowsCount, instancesDeletedCount) <- workflowRepository .releaseWorkflowAssignmentsOfDeactivatedInstances() _ = if (releasedWorkflowsCount > 0) { logger.info( - s"Scheduler instance id = $myInstanceId released $releasedWorkflowsCount workflows of " + - s"$instancesDeletedCount deactivated instances" + "Scheduler instance (SchedulerId={}) released {} workflows of {} deactivated instances", + myInstanceId, + releasedWorkflowsCount, + instancesDeletedCount ) } allWorkflows <- workflowRepository.getWorkflows() @@ -72,8 +80,10 @@ class WorkflowBalancingServiceImpl @Inject() (workflowRepository: WorkflowReposi val acquiredWorkflowIds = acquiredWorkflows.map(_.id) val targetWorkflowAssignmentReached = acquiredWorkflowIds.toSet == targetWorkflowIds.toSet logger.debug( - s"Scheduler instance id = $myInstanceId acquired workflow ids ${acquiredWorkflowIds.sorted}" + - s" with missing target workflow ids = ${targetWorkflowIds.diff(acquiredWorkflowIds).sorted}" + "Scheduler instance (SchedulerId={}) acquired workflows {} with missing target workflows {}", + myInstanceId, + acquiredWorkflowIds.sorted.map(id => s"WorkflowId=$id"), + targetWorkflowIds.diff(acquiredWorkflowIds).sorted.map(id => s"WorkflowId=$id") ) (acquiredWorkflows, targetWorkflowAssignmentReached) } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/eventProcessor/EventProcessor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/eventProcessor/EventProcessor.scala index 599267bd0..cb9a1819a 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/eventProcessor/EventProcessor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/eventProcessor/EventProcessor.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.eventProcessor -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.stereotype.Component import za.co.absa.hyperdrive.trigger.api.rest.services.DagInstanceService import za.co.absa.hyperdrive.trigger.models.Event @@ -29,8 +29,7 @@ class EventProcessor( dagDefinitionRepository: DagDefinitionRepository, dagInstanceRepository: DagInstanceRepository, dagInstanceService: DagInstanceService -) { - private val logger = LoggerFactory.getLogger(this.getClass) +) extends LazyLogging { def eventProcessor( triggeredBy: String @@ -42,27 +41,55 @@ class EventProcessor( private def processEvents(events: Seq[Event], sensorId: Long, triggeredBy: String)( implicit ec: ExecutionContext - ): Future[Boolean] = + ): Future[Boolean] = { + logger.trace( + "Processing events {} called on event processor for sensor (SensorId={}), triggered by: {}", + events.map(e => s"EventId=${e.id}"), + sensorId, + triggeredBy + ) eventRepository.getExistEvents(events.map(_.sensorEventId)).flatMap { eventsIdsInDB => val newEvents = events.filter(e => !eventsIdsInDB.contains(e.sensorEventId)) + logger.trace(s"Unprocessed events ${newEvents.map(e => s"EventId=${e.id}")}") if (newEvents.nonEmpty) { dagDefinitionRepository.getJoinedDagDefinition(sensorId).flatMap { case Some(joinedDagDefinition) => for { - hasInQueueDagInstance <- dagInstanceRepository.hasInQueueDagInstance(joinedDagDefinition.workflowId) + hasInQueueDagInstance <- dagInstanceRepository + .hasInQueueDagInstance(joinedDagDefinition.workflowId) + _ = logger.trace( + "DAG instance for (WorkflowId={}) produced by (SensorId={}) already queued: [{}]", + joinedDagDefinition.workflowId, + sensorId, + hasInQueueDagInstance + ) dagInstanceJoined <- dagInstanceService .createDagInstance(joinedDagDefinition, triggeredBy, hasInQueueDagInstance) + _ = logger.trace( + "Created Joined DAG instance {} by (SensorId={}) for (WorkflowId={})", + dagInstanceJoined, + sensorId, + joinedDagDefinition.workflowId + ) dagInstanceJoinedEvents = newEvents.map(event => (dagInstanceJoined, event)) _ <- dagInstanceRepository.insertJoinedDagInstancesWithEvents(dagInstanceJoinedEvents) } yield { + logger.info( + "Persisted newly paired DAG instances with Events into DB by (SensorId={}) for (WorkflowId={})", + sensorId, + joinedDagDefinition.workflowId + ) true } case None => + logger.info("No Joined DAG definition found for (SensorId={})", sensorId) Future.successful(true) } } else { + logger.info("EventProcessor for (SensorId={}) doesn't have any new events", sensorId) Future.successful(true) } } + } } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 346f15779..97bc233fc 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -15,6 +15,8 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors +import com.typesafe.scalalogging.LazyLogging + import java.time.LocalDateTime import java.util.concurrent import javax.inject.Inject @@ -29,7 +31,6 @@ import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{ SparkExecutor, SparkYarnClusterServiceImpl } -import org.slf4j.LoggerFactory import org.springframework.beans.factory.BeanFactory import org.springframework.context.annotation.Lazy import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor @@ -50,8 +51,7 @@ class Executors @Inject() ( implicit val sparkConfig: SparkConfig, schedulerConfig: SchedulerConfig, @Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetService -) { - private val logger = LoggerFactory.getLogger(this.getClass) +) extends LazyLogging { private implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(concurrent.Executors.newFixedThreadPool(schedulerConfig.executors.threadPoolSize)) private val sparkClusterService: SparkClusterService = { @@ -79,8 +79,9 @@ class Executors @Inject() ( } yield {} fut.onComplete { case Failure(exception) => - logger.error(s"Updating status failed for failed run. Dag instance id = ${dagInstance.id}", exception) + logger.error(s"Updating status failed for failed run. (DagId=${dagInstance.id})", exception) case _ => + logger.info("Updating status succeeded for failed run. (DagId={})", dagInstance.id) } fut case jobInstances @@ -92,8 +93,9 @@ class Executors @Inject() ( } yield {} fut.onComplete { case Failure(exception) => - logger.error(s"Updating status failed for skipped run. Dag instance id = ${dagInstance.id}", exception) + logger.error(s"Updating status failed for skipped run. (DagId=${dagInstance.id})", exception) case _ => + logger.info("Updating status succeeded for skipped run. (DagId={})", dagInstance.id) } fut case jobInstances if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && !ji.jobStatus.isFailed) => @@ -105,8 +107,9 @@ class Executors @Inject() ( } yield {} fut.onComplete { case Failure(exception) => - logger.error(s"Updating status failed for successful run. Dag instance id = ${dagInstance.id}", exception) + logger.error(s"Updating status failed for successful run. (DagId=${dagInstance.id})", exception) case _ => + logger.info("Updating status succeeded for successful run. (DagId={})", dagInstance.id) } fut case jobInstances => @@ -127,16 +130,19 @@ class Executors @Inject() ( } } fut.onComplete { - case Success(_) => logger.debug(s"Executing job. Job instance = $jobInstance") + case Success(_) => logger.debug(s"Executing job. (JobInstance={})", jobInstance) case Failure(exception) => - logger.error(s"Executing job failed. Job instance id = $jobInstance.", exception) + logger.error(s"Executing job failed. (JobInstance=$jobInstance).", exception) } fut } private def updateJob(jobInstance: JobInstance): Future[Unit] = { logger.info( - s"Job updated. ID = ${jobInstance.id} STATUS = ${jobInstance.jobStatus} EXECUTOR_ID = ${jobInstance.executorJobId}" + "Job updated. (JobInstance={}) STATUS = {} EXECUTOR_ID = {}", + jobInstance.id, + jobInstance.jobStatus, + jobInstance.executorJobId ) jobInstanceRepository.updateJob(jobInstance) } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/shell/ShellExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/shell/ShellExecutor.scala index 6eca40f8a..f6023720d 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/shell/ShellExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/shell/ShellExecutor.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.shell -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import za.co.absa.hyperdrive.trigger.models.{JobInstance, ShellInstanceParameters} import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses._ @@ -23,8 +23,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.sys.process._ import scala.util.Try -object ShellExecutor { - private val logger = LoggerFactory.getLogger(this.getClass) +object ShellExecutor extends LazyLogging { def execute(jobInstance: JobInstance, jobParameters: ShellInstanceParameters, updateJob: JobInstance => Future[Unit])( implicit executionContext: ExecutionContext diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index e25c458fd..2bcbf2133 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses @@ -23,8 +23,7 @@ import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameter import scala.concurrent.{ExecutionContext, Future} -object HyperdriveExecutor { - private val logger = LoggerFactory.getLogger(this.getClass) +object HyperdriveExecutor extends LazyLogging { def execute( jobInstance: JobInstance, diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkEmrClusterServiceImpl.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkEmrClusterServiceImpl.scala index 30c3447d7..2b0da81db 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkEmrClusterServiceImpl.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkEmrClusterServiceImpl.scala @@ -25,8 +25,8 @@ import com.amazonaws.services.elasticmapreduce.model.{ StepState, StepSummary } +import com.typesafe.scalalogging.LazyLogging import org.apache.commons.lang3.StringUtils -import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses @@ -46,9 +46,9 @@ class SparkEmrClusterServiceImpl @Inject() ( sparkConfig: SparkConfig, emrClusterProvider: EmrClusterProviderService, executionContextProvider: SparkClusterServiceExecutionContextProvider -) extends SparkClusterService { +) extends SparkClusterService + with LazyLogging { private implicit val executionContext: ExecutionContext = executionContextProvider.get() - private val logger = LoggerFactory.getLogger(this.getClass) private val commandRunnerJar = "command-runner.jar" private lazy val emr = emrClusterProvider.get() diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala index ce53d7f00..1ba561490 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/notifications/NotificationSender.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.notifications -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.stereotype.Component import za.co.absa.hyperdrive.trigger.api.rest.services.NotificationRuleService import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, NotificationConfig, SparkConfig} @@ -42,10 +42,10 @@ class NotificationSenderImpl( sparkConfig: SparkConfig, notificationConfig: NotificationConfig, generalConfig: GeneralConfig -) extends NotificationSender { +) extends NotificationSender + with LazyLogging { private case class Message(recipients: Seq[String], subject: String, text: String, attempts: Int) - private val logger = LoggerFactory.getLogger(this.getClass) private val sender = notificationConfig.senderAddress private val notificationEnabled = notificationConfig.enabled private val environment = generalConfig.environment diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensor.scala index 5df518fa4..446a53a4f 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensor.scala @@ -15,19 +15,24 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.AnyLogging import za.co.absa.hyperdrive.trigger.models.{Event, SensorProperties} import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal -trait Sensor[T <: SensorProperties] { - private val logger = LoggerFactory.getLogger(this.getClass) +trait Sensor[T <: SensorProperties] extends AnyLogging { + val eventsProcessor: (Seq[Event], Long) => Future[Boolean] val sensorDefinition: za.co.absa.hyperdrive.trigger.models.Sensor[T] implicit val executionContext: ExecutionContext def close(): Unit = try { + logger.trace( + "Closing sensor (SensorId={}) for workflow (WorkflowId={})", + sensorDefinition.id, + sensorDefinition.workflowId + ) closeInternal() } catch { case NonFatal(e) => logger.warn(s"Couldn't close sensor ${sensorDefinition.id} - $sensorDefinition", e) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensors.scala index c66ef5f56..10b796753 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/Sensors.scala @@ -15,7 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors -import org.slf4j.LoggerFactory +import com.typesafe.scalalogging.LazyLogging import org.springframework.stereotype.Component import za.co.absa.hyperdrive.trigger.configuration.application.{ GeneralConfig, @@ -28,14 +28,14 @@ import za.co.absa.hyperdrive.trigger.models.{ KafkaSensorProperties, RecurringSensorProperties, SensorProperties, - TimeSensorProperties + TimeSensorProperties, + Sensor => SensorDefition } import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, SensorRepository} import za.co.absa.hyperdrive.trigger.scheduler.eventProcessor.EventProcessor import za.co.absa.hyperdrive.trigger.scheduler.sensors.kafka.{AbsaKafkaSensor, KafkaSensor} import za.co.absa.hyperdrive.trigger.scheduler.sensors.recurring.RecurringSensor import za.co.absa.hyperdrive.trigger.scheduler.sensors.time.{TimeSensor, TimeSensorQuartzSchedulerManager} -import za.co.absa.hyperdrive.trigger.models.{Sensor => SensorDefition} import java.util.concurrent.Executors import javax.inject.Inject @@ -52,8 +52,7 @@ class Sensors @Inject() ( implicit val generalConfig: GeneralConfig, schedulerConfig: SchedulerConfig, implicit val recurringSensorConfig: RecurringSensorConfig -) { - private val logger = LoggerFactory.getLogger(this.getClass) +) extends LazyLogging { private implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(schedulerConfig.sensors.threadPoolSize)) @@ -62,7 +61,7 @@ class Sensors @Inject() ( mutable.Map.empty[Long, Sensor[_ <: SensorProperties]] def processEvents(assignedWorkflowIds: Seq[Long]): Future[Unit] = { - logger.info(s"Processing events. Sensors: ${sensors.keys}") + logger.info(s"Processing events sensed by ${sensors.keys.map(id => s"SensorId=$id")}") removeReleasedSensors(assignedWorkflowIds) val fut = for { _ <- removeInactiveSensors() @@ -75,7 +74,7 @@ class Sensors @Inject() ( fut.onComplete { case Success(_) => logger.info("Processing events successful") - case Failure(exception) => logger.debug("Processing events failed.", exception) + case Failure(exception) => logger.warn("Processing events failed.", exception) } fut @@ -100,6 +99,7 @@ class Sensors @Inject() ( sensors.values.map(sensor => (sensor.sensorDefinition.id, sensor.sensorDefinition.properties)).toSeq ) .map(_.foreach { sensor => + logger.trace("Restarting updated sensor (SensorId={}) for (WorkflowId={})", sensor.id, sensor.workflowId) stopSensor(sensor.id) startSensor(sensor) }) @@ -107,6 +107,11 @@ class Sensors @Inject() ( private def removeReleasedSensors(assignedWorkflowIds: Seq[Long]): Unit = { val releasedWorkflowIds = sensors.values.map(_.sensorDefinition.workflowId).toSeq.diff(assignedWorkflowIds) + logger.trace( + "Removing released sensors {} when assigned {}", + releasedWorkflowIds.map(id => s"WorkflowId=$id"), + assignedWorkflowIds.map(id => s"WorkflowId=$id") + ) sensors .filter { case (_, value) => releasedWorkflowIds.contains(value.sensorDefinition.workflowId) } .foreach { case (sensorId, _) => stopSensor(sensorId) } @@ -114,10 +119,17 @@ class Sensors @Inject() ( private def removeInactiveSensors(): Future[Unit] = { val activeSensors = sensors.keys.toSeq - sensorRepository.getInactiveSensors(activeSensors).map(_.foreach(id => stopSensor(id))) + logger.trace(s"Removing inactive sensors called with active sensors: ${activeSensors.map(id => s"SensorId=$id")}") + sensorRepository + .getInactiveSensors(activeSensors) + .map { inactive => + logger.info("Removing inactive sensors {}", inactive.map(id => s"SensorId=$id")) + inactive.foreach(id => stopSensor(id)) + } } private def stopSensor(id: Long) = { + logger.trace("Stopping sensor (SensorId={})", id) sensors.get(id).foreach(_.close()) sensors.remove(id) } @@ -129,7 +141,13 @@ class Sensors @Inject() ( } } - private def startSensor(sensor: SensorDefition[_ <: SensorProperties]) = + private def startSensor(sensor: SensorDefition[_ <: SensorProperties]) = { + logger.debug( + "Starting sensor (SensorId={}) for (WorkflowId={}) with (SensorType={})", + sensor.id, + sensor.workflowId, + sensor.properties.sensorType.name + ) sensor.properties match { case kafkaSensorProperties: KafkaSensorProperties => Try( @@ -138,8 +156,10 @@ class Sensors @Inject() ( sensor.copy(properties = kafkaSensorProperties) ) ) match { - case Success(s) => sensors.put(sensor.id, s) - case Failure(f) => logger.error(s"Could not create Kafka sensor for sensor (#${sensor.id}).", f) + case Success(s) => + logger.info("Kafka sensor (SensorId={}) started for workflow (WorkflowId={})", sensor.id, sensor.workflowId) + sensors.put(sensor.id, s) + case Failure(f) => logger.error(s"Could not create Kafka sensor for sensor (SensorId=${sensor.id}).", f) } case absaKafkaSensorProperties: AbsaKafkaSensorProperties => Try( @@ -148,8 +168,14 @@ class Sensors @Inject() ( sensor.copy(properties = absaKafkaSensorProperties) ) ) match { - case Success(s) => sensors.put(sensor.id, s) - case Failure(f) => logger.error(s"Could not create Absa Kafka sensor for sensor (#${sensor.id}).", f) + case Success(s) => + logger.info( + "Absa Kafka sensor (SensorId={}) started for workflow (WorkflowId={})", + sensor.id, + sensor.workflowId + ) + sensors.put(sensor.id, s) + case Failure(f) => logger.error(s"Could not create Absa Kafka sensor for sensor (SensorId=${sensor.id}).", f) } case timeSensorProperties: TimeSensorProperties => Try( @@ -158,8 +184,10 @@ class Sensors @Inject() ( sensor.copy(properties = timeSensorProperties) ) ) match { - case Success(s) => sensors.put(sensor.id, s) - case Failure(f) => logger.error(s"Could not create Time sensor for sensor (#${sensor.id}).", f) + case Success(s) => + logger.info("Time sensor (SensorId={}) started for workflow (WorkflowId={})", sensor.id, sensor.workflowId) + sensors.put(sensor.id, s) + case Failure(f) => logger.error(s"Could not create Time sensor for sensor (SensorId=${sensor.id}).", f) } case recurringSensorProperties: RecurringSensorProperties => Try( @@ -169,17 +197,30 @@ class Sensors @Inject() ( dagInstanceRepository ) ) match { - case Success(s) => sensors.put(sensor.id, s) - case Failure(f) => logger.error(s"Could not create Recurring sensor for sensor (#${sensor.id}).", f) + case Success(s) => + logger.info( + "Recurring sensor (SensorId={}) started for workflow (WorkflowId={})", + sensor.id, + sensor.workflowId + ) + sensors.put(sensor.id, s) + case Failure(f) => logger.error(s"Could not create Recurring sensor for sensor (SensorId=${sensor.id}).", f) } case _ => - logger.error(s"Could not find sensor implementation (#${sensor.id}).", sensor.properties.sensorType) + logger.error( + "Could not find sensor implementation (SensorId={}) unknown (SensorType={})", + sensor.id, + sensor.properties.sensorType + ) } + } - private def pollEvents(): Future[Seq[Unit]] = + private def pollEvents(): Future[Seq[Unit]] = { + logger.trace("Polling events events called") Future.sequence(sensors.flatMap { case (_, sensor: PollSensor[_]) => Option(sensor.poll()) case _ => None }.toSeq) + } } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/AbsaKafkaSensor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/AbsaKafkaSensor.scala index 0aaf7ea80..aed884c84 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/AbsaKafkaSensor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/AbsaKafkaSensor.scala @@ -15,6 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.kafka +import com.typesafe.scalalogging.LazyLogging import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, KafkaConfig} import za.co.absa.hyperdrive.trigger.models.{AbsaKafkaSensorProperties, Event} import za.co.absa.hyperdrive.trigger.scheduler.sensors.PollSensor @@ -26,7 +27,8 @@ class AbsaKafkaSensor( eventsProcessor: (Seq[Event], Long) => Future[Boolean], sensorDefinition: SensorDefition[AbsaKafkaSensorProperties] )(implicit kafkaConfig: KafkaConfig, generalConfig: GeneralConfig, executionContext: ExecutionContext) - extends PollSensor[AbsaKafkaSensorProperties](eventsProcessor, sensorDefinition, executionContext) { + extends PollSensor[AbsaKafkaSensorProperties](eventsProcessor, sensorDefinition, executionContext) + with LazyLogging { val kafkaSensor = new KafkaSensor( eventsProcessor, diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/KafkaSensor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/KafkaSensor.scala index c8cda386f..c06f0d855 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/KafkaSensor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/kafka/KafkaSensor.scala @@ -15,9 +15,9 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.kafka +import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.TopicPartition -import org.slf4j.LoggerFactory import play.api.libs.json.{JsError, JsSuccess, Json} import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, KafkaConfig} import za.co.absa.hyperdrive.trigger.models.{Event, KafkaSensorProperties} @@ -34,10 +34,8 @@ class KafkaSensor( eventsProcessor: (Seq[Event], Long) => Future[Boolean], sensorDefinition: SensorDefition[KafkaSensorProperties] )(implicit kafkaConfig: KafkaConfig, generalConfig: GeneralConfig, executionContext: ExecutionContext) - extends PollSensor[KafkaSensorProperties](eventsProcessor, sensorDefinition, executionContext) { - - private val logger = LoggerFactory.getLogger(this.getClass) - private val logMsgPrefix = s"Sensor id = ${sensorDefinition.id}." + extends PollSensor[KafkaSensorProperties](eventsProcessor, sensorDefinition, executionContext) + with LazyLogging { private val consumer = { val consumerProperties = kafkaConfig.properties @@ -64,27 +62,27 @@ class KafkaSensor( } ) } catch { - case e: Exception => logger.error(s"$logMsgPrefix. Exception during subscribe.", e) + case e: Exception => logger.error(s"(SensorId=${sensorDefinition.id}). Exception during subscribe.", e) } override def poll(): Future[Unit] = { import scala.collection.JavaConverters._ - logger.debug(s"$logMsgPrefix. Polling new events.") + logger.debug("(SensorId={}). Polling new events.", sensorDefinition.id) val fut = Future { consumer.poll(Duration.ofMillis(kafkaConfig.pollDuration)).asScala } flatMap processRecords map (_ => consumer.commitSync()) fut.onComplete { - case Success(_) => logger.debug(s"$logMsgPrefix. Polling successful") + case Success(_) => logger.debug("(SensorId={}). Polling successful", sensorDefinition.id) case Failure(exception) => - logger.debug(s"$logMsgPrefix. Polling failed.", exception) + logger.warn(s"(SensorId=${sensorDefinition.id}). Polling failed.", exception) } fut } private def processRecords[A](records: Iterable[ConsumerRecord[A, String]]): Future[Unit] = { - logger.debug(s"$logMsgPrefix. Messages received = ${records.map(_.value())}") + logger.debug(s"(SensorId=${sensorDefinition.id}). Messages received = ${records.map(_.value())}") if (records.nonEmpty) { val events = records.map(recordToEvent).toSeq val matchedEvents = events.filter { event => @@ -106,7 +104,7 @@ class KafkaSensor( private def recordToEvent[A](record: ConsumerRecord[A, String]): Event = { val sourceEventId = sensorDefinition.id + "kafka" + record.topic() + record.partition() + record.offset() val payload = Try(Json.parse(record.value())).getOrElse { - logger.error(s"$logMsgPrefix. Invalid message.") + logger.error(s"(SensorId={}). Invalid message.", sensorDefinition.id) Json.parse(s"""{"errorMessage": "${record.value()}"}""") } Event(sourceEventId, sensorDefinition.id, payload) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/recurring/RecurringSensor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/recurring/RecurringSensor.scala index 4ea21f026..f0cfe212d 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/recurring/RecurringSensor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/recurring/RecurringSensor.scala @@ -15,9 +15,10 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.recurring +import com.typesafe.scalalogging.LazyLogging + import java.time.format.DateTimeFormatter import java.time.{Instant, LocalDateTime} -import org.slf4j.LoggerFactory import play.api.libs.json.JsObject import za.co.absa.hyperdrive.trigger.configuration.application.RecurringSensorConfig import za.co.absa.hyperdrive.trigger.models.{Event, RecurringSensorProperties} @@ -33,18 +34,17 @@ class RecurringSensor( sensorDefinition: SensorDefition[RecurringSensorProperties], dagInstanceRepository: DagInstanceRepository )(implicit recurringSensorConfig: RecurringSensorConfig, executionContext: ExecutionContext) - extends PollSensor[RecurringSensorProperties](eventsProcessor, sensorDefinition, executionContext) { + extends PollSensor[RecurringSensorProperties](eventsProcessor, sensorDefinition, executionContext) + with LazyLogging { private val eventDateFormatter: DateTimeFormatter = DateTimeFormatter.ISO_INSTANT - private val logger = LoggerFactory.getLogger(this.getClass) - private val logMsgPrefix = s"Sensor id = ${sensorDefinition.id}." override def poll(): Future[Unit] = { - logger.debug(s"$logMsgPrefix. Polling new events.") + logger.debug("(SensorId={}). Polling new events.", sensorDefinition.id) val fut = dagInstanceRepository.hasRunningDagInstance(sensorDefinition.workflowId).flatMap { hasRunningDagInstance => if (hasRunningDagInstance) { - logger.debug(s"$logMsgPrefix. Workflow is running.") + logger.debug("(SensorId={}). Workflow is running.", sensorDefinition.id) Future.successful((): Unit) } else { val cutOffTime = LocalDateTime.now().minus(recurringSensorConfig.duration) @@ -53,13 +53,23 @@ class RecurringSensor( .flatMap { count => if (count >= recurringSensorConfig.maxJobsPerDuration) { logger.warn( - s"Skipping dag instance creation, because $count dag instances have been created since" + - s" $cutOffTime, but the allowed maximum is ${recurringSensorConfig.maxJobsPerDuration} " + "(SensorId={}). Skipping dag instance creation," + + " because {} dag instances have been created since {}," + + " but the allowed maximum is {}", + sensorDefinition.id, + count, + cutOffTime, + recurringSensorConfig.maxJobsPerDuration ) Future.successful((): Unit) } else { val sourceEventId = s"sid=${sensorDefinition.id};t=${eventDateFormatter.format(Instant.now())}" val event = Event(sourceEventId, sensorDefinition.id, JsObject.empty) + logger.trace( + "(SensorId={}). Polling source event (SourceEventId={}).", + sensorDefinition.id, + sourceEventId + ) eventsProcessor.apply(Seq(event), sensorDefinition.id).map(_ => (): Unit) } } @@ -67,8 +77,8 @@ class RecurringSensor( } fut.onComplete { - case Success(_) => logger.debug(s"$logMsgPrefix. Polling successful") - case Failure(exception) => logger.debug(s"$logMsgPrefix. Polling failed.", exception) + case Success(_) => logger.debug("(SensorId={}). Polling successful.", sensorDefinition.id) + case Failure(exception) => logger.warn(s"(SensorId=${sensorDefinition.id}). Polling failed.", exception) } fut } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensor.scala index d4f124488..ecfca056f 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensor.scala @@ -15,6 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.time +import com.typesafe.scalalogging.{LazyLogging, Logger} import org.quartz.CronScheduleBuilder.cronSchedule import org.quartz._ import za.co.absa.hyperdrive.trigger.models.{Event, TimeSensorProperties} @@ -28,7 +29,8 @@ class TimeSensor( sensorDefinition: SensorDefition[TimeSensorProperties], scheduler: Scheduler )(implicit executionContext: ExecutionContext) - extends PushSensor[TimeSensorProperties](eventsProcessor, sensorDefinition, executionContext) { + extends PushSensor[TimeSensorProperties](eventsProcessor, sensorDefinition, executionContext) + with LazyLogging { val jobKey: JobKey = new JobKey(sensorDefinition.id.toString, TimeSensor.JOB_GROUP_NAME) val jobTriggerKey: TriggerKey = new TriggerKey(jobKey.getName, TimeSensor.JOB_TRIGGER_GROUP_NAME) @@ -41,7 +43,16 @@ class TimeSensor( def launchQuartzJob(cronExpression: CronExpression, sensorId: Long): Unit = { val jobDetail = buildJobDetail(sensorId) val trigger = buildJobTrigger(jobDetail, cronExpression) - scheduler.scheduleJob(jobDetail, trigger) + val firstFireTime = scheduler.scheduleJob(jobDetail, trigger) + logger.trace( + "First execution of (SensorId={}) for (WorkflowId={})" + + " with (JobDetail={}) and (JobTrigger={}) is scheduled to: {}", + sensorId: Long, + sensorDefinition.workflowId: Long, + jobDetail: JobDetail, + trigger: Trigger, + firstFireTime: java.util.Date + ) } private def buildJobDetail(sensorId: Long): JobDetail = { @@ -75,6 +86,8 @@ object TimeSensor { val JOB_GROUP_NAME: String = "time-sensor-job-group" val JOB_TRIGGER_GROUP_NAME: String = "time-sensor-job-trigger-group" + private val logger = Logger[TimeSensor] + def apply( eventsProcessor: (Seq[Event], Long) => Future[Boolean], sensorDefinition: SensorDefition[TimeSensorProperties] @@ -85,6 +98,12 @@ object TimeSensor { validateJobKeys(sensor.jobKey, sensor.jobTriggerKey, quartzScheduler, sensorDefinition.id) validateCronExpression(sensorDefinition.properties.cronExpression, sensorDefinition.id) + logger.debug( + "Launching QuartzJob (CronExpression={}) within TimeSensor (SensorId={}) for workflow (WorkflowId={})", + sensorDefinition.properties.cronExpression: String, + sensorDefinition.id: Long, + sensorDefinition.workflowId: Long + ) sensor.launchQuartzJob(new CronExpression(sensorDefinition.properties.cronExpression), sensorDefinition.id) sensor } @@ -92,18 +111,18 @@ object TimeSensor { private def validateJobKeys(jobKey: JobKey, triggerKey: TriggerKey, scheduler: Scheduler, sensorId: Long): Unit = { if (scheduler.checkExists(jobKey)) { throw new IllegalArgumentException( - s"A Quartz Job with key ($jobKey) already exists. Cannot create job for sensor (#$sensorId)" + s"A Quartz Job with key ($jobKey) already exists. Cannot create job for sensor (SensorId=$sensorId)" ) } if (scheduler.checkExists(triggerKey)) { throw new IllegalArgumentException( - s"A Quartz Job-Trigger with key ($triggerKey) already exists. Cannot create job for sensor (#$sensorId)" + s"A Quartz Job-Trigger with key ($triggerKey) already exists. Cannot create job for sensor (SensorId=$sensorId)" ) } } private def validateCronExpression(cronExpression: String, sensorId: Long): Unit = if (!CronExpression.isValidExpression(cronExpression)) { - throw new IllegalArgumentException(s"Invalid cron expression $cronExpression for sensor (#$sensorId)") + throw new IllegalArgumentException(s"Invalid cron expression $cronExpression for sensor (SensorId=$sensorId)") } } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzJob.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzJob.scala index d9a10f93a..b83e995c8 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzJob.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzJob.scala @@ -15,17 +15,17 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.time +import com.typesafe.scalalogging.LazyLogging + import java.time.Instant import java.time.format.DateTimeFormatter import org.quartz.{Job, JobExecutionContext} -import org.slf4j.LoggerFactory import play.api.libs.json.JsObject import za.co.absa.hyperdrive.trigger.models.Event import scala.concurrent.Future -class TimeSensorQuartzJob extends Job { - private val logger = LoggerFactory.getLogger(this.getClass) +class TimeSensorQuartzJob extends Job with LazyLogging { private val eventDateFormatter: DateTimeFormatter = DateTimeFormatter.ISO_INSTANT override def execute(jobExecutionContext: JobExecutionContext): Unit = { @@ -34,6 +34,7 @@ class TimeSensorQuartzJob extends Job { val push = jobDataMap.get(TimeSensor.PUSH_FUNCTION_JOB_DATA_MAP_KEY).asInstanceOf[Seq[Event] => Future[Unit]] val sensorId = jobDataMap.get(TimeSensor.SENSOR_ID_JOB_DATA_MAP_KEY).asInstanceOf[Long] val sourceEventId = s"sid=$sensorId;t=${eventDateFormatter.format(Instant.now())}" + logger.debug("(SensorId={}). Pushing event (SourceEventId={})", sensorId, sourceEventId) val event = Event(sourceEventId, sensorId, JsObject.empty) push(Seq(event)) } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerManager.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerManager.scala index cda3d5014..576f27386 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerManager.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerManager.scala @@ -15,30 +15,29 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.time -import java.util.Properties +import com.typesafe.scalalogging.LazyLogging +import java.util.Properties import org.quartz.impl.StdSchedulerFactory import org.quartz.{Scheduler, SchedulerException, SchedulerFactory} -import org.slf4j.LoggerFactory import org.springframework.scheduling.SchedulingException /** * Quartz Scheduler for Time Sensors. */ -object TimeSensorQuartzSchedulerManager { - private val logger = LoggerFactory.getLogger(this.getClass) +object TimeSensorQuartzSchedulerManager extends LazyLogging { private val scheduler: Scheduler = initialize() def start(): Unit = { - logger.info(s"Starting Quartz Scheduler ${scheduler.getSchedulerName} now") + logger.info("Starting Quartz Scheduler {} now", scheduler.getSchedulerName) scheduler.start() } def stop(): Unit = { - logger.info(s"Stopping Quartz Scheduler ${scheduler.getSchedulerName} now") + logger.info(s"Stopping Quartz Scheduler {} now", scheduler.getSchedulerName) try { scheduler.standby() - logger.info(s"Stopped Quartz Scheduler ${scheduler.getSchedulerName}") + logger.info(s"Stopped Quartz Scheduler {}", scheduler.getSchedulerName) } catch { case ex: SchedulerException => throw new SchedulingException("Could not stop Quartz Scheduler", ex) } @@ -47,6 +46,7 @@ object TimeSensorQuartzSchedulerManager { def getScheduler: Scheduler = scheduler private def initialize(): Scheduler = { + logger.info("Initializing scheduler") val schedulerFactory = initSchedulerFactory() createScheduler(schedulerFactory) } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerThreadPool.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerThreadPool.scala index 810083ef9..218836760 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerThreadPool.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/sensors/time/TimeSensorQuartzSchedulerThreadPool.scala @@ -15,18 +15,17 @@ package za.co.absa.hyperdrive.trigger.scheduler.sensors.time +import com.typesafe.scalalogging.LazyLogging + import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{Executors, RejectedExecutionException, ThreadFactory, ThreadPoolExecutor} - import org.quartz.spi.ThreadPool -import org.slf4j.LoggerFactory /** * Thread Pool for Quartz Scheduler based on org.springframework.scheduling.quartz.LocalTaskExecutorThreadPool */ -class TimeSensorQuartzSchedulerThreadPool extends ThreadPool { +class TimeSensorQuartzSchedulerThreadPool extends ThreadPool with LazyLogging { - private val logger = LoggerFactory.getLogger(this.getClass) private val fixedPoolSize = 10 private val taskExecutor = createTaskExecutor @@ -39,6 +38,7 @@ class TimeSensorQuartzSchedulerThreadPool extends ThreadPool { override def runInThread(runnable: Runnable): Boolean = { Predef.assert(this.taskExecutor != null, "No TaskExecutor available") try { + logger.debug("Trying to spawn new thread for runnable") this.taskExecutor.execute(runnable) true } catch { @@ -66,24 +66,39 @@ class TimeSensorQuartzSchedulerThreadPool extends ThreadPool { * prefix for threads of this thread group */ class CustomThreadGroupThreadFactory(val threadGroupName: String, val threadNamePrefix: String) - extends ThreadFactory { + extends ThreadFactory + with LazyLogging { Predef.assert(threadGroupName != null) Predef.assert(threadNamePrefix != null) private val threadNumber = new AtomicInteger(1) val s: SecurityManager = System.getSecurityManager private val group = { - if (threadGroupName.nonEmpty) new ThreadGroup(threadGroupName) - else if (s != null) s.getThreadGroup - else Thread.currentThread.getThreadGroup + if (threadGroupName.nonEmpty) { + logger.trace("Getting thread group for threadGroupName: group={}", threadGroupName) + new ThreadGroup(threadGroupName) + } else if (s != null) { + logger.trace("Getting thread group from security manager: group={}", s.getThreadGroup.getName) + s.getThreadGroup + } else { + logger.trace("Getting thread group of current thread: group={}", Thread.currentThread().getThreadGroup.getName) + Thread.currentThread.getThreadGroup + } } private val namePrefix = s"$threadNamePrefix-thread-" override def newThread(r: Runnable): Thread = { + logger.debug("Creating new thread for runnable") val t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement, 0) - if (t.isDaemon) t.setDaemon(false) - if (t.getPriority != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY) + if (t.isDaemon) { + logger.trace("Spawning normal thread from daemon for runnable") + t.setDaemon(false) + } + if (t.getPriority != Thread.NORM_PRIORITY) { + logger.trace("Setting priority to Thread.NORM_PRIORITY") + t.setPriority(Thread.NORM_PRIORITY) + } t } }