diff --git a/akka-amqp/src/main/scala/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/akka/amqp/AMQP.scala index dc0ed8eb80..ec029bc1cd 100644 --- a/akka-amqp/src/main/scala/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/akka/amqp/AMQP.scala @@ -60,7 +60,8 @@ object AMQP { */ case class ChannelParameters( shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) { + channelCallback: Option[ActorRef] = None, + prefetchSize: Int = 0) { // Needed for Java API usage def this() = this (None, None) diff --git a/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala b/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala index 02390d3619..b339cf4727 100644 --- a/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/akka/amqp/ConsumerActor.scala @@ -30,6 +30,8 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) protected def setupChannel(ch: Channel) = { + channelParameters.foreach(params => ch.basicQos(params.prefetchSize)) + val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName)) val consumingQueue = exchangeName match { case Some(exchange) =>