=str #18890 Polish Source.queue

This commit is contained in:
Alexander Golubev 2016-01-16 12:17:19 -05:00
parent 7e7a2808f1
commit d5f81e19d1
19 changed files with 565 additions and 332 deletions

View file

@ -6,7 +6,7 @@ package akka.http.impl.engine.client
import java.net.InetSocketAddress
import akka.stream.OverflowStrategy.Fail.BufferOverflowException
import akka.stream.BufferOverflowException
import scala.annotation.tailrec
import scala.concurrent.Promise

View file

@ -1,96 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber }
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent._
import akka.pattern.pipe
class AcknowledgeSourceSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
def assertSuccess(b: Boolean, fb: Future[Boolean]): Unit =
Await.result(fb, 1.second) should be(b)
"A AcknowledgeSource" must {
"emit received messages to the stream" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
sub.request(2)
assertSuccess(true, queue.offer(1))
s.expectNext(1)
assertSuccess(true, queue.offer(2))
s.expectNext(2)
assertSuccess(true, queue.offer(3))
sub.cancel()
}
"buffer when needed" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
for (n 1 to 20) assertSuccess(true, queue.offer(n))
sub.request(10)
for (n 1 to 10) assertSuccess(true, queue.offer(n))
sub.request(10)
for (n 11 to 20) assertSuccess(true, queue.offer(n))
for (n 200 to 399) assertSuccess(true, queue.offer(n))
sub.request(100)
for (n 300 to 399) assertSuccess(true, queue.offer(n))
sub.cancel()
}
"not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
sub.request(1)
assertSuccess(true, queue.offer(1))
s.expectNext(1)
sub.cancel()
}
"return false when can reject element to buffer" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
assertSuccess(true, queue.offer(1))
assertSuccess(false, queue.offer(2))
sub.request(1)
s.expectNext(1)
sub.cancel()
}
"wait when buffer is full and backpressure is on" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(2, OverflowStrategy.backpressure).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
assertSuccess(true, queue.offer(1))
val addedSecond = queue.offer(2)
addedSecond.pipeTo(testActor)
expectNoMsg(300.millis)
sub.request(1)
s.expectNext(1)
assertSuccess(true, addedSecond)
sub.request(1)
s.expectNext(2)
sub.cancel()
}
}
}

View file

@ -6,8 +6,7 @@ package akka.stream.scaladsl
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, OverflowStrategy }
import akka.stream.OverflowStrategy.Fail.BufferOverflowException
import akka.stream.{ BufferOverflowException, ActorMaterializer, ActorMaterializerSettings, OverflowStrategy }
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._

View file

@ -7,7 +7,7 @@ import akka.stream.Attributes._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
import akka.stream.{ DelayOverflowStrategy, ActorMaterializer }
import akka.stream.{ BufferOverflowException, DelayOverflowStrategy, ActorMaterializer }
import scala.concurrent.Await
import scala.concurrent.duration._
@ -103,7 +103,7 @@ class FlowDelaySpec extends AkkaSpec {
.withAttributes(inputBuffer(16, 16))
.runWith(TestSink.probe[Int])
.request(100)
.expectError(new DelayOverflowStrategy.Fail.BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!"))
.expectError(new BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!"))
}

View file

@ -5,7 +5,7 @@ package akka.stream.scaladsl
import akka.actor.Status
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.{ OverflowStrategy, ActorMaterializer }
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, _ }
@ -97,5 +97,20 @@ class QueueSinkSpec extends AkkaSpec {
queue.pull()
}
"fail pull future when stream is completed" in assertAllStagesStopped {
val probe = TestPublisher.manualProbe[Int]()
val queue = Source.fromPublisher(probe).runWith(Sink.queue())
val sub = probe.expectSubscription()
queue.pull().pipeTo(testActor)
sub.sendNext(1)
expectMsg(Some(1))
sub.sendComplete()
Await.result(queue.pull(), noMsgTimeout) should be(None)
queue.pull().onFailure { case e e.isInstanceOf[IllegalStateException] should ===(true) }
}
}
}

View file

@ -0,0 +1,228 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.actor.{ NoSerializationVerificationNeeded, Status }
import akka.pattern.pipe
import akka.stream._
import akka.stream.impl.QueueSource
import akka.stream.stage.OutHandler
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber }
import akka.testkit.TestProbe
import scala.concurrent.duration._
import scala.concurrent.{ Future, _ }
class QueueSourceSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val pause = 300.millis
def assertSuccess(f: Future[QueueOfferResult]): Unit = {
f pipeTo testActor
expectMsg(QueueOfferResult.Enqueued)
}
object SourceTestMessages {
case object Pull extends NoSerializationVerificationNeeded
case object Finish extends NoSerializationVerificationNeeded
}
def testSource(maxBuffer: Int, overflowStrategy: OverflowStrategy, probe: TestProbe): Source[Int, SourceQueue[Int]] = {
class QueueSourceTestStage(maxBuffer: Int, overflowStrategy: OverflowStrategy)
extends QueueSource[Int](maxBuffer, overflowStrategy) {
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val (logic, inputStream) = super.createLogicAndMaterializedValue(inheritedAttributes)
val outHandler = logic.handlers(out.id).asInstanceOf[OutHandler]
logic.handlers(out.id) = new OutHandler {
override def onPull(): Unit = {
probe.ref ! SourceTestMessages.Pull
outHandler.onPull()
}
override def onDownstreamFinish(): Unit = {
probe.ref ! SourceTestMessages.Finish
outHandler.onDownstreamFinish()
}
}
(logic, inputStream)
}
}
Source.fromGraph(new QueueSourceTestStage(maxBuffer, overflowStrategy))
}
"A QueueSourceSpec" must {
"emit received messages to the stream" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
for (i 1 to 3) {
sub.request(1)
assertSuccess(queue.offer(i))
s.expectNext(i)
}
queue.watchCompletion().pipeTo(testActor)
expectNoMsg(pause)
sub.cancel()
expectMsg(())
}
"buffer when needed" in {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
for (n 1 to 20) assertSuccess(queue.offer(n))
sub.request(10)
for (n 1 to 10) assertSuccess(queue.offer(n))
sub.request(10)
for (n 11 to 20) assertSuccess(queue.offer(n))
for (n 200 to 399) assertSuccess(queue.offer(n))
sub.request(100)
for (n 300 to 399) assertSuccess(queue.offer(n))
sub.cancel()
}
"not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
sub.request(1)
assertSuccess(queue.offer(1))
sub.cancel()
}
"wait for demand when buffer is 0" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
queue.offer(1).pipeTo(testActor)
expectNoMsg(pause)
sub.request(1)
expectMsg(QueueOfferResult.Enqueued)
s.expectNext(1)
sub.cancel()
}
"finish offer and complete futures when stream completed" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
queue.watchCompletion.pipeTo(testActor)
queue.offer(1) pipeTo testActor
expectNoMsg(pause)
sub.cancel()
expectMsgAllOf(QueueOfferResult.QueueClosed, ())
}
"fail stream on buffer overflow in fail mode" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
s.expectSubscription
queue.offer(1)
queue.offer(2)
s.expectError()
}
"remember pull from downstream to send offered element immediately" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val probe = TestProbe()
val queue = testSource(1, OverflowStrategy.dropHead, probe).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
sub.request(1)
probe.expectMsg(SourceTestMessages.Pull)
assertSuccess(queue.offer(1))
s.expectNext(1)
sub.cancel()
}
"fail offer future if user does not wait in backpressure mode" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(5, OverflowStrategy.backpressure).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
for (i 1 to 5) assertSuccess(queue.offer(i))
queue.offer(6).pipeTo(testActor)
expectNoMsg(pause)
val future = queue.offer(7)
future.onFailure { case e e.isInstanceOf[IllegalStateException] should ===(true) }
future.onSuccess { case _ fail() }
Await.ready(future, pause)
sub.request(1)
s.expectNext(1)
expectMsg(QueueOfferResult.Enqueued)
sub.cancel()
}
"complete watching future with failure if stream failed" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
queue.watchCompletion().pipeTo(testActor)
queue.offer(1) //need to wait when first offer is done as initialization can be done in this moment
queue.offer(2)
expectMsgClass(classOf[Status.Failure])
}
"return false when elemen was not added to buffer" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
queue.offer(1)
queue.offer(2) pipeTo testActor
expectMsg(QueueOfferResult.Dropped)
sub.request(1)
s.expectNext(1)
sub.cancel()
}
"wait when buffer is full and backpressure is on" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.backpressure).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
assertSuccess(queue.offer(1))
queue.offer(2) pipeTo testActor
expectNoMsg(pause)
sub.request(1)
s.expectNext(1)
sub.request(1)
s.expectNext(2)
expectMsg(QueueOfferResult.Enqueued)
sub.cancel()
}
"fail offer future when stream is completed" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
queue.watchCompletion().pipeTo(testActor)
sub.cancel()
expectMsg(())
queue.offer(1).onFailure { case e e.isInstanceOf[IllegalStateException] should ===(true) }
}
}
}

View file

@ -3,48 +3,53 @@
*/
package akka.stream
/**
* Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element.
*/
sealed abstract class OverflowStrategy extends Serializable
sealed trait DelayOverflowStrategy extends Serializable
private[akka] trait BaseOverflowStrategy {
import OverflowStrategies._
/**
* INTERNAL API
* Represents a strategy that decides how to deal with a buffer of time based stage
* that is full but is about to receive a new element.
*/
private[akka] case object DropHead extends OverflowStrategy with DelayOverflowStrategy
sealed abstract class DelayOverflowStrategy extends Serializable
/**
* INTERNAL API
*/
private[akka] case object DropTail extends OverflowStrategy with DelayOverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object DropBuffer extends OverflowStrategy with DelayOverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object DropNew extends OverflowStrategy with DelayOverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object Backpressure extends OverflowStrategy with DelayOverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object Fail extends OverflowStrategy with DelayOverflowStrategy {
final case class BufferOverflowException(msg: String) extends RuntimeException(msg)
}
/**
* Represents a strategy that decides how to deal with a buffer that is full but is
* about to receive a new element.
*/
sealed abstract class OverflowStrategy extends DelayOverflowStrategy
private[akka] object OverflowStrategies {
/**
* INTERNAL API
*/
private[akka] case object DropHead extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object DropTail extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object DropBuffer extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object DropNew extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object Backpressure extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object Fail extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case object EmitEarly extends DelayOverflowStrategy
}
object OverflowStrategy extends BaseOverflowStrategy {
object OverflowStrategy {
/**
* If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
* the new element.
@ -79,12 +84,7 @@ object OverflowStrategy extends BaseOverflowStrategy {
def fail: OverflowStrategy = Fail
}
object DelayOverflowStrategy extends BaseOverflowStrategy {
/**
* INTERNAL API
*/
private[akka] case object EmitEarly extends DelayOverflowStrategy
object DelayOverflowStrategy {
/**
* If the buffer is full when a new element is available this strategy send next element downstream without waiting
*/

View file

@ -12,13 +12,21 @@ trait SourceQueue[T] {
/**
* Method offers next element to a stream and returns future that:
* - competes with true if element is consumed by a stream
* - competes with false when stream dropped offered element
* - fails if stream is completed or cancelled.
* - completes with `Enqueued` if element is consumed by a stream
* - completes with `Dropped` when stream dropped offered element
* - completes with `QueueClosed` when stream is completed during future is active
* - completes with `Failure(f)` when failure to enqueue element from upstream
* - fails when stream is completed or you cannot call offer in this moment because of implementation rules
* (like for backpressure mode and full buffer you need to wait for last offer call Future completion)
*
* @param elem element to send to a stream
*/
def offer(elem: T): Future[Boolean]
def offer(elem: T): Future[QueueOfferResult]
/**
* Method returns future that completes when stream is completed and fails when stream failed
*/
def watchCompletion(): Future[Unit]
}
/**
@ -35,3 +43,33 @@ trait SinkQueue[T] {
*/
def pull(): Future[Option[T]]
}
sealed abstract class QueueOfferResult
/**
* Contains types that is used as return types for async callbacks to streams
*/
object QueueOfferResult {
/**
* Type is used to indicate that stream is successfully enqueued an element
*/
final case object Enqueued extends QueueOfferResult
/**
* Type is used to indicate that stream is dropped an element
*/
final case object Dropped extends QueueOfferResult
/**
* Type is used to indicate that stream is failed before or during call to the stream
* @param cause - exception that stream failed with
*/
final case class Failure(cause: Throwable) extends QueueOfferResult
/**
* Type is used to indicate that stream is completed before call
*/
case object QueueClosed extends QueueOfferResult
}

View file

@ -1,92 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.{ ActorRef, Props }
import akka.stream.OverflowStrategy
import akka.stream.OverflowStrategy._
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.impl.AcknowledgePublisher.{ Rejected, Ok }
/**
* INTERNAL API
*/
private[akka] object AcknowledgePublisher {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy) =
Props(new AcknowledgePublisher(bufferSize, overflowStrategy))
case class Ok()
case class Rejected()
}
/**
* INTERNAL API
*/
private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: OverflowStrategy)
extends ActorRefSourceActor(bufferSize, overflowStrategy) {
var backpressedElem: Option[ActorRef] = None
override def requestElem: Receive = {
case _: Request
// totalDemand is tracked by super
if (bufferSize != 0)
while (totalDemand > 0L && !buffer.isEmpty) {
//if buffer is full - sent ack message to sender in case of Backpressure mode
if (buffer.isFull) backpressedElem match {
case Some(ref)
ref ! Ok(); backpressedElem = None
case None //do nothing
}
onNext(buffer.dequeue())
}
}
override def receiveElem: Receive = {
case elem if isActive
if (totalDemand > 0L) {
onNext(elem)
sendAck(true)
} else if (bufferSize == 0) {
log.debug("Dropping element because there is no downstream demand: [{}]", elem)
sendAck(false)
} else if (!buffer.isFull)
enqueueAndSendAck(elem)
else (overflowStrategy: @unchecked) match {
case DropHead
log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]")
buffer.dropHead()
enqueueAndSendAck(elem)
case DropTail
log.debug("Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]")
buffer.dropTail()
enqueueAndSendAck(elem)
case DropBuffer
log.debug("Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]")
buffer.clear()
enqueueAndSendAck(elem)
case DropNew
log.debug("Dropping the new element because buffer is full and overflowStrategy is: [DropNew]")
sendAck(false)
case Fail
log.error("Failing because buffer is full and overflowStrategy is: [Fail]")
onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
case Backpressure
log.debug("Backpressuring because buffer is full and overflowStrategy is: [Backpressure]")
sendAck(false) //does not allow to send more than buffer size
}
}
def enqueueAndSendAck(elem: Any): Unit = {
buffer.enqueue(elem)
if (buffer.isFull && overflowStrategy == Backpressure) backpressedElem = Some(sender)
else sendAck(true)
}
def sendAck(isOk: Boolean): Unit = {
val msg = if (isOk) Ok() else Rejected()
context.sender() ! msg
}
}

View file

@ -6,14 +6,15 @@ package akka.stream.impl
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.Status
import akka.stream.OverflowStrategy
import akka.stream.OverflowStrategies._
import akka.stream.{ BufferOverflowException, OverflowStrategy, OverflowStrategies }
/**
* INTERNAL API
*/
private[akka] object ActorRefSourceActor {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = {
require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported")
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
Props(new ActorRefSourceActor(bufferSize, overflowStrategy))
}
}
@ -58,7 +59,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
log.debug("Dropping element because there is no downstream demand: [{}]", elem)
else if (!buffer.isFull)
buffer.enqueue(elem)
else (overflowStrategy: @unchecked) match {
else overflowStrategy match {
case DropHead
log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]")
buffer.dropHead()
@ -76,7 +77,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
log.debug("Dropping the new element because buffer is full and overflowStrategy is: [DropNew]")
case Fail
log.error("Failing because buffer is full and overflowStrategy is: [Fail]")
onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
onErrorThenStop(new BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
case Backpressure
// there is a precondition check in Source.actorRefSource factory method
log.debug("Backpressuring because buffer is full and overflowStrategy is: [Backpressure]")

View file

@ -3,19 +3,14 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicInteger
import akka.NotUsed
import akka.actor._
import akka.stream._
import akka.stream.impl.AcknowledgePublisher.{ Ok, Rejected }
import akka.stream.impl.StreamLayout.Module
import akka.util.Timeout
import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.Promise
import scala.language.postfixOps
/**
@ -122,32 +117,3 @@ private[akka] final class ActorRefSource[Out](
override def withAttributes(attr: Attributes): Module =
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
private[akka] final class AcknowledgeSource[Out](bufferSize: Int, overflowStrategy: OverflowStrategy,
val attributes: Attributes, shape: SourceShape[Out],
timeout: FiniteDuration = 5 seconds)
extends SourceModule[Out, SourceQueue[Out]](shape) {
override def create(context: MaterializationContext) = {
import akka.pattern.ask
val ref = ActorMaterializer.downcast(context.materializer).actorOf(context,
AcknowledgePublisher.props(bufferSize, overflowStrategy))
implicit val t = Timeout(timeout)
(akka.stream.actor.ActorPublisher[Out](ref), new SourceQueue[Out] {
implicit val ctx = context.materializer.executionContext
override def offer(out: Out): Future[Boolean] = (ref ? out).map {
case Ok() true
case Rejected() false
}
})
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, SourceQueue[Out]] =
new AcknowledgeSource[Out](bufferSize, overflowStrategy, attributes, shape, timeout)
override def withAttributes(attr: Attributes): Module =
new AcknowledgeSource(bufferSize, overflowStrategy, attr, amendShape(attr), timeout)
}

View file

@ -10,7 +10,7 @@ import akka.actor.{ ActorRef, Props }
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.stage.{ AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
@ -243,11 +243,7 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV
* INTERNAL API
*/
private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] {
trait RequestElementCallback[E] {
val requestElement = new AtomicReference[AnyRef](Nil)
}
type Requested[E] = Promise[Option[T]]
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
override val shape: SinkShape[T] = SinkShape.of(in)
@ -261,15 +257,17 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS
val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1)
var currentRequest: Option[Requested[T]] = None
val stageLogic = new GraphStageLogic(shape) with RequestElementCallback[Requested[T]] {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
override def preStart(): Unit = {
setKeepGoing(true)
val list = requestElement.getAndSet(callback.invoke _).asInstanceOf[List[Requested[T]]]
list.reverse.foreach(callback.invoke)
initCallback(callback.invoke)
pull(in)
}
override def postStop(): Unit = stopCallback(promise
promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached")))
private val callback: AsyncCallback[Requested[T]] =
getAsyncCallback(promise currentRequest match {
case Some(_)
@ -311,17 +309,8 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS
(stageLogic, new SinkQueue[T] {
override def pull(): Future[Option[T]] = {
val ref = stageLogic.requestElement
val p = Promise[Option[T]]
ref.get() match {
case l: List[_]
if (!ref.compareAndSet(l, p :: l))
ref.get() match {
case _: List[_] throw new IllegalStateException("Concurrent call of SinkQueue.pull() is detected")
case f: Function1[_, _] f.asInstanceOf[Requested[T] Unit](p)
}
case f: Function1[_, _] f.asInstanceOf[Requested[T] Unit](p)
}
stageLogic.invoke(p)
p.future
}
})

View file

@ -0,0 +1,127 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.OverflowStrategies._
import akka.stream._
import akka.stream.stage._
import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
*/
private[akka] class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueue[T]] {
type Offered = Promise[QueueOfferResult]
val out = Outlet[T]("queueSource.out")
override val shape: SourceShape[T] = SourceShape.of(out)
val completion = Promise[Unit]
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[(T, Offered)] {
val buffer = if (maxBuffer == 0) null else FixedSizeBuffer[T](maxBuffer)
var pendingOffer: Option[(T, Offered)] = None
var pulled = false
override def preStart(): Unit = initCallback(callback.invoke)
override def postStop(): Unit = stopCallback {
case (elem, promise) promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached"))
}
private def enqueueAndSuccess(elem: T, promise: Offered): Unit = {
buffer.enqueue(elem)
promise.success(QueueOfferResult.Enqueued)
}
private def bufferElem(elem: T, promise: Offered): Unit = {
if (!buffer.isFull) {
enqueueAndSuccess(elem, promise)
} else overflowStrategy match {
case DropHead
buffer.dropHead()
enqueueAndSuccess(elem, promise)
case DropTail
buffer.dropTail()
enqueueAndSuccess(elem, promise)
case DropBuffer
buffer.clear()
enqueueAndSuccess(elem, promise)
case DropNew
promise.success(QueueOfferResult.Dropped)
case Fail
val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!")
promise.success(QueueOfferResult.Failure(bufferOverflowException))
completion.failure(bufferOverflowException)
failStage(bufferOverflowException)
case Backpressure
pendingOffer match {
case Some(_)
promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request"))
case None
pendingOffer = Some((elem, promise))
}
}
}
private val callback: AsyncCallback[(T, Offered)] = getAsyncCallback(tuple {
val (elem, promise) = tuple
if (maxBuffer != 0) {
bufferElem(elem, promise)
if (pulled) {
push(out, buffer.dequeue())
pulled = false
}
} else if (pulled) {
push(out, elem)
pulled = false
promise.success(QueueOfferResult.Enqueued)
} else pendingOffer = Some(tuple)
})
setHandler(out, new OutHandler {
override def onDownstreamFinish(): Unit = {
pendingOffer match {
case Some((elem, promise))
promise.success(QueueOfferResult.QueueClosed)
pendingOffer = None
case None // do nothing
}
completion.success(())
completeStage()
}
override def onPull(): Unit = {
if (maxBuffer == 0)
pendingOffer match {
case Some((elem, promise))
push(out, elem)
promise.success(QueueOfferResult.Enqueued)
pendingOffer = None
case None pulled = true
}
else if (!buffer.isEmpty) {
push(out, buffer.dequeue())
pendingOffer match {
case Some((elem, promise))
enqueueAndSuccess(elem, promise)
pendingOffer = None
case None //do nothing
}
} else pulled = true
}
})
}
(stageLogic, new SourceQueue[T] {
override def watchCompletion() = completion.future
override def offer(element: T): Future[QueueOfferResult] = {
val p = Promise[QueueOfferResult]()
stageLogic.invoke((element, p))
p.future
}
})
}
}

View file

@ -87,7 +87,7 @@ private[stream] object Stages {
val subscriberSource = name("subscriberSource")
val actorPublisherSource = name("actorPublisherSource")
val actorRefSource = name("actorRefSource")
val acknowledgeSource = name("acknowledgeSource")
val queueSource = name("queueSource")
val inputStreamSource = name("inputStreamSource") and IODispatcher
val outputStreamSource = name("outputStreamSource") and IODispatcher
val fileSource = name("fileSource") and IODispatcher

View file

@ -6,8 +6,8 @@ package akka.stream.impl.fusing
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.DelayOverflowStrategy.EmitEarly
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance }
import akka.stream.stage._
@ -372,8 +372,6 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta
*/
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
import OverflowStrategy._
private val buffer = FixedSizeBuffer[T](size)
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
@ -394,8 +392,8 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
if (buffer.isEmpty) ctx.finish()
else ctx.absorbTermination()
val enqueueAction: (DetachedContext[T], T) UpstreamDirective = {
(overflowStrategy: @unchecked) match {
val enqueueAction: (DetachedContext[T], T) UpstreamDirective =
overflowStrategy match {
case DropHead (ctx, elem)
if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem)
@ -416,13 +414,13 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
if (buffer.isFull) ctx.holdUpstream()
else ctx.pull()
case Fail (ctx, elem)
if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
if (buffer.isFull) ctx.fail(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
buffer.enqueue(elem)
ctx.pull()
}
}
}
}
/**
@ -961,7 +959,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
setHandler(in, handler = new InHandler {
//FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
override def onPush(): Unit = {
if (buffer.isFull) (strategy: @unchecked) match {
if (buffer.isFull) strategy match {
case EmitEarly
if (!isTimerActive(timerName))
push(out, buffer.dequeue()._2)
@ -969,24 +967,24 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
cancelTimer(timerName)
onTimer(timerName)
}
case DelayOverflowStrategy.DropHead
case DropHead
buffer.dropHead()
grabAndPull(true)
case DelayOverflowStrategy.DropTail
case DropTail
buffer.dropTail()
grabAndPull(true)
case DelayOverflowStrategy.DropNew
case DropNew
grab(in)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
case DelayOverflowStrategy.DropBuffer
case DropBuffer
buffer.clear()
grabAndPull(true)
case DelayOverflowStrategy.Fail
failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
case DelayOverflowStrategy.Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
case Fail
failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
else {
grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1)
grabAndPull(strategy != Backpressure || buffer.size < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
}
}

View file

@ -272,28 +272,34 @@ object Source {
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true
* if element was added to buffer or sent downstream. It completes
* with false if element was dropped.
* [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full.
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
* You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed.
*
* @param bufferSize The size of the buffer in element count
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand.
* When `bufferSize` is 0 the `overflowStrategy` does not matter.
*
* SourceQueue that current source is materialized to is for single thread usage only.
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]``
*/
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration): Source[T, SourceQueue[T]] =
new Source(scaladsl.Source.queue(bufferSize, overflowStrategy, timeout))
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueue[T]] =
new Source(scaladsl.Source.queue(bufferSize, overflowStrategy))
}

View file

@ -361,7 +361,7 @@ object Source {
*/
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported")
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
}
@ -387,29 +387,33 @@ object Source {
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true
* if element was added to buffer or sent downstream. It completes
* with false if element was dropped.
* [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full.
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
* You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed.
*
* @param bufferSize The size of the buffer in element count
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand.
* When `bufferSize` is 0 the `overflowStrategy` does not matter.
*
* SourceQueue that current source is materialized to is for single thread usage only.
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]``
*/
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration = 5.seconds): Source[T, SourceQueue[T]] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource")))
}
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueue[T]] =
Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource))
}

View file

@ -3,23 +3,21 @@
*/
package akka.stream.stage
import java.util
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference }
import akka.NotUsed
import java.util.concurrent.locks.ReentrantLock
import akka.actor._
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch }
import akka.event.LoggingAdapter
import akka.japi.function.{ Effect, Procedure }
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule, GraphStageModule, SubSource, SubSink }
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSource, SubSink }
import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName }
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.ActorPublisherMessage
@ -1273,3 +1271,55 @@ abstract class AbstractOutHandler extends OutHandler
* (completing when upstream completes, failing when upstream fails, completing when downstream cancels).
*/
abstract class AbstractInOutHandler extends InHandler with OutHandler
/**
* INTERNAL API
* This trait wraps callback for `GraphStage` stage instances and handle gracefully cases when stage is
* not yet initialized or already finished.
*
* While `GraphStage` has not initialized it adds all requests to list.
* As soon as `GraphStage` is started it stops collecting requests (pointing to real callback
* function) and run all the callbacks from the list
*
* Supposed to be used by GraphStages that share call back to outer world
*/
private[akka] trait CallbackWrapper[T] extends AsyncCallback[T] {
private trait CallbackState
private case class NotInitialized(list: List[T]) extends CallbackState
private case class Initialized(f: T Unit) extends CallbackState
private case class Stopped(f: T Unit) extends CallbackState
/*
* To preserve message order when switching between not initialized / initialized states
* lock is used. Case is similar to RepointableActorRef
*/
private[this] final val lock = new ReentrantLock
private[this] val callbackState = new AtomicReference[CallbackState](NotInitialized(Nil))
def stopCallback(f: T Unit): Unit = locked {
callbackState.set(Stopped(f))
}
def initCallback(f: T Unit): Unit = locked {
val list = (callbackState.getAndSet(Initialized(f)): @unchecked) match {
case NotInitialized(l) l
}
list.reverse.foreach(f)
}
override def invoke(arg: T): Unit = locked {
callbackState.get() match {
case Initialized(cb) cb(arg)
case list @ NotInitialized(l) callbackState.compareAndSet(list, NotInitialized(arg :: l))
case Stopped(cb)
lock.unlock()
cb(arg)
}
}
private[this] def locked(body: Unit): Unit = {
lock.lock()
try body finally if (lock.isLocked) lock.unlock()
}
}