From 1042dcb0c9c1a4ace4d41f9618b83b4e70f3149b Mon Sep 17 00:00:00 2001 From: kerr Date: Tue, 15 Nov 2022 22:25:00 +0800 Subject: [PATCH] =str Add LazyFutureSource to avoid sub materialization. --- .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../stream/impl/fusing/LazyFutureSource.scala | 65 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Source.scala | 8 +-- .../apache/pekko/stream/scaladsl/Source.scala | 11 ++-- 4 files changed, 71 insertions(+), 14 deletions(-) create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 487ae42990..4a36c8aa14 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -112,6 +112,7 @@ import pekko.stream.Attributes._ val iterableSource = name("iterableSource") val cycledSource = name("cycledSource") val futureSource = name("futureSource") + val lazyFutureSource = name("lazyFutureSource") val futureFlattenSource = name("futureFlattenSource") val tickSource = name("tickSource") val singleSource = name("singleSource") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala new file mode 100644 index 0000000000..a56d2053f8 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.concurrent.Future +import scala.util.Try + +import org.apache.pekko +import pekko.dispatch.ExecutionContexts +import pekko.stream.Attributes +import pekko.stream.Attributes.SourceLocation +import pekko.stream.Outlet +import pekko.stream.SourceShape +import pekko.stream.impl.ReactiveStreamsCompliance +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.GraphStage +import pekko.stream.stage.GraphStageLogic +import pekko.stream.stage.OutHandler + +private[pekko] class LazyFutureSource[T](f: () => Future[T]) extends GraphStage[SourceShape[T]] { + require(f != null, "f should not be null.") + private val out = Outlet[T]("LazyFutureSource.out") + val shape: SourceShape[T] = SourceShape(out) + override def initialAttributes: Attributes = + DefaultAttributes.lazyFutureSource and + SourceLocation.forLambda(f) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def onPull(): Unit = { + setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more + val future = f() + ReactiveStreamsCompliance.requireNonNullElement(future) + future.value match { + case Some(result) => handle(result) + case None => + val cb = getAsyncCallback[Try[T]](handle).invoke _ + future.onComplete(cb)(ExecutionContexts.parasitic) + } + } + + private def handle(result: Try[T]): Unit = result match { + case scala.util.Success(null) => completeStage() + case scala.util.Success(v) => emit(out, v, () => completeStage()) + case scala.util.Failure(t) => failStage(t) + } + + setHandler(out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index f27105d3d8..0156d0a60c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -365,13 +365,7 @@ object Source { * is failed with a [[pekko.stream.NeverMaterializedException]] */ def lazyCompletionStage[T](create: Creator[CompletionStage[T]]): Source[T, NotUsed] = - scaladsl.Source - .lazySource { () => - val f = create.create().asScala - scaladsl.Source.future(f) - } - .mapMaterializedValue(_ => NotUsed.notUsed()) - .asJava + new Source(scaladsl.Source.lazyFuture(() => create.create().asScala)) /** * Defers invoking the `create` function to create a future source until there is downstream demand. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index d03c7cc39c..593547d958 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -21,8 +21,6 @@ import scala.collection.immutable import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration -import org.reactivestreams.{ Publisher, Subscriber } - import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable } @@ -30,12 +28,14 @@ import pekko.annotation.InternalApi import pekko.stream.{ Outlet, SourceShape, _ } import pekko.stream.impl.{ PublisherSource, _ } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ GraphStages, LazySingleSource } +import pekko.stream.impl.fusing.{ GraphStages, LazyFutureSource, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun import pekko.util.FutureConverters._ +import org.reactivestreams.{ Publisher, Subscriber } + /** * A `Source` is a set of stream processing steps that has one open output. It can comprise * any number of internal sources and transformations that are wired together, or it can be @@ -556,10 +556,7 @@ object Source { * the laziness and will trigger the factory immediately. */ def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] = - lazySource { () => - val f = create() - future(f) - }.mapMaterializedValue(_ => NotUsed) + fromGraph(new LazyFutureSource(create)) /** * Defers invoking the `create` function to create a future source until there is downstream demand.