=str Add LazyFutureSource to avoid sub materialization.
This commit is contained in:
parent
c9d3e6af94
commit
1042dcb0c9
4 changed files with 71 additions and 14 deletions
|
|
@ -112,6 +112,7 @@ import pekko.stream.Attributes._
|
||||||
val iterableSource = name("iterableSource")
|
val iterableSource = name("iterableSource")
|
||||||
val cycledSource = name("cycledSource")
|
val cycledSource = name("cycledSource")
|
||||||
val futureSource = name("futureSource")
|
val futureSource = name("futureSource")
|
||||||
|
val lazyFutureSource = name("lazyFutureSource")
|
||||||
val futureFlattenSource = name("futureFlattenSource")
|
val futureFlattenSource = name("futureFlattenSource")
|
||||||
val tickSource = name("tickSource")
|
val tickSource = name("tickSource")
|
||||||
val singleSource = name("singleSource")
|
val singleSource = name("singleSource")
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.pekko.stream.impl.fusing
|
||||||
|
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
|
import org.apache.pekko
|
||||||
|
import pekko.dispatch.ExecutionContexts
|
||||||
|
import pekko.stream.Attributes
|
||||||
|
import pekko.stream.Attributes.SourceLocation
|
||||||
|
import pekko.stream.Outlet
|
||||||
|
import pekko.stream.SourceShape
|
||||||
|
import pekko.stream.impl.ReactiveStreamsCompliance
|
||||||
|
import pekko.stream.impl.Stages.DefaultAttributes
|
||||||
|
import pekko.stream.stage.GraphStage
|
||||||
|
import pekko.stream.stage.GraphStageLogic
|
||||||
|
import pekko.stream.stage.OutHandler
|
||||||
|
|
||||||
|
private[pekko] class LazyFutureSource[T](f: () => Future[T]) extends GraphStage[SourceShape[T]] {
|
||||||
|
require(f != null, "f should not be null.")
|
||||||
|
private val out = Outlet[T]("LazyFutureSource.out")
|
||||||
|
val shape: SourceShape[T] = SourceShape(out)
|
||||||
|
override def initialAttributes: Attributes =
|
||||||
|
DefaultAttributes.lazyFutureSource and
|
||||||
|
SourceLocation.forLambda(f)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) with OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
|
||||||
|
val future = f()
|
||||||
|
ReactiveStreamsCompliance.requireNonNullElement(future)
|
||||||
|
future.value match {
|
||||||
|
case Some(result) => handle(result)
|
||||||
|
case None =>
|
||||||
|
val cb = getAsyncCallback[Try[T]](handle).invoke _
|
||||||
|
future.onComplete(cb)(ExecutionContexts.parasitic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def handle(result: Try[T]): Unit = result match {
|
||||||
|
case scala.util.Success(null) => completeStage()
|
||||||
|
case scala.util.Success(v) => emit(out, v, () => completeStage())
|
||||||
|
case scala.util.Failure(t) => failStage(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
setHandler(out, this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -365,13 +365,7 @@ object Source {
|
||||||
* is failed with a [[pekko.stream.NeverMaterializedException]]
|
* is failed with a [[pekko.stream.NeverMaterializedException]]
|
||||||
*/
|
*/
|
||||||
def lazyCompletionStage[T](create: Creator[CompletionStage[T]]): Source[T, NotUsed] =
|
def lazyCompletionStage[T](create: Creator[CompletionStage[T]]): Source[T, NotUsed] =
|
||||||
scaladsl.Source
|
new Source(scaladsl.Source.lazyFuture(() => create.create().asScala))
|
||||||
.lazySource { () =>
|
|
||||||
val f = create.create().asScala
|
|
||||||
scaladsl.Source.future(f)
|
|
||||||
}
|
|
||||||
.mapMaterializedValue(_ => NotUsed.notUsed())
|
|
||||||
.asJava
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defers invoking the `create` function to create a future source until there is downstream demand.
|
* Defers invoking the `create` function to create a future source until there is downstream demand.
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,6 @@ import scala.collection.immutable
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
import org.reactivestreams.{ Publisher, Subscriber }
|
|
||||||
|
|
||||||
import org.apache.pekko
|
import org.apache.pekko
|
||||||
import pekko.{ Done, NotUsed }
|
import pekko.{ Done, NotUsed }
|
||||||
import pekko.actor.{ ActorRef, Cancellable }
|
import pekko.actor.{ ActorRef, Cancellable }
|
||||||
|
|
@ -30,12 +28,14 @@ import pekko.annotation.InternalApi
|
||||||
import pekko.stream.{ Outlet, SourceShape, _ }
|
import pekko.stream.{ Outlet, SourceShape, _ }
|
||||||
import pekko.stream.impl.{ PublisherSource, _ }
|
import pekko.stream.impl.{ PublisherSource, _ }
|
||||||
import pekko.stream.impl.Stages.DefaultAttributes
|
import pekko.stream.impl.Stages.DefaultAttributes
|
||||||
import pekko.stream.impl.fusing.{ GraphStages, LazySingleSource }
|
import pekko.stream.impl.fusing.{ GraphStages, LazyFutureSource, LazySingleSource }
|
||||||
import pekko.stream.impl.fusing.GraphStages._
|
import pekko.stream.impl.fusing.GraphStages._
|
||||||
import pekko.stream.stage.GraphStageWithMaterializedValue
|
import pekko.stream.stage.GraphStageWithMaterializedValue
|
||||||
import pekko.util.ConstantFun
|
import pekko.util.ConstantFun
|
||||||
import pekko.util.FutureConverters._
|
import pekko.util.FutureConverters._
|
||||||
|
|
||||||
|
import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `Source` is a set of stream processing steps that has one open output. It can comprise
|
* A `Source` is a set of stream processing steps that has one open output. It can comprise
|
||||||
* any number of internal sources and transformations that are wired together, or it can be
|
* any number of internal sources and transformations that are wired together, or it can be
|
||||||
|
|
@ -556,10 +556,7 @@ object Source {
|
||||||
* the laziness and will trigger the factory immediately.
|
* the laziness and will trigger the factory immediately.
|
||||||
*/
|
*/
|
||||||
def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] =
|
def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] =
|
||||||
lazySource { () =>
|
fromGraph(new LazyFutureSource(create))
|
||||||
val f = create()
|
|
||||||
future(f)
|
|
||||||
}.mapMaterializedValue(_ => NotUsed)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defers invoking the `create` function to create a future source until there is downstream demand.
|
* Defers invoking the `create` function to create a future source until there is downstream demand.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue