+str #17693 add Source.queue and Sink.queue

This commit is contained in:
Alexander Golubev 2015-08-19 23:04:20 -04:00
parent 8ea47c3d37
commit 38fe35d668
13 changed files with 607 additions and 36 deletions

View file

@ -0,0 +1,117 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.actor.Status
import akka.pattern.{ AskTimeoutException, pipe }
import akka.stream.ActorMaterializer
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, _ }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
import scala.util.control.NoStackTrace
class AcknowledgeSinkSpec extends AkkaSpec {
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
val ex = new RuntimeException("ex") with NoStackTrace
val noMsgTimeout = 300.millis
def assertSuccess(value: Any, fb: Future[Option[Any]]): Unit =
Await.result(fb, 1.second) should be(Some(value))
"An AcknowledgeSink" must {
"send the elements as result of future" in assertAllStagesStopped {
val queue = Source(List(1, 2, 3)).runWith(Sink.queue(3))
assertSuccess(1, queue.pull())
assertSuccess(2, queue.pull())
assertSuccess(3, queue.pull())
queue.pull().pipeTo(testActor)
expectMsg(None)
}
"allow to have only one future waiting for result in each point of time" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source(probe).runWith(Sink.queue(3))
val sub = probe.expectSubscription()
val future = queue.pull()
val future2 = queue.pull()
an[IllegalStateException] shouldBe thrownBy { Await.result(future2, 300.millis) }
sub.sendNext(1)
future.pipeTo(testActor)
expectMsg(Some(1))
sub.sendComplete()
}
"wait for next element from upstream" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source(probe).runWith(Sink.queue(3))
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMsg(noMsgTimeout)
sub.sendNext(1)
expectMsg(Some(1))
sub.sendComplete()
}
"fail future on stream failure" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source(probe).runWith(Sink.queue(3))
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMsg(noMsgTimeout)
sub.sendError(ex)
expectMsg(Status.Failure(ex))
}
"fail future when stream failed" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source(probe).runWith(Sink.queue(3, 100.milli))
val sub = probe.expectSubscription()
sub.sendError(ex) // potential race condition
val future = queue.pull()
future.onFailure { case e e.getClass() should be(classOf[AskTimeoutException]); Unit }
future.onSuccess { case _ fail() }
Await.ready(future, 1.second)
}
"timeout future when stream cannot provide data" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source(probe).runWith(Sink.queue(3))
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
expectNoMsg(noMsgTimeout)
sub.sendNext(1)
expectMsg(Some(1))
sub.sendComplete()
}
"work when buffer is 0" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source(probe).runWith(Sink.queue(0))
val sub = probe.expectSubscription()
sub.sendNext(1)
queue.pull().pipeTo(testActor)
sub.sendNext(2)
expectMsg(Some(2))
sub.sendComplete()
}
}
}

View file

@ -0,0 +1,96 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber }
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent._
import akka.pattern.pipe
class AcknowledgeSourceSpec extends AkkaSpec {
implicit val mat = ActorMaterializer()
implicit val ec = system.dispatcher
def assertSuccess(b: Boolean, fb: Future[Boolean]): Unit =
Await.result(fb, 1.second) should be(b)
"A AcknowledgeSource" must {
"emit received messages to the stream" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(10, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
sub.request(2)
assertSuccess(true, queue.offer(1))
s.expectNext(1)
assertSuccess(true, queue.offer(2))
s.expectNext(2)
assertSuccess(true, queue.offer(3))
sub.cancel()
}
"buffer when needed" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink(s)).run()
val sub = s.expectSubscription
for (n 1 to 20) assertSuccess(true, queue.offer(n))
sub.request(10)
for (n 1 to 10) assertSuccess(true, queue.offer(n))
sub.request(10)
for (n 11 to 20) assertSuccess(true, queue.offer(n))
for (n 200 to 399) assertSuccess(true, queue.offer(n))
sub.request(100)
for (n 300 to 399) assertSuccess(true, queue.offer(n))
sub.cancel()
}
"not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink(s)).run()
val sub = s.expectSubscription
sub.request(1)
assertSuccess(true, queue.offer(1))
s.expectNext(1)
sub.cancel()
}
"return false when can reject element to buffer" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink(s)).run()
val sub = s.expectSubscription
assertSuccess(true, queue.offer(1))
assertSuccess(false, queue.offer(2))
sub.request(1)
s.expectNext(1)
sub.cancel()
}
"wait when buffer is full and backpressure is on" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(2, OverflowStrategy.backpressure).to(Sink(s)).run()
val sub = s.expectSubscription
assertSuccess(true, queue.offer(1))
val addedSecond = queue.offer(2)
addedSecond.pipeTo(testActor)
expectNoMsg(300.millis)
sub.request(1)
s.expectNext(1)
assertSuccess(true, addedSecond)
sub.request(1)
s.expectNext(2)
sub.cancel()
}
}
}

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.Future
/**
* This trait allows to have the queue as a data source for some stream.
*/
trait SourceQueue[T] {
/**
* Method offers next element to a stream and returns future that:
* - competes with true if element is consumed by a stream
* - competes with false when stream dropped offered element
* - fails if stream is completed or cancelled.
*
* @param elem element to send to a stream
*/
def offer(elem: T): Future[Boolean]
}
/**
* Trait allows to have the queue as a sink for some stream.
* "SinkQueue" pulls data from stream with backpressure mechanism.
*/
trait SinkQueue[T] {
/**
* Method pulls elements from stream and returns future that:
* - fails if stream is finished
* - completes with None in case if stream is completed after we got future
* - completes with `Some(element)` in case next element is available from stream.
*/
def pull(): Future[Option[T]]
}

View file

@ -0,0 +1,86 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.{ ActorRef, Props }
import akka.stream.OverflowStrategy
import akka.stream.OverflowStrategy._
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.impl.AcknowledgePublisher.{ Rejected, Ok }
/**
* INTERNAL API
*/
private[akka] object AcknowledgePublisher {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy) =
Props(new AcknowledgePublisher(bufferSize, overflowStrategy))
case class Ok()
case class Rejected()
}
/**
* INTERNAL API
*/
private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: OverflowStrategy)
extends ActorRefSourceActor(bufferSize, overflowStrategy) {
var backpressedElem: Option[ActorRef] = None
override def requestElem: Receive = {
case _: Request
// totalDemand is tracked by super
if (bufferSize != 0)
while (totalDemand > 0L && !buffer.isEmpty) {
//if buffer is full - sent ack message to sender in case of Backpressure mode
if (buffer.isFull) backpressedElem match {
case Some(ref)
ref ! Ok(); backpressedElem = None
case None //do nothing
}
onNext(buffer.dequeue())
}
}
override def receiveElem: Receive = {
case elem if isActive
if (totalDemand > 0L) {
onNext(elem)
sendAck(true)
} else if (bufferSize == 0) {
log.debug("Dropping element because there is no downstream demand: [{}]", elem)
sendAck(false)
} else if (!buffer.isFull)
enqueueAndSendAck(elem)
else overflowStrategy match {
case DropHead
buffer.dropHead()
enqueueAndSendAck(elem)
case DropTail
buffer.dropTail()
enqueueAndSendAck(elem)
case DropBuffer
buffer.clear()
enqueueAndSendAck(elem)
case DropNew
sendAck(false)
case Fail
onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
case Backpressure
sendAck(false) //does not allow to send more than buffer size
}
}
def enqueueAndSendAck(elem: Any): Unit = {
buffer.enqueue(elem)
if (buffer.isFull && overflowStrategy == Backpressure) backpressedElem = Some(sender)
else sendAck(true)
}
def sendAck(isOk: Boolean): Unit = {
val msg = if (isOk) Ok() else Rejected()
context.sender() ! msg
}
}

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.{ ActorLogging, ActorRef, Props, Status }
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage, RequestStrategy }
import scala.util.{ Try, Failure, Success }
private[akka] object AcknowledgeSubscriber {
def props(highWatermark: Int) =
Props(new AcknowledgeSubscriber(highWatermark))
}
/**
* INTERNAL API
*/
private[akka] class AcknowledgeSubscriber(maxBuffer: Int) extends ActorSubscriber with ActorLogging {
import ActorSubscriberMessage._
var buffer: Vector[Any] = Vector.empty
override val requestStrategy = new RequestStrategy {
def requestDemand(remainingRequested: Int): Int = {
maxBuffer - buffer.size - remainingRequested
}
}
var requester: Option[ActorRef] = None
def receive = {
case Request(_)
if (requester.isEmpty) {
requester = Some(sender)
trySendElementDownstream()
} else
sender ! Status.Failure(
new IllegalStateException("You have to wait for first future to be resolved to send another request"))
case OnNext(elem)
if (maxBuffer != 0) {
buffer :+= elem
trySendElementDownstream()
} else requester match {
case Some(ref)
requester = None
ref ! Some(elem)
case None log.debug("Dropping element because there is no downstream demand: [{}]", elem)
}
case OnError(cause)
trySendDownstream(Status.Failure(cause))
context.stop(self)
case OnComplete
if (buffer.isEmpty) {
trySendDownstream(Status.Success(None))
context.stop(self)
}
}
def trySendElementDownstream(): Unit = {
requester match {
case Some(ref)
if (buffer.size > 0) {
ref ! Some(buffer.head)
requester = None
buffer = buffer.tail
} else if (canceled) {
ref ! None
context.stop(self)
}
case None //do nothing
}
}
def trySendDownstream(e: Any): Unit = {
requester match {
case Some(ref)
ref ! e
case None //do nothing
}
}
}

View file

@ -27,15 +27,9 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
import akka.stream.OverflowStrategy._
// when bufferSize is 0 there the buffer is not used
private val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize)
def receive = {
case _: Request
// totalDemand is tracked by super
if (bufferSize != 0)
while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue())
protected val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize)
def receive = ({
case Cancel
context.stop(self)
@ -46,6 +40,17 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
case Status.Failure(cause) if isActive
onErrorThenStop(cause)
}: Receive).orElse(requestElem).orElse(receiveElem)
def requestElem: Receive = {
case _: Request
// totalDemand is tracked by super
if (bufferSize != 0)
while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue())
}
def receiveElem: Receive = {
case elem if isActive
if (totalDemand > 0L)
onNext(elem)

View file

@ -4,13 +4,18 @@
package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props }
import akka.stream.impl.StreamLayout.Module
import akka.actor._
import akka.stream._
import akka.stream.impl.AcknowledgePublisher.{ Ok, Rejected }
import akka.stream.impl.StreamLayout.Module
import akka.util.Timeout
import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Promise
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ Future, Promise }
import scala.language.postfixOps
import scala.util.{ Failure, Success }
/**
@ -163,3 +168,32 @@ private[akka] final class ActorRefSource[Out](
override def withAttributes(attr: Attributes): Module =
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
private[akka] final class AcknowledgeSource[Out](bufferSize: Int, overflowStrategy: OverflowStrategy,
val attributes: Attributes, shape: SourceShape[Out],
timeout: FiniteDuration = 5 seconds)
extends SourceModule[Out, SourceQueue[Out]](shape) {
override def create(context: MaterializationContext) = {
import akka.pattern.ask
val ref = ActorMaterializer.downcast(context.materializer).actorOf(context,
AcknowledgePublisher.props(bufferSize, overflowStrategy))
implicit val t = Timeout(timeout)
(akka.stream.actor.ActorPublisher[Out](ref), new SourceQueue[Out] {
implicit val ctx = context.materializer.executionContext
override def offer(out: Out): Future[Boolean] = (ref ? out).map {
case Ok() true
case Rejected() false
}
})
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, SourceQueue[Out]] =
new AcknowledgeSource[Out](bufferSize, overflowStrategy, attributes, shape, timeout)
override def withAttributes(attr: Attributes): Module =
new AcknowledgeSource(bufferSize, overflowStrategy, attr, amendShape(attr), timeout)
}

View file

@ -4,11 +4,16 @@
package akka.stream.impl
import akka.actor.{ Deploy, ActorRef, Props }
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.impl.StreamLayout.Module
import akka.stream.{ Attributes, Inlet, Shape, SinkShape, MaterializationContext, ActorMaterializer }
import akka.stream._
import akka.util.Timeout
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ Future, Promise }
import scala.language.postfixOps
import scala.util.Try
/**
* INTERNAL API
@ -217,3 +222,28 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
override def toString: String = "ActorRefSink"
}
/**
* INTERNAL API
*/
private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: Attributes,
shape: SinkShape[In], timeout: FiniteDuration) extends SinkModule[In, SinkQueue[In]](shape) {
override def create(context: MaterializationContext) = {
import akka.pattern.ask
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
implicit val t = Timeout(timeout)
val subscriberRef = actorMaterializer.actorOf(context,
AcknowledgeSubscriber.props(bufferSize))
(akka.stream.actor.ActorSubscriber[In](subscriberRef),
new SinkQueue[In] {
override def pull(): Future[Option[In]] = (subscriberRef ? Request(1)).mapTo[Option[In]]
})
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, SinkQueue[In]] =
new AcknowledgeSink[In](bufferSize, attributes, shape, timeout)
override def withAttributes(attr: Attributes): Module =
new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout)
override def toString: String = "AcknowledgeSink"
}

View file

@ -75,6 +75,7 @@ private[stream] object Stages {
val actorRefSource = name("actorRefSource")
val synchronousFileSource = name("synchronousFileSource")
val inputStreamSource = name("inputStreamSource")
val acknowledgeSource = name("acknowledgeSource")
val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink")
@ -86,6 +87,7 @@ private[stream] object Stages {
val actorSubscriberSink = name("actorSubscriberSink")
val synchronousFileSink = name("synchronousFileSink")
val outputStreamSink = name("outputStreamSink")
val acknowledgeSink = name("acknowledgeSink")
}
import DefaultAttributes._

View file

@ -9,6 +9,7 @@ import akka.stream.impl.StreamLayout
import akka.stream.{ javadsl, scaladsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
@ -147,6 +148,20 @@ object Sink {
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num strategy.apply(num)))
}
/**
* Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]].
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
* `Future` completes when element is available.
*
* `Sink` will request at most `bufferSize` number of elements from
* upstream and then stop back pressure.
*
* @param bufferSize The size of the buffer in element count
* @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]``
*/
def queue[T](bufferSize: Int, timeout: FiniteDuration): Sink[T, SinkQueue[T]] =
new Sink(scaladsl.Sink.queue(bufferSize, timeout))
}
/**

View file

@ -3,27 +3,20 @@
*/
package akka.stream.javadsl
import java.io.File
import akka.japi.function
import scala.collection.immutable
import java.util.concurrent.Callable
import akka.actor.{ Cancellable, ActorRef, Props }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter
import akka.japi.Util
import akka.stream.Attributes._
import akka.japi.{ Util, function }
import akka.stream._
import akka.stream.impl.{ ActorPublisherSource, StreamLayout }
import akka.util.ByteString
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import akka.stream.impl.StreamLayout
import akka.stream.stage.Stage
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import scala.concurrent.{ Promise, Future }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import scala.language.implicitConversions
import akka.stream.stage.Stage
import akka.stream.impl.StreamLayout
import scala.annotation.varargs
import scala.concurrent.{ Future, Promise }
import scala.language.{ higherKinds, implicitConversions }
/** Java API */
object Source {
@ -235,6 +228,32 @@ object Source {
val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq()
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num strategy.apply(num)))
}
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true
* if element was added to buffer or sent downstream. It completes
* with false if element was dropped.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]``
*/
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration): Source[T, SourceQueue[T]] =
new Source(scaladsl.Source.queue(bufferSize, overflowStrategy, timeout))
}
/**

View file

@ -3,18 +3,17 @@
*/
package akka.stream.scaladsl
import akka.stream.javadsl
import akka.actor.{ ActorRef, Props }
import akka.stream._
import akka.stream.impl.Stages.{ MapAsyncUnordered, DefaultAttributes }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.Attributes._
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage, SyncDirective }
import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective }
import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
@ -205,4 +204,20 @@ object Sink extends SinkApply {
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
/**
* Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]].
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
* `Future` completes when element is available.
*
* `Sink` will request at most `bufferSize` number of elements from
* upstream and then stop back pressure.
*
* @param bufferSize The size of the buffer in element count
* @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]``
*/
def queue[T](bufferSize: Int, timeout: FiniteDuration = 5.seconds): Sink[T, SinkQueue[T]] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout))
}
}

View file

@ -12,7 +12,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ Future, Promise }
import scala.language.higherKinds
@ -401,4 +401,32 @@ object Source extends SourceApply {
combineRest(2, rest.iterator)
})
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true
* if element was added to buffer or sent downstream. It completes
* with false if element was dropped.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
*
* @param bufferSize The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]``
*/
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration = 5.seconds): Source[T, SourceQueue[T]] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource")))
}
}