Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.evolutiongaming.kafka.flow

import cats.MonadThrow
import cats.data.NonEmptySet
import cats.effect.Resource
import cats.effect.{Resource, Temporal}
import cats.syntax.all.*
import com.evolutiongaming.catshelper.{Log, LogOf}
import com.evolutiongaming.kafka.flow.kafka.Consumer
Expand All @@ -11,6 +11,8 @@ import com.evolutiongaming.skafka.consumer.ConsumerRecords
import com.evolutiongaming.sstream.Stream
import scodec.bits.ByteVector

import scala.concurrent.duration.DurationInt

/** Represents everything stateful happening on one `Consumer` */
trait ConsumerFlow[F[_]] {

Expand Down Expand Up @@ -68,7 +70,7 @@ object ConsumerFlow {
* Note, that topic specified by an appropriate parameter should contain a journal in the format of `Kafka Journal`
* library.
*/
def apply[F[_]: MonadThrow: LogOf](
def apply[F[_]: Temporal: LogOf](
consumer: Consumer[F],
flows: Map[Topic, TopicFlow[F]],
config: ConsumerFlowConfig
Expand All @@ -83,7 +85,7 @@ object ConsumerFlow {
def poll(logger: Log[F]) = {
val flowList = flows.toList // optimization, execute toList once instead of on each `consumer.poll`
for {
consumerRecords <- consumer.poll(config.pollTimeout)
consumerRecords <- consumer.poll(0.seconds)
_ <- flowList.traverse {
case (topic, flow) =>
val topicRecords = consumerRecords.values filter {
Expand All @@ -92,6 +94,7 @@ object ConsumerFlow {
}
flow(ConsumerRecords(topicRecords))
}
_ <- Temporal[F].sleep(config.pollTimeout).whenA(consumerRecords.values.isEmpty)
_ <- logger.debug("poll completed")
} yield consumerRecords
}
Expand Down
Loading