Feature/789 show messages to ingest on hyperdrive jobs#791
Feature/789 show messages to ingest on hyperdrive jobs#791jozefbakus merged 33 commits intodevelopfrom
Conversation
src/main/scala/za/co/absa/hyperdrive/trigger/models/IngestionStatus.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala
Outdated
Show resolved
Hide resolved
src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveServiceTest.scala
Outdated
Show resolved
Hide resolved
ui/src/app/components/workflows/workflows-home/workflows-home.component.html
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/HyperdriveController.scala
Show resolved
Hide resolved
filiphornak
left a comment
There was a problem hiding this comment.
The code is well-written, at least the scala part. However, there is one inconsistency when reading the last commit file, which may throw an exception even when the method returns the Try monad.
Also, reading offsets from kafka and the latest checkpoint file are not parallelized due to the current implementation of the kafka cache. However, this would require a proper consumer pool implementation because calls on ordinary kafka consumer are not thread-safe or finding alternative how to request offsets from kafka without a consumer.
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala
Outdated
Show resolved
Hide resolved
| def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets = { | ||
| BeginningEndOffsets( | ||
| topic, | ||
| getOffsets(topic, consumerProperties, BeginningOffsets), |
There was a problem hiding this comment.
This is a suggestion for future projects because it would require reworking or unification of the entire architecture. I noticed that some services use Futures as output, and some don't even when they are blocking.
case class Offset(beginning: Long, end: Long)
def getBeginningEndOffset(topic: String, consumerProperties: Properties): Future[Map[Long, Offset]] =
for {
kafkaConsumer <- consumerPool.getConsumer(consumerProperties)
parts <- listTopicPartitions(kafkaConsumer, topic)
futureBeginning = getBeginningOffsets(kafka, parts)
futureEnd = getEndOffsets(kafka, parts)
begin <- futureBegin
end <- futureEnd
} yield begin.map { case (part, bOff) => part -> Offset(bOff, end(part)) } // This might be done in safer mannerThis might improve performance.
There was a problem hiding this comment.
I agree with you. Please create an issue for this one. With this PR I just follow style that was already introduced
src/main/scala/za/co/absa/hyperdrive/trigger/models/BeginningEndOffsets.scala
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala
Outdated
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala
Outdated
Show resolved
Hide resolved
src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala
Show resolved
Hide resolved
...test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetServiceTest.scala
Show resolved
Hide resolved
src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala
Show resolved
Hide resolved
|
Kudos, SonarCloud Quality Gate passed!
|
filiphornak
left a comment
There was a problem hiding this comment.
I'm pleased with this explanation and your changes, so I approve this PR. Also, I will create issues for the things we discussed earlier.
src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala
Show resolved
Hide resolved
...test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetServiceTest.scala
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/models/BeginningEndOffsets.scala
Show resolved
Hide resolved
src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala
Outdated
Show resolved
Hide resolved
| def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets = { | ||
| BeginningEndOffsets( | ||
| topic, | ||
| getOffsets(topic, consumerProperties, BeginningOffsets), |








Implements: #789