From d5f81e19d1417fa00921a838968d76beb85334a3 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sat, 16 Jan 2016 12:17:19 -0500 Subject: [PATCH] =str #18890 Polish Source.queue --- .../engine/client/PoolInterfaceActor.scala | 2 +- .../scaladsl/AcknowledgeSourceSpec.scala | 96 -------- .../akka/stream/scaladsl/FlowBufferSpec.scala | 3 +- .../akka/stream/scaladsl/FlowDelaySpec.scala | 4 +- .../akka/stream/scaladsl/QueueSinkSpec.scala | 17 +- .../stream/scaladsl/QueueSourceSpec.scala | 228 ++++++++++++++++++ .../scala/akka/stream/OverflowStrategy.scala | 48 ++-- .../src/main/scala/akka/stream/Queue.scala | 46 +++- .../stream/impl/AcknowledgePublisher.scala | 92 ------- .../stream/impl/ActorRefSourceActor.scala | 9 +- .../main/scala/akka/stream/impl/Modules.scala | 36 +-- .../main/scala/akka/stream/impl/Sinks.scala | 27 +-- .../main/scala/akka/stream/impl/Sources.scala | 127 ++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 2 +- .../scala/akka/stream/impl/fusing/Ops.scala | 30 ++- .../scala/akka/stream/javadsl/Source.scala | 34 +-- .../scala/akka/stream/scaladsl/Flow.scala | 4 +- .../scala/akka/stream/scaladsl/Source.scala | 34 +-- .../scala/akka/stream/stage/GraphStage.scala | 58 ++++- 19 files changed, 565 insertions(+), 332 deletions(-) delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/Sources.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 51317681f8..a89c87ceef 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -6,7 +6,7 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress -import akka.stream.OverflowStrategy.Fail.BufferOverflowException +import akka.stream.BufferOverflowException import scala.annotation.tailrec import scala.concurrent.Promise diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala deleted file mode 100644 index 8c394c9b5c..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSourceSpec.scala +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.scaladsl - -import akka.stream.testkit.Utils._ -import akka.stream.testkit.{ AkkaSpec, TestSubscriber } -import akka.stream.{ ActorMaterializer, OverflowStrategy } -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.concurrent._ -import akka.pattern.pipe - -class AcknowledgeSourceSpec extends AkkaSpec { - implicit val materializer = ActorMaterializer() - implicit val ec = system.dispatcher - - def assertSuccess(b: Boolean, fb: Future[Boolean]): Unit = - Await.result(fb, 1.second) should be(b) - - "A AcknowledgeSource" must { - - "emit received messages to the stream" in { - val s = TestSubscriber.manualProbe[Int]() - val queue = Source.queue(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - sub.request(2) - assertSuccess(true, queue.offer(1)) - s.expectNext(1) - assertSuccess(true, queue.offer(2)) - s.expectNext(2) - assertSuccess(true, queue.offer(3)) - sub.cancel() - } - - "buffer when needed" in { - val s = TestSubscriber.manualProbe[Int]() - val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - for (n ← 1 to 20) assertSuccess(true, queue.offer(n)) - sub.request(10) - for (n ← 1 to 10) assertSuccess(true, queue.offer(n)) - sub.request(10) - for (n ← 11 to 20) assertSuccess(true, queue.offer(n)) - - for (n ← 200 to 399) assertSuccess(true, queue.offer(n)) - sub.request(100) - for (n ← 300 to 399) assertSuccess(true, queue.offer(n)) - sub.cancel() - } - - "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped { - val s = TestSubscriber.manualProbe[Int]() - val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - sub.request(1) - assertSuccess(true, queue.offer(1)) - s.expectNext(1) - sub.cancel() - } - - "return false when can reject element to buffer" in assertAllStagesStopped { - val s = TestSubscriber.manualProbe[Int]() - val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - assertSuccess(true, queue.offer(1)) - assertSuccess(false, queue.offer(2)) - sub.request(1) - s.expectNext(1) - sub.cancel() - } - - "wait when buffer is full and backpressure is on" in assertAllStagesStopped { - val s = TestSubscriber.manualProbe[Int]() - val queue = Source.queue(2, OverflowStrategy.backpressure).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - assertSuccess(true, queue.offer(1)) - - val addedSecond = queue.offer(2) - - addedSecond.pipeTo(testActor) - expectNoMsg(300.millis) - - sub.request(1) - s.expectNext(1) - assertSuccess(true, addedSecond) - - sub.request(1) - s.expectNext(2) - - sub.cancel() - } - - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 283a907360..a3222867c3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -6,8 +6,7 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, OverflowStrategy } -import akka.stream.OverflowStrategy.Fail.BufferOverflowException +import akka.stream.{ BufferOverflowException, ActorMaterializer, ActorMaterializerSettings, OverflowStrategy } import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 0cb262bbd9..8c0dc7b7b5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -7,7 +7,7 @@ import akka.stream.Attributes._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } -import akka.stream.{ DelayOverflowStrategy, ActorMaterializer } +import akka.stream.{ BufferOverflowException, DelayOverflowStrategy, ActorMaterializer } import scala.concurrent.Await import scala.concurrent.duration._ @@ -103,7 +103,7 @@ class FlowDelaySpec extends AkkaSpec { .withAttributes(inputBuffer(16, 16)) .runWith(TestSink.probe[Int]) .request(100) - .expectError(new DelayOverflowStrategy.Fail.BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!")) + .expectError(new BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!")) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 2249f82c13..2c6b927e11 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.actor.Status import akka.pattern.pipe -import akka.stream.ActorMaterializer +import akka.stream.{ OverflowStrategy, ActorMaterializer } import akka.stream.testkit.Utils._ import akka.stream.testkit.{ AkkaSpec, _ } @@ -97,5 +97,20 @@ class QueueSinkSpec extends AkkaSpec { queue.pull() } + "fail pull future when stream is completed" in assertAllStagesStopped { + val probe = TestPublisher.manualProbe[Int]() + val queue = Source.fromPublisher(probe).runWith(Sink.queue()) + val sub = probe.expectSubscription() + + queue.pull().pipeTo(testActor) + sub.sendNext(1) + expectMsg(Some(1)) + + sub.sendComplete() + Await.result(queue.pull(), noMsgTimeout) should be(None) + + queue.pull().onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala new file mode 100644 index 0000000000..be9605a319 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala @@ -0,0 +1,228 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.actor.{ NoSerializationVerificationNeeded, Status } +import akka.pattern.pipe +import akka.stream._ +import akka.stream.impl.QueueSource +import akka.stream.stage.OutHandler +import akka.stream.testkit.Utils._ +import akka.stream.testkit.{ AkkaSpec, TestSubscriber } +import akka.testkit.TestProbe + +import scala.concurrent.duration._ +import scala.concurrent.{ Future, _ } + +class QueueSourceSpec extends AkkaSpec { + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + val pause = 300.millis + + def assertSuccess(f: Future[QueueOfferResult]): Unit = { + f pipeTo testActor + expectMsg(QueueOfferResult.Enqueued) + } + + object SourceTestMessages { + case object Pull extends NoSerializationVerificationNeeded + case object Finish extends NoSerializationVerificationNeeded + } + + def testSource(maxBuffer: Int, overflowStrategy: OverflowStrategy, probe: TestProbe): Source[Int, SourceQueue[Int]] = { + class QueueSourceTestStage(maxBuffer: Int, overflowStrategy: OverflowStrategy) + extends QueueSource[Int](maxBuffer, overflowStrategy) { + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val (logic, inputStream) = super.createLogicAndMaterializedValue(inheritedAttributes) + val outHandler = logic.handlers(out.id).asInstanceOf[OutHandler] + logic.handlers(out.id) = new OutHandler { + override def onPull(): Unit = { + probe.ref ! SourceTestMessages.Pull + outHandler.onPull() + } + override def onDownstreamFinish(): Unit = { + probe.ref ! SourceTestMessages.Finish + outHandler.onDownstreamFinish() + } + + } + (logic, inputStream) + } + } + Source.fromGraph(new QueueSourceTestStage(maxBuffer, overflowStrategy)) + } + + "A QueueSourceSpec" must { + + "emit received messages to the stream" in { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + for (i ← 1 to 3) { + sub.request(1) + assertSuccess(queue.offer(i)) + s.expectNext(i) + } + + queue.watchCompletion().pipeTo(testActor) + expectNoMsg(pause) + + sub.cancel() + expectMsg(()) + } + + "buffer when needed" in { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + for (n ← 1 to 20) assertSuccess(queue.offer(n)) + sub.request(10) + for (n ← 1 to 10) assertSuccess(queue.offer(n)) + sub.request(10) + for (n ← 11 to 20) assertSuccess(queue.offer(n)) + + for (n ← 200 to 399) assertSuccess(queue.offer(n)) + sub.request(100) + for (n ← 300 to 399) assertSuccess(queue.offer(n)) + sub.cancel() + } + + "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + sub.request(1) + + assertSuccess(queue.offer(1)) + + sub.cancel() + } + + "wait for demand when buffer is 0" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + queue.offer(1).pipeTo(testActor) + expectNoMsg(pause) + sub.request(1) + expectMsg(QueueOfferResult.Enqueued) + s.expectNext(1) + sub.cancel() + } + + "finish offer and complete futures when stream completed" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + + queue.watchCompletion.pipeTo(testActor) + queue.offer(1) pipeTo testActor + expectNoMsg(pause) + + sub.cancel() + + expectMsgAllOf(QueueOfferResult.QueueClosed, ()) + } + + "fail stream on buffer overflow in fail mode" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + s.expectSubscription + + queue.offer(1) + queue.offer(2) + s.expectError() + } + + "remember pull from downstream to send offered element immediately" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val probe = TestProbe() + val queue = testSource(1, OverflowStrategy.dropHead, probe).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + + sub.request(1) + probe.expectMsg(SourceTestMessages.Pull) + assertSuccess(queue.offer(1)) + s.expectNext(1) + sub.cancel() + } + + "fail offer future if user does not wait in backpressure mode" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(5, OverflowStrategy.backpressure).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + + for (i ← 1 to 5) assertSuccess(queue.offer(i)) + + queue.offer(6).pipeTo(testActor) + expectNoMsg(pause) + + val future = queue.offer(7) + future.onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + future.onSuccess { case _ ⇒ fail() } + Await.ready(future, pause) + + sub.request(1) + s.expectNext(1) + expectMsg(QueueOfferResult.Enqueued) + sub.cancel() + } + + "complete watching future with failure if stream failed" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + queue.watchCompletion().pipeTo(testActor) + queue.offer(1) //need to wait when first offer is done as initialization can be done in this moment + queue.offer(2) + expectMsgClass(classOf[Status.Failure]) + } + + "return false when elemen was not added to buffer" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + + queue.offer(1) + queue.offer(2) pipeTo testActor + expectMsg(QueueOfferResult.Dropped) + + sub.request(1) + s.expectNext(1) + sub.cancel() + } + + "wait when buffer is full and backpressure is on" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.backpressure).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + assertSuccess(queue.offer(1)) + + queue.offer(2) pipeTo testActor + expectNoMsg(pause) + + sub.request(1) + s.expectNext(1) + + sub.request(1) + s.expectNext(2) + expectMsg(QueueOfferResult.Enqueued) + + sub.cancel() + } + + "fail offer future when stream is completed" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val queue = Source.queue(1, OverflowStrategy.dropNew).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + queue.watchCompletion().pipeTo(testActor) + sub.cancel() + expectMsg(()) + + queue.offer(1).onFailure { case e ⇒ e.isInstanceOf[IllegalStateException] should ===(true) } + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index 25f0fb5c89..e900d46e2f 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -3,48 +3,53 @@ */ package akka.stream +import OverflowStrategies._ + /** - * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element. + * Represents a strategy that decides how to deal with a buffer of time based stage + * that is full but is about to receive a new element. */ -sealed abstract class OverflowStrategy extends Serializable -sealed trait DelayOverflowStrategy extends Serializable +sealed abstract class DelayOverflowStrategy extends Serializable -private[akka] trait BaseOverflowStrategy { +final case class BufferOverflowException(msg: String) extends RuntimeException(msg) +/** + * Represents a strategy that decides how to deal with a buffer that is full but is + * about to receive a new element. + */ +sealed abstract class OverflowStrategy extends DelayOverflowStrategy +private[akka] object OverflowStrategies { /** * INTERNAL API */ - private[akka] case object DropHead extends OverflowStrategy with DelayOverflowStrategy - + private[akka] case object DropHead extends OverflowStrategy /** * INTERNAL API */ - private[akka] case object DropTail extends OverflowStrategy with DelayOverflowStrategy - + private[akka] case object DropTail extends OverflowStrategy /** * INTERNAL API */ - private[akka] case object DropBuffer extends OverflowStrategy with DelayOverflowStrategy - + private[akka] case object DropBuffer extends OverflowStrategy /** * INTERNAL API */ - private[akka] case object DropNew extends OverflowStrategy with DelayOverflowStrategy - + private[akka] case object DropNew extends OverflowStrategy /** * INTERNAL API */ - private[akka] case object Backpressure extends OverflowStrategy with DelayOverflowStrategy - + private[akka] case object Backpressure extends OverflowStrategy /** * INTERNAL API */ - private[akka] case object Fail extends OverflowStrategy with DelayOverflowStrategy { - final case class BufferOverflowException(msg: String) extends RuntimeException(msg) - } + private[akka] case object Fail extends OverflowStrategy + /** + * INTERNAL API + */ + private[akka] case object EmitEarly extends DelayOverflowStrategy } -object OverflowStrategy extends BaseOverflowStrategy { +object OverflowStrategy { /** * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. @@ -79,12 +84,7 @@ object OverflowStrategy extends BaseOverflowStrategy { def fail: OverflowStrategy = Fail } -object DelayOverflowStrategy extends BaseOverflowStrategy { - /** - * INTERNAL API - */ - private[akka] case object EmitEarly extends DelayOverflowStrategy - +object DelayOverflowStrategy { /** * If the buffer is full when a new element is available this strategy send next element downstream without waiting */ diff --git a/akka-stream/src/main/scala/akka/stream/Queue.scala b/akka-stream/src/main/scala/akka/stream/Queue.scala index 919edba4f3..ddd9eed8d5 100644 --- a/akka-stream/src/main/scala/akka/stream/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/Queue.scala @@ -12,13 +12,21 @@ trait SourceQueue[T] { /** * Method offers next element to a stream and returns future that: - * - competes with true if element is consumed by a stream - * - competes with false when stream dropped offered element - * - fails if stream is completed or cancelled. + * - completes with `Enqueued` if element is consumed by a stream + * - completes with `Dropped` when stream dropped offered element + * - completes with `QueueClosed` when stream is completed during future is active + * - completes with `Failure(f)` when failure to enqueue element from upstream + * - fails when stream is completed or you cannot call offer in this moment because of implementation rules + * (like for backpressure mode and full buffer you need to wait for last offer call Future completion) * * @param elem element to send to a stream */ - def offer(elem: T): Future[Boolean] + def offer(elem: T): Future[QueueOfferResult] + + /** + * Method returns future that completes when stream is completed and fails when stream failed + */ + def watchCompletion(): Future[Unit] } /** @@ -35,3 +43,33 @@ trait SinkQueue[T] { */ def pull(): Future[Option[T]] } + +sealed abstract class QueueOfferResult + +/** + * Contains types that is used as return types for async callbacks to streams + */ +object QueueOfferResult { + + /** + * Type is used to indicate that stream is successfully enqueued an element + */ + final case object Enqueued extends QueueOfferResult + + /** + * Type is used to indicate that stream is dropped an element + */ + final case object Dropped extends QueueOfferResult + + /** + * Type is used to indicate that stream is failed before or during call to the stream + * @param cause - exception that stream failed with + */ + final case class Failure(cause: Throwable) extends QueueOfferResult + + /** + * Type is used to indicate that stream is completed before call + */ + case object QueueClosed extends QueueOfferResult +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala deleted file mode 100644 index 1f7ed0e1e1..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.impl - -import akka.actor.{ ActorRef, Props } -import akka.stream.OverflowStrategy -import akka.stream.OverflowStrategy._ -import akka.stream.actor.ActorPublisherMessage.Request -import akka.stream.impl.AcknowledgePublisher.{ Rejected, Ok } - -/** - * INTERNAL API - */ -private[akka] object AcknowledgePublisher { - def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = - Props(new AcknowledgePublisher(bufferSize, overflowStrategy)) - - case class Ok() - case class Rejected() -} - -/** - * INTERNAL API - */ -private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: OverflowStrategy) - extends ActorRefSourceActor(bufferSize, overflowStrategy) { - - var backpressedElem: Option[ActorRef] = None - - override def requestElem: Receive = { - case _: Request ⇒ - // totalDemand is tracked by super - if (bufferSize != 0) - while (totalDemand > 0L && !buffer.isEmpty) { - //if buffer is full - sent ack message to sender in case of Backpressure mode - if (buffer.isFull) backpressedElem match { - case Some(ref) ⇒ - ref ! Ok(); backpressedElem = None - case None ⇒ //do nothing - } - onNext(buffer.dequeue()) - } - } - - override def receiveElem: Receive = { - case elem if isActive ⇒ - if (totalDemand > 0L) { - onNext(elem) - sendAck(true) - } else if (bufferSize == 0) { - log.debug("Dropping element because there is no downstream demand: [{}]", elem) - sendAck(false) - } else if (!buffer.isFull) - enqueueAndSendAck(elem) - else (overflowStrategy: @unchecked) match { - case DropHead ⇒ - log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") - buffer.dropHead() - enqueueAndSendAck(elem) - case DropTail ⇒ - log.debug("Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") - buffer.dropTail() - enqueueAndSendAck(elem) - case DropBuffer ⇒ - log.debug("Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") - buffer.clear() - enqueueAndSendAck(elem) - case DropNew ⇒ - log.debug("Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") - sendAck(false) - case Fail ⇒ - log.error("Failing because buffer is full and overflowStrategy is: [Fail]") - onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) - case Backpressure ⇒ - log.debug("Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") - sendAck(false) //does not allow to send more than buffer size - } - } - - def enqueueAndSendAck(elem: Any): Unit = { - buffer.enqueue(elem) - if (buffer.isFull && overflowStrategy == Backpressure) backpressedElem = Some(sender) - else sendAck(true) - } - - def sendAck(isOk: Boolean): Unit = { - val msg = if (isOk) Ok() else Rejected() - context.sender() ! msg - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 1e9c35ed4e..32dabdbe10 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -6,14 +6,15 @@ package akka.stream.impl import akka.actor.ActorLogging import akka.actor.Props import akka.actor.Status -import akka.stream.OverflowStrategy +import akka.stream.OverflowStrategies._ +import akka.stream.{ BufferOverflowException, OverflowStrategy, OverflowStrategies } /** * INTERNAL API */ private[akka] object ActorRefSourceActor { def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = { - require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported") + require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") Props(new ActorRefSourceActor(bufferSize, overflowStrategy)) } } @@ -58,7 +59,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf log.debug("Dropping element because there is no downstream demand: [{}]", elem) else if (!buffer.isFull) buffer.enqueue(elem) - else (overflowStrategy: @unchecked) match { + else overflowStrategy match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() @@ -76,7 +77,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf log.debug("Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") case Fail ⇒ log.error("Failing because buffer is full and overflowStrategy is: [Fail]") - onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) + onErrorThenStop(new BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) case Backpressure ⇒ // there is a precondition check in Source.actorRefSource factory method log.debug("Backpressuring because buffer is full and overflowStrategy is: [Backpressure]") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 1f5a737656..90455ebf6e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -3,19 +3,14 @@ */ package akka.stream.impl -import java.util.concurrent.atomic.AtomicInteger - import akka.NotUsed import akka.actor._ import akka.stream._ -import akka.stream.impl.AcknowledgePublisher.{ Ok, Rejected } import akka.stream.impl.StreamLayout.Module -import akka.util.Timeout import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.duration.{ FiniteDuration, _ } -import scala.concurrent.{ Future, Promise } +import scala.concurrent.Promise import scala.language.postfixOps /** @@ -122,32 +117,3 @@ private[akka] final class ActorRefSource[Out]( override def withAttributes(attr: Attributes): Module = new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) } - -/** - * INTERNAL API - */ -private[akka] final class AcknowledgeSource[Out](bufferSize: Int, overflowStrategy: OverflowStrategy, - val attributes: Attributes, shape: SourceShape[Out], - timeout: FiniteDuration = 5 seconds) - extends SourceModule[Out, SourceQueue[Out]](shape) { - - override def create(context: MaterializationContext) = { - import akka.pattern.ask - val ref = ActorMaterializer.downcast(context.materializer).actorOf(context, - AcknowledgePublisher.props(bufferSize, overflowStrategy)) - implicit val t = Timeout(timeout) - - (akka.stream.actor.ActorPublisher[Out](ref), new SourceQueue[Out] { - implicit val ctx = context.materializer.executionContext - override def offer(out: Out): Future[Boolean] = (ref ? out).map { - case Ok() ⇒ true - case Rejected() ⇒ false - } - }) - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, SourceQueue[Out]] = - new AcknowledgeSource[Out](bufferSize, overflowStrategy, attributes, shape, timeout) - override def withAttributes(attr: Attributes): Module = - new AcknowledgeSource(bufferSize, overflowStrategy, attr, amendShape(attr), timeout) -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 70ff5b0008..739df1def1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -10,7 +10,7 @@ import akka.actor.{ ActorRef, Props } import akka.stream.Attributes.InputBuffer import akka.stream._ import akka.stream.impl.StreamLayout.Module -import akka.stream.stage.{ AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler } +import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance @@ -243,11 +243,7 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV * INTERNAL API */ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] { - trait RequestElementCallback[E] { - val requestElement = new AtomicReference[AnyRef](Nil) - } - - type Requested[E] = Promise[Option[T]] + type Requested[E] = Promise[Option[E]] val in = Inlet[T]("queueSink.in") override val shape: SinkShape[T] = SinkShape.of(in) @@ -261,15 +257,17 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1) var currentRequest: Option[Requested[T]] = None - val stageLogic = new GraphStageLogic(shape) with RequestElementCallback[Requested[T]] { + val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] { override def preStart(): Unit = { setKeepGoing(true) - val list = requestElement.getAndSet(callback.invoke _).asInstanceOf[List[Requested[T]]] - list.reverse.foreach(callback.invoke) + initCallback(callback.invoke) pull(in) } + override def postStop(): Unit = stopCallback(promise ⇒ + promise.failure(new IllegalStateException("Stream is terminated. QueueSink is detached"))) + private val callback: AsyncCallback[Requested[T]] = getAsyncCallback(promise ⇒ currentRequest match { case Some(_) ⇒ @@ -311,17 +309,8 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS (stageLogic, new SinkQueue[T] { override def pull(): Future[Option[T]] = { - val ref = stageLogic.requestElement val p = Promise[Option[T]] - ref.get() match { - case l: List[_] ⇒ - if (!ref.compareAndSet(l, p :: l)) - ref.get() match { - case _: List[_] ⇒ throw new IllegalStateException("Concurrent call of SinkQueue.pull() is detected") - case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p) - } - case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p) - } + stageLogic.invoke(p) p.future } }) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala new file mode 100644 index 0000000000..e3eda50bca --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -0,0 +1,127 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.OverflowStrategies._ +import akka.stream._ +import akka.stream.stage._ +import scala.concurrent.{ Future, Promise } + +/** + * INTERNAL API + */ +private[akka] class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueue[T]] { + type Offered = Promise[QueueOfferResult] + + val out = Outlet[T]("queueSource.out") + override val shape: SourceShape[T] = SourceShape.of(out) + val completion = Promise[Unit] + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[(T, Offered)] { + val buffer = if (maxBuffer == 0) null else FixedSizeBuffer[T](maxBuffer) + var pendingOffer: Option[(T, Offered)] = None + var pulled = false + + override def preStart(): Unit = initCallback(callback.invoke) + override def postStop(): Unit = stopCallback { + case (elem, promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached")) + } + + private def enqueueAndSuccess(elem: T, promise: Offered): Unit = { + buffer.enqueue(elem) + promise.success(QueueOfferResult.Enqueued) + } + + private def bufferElem(elem: T, promise: Offered): Unit = { + if (!buffer.isFull) { + enqueueAndSuccess(elem, promise) + } else overflowStrategy match { + case DropHead ⇒ + buffer.dropHead() + enqueueAndSuccess(elem, promise) + case DropTail ⇒ + buffer.dropTail() + enqueueAndSuccess(elem, promise) + case DropBuffer ⇒ + buffer.clear() + enqueueAndSuccess(elem, promise) + case DropNew ⇒ + promise.success(QueueOfferResult.Dropped) + case Fail ⇒ + val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") + promise.success(QueueOfferResult.Failure(bufferOverflowException)) + completion.failure(bufferOverflowException) + failStage(bufferOverflowException) + case Backpressure ⇒ + pendingOffer match { + case Some(_) ⇒ + promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) + case None ⇒ + pendingOffer = Some((elem, promise)) + } + } + } + + private val callback: AsyncCallback[(T, Offered)] = getAsyncCallback(tuple ⇒ { + val (elem, promise) = tuple + + if (maxBuffer != 0) { + bufferElem(elem, promise) + if (pulled) { + push(out, buffer.dequeue()) + pulled = false + } + } else if (pulled) { + push(out, elem) + pulled = false + promise.success(QueueOfferResult.Enqueued) + } else pendingOffer = Some(tuple) + }) + + setHandler(out, new OutHandler { + override def onDownstreamFinish(): Unit = { + pendingOffer match { + case Some((elem, promise)) ⇒ + promise.success(QueueOfferResult.QueueClosed) + pendingOffer = None + case None ⇒ // do nothing + } + completion.success(()) + completeStage() + } + + override def onPull(): Unit = { + if (maxBuffer == 0) + pendingOffer match { + case Some((elem, promise)) ⇒ + push(out, elem) + promise.success(QueueOfferResult.Enqueued) + pendingOffer = None + case None ⇒ pulled = true + } + else if (!buffer.isEmpty) { + push(out, buffer.dequeue()) + pendingOffer match { + case Some((elem, promise)) ⇒ + enqueueAndSuccess(elem, promise) + pendingOffer = None + case None ⇒ //do nothing + } + } else pulled = true + } + }) + } + + (stageLogic, new SourceQueue[T] { + override def watchCompletion() = completion.future + override def offer(element: T): Future[QueueOfferResult] = { + val p = Promise[QueueOfferResult]() + stageLogic.invoke((element, p)) + p.future + } + }) + } +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index e33ee8481c..bfc73d3a78 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -87,7 +87,7 @@ private[stream] object Stages { val subscriberSource = name("subscriberSource") val actorPublisherSource = name("actorPublisherSource") val actorRefSource = name("actorRefSource") - val acknowledgeSource = name("acknowledgeSource") + val queueSource = name("queueSource") val inputStreamSource = name("inputStreamSource") and IODispatcher val outputStreamSource = name("outputStreamSource") and IODispatcher val fileSource = name("fileSource") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index c1074b6c45..5a659d64b9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,8 +6,8 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } -import akka.stream.DelayOverflowStrategy.EmitEarly import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ @@ -372,8 +372,6 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta */ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] { - import OverflowStrategy._ - private val buffer = FixedSizeBuffer[T](size) override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = @@ -394,8 +392,8 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt if (buffer.isEmpty) ctx.finish() else ctx.absorbTermination() - val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { - (overflowStrategy: @unchecked) match { + val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = + overflowStrategy match { case DropHead ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.dropHead() buffer.enqueue(elem) @@ -416,13 +414,13 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt if (buffer.isFull) ctx.holdUpstream() else ctx.pull() case Fail ⇒ (ctx, elem) ⇒ - if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) + if (buffer.isFull) ctx.fail(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) else { buffer.enqueue(elem) ctx.pull() } } - } + } /** @@ -961,7 +959,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS setHandler(in, handler = new InHandler { //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full override def onPush(): Unit = { - if (buffer.isFull) (strategy: @unchecked) match { + if (buffer.isFull) strategy match { case EmitEarly ⇒ if (!isTimerActive(timerName)) push(out, buffer.dequeue()._2) @@ -969,24 +967,24 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS cancelTimer(timerName) onTimer(timerName) } - case DelayOverflowStrategy.DropHead ⇒ + case DropHead ⇒ buffer.dropHead() grabAndPull(true) - case DelayOverflowStrategy.DropTail ⇒ + case DropTail ⇒ buffer.dropTail() grabAndPull(true) - case DelayOverflowStrategy.DropNew ⇒ + case DropNew ⇒ grab(in) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) - case DelayOverflowStrategy.DropBuffer ⇒ + case DropBuffer ⇒ buffer.clear() grabAndPull(true) - case DelayOverflowStrategy.Fail ⇒ - failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) - case DelayOverflowStrategy.Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") + case Fail ⇒ + failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) + case Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") } else { - grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1) + grabAndPull(strategy != Backpressure || buffer.size < size - 1) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 2aa378591e..8ca7c92d20 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -272,28 +272,34 @@ object Source { /** * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, - * otherwise they will be buffered until request for demand is received. + * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded + * if downstream is terminated. * * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true - * if element was added to buffer or sent downstream. It completes - * with false if element was dropped. + * [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)` + * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete + * with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] + * when downstream is completed. * - * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full. + * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future` + * call when buffer is full. * - * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped - * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does - * not matter. + * You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]]. + * It returns future that completes with success when stream is completed or fail when stream is failed. * - * @param bufferSize The size of the buffer in element count + * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand. + * When `bufferSize` is 0 the `overflowStrategy` does not matter. + * + * SourceQueue that current source is materialized to is for single thread usage only. + * + * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer - * @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]`` */ - def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration): Source[T, SourceQueue[T]] = - new Source(scaladsl.Source.queue(bufferSize, overflowStrategy, timeout)) + def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueue[T]] = + new Source(scaladsl.Source.queue(bufferSize, overflowStrategy)) } @@ -1263,7 +1269,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * @param seed Provides the first state for a batched value using the first unconsumed element as a start * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ - def batch[S](max: Long, seed: function.Function[Out, S],aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + def batch[S](max: Long, seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = new Source(delegate.batch(max, seed.apply)(aggregate.apply)) /** @@ -1294,7 +1300,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * @param seed Provides the first state for a batched value using the first unconsumed element as a start * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ - def batchWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S],aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + def batchWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = new Source(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 339e9b94ba..06b5e5cd4a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -907,8 +907,8 @@ trait FlowOps[+Out, +Mat] { * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) - //FIXME: conflate can be expressed as a batch - //via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) + //FIXME: conflate can be expressed as a batch + //via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 88e32bd8f6..870774e629 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -361,7 +361,7 @@ object Source { */ def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") - require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported") + require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) } @@ -387,29 +387,33 @@ object Source { /** * Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]]. * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream, - * otherwise they will be buffered until request for demand is received. + * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded + * if downstream is terminated. * * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if * there is no space available in the buffer. * * Acknowledgement mechanism is available. - * [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true - * if element was added to buffer or sent downstream. It completes - * with false if element was dropped. + * [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)` + * if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete + * with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]] + * when downstream is completed. * - * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full. + * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future` + * call when buffer is full. * - * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped - * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does - * not matter. + * You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]]. + * It returns future that completes with success when stream is completed or fail when stream is failed. * - * @param bufferSize The size of the buffer in element count + * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand. + * When `bufferSize` is 0 the `overflowStrategy` does not matter. + * + * SourceQueue that current source is materialized to is for single thread usage only. + * + * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer - * @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]`` */ - def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration = 5.seconds): Source[T, SourceQueue[T]] = { - require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") - new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource"))) - } + def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueue[T]] = + Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource)) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 0dddad277b..5e2e3cb55a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -3,23 +3,21 @@ */ package akka.stream.stage -import java.util import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicReference } import akka.NotUsed +import java.util.concurrent.locks.ReentrantLock import akka.actor._ import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Unwatch, Watch } import akka.event.LoggingAdapter import akka.japi.function.{ Effect, Procedure } import akka.stream._ import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly -import akka.stream.impl.fusing.{ GraphInterpreter, GraphModule, GraphStageModule, SubSource, SubSink } +import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSource, SubSink } import akka.stream.impl.{ ReactiveStreamsCompliance, SeqActorName } import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration -import akka.stream.impl.SubscriptionTimeoutException import akka.stream.actor.ActorSubscriberMessage import akka.stream.actor.ActorPublisherMessage @@ -1273,3 +1271,55 @@ abstract class AbstractOutHandler extends OutHandler * (completing when upstream completes, failing when upstream fails, completing when downstream cancels). */ abstract class AbstractInOutHandler extends InHandler with OutHandler + +/** + * INTERNAL API + * This trait wraps callback for `GraphStage` stage instances and handle gracefully cases when stage is + * not yet initialized or already finished. + * + * While `GraphStage` has not initialized it adds all requests to list. + * As soon as `GraphStage` is started it stops collecting requests (pointing to real callback + * function) and run all the callbacks from the list + * + * Supposed to be used by GraphStages that share call back to outer world + */ +private[akka] trait CallbackWrapper[T] extends AsyncCallback[T] { + private trait CallbackState + private case class NotInitialized(list: List[T]) extends CallbackState + private case class Initialized(f: T ⇒ Unit) extends CallbackState + private case class Stopped(f: T ⇒ Unit) extends CallbackState + + /* + * To preserve message order when switching between not initialized / initialized states + * lock is used. Case is similar to RepointableActorRef + */ + private[this] final val lock = new ReentrantLock + + private[this] val callbackState = new AtomicReference[CallbackState](NotInitialized(Nil)) + + def stopCallback(f: T ⇒ Unit): Unit = locked { + callbackState.set(Stopped(f)) + } + + def initCallback(f: T ⇒ Unit): Unit = locked { + val list = (callbackState.getAndSet(Initialized(f)): @unchecked) match { + case NotInitialized(l) ⇒ l + } + list.reverse.foreach(f) + } + + override def invoke(arg: T): Unit = locked { + callbackState.get() match { + case Initialized(cb) ⇒ cb(arg) + case list @ NotInitialized(l) ⇒ callbackState.compareAndSet(list, NotInitialized(arg :: l)) + case Stopped(cb) ⇒ + lock.unlock() + cb(arg) + } + } + + private[this] def locked(body: ⇒ Unit): Unit = { + lock.lock() + try body finally if (lock.isLocked) lock.unlock() + } +} \ No newline at end of file