diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 425d368c70..63fb195a10 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -122,7 +122,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider val flow = Flow[ByteString] - .via(Flow.lazyInit(_ ⇒ { + .via(Flow.lazyInitAsync(() ⇒ { // only open the actual connection if any new messages are sent afr.loFreq( TcpOutbound_Connected, @@ -132,7 +132,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider Flow[ByteString] .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) .via(connectionFlow)) - }, () ⇒ NotUsed)) + })) .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal ⇒ Source.empty }) .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel)) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index e5d99a438b..1e2bc55b36 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -978,16 +978,9 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseLazyInit() throws Exception { final CompletionStage> future = new CompletableFuture>(); future.toCompletableFuture().complete(Flow.fromFunction((id) -> id)); - Creator ignoreFunction = new Creator() { - @Override - public NotUsed create() throws Exception { - return NotUsed.getInstance(); - } - }; - Integer result = Source.range(1, 10) - .via(Flow.lazyInit((i) -> future, ignoreFunction)) + .via(Flow.lazyInitAsync(() -> future)) .runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 96a2b3ecd3..57d6a471f5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -195,9 +195,13 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { //LazySink must wait for result of initialization even if got upstreamComplete targetFile { f ⇒ val completion = Source(List(TestByteStrings.head)) - .runWith(Sink.lazyInit[ByteString, Future[IOResult]]( - _ ⇒ Future.successful(FileIO.toPath(f)), () ⇒ Future.successful(IOResult.createSuccessful(0))) - .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext))) + .runWith(Sink.lazyInitAsync( + () ⇒ Future.successful(FileIO.toPath(f))) + // map a Future[Option[Future[IOResult]]] into a Future[Option[IOResult]] + .mapMaterializedValue(_.flatMap { + case Some(future) ⇒ future.map(Some(_))(ExecutionContexts.sameThreadExecutionContext) + case None ⇒ Future.successful(None) + }(ExecutionContexts.sameThreadExecutionContext))) Await.result(completion, 3.seconds) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala index 5837e51bc5..24a8940c11 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala @@ -4,21 +4,15 @@ package akka.stream.scaladsl -import java.util.concurrent.TimeoutException - import akka.NotUsed -import akka.stream.ActorAttributes.supervisionStrategy -import akka.stream.Supervision._ import akka.stream._ -import akka.stream.impl.fusing.LazyFlow -import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue } -import akka.stream.testkit.{ StreamSpec, TestPublisher } -import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ -import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.testkit.{ StreamSpec, TestPublisher } +import org.scalatest.concurrent.ScalaFutures -import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ +import scala.concurrent.{ Future, Promise } class LazyFlowSpec extends StreamSpec { @@ -26,16 +20,15 @@ class LazyFlowSpec extends StreamSpec { .withInputBuffer(initialSize = 1, maxSize = 1) implicit val materializer = ActorMaterializer(settings) - val fallback = () ⇒ NotUsed val ex = TE("") "A LazyFlow" must { - def mapF(e: Int): Future[Flow[Int, String, NotUsed]] = + def mapF(e: Int): () ⇒ Future[Flow[Int, String, NotUsed]] = () ⇒ Future.successful(Flow.fromFunction[Int, String](i ⇒ (i * e).toString)) - val flowF = Future.successful(Flow.fromFunction[Int, Int](id ⇒ id)) + val flowF = Future.successful(Flow[Int]) "work in happy case" in assertAllStagesStopped { val probe = Source(2 to 10) - .via(Flow.lazyInit[Int, String, NotUsed](mapF, fallback)) + .via(Flow.lazyInitAsync[Int, String, NotUsed](mapF(2))) .runWith(TestSink.probe[String]) probe.request(100) (2 to 10).map(i ⇒ (i * 2).toString).foreach(probe.expectNext) @@ -45,7 +38,7 @@ class LazyFlowSpec extends StreamSpec { val p = Promise[Flow[Int, Int, NotUsed]]() val sourceProbe = TestPublisher.manualProbe[Int]() val flowProbe = Source.fromPublisher(sourceProbe) - .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ p.future, fallback)) + .via(Flow.lazyInitAsync[Int, Int, NotUsed](() ⇒ p.future)) .runWith(TestSink.probe[Int]) val sourceSub = sourceProbe.expectSubscription() @@ -55,7 +48,7 @@ class LazyFlowSpec extends StreamSpec { sourceSub.expectRequest(1) sourceProbe.expectNoMsg(200.millis) - p.success(Flow.fromFunction[Int, Int](id ⇒ id)) + p.success(Flow[Int]) flowProbe.request(99) flowProbe.expectNext(0) (1 to 10).foreach(i ⇒ { @@ -66,26 +59,41 @@ class LazyFlowSpec extends StreamSpec { } "complete when there was no elements in the stream" in assertAllStagesStopped { - def flowMaker(i: Int) = flowF + def flowMaker() = flowF val probe = Source.empty - .via(Flow.lazyInit(flowMaker, () ⇒ 0)) + .via(Flow.lazyInitAsync(flowMaker)) .runWith(TestSink.probe[Int]) probe.request(1).expectComplete() } - "complete normally when upstream is completed" in assertAllStagesStopped { - val probe = Source.single(1) - .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback)) - .runWith(TestSink.probe[Int]) - probe.request(1) - .expectNext(1) - .expectComplete() + "complete normally when upstream completes BEFORE the stage has switched to the inner flow" in assertAllStagesStopped { + val promise = Promise[Flow[Int, Int, NotUsed]] + val (pub, sub) = TestSource.probe[Int] + .viaMat(Flow.lazyInitAsync(() ⇒ promise.future))(Keep.left) + .toMat(TestSink.probe)(Keep.both) + .run() + sub.request(1) + pub.sendNext(1).sendComplete() + promise.success(Flow[Int]) + sub.expectNext(1).expectComplete() + } + + "complete normally when upstream completes AFTER the stage has switched to the inner flow" in assertAllStagesStopped { + val (pub, sub) = TestSource.probe[Int] + .viaMat(Flow.lazyInitAsync(() ⇒ Future.successful(Flow[Int])))(Keep.left) + .toMat(TestSink.probe)(Keep.both) + .run() + sub.request(1) + pub.sendNext(1) + sub.expectNext(1) + pub.sendComplete() + sub.expectComplete() } "fail gracefully when flow factory method failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() val probe = Source.fromPublisher(sourceProbe) - .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ throw ex, fallback)) + .via(Flow.lazyInitAsync[Int, Int, NotUsed](() ⇒ throw ex)) .runWith(TestSink.probe[Int]) val sourceSub = sourceProbe.expectSubscription() @@ -99,8 +107,8 @@ class LazyFlowSpec extends StreamSpec { "fail gracefully when upstream failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() val probe = Source.fromPublisher(sourceProbe) - .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback)) - .runWith(TestSink.probe[Int]) + .via(Flow.lazyInitAsync(() ⇒ flowF)) + .runWith(TestSink.probe) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) @@ -114,9 +122,8 @@ class LazyFlowSpec extends StreamSpec { "fail gracefully when factory future failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() val flowProbe = Source.fromPublisher(sourceProbe) - .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ Future.failed(ex), fallback)) - .withAttributes(supervisionStrategy(stoppingDecider)) - .runWith(TestSink.probe[Int]) + .via(Flow.lazyInitAsync[Int, Int, NotUsed](() ⇒ Future.failed(ex))) + .runWith(TestSink.probe) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) @@ -127,8 +134,7 @@ class LazyFlowSpec extends StreamSpec { "cancel upstream when the downstream is cancelled" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() val probe = Source.fromPublisher(sourceProbe) - .via(Flow.lazyInit[Int, Int, NotUsed](_ ⇒ flowF, fallback)) - .withAttributes(supervisionStrategy(stoppingDecider)) + .via(Flow.lazyInitAsync[Int, Int, NotUsed](() ⇒ flowF)) .runWith(TestSink.probe[Int]) val sourceSub = sourceProbe.expectSubscription() @@ -141,44 +147,17 @@ class LazyFlowSpec extends StreamSpec { sourceSub.expectCancellation() } - "continue if supervision is resume" in assertAllStagesStopped { - val sourceProbe = TestPublisher.manualProbe[Int]() - def flowBuilder(a: Int) = if (a == 0) throw ex else Future.successful(Flow.fromFunction[Int, Int](id ⇒ id)) - val probe = Source.fromPublisher(sourceProbe) - .via(Flow.lazyInit[Int, Int, NotUsed](flowBuilder, fallback)) - .withAttributes(supervisionStrategy(resumingDecider)) - .runWith(TestSink.probe[Int]) - - val sourceSub = sourceProbe.expectSubscription() - probe.request(1) - sourceSub.expectRequest(1) - sourceSub.sendNext(0) - sourceSub.expectRequest(1) - sourceSub.sendNext(1) - probe.expectNext(1) - probe.cancel() - } - - "fail correctly when materialization of inner sink fails" in assertAllStagesStopped { - val matFail = TE("fail!") - object FailingInnerMat extends GraphStageWithMaterializedValue[FlowShape[String, String], Option[String]] { - val in = Inlet[String]("in") - val out = Outlet[String]("out") - val shape = FlowShape(in, out) - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = - (new GraphStageLogic(shape) { - throw matFail - }, Some("fine")) - } - + "fail correctly when factory throw error" in assertAllStagesStopped { + val msg = "fail!" + val matFail = TE(msg) val result = Source.single("whatever") - .viaMat(Flow.lazyInit( - _ ⇒ Future.successful(Flow.fromGraph(FailingInnerMat)), - () ⇒ Some("boom")))(Keep.right) + .viaMat(Flow.lazyInitAsync(() ⇒ throw matFail))(Keep.right) .toMat(Sink.ignore)(Keep.left) .run() - result should ===(Some("boom")) + ScalaFutures.whenReady(result.failed) { e ⇒ + e.getMessage shouldBe msg + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index c1c1941f23..189023d580 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -25,13 +25,12 @@ class LazySinkSpec extends StreamSpec { .withInputBuffer(initialSize = 1, maxSize = 1) implicit val materializer = ActorMaterializer(settings) - val fallback = () ⇒ fail("Must not call fallback function") val ex = TE("") "A LazySink" must { "work in happy case" in assertAllStagesStopped { - val futureProbe = Source(0 to 10).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) - val probe = Await.result(futureProbe, 300.millis) + val futureProbe = Source(0 to 10).runWith(Sink.lazyInitAsync(() ⇒ Future.successful(TestSink.probe[Int]))) + val probe = Await.result(futureProbe, 300.millis).get probe.request(100) (0 to 10).foreach(probe.expectNext) } @@ -39,7 +38,7 @@ class LazySinkSpec extends StreamSpec { "work with slow sink init" in assertAllStagesStopped { val p = Promise[Sink[Int, Probe[Int]]]() val sourceProbe = TestPublisher.manualProbe[Int]() - val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ p.future, fallback)) + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync(() ⇒ p.future)) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) @@ -49,7 +48,7 @@ class LazySinkSpec extends StreamSpec { a[TimeoutException] shouldBe thrownBy { Await.result(futureProbe, 200.millis) } p.success(TestSink.probe[Int]) - val probe = Await.result(futureProbe, 300.millis) + val probe = Await.result(futureProbe, 300.millis).get probe.request(100) probe.expectNext(0) (1 to 10).foreach(i ⇒ { @@ -60,14 +59,14 @@ class LazySinkSpec extends StreamSpec { } "complete when there was no elements in stream" in assertAllStagesStopped { - val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ ⇒ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () ⇒ Future.successful(0))) + val futureProbe = Source.empty.runWith(Sink.lazyInitAsync(() ⇒ Future.successful(Sink.fold[Int, Int](0)(_ + _)))) val futureResult = Await.result(futureProbe, 300.millis) - Await.result(futureResult, 300.millis) should ===(0) + futureResult should ===(None) } "complete normally when upstream is completed" in assertAllStagesStopped { - val futureProbe = Source.single(1).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) - val futureResult = Await.result(futureProbe, 300.millis) + val futureProbe = Source.single(1).runWith(Sink.lazyInitAsync(() ⇒ Future.successful(TestSink.probe[Int]))) + val futureResult = Await.result(futureProbe, 300.millis).get futureResult.request(1) .expectNext(1) .expectComplete() @@ -75,7 +74,7 @@ class LazySinkSpec extends StreamSpec { "failed gracefully when sink factory method failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() - val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ throw ex, fallback)) + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync[Int, Probe[Int]](() ⇒ throw ex)) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) @@ -86,22 +85,21 @@ class LazySinkSpec extends StreamSpec { "failed gracefully when upstream failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() - val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync(() ⇒ Future.successful(TestSink.probe[Int]))) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) sourceSub.sendNext(0) - val probe = Await.result(futureProbe, 300.millis) + val probe = Await.result(futureProbe, 300.millis).get probe.request(1) .expectNext(0) sourceSub.sendError(ex) probe.expectError(ex) } - "failed gracefully when factory future failed" in assertAllStagesStopped { + "fail gracefully when factory future failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() - val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.failed(ex), fallback) - .withAttributes(supervisionStrategy(stoppingDecider))) + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync(() ⇒ Future.failed(ex))) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) @@ -111,40 +109,18 @@ class LazySinkSpec extends StreamSpec { "cancel upstream when internal sink is cancelled" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() - val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ ⇒ Future.successful(TestSink.probe[Int]), fallback)) + val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync(() ⇒ Future.successful(TestSink.probe[Int]))) val sourceSub = sourceProbe.expectSubscription() sourceSub.expectRequest(1) sourceSub.sendNext(0) sourceSub.expectRequest(1) - val probe = Await.result(futureProbe, 300.millis) + val probe = Await.result(futureProbe, 300.millis).get probe.request(1) .expectNext(0) probe.cancel() sourceSub.expectCancellation() } - "continue if supervision is resume" in assertAllStagesStopped { - val sourceProbe = TestPublisher.manualProbe[Int]() - val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](a ⇒ - if (a == 0) throw ex else Future.successful(TestSink.probe[Int]), fallback) - .withAttributes(supervisionStrategy(resumingDecider))) - - val sourceSub = sourceProbe.expectSubscription() - sourceSub.expectRequest(1) - sourceSub.sendNext(0) - sourceSub.expectRequest(1) - sourceSub.sendNext(1) - val probe = Await.result(futureProbe, 300.millis) - probe.request(1) - probe.expectNext(1) - probe.cancel() - } - - "fail future when zero throws exception" in assertAllStagesStopped { - val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ ⇒ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () ⇒ throw ex)) - a[TE] shouldBe thrownBy { Await.result(futureProbe, 300.millis) } - } - "fail correctly when materialization of inner sink fails" in assertAllStagesStopped { val matFail = TE("fail!") object FailingInnerMat extends GraphStage[SinkShape[String]] { @@ -155,11 +131,10 @@ class LazySinkSpec extends StreamSpec { } } - val result = Source.single("whatever") + val result = Source(List("whatever")) .runWith( - Sink.lazyInit[String, NotUsed]( - str ⇒ Future.successful(Sink.fromGraph(FailingInnerMat)), - () ⇒ NotUsed)) + Sink.lazyInitAsync[String, NotUsed]( + () ⇒ { println("create sink"); Future.successful(Sink.fromGraph(FailingInnerMat)) })) result.failed.futureValue should ===(matFail) } diff --git a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes index 531cacef61..1313933f9e 100644 --- a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes @@ -1,6 +1,10 @@ # #24604 Deduplicate logic for IODispatcher ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$") +# #24670 materialized value of Flow.lazyInit must be a future +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.LazyFlow.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.LazySink.this") + # #24581 RS violation ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.VirtualProcessor$Both") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProcessor#Both.create") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProcessor#Both.create") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 71c3468955..b88653c976 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -4,38 +4,31 @@ package akka.stream.impl -import akka.dispatch.ExecutionContexts -import akka.stream.ActorAttributes.SupervisionStrategy -import akka.stream.impl.QueueSink.{ Output, Pull } +import java.util.Optional +import java.util.concurrent.CompletionStage + import akka.NotUsed import akka.actor.{ ActorRef, Props } +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.dispatch.ExecutionContexts +import akka.event.Logging import akka.stream.Attributes.InputBuffer import akka.stream._ +import akka.stream.impl.QueueSink.{ Output, Pull } import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.AtomicModule - -import akka.actor.{ ActorRef, Props } -import akka.stream.Attributes.InputBuffer -import akka.stream._ +import akka.stream.scaladsl.{ Sink, SinkQueueWithCancel, Source } import akka.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance +import scala.collection.generic.CanBuildFrom import scala.collection.immutable +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.stream.scaladsl.{ Sink, SinkQueueWithCancel, Source } -import java.util.concurrent.CompletionStage - -import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ -import java.util.Optional - -import akka.annotation.{ DoNotInherit, InternalApi } -import akka.event.Logging - -import scala.collection.generic.CanBuildFrom /** * INTERNAL API @@ -456,7 +449,7 @@ import scala.collection.generic.CanBuildFrom /** * INTERNAL API */ -@InternalApi final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], zeroMat: () ⇒ M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { +@InternalApi final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[M]]] { val in = Inlet[T]("lazySink.in") override def initialAttributes = DefaultAttributes.lazySink override val shape: SinkShape[T] = SinkShape.of(in) @@ -464,102 +457,122 @@ import scala.collection.generic.CanBuildFrom override def toString: String = "LazySink" override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider - var completed = false - val promise = Promise[M]() + val promise = Promise[Option[M]]() val stageLogic = new GraphStageLogic(shape) with InHandler { + var switching = false override def preStart(): Unit = pull(in) override def onPush(): Unit = { - try { - val element = grab(in) - val cb: AsyncCallback[Try[Sink[T, M]]] = - getAsyncCallback { - case Success(sink) ⇒ initInternalSource(sink, element) - case Failure(e) ⇒ failure(e) - } - sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext) - setHandler(in, new InHandler { - override def onPush(): Unit = () - override def onUpstreamFinish(): Unit = gotCompletionEvent() - override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) - }) - } catch { - case NonFatal(e) ⇒ decider(e) match { - case Supervision.Stop ⇒ failure(e) - case _ ⇒ pull(in) + val element = grab(in) + switching = true + val cb: AsyncCallback[Try[Sink[T, M]]] = + getAsyncCallback { + case Success(sink) ⇒ + // check if the stage is still in need for the lazy sink + // (there could have been an onUpstreamFailure in the meantime that has completed the promise) + if (!promise.isCompleted) { + try { + val mat = switchTo(sink, element) + promise.success(Some(mat)) + setKeepGoing(true) + } catch { + case NonFatal(e) ⇒ + promise.failure(e) + failStage(e) + } + } + case Failure(e) ⇒ + promise.failure(e) + failStage(e) } + try { + sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + } catch { + case NonFatal(e) ⇒ + promise.failure(e) + failStage(e) } } - private def failure(ex: Throwable): Unit = { - failStage(ex) - promise.failure(ex) - } - override def onUpstreamFinish(): Unit = { - completeStage() - promise.tryComplete(Try(zeroMat())) + // ignore onUpstreamFinish while the stage is switching but setKeepGoing + // + if (switching) { + // there is a cached element -> the stage must not be shut down automatically because isClosed(in) is satisfied + setKeepGoing(true) + } else { + promise.success(None) + super.onUpstreamFinish() + } } - override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) + + override def onUpstreamFailure(ex: Throwable): Unit = { + promise.failure(ex) + super.onUpstreamFailure(ex) + } + setHandler(in, this) - private def gotCompletionEvent(): Unit = { + private def switchTo(sink: Sink[T, M], firstElement: T): M = { + + var firstElementPushed = false + + val subOutlet = new SubSourceOutlet[T]("LazySink") + + val matVal = Source.fromGraph(subOutlet.source).runWith(sink)(interpreter.subFusingMaterializer) + + def maybeCompleteStage(): Unit = { + if (isClosed(in) && subOutlet.isClosed) { + completeStage() + } + } + + // The stage must not be shut down automatically; it is completed when maybeCompleteStage decides setKeepGoing(true) - completed = true - } - private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = { - val sourceOut = new SubSourceOutlet[T]("LazySink") - - def switchToFirstElementHandlers(): Unit = { - sourceOut.setHandler(new OutHandler { - override def onPull(): Unit = { - sourceOut.push(firstElement) - if (completed) internalSourceComplete() else switchToFinalHandlers() + setHandler(in, new InHandler { + override def onPush(): Unit = { + subOutlet.push(grab(in)) + } + override def onUpstreamFinish(): Unit = { + if (firstElementPushed) { + subOutlet.complete() + maybeCompleteStage() } - override def onDownstreamFinish(): Unit = internalSourceComplete() - }) + } + override def onUpstreamFailure(ex: Throwable): Unit = { + // propagate exception irrespective if the cached element has been pushed or not + subOutlet.fail(ex) + maybeCompleteStage() + } + }) - setHandler(in, new InHandler { - override def onPush(): Unit = sourceOut.push(grab(in)) - override def onUpstreamFinish(): Unit = gotCompletionEvent() - override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) - }) - } + subOutlet.setHandler(new OutHandler { + override def onPull(): Unit = { + if (firstElementPushed) { + pull(in) + } else { + // the demand can be satisfied right away by the cached element + firstElementPushed = true + subOutlet.push(firstElement) + // in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed + // -> check if the completion must be propagated now + if (isClosed(in)) { + subOutlet.complete() + maybeCompleteStage() + } + } + } + override def onDownstreamFinish(): Unit = { + if (!isClosed(in)) { + cancel(in) + } + maybeCompleteStage() + } + }) - def switchToFinalHandlers(): Unit = { - sourceOut.setHandler(new OutHandler { - override def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = internalSourceComplete() - }) - setHandler(in, new InHandler { - override def onPush(): Unit = sourceOut.push(grab(in)) - override def onUpstreamFinish(): Unit = internalSourceComplete() - override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) - }) - } - - def internalSourceComplete(): Unit = { - sourceOut.complete() - completeStage() - } - - def internalSourceFailure(ex: Throwable): Unit = { - sourceOut.fail(ex) - failStage(ex) - } - - switchToFirstElementHandlers() - try { - val matVal = Source.fromGraph(sourceOut.source).runWith(sink)(interpreter.subFusingMaterializer) - promise.trySuccess(matVal) - } catch { - case NonFatal(ex) ⇒ - promise.tryFailure(ex) - failStage(ex) - } + matVal } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index da71a496cd..7fec0bbd93 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -22,7 +22,7 @@ import akka.stream.{ Supervision, _ } import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder -import scala.concurrent.Future +import scala.concurrent.{ Future, Promise } import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy @@ -362,8 +362,7 @@ private[stream] object Collect { override def toString: String = "Scan" override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { - self ⇒ + new GraphStageLogic(shape) with InHandler with OutHandler { self ⇒ private var aggregator = zero private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider @@ -425,8 +424,7 @@ private[stream] object Collect { override val toString: String = "ScanAsync" override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { - self ⇒ + new GraphStageLogic(shape) with InHandler with OutHandler { self ⇒ private var current: Out = zero private var eventualCurrent: Future[Out] = Future.successful(current) @@ -1412,7 +1410,8 @@ private[stream] object Collect { if (isEnabled(logLevels.onFailure)) logLevels.onFailure match { case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name) - case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage) + case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause + .getClass), cause.getMessage) } super.onUpstreamFailure(cause) @@ -1463,7 +1462,8 @@ private[stream] object Collect { private final val DefaultLoggerName = "akka.stream.Log" private final val OffInt = LogLevels.Off.asInt - private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel) + private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging + .DebugLevel, onFailure = Logging.ErrorLevel) } /** @@ -1482,6 +1482,7 @@ private[stream] object Collect { @InternalApi private[akka] object GroupedWeightedWithin { val groupedWeightedWithinTimer = "GroupedWeightedWithinTimer" } + /** * INTERNAL API */ @@ -1609,6 +1610,7 @@ private[stream] object Collect { if (isAvailable(out)) emitGroup() else pushEagerly = true } + setHandlers(in, out, this) } } @@ -1792,8 +1794,7 @@ private[stream] object Collect { @InternalApi private[akka] final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.reduce - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - self ⇒ + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self ⇒ override def toString = s"Reduce.Logic(aggregator=$aggregator)" var aggregator: T = _ @@ -1962,148 +1963,197 @@ private[stream] object Collect { /** * INTERNAL API */ -@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], zeroMat: () ⇒ M) - extends GraphStageWithMaterializedValue[FlowShape[I, O], M] { +@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]]) + extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[Option[M]]] { val in = Inlet[I]("lazyFlow.in") val out = Outlet[O]("lazyFlow.out") + override def initialAttributes = DefaultAttributes.lazyFlow + override val shape: FlowShape[I, O] = FlowShape.of(in, out) override def toString: String = "LazyFlow" override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider - var completed = false - var matVal: Option[M] = None + val matPromise = Promise[Option[M]]() val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - val subSink = new SubSinkInlet[O]("LazyFlowSubSink") + + var switching = false + + // + // implementation of handler methods in initial state + // override def onPush(): Unit = { - try { - val element = grab(in) - val cb: AsyncCallback[Try[Flow[I, O, M]]] = - getAsyncCallback { - case Success(flow) ⇒ initInternalSource(flow, element) - case Failure(e) ⇒ failure(e) + val element = grab(in) + switching = true + val cb = getAsyncCallback[Try[Flow[I, O, M]]] { + case Success(flow) ⇒ + // check if the stage is still in need for the lazy flow + // (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise) + if (!matPromise.isCompleted) { + try { + val mat = switchTo(flow, element) + matPromise.success(Some(mat)) + } catch { + case NonFatal(e) ⇒ + matPromise.failure(e) + failStage(e) + } } - flowFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext) - setHandler(in, new InHandler { - override def onPush(): Unit = throw new IllegalStateException("LazyFlow received push while waiting for flowFactory to complete.") - override def onUpstreamFinish(): Unit = gotCompletionEvent() - override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) - }) - } catch { - case NonFatal(e) ⇒ decider(e) match { - case Supervision.Stop ⇒ failure(e) - case _ ⇒ pull(in) - } + case Failure(e) ⇒ + matPromise.failure(e) + failStage(e) } + try { + flowFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + } catch { + case NonFatal(e) ⇒ + matPromise.failure(e) + failStage(e) + } + } + + override def onUpstreamFinish(): Unit = { + // ignore onUpstreamFinish while the stage is switching but setKeepGoing + if (switching) { + setKeepGoing(true) + } else { + matPromise.success(None) + super.onUpstreamFinish() + } + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + matPromise.failure(ex) + super.onUpstreamFailure(ex) + } + + override def onDownstreamFinish(): Unit = { + matPromise.success(None) + super.onDownstreamFinish() } override def onPull(): Unit = { pull(in) - subSink.pull() + } + + setHandler(in, this) + setHandler(out, this) + + private def switchTo(flow: Flow[I, O, M], firstElement: I): M = { + + var firstElementPushed = false + + // + // ports are wired in the following way: + // + // in ~> subOutlet ~> lazyFlow ~> subInlet ~> out + // + + val subInlet = new SubSinkInlet[O]("LazyFlowSubSink") + val subOutlet = new SubSourceOutlet[I]("LazyFlowSubSource") + + val matVal = Source.fromGraph(subOutlet.source) + .viaMat(flow)(Keep.right) + .toMat(subInlet.sink)(Keep.left) + .run()(interpreter.subFusingMaterializer) + + // The lazily materialized flow may be constructed from a sink and a source. Therefore termination + // signals (completion, cancellation, and errors) are not guaranteed to pass through the flow. This + // means that this stage must not be completed as soon as one side of the flow is finished. + // + // Invariant: isClosed(out) == subInlet.isClosed after each event because termination signals (i.e. + // completion, cancellation, and failure) between these two ports are always forwarded. + // + // However, isClosed(in) and subOutlet.isClosed may be different. This happens if upstream completes before + // the cached element was pushed. + def maybeCompleteStage(): Unit = { + if (isClosed(in) && subOutlet.isClosed && isClosed(out)) { + completeStage() + } + } + + // The stage must not be shut down automatically; it is completed when maybeCompleteStage decides + setKeepGoing(true) + + setHandler(in, new InHandler { + override def onPush(): Unit = { + subOutlet.push(grab(in)) + } + override def onUpstreamFinish(): Unit = { + if (firstElementPushed) { + subOutlet.complete() + maybeCompleteStage() + } + } + override def onUpstreamFailure(ex: Throwable): Unit = { + // propagate exception irrespective if the cached element has been pushed or not + subOutlet.fail(ex) + maybeCompleteStage() + } + }) setHandler(out, new OutHandler { override def onPull(): Unit = { - subSink.pull() + subInlet.pull() } - override def onDownstreamFinish(): Unit = { - subSink.cancel() - completeStage() + subInlet.cancel() + maybeCompleteStage() } }) - subSink.setHandler(new InHandler { + subOutlet.setHandler(new OutHandler { + override def onPull(): Unit = { + if (firstElementPushed) { + pull(in) + } else { + // the demand can be satisfied right away by the cached element + firstElementPushed = true + subOutlet.push(firstElement) + // in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed + // -> check if the completion must be propagated now + if (isClosed(in)) { + subOutlet.complete() + maybeCompleteStage() + } + } + } + override def onDownstreamFinish(): Unit = { + if (!isClosed(in)) { + cancel(in) + } + maybeCompleteStage() + } + }) + + subInlet.setHandler(new InHandler { override def onPush(): Unit = { - val elem = subSink.grab() - push(out, elem) + push(out, subInlet.grab()) } - override def onUpstreamFinish(): Unit = { - completeStage() + complete(out) + maybeCompleteStage() + } + override def onUpstreamFailure(ex: Throwable): Unit = { + fail(out, ex) + maybeCompleteStage() } }) - } - setHandler(out, this) - - private def failure(ex: Throwable): Unit = { - matVal = Some(zeroMat()) - failStage(ex) - } - - override def onUpstreamFinish(): Unit = { - matVal = Some(zeroMat()) - completeStage() - } - override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) - setHandler(in, this) - - private def gotCompletionEvent(): Unit = { - setKeepGoing(true) - completed = true - } - - private def initInternalSource(flow: Flow[I, O, M], firstElement: I): Unit = { - val sourceOut = new SubSourceOutlet[I]("LazyFlowSubSource") - - def switchToFirstElementHandlers(): Unit = { - sourceOut.setHandler(new OutHandler { - override def onPull(): Unit = { - sourceOut.push(firstElement) - if (completed) internalSourceComplete() else switchToFinalHandlers() - } - override def onDownstreamFinish(): Unit = internalSourceComplete() - }) - - setHandler(in, new InHandler { - override def onPush(): Unit = sourceOut.push(grab(in)) - override def onUpstreamFinish(): Unit = gotCompletionEvent() - override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) - }) + if (isClosed(out)) { + // downstream may have been canceled while the stage was switching + subInlet.cancel() + } else { + subInlet.pull() } - def switchToFinalHandlers(): Unit = { - sourceOut.setHandler(new OutHandler { - override def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = internalSourceComplete() - }) - setHandler(in, new InHandler { - override def onPush(): Unit = { - val elem = grab(in) - sourceOut.push(elem) - } - override def onUpstreamFinish(): Unit = internalSourceComplete() - override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) - }) - } - - def internalSourceComplete(): Unit = { - sourceOut.complete() - // normal completion, subSink.onUpstreamFinish will complete the stage - } - - def internalSourceFailure(ex: Throwable): Unit = { - sourceOut.fail(ex) - failStage(ex) - } - - switchToFirstElementHandlers() - try { - matVal = Some(Source.fromGraph(sourceOut.source) - .viaMat(flow)(Keep.right).toMat(subSink.sink)(Keep.left).run()(interpreter.subFusingMaterializer)) - } catch { - case NonFatal(ex) ⇒ - subSink.cancel() - matVal = Some(zeroMat()) - failStage(ex) - } + matVal } } - (stageLogic, matVal.getOrElse(zeroMat())) + (stageLogic, matPromise.future) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6f5c2995ce..b777b3dc16 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -13,7 +13,7 @@ import org.reactivestreams.Processor import scala.concurrent.duration.FiniteDuration import akka.japi.Util -import java.util.Comparator +import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage import akka.actor.ActorRef @@ -207,20 +207,9 @@ object Flow { /** * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created - * if there are no elements, because of completion or error. - * The materialized value of the `Flow` will be the materialized - * value of the created internal flow. + * if there are no elements, because of completion, cancellation, or error. * - * If `flowFactory` throws an exception and the supervision decision is - * [[akka.stream.Supervision.Stop]] the materialized value of the flow will be completed with - * the result of the `fallback`. For all other supervision options it will - * try to create flow with the next element. - * - * `fallback` will be executed when there was no elements and completed is received from upstream - * or when there was an exception either thrown by the `flowFactory` or during the internal flow - * materialization process. - * - * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * The materialized value of the `Flow` is the value that is created by the `fallback` function. * * '''Emits when''' the internal flow is successfully created and it emits * @@ -230,11 +219,39 @@ object Flow { * * '''Cancels when''' downstream cancels */ - def lazyInit[I, O, M](flowFactory: function.Function[I, CompletionStage[Flow[I, O, M]]], fallback: function.Creator[M]): Flow[I, O, M] = - Flow.fromGraph(new LazyFlow[I, O, M]( - t ⇒ flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), - () ⇒ fallback.create())) + @Deprecated + @deprecated("Use lazyInitAsync instead. (lazyInitAsync returns a flow with a more useful materialized value.)", "2.5.12") + def lazyInit[I, O, M](flowFactory: function.Function[I, CompletionStage[Flow[I, O, M]]], fallback: function.Creator[M]): Flow[I, O, M] = { + import scala.compat.java8.FutureConverters._ + val sflow = scaladsl.Flow + .fromGraph(new LazyFlow[I, O, M](t ⇒ flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext))) + .mapMaterializedValue(_ ⇒ fallback.create()) + new Flow(sflow) + } + /** + * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created + * if there are no elements, because of completion, cancellation, or error. + * + * The materialized value of the `Flow` is a `Future[Option[M]]` that is completed with `Some(mat)` when the internal + * flow gets materialized or with `None` when there where no elements. If the flow materialization (including + * the call of the `flowFactory`) fails then the future is completed with a failure. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyInitAsync[I, O, M](flowFactory: function.Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[Optional[M]]] = { + import scala.compat.java8.FutureConverters._ + + val sflow = scaladsl.Flow.lazyInitAsync(() ⇒ flowFactory.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) + .mapMaterializedValue(fut ⇒ fut.map(_.fold[Optional[M]](Optional.empty())(m ⇒ Optional.ofNullable(m)))(ExecutionContexts.sameThreadExecutionContext).toJava) + new Flow(sflow) + } /** * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with * fan-in combinators where you do not want to pay the cost of casting each element in a `map`. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 86c04d4eab..1a61d5f15d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -268,18 +268,33 @@ object Sink { * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, * because of completion or error. * - * If `sinkFactory` throws an exception and the supervision decision is - * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will - * try to create sink with next element - * - * `fallback` will be executed when there was no elements and completed is received from upstream. - * - * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * If upstream completes before an element was received then the `Future` is completed with the value created by fallback. + * If upstream fails before an element was received, `sinkFactory` throws an exception, or materialization of the internal + * sink fails then the `Future` is completed with the exception. + * Otherwise the `Future` is completed with the materialized value of the internal sink. */ + @Deprecated + @deprecated("Use lazyInitAsync instead. (lazyInitAsync no more needs a fallback function and the materialized value more clearly indicates if the internal sink was materialized or not.)", "2.5.11") def lazyInit[T, M](sinkFactory: function.Function[T, CompletionStage[Sink[T, M]]], fallback: function.Creator[M]): Sink[T, CompletionStage[M]] = new Sink(scaladsl.Sink.lazyInit[T, M]( t ⇒ sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), () ⇒ fallback.create()).mapMaterializedValue(_.toJava)) + + /** + * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, + * because of completion or error. + * + * If upstream completes before an element was received then the `Future` is completed with `None`. + * If upstream fails before an element was received, `sinkFactory` throws an exception, or materialization of the internal + * sink fails then the `Future` is completed with the exception. + * Otherwise the `Future` is completed with the materialized value of the internal sink. + */ + def lazyInitAsync[T, M](sinkFactory: function.Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[Optional[M]]] = { + val sSink = scaladsl.Sink.lazyInitAsync[T, M]( + () ⇒ sinkFactory.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext) + ).mapMaterializedValue(fut ⇒ fut.map(_.fold(Optional.empty[M]())(m ⇒ Optional.ofNullable(m)))(ExecutionContexts.sameThreadExecutionContext).toJava) + new Sink(sSink) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e7689ea94a..ca5cd50a25 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -522,20 +522,9 @@ object Flow { /** * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created - * if there are no elements, because of completion or error. - * The materialized value of the `Flow` will be the materialized - * value of the created internal flow. + * if there are no elements, because of completion, cancellation, or error. * - * If `flowFactory` throws an exception and the supervision decision is - * [[akka.stream.Supervision.Stop]] the materialized value of the flow will be completed with - * the result of the `fallback`. For all other supervision options it will - * try to create flow with the next element. - * - * `fallback` will be executed when there was no elements and completed is received from upstream - * or when there was an exception either thrown by the `flowFactory` or during the internal flow - * materialization process. - * - * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * The materialized value of the `Flow` is the value that is created by the `fallback` function. * * '''Emits when''' the internal flow is successfully created and it emits * @@ -545,8 +534,29 @@ object Flow { * * '''Cancels when''' downstream cancels */ + @Deprecated + @deprecated("Use lazyInitAsync instead. (lazyInitAsync returns a flow with a more useful materialized value.)", "2.5.12") def lazyInit[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, M] = - Flow.fromGraph(new LazyFlow[I, O, M](flowFactory, fallback)) + Flow.fromGraph(new LazyFlow[I, O, M](flowFactory)).mapMaterializedValue(_ ⇒ fallback()) + + /** + * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created + * if there are no elements, because of completion, cancellation, or error. + * + * The materialized value of the `Flow` is a `Future[Option[M]]` that is completed with `Some(mat)` when the internal + * flow gets materialized or with `None` when there where no elements. If the flow materialization (including + * the call of the `flowFactory`) fails then the future is completed with a failure. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyInitAsync[I, O, M](flowFactory: () ⇒ Future[Flow[I, O, M]]): Flow[I, O, Future[Option[M]]] = + Flow.fromGraph(new LazyFlow[I, O, M](_ ⇒ flowFactory())) } object RunnableGraph { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index bb910e54a2..fd43a077e0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -501,15 +501,26 @@ object Sink { * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, * because of completion or error. * - * If `sinkFactory` throws an exception and the supervision decision is - * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will - * try to create sink with next element - * - * `fallback` will be executed when there was no elements and completed is received from upstream. - * - * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * If upstream completes before an element was received then the `Future` is completed with the value created by fallback. + * If upstream fails before an element was received, `sinkFactory` throws an exception, or materialization of the internal + * sink fails then the `Future` is completed with the exception. + * Otherwise the `Future` is completed with the materialized value of the internal sink. */ + @Deprecated + @deprecated("Use lazyInitAsync instead. (lazyInitAsync no more needs a fallback function and the materialized value more clearly indicates if the internal sink was materialized or not.)", "2.5.11") def lazyInit[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], fallback: () ⇒ M): Sink[T, Future[M]] = - Sink.fromGraph(new LazySink(sinkFactory, fallback)) + Sink.fromGraph(new LazySink[T, M](sinkFactory)).mapMaterializedValue(_.map(_.getOrElse(fallback()))(ExecutionContexts.sameThreadExecutionContext)) + + /** + * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, + * because of completion or error. + * + * If upstream completes before an element was received then the `Future` is completed with `None`. + * If upstream fails before an element was received, `sinkFactory` throws an exception, or materialization of the internal + * sink fails then the `Future` is completed with the exception. + * Otherwise the `Future` is completed with the materialized value of the internal sink. + */ + def lazyInitAsync[T, M](sinkFactory: () ⇒ Future[Sink[T, M]]): Sink[T, Future[Option[M]]] = + Sink.fromGraph(new LazySink[T, M](_ ⇒ sinkFactory())) }