diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala similarity index 69% rename from akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 1b32bdceec..510e773a8a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AcknowledgeSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -4,16 +4,16 @@ package akka.stream.scaladsl import akka.actor.Status -import akka.pattern.{ AskTimeoutException, pipe } +import akka.pattern.pipe import akka.stream.ActorMaterializer import akka.stream.testkit.Utils._ import akka.stream.testkit.{ AkkaSpec, _ } +import scala.concurrent.Await import scala.concurrent.duration._ -import scala.concurrent.{ Await, Future } import scala.util.control.NoStackTrace -class AcknowledgeSinkSpec extends AkkaSpec { +class QueueSinkSpec extends AkkaSpec { implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() @@ -21,11 +21,11 @@ class AcknowledgeSinkSpec extends AkkaSpec { val noMsgTimeout = 300.millis - "An AcknowledgeSink" must { + "An QueueSinkSpec" must { "send the elements as result of future" in assertAllStagesStopped { val expected = List(Some(1), Some(2), Some(3), None) - val queue = Source(expected.flatten).runWith(Sink.queue(3)) + val queue = Source(expected.flatten).runWith(Sink.queue()) expected foreach { v ⇒ queue.pull() pipeTo testActor expectMsg(v) @@ -34,7 +34,7 @@ class AcknowledgeSinkSpec extends AkkaSpec { "allow to have only one future waiting for result in each point of time" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(3)) + val queue = Source(probe).runWith(Sink.queue()) val sub = probe.expectSubscription() val future = queue.pull() val future2 = queue.pull() @@ -45,11 +45,12 @@ class AcknowledgeSinkSpec extends AkkaSpec { expectMsg(Some(1)) sub.sendComplete() + queue.pull() } "wait for next element from upstream" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(3)) + val queue = Source(probe).runWith(Sink.queue()) val sub = probe.expectSubscription() queue.pull().pipeTo(testActor) @@ -58,11 +59,12 @@ class AcknowledgeSinkSpec extends AkkaSpec { sub.sendNext(1) expectMsg(Some(1)) sub.sendComplete() + queue.pull() } "fail future on stream failure" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(3)) + val queue = Source(probe).runWith(Sink.queue()) val sub = probe.expectSubscription() queue.pull().pipeTo(testActor) @@ -74,16 +76,17 @@ class AcknowledgeSinkSpec extends AkkaSpec { "fail future when stream failed" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(3, 100.millis)) - val sub = probe.expectSubscription() - sub.sendError(ex) // potential race condition - an[AskTimeoutException] shouldBe thrownBy { Await.result(queue.pull(), 1.second) } + val queue = Source(probe).runWith(Sink.queue()) + val sub = probe.expectSubscription() + sub.sendError(ex) + + the[Exception] thrownBy { Await.result(queue.pull(), 300.millis) } should be(ex) } "timeout future when stream cannot provide data" in assertAllStagesStopped { val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(3)) + val queue = Source(probe).runWith(Sink.queue()) val sub = probe.expectSubscription() queue.pull().pipeTo(testActor) @@ -92,18 +95,7 @@ class AcknowledgeSinkSpec extends AkkaSpec { sub.sendNext(1) expectMsg(Some(1)) sub.sendComplete() - } - - "work when buffer is 0" in assertAllStagesStopped { - val probe = TestPublisher.manualProbe[Int]() - val queue = Source(probe).runWith(Sink.queue(0)) - val sub = probe.expectSubscription() - sub.sendNext(1) - - queue.pull().pipeTo(testActor) - sub.sendNext(2) - expectMsg(Some(2)) - sub.sendComplete() + queue.pull() } } diff --git a/akka-stream/src/main/scala/akka/stream/Queue.scala b/akka-stream/src/main/scala/akka/stream/Queue.scala index eed01e7a86..919edba4f3 100644 --- a/akka-stream/src/main/scala/akka/stream/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/Queue.scala @@ -29,8 +29,8 @@ trait SinkQueue[T] { /** * Method pulls elements from stream and returns future that: - * - fails if stream is finished - * - completes with None in case if stream is completed after we got future + * - fails if stream is failed + * - completes with None in case if stream is completed * - completes with `Some(element)` in case next element is available from stream. */ def pull(): Future[Option[T]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgeSubscriber.scala deleted file mode 100644 index b93073d75e..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgeSubscriber.scala +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.impl - -import akka.actor.{ ActorLogging, ActorRef, Props, Status } -import akka.stream.actor.ActorPublisherMessage.Request -import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage, RequestStrategy } - -import scala.util.{ Try, Failure, Success } - -private[akka] object AcknowledgeSubscriber { - def props(highWatermark: Int) = - Props(new AcknowledgeSubscriber(highWatermark)) -} - -/** - * INTERNAL API - */ -private[akka] class AcknowledgeSubscriber(maxBuffer: Int) extends ActorSubscriber with ActorLogging { - import ActorSubscriberMessage._ - - var buffer: Vector[Any] = Vector.empty - - override val requestStrategy = new RequestStrategy { - def requestDemand(remainingRequested: Int): Int = { - maxBuffer - buffer.size - remainingRequested - } - } - - var requester: Option[ActorRef] = None - - def receive = { - case Request(_) ⇒ - if (requester.isEmpty) { - requester = Some(sender) - trySendElementDownstream() - } else - sender ! Status.Failure( - new IllegalStateException("You have to wait for first future to be resolved to send another request")) - - case OnNext(elem) ⇒ - if (maxBuffer != 0) { - buffer :+= elem - trySendElementDownstream() - } else requester match { - case Some(ref) ⇒ - requester = None - ref ! Some(elem) - case None ⇒ log.debug("Dropping element because there is no downstream demand: [{}]", elem) - } - - case OnError(cause) ⇒ - trySendDownstream(Status.Failure(cause)) - context.stop(self) - - case OnComplete ⇒ - if (buffer.isEmpty) { - trySendDownstream(Status.Success(None)) - context.stop(self) - } - } - - def trySendElementDownstream(): Unit = { - requester match { - case Some(ref) ⇒ - if (buffer.size > 0) { - ref ! Some(buffer.head) - requester = None - buffer = buffer.tail - } else if (canceled) { - ref ! None - context.stop(self) - } - - case None ⇒ //do nothing - } - } - - def trySendDownstream(e: Any): Unit = { - requester match { - case Some(ref) ⇒ - ref ! e - case None ⇒ //do nothing - } - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 69c277fd53..008cb7ee4b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -3,17 +3,19 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.AtomicReference + import akka.actor.{ ActorRef, Props } -import akka.stream.actor.ActorPublisherMessage.Request -import akka.stream.impl.StreamLayout.Module +import akka.stream.Attributes.InputBuffer import akka.stream._ -import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, GraphStageLogic } -import akka.util.Timeout +import akka.stream.impl.StreamLayout.Module +import akka.stream.stage.{ AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler } import org.reactivestreams.{ Publisher, Subscriber } + import scala.annotation.unchecked.uncheckedVariance -import scala.concurrent.duration.{ FiniteDuration, _ } import scala.concurrent.{ Future, Promise } import scala.language.postfixOps +import scala.util.{ Failure, Success, Try } /** * INTERNAL API @@ -167,32 +169,6 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any override def toString: String = "ActorRefSink" } -/** - * INTERNAL API - */ -private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: Attributes, - shape: SinkShape[In], timeout: FiniteDuration) extends SinkModule[In, SinkQueue[In]](shape) { - - override def create(context: MaterializationContext) = { - import akka.pattern.ask - val actorMaterializer = ActorMaterializer.downcast(context.materializer) - - implicit val t = Timeout(timeout) - val subscriberRef = actorMaterializer.actorOf(context, - AcknowledgeSubscriber.props(bufferSize)) - (akka.stream.actor.ActorSubscriber[In](subscriberRef), - new SinkQueue[In] { - override def pull(): Future[Option[In]] = (subscriberRef ? Request(1)).mapTo[Option[In]] - }) - } - - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, SinkQueue[In]] = - new AcknowledgeSink[In](bufferSize, attributes, shape, timeout) - override def withAttributes(attr: Attributes): Module = - new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout) - override def toString: String = "AcknowledgeSink" -} - private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { val in = Inlet[T]("lastOption.in") @@ -257,3 +233,92 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV }, p.future) } } + +/** + * INTERNAL API + */ +private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] { + trait RequestElementCallback[E] { + val requestElement = new AtomicReference[AnyRef](Nil) + } + + type Requested[E] = Promise[Option[T]] + + val in = Inlet[T]("queueSink.in") + override val shape: SinkShape[T] = SinkShape.of(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + type Received[E] = Try[Option[E]] + + val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + + val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1) + var currentRequest: Option[Requested[T]] = None + + val stageLogic = new GraphStageLogic(shape) with RequestElementCallback[Requested[T]] { + override def keepGoingAfterAllPortsClosed = true + + override def preStart(): Unit = { + val list = requestElement.getAndSet(callback.invoke _).asInstanceOf[List[Requested[T]]] + list.reverse.foreach(callback.invoke) + pull(in) + } + + private val callback: AsyncCallback[Requested[T]] = + getAsyncCallback(promise ⇒ currentRequest match { + case Some(_) ⇒ + promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request")) + case None ⇒ + if (buffer.isEmpty) currentRequest = Some(promise) + else sendDownstream(promise) + }) + + def sendDownstream(promise: Requested[T]): Unit = { + val e = buffer.dequeue() + promise.complete(e) + e match { + case Success(_: Some[_]) ⇒ //do nothing + case Success(None) ⇒ completeStage() + case Failure(t) ⇒ failStage(t) + } + } + + def enqueueAndNotify(requested: Received[T]): Unit = { + buffer.enqueue(requested) + currentRequest match { + case Some(p) ⇒ + sendDownstream(p) + currentRequest = None + case None ⇒ //do nothing + } + } + + setHandler(in, new InHandler { + override def onPush(): Unit = { + enqueueAndNotify(Success(Some(grab(in)))) + if (buffer.used < maxBuffer - 1) pull(in) + } + override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) + override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex)) + }) + } + + (stageLogic, new SinkQueue[T] { + override def pull(): Future[Option[T]] = { + val ref = stageLogic.requestElement + val p = Promise[Option[T]] + ref.get() match { + case l: List[_] ⇒ + if (!ref.compareAndSet(l, p :: l)) + ref.get() match { + case _: List[_] ⇒ throw new IllegalStateException("Concurrent call of SinkQueue.pull() is detected") + case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p) + } + case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p) + } + p.future + } + }) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 2c5ab5a1dc..fbd8faa6ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -95,7 +95,7 @@ private[stream] object Stages { val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") val actorSubscriberSink = name("actorSubscriberSink") - val acknowledgeSink = name("acknowledgeSink") + val queueSink = name("queueSink") val outputStreamSink = name("outputStreamSink") and IODispatcher val inputStreamSink = name("inputStreamSink") and IODispatcher val fileSink = name("fileSource") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 92d68d98cd..ff2ee432ad 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1154,7 +1154,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.fromGraph(GraphDSL.create(that, - new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] { + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] { def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) b.from(s).toInlet(zip.in1) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index b190a514a8..3e864ab9f6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -203,14 +203,20 @@ object Sink { * [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. * `Future` completes when element is available. * - * `Sink` will request at most `bufferSize` number of elements from - * upstream and then stop back pressure. + * Before calling pull method second time you need to wait until previous Future completes. + * Pull returns Failed future with ''IllegalStateException'' if previous future has not yet completed. * - * @param bufferSize The size of the buffer in element count - * @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]`` + * `Sink` will request at most number of elements equal to size of `inputBuffer` from + * upstream and then stop back pressure. You can configure size of input + * buffer by using [[Sink.withAttributes]] method. + * + * For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None + * as completion marker + * + * @see [[akka.stream.SinkQueue]] */ - def queue[T](bufferSize: Int, timeout: FiniteDuration): Sink[T, SinkQueue[T]] = - new Sink(scaladsl.Sink.queue(bufferSize, timeout)) + def queue[T](): Sink[T, SinkQueue[T]] = + new Sink(scaladsl.Sink.queue()) /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 03e98123c3..4409e64782 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -270,16 +270,20 @@ object Sink { * [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``. * `Future` completes when element is available. * - * `Sink` will request at most `bufferSize` number of elements from - * upstream and then stop back pressure. + * Before calling pull method second time you need to wait until previous Future completes. + * Pull returns Failed future with ''IllegalStateException'' if previous future has not yet completed. * - * @param bufferSize The size of the buffer in element count - * @param timeout Timeout for ``SinkQueue.pull():Future[Option[T]]`` + * `Sink` will request at most number of elements equal to size of `inputBuffer` from + * upstream and then stop back pressure. You can configure size of input + * buffer by using [[Sink.withAttributes]] method. + * + * For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None + * as completion marker + * + * @see [[akka.stream.SinkQueue]] */ - def queue[T](bufferSize: Int, timeout: FiniteDuration = 5.seconds): Sink[T, SinkQueue[T]] = { - require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") - new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout)) - } + def queue[T](): Sink[T, SinkQueue[T]] = + Sink.fromGraph(new QueueSink().withAttributes(DefaultAttributes.queueSink)) /** * Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites