Skip to content

Add Pulsar reconsumeLater API #695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ trait CatsAsyncHandlerLowPriority {

def abortTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.abort()).liftF.map(_ => ())

override def reconsumeLaterAsync[T](consumer: JConsumer[T], message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): F[Unit] =
Async[F].delay {
consumer.reconsumeLater(ConsumerMessage.toJava(message, schema), delayTime, unit)
}
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.sksamuel.pulsar4s

import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.TypedMessageBuilder
import org.apache.pulsar.client.api.{Schema, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction

import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

Expand Down Expand Up @@ -47,6 +48,7 @@ trait AsyncHandler[F[_]] {
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit]
def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit]
def reconsumeLaterAsync[T](consumer: api.Consumer[T], message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)(implicit schema: Schema[T]): F[Unit]

def withTransaction[E, A](
builder: api.transaction.TransactionBuilder,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.sksamuel.pulsar4s

import com.sksamuel.exts.Logging
import org.apache.pulsar.client.api.ConsumerStats
import org.apache.pulsar.client.api.{ConsumerStats, Schema}
import org.apache.pulsar.client.api.transaction.Transaction

import java.io.Closeable
Expand Down Expand Up @@ -94,6 +94,12 @@ trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] {

def negativeAcknowledgeAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]

def reconsumeLater(message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): Unit

def reconsumeLaterAsync[F[_] : AsyncHandler](message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): F[Unit]

def unsubscribe(): Unit
def unsubscribeAsync[F[_] : AsyncHandler]: F[Unit]

Expand Down Expand Up @@ -136,6 +142,14 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin
override def negativeAcknowledgeAsync[F[_]: AsyncHandler](messageId: MessageId): F[Unit] =
implicitly[AsyncHandler[F]].negativeAcknowledgeAsync(consumer, messageId)

override def reconsumeLater(message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): Unit =
consumer.reconsumeLater(ConsumerMessage.toJava(message, schema), delayTime, unit)

override def reconsumeLaterAsync[F[_] : AsyncHandler](message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): F[Unit] =
implicitly[AsyncHandler[F]].reconsumeLaterAsync(consumer, message, delayTime, unit)

override def stats: ConsumerStats = consumer.getStats
override def subscription: Subscription = Subscription(consumer.getSubscription)
override def topic: Topic = Topic(consumer.getTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ case class ConsumerConfig(subscriptionName: Subscription,
additionalProperties: Map[String, AnyRef] = Map.empty,
deadLetterPolicy: Option[DeadLetterPolicy] = None,
replicateSubscriptionState: Boolean = false,
batchReceivePolicy: Option[BatchReceivePolicy] = None)
batchReceivePolicy: Option[BatchReceivePolicy] = None,
enableRetry: Option[Boolean] = None)

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s.conversions.collections._
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.transaction.Transaction
import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.{ConsumerBuilder, ReaderBuilder, Schema, TypedMessageBuilder}

import java.util.concurrent.CompletableFuture
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.compat.java8.FutureConverters
import scala.compat.java8.FutureConverters.CompletionStageOps
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -85,6 +85,11 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
Future.successful(consumer.negativeAcknowledge(messageId))

override def reconsumeLaterAsync[T](consumer: api.Consumer[T], message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): Future[Unit] =
Future.successful(consumer.reconsumeLater(ConsumerMessage.toJava(message, schema), delayTime, unit))


override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala

override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class DefaultPulsarClient(client: org.apache.pulsar.client.api.PulsarClient) ext
config.deadLetterPolicy.foreach(builder.deadLetterPolicy)
config.batchReceivePolicy.foreach(builder.batchReceivePolicy)
config.acknowledgmentGroupTime.foreach { gt => builder.acknowledgmentGroupTime(gt._1, gt._2) }
config.enableRetry.foreach(builder.enableRetry)
if (config.topics.nonEmpty)
builder.topics(config.topics.map(_.name).asJava)
builder.subscriptionName(config.subscriptionName.name)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.sksamuel.pulsar4s.monixs

import java.util.concurrent.CompletableFuture

import java.util.concurrent.{CompletableFuture, TimeUnit}
import com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s.conversions.collections._
import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer, TransactionContext}
import monix.eval.Task
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, Schema, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction

import scala.compat.java8.FutureConverters
Expand Down Expand Up @@ -133,6 +132,12 @@ class MonixAsyncHandler extends AsyncHandler[Task] {

override def abortTransaction(txn: Transaction): Task[Unit] =
Task.deferFuture(txn.abort()).map(_ => ())

override def reconsumeLaterAsync[T](consumer: Consumer[T], message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): Task[Unit] =
Task {
consumer.reconsumeLater(ConsumerMessage.toJava(message, schema), delayTime, unit)
}
}

object MonixAsyncHandler {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.sksamuel.pulsar4s.scalaz

import java.util.concurrent.CompletableFuture

import java.util.concurrent.{CompletableFuture, TimeUnit}
import com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s.conversions.collections._
import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer, TransactionContext}
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, Schema, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction
import scalaz.concurrent.Task

Expand Down Expand Up @@ -128,6 +127,13 @@ class ScalazAsyncHandler extends AsyncHandler[Task] {
override def commitTransaction(txn: Transaction): Task[Unit] = txn.commit()

override def abortTransaction(txn: Transaction): Task[Unit] = txn.abort()

override def reconsumeLaterAsync[T](consumer: api.Consumer[T], message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): Task[Unit] =
Task {
consumer.reconsumeLater(ConsumerMessage.toJava(message, schema), delayTime, unit)
}

}

object ScalazAsyncHandler {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.sksamuel.pulsar4s.zio

import java.util.concurrent.CompletionStage

import java.util.concurrent.{CompletionStage, TimeUnit}
import com.sksamuel.pulsar4s
import com.sksamuel.pulsar4s.conversions.collections._
import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerMessage, DefaultConsumer, DefaultProducer, DefaultReader, MessageId, Producer, TransactionContext}
import org.apache.pulsar.client.api
import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, TypedMessageBuilder}
import org.apache.pulsar.client.api.{Consumer, ConsumerBuilder, ProducerBuilder, PulsarClient, Reader, ReaderBuilder, Schema, TypedMessageBuilder}
import org.apache.pulsar.client.api.transaction.Transaction
import zio.{Exit, Task, ZIO}

Expand Down Expand Up @@ -115,6 +114,10 @@ class ZioAsyncHandler extends AsyncHandler[Task] {
override def commitTransaction(txn: Transaction): Task[Unit] = fromFuture(ZIO.attempt(txn.commit())).unit

override def abortTransaction(txn: Transaction): Task[Unit] = fromFuture(ZIO.attempt(txn.abort())).unit

override def reconsumeLaterAsync[T](consumer: Consumer[T], message: ConsumerMessage[T], delayTime: Long, unit: TimeUnit)
(implicit schema: Schema[T]): Task[Unit] =
ZIO.attempt(consumer.reconsumeLater(ConsumerMessage.toJava(message, schema), delayTime, unit))
}

object ZioAsyncHandler {
Expand Down