Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<quartz.version>2.3.2</quartz.version>
<log4j.version>2.17.1</log4j.version>
<slf4j.log4j2.version>1.7.26</slf4j.log4j2.version>
<scala-logging.version>3.9.5</scala-logging.version>
<spring.security.ldap.version>5.6.1</spring.security.ldap.version>
<spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
<scalatest.version>3.0.5</scalatest.version>
Expand Down Expand Up @@ -153,6 +154,29 @@
<artifactId>log4j-to-slf4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_${scala.compat.version}</artifactId>
<version>${scala-logging.version}</version>
<exclusions>
<exclusion>
<groupId>org.scalatestplus</groupId>
<artifactId>mockito-3-4_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.12</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.compat.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@

package org.apache.spark.launcher

import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.LazyLogging

class NoBackendConnectionInProcessLauncher extends InProcessLauncher {
class NoBackendConnectionInProcessLauncher extends InProcessLauncher with LazyLogging {

private val logger = LoggerFactory.getLogger(this.getClass)
override def startApplication(listeners: SparkAppHandle.Listener*): SparkAppHandle = {
import scala.collection.JavaConverters._
if (builder.isClientMode(Map[String, String]().asJava)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

package za.co.absa.hyperdrive.trigger.api.rest

import javax.inject.Inject
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.LazyLogging
import org.springframework.beans.factory.BeanFactory
import org.springframework.context.annotation.{Bean, Configuration}
import org.springframework.http.HttpStatus
Expand Down Expand Up @@ -49,10 +47,12 @@ import za.co.absa.hyperdrive.trigger.api.rest.auth.{
}
import za.co.absa.hyperdrive.trigger.configuration.application.AuthConfig

import javax.inject.Inject
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

@EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true)
class WebSecurityConfig @Inject() (val beanFactory: BeanFactory, authConfig: AuthConfig) {
private val logger = LoggerFactory.getLogger(this.getClass)
class WebSecurityConfig @Inject() (val beanFactory: BeanFactory, authConfig: AuthConfig) extends LazyLogging {

val authMechanism: String = authConfig.mechanism

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@

package za.co.absa.hyperdrive.trigger.api.rest.client

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.{HttpEntity, HttpHeaders, HttpMethod, HttpStatus, ResponseEntity}
import com.typesafe.scalalogging.{AnyLogging, LazyLogging}
import org.springframework.http._
import org.springframework.security.kerberos.client.KerberosRestTemplate
import org.springframework.util.LinkedMultiValueMap
import org.springframework.web.client.RestTemplate
Expand Down Expand Up @@ -71,8 +70,7 @@ sealed abstract class AuthClient(
restTemplate: RestTemplate,
apiCaller: ApiCaller,
url: String => String
) {
protected val logger: Logger = LoggerFactory.getLogger(this.getClass)
) extends AnyLogging {

@throws[UnauthorizedException]
def authenticate(): HttpHeaders =
Expand Down Expand Up @@ -118,7 +116,8 @@ class SpnegoAuthClient(
restTemplate: RestTemplate,
apiCaller: ApiCaller,
path: String
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") {
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path")
with LazyLogging {
override protected def requestAuthentication(url: String): ResponseEntity[String] = {
logger.info(
s"Authenticating via SPNEGO ($url): user `${credentials.username}`, with keytab `${credentials.keytabLocation}`"
Expand All @@ -132,7 +131,8 @@ class StandardAuthClient(
restTemplate: RestTemplate,
apiCaller: ApiCaller,
path: String
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") {
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path")
with LazyLogging {
override protected def requestAuthentication(url: String): ResponseEntity[String] = {
val requestParts = new LinkedMultiValueMap[String, String]
requestParts.add("username", credentials.username)
Expand All @@ -148,7 +148,8 @@ class StandardBase64AuthClient(
restTemplate: RestTemplate,
apiCaller: ApiCaller,
path: String
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path") {
) extends AuthClient(credentials, restTemplate, apiCaller, baseUrl => s"$baseUrl$path")
with LazyLogging {
override protected def requestAuthentication(url: String): ResponseEntity[String] = {
val headers = new HttpHeaders()
headers.add("Authorization", "Basic " + credentials.base64Credentials)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@

package za.co.absa.hyperdrive.trigger.api.rest.client

import com.typesafe.scalalogging.Logger
import org.apache.commons.lang3.exception.ExceptionUtils
import org.slf4j.LoggerFactory
import org.springframework.web.client
import org.springframework.web.client.ResourceAccessException
import za.co.absa.hyperdrive.trigger.api.rest.client.CrossHostApiCaller.logger

import scala.annotation.tailrec
import scala.util.Failure
import scala.util.Random
import scala.util.Try
import scala.util.{Failure, Random, Try}

class CrossHostApiCaller private (apiBaseUrls: Vector[String], maxTryCount: Int, private var currentHostIndex: Int)
extends ApiCaller {
import CrossHostApiCaller._
def baseUrlsCount: Int = apiBaseUrls.size

def currentBaseUrl: String = apiBaseUrls(currentHostIndex)
Expand Down Expand Up @@ -69,7 +67,7 @@ class CrossHostApiCaller private (apiBaseUrls: Vector[String], maxTryCount: Int,
}

object CrossHostApiCaller {
private val logger = LoggerFactory.getLogger(classOf[CrossHostApiCaller])
private val logger = Logger[CrossHostApiCaller]

final val DefaultUrlsRetryCount: Int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@

package za.co.absa.hyperdrive.trigger.api.rest.client

import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import com.typesafe.scalalogging.LazyLogging
import org.springframework.http.{HttpEntity, HttpHeaders, HttpMethod, HttpStatus}
import org.springframework.web.client.RestTemplate

import scala.annotation.tailrec
import scala.reflect.ClassTag

class RestClient(authClient: AuthClient, restTemplate: RestTemplate) {
private val logger = LoggerFactory.getLogger(this.getClass)
class RestClient(authClient: AuthClient, restTemplate: RestTemplate) extends LazyLogging {

private var authHeaders = new HttpHeaders()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@

package za.co.absa.hyperdrive.trigger.api.rest.controllers

import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.LazyLogging
import org.springframework.http.converter.HttpMessageNotReadableException
import org.springframework.http.{HttpStatus, ResponseEntity}
import org.springframework.security.access.AccessDeniedException
import org.springframework.web.bind.annotation.{ExceptionHandler, RestControllerAdvice}
import org.springframework.web.context.request.WebRequest
import za.co.absa.hyperdrive.trigger.models.errors.ApiException
@RestControllerAdvice
class RestErrorHandler {
private val logger = LoggerFactory.getLogger(this.getClass)
class RestErrorHandler extends LazyLogging {

@ExceptionHandler(Array(classOf[ApiException]))
def handleApiException(ex: ApiException, request: WebRequest): ResponseEntity[Object] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@

package za.co.absa.hyperdrive.trigger.api.rest.controllers

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.concurrent.CompletableFuture
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}
import javax.inject.Inject
import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.LazyLogging
import org.springframework.core.io.ByteArrayResource
import org.springframework.http.{HttpHeaders, MediaType, ResponseEntity}
import org.springframework.web.bind.annotation._
Expand All @@ -31,15 +27,19 @@ import za.co.absa.hyperdrive.trigger.models._
import za.co.absa.hyperdrive.trigger.models.errors.{ApiException, BulkOperationError, GenericError}
import za.co.absa.hyperdrive.trigger.models.search.{TableSearchRequest, TableSearchResponse}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.concurrent.CompletableFuture
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}
import javax.inject.Inject
import scala.collection.mutable.ArrayBuffer
import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

@RestController
class WorkflowController @Inject() (workflowService: WorkflowService, generalConfig: GeneralConfig) {
private val logger = LoggerFactory.getLogger(this.getClass)
class WorkflowController @Inject() (workflowService: WorkflowService, generalConfig: GeneralConfig)
extends LazyLogging {

val environment: String = generalConfig.environment

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package za.co.absa.hyperdrive.trigger.api.rest.health

import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.Logger
import org.springframework.boot.actuate.health.{Health, HealthIndicator}
import org.springframework.stereotype.Component
import za.co.absa.hyperdrive.trigger.configuration.application.HealthConfig
Expand All @@ -32,7 +32,7 @@ class DatabaseConnectionHealthIndicator @Inject() (val dbProvider: DatabaseProvi
extends HealthIndicator
with Repository {
import api._
private val log = LoggerFactory.getLogger(this.getClass)
private val log = Logger(this.getClass)
val dbConnection: Duration = Duration(healthConfig.databaseConnectionTimeoutMillis, TimeUnit.MILLISECONDS)
override protected def health(): Health =
Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

package za.co.absa.hyperdrive.trigger.api.rest.services

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.security.UserGroupInformation
import org.json4s.jackson.Serialization
import org.json4s.{Formats, NoTypeHints}
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Lazy
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap
Expand Down Expand Up @@ -48,8 +48,7 @@ class HdfsParameters(

@Lazy
@Service
class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends CheckpointService {
private val logger = LoggerFactory.getLogger(this.getClass)
class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends CheckpointService with LazyLogging {
private val offsetsDirName = "offsets"
private val commitsDirName = "commits"
private implicit val formats: Formats = Serialization.formats(NoTypeHints)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

package za.co.absa.hyperdrive.trigger.api.rest.services

import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Lazy
import org.springframework.stereotype.Service

Expand All @@ -39,8 +39,7 @@ trait HdfsService {

@Lazy
@Service
class HdfsServiceImpl extends HdfsService {
private val logger = LoggerFactory.getLogger(this.getClass)
class HdfsServiceImpl extends HdfsService with LazyLogging {
private lazy val conf = SparkHadoopUtil.get.conf
override def exists(path: Path)(implicit ugi: UserGroupInformation): Try[Boolean] = {
Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@

package za.co.absa.hyperdrive.trigger.api.rest.services

import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.configuration2.builder.BasicConfigurationBuilder
import org.apache.commons.configuration2.builder.fluent.Parameters
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.commons.configuration2.{BaseConfiguration, Configuration}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Lazy
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
Expand All @@ -47,8 +47,8 @@ class HyperdriveOffsetServiceImpl @Inject() (sparkConfig: SparkConfig,
@Lazy checkpointService: CheckpointService,
@Lazy userGroupInformationService: UserGroupInformationService,
kafkaService: KafkaService
) extends HyperdriveOffsetService {
private val logger = LoggerFactory.getLogger(this.getClass)
) extends HyperdriveOffsetService
with LazyLogging {
private val HyperdriveCheckpointKey = "writer.common.checkpoint.location"
private val HyperdriveKafkaTopicKey = "reader.kafka.topic"
private val HyperdriveKafkaBrokersKey = "reader.kafka.brokers"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

package za.co.absa.hyperdrive.trigger.api.rest.services

import org.slf4j.LoggerFactory
import com.typesafe.scalalogging.LazyLogging
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus}
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus}
import za.co.absa.hyperdrive.trigger.persistance.WorkflowRepository

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -37,8 +37,8 @@ class HyperdriveServiceImpl(
override protected val workflowRepository: WorkflowRepository,
override protected val jobTemplateService: JobTemplateService,
override protected val hyperdriveOffsetService: HyperdriveOffsetService
) extends HyperdriveService {
private val logger = LoggerFactory.getLogger(this.getClass)
) extends HyperdriveService
with LazyLogging {

override def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]] = {
workflowRepository.getWorkflow(id).flatMap { workflow =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

package za.co.absa.hyperdrive.trigger.api.rest.services

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import org.springframework.util.ConcurrentLruCache
import za.co.absa.hyperdrive.trigger.api.rest.services.KafkaServiceImpl.{BeginningOffsets, EndOffsets, OffsetFunction}
Expand All @@ -36,8 +36,7 @@ trait KafkaService {
}

@Service
class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService {
private val logger = LoggerFactory.getLogger(this.getClass)
class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService with LazyLogging {
private val consumerUuid = randomUUID().toString
private val kafkaConsumersCache = new ConcurrentLruCache[(Properties, Long), KafkaConsumer[String, String]](
generalConfig.kafkaConsumersCacheSize,
Expand Down
Loading