simplify materialized value computation tree, fixes #20015

- also fixes materialized value sources for graphs that import zero or
  one graphs, with and without Fusing
This commit is contained in:
Roland Kuhn 2016-03-11 17:08:30 +01:00
parent b52c498638
commit b255a19374
31 changed files with 582 additions and 279 deletions

View file

@ -8,7 +8,6 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import akka.NotUsed import akka.NotUsed
import akka.http.scaladsl.model.RequestEntity import akka.http.scaladsl.model.RequestEntity
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule } import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
@ -187,7 +186,7 @@ private[http] object StreamUtils {
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
new OneTimePublisherSink[In](attributes, shape, cell) new OneTimePublisherSink[In](attributes, shape, cell)
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): OneTimePublisherSink[In] =
new OneTimePublisherSink[In](attr, amendShape(attr), cell) new OneTimePublisherSink[In](attr, amendShape(attr), cell)
} }
/** A copy of SubscriberSource that allows access to the subscriber through the cell but can only materialized once */ /** A copy of SubscriberSource that allows access to the subscriber through the cell but can only materialized once */
@ -212,7 +211,7 @@ private[http] object StreamUtils {
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] =
new OneTimeSubscriberSource[Out](attributes, shape, cell) new OneTimeSubscriberSource[Out](attributes, shape, cell)
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): OneTimeSubscriberSource[Out] =
new OneTimeSubscriberSource[Out](attr, amendShape(attr), cell) new OneTimeSubscriberSource[Out](attr, amendShape(attr), cell)
} }

View file

@ -4,7 +4,6 @@
package akka.http.javadsl.model; package akka.http.javadsl.model;
import akka.http.impl.util.Util;
import akka.http.javadsl.model.headers.*; import akka.http.javadsl.model.headers.*;
import akka.japi.Pair; import akka.japi.Pair;

View file

@ -11,12 +11,10 @@ import akka.stream._
class StreamLayoutSpec extends AkkaSpec { class StreamLayoutSpec extends AkkaSpec {
import StreamLayout._ import StreamLayout._
def testAtomic(inPortCount: Int, outPortCount: Int): Module = new Module { def testAtomic(inPortCount: Int, outPortCount: Int): Module = new AtomicModule {
override val shape = AmorphousShape(List.fill(inPortCount)(Inlet("")), List.fill(outPortCount)(Outlet(""))) override val shape = AmorphousShape(List.fill(inPortCount)(Inlet("")), List.fill(outPortCount)(Outlet("")))
override def replaceShape(s: Shape): Module = ??? override def replaceShape(s: Shape): Module = ???
override def subModules: Set[Module] = Set.empty
override def carbonCopy: Module = ??? override def carbonCopy: Module = ???
override def attributes: Attributes = Attributes.none override def attributes: Attributes = Attributes.none
@ -174,7 +172,7 @@ class StreamLayoutSpec extends AkkaSpec {
var publishers = Vector.empty[TestPublisher] var publishers = Vector.empty[TestPublisher]
var subscribers = Vector.empty[TestSubscriber] var subscribers = Vector.empty[TestSubscriber]
override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes,
matVal: java.util.Map[Module, Any]): Unit = { matVal: java.util.Map[Module, Any]): Unit = {
for (inPort atomic.inPorts) { for (inPort atomic.inPorts) {
val subscriber = TestSubscriber(atomic, inPort) val subscriber = TestSubscriber(atomic, inPort)

View file

@ -12,109 +12,162 @@ import akka.testkit.AkkaSpec
class GraphMatValueSpec extends AkkaSpec { class GraphMatValueSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
import GraphDSL.Implicits._ import GraphDSL.Implicits._
"A Graph with materialized value" must { val foldSink = Sink.fold[Int, Int](0)(_ + _)
val foldSink = Sink.fold[Int, Int](0)(_ + _) "A Graph with materialized value" when {
"expose the materialized value as source" in { for (autoFusing Seq(true, false)) {
val sub = TestSubscriber.manualProbe[Int]() val settings = ActorMaterializerSettings(system)
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b .withInputBuffer(initialSize = 2, maxSize = 16)
fold .withAutoFusing(autoFusing)
Source(1 to 10) ~> fold implicit val materializer = ActorMaterializer(settings)
b.materializedValue.mapAsync(4)(identity) ~> Sink.fromSubscriber(sub)
ClosedShape
}).run()
val r1 = Await.result(f, 3.seconds) s"using autoFusing=$autoFusing" must {
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
r1 should ===(r2) "expose the materialized value as source" in {
} val sub = TestSubscriber.manualProbe[Int]()
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> Sink.fromSubscriber(sub)
ClosedShape
}).run()
"expose the materialized value as source multiple times" in { val r1 = Await.result(f, 3.seconds)
val sub = TestSubscriber.manualProbe[Int]() sub.expectSubscription().request(1)
val r2 = sub.expectNext()
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b r1 should ===(r2)
fold }
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> zip.in0
b.materializedValue.mapAsync(4)(identity) ~> zip.in1
zip.out ~> Sink.fromSubscriber(sub) "expose the materialized value as source multiple times" in {
ClosedShape val sub = TestSubscriber.manualProbe[Int]()
}).run()
val r1 = Await.result(f, 3.seconds) val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b
sub.expectSubscription().request(1) fold
val r2 = sub.expectNext() val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> zip.in0
b.materializedValue.mapAsync(4)(identity) ~> zip.in1
r1 should ===(r2 / 2) zip.out ~> Sink.fromSubscriber(sub)
} ClosedShape
}).run()
// Exposes the materialized value as a stream value val r1 = Await.result(f, 3.seconds)
val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source.fromGraph(GraphDSL.create(foldSink) { implicit b sub.expectSubscription().request(1)
fold val r2 = sub.expectNext()
Source(1 to 10) ~> fold
SourceShape(b.materializedValue)
})
"allow exposing the materialized value as port" in { r1 should ===(r2 / 2)
val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() }
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155)
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { // Exposes the materialized value as a stream value
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ()) val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source.fromGraph(GraphDSL.create(foldSink) { implicit b
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}
"work properly with nesting and reusing" in {
val compositeSource1 = Source.fromGraph(GraphDSL.create(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out.mapAsync(4)(identity) ~> zip.in0
s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
SourceShape(zip.out)
})
val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out ~> zip.in0
s2.out.map(_ * 10000) ~> zip.in1
SourceShape(zip.out)
})
val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run()
Await.result(result, 3.seconds) should ===(55555555)
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(55)
Await.result(f3, 3.seconds) should ===(55)
Await.result(f4, 3.seconds) should ===(55)
}
"work also when the sources module is copied" in {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder
fold fold
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet) Source(1 to 10) ~> fold
}) SourceShape(b.materializedValue)
})
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55) "allow exposing the materialized value as port" in {
val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155)
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}
"work properly with nesting and reusing" in {
val compositeSource1 = Source.fromGraph(GraphDSL.create(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out.mapAsync(4)(identity) ~> zip.in0
s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
SourceShape(zip.out)
})
val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out ~> zip.in0
s2.out.map(_ * 10000) ~> zip.in1
SourceShape(zip.out)
})
val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run()
Await.result(result, 3.seconds) should ===(55555555)
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(55)
Await.result(f3, 3.seconds) should ===(55)
Await.result(f4, 3.seconds) should ===(55)
}
"work also when the sources module is copied" in {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(foldSink) {
implicit builder
fold
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
}
"work also when the sources module is copied and the graph is extended before using the matValSrc" in {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(foldSink) {
implicit builder
fold
val map = builder.add(Flow[Future[Int]].mapAsync(4)(identity))
builder.materializedValue ~> map
FlowShape(fold.in, map.outlet)
})
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
}
"perform side-effecting transformations even when not used as source" in {
var done = false
val g = GraphDSL.create() { implicit b
import GraphDSL.Implicits._
Source.empty.mapMaterializedValue(_ done = true) ~> Sink.ignore
ClosedShape
}
val r = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit b
(s)
b.add(g)
Source(1 to 10) ~> s
ClosedShape
})
r.run().futureValue should ===(akka.Done)
done should ===(true)
}
"produce NotUsed when not importing materialized value" in {
val source = Source.fromGraph(GraphDSL.create() { implicit b
SourceShape(b.materializedValue)
})
source.runWith(Sink.seq).futureValue should ===(List(akka.NotUsed))
}
"produce NotUsed when starting from Flow.via" in {
Source.empty.viaMat(Flow[Int].map(_ * 2))(Keep.right).to(Sink.ignore).run() should ===(akka.NotUsed)
}
"produce NotUsed when starting from Flow.via with transformation" in {
var done = false
Source.empty.viaMat(
Flow[Int].via(Flow[Int].mapMaterializedValue(_ done = true)))(Keep.right)
.to(Sink.ignore).run() should ===(akka.NotUsed)
done should ===(true)
}
}
} }
} }
} }

View file

@ -61,6 +61,8 @@ trait GraphApply {
private[stream] object GraphApply { private[stream] object GraphApply {
final class GraphImpl[S <: Shape, Mat](override val shape: S, private[stream] override val module: StreamLayout.Module) final class GraphImpl[S <: Shape, Mat](override val shape: S, private[stream] override val module: StreamLayout.Module)
extends Graph[S, Mat] { extends Graph[S, Mat] {
override def toString: String = s"Graph($shape, $module)"
override def withAttributes(attr: Attributes): Graph[S, Mat] = override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GraphImpl(shape, module.withAttributes(attr)) new GraphImpl(shape, module.withAttributes(attr))

View file

@ -31,11 +31,7 @@ object Fusing {
* implementations based on [[akka.stream.stage.GraphStage]]) and not forbidden * implementations based on [[akka.stream.stage.GraphStage]]) and not forbidden
* via [[akka.stream.Attributes#AsyncBoundary]]. * via [[akka.stream.Attributes#AsyncBoundary]].
*/ */
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = Impl.aggressive(g)
g match {
case fg: FusedGraph[_, _] fg
case _ Impl.aggressive(g)
}
/** /**
* A fused graph of the right shape, containing a [[FusedModule]] which * A fused graph of the right shape, containing a [[FusedModule]] which
@ -48,6 +44,14 @@ object Fusing {
override def withAttributes(attr: Attributes) = copy(module = module.withAttributes(attr)) override def withAttributes(attr: Attributes) = copy(module = module.withAttributes(attr))
} }
object FusedGraph {
def unapply[S <: Shape, M](g: Graph[S, M]): Option[(FusedModule, S)] =
g.module match {
case f: FusedModule => Some((f, g.shape))
case _ => None
}
}
/** /**
* When fusing a [[Graph]] a part of the internal stage wirings are hidden within * When fusing a [[Graph]] a part of the internal stage wirings are hidden within
* [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are * [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are

View file

@ -215,6 +215,8 @@ object ClosedShape extends ClosedShape {
* Java API: obtain ClosedShape instance * Java API: obtain ClosedShape instance
*/ */
def getInstance: ClosedShape = this def getInstance: ClosedShape = this
override def toString: String = "ClosedShape"
} }
/** /**

View file

@ -11,7 +11,7 @@ import akka.event.Logging
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.pattern.ask import akka.pattern.ask
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.{ Module, AtomicModule }
import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule } import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule }
import akka.stream.impl.io.TLSActor import akka.stream.impl.io.TLSActor
import akka.stream.impl.io.TlsModule import akka.stream.impl.io.TlsModule
@ -97,7 +97,8 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
name name
} }
override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
if (MaterializerSession.Debug) println(s"materializing $atomic")
def newMaterializationContext() = def newMaterializationContext() =
new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes)) new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))

View file

@ -5,11 +5,12 @@ package akka.stream.impl
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.event.Logging
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module { private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.AtomicModule {
override def replaceShape(s: Shape) = override def replaceShape(s: Shape) =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
else this else this
@ -18,6 +19,6 @@ private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module {
val outPort = Outlet[Out]("Flow.out") val outPort = Outlet[Out]("Flow.out")
override val shape = new FlowShape(inPort, outPort) override val shape = new FlowShape(inPort, outPort)
override def subModules: Set[Module] = Set.empty protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
} }

View file

@ -6,29 +6,30 @@ package akka.stream.impl
import akka.NotUsed import akka.NotUsed
import akka.actor._ import akka.actor._
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.AtomicModule
import org.reactivestreams._ import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Promise import scala.concurrent.Promise
import akka.event.Logging
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module { private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule {
protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat) def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat)
override def replaceShape(s: Shape): Module = override def replaceShape(s: Shape): AtomicModule =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that") if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that")
else this else this
// This is okay since the only caller of this method is right below. // This is okay since the only caller of this method is right below.
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat] protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]
override def carbonCopy: Module = newInstance(SourceShape(shape.out.carbonCopy())) override def carbonCopy: AtomicModule = newInstance(SourceShape(shape.out.carbonCopy()))
override def subModules: Set[Module] = Set.empty
protected def amendShape(attr: Attributes): SourceShape[Out] = { protected def amendShape(attr: Attributes): SourceShape[Out] = {
val thisN = attributes.nameOrDefault(null) val thisN = attributes.nameOrDefault(null)
@ -52,7 +53,7 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap
} }
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape) override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape)
override def withAttributes(attr: Attributes): Module = new SubscriberSource[Out](attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSource[Out](attr, amendShape(attr))
} }
/** /**
@ -63,22 +64,26 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap
* back-pressure upstream. * back-pressure upstream.
*/ */
private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) { private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) {
override protected def label: String = s"PublisherSource($p)"
override def create(context: MaterializationContext) = (p, NotUsed) override def create(context: MaterializationContext) = (p, NotUsed)
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attributes, shape) override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attributes, shape)
override def withAttributes(attr: Attributes): Module = new PublisherSource[Out](p, attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new PublisherSource[Out](p, attr, amendShape(attr))
} }
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) {
override def create(context: MaterializationContext) = { override def create(context: MaterializationContext) = {
val p = Promise[Option[Out]]() val p = Promise[Option[Out]]()
new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) p new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) p
} }
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape) override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape)
override def withAttributes(attr: Attributes): Module = new MaybeSource(attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new MaybeSource(attr, amendShape(attr))
} }
/** /**
@ -95,7 +100,7 @@ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorPublisherSource[Out](props, attributes, shape) new ActorPublisherSource[Out](props, attributes, shape)
override def withAttributes(attr: Attributes): Module = new ActorPublisherSource(props, attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new ActorPublisherSource(props, attr, amendShape(attr))
} }
/** /**
@ -105,6 +110,8 @@ private[akka] final class ActorRefSource[Out](
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
extends SourceModule[Out, ActorRef](shape) { extends SourceModule[Out, ActorRef](shape) {
override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"
override def create(context: MaterializationContext) = { override def create(context: MaterializationContext) = {
val mat = ActorMaterializer.downcast(context.materializer) val mat = ActorMaterializer.downcast(context.materializer)
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings)) val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
@ -113,6 +120,6 @@ private[akka] final class ActorRefSource[Out](
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape) new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape)
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): AtomicModule =
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr)) new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
} }

View file

@ -8,7 +8,7 @@ import akka.actor.{ ActorRef, Props }
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
import akka.stream._ import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.stage._ import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
@ -20,11 +20,12 @@ import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import java.util.Optional import java.util.Optional
import akka.event.Logging
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module { private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule {
/** /**
* Create the Subscriber or VirtualPublisher that consumes the incoming * Create the Subscriber or VirtualPublisher that consumes the incoming
@ -35,16 +36,14 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
*/ */
def create(context: MaterializationContext): (AnyRef, Mat) def create(context: MaterializationContext): (AnyRef, Mat)
override def replaceShape(s: Shape): Module = override def replaceShape(s: Shape): AtomicModule =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that") if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that")
else this else this
// This is okay since we the only caller of this method is right below. // This is okay since we the only caller of this method is right below.
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat] protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]
override def carbonCopy: Module = newInstance(SinkShape(shape.in.carbonCopy())) override def carbonCopy: AtomicModule = newInstance(SinkShape(shape.in.carbonCopy()))
override def subModules: Set[Module] = Set.empty
protected def amendShape(attr: Attributes): SinkShape[In] = { protected def amendShape(attr: Attributes): SinkShape[In] = {
val thisN = attributes.nameOrDefault(null) val thisN = attributes.nameOrDefault(null)
@ -53,6 +52,10 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
if ((thatN eq null) || thisN == thatN) shape if ((thatN eq null) || thisN == thatN) shape
else shape.copy(in = Inlet(thatN + ".in")) else shape.copy(in = Inlet(thatN + ".in"))
} }
protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
} }
/** /**
@ -64,8 +67,6 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
*/ */
private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
override def toString: String = "PublisherSink"
/* /*
* This method is the reason why SinkModule.create may return something that is * This method is the reason why SinkModule.create may return something that is
* not a Subscriber: a VirtualPublisher is used in order to avoid the immediate * not a Subscriber: a VirtualPublisher is used in order to avoid the immediate
@ -77,7 +78,7 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha
} }
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape)
override def withAttributes(attr: Attributes): Module = new PublisherSink[In](attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new PublisherSink[In](attr, amendShape(attr))
} }
/** /**
@ -100,7 +101,7 @@ private[akka] final class FanoutPublisherSink[In](
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
new FanoutPublisherSink[In](attributes, shape) new FanoutPublisherSink[In](attributes, shape)
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): AtomicModule =
new FanoutPublisherSink[In](attr, amendShape(attr)) new FanoutPublisherSink[In](attr, amendShape(attr))
} }
@ -118,8 +119,7 @@ private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkSh
} }
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape)
override def withAttributes(attr: Attributes): Module = new SinkholeSink(attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new SinkholeSink(attr, amendShape(attr))
override def toString: String = "SinkholeSink"
} }
/** /**
@ -131,8 +131,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att
override def create(context: MaterializationContext) = (subscriber, NotUsed) override def create(context: MaterializationContext) = (subscriber, NotUsed)
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape)
override def withAttributes(attr: Attributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSink[In](subscriber, attr, amendShape(attr))
override def toString: String = "SubscriberSink"
} }
/** /**
@ -142,8 +141,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att
private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) { private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) {
override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed) override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed)
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape)
override def withAttributes(attr: Attributes): Module = new CancelSink(attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new CancelSink(attr, amendShape(attr))
override def toString: String = "CancelSink"
} }
/** /**
@ -159,8 +157,7 @@ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes:
} }
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape)
override def withAttributes(attr: Attributes): Module = new ActorSubscriberSink[In](props, attr, amendShape(attr)) override def withAttributes(attr: Attributes): AtomicModule = new ActorSubscriberSink[In](props, attr, amendShape(attr))
override def toString: String = "ActorSubscriberSink"
} }
/** /**
@ -180,9 +177,8 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape) new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): AtomicModule =
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))
override def toString: String = "ActorRefSink"
} }
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
@ -257,6 +253,8 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV
private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
val in = Inlet[T]("seq.in") val in = Inlet[T]("seq.in")
override def toString: String = "SeqStage"
override val shape: SinkShape[T] = SinkShape.of(in) override val shape: SinkShape[T] = SinkShape.of(in)
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
@ -302,6 +300,8 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
override def initialAttributes = DefaultAttributes.queueSink override def initialAttributes = DefaultAttributes.queueSink
override val shape: SinkShape[T] = SinkShape.of(in) override val shape: SinkShape[T] = SinkShape.of(in)
override def toString: String = "QueueSink"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] { val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
type Received[E] = Try[Option[E]] type Received[E] = Try[Option[E]]

View file

@ -208,6 +208,7 @@ private[stream] object Stages {
final case class GroupBy(maxSubstreams: Int, f: Any Any, attributes: Attributes = groupBy) extends StageModule { final case class GroupBy(maxSubstreams: Int, f: Any Any, attributes: Attributes = groupBy) extends StageModule {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes) override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def label: String = s"GroupBy($maxSubstreams)"
} }
final case class DirectProcessor(p: () (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule { final case class DirectProcessor(p: () (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule {

View file

@ -19,6 +19,7 @@ import scala.collection.JavaConverters._
import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.GraphModule import akka.stream.impl.fusing.GraphModule
import akka.event.Logging
/** /**
* INTERNAL API * INTERNAL API
@ -104,9 +105,21 @@ object StreamLayout {
if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems") if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems")
} }
// TODO: Materialization order object IgnorableMatValComp {
// TODO: Special case linear composites def apply(comp: MaterializedValueNode): Boolean =
// TODO: Cycles comp match {
case Atomic(module) IgnorableMatValComp(module)
case _: Combine | _: Transform false
case Ignore true
}
def apply(module: Module): Boolean =
module match {
case _: AtomicModule | EmptyModule true
case CopiedModule(_, _, module) IgnorableMatValComp(module)
case CompositeModule(_, _, _, _, comp, _) IgnorableMatValComp(comp)
case FusedModule(_, _, _, _, comp, _, _) IgnorableMatValComp(comp)
}
}
sealed trait MaterializedValueNode { sealed trait MaterializedValueNode {
/* /*
@ -121,14 +134,14 @@ object StreamLayout {
override def toString: String = s"Combine($dep1,$dep2)" override def toString: String = s"Combine($dep1,$dep2)"
} }
case class Atomic(module: Module) extends MaterializedValueNode { case class Atomic(module: Module) extends MaterializedValueNode {
override def toString: String = s"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)}[${module.hashCode}])" override def toString: String = f"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)}[${System.identityHashCode(module)}%08x])"
} }
case class Transform(f: Any Any, dep: MaterializedValueNode) extends MaterializedValueNode { case class Transform(f: Any Any, dep: MaterializedValueNode) extends MaterializedValueNode {
override def toString: String = s"Transform($dep)" override def toString: String = s"Transform($dep)"
} }
case object Ignore extends MaterializedValueNode case object Ignore extends MaterializedValueNode
trait Module { sealed trait Module {
def shape: Shape def shape: Shape
/** /**
@ -241,19 +254,30 @@ object StreamLayout {
require(that ne this, "A module cannot be added to itself. You should pass a separate instance to compose().") require(that ne this, "A module cannot be added to itself. You should pass a separate instance to compose().")
require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.") require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")
val modules1 = if (this.isSealed) Set(this) else this.subModules val modulesLeft = if (this.isSealed) Set(this) else this.subModules
val modules2 = if (that.isSealed) Set(that) else that.subModules val modulesRight = if (that.isSealed) Set(that) else that.subModules
val matComputation1 = if (this.isSealed) Atomic(this) else this.materializedValueComputation val matCompLeft = if (this.isSealed) Atomic(this) else this.materializedValueComputation
val matComputation2 = if (that.isSealed) Atomic(that) else that.materializedValueComputation val matCompRight = if (that.isSealed) Atomic(that) else that.materializedValueComputation
val mat =
{
val comp =
if (f == scaladsl.Keep.left) {
if (IgnorableMatValComp(matCompRight)) matCompLeft else null
} else if (f == scaladsl.Keep.right) {
if (IgnorableMatValComp(matCompLeft)) matCompRight else null
} else null
if (comp == null) Combine(f.asInstanceOf[(Any, Any) Any], matCompLeft, matCompRight)
else comp
}
CompositeModule( CompositeModule(
modules1 ++ modules2, modulesLeft ++ modulesRight,
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets), AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
downstreams ++ that.downstreams, downstreams ++ that.downstreams,
upstreams ++ that.upstreams, upstreams ++ that.upstreams,
// would like to optimize away this allocation for Keep.{left,right} but that breaks side-effecting transformations mat,
Combine(f.asInstanceOf[(Any, Any) Any], matComputation1, matComputation2),
Attributes.none) Attributes.none)
} }
@ -314,7 +338,7 @@ object StreamLayout {
final override def equals(obj: scala.Any): Boolean = super.equals(obj) final override def equals(obj: scala.Any): Boolean = super.equals(obj)
} }
object EmptyModule extends Module { case object EmptyModule extends Module {
override def shape = ClosedShape override def shape = ClosedShape
override def replaceShape(s: Shape) = override def replaceShape(s: Shape) =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
@ -357,7 +381,7 @@ object StreamLayout {
override def isCopied: Boolean = true override def isCopied: Boolean = true
override def toString: String = s"$copyOf (copy)" override def toString: String = f"[${System.identityHashCode(this)}%08x] copy of $copyOf"
} }
final case class CompositeModule( final case class CompositeModule(
@ -379,13 +403,13 @@ object StreamLayout {
override def withAttributes(attributes: Attributes): Module = copy(attributes = attributes) override def withAttributes(attributes: Attributes): Module = copy(attributes = attributes)
override def toString = override def toString =
s""" f"""CompositeModule [${System.identityHashCode(this)}%08x]
| Name: ${this.attributes.nameOrDefault("unnamed")} | Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules: | Modules:
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} | ${subModules.iterator.map(m s"(${m.attributes.nameLifted.getOrElse("unnamed")}) ${m.toString.replaceAll("\n", "\n ")}").mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")} | Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")} | Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin | MatValue: $materializedValueComputation""".stripMargin
} }
object CompositeModule { object CompositeModule {
@ -414,13 +438,24 @@ object StreamLayout {
override def withAttributes(attributes: Attributes): FusedModule = copy(attributes = attributes) override def withAttributes(attributes: Attributes): FusedModule = copy(attributes = attributes)
override def toString = override def toString =
s""" f"""FusedModule [${System.identityHashCode(this)}%08x]
| Name: ${this.attributes.nameOrDefault("unnamed")} | Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules: | Modules:
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} | ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")} | Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")} | Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin | MatValue: $materializedValueComputation""".stripMargin
}
/**
* This is the only extension point for the sealed type hierarchy: composition
* (i.e. the module tree) is managed strictly within this file, only leaf nodes
* may be declared elsewhere.
*/
abstract class AtomicModule extends Module {
final override def subModules: Set[Module] = Set.empty
final override def downstreams: Map[OutPort, InPort] = super.downstreams
final override def upstreams: Map[InPort, OutPort] = super.upstreams
} }
} }
@ -718,6 +753,8 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
new ju.HashMap[InPort, AnyRef] :: Nil new ju.HashMap[InPort, AnyRef] :: Nil
private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] = private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] =
new ju.HashMap[OutPort, Publisher[Any]] :: Nil new ju.HashMap[OutPort, Publisher[Any]] :: Nil
private var matValSrcStack: List[ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]]] =
new ju.HashMap[MaterializedValueNode, List[MaterializedValueSource[Any]]] :: Nil
/* /*
* Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
@ -732,13 +769,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
private def subscribers: ju.Map[InPort, AnyRef] = subscribersStack.head private def subscribers: ju.Map[InPort, AnyRef] = subscribersStack.head
private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head
private def currentLayout: Module = moduleStack.head private def currentLayout: Module = moduleStack.head
private def matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = matValSrcStack.head
// Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
// of the same module. // of the same module.
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def enterScope(enclosing: CopiedModule): Unit = { private def enterScope(enclosing: CopiedModule): Unit = {
if (MaterializerSession.Debug) println(f"entering scope [${System.identityHashCode(enclosing)}%08x]")
subscribersStack ::= new ju.HashMap subscribersStack ::= new ju.HashMap
publishersStack ::= new ju.HashMap publishersStack ::= new ju.HashMap
matValSrcStack ::= new ju.HashMap
moduleStack ::= enclosing.copyOf moduleStack ::= enclosing.copyOf
} }
@ -747,12 +787,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// leading to port identity collisions) // leading to port identity collisions)
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def exitScope(enclosing: CopiedModule): Unit = { private def exitScope(enclosing: CopiedModule): Unit = {
if (MaterializerSession.Debug) println(f"exiting scope [${System.identityHashCode(enclosing)}%08x]")
val scopeSubscribers = subscribers val scopeSubscribers = subscribers
val scopePublishers = publishers val scopePublishers = publishers
subscribersStack = subscribersStack.tail subscribersStack = subscribersStack.tail
publishersStack = publishersStack.tail publishersStack = publishersStack.tail
matValSrcStack = matValSrcStack.tail
moduleStack = moduleStack.tail moduleStack = moduleStack.tail
if (MaterializerSession.Debug) println(s" subscribers = $scopeSubscribers\n publishers = $scopePublishers")
// When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of
// the original module and assign them to the copy ports in the outer scope that we will return to // the original module and assign them to the copy ports in the outer scope that we will return to
enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
@ -765,6 +809,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
} }
final def materialize(): Any = { final def materialize(): Any = {
if (MaterializerSession.Debug) println(s"beginning materialization of $topLevel")
require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)") require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)")
require( require(
topLevel.isRunnable, topLevel.isRunnable,
@ -789,7 +834,6 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def mergeAttributes(parent: Attributes, current: Attributes): Attributes = protected def mergeAttributes(parent: Attributes, current: Attributes): Attributes =
parent and current parent and current
private val matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = new ju.HashMap
def registerSrc(ms: MaterializedValueSource[Any]): Unit = { def registerSrc(ms: MaterializedValueSource[Any]): Unit = {
if (MaterializerSession.Debug) println(s"registering source $ms") if (MaterializerSession.Debug) println(s"registering source $ms")
matValSrc.get(ms.computation) match { matValSrc.get(ms.computation) match {
@ -801,53 +845,60 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def materializeModule(module: Module, effectiveAttributes: Attributes): Any = { protected def materializeModule(module: Module, effectiveAttributes: Attributes): Any = {
val materializedValues: ju.Map[Module, Any] = new ju.HashMap val materializedValues: ju.Map[Module, Any] = new ju.HashMap
if (MaterializerSession.Debug) println(f"entering module [${System.identityHashCode(module)}%08x] (${Logging.simpleName(module)})")
for (submodule module.subModules) { for (submodule module.subModules) {
val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes) val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes)
submodule match { submodule match {
case GraphStageModule(shape, attributes, mv: MaterializedValueSource[_]) case atomic: AtomicModule
val copy = mv.copySrc.asInstanceOf[MaterializedValueSource[Any]]
registerSrc(copy)
materializeAtomic(copy.module, subEffectiveAttributes, materializedValues)
case atomic if atomic.isAtomic
materializeAtomic(atomic, subEffectiveAttributes, materializedValues) materializeAtomic(atomic, subEffectiveAttributes, materializedValues)
case copied: CopiedModule case copied: CopiedModule
enterScope(copied) enterScope(copied)
materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes)) materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes))
exitScope(copied) exitScope(copied)
case composite case composite @ (_: CompositeModule | _: FusedModule)
materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes)) materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes))
case EmptyModule => // nothing to do or say
} }
} }
if (MaterializerSession.Debug) { if (MaterializerSession.Debug) {
println("RESOLVING") println(f"resolving module [${System.identityHashCode(module)}%08x] computation ${module.materializedValueComputation}")
println(s" module = $module")
println(s" computation = ${module.materializedValueComputation}")
println(s" matValSrc = $matValSrc") println(s" matValSrc = $matValSrc")
println(s" matVals = $materializedValues") println(s" matVals =\n ${materializedValues.asScala.map(p "%08x".format(System.identityHashCode(p._1)) -> p._2).mkString("\n ")}")
} }
resolveMaterialized(module.materializedValueComputation, materializedValues, " ")
val ret = resolveMaterialized(module.materializedValueComputation, materializedValues, 2)
while (!matValSrc.isEmpty) {
val node = matValSrc.keySet.iterator.next()
if (MaterializerSession.Debug) println(s" delayed computation of $node")
resolveMaterialized(node, materializedValues, 4)
}
if (MaterializerSession.Debug) println(f"exiting module [${System.identityHashCode(module)}%08x]")
ret
} }
protected def materializeComposite(composite: Module, effectiveAttributes: Attributes): Any = { protected def materializeComposite(composite: Module, effectiveAttributes: Attributes): Any = {
materializeModule(composite, effectiveAttributes) materializeModule(composite, effectiveAttributes)
} }
protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit
private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], indent: String): Any = { private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], spaces: Int): Any = {
if (MaterializerSession.Debug) println(indent + matNode) if (MaterializerSession.Debug) println(" " * spaces + matNode)
val ret = matNode match { val ret = matNode match {
case Atomic(m) matVal.get(m) case Atomic(m) matVal.get(m)
case Combine(f, d1, d2) f(resolveMaterialized(d1, matVal, indent + " "), resolveMaterialized(d2, matVal, indent + " ")) case Combine(f, d1, d2) f(resolveMaterialized(d1, matVal, spaces + 2), resolveMaterialized(d2, matVal, spaces + 2))
case Transform(f, d) f(resolveMaterialized(d, matVal, indent + " ")) case Transform(f, d) f(resolveMaterialized(d, matVal, spaces + 2))
case Ignore NotUsed case Ignore NotUsed
} }
if (MaterializerSession.Debug) println(indent + s"result = $ret") if (MaterializerSession.Debug) println(" " * spaces + s"result = $ret")
matValSrc.remove(matNode) match { matValSrc.remove(matNode) match {
case null // nothing to do case null // nothing to do
case srcs case srcs
if (MaterializerSession.Debug) println(indent + s"triggering sources $srcs") if (MaterializerSession.Debug) println(" " * spaces + s"triggering sources $srcs")
srcs.foreach(_.setValue(ret)) srcs.foreach(_.setValue(ret))
} }
ret ret

View file

@ -10,7 +10,7 @@ import akka.event.Logging
import akka.stream._ import akka.stream._
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module } import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module, AtomicModule }
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
@ -23,9 +23,9 @@ import scala.annotation.tailrec
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes, private[stream] final case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes,
matValIDs: Array[Module]) extends Module { matValIDs: Array[Module]) extends AtomicModule {
override def subModules: Set[Module] = Set.empty
override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr) override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr)
override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
@ -34,7 +34,12 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at
if (newShape != shape) CompositeModule(this, newShape) if (newShape != shape) CompositeModule(this, newShape)
else this else this
override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes" override def toString: String =
s"""GraphModule
| ${assembly.toString.replace("\n", "\n ")}
| shape=$shape, attributes=$attributes
| matVals=
| ${matValIDs.mkString("\n ")}""".stripMargin
} }
/** /**

View file

@ -27,7 +27,14 @@ private[stream] object Fusing {
/** /**
* Fuse everything that is not forbidden via AsyncBoundary attribute. * Fuse everything that is not forbidden via AsyncBoundary attribute.
*/ */
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = { def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] =
g match {
case fg: FusedGraph[_, _] fg
case FusedGraph(module, shape) => FusedGraph(module, shape)
case _ doAggressive(g)
}
private def doAggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = {
val struct = new BuildStructuralInfo val struct = new BuildStructuralInfo
/* /*
* First perform normalization by descending the module tree and recording * First perform normalization by descending the module tree and recording
@ -153,6 +160,7 @@ private[stream] object Fusing {
} }
pos += 1 pos += 1
case _ => throw new IllegalArgumentException("unexpected module structure")
} }
val outsB2 = new Array[Outlet[_]](insB2.size) val outsB2 = new Array[Outlet[_]](insB2.size)
@ -178,6 +186,7 @@ private[stream] object Fusing {
} }
} }
pos += 1 pos += 1
case _ => throw new IllegalArgumentException("unexpected module structure")
} }
/* /*
@ -207,7 +216,10 @@ private[stream] object Fusing {
copyToArray(outOwnersB3.iterator, outOwners, outStart) copyToArray(outOwnersB3.iterator, outOwners, outStart)
// FIXME attributes should contain some naming info and async boundary where needed // FIXME attributes should contain some naming info and async boundary where needed
val firstModule = group.iterator.next() val firstModule = group.iterator.next() match {
case c: CopiedModule => c
case _ => throw new IllegalArgumentException("unexpected module structure")
}
val async = if (isAsync(firstModule)) Attributes(AsyncBoundary) else Attributes.none val async = if (isAsync(firstModule)) Attributes(AsyncBoundary) else Attributes.none
val disp = dispatcher(firstModule) match { val disp = dispatcher(firstModule) match {
case None Attributes.none case None Attributes.none
@ -253,7 +265,7 @@ private[stream] object Fusing {
case _ if m.isAtomic true // non-GraphStage atomic or has AsyncBoundary case _ if m.isAtomic true // non-GraphStage atomic or has AsyncBoundary
case _ m.attributes.contains(AsyncBoundary) case _ m.attributes.contains(AsyncBoundary)
} }
if (Debug) log(s"entering ${m.getClass} (hash=${m.hashCode}, async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})") if (Debug) log(s"entering ${m.getClass} (hash=${struct.hash(m)}, async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})")
val localGroup = val localGroup =
if (async) struct.newGroup(indent) if (async) struct.newGroup(indent)
else openGroup else openGroup
@ -315,6 +327,7 @@ private[stream] object Fusing {
struct.registerInternals(newShape, indent) struct.registerInternals(newShape, indent)
copy copy
case _ => throw new IllegalArgumentException("unexpected module structure")
} }
val newgm = gm.copy(shape = oldShape.copyFromPorts(oldIns.toList, oldOuts.toList), matValIDs = newids) val newgm = gm.copy(shape = oldShape.copyFromPorts(oldIns.toList, oldOuts.toList), matValIDs = newids)
// make sure to add all the port mappings from old GraphModule Shape to new shape // make sure to add all the port mappings from old GraphModule Shape to new shape
@ -356,7 +369,7 @@ private[stream] object Fusing {
subMatBuilder ++= res subMatBuilder ++= res
} }
val subMat = subMatBuilder.result() val subMat = subMatBuilder.result()
if (Debug) log(subMat.map(p s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, "")) if (Debug) log(subMat.map(p s"${p._1.getClass.getName}[${struct.hash(p._1)}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, ""))
// we need to remove all wirings that this module copied from nested modules so that we // we need to remove all wirings that this module copied from nested modules so that we
// dont do wirings twice // dont do wirings twice
val oldDownstreams = m match { val oldDownstreams = m match {
@ -370,17 +383,17 @@ private[stream] object Fusing {
// now rewrite the materialized value computation based on the copied modules and their computation nodes // now rewrite the materialized value computation based on the copied modules and their computation nodes
val matNodeMapping: ju.Map[MaterializedValueNode, MaterializedValueNode] = new ju.HashMap val matNodeMapping: ju.Map[MaterializedValueNode, MaterializedValueNode] = new ju.HashMap
val newMat = rewriteMat(subMat, m.materializedValueComputation, matNodeMapping) val newMat = rewriteMat(subMat, m.materializedValueComputation, matNodeMapping)
if (Debug) log(matNodeMapping.asScala.map(p s"${p._1} -> ${p._2}").mkString("matNodeMapping\n " + " " * indent, "\n " + " " * indent, ""))
// and finally rewire all MaterializedValueSources to their new computation nodes // and finally rewire all MaterializedValueSources to their new computation nodes
val matSrcs = struct.exitMatCtx() val matSrcs = struct.exitMatCtx()
matSrcs.foreach { c matSrcs.foreach { c
if (Debug) log(s"materialized value source: ${struct.hash(c)}") val ms = c.copyOf.asInstanceOf[GraphStageModule].stage.asInstanceOf[MaterializedValueSource[Any]]
val ms = c.copyOf match {
case g: GraphStageModule g.stage.asInstanceOf[MaterializedValueSource[Any]]
}
val mapped = ms.computation match { val mapped = ms.computation match {
case Atomic(sub) subMat(sub) case Atomic(sub) subMat(sub)
case Ignore => Ignore
case other matNodeMapping.get(other) case other matNodeMapping.get(other)
} }
if (Debug) log(s"materialized value source: ${c.copyOf} -> $mapped")
require(mapped != null, s"mismatch:\n ${ms.computation}\n ${m.materializedValueComputation}") require(mapped != null, s"mismatch:\n ${ms.computation}\n ${m.materializedValueComputation}")
val newSrc = new MaterializedValueSource[Any](mapped, ms.out) val newSrc = new MaterializedValueSource[Any](mapped, ms.out)
val replacement = CopiedModule(c.shape, c.attributes, newSrc.module) val replacement = CopiedModule(c.shape, c.attributes, newSrc.module)
@ -692,7 +705,7 @@ private[stream] object Fusing {
/** /**
* Determine whether the given CopiedModule has an AsyncBoundary attribute. * Determine whether the given CopiedModule has an AsyncBoundary attribute.
*/ */
private def isAsync(m: Module): Boolean = m match { private def isAsync(m: CopiedModule): Boolean = m match {
case CopiedModule(_, inherited, orig) case CopiedModule(_, inherited, orig)
val attr = inherited and orig.attributes val attr = inherited and orig.attributes
attr.contains(AsyncBoundary) attr.contains(AsyncBoundary)

View file

@ -191,14 +191,17 @@ private[akka] object GraphInterpreter {
(inHandlers, outHandlers, logics) (inHandlers, outHandlers, logics)
} }
override def toString: String = override def toString: String = {
val stageList = stages.iterator.zip(originalAttributes.iterator).map {
case (stage, attr) s"${stage.module} [${attr.attributeList.mkString(", ")}]"
}
"GraphAssembly\n " + "GraphAssembly\n " +
stages.mkString("Stages: [", ",", "]") + "\n " + stageList.mkString("[ ", "\n ", "\n ]") + "\n " +
originalAttributes.mkString("Attributes: [", ",", "]") + "\n " + ins.mkString("[", ",", "]") + "\n " +
ins.mkString("Inlets: [", ",", "]") + "\n " + inOwners.mkString("[", ",", "]") + "\n " +
inOwners.mkString("InOwners: [", ",", "]") + "\n " + outs.mkString("[", ",", "]") + "\n " +
outs.mkString("Outlets: [", ",", "]") + "\n " + outOwners.mkString("[", ",", "]")
outOwners.mkString("OutOwners: [", ",", "]") }
} }
object GraphAssembly { object GraphAssembly {

View file

@ -24,20 +24,18 @@ import scala.util.Try
*/ */
private[akka] final case class GraphStageModule(shape: Shape, private[akka] final case class GraphStageModule(shape: Shape,
attributes: Attributes, attributes: Attributes,
stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module { stage: GraphStageWithMaterializedValue[Shape, Any]) extends AtomicModule {
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) override def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
override def replaceShape(s: Shape): Module = override def replaceShape(s: Shape): Module =
if (s != shape) CompositeModule(this, s) if (s != shape) CompositeModule(this, s)
else this else this
override def subModules: Set[Module] = Set.empty
override def withAttributes(attributes: Attributes): Module = override def withAttributes(attributes: Attributes): Module =
if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage) if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage)
else this else this
override def toString: String = stage.toString override def toString: String = f"GraphStage($stage) [${System.identityHashCode(this)}%08x]"
} }
/** /**
@ -133,6 +131,7 @@ object GraphStages {
override val initialAttributes = Attributes.name("breaker") override val initialAttributes = Attributes.name("breaker")
override val shape = FlowShape(Inlet[Any]("breaker.in"), Outlet[Any]("breaker.out")) override val shape = FlowShape(Inlet[Any]("breaker.in"), Outlet[Any]("breaker.out"))
override def toString: String = "Breaker"
override def createLogicAndMaterializedValue(attr: Attributes) = { override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Breaker] val promise = Promise[Breaker]
@ -167,6 +166,7 @@ object GraphStages {
override val shape = BidiShape( override val shape = BidiShape(
Inlet[Any]("breaker.in1"), Outlet[Any]("breaker.out1"), Inlet[Any]("breaker.in1"), Outlet[Any]("breaker.out1"),
Inlet[Any]("breaker.in2"), Outlet[Any]("breaker.out2")) Inlet[Any]("breaker.in2"), Outlet[Any]("breaker.out2"))
override def toString: String = "BidiBreaker"
override def createLogicAndMaterializedValue(attr: Attributes) = { override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Breaker] val promise = Promise[Breaker]
@ -215,21 +215,6 @@ object GraphStages {
def bidiBreaker[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]] = BidiBreaker.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]]] def bidiBreaker[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]] = BidiBreaker.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]]]
private object TickSource {
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
private val cancelPromise = Promise[Done]()
def cancelFuture: Future[Done] = cancelPromise.future
override def cancel(): Boolean = {
if (!isCancelled) cancelPromise.trySuccess(Done)
true
}
override def isCancelled: Boolean = cancelled.get()
}
}
private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] { private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] {
val in = Inlet[Any]("terminationWatcher.in") val in = Inlet[Any]("terminationWatcher.in")
val out = Outlet[Any]("terminationWatcher.out") val out = Outlet[Any]("terminationWatcher.out")
@ -269,6 +254,21 @@ object GraphStages {
def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] = def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] =
TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]] TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]]
private object TickSource {
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
private val cancelPromise = Promise[Done]()
def cancelFuture: Future[Done] = cancelPromise.future
override def cancel(): Boolean = {
if (!isCancelled) cancelPromise.trySuccess(Done)
true
}
override def isCancelled: Boolean = cancelled.get()
}
}
final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T)
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
override val shape = SourceShape(Outlet[T]("TickSource.out")) override val shape = SourceShape(Outlet[T]("TickSource.out"))
@ -302,7 +302,7 @@ object GraphStages {
(logic, cancellable) (logic, cancellable)
} }
override def toString: String = "TickSource" override def toString: String = s"TickSource($initialDelay, $interval, $tick)"
} }
/** /**

View file

@ -22,6 +22,8 @@ import scala.concurrent.{ Future, Promise }
private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
extends SinkModule[ByteString, Future[IOResult]](shape) { extends SinkModule[ByteString, Future[IOResult]](shape) {
override protected def label: String = s"FileSink($f, $options)"
override def create(context: MaterializationContext) = { override def create(context: MaterializationContext) = {
val materializer = ActorMaterializer.downcast(context.materializer) val materializer = ActorMaterializer.downcast(context.materializer)
val settings = materializer.effectiveSettings(context.effectiveAttributes) val settings = materializer.effectiveSettings(context.effectiveAttributes)
@ -68,4 +70,3 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): Module =
new OutputStreamSink(createOutput, attr, amendShape(attr), autoFlush) new OutputStreamSink(createOutput, attr, amendShape(attr), autoFlush)
} }

View file

@ -41,6 +41,8 @@ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: At
override def withAttributes(attr: Attributes): Module = override def withAttributes(attr: Attributes): Module =
new FileSource(f, chunkSize, attr, amendShape(attr)) new FileSource(f, chunkSize, attr, amendShape(attr))
override protected def label: String = s"FileSource($f, $chunkSize)"
} }
/** /**

View file

@ -28,8 +28,8 @@ private[akka] object InputStreamPublisher {
/** INTERNAL API */ /** INTERNAL API */
private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int) private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int)
extends akka.stream.actor.ActorPublisher[ByteString] extends akka.stream.actor.ActorPublisher[ByteString]
with ActorLogging { with ActorLogging {
// TODO possibly de-duplicate with FilePublisher? // TODO possibly de-duplicate with FilePublisher?

View file

@ -140,7 +140,8 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
case Failed(ex) case Failed(ex)
isStageAlive = false isStageAlive = false
throw new IOException(ex) throw new IOException(ex)
case null throw new IOException("Timeout on waiting for new data") case null throw new IOException("Timeout on waiting for new data")
case Initialized throw new IllegalStateException("message 'Initialized' must come first")
} }
} catch { } catch {
case ex: InterruptedException throw new IOException(ex) case ex: InterruptedException throw new IOException(ex)
@ -215,4 +216,3 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
} }
} }
} }

View file

@ -3,7 +3,7 @@ package akka.stream.impl.io
import javax.net.ssl.SSLContext import javax.net.ssl.SSLContext
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.{ CompositeModule, Module } import akka.stream.impl.StreamLayout.{ CompositeModule, AtomicModule }
import akka.stream.TLSProtocol._ import akka.stream.TLSProtocol._
import akka.util.ByteString import akka.util.ByteString
@ -15,11 +15,10 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu
shape: Shape, attributes: Attributes, shape: Shape, attributes: Attributes,
sslContext: SSLContext, sslContext: SSLContext,
firstSession: NegotiateNewSession, firstSession: NegotiateNewSession,
role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends Module { role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule {
override def subModules: Set[Module] = Set.empty
override def withAttributes(att: Attributes): Module = copy(attributes = att) override def withAttributes(att: Attributes): TlsModule = copy(attributes = att)
override def carbonCopy: Module = override def carbonCopy: TlsModule =
TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo)
override def replaceShape(s: Shape) = override def replaceShape(s: Shape) =
@ -27,6 +26,8 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu
shape.requireSamePortsAs(s) shape.requireSamePortsAs(s)
CompositeModule(this, s) CompositeModule(this, s)
} else this } else this
override def toString: String = f"TlsModule($firstSession, $role, $closing, $hostInfo) [${System.identityHashCode(this)}%08x]"
} }
/** /**

View file

@ -75,6 +75,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
override def shape: FlowShape[In, Out] = delegate.shape override def shape: FlowShape[In, Out] = delegate.shape
private[stream] def module: StreamLayout.Module = delegate.module private[stream] def module: StreamLayout.Module = delegate.module
override def toString: String = delegate.toString
/** Converts this Flow to its Scala DSL counterpart */ /** Converts this Flow to its Scala DSL counterpart */
def asScala: scaladsl.Flow[In, Out, Mat] = delegate def asScala: scaladsl.Flow[In, Out, Mat] = delegate
@ -119,6 +121,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow. * flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaMat(flow)(combinerToScala(combine))) new Flow(delegate.viaMat(flow)(combinerToScala(combine)))
@ -158,6 +163,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink. * Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink)(combinerToScala(combine))) new Sink(delegate.toMat(sink)(combinerToScala(combine)))
@ -189,6 +197,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow. * Flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] = def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] =
RunnableGraph.fromGraph(delegate.joinMat(flow)(combinerToScala(combine))) RunnableGraph.fromGraph(delegate.joinMat(flow)(combinerToScala(combine)))
@ -228,6 +239,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]]. * [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
new Flow(delegate.joinMat(bidi)(combinerToScala(combine))) new Flow(delegate.joinMat(bidi)(combinerToScala(combine)))
@ -1246,6 +1260,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* *
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#concat]] * @see [[#concat]]
*/ */
def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
@ -1282,7 +1299,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* *
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
* *
* @see [[#prepend]]. * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#prepend]]
*/ */
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF))) new Flow(delegate.prependMat(that)(combinerToScala(matF)))
@ -1306,6 +1326,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#alsoTo]] * @see [[#alsoTo]]
*/ */
def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2],
@ -1349,7 +1372,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* *
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure. * If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
* *
* @see [[#interleave]]. * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]]
*/ */
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int,
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
@ -1389,6 +1415,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]] * @see [[#merge]]
*/ */
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -1399,6 +1428,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams, * Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]] * @see [[#merge]]
*/ */
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -1431,6 +1463,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* waiting for elements, this merge will block when one of the inputs does not have more elements (and * waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete). * does not complete).
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#mergeSorted]]. * @see [[#mergeSorted]].
*/ */
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: Comparator[U], def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: Comparator[U],
@ -1454,12 +1489,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/** /**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples. * Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zip]] * @see [[#zip]]
*/ */
def zipMat[T, M, M2](that: Graph[SourceShape[T], M], def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.fromGraph(GraphDSL.create(that, this.viaMat(Flow.fromGraph(GraphDSL.create(that,
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] { new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] {
def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = {
val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T])
b.from(s).toInlet(zip.in1) b.from(s).toInlet(zip.in1)
@ -1487,6 +1525,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Put together the elements of current [[Flow]] and the given [[Source]] * Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function. * into a stream of combined elements using a combiner function.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipWith]] * @see [[#zipWith]]
*/ */
def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M],
@ -1792,6 +1833,9 @@ object RunnableGraph {
private final class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] { private final class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] {
def shape = ClosedShape def shape = ClosedShape
def module = runnable.module def module = runnable.module
override def toString: String = runnable.toString
override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraphAdapter[Mat2] = override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraphAdapter[Mat2] =
new RunnableGraphAdapter(runnable.mapMaterializedValue(f.apply _)) new RunnableGraphAdapter(runnable.mapMaterializedValue(f.apply _))

View file

@ -256,6 +256,8 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
override def shape: SinkShape[In] = delegate.shape override def shape: SinkShape[In] = delegate.shape
private[stream] def module: StreamLayout.Module = delegate.module private[stream] def module: StreamLayout.Module = delegate.module
override def toString: String = delegate.toString
/** Converts this Sink to its Scala DSL counterpart */ /** Converts this Sink to its Scala DSL counterpart */
def asScala: scaladsl.Sink[In, Mat] = delegate def asScala: scaladsl.Sink[In, Mat] = delegate

View file

@ -323,6 +323,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
private[stream] def module: StreamLayout.Module = delegate.module private[stream] def module: StreamLayout.Module = delegate.module
override def toString: String = delegate.toString
/** Converts this Java DSL element to its Scala DSL counterpart. */ /** Converts this Java DSL element to its Scala DSL counterpart. */
def asScala: scaladsl.Source[Out, Mat] = delegate def asScala: scaladsl.Source[Out, Mat] = delegate
@ -367,6 +369,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow. * flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.viaMat(flow)(combinerToScala(combine))) new Source(delegate.viaMat(flow)(combinerToScala(combine)))
@ -406,6 +411,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink. * Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] = def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] =
RunnableGraph.fromGraph(delegate.toMat(sink)(combinerToScala(combine))) RunnableGraph.fromGraph(delegate.toMat(sink)(combinerToScala(combine)))
@ -470,6 +478,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* *
* If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled. * If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#concat]]. * @see [[#concat]].
*/ */
def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -507,6 +518,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* *
* If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled. * If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#prepend]]. * @see [[#prepend]].
*/ */
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -532,6 +546,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#alsoTo]] * @see [[#alsoTo]]
*/ */
def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2], def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2],
@ -574,6 +591,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* *
* If one of sources gets upstream error - stream completes with failure. * If one of sources gets upstream error - stream completes with failure.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]]. * @see [[#interleave]].
*/ */
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int,
@ -599,6 +619,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams, * Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready. * picking randomly when several elements ready.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]]. * @see [[#merge]].
*/ */
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -630,6 +653,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* waiting for elements, this merge will block when one of the inputs does not have more elements (and * waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete). * does not complete).
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#mergeSorted]]. * @see [[#mergeSorted]].
*/ */
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: util.Comparator[U], def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: util.Comparator[U],
@ -653,6 +679,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
/** /**
* Combine the elements of current [[Source]] and the given one into a stream of tuples. * Combine the elements of current [[Source]] and the given one into a stream of tuples.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zip]]. * @see [[#zip]].
*/ */
def zipMat[T, M, M2](that: Graph[SourceShape[T], M], def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
@ -679,6 +708,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Put together the elements of current [[Source]] and the given one * Put together the elements of current [[Source]] and the given one
* into a stream of combined elements using a combiner function. * into a stream of combined elements using a combiner function.
* *
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipWith]]. * @see [[#zipWith]].
*/ */
def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M], def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M],

View file

@ -6,6 +6,8 @@ package akka.stream
package object javadsl { package object javadsl {
def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) M = def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) M =
f match { f match {
case x if x eq Keep.left scaladsl.Keep.left.asInstanceOf[(M1, M2) M]
case x if x eq Keep.right scaladsl.Keep.right.asInstanceOf[(M1, M2) M]
case s: Function2[_, _, _] s.asInstanceOf[(M1, M2) M] case s: Function2[_, _, _] s.asInstanceOf[(M1, M2) M]
case other other.apply _ case other other.apply _
} }

View file

@ -30,6 +30,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]] override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]]
override def toString: String = s"Flow($shape, $module)"
override type Repr[+O] = Flow[In @uncheckedVariance, O, Mat @uncheckedVariance] override type Repr[+O] = Flow[In @uncheckedVariance, O, Mat @uncheckedVariance]
override type ReprMat[+O, +M] = Flow[In @uncheckedVariance, O, M] override type ReprMat[+O, +M] = Flow[In @uncheckedVariance, O, M]
@ -42,8 +44,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] = override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] =
if (this.isIdentity) { if (this.isIdentity) {
Flow.fromGraph(flow.asInstanceOf[Graph[FlowShape[In, T], Mat2]]) import Predef.Map.empty
.mapMaterializedValue(combine(NotUsed.asInstanceOf[Mat], _)) import StreamLayout.{ CompositeModule, Ignore, IgnorableMatValComp, Transform, Atomic, Combine }
val m = flow.module
val mat =
if (combine == Keep.left) {
if (IgnorableMatValComp(m)) Ignore else Transform(_ => NotUsed, Atomic(m))
} else Combine(combine.asInstanceOf[(Any, Any) => Any], Ignore, Atomic(m))
new Flow(CompositeModule(Set(m), m.shape, empty, empty, mat, Attributes.none))
} else { } else {
val flowCopy = flow.module.carbonCopy val flowCopy = flow.module.carbonCopy
new Flow( new Flow(
@ -86,11 +94,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink. * Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): Sink[In, Mat3] = { def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): Sink[In, Mat3] = {
if (isIdentity) if (isIdentity)
Sink.fromGraph(sink.asInstanceOf[Graph[SinkShape[In], Mat2]]) Sink.fromGraph(sink.asInstanceOf[Graph[SinkShape[In], Mat2]])
.mapMaterializedValue(combine(().asInstanceOf[Mat], _)) .mapMaterializedValue(combine(NotUsed.asInstanceOf[Mat], _))
else { else {
val sinkCopy = sink.module.carbonCopy val sinkCopy = sink.module.carbonCopy
new Sink( new Sink(
@ -132,6 +143,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow. * Flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] = { def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] = {
val flowCopy = flow.module.carbonCopy val flowCopy = flow.module.carbonCopy
@ -176,6 +190,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]]. * [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) M): Flow[I2, O2, M] = { def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) M): Flow[I2, O2, M] = {
val copy = bidi.module.carbonCopy val copy = bidi.module.carbonCopy
@ -261,8 +278,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/** Converts this Scala DSL element to it's Java DSL counterpart. */ /** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
override def toString = s"""Flow(${module})"""
} }
object Flow { object Flow {
@ -410,23 +425,23 @@ trait FlowOps[+Out, +Mat] {
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = andThen(Recover(pf)) def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = andThen(Recover(pf))
/** /**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized. * Source may be materialized.
* *
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped. * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
* *
* '''Emits when''' element is available from the upstream or upstream is failed and element is available * '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source * from alternative Source
* *
* '''Backpressures when''' downstream backpressures * '''Backpressures when''' downstream backpressures
* *
* '''Completes when''' upstream completes or upstream failed with exception pf can handle * '''Completes when''' upstream completes or upstream failed with exception pf can handle
* *
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
*/ */
def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
via(new RecoverWith(pf)) via(new RecoverWith(pf))
@ -466,27 +481,27 @@ trait FlowOps[+Out, +Mat] {
def mapConcat[T](f: Out immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f) def mapConcat[T](f: Out immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)
/** /**
* Transform each input element into an `Iterable` of output elements that is * Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful, * then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization * which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between * the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[FlowOps.mapConcat]]. * invocations. For the stateless variant see [[FlowOps.mapConcat]].
* *
* The returned `Iterable` MUST NOT contain `null` values, * The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification. * as they are illegal as stream elements - according to the Reactive Streams specification.
* *
* '''Emits when''' the mapping function returns an element or there are still remaining elements * '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection * from the previously calculated collection
* *
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the * '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection * previously calculated collection
* *
* '''Completes when''' upstream completes and all remaining elements has been emitted * '''Completes when''' upstream completes and all remaining elements has been emitted
* *
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
* See also [[FlowOps.mapConcat]] * See also [[FlowOps.mapConcat]]
*/ */
def statefulMapConcat[T](f: () Out immutable.Iterable[T]): Repr[T] = def statefulMapConcat[T](f: () Out immutable.Iterable[T]): Repr[T] =
via(new StatefulMapConcat(f)) via(new StatefulMapConcat(f))
@ -1813,6 +1828,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow. * flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): ReprMat[T, Mat3] def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): ReprMat[T, Mat3]
@ -1831,6 +1849,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* }}} * }}}
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink. * Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): ClosedMat[Mat3] def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): ClosedMat[Mat3]
@ -1838,6 +1859,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* Combine the elements of current flow and the given [[Source]] into a stream of tuples. * Combine the elements of current flow and the given [[Source]] into a stream of tuples.
* *
* @see [[#zip]]. * @see [[#zip]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[(Out, U), Mat3] = def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[(Out, U), Mat3] =
viaMat(zipGraph(that))(matF) viaMat(zipGraph(that))(matF)
@ -1847,6 +1871,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* into a stream of combined elements using a combiner function. * into a stream of combined elements using a combiner function.
* *
* @see [[#zipWith]]. * @see [[#zipWith]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): ReprMat[Out3, Mat3] = def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): ReprMat[Out3, Mat3] =
viaMat(zipWithGraph(that)(combine))(matF) viaMat(zipWithGraph(that)(combine))(matF)
@ -1856,6 +1883,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* picking randomly when several elements ready. * picking randomly when several elements ready.
* *
* @see [[#merge]]. * @see [[#merge]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] = def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(mergeGraph(that, eagerComplete))(matF) viaMat(mergeGraph(that, eagerComplete))(matF)
@ -1870,6 +1900,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* If it gets error from one of upstreams - stream completes with failure. * If it gets error from one of upstreams - stream completes with failure.
* *
* @see [[#interleave]]. * @see [[#interleave]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] = def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(interleaveGraph(that, request))(matF) viaMat(interleaveGraph(that, request))(matF)
@ -1882,6 +1915,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* does not complete). * does not complete).
* *
* @see [[#mergeSorted]]. * @see [[#mergeSorted]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3)(implicit ord: Ordering[U]): ReprMat[U, Mat3] = def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3)(implicit ord: Ordering[U]): ReprMat[U, Mat3] =
viaMat(mergeSortedGraph(that))(matF) viaMat(mergeSortedGraph(that))(matF)
@ -1897,6 +1933,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled. * If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
* *
* @see [[#concat]]. * @see [[#concat]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] = def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(concatGraph(that))(matF) viaMat(concatGraph(that))(matF)
@ -1912,6 +1951,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled. * If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
* *
* @see [[#prepend]]. * @see [[#prepend]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] = def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(prependGraph(that))(matF) viaMat(prependGraph(that))(matF)
@ -1921,6 +1963,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* through will also be sent to the [[Sink]]. * through will also be sent to the [[Sink]].
* *
* @see [[#alsoTo]] * @see [[#alsoTo]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] = def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF) viaMat(alsoToGraph(that))(matF)
@ -1930,6 +1975,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* The Future completes with success when received complete message from upstream or cancel * The Future completes with success when received complete message from upstream or cancel
* from downstream. It fails with the same error when received error message from * from downstream. It fails with the same error when received error message from
* downstream. * downstream.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/ */
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) Mat2): ReprMat[Out, Mat2] = def watchTermination[Mat2]()(matF: (Mat, Future[Done]) Mat2): ReprMat[Out, Mat2] =
viaMat(GraphStages.terminationWatcher)(matF) viaMat(GraphStages.terminationWatcher)(matF)

View file

@ -8,7 +8,7 @@ import akka.stream._
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.Stages.{ DefaultAttributes, StageModule} import akka.stream.impl.Stages.{ DefaultAttributes, StageModule }
import akka.stream.impl.StreamLayout._ import akka.stream.impl.StreamLayout._
import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
@ -862,6 +862,20 @@ object GraphDSL extends GraphApply {
* @return The outlet that will emit the materialized value. * @return The outlet that will emit the materialized value.
*/ */
def materializedValue: Outlet[M @uncheckedVariance] = { def materializedValue: Outlet[M @uncheckedVariance] = {
/*
* This brings the graph into a homogenous shape: if only one `add` has
* been performed so far, the moduleInProgress will be a CopiedModule
* that upon the next `composeNoMat` will be wrapped together with the
* MaterializedValueSource into a CompositeModule, leading to its
* relevant computation being an Atomic() for the CopiedModule. This is
* what we must reference, and we can only get this reference if we
* create that computation up-front: just making one up will not work
* because that computation node would not be part of the tree and
* the source would not be triggered.
*/
if (moduleInProgress.isInstanceOf[CopiedModule]) {
moduleInProgress = CompositeModule(moduleInProgress, moduleInProgress.shape)
}
val source = new MaterializedValueSource[M](moduleInProgress.materializedValueComputation) val source = new MaterializedValueSource[M](moduleInProgress.materializedValueComputation)
moduleInProgress = moduleInProgress.composeNoMat(source.module) moduleInProgress = moduleInProgress.composeNoMat(source.module)
source.out source.out

View file

@ -27,6 +27,8 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]] override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]]
override def toString: String = s"Sink($shape, $module)"
/** /**
* Transform this Sink by applying a function to each *incoming* upstream element before * Transform this Sink by applying a function to each *incoming* upstream element before
* it is passed to the [[Sink]] * it is passed to the [[Sink]]

View file

@ -38,6 +38,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]] override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]]
override def toString: String = s"Source($shape, $module)"
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = { override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = {

View file

@ -683,7 +683,21 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"), ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"),
// #20009 internal and shouldn't have been public // #20009 internal and shouldn't have been public
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion"),
// #20015 simplify materialized value computation tree
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.subModules"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.downstreams"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.upstreams"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#DirectProcessor.toString"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.Stages$StageModule"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#GroupBy.toString"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"),
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule")
) )
) )
} }