From e0562abba9eae232eb30c23b3c8c57ae4e430d42 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 7 Jul 2016 07:01:28 -0400 Subject: [PATCH] +str 20129 add lazySink (#20579) --- .../akka/stream/scaladsl/LazySinkSpec.scala | 147 ++++++++++++++++++ .../main/scala/akka/stream/impl/Sinks.scala | 106 ++++++++++++- .../main/scala/akka/stream/impl/Stages.scala | 1 + .../main/scala/akka/stream/javadsl/Sink.scala | 20 ++- .../scala/akka/stream/scaladsl/Sink.scala | 14 ++ 5 files changed, 283 insertions(+), 5 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala new file mode 100644 index 0000000000..ee806f3eea --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.util.concurrent.TimeoutException + +import akka.stream.ActorAttributes.supervisionStrategy +import akka.stream.Supervision._ +import akka.stream._ +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.AkkaSpec + +import scala.concurrent.{ Promise, Future, Await } +import scala.concurrent.duration._ + +class LazySinkSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + implicit val materializer = ActorMaterializer(settings) + + val fallback = () ⇒ fail("Must not call fallback function") + val ex = TE("") + + "A LazySink" must { + "work in happy case" in assertAllStagesStopped { + val futureProbe = Source(0 to 10).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) + val probe = Await.result(futureProbe, 300.millis) + probe.request(100) + (0 to 10).foreach(probe.expectNext) + } + + "work with slow sink init" in assertAllStagesStopped { + val p = Promise[Sink[Int, Probe[Int]]]() + val sourceProbe = TestPublisher.manualProbe[Int]() + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ p.future, fallback)) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectRequest(1) + sourceProbe.expectNoMsg(200.millis) + a[TimeoutException] shouldBe thrownBy { Await.result(futureProbe, 200.millis) } + + p.success(TestSink.probe[Int]) + val probe = Await.result(futureProbe, 300.millis) + probe.request(100) + probe.expectNext(0) + (1 to 10).foreach(i ⇒ { + sourceSub.sendNext(i) + probe.expectNext(i) + }) + sourceSub.sendComplete() + } + + "complete when there was no elements in stream" in assertAllStagesStopped { + val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ ⇒ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () ⇒ Future.successful(0))) + val futureResult = Await.result(futureProbe, 300.millis) + Await.result(futureResult, 300.millis) should ===(0) + } + + "complete normally when upstream is completed" in assertAllStagesStopped { + val futureProbe = Source.single(1).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) + val futureResult = Await.result(futureProbe, 300.millis) + futureResult.request(1) + .expectNext(1) + .expectComplete() + } + + "failed gracefully when sink factory method failed" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ throw ex, fallback)) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectCancellation() + a[RuntimeException] shouldBe thrownBy { Await.result(futureProbe, 300.millis) } + } + + "failed gracefully when upstream failed" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + val probe = Await.result(futureProbe, 300.millis) + probe.request(1) + .expectNext(0) + sourceSub.sendError(ex) + probe.expectError(ex) + } + + "failed gracefully when factory future failed" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.failed(ex), fallback) + .withAttributes(supervisionStrategy(stoppingDecider))) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + a[TE] shouldBe thrownBy { Await.result(futureProbe, 300.millis) } + } + + "cancel upstream when internal sink is cancelled" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectRequest(1) + val probe = Await.result(futureProbe, 300.millis) + probe.request(1) + .expectNext(0) + probe.cancel() + sourceSub.expectCancellation() + } + + "continue if supervision is resume" in assertAllStagesStopped { + val sourceProbe = TestPublisher.manualProbe[Int]() + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](a ⇒ + if (a == 0) throw ex else Future.successful(TestSink.probe[Int]), fallback) + .withAttributes(supervisionStrategy(resumingDecider))) + + val sourceSub = sourceProbe.expectSubscription() + sourceSub.expectRequest(1) + sourceSub.sendNext(0) + sourceSub.expectRequest(1) + sourceSub.sendNext(1) + val probe = Await.result(futureProbe, 300.millis) + probe.request(1) + probe.expectNext(1) + probe.cancel() + } + + "fail future when zero throws exception" in assertAllStagesStopped { + val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ ⇒ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () ⇒ throw ex)) + a[TE] shouldBe thrownBy { Await.result(futureProbe, 300.millis) } + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 840f6a5d80..60fcee909b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -3,7 +3,11 @@ */ package akka.stream.impl +import akka.dispatch.ExecutionContexts +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.Supervision.{ stoppingDecider, Stop } import akka.stream.impl.QueueSink.{ Output, Pull } +import akka.stream.impl.fusing.GraphInterpreter import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Actor, Props } import akka.stream.Attributes.InputBuffer @@ -20,11 +24,11 @@ import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.{ Promise, Future } +import scala.concurrent.{ ExecutionContext, Promise, Future } import scala.language.postfixOps import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.stream.scaladsl.{ SinkQueueWithCancel, SinkQueue } +import akka.stream.scaladsl.{ Source, Sink, SinkQueueWithCancel, SinkQueue } import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ @@ -444,3 +448,101 @@ private[akka] final class ReducerState[T, R](val collector: java.util.stream.Col def finish(): R = collector.finisher().apply(reduced) } +/** + * INTERNAL API + */ +final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], zeroMat: () ⇒ M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { + val in = Inlet[T]("lazySink.in") + override def initialAttributes = DefaultAttributes.lazySink + override val shape: SinkShape[T] = SinkShape.of(in) + + override def toString: String = "LazySink" + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider) + + val promise = Promise[M]() + val stageLogic = new GraphStageLogic(shape) with InHandler { + override def preStart(): Unit = pull(in) + + override def onPush(): Unit = { + try { + val element = grab(in) + val cb: AsyncCallback[Try[Sink[T, M]]] = getAsyncCallback { + case Success(sink) ⇒ initInternalSource(sink, element) + case Failure(e) ⇒ failure(e) + } + sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext) + } catch { + case NonFatal(e) ⇒ decider(e) match { + case Supervision.Stop ⇒ failure(e) + case _ ⇒ pull(in) + } + } + } + + private def failure(ex: Throwable): Unit = { + failStage(ex) + promise.failure(ex) + } + + override def onUpstreamFinish(): Unit = { + completeStage() + promise.tryComplete(Try(zeroMat())) + } + override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) + setHandler(in, this) + + private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = { + val sourceOut = new SubSourceOutlet[T]("LazySink") + var completed = false + + def switchToFirstElementHandlers(): Unit = { + sourceOut.setHandler(new OutHandler { + override def onPull(): Unit = { + sourceOut.push(firstElement) + if (completed) internalSourceComplete() else switchToFinalHandlers() + } + override def onDownstreamFinish(): Unit = internalSourceComplete() + }) + + setHandler(in, new InHandler { + override def onPush(): Unit = sourceOut.push(grab(in)) + override def onUpstreamFinish(): Unit = { + setKeepGoing(true) + completed = true + } + override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) + }) + } + + def switchToFinalHandlers(): Unit = { + sourceOut.setHandler(new OutHandler { + override def onPull(): Unit = pull(in) + override def onDownstreamFinish(): Unit = internalSourceComplete() + }) + setHandler(in, new InHandler { + override def onPush(): Unit = sourceOut.push(grab(in)) + override def onUpstreamFinish(): Unit = internalSourceComplete() + override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) + }) + } + + def internalSourceComplete(): Unit = { + sourceOut.complete() + completeStage() + } + + def internalSourceFailure(ex: Throwable): Unit = { + sourceOut.fail(ex) + failStage(ex) + } + + switchToFirstElementHandlers() + promise.trySuccess(Source.fromGraph(sourceOut.source).runWith(sink)(interpreter.subFusingMaterializer)) + } + + } + (stageLogic, promise.future) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index ebcd16fa71..4ae0ab8bd4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -127,6 +127,7 @@ private[stream] object Stages { val actorRefWithAck = name("actorRefWithAckSink") val actorSubscriberSink = name("actorSubscriberSink") val queueSink = name("queueSink") + val lazySink = name("lazySink") val outputStreamSink = name("outputStreamSink") and IODispatcher val inputStreamSink = name("inputStreamSink") and IODispatcher val fileSink = name("fileSink") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index cfcd9b0b11..0aa1df808d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -8,15 +8,14 @@ import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function -import akka.stream.impl.StreamLayout +import akka.stream.impl.{ LazySink, StreamLayout, SinkQueueAdapter } import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.compat.java8.OptionConverters._ import scala.concurrent.{ Future, ExecutionContext } import scala.util.Try import java.util.concurrent.CompletionStage -import scala.compat.java8.FutureConverters.FutureOps -import akka.stream.impl.SinkQueueAdapter +import scala.compat.java8.FutureConverters._ /** Java API */ object Sink { @@ -247,6 +246,21 @@ object Sink { */ def queue[T](): Sink[T, SinkQueueWithCancel[T]] = new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_))) + + /** + * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, + * because of completion or error. + * + * If `sinkFactory` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will + * try to create sink with next element + * + * `fallback` will be executed when there was no elements and completed is received from upstream. + */ + def lazyInit[T, M](sinkFactory: function.Function[T, CompletionStage[Sink[T, M]]], fallback: function.Creator[M]): Sink[T, CompletionStage[M]] = + new Sink(scaladsl.Sink.lazyInit[T, M]( + t ⇒ sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), + () ⇒ fallback.create()).mapMaterializedValue(_.toJava)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 27737f245f..a77e18f13d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -348,4 +348,18 @@ object Sink { */ def queue[T](): Sink[T, SinkQueueWithCancel[T]] = Sink.fromGraph(new QueueSink()) + + /** + * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, + * because of completion or error. + * + * If `sinkFactory` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will + * try to create sink with next element + * + * `fallback` will be executed when there was no elements and completed is received from upstream. + */ + def lazyInit[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], fallback: () ⇒ M): Sink[T, Future[M]] = + Sink.fromGraph(new LazySink(sinkFactory, fallback)) + }