Merge pull request #20016 from akka/wip-fix-matVal-graph-RK

simplify materialized value computation tree, fixes #20015
This commit is contained in:
Roland Kuhn 2016-03-17 10:49:18 +01:00
commit 9125ca2663
30 changed files with 580 additions and 277 deletions

View file

@ -8,7 +8,6 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import akka.NotUsed
import akka.http.scaladsl.model.RequestEntity
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule }
import akka.stream.scaladsl._
@ -187,7 +186,7 @@ private[http] object StreamUtils {
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
new OneTimePublisherSink[In](attributes, shape, cell)
override def withAttributes(attr: Attributes): Module =
override def withAttributes(attr: Attributes): OneTimePublisherSink[In] =
new OneTimePublisherSink[In](attr, amendShape(attr), cell)
}
/** A copy of SubscriberSource that allows access to the subscriber through the cell but can only materialized once */
@ -212,7 +211,7 @@ private[http] object StreamUtils {
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] =
new OneTimeSubscriberSource[Out](attributes, shape, cell)
override def withAttributes(attr: Attributes): Module =
override def withAttributes(attr: Attributes): OneTimeSubscriberSource[Out] =
new OneTimeSubscriberSource[Out](attr, amendShape(attr), cell)
}

View file

@ -4,7 +4,6 @@
package akka.http.javadsl.model;
import akka.http.impl.util.Util;
import akka.http.javadsl.model.headers.*;
import akka.japi.Pair;

View file

@ -11,12 +11,10 @@ import akka.stream._
class StreamLayoutSpec extends AkkaSpec {
import StreamLayout._
def testAtomic(inPortCount: Int, outPortCount: Int): Module = new Module {
def testAtomic(inPortCount: Int, outPortCount: Int): Module = new AtomicModule {
override val shape = AmorphousShape(List.fill(inPortCount)(Inlet("")), List.fill(outPortCount)(Outlet("")))
override def replaceShape(s: Shape): Module = ???
override def subModules: Set[Module] = Set.empty
override def carbonCopy: Module = ???
override def attributes: Attributes = Attributes.none
@ -174,7 +172,7 @@ class StreamLayoutSpec extends AkkaSpec {
var publishers = Vector.empty[TestPublisher]
var subscribers = Vector.empty[TestSubscriber]
override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes,
override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes,
matVal: java.util.Map[Module, Any]): Unit = {
for (inPort atomic.inPorts) {
val subscriber = TestSubscriber(atomic, inPort)

View file

@ -12,109 +12,162 @@ import akka.testkit.AkkaSpec
class GraphMatValueSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
import GraphDSL.Implicits._
"A Graph with materialized value" must {
val foldSink = Sink.fold[Int, Int](0)(_ + _)
val foldSink = Sink.fold[Int, Int](0)(_ + _)
"A Graph with materialized value" when {
"expose the materialized value as source" in {
val sub = TestSubscriber.manualProbe[Int]()
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> Sink.fromSubscriber(sub)
ClosedShape
}).run()
for (autoFusing Seq(true, false)) {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
.withAutoFusing(autoFusing)
implicit val materializer = ActorMaterializer(settings)
val r1 = Await.result(f, 3.seconds)
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
s"using autoFusing=$autoFusing" must {
r1 should ===(r2)
}
"expose the materialized value as source" in {
val sub = TestSubscriber.manualProbe[Int]()
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> Sink.fromSubscriber(sub)
ClosedShape
}).run()
"expose the materialized value as source multiple times" in {
val sub = TestSubscriber.manualProbe[Int]()
val r1 = Await.result(f, 3.seconds)
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> zip.in0
b.materializedValue.mapAsync(4)(identity) ~> zip.in1
r1 should ===(r2)
}
zip.out ~> Sink.fromSubscriber(sub)
ClosedShape
}).run()
"expose the materialized value as source multiple times" in {
val sub = TestSubscriber.manualProbe[Int]()
val r1 = Await.result(f, 3.seconds)
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
val f = RunnableGraph.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.materializedValue.mapAsync(4)(identity) ~> zip.in0
b.materializedValue.mapAsync(4)(identity) ~> zip.in1
r1 should ===(r2 / 2)
}
zip.out ~> Sink.fromSubscriber(sub)
ClosedShape
}).run()
// Exposes the materialized value as a stream value
val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
SourceShape(b.materializedValue)
})
val r1 = Await.result(f, 3.seconds)
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
"allow exposing the materialized value as port" in {
val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155)
}
r1 should ===(r2 / 2)
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}
"work properly with nesting and reusing" in {
val compositeSource1 = Source.fromGraph(GraphDSL.create(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out.mapAsync(4)(identity) ~> zip.in0
s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
SourceShape(zip.out)
})
val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out ~> zip.in0
s2.out.map(_ * 10000) ~> zip.in1
SourceShape(zip.out)
})
val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run()
Await.result(result, 3.seconds) should ===(55555555)
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(55)
Await.result(f3, 3.seconds) should ===(55)
Await.result(f4, 3.seconds) should ===(55)
}
"work also when the sources module is copied" in {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder
// Exposes the materialized value as a stream value
val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source.fromGraph(GraphDSL.create(foldSink) { implicit b
fold
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})
Source(1 to 10) ~> fold
SourceShape(b.materializedValue)
})
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
"allow exposing the materialized value as port" in {
val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155)
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterializedValue((_) ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}
"work properly with nesting and reusing" in {
val compositeSource1 = Source.fromGraph(GraphDSL.create(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out.mapAsync(4)(identity) ~> zip.in0
s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
SourceShape(zip.out)
})
val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.out ~> zip.in0
s2.out.map(_ * 10000) ~> zip.in1
SourceShape(zip.out)
})
val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run()
Await.result(result, 3.seconds) should ===(55555555)
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(55)
Await.result(f3, 3.seconds) should ===(55)
Await.result(f4, 3.seconds) should ===(55)
}
"work also when the sources module is copied" in {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(foldSink) {
implicit builder
fold
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
}
"work also when the sources module is copied and the graph is extended before using the matValSrc" in {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(foldSink) {
implicit builder
fold
val map = builder.add(Flow[Future[Int]].mapAsync(4)(identity))
builder.materializedValue ~> map
FlowShape(fold.in, map.outlet)
})
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
}
"perform side-effecting transformations even when not used as source" in {
var done = false
val g = GraphDSL.create() { implicit b
import GraphDSL.Implicits._
Source.empty.mapMaterializedValue(_ done = true) ~> Sink.ignore
ClosedShape
}
val r = RunnableGraph.fromGraph(GraphDSL.create(Sink.ignore) { implicit b
(s)
b.add(g)
Source(1 to 10) ~> s
ClosedShape
})
r.run().futureValue should ===(akka.Done)
done should ===(true)
}
"produce NotUsed when not importing materialized value" in {
val source = Source.fromGraph(GraphDSL.create() { implicit b
SourceShape(b.materializedValue)
})
source.runWith(Sink.seq).futureValue should ===(List(akka.NotUsed))
}
"produce NotUsed when starting from Flow.via" in {
Source.empty.viaMat(Flow[Int].map(_ * 2))(Keep.right).to(Sink.ignore).run() should ===(akka.NotUsed)
}
"produce NotUsed when starting from Flow.via with transformation" in {
var done = false
Source.empty.viaMat(
Flow[Int].via(Flow[Int].mapMaterializedValue(_ done = true)))(Keep.right)
.to(Sink.ignore).run() should ===(akka.NotUsed)
done should ===(true)
}
}
}
}
}

View file

@ -61,6 +61,8 @@ trait GraphApply {
private[stream] object GraphApply {
final class GraphImpl[S <: Shape, Mat](override val shape: S, private[stream] override val module: StreamLayout.Module)
extends Graph[S, Mat] {
override def toString: String = s"Graph($shape, $module)"
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GraphImpl(shape, module.withAttributes(attr))

View file

@ -31,11 +31,7 @@ object Fusing {
* implementations based on [[akka.stream.stage.GraphStage]]) and not forbidden
* via [[akka.stream.Attributes#AsyncBoundary]].
*/
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] =
g match {
case fg: FusedGraph[_, _] fg
case _ Impl.aggressive(g)
}
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = Impl.aggressive(g)
/**
* A fused graph of the right shape, containing a [[FusedModule]] which
@ -48,6 +44,14 @@ object Fusing {
override def withAttributes(attr: Attributes) = copy(module = module.withAttributes(attr))
}
object FusedGraph {
def unapply[S <: Shape, M](g: Graph[S, M]): Option[(FusedModule, S)] =
g.module match {
case f: FusedModule => Some((f, g.shape))
case _ => None
}
}
/**
* When fusing a [[Graph]] a part of the internal stage wirings are hidden within
* [[akka.stream.impl.fusing.GraphInterpreter#GraphAssembly]] objects that are

View file

@ -215,6 +215,8 @@ object ClosedShape extends ClosedShape {
* Java API: obtain ClosedShape instance
*/
def getInstance: ClosedShape = this
override def toString: String = "ClosedShape"
}
/**

View file

@ -11,7 +11,7 @@ import akka.event.Logging
import akka.dispatch.Dispatchers
import akka.pattern.ask
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.StreamLayout.{ Module, AtomicModule }
import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule }
import akka.stream.impl.io.TLSActor
import akka.stream.impl.io.TlsModule
@ -97,7 +97,8 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
name
}
override protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
if (MaterializerSession.Debug) println(s"materializing $atomic")
def newMaterializationContext() =
new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))

View file

@ -5,11 +5,12 @@ package akka.stream.impl
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.event.Logging
/**
* INTERNAL API
*/
private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module {
private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.AtomicModule {
override def replaceShape(s: Shape) =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
else this
@ -18,6 +19,6 @@ private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module {
val outPort = Outlet[Out]("Flow.out")
override val shape = new FlowShape(inPort, outPort)
override def subModules: Set[Module] = Set.empty
protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
}

View file

@ -6,29 +6,30 @@ package akka.stream.impl
import akka.NotUsed
import akka.actor._
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.StreamLayout.AtomicModule
import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Promise
import akka.event.Logging
/**
* INTERNAL API
*/
private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Module {
private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule {
protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat)
override def replaceShape(s: Shape): Module =
override def replaceShape(s: Shape): AtomicModule =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that")
else this
// This is okay since the only caller of this method is right below.
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]
override def carbonCopy: Module = newInstance(SourceShape(shape.out.carbonCopy()))
override def subModules: Set[Module] = Set.empty
override def carbonCopy: AtomicModule = newInstance(SourceShape(shape.out.carbonCopy()))
protected def amendShape(attr: Attributes): SourceShape[Out] = {
val thisN = attributes.nameOrDefault(null)
@ -52,7 +53,7 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] = new SubscriberSource[Out](attributes, shape)
override def withAttributes(attr: Attributes): Module = new SubscriberSource[Out](attr, amendShape(attr))
override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSource[Out](attr, amendShape(attr))
}
/**
@ -63,22 +64,26 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap
* back-pressure upstream.
*/
private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) {
override protected def label: String = s"PublisherSource($p)"
override def create(context: MaterializationContext) = (p, NotUsed)
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] = new PublisherSource[Out](p, attributes, shape)
override def withAttributes(attr: Attributes): Module = new PublisherSource[Out](p, attr, amendShape(attr))
override def withAttributes(attr: Attributes): AtomicModule = new PublisherSource[Out](p, attr, amendShape(attr))
}
/**
* INTERNAL API
*/
private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) {
override def create(context: MaterializationContext) = {
val p = Promise[Option[Out]]()
new MaybePublisher[Out](p, attributes.nameOrDefault("MaybeSource"))(context.materializer.executionContext) p
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Promise[Option[Out]]] = new MaybeSource[Out](attributes, shape)
override def withAttributes(attr: Attributes): Module = new MaybeSource(attr, amendShape(attr))
override def withAttributes(attr: Attributes): AtomicModule = new MaybeSource(attr, amendShape(attr))
}
/**
@ -95,7 +100,7 @@ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorPublisherSource[Out](props, attributes, shape)
override def withAttributes(attr: Attributes): Module = new ActorPublisherSource(props, attr, amendShape(attr))
override def withAttributes(attr: Attributes): AtomicModule = new ActorPublisherSource(props, attr, amendShape(attr))
}
/**
@ -105,6 +110,8 @@ private[akka] final class ActorRefSource[Out](
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
extends SourceModule[Out, ActorRef](shape) {
override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"
override def create(context: MaterializationContext) = {
val mat = ActorMaterializer.downcast(context.materializer)
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
@ -113,6 +120,6 @@ private[akka] final class ActorRefSource[Out](
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] =
new ActorRefSource[Out](bufferSize, overflowStrategy, attributes, shape)
override def withAttributes(attr: Attributes): Module =
override def withAttributes(attr: Attributes): AtomicModule =
new ActorRefSource(bufferSize, overflowStrategy, attr, amendShape(attr))
}

View file

@ -8,7 +8,7 @@ import akka.actor.{ ActorRef, Props }
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
@ -20,11 +20,12 @@ import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import java.util.Optional
import akka.event.Logging
/**
* INTERNAL API
*/
private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends Module {
private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule {
/**
* Create the Subscriber or VirtualPublisher that consumes the incoming
@ -35,16 +36,14 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
*/
def create(context: MaterializationContext): (AnyRef, Mat)
override def replaceShape(s: Shape): Module =
override def replaceShape(s: Shape): AtomicModule =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that")
else this
// This is okay since we the only caller of this method is right below.
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]
override def carbonCopy: Module = newInstance(SinkShape(shape.in.carbonCopy()))
override def subModules: Set[Module] = Set.empty
override def carbonCopy: AtomicModule = newInstance(SinkShape(shape.in.carbonCopy()))
protected def amendShape(attr: Attributes): SinkShape[In] = {
val thisN = attributes.nameOrDefault(null)
@ -53,6 +52,10 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
if ((thatN eq null) || thisN == thatN) shape
else shape.copy(in = Inlet(thatN + ".in"))
}
protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
}
/**
@ -64,8 +67,6 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
*/
private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
override def toString: String = "PublisherSink"
/*
* This method is the reason why SinkModule.create may return something that is
* not a Subscriber: a VirtualPublisher is used in order to avoid the immediate
@ -77,7 +78,7 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] = new PublisherSink[In](attributes, shape)
override def withAttributes(attr: Attributes): Module = new PublisherSink[In](attr, amendShape(attr))
override def withAttributes(attr: Attributes): AtomicModule = new PublisherSink[In](attr, amendShape(attr))
}
/**
@ -100,7 +101,7 @@ private[akka] final class FanoutPublisherSink[In](
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
new FanoutPublisherSink[In](attributes, shape)
override def withAttributes(attr: Attributes): Module =
override def withAttributes(attr: Attributes): AtomicModule =
new FanoutPublisherSink[In](attr, amendShape(attr))
}
@ -118,8 +119,7 @@ private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkSh
}
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Done]] = new SinkholeSink(attributes, shape)
override def withAttributes(attr: Attributes): Module = new SinkholeSink(attr, amendShape(attr))
override def toString: String = "SinkholeSink"
override def withAttributes(attr: Attributes): AtomicModule = new SinkholeSink(attr, amendShape(attr))
}
/**
@ -131,8 +131,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att
override def create(context: MaterializationContext) = (subscriber, NotUsed)
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] = new SubscriberSink[In](subscriber, attributes, shape)
override def withAttributes(attr: Attributes): Module = new SubscriberSink[In](subscriber, attr, amendShape(attr))
override def toString: String = "SubscriberSink"
override def withAttributes(attr: Attributes): AtomicModule = new SubscriberSink[In](subscriber, attr, amendShape(attr))
}
/**
@ -142,8 +141,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att
private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) {
override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed)
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape)
override def withAttributes(attr: Attributes): Module = new CancelSink(attr, amendShape(attr))
override def toString: String = "CancelSink"
override def withAttributes(attr: Attributes): AtomicModule = new CancelSink(attr, amendShape(attr))
}
/**
@ -159,8 +157,7 @@ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes:
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, ActorRef] = new ActorSubscriberSink[In](props, attributes, shape)
override def withAttributes(attr: Attributes): Module = new ActorSubscriberSink[In](props, attr, amendShape(attr))
override def toString: String = "ActorSubscriberSink"
override def withAttributes(attr: Attributes): AtomicModule = new ActorSubscriberSink[In](props, attr, amendShape(attr))
}
/**
@ -180,9 +177,8 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
new ActorRefSink[In](ref, onCompleteMessage, attributes, shape)
override def withAttributes(attr: Attributes): Module =
override def withAttributes(attr: Attributes): AtomicModule =
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))
override def toString: String = "ActorRefSink"
}
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
@ -257,6 +253,8 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV
private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
val in = Inlet[T]("seq.in")
override def toString: String = "SeqStage"
override val shape: SinkShape[T] = SinkShape.of(in)
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
@ -302,6 +300,8 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
override def initialAttributes = DefaultAttributes.queueSink
override val shape: SinkShape[T] = SinkShape.of(in)
override def toString: String = "QueueSink"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
type Received[E] = Try[Option[E]]

View file

@ -208,6 +208,7 @@ private[stream] object Stages {
final case class GroupBy(maxSubstreams: Int, f: Any Any, attributes: Attributes = groupBy) extends StageModule {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def label: String = s"GroupBy($maxSubstreams)"
}
final case class DirectProcessor(p: () (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule {

View file

@ -19,6 +19,7 @@ import scala.collection.JavaConverters._
import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.GraphModule
import akka.event.Logging
/**
* INTERNAL API
@ -104,9 +105,21 @@ object StreamLayout {
if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems")
}
// TODO: Materialization order
// TODO: Special case linear composites
// TODO: Cycles
object IgnorableMatValComp {
def apply(comp: MaterializedValueNode): Boolean =
comp match {
case Atomic(module) IgnorableMatValComp(module)
case _: Combine | _: Transform false
case Ignore true
}
def apply(module: Module): Boolean =
module match {
case _: AtomicModule | EmptyModule true
case CopiedModule(_, _, module) IgnorableMatValComp(module)
case CompositeModule(_, _, _, _, comp, _) IgnorableMatValComp(comp)
case FusedModule(_, _, _, _, comp, _, _) IgnorableMatValComp(comp)
}
}
sealed trait MaterializedValueNode {
/*
@ -121,14 +134,14 @@ object StreamLayout {
override def toString: String = s"Combine($dep1,$dep2)"
}
case class Atomic(module: Module) extends MaterializedValueNode {
override def toString: String = s"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)}[${module.hashCode}])"
override def toString: String = f"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)}[${System.identityHashCode(module)}%08x])"
}
case class Transform(f: Any Any, dep: MaterializedValueNode) extends MaterializedValueNode {
override def toString: String = s"Transform($dep)"
}
case object Ignore extends MaterializedValueNode
trait Module {
sealed trait Module {
def shape: Shape
/**
@ -241,19 +254,30 @@ object StreamLayout {
require(that ne this, "A module cannot be added to itself. You should pass a separate instance to compose().")
require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")
val modules1 = if (this.isSealed) Set(this) else this.subModules
val modules2 = if (that.isSealed) Set(that) else that.subModules
val modulesLeft = if (this.isSealed) Set(this) else this.subModules
val modulesRight = if (that.isSealed) Set(that) else that.subModules
val matComputation1 = if (this.isSealed) Atomic(this) else this.materializedValueComputation
val matComputation2 = if (that.isSealed) Atomic(that) else that.materializedValueComputation
val matCompLeft = if (this.isSealed) Atomic(this) else this.materializedValueComputation
val matCompRight = if (that.isSealed) Atomic(that) else that.materializedValueComputation
val mat =
{
val comp =
if (f == scaladsl.Keep.left) {
if (IgnorableMatValComp(matCompRight)) matCompLeft else null
} else if (f == scaladsl.Keep.right) {
if (IgnorableMatValComp(matCompLeft)) matCompRight else null
} else null
if (comp == null) Combine(f.asInstanceOf[(Any, Any) Any], matCompLeft, matCompRight)
else comp
}
CompositeModule(
modules1 ++ modules2,
modulesLeft ++ modulesRight,
AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets),
downstreams ++ that.downstreams,
upstreams ++ that.upstreams,
// would like to optimize away this allocation for Keep.{left,right} but that breaks side-effecting transformations
Combine(f.asInstanceOf[(Any, Any) Any], matComputation1, matComputation2),
mat,
Attributes.none)
}
@ -314,7 +338,7 @@ object StreamLayout {
final override def equals(obj: scala.Any): Boolean = super.equals(obj)
}
object EmptyModule extends Module {
case object EmptyModule extends Module {
override def shape = ClosedShape
override def replaceShape(s: Shape) =
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
@ -357,7 +381,7 @@ object StreamLayout {
override def isCopied: Boolean = true
override def toString: String = s"$copyOf (copy)"
override def toString: String = f"[${System.identityHashCode(this)}%08x] copy of $copyOf"
}
final case class CompositeModule(
@ -379,13 +403,13 @@ object StreamLayout {
override def withAttributes(attributes: Attributes): Module = copy(attributes = attributes)
override def toString =
s"""
f"""CompositeModule [${System.identityHashCode(this)}%08x]
| Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules:
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")}
| ${subModules.iterator.map(m s"(${m.attributes.nameLifted.getOrElse("unnamed")}) ${m.toString.replaceAll("\n", "\n ")}").mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin
| MatValue: $materializedValueComputation""".stripMargin
}
object CompositeModule {
@ -414,13 +438,24 @@ object StreamLayout {
override def withAttributes(attributes: Attributes): FusedModule = copy(attributes = attributes)
override def toString =
s"""
| Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules:
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin
f"""FusedModule [${System.identityHashCode(this)}%08x]
| Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules:
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
| MatValue: $materializedValueComputation""".stripMargin
}
/**
* This is the only extension point for the sealed type hierarchy: composition
* (i.e. the module tree) is managed strictly within this file, only leaf nodes
* may be declared elsewhere.
*/
abstract class AtomicModule extends Module {
final override def subModules: Set[Module] = Set.empty
final override def downstreams: Map[OutPort, InPort] = super.downstreams
final override def upstreams: Map[InPort, OutPort] = super.upstreams
}
}
@ -718,6 +753,8 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
new ju.HashMap[InPort, AnyRef] :: Nil
private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] =
new ju.HashMap[OutPort, Publisher[Any]] :: Nil
private var matValSrcStack: List[ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]]] =
new ju.HashMap[MaterializedValueNode, List[MaterializedValueSource[Any]]] :: Nil
/*
* Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
@ -732,13 +769,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
private def subscribers: ju.Map[InPort, AnyRef] = subscribersStack.head
private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head
private def currentLayout: Module = moduleStack.head
private def matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = matValSrcStack.head
// Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
// of the same module.
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def enterScope(enclosing: CopiedModule): Unit = {
if (MaterializerSession.Debug) println(f"entering scope [${System.identityHashCode(enclosing)}%08x]")
subscribersStack ::= new ju.HashMap
publishersStack ::= new ju.HashMap
matValSrcStack ::= new ju.HashMap
moduleStack ::= enclosing.copyOf
}
@ -747,12 +787,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// leading to port identity collisions)
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def exitScope(enclosing: CopiedModule): Unit = {
if (MaterializerSession.Debug) println(f"exiting scope [${System.identityHashCode(enclosing)}%08x]")
val scopeSubscribers = subscribers
val scopePublishers = publishers
subscribersStack = subscribersStack.tail
publishersStack = publishersStack.tail
matValSrcStack = matValSrcStack.tail
moduleStack = moduleStack.tail
if (MaterializerSession.Debug) println(s" subscribers = $scopeSubscribers\n publishers = $scopePublishers")
// When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of
// the original module and assign them to the copy ports in the outer scope that we will return to
enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
@ -765,6 +809,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
}
final def materialize(): Any = {
if (MaterializerSession.Debug) println(s"beginning materialization of $topLevel")
require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)")
require(
topLevel.isRunnable,
@ -789,7 +834,6 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def mergeAttributes(parent: Attributes, current: Attributes): Attributes =
parent and current
private val matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = new ju.HashMap
def registerSrc(ms: MaterializedValueSource[Any]): Unit = {
if (MaterializerSession.Debug) println(s"registering source $ms")
matValSrc.get(ms.computation) match {
@ -801,53 +845,60 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def materializeModule(module: Module, effectiveAttributes: Attributes): Any = {
val materializedValues: ju.Map[Module, Any] = new ju.HashMap
if (MaterializerSession.Debug) println(f"entering module [${System.identityHashCode(module)}%08x] (${Logging.simpleName(module)})")
for (submodule module.subModules) {
val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes)
submodule match {
case GraphStageModule(shape, attributes, mv: MaterializedValueSource[_])
val copy = mv.copySrc.asInstanceOf[MaterializedValueSource[Any]]
registerSrc(copy)
materializeAtomic(copy.module, subEffectiveAttributes, materializedValues)
case atomic if atomic.isAtomic
case atomic: AtomicModule
materializeAtomic(atomic, subEffectiveAttributes, materializedValues)
case copied: CopiedModule
enterScope(copied)
materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes))
exitScope(copied)
case composite
case composite @ (_: CompositeModule | _: FusedModule)
materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes))
case EmptyModule => // nothing to do or say
}
}
if (MaterializerSession.Debug) {
println("RESOLVING")
println(s" module = $module")
println(s" computation = ${module.materializedValueComputation}")
println(f"resolving module [${System.identityHashCode(module)}%08x] computation ${module.materializedValueComputation}")
println(s" matValSrc = $matValSrc")
println(s" matVals = $materializedValues")
println(s" matVals =\n ${materializedValues.asScala.map(p "%08x".format(System.identityHashCode(p._1)) -> p._2).mkString("\n ")}")
}
resolveMaterialized(module.materializedValueComputation, materializedValues, " ")
val ret = resolveMaterialized(module.materializedValueComputation, materializedValues, 2)
while (!matValSrc.isEmpty) {
val node = matValSrc.keySet.iterator.next()
if (MaterializerSession.Debug) println(s" delayed computation of $node")
resolveMaterialized(node, materializedValues, 4)
}
if (MaterializerSession.Debug) println(f"exiting module [${System.identityHashCode(module)}%08x]")
ret
}
protected def materializeComposite(composite: Module, effectiveAttributes: Attributes): Any = {
materializeModule(composite, effectiveAttributes)
}
protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit
protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit
private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], indent: String): Any = {
if (MaterializerSession.Debug) println(indent + matNode)
private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], spaces: Int): Any = {
if (MaterializerSession.Debug) println(" " * spaces + matNode)
val ret = matNode match {
case Atomic(m) matVal.get(m)
case Combine(f, d1, d2) f(resolveMaterialized(d1, matVal, indent + " "), resolveMaterialized(d2, matVal, indent + " "))
case Transform(f, d) f(resolveMaterialized(d, matVal, indent + " "))
case Combine(f, d1, d2) f(resolveMaterialized(d1, matVal, spaces + 2), resolveMaterialized(d2, matVal, spaces + 2))
case Transform(f, d) f(resolveMaterialized(d, matVal, spaces + 2))
case Ignore NotUsed
}
if (MaterializerSession.Debug) println(indent + s"result = $ret")
if (MaterializerSession.Debug) println(" " * spaces + s"result = $ret")
matValSrc.remove(matNode) match {
case null // nothing to do
case srcs
if (MaterializerSession.Debug) println(indent + s"triggering sources $srcs")
if (MaterializerSession.Debug) println(" " * spaces + s"triggering sources $srcs")
srcs.foreach(_.setValue(ret))
}
ret

View file

@ -10,7 +10,7 @@ import akka.event.Logging
import akka.stream._
import akka.stream.impl._
import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module }
import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module, AtomicModule }
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.{ Subscriber, Subscription }
@ -23,9 +23,9 @@ import scala.annotation.tailrec
/**
* INTERNAL API
*/
private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes,
matValIDs: Array[Module]) extends Module {
override def subModules: Set[Module] = Set.empty
private[stream] final case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes,
matValIDs: Array[Module]) extends AtomicModule {
override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr)
override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
@ -34,7 +34,12 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at
if (newShape != shape) CompositeModule(this, newShape)
else this
override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes"
override def toString: String =
s"""GraphModule
| ${assembly.toString.replace("\n", "\n ")}
| shape=$shape, attributes=$attributes
| matVals=
| ${matValIDs.mkString("\n ")}""".stripMargin
}
/**

View file

@ -27,7 +27,14 @@ private[stream] object Fusing {
/**
* Fuse everything that is not forbidden via AsyncBoundary attribute.
*/
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = {
def aggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] =
g match {
case fg: FusedGraph[_, _] fg
case FusedGraph(module, shape) => FusedGraph(module, shape)
case _ doAggressive(g)
}
private def doAggressive[S <: Shape, M](g: Graph[S, M]): FusedGraph[S, M] = {
val struct = new BuildStructuralInfo
/*
* First perform normalization by descending the module tree and recording
@ -153,6 +160,7 @@ private[stream] object Fusing {
}
pos += 1
case _ => throw new IllegalArgumentException("unexpected module structure")
}
val outsB2 = new Array[Outlet[_]](insB2.size)
@ -178,6 +186,7 @@ private[stream] object Fusing {
}
}
pos += 1
case _ => throw new IllegalArgumentException("unexpected module structure")
}
/*
@ -207,7 +216,10 @@ private[stream] object Fusing {
copyToArray(outOwnersB3.iterator, outOwners, outStart)
// FIXME attributes should contain some naming info and async boundary where needed
val firstModule = group.iterator.next()
val firstModule = group.iterator.next() match {
case c: CopiedModule => c
case _ => throw new IllegalArgumentException("unexpected module structure")
}
val async = if (isAsync(firstModule)) Attributes(AsyncBoundary) else Attributes.none
val disp = dispatcher(firstModule) match {
case None Attributes.none
@ -253,7 +265,7 @@ private[stream] object Fusing {
case _ if m.isAtomic true // non-GraphStage atomic or has AsyncBoundary
case _ m.attributes.contains(AsyncBoundary)
}
if (Debug) log(s"entering ${m.getClass} (hash=${m.hashCode}, async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})")
if (Debug) log(s"entering ${m.getClass} (hash=${struct.hash(m)}, async=$async, name=${m.attributes.nameLifted}, dispatcher=${dispatcher(m)})")
val localGroup =
if (async) struct.newGroup(indent)
else openGroup
@ -315,6 +327,7 @@ private[stream] object Fusing {
struct.registerInternals(newShape, indent)
copy
case _ => throw new IllegalArgumentException("unexpected module structure")
}
val newgm = gm.copy(shape = oldShape.copyFromPorts(oldIns.toList, oldOuts.toList), matValIDs = newids)
// make sure to add all the port mappings from old GraphModule Shape to new shape
@ -356,7 +369,7 @@ private[stream] object Fusing {
subMatBuilder ++= res
}
val subMat = subMatBuilder.result()
if (Debug) log(subMat.map(p s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, ""))
if (Debug) log(subMat.map(p s"${p._1.getClass.getName}[${struct.hash(p._1)}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, ""))
// we need to remove all wirings that this module copied from nested modules so that we
// dont do wirings twice
val oldDownstreams = m match {
@ -370,17 +383,17 @@ private[stream] object Fusing {
// now rewrite the materialized value computation based on the copied modules and their computation nodes
val matNodeMapping: ju.Map[MaterializedValueNode, MaterializedValueNode] = new ju.HashMap
val newMat = rewriteMat(subMat, m.materializedValueComputation, matNodeMapping)
if (Debug) log(matNodeMapping.asScala.map(p s"${p._1} -> ${p._2}").mkString("matNodeMapping\n " + " " * indent, "\n " + " " * indent, ""))
// and finally rewire all MaterializedValueSources to their new computation nodes
val matSrcs = struct.exitMatCtx()
matSrcs.foreach { c
if (Debug) log(s"materialized value source: ${struct.hash(c)}")
val ms = c.copyOf match {
case g: GraphStageModule g.stage.asInstanceOf[MaterializedValueSource[Any]]
}
val ms = c.copyOf.asInstanceOf[GraphStageModule].stage.asInstanceOf[MaterializedValueSource[Any]]
val mapped = ms.computation match {
case Atomic(sub) subMat(sub)
case Ignore => Ignore
case other matNodeMapping.get(other)
}
if (Debug) log(s"materialized value source: ${c.copyOf} -> $mapped")
require(mapped != null, s"mismatch:\n ${ms.computation}\n ${m.materializedValueComputation}")
val newSrc = new MaterializedValueSource[Any](mapped, ms.out)
val replacement = CopiedModule(c.shape, c.attributes, newSrc.module)
@ -692,7 +705,7 @@ private[stream] object Fusing {
/**
* Determine whether the given CopiedModule has an AsyncBoundary attribute.
*/
private def isAsync(m: Module): Boolean = m match {
private def isAsync(m: CopiedModule): Boolean = m match {
case CopiedModule(_, inherited, orig)
val attr = inherited and orig.attributes
attr.contains(AsyncBoundary)

View file

@ -191,14 +191,17 @@ private[akka] object GraphInterpreter {
(inHandlers, outHandlers, logics)
}
override def toString: String =
override def toString: String = {
val stageList = stages.iterator.zip(originalAttributes.iterator).map {
case (stage, attr) s"${stage.module} [${attr.attributeList.mkString(", ")}]"
}
"GraphAssembly\n " +
stages.mkString("Stages: [", ",", "]") + "\n " +
originalAttributes.mkString("Attributes: [", ",", "]") + "\n " +
ins.mkString("Inlets: [", ",", "]") + "\n " +
inOwners.mkString("InOwners: [", ",", "]") + "\n " +
outs.mkString("Outlets: [", ",", "]") + "\n " +
outOwners.mkString("OutOwners: [", ",", "]")
stageList.mkString("[ ", "\n ", "\n ]") + "\n " +
ins.mkString("[", ",", "]") + "\n " +
inOwners.mkString("[", ",", "]") + "\n " +
outs.mkString("[", ",", "]") + "\n " +
outOwners.mkString("[", ",", "]")
}
}
object GraphAssembly {

View file

@ -24,20 +24,18 @@ import scala.util.Try
*/
private[akka] final case class GraphStageModule(shape: Shape,
attributes: Attributes,
stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module {
stage: GraphStageWithMaterializedValue[Shape, Any]) extends AtomicModule {
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
override def replaceShape(s: Shape): Module =
if (s != shape) CompositeModule(this, s)
else this
override def subModules: Set[Module] = Set.empty
override def withAttributes(attributes: Attributes): Module =
if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage)
else this
override def toString: String = stage.toString
override def toString: String = f"GraphStage($stage) [${System.identityHashCode(this)}%08x]"
}
/**
@ -133,6 +131,7 @@ object GraphStages {
override val initialAttributes = Attributes.name("breaker")
override val shape = FlowShape(Inlet[Any]("breaker.in"), Outlet[Any]("breaker.out"))
override def toString: String = "Breaker"
override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Breaker]
@ -167,6 +166,7 @@ object GraphStages {
override val shape = BidiShape(
Inlet[Any]("breaker.in1"), Outlet[Any]("breaker.out1"),
Inlet[Any]("breaker.in2"), Outlet[Any]("breaker.out2"))
override def toString: String = "BidiBreaker"
override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Breaker]
@ -215,21 +215,6 @@ object GraphStages {
def bidiBreaker[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]] = BidiBreaker.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]]]
private object TickSource {
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
private val cancelPromise = Promise[Done]()
def cancelFuture: Future[Done] = cancelPromise.future
override def cancel(): Boolean = {
if (!isCancelled) cancelPromise.trySuccess(Done)
true
}
override def isCancelled: Boolean = cancelled.get()
}
}
private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] {
val in = Inlet[Any]("terminationWatcher.in")
val out = Outlet[Any]("terminationWatcher.out")
@ -269,6 +254,21 @@ object GraphStages {
def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] =
TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]]
private object TickSource {
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
private val cancelPromise = Promise[Done]()
def cancelFuture: Future[Done] = cancelPromise.future
override def cancel(): Boolean = {
if (!isCancelled) cancelPromise.trySuccess(Done)
true
}
override def isCancelled: Boolean = cancelled.get()
}
}
final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T)
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
override val shape = SourceShape(Outlet[T]("TickSource.out"))
@ -302,7 +302,7 @@ object GraphStages {
(logic, cancellable)
}
override def toString: String = "TickSource"
override def toString: String = s"TickSource($initialDelay, $interval, $tick)"
}
/**

View file

@ -22,6 +22,8 @@ import scala.concurrent.{ Future, Promise }
private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
extends SinkModule[ByteString, Future[IOResult]](shape) {
override protected def label: String = s"FileSink($f, $options)"
override def create(context: MaterializationContext) = {
val materializer = ActorMaterializer.downcast(context.materializer)
val settings = materializer.effectiveSettings(context.effectiveAttributes)
@ -68,4 +70,3 @@ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, va
override def withAttributes(attr: Attributes): Module =
new OutputStreamSink(createOutput, attr, amendShape(attr), autoFlush)
}

View file

@ -41,6 +41,8 @@ private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: At
override def withAttributes(attr: Attributes): Module =
new FileSource(f, chunkSize, attr, amendShape(attr))
override protected def label: String = s"FileSource($f, $chunkSize)"
}
/**

View file

@ -140,7 +140,8 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
case Failed(ex)
isStageAlive = false
throw new IOException(ex)
case null throw new IOException("Timeout on waiting for new data")
case null throw new IOException("Timeout on waiting for new data")
case Initialized throw new IllegalStateException("message 'Initialized' must come first")
}
} catch {
case ex: InterruptedException throw new IOException(ex)
@ -215,4 +216,3 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
}
}
}

View file

@ -3,7 +3,7 @@ package akka.stream.impl.io
import javax.net.ssl.SSLContext
import akka.stream._
import akka.stream.impl.StreamLayout.{ CompositeModule, Module }
import akka.stream.impl.StreamLayout.{ CompositeModule, AtomicModule }
import akka.stream.TLSProtocol._
import akka.util.ByteString
@ -15,11 +15,10 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu
shape: Shape, attributes: Attributes,
sslContext: SSLContext,
firstSession: NegotiateNewSession,
role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends Module {
override def subModules: Set[Module] = Set.empty
role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule {
override def withAttributes(att: Attributes): Module = copy(attributes = att)
override def carbonCopy: Module =
override def withAttributes(att: Attributes): TlsModule = copy(attributes = att)
override def carbonCopy: TlsModule =
TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo)
override def replaceShape(s: Shape) =
@ -27,6 +26,8 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu
shape.requireSamePortsAs(s)
CompositeModule(this, s)
} else this
override def toString: String = f"TlsModule($firstSession, $role, $closing, $hostInfo) [${System.identityHashCode(this)}%08x]"
}
/**

View file

@ -75,6 +75,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
override def shape: FlowShape[In, Out] = delegate.shape
private[stream] def module: StreamLayout.Module = delegate.module
override def toString: String = delegate.toString
/** Converts this Flow to its Scala DSL counterpart */
def asScala: scaladsl.Flow[In, Out, Mat] = delegate
@ -119,6 +121,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaMat(flow)(combinerToScala(combine)))
@ -158,6 +163,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink)(combinerToScala(combine)))
@ -189,6 +197,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] =
RunnableGraph.fromGraph(delegate.joinMat(flow)(combinerToScala(combine)))
@ -228,6 +239,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2], combine: function.Function2[Mat, Mat2, M]): Flow[I2, O2, M] =
new Flow(delegate.joinMat(bidi)(combinerToScala(combine)))
@ -1246,6 +1260,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#concat]]
*/
def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
@ -1282,7 +1299,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* @see [[#prepend]].
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#prepend]]
*/
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.prependMat(that)(combinerToScala(matF)))
@ -1306,6 +1326,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#alsoTo]]
*/
def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2],
@ -1349,7 +1372,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* If this [[Flow]] or [[Source]] gets upstream error - stream completes with failure.
*
* @see [[#interleave]].
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]]
*/
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int,
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
@ -1389,6 +1415,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]]
*/
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -1399,6 +1428,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]]
*/
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -1431,6 +1463,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete).
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#mergeSorted]].
*/
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: Comparator[U],
@ -1454,12 +1489,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/**
* Combine the elements of current [[Flow]] and the given [[Source]] into a stream of tuples.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zip]]
*/
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.fromGraph(GraphDSL.create(that,
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] {
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] {
def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = {
val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T])
b.from(s).toInlet(zip.in1)
@ -1487,6 +1525,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* Put together the elements of current [[Flow]] and the given [[Source]]
* into a stream of combined elements using a combiner function.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipWith]]
*/
def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M],
@ -1792,6 +1833,9 @@ object RunnableGraph {
private final class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] {
def shape = ClosedShape
def module = runnable.module
override def toString: String = runnable.toString
override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraphAdapter[Mat2] =
new RunnableGraphAdapter(runnable.mapMaterializedValue(f.apply _))

View file

@ -256,6 +256,8 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
override def shape: SinkShape[In] = delegate.shape
private[stream] def module: StreamLayout.Module = delegate.module
override def toString: String = delegate.toString
/** Converts this Sink to its Scala DSL counterpart */
def asScala: scaladsl.Sink[In, Mat] = delegate

View file

@ -323,6 +323,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
private[stream] def module: StreamLayout.Module = delegate.module
override def toString: String = delegate.toString
/** Converts this Java DSL element to its Scala DSL counterpart. */
def asScala: scaladsl.Source[Out, Mat] = delegate
@ -367,6 +369,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.viaMat(flow)(combinerToScala(combine)))
@ -406,6 +411,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] =
RunnableGraph.fromGraph(delegate.toMat(sink)(combinerToScala(combine)))
@ -470,6 +478,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*
* If this [[Source]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#concat]].
*/
def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -507,6 +518,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*
* If the given [[Source]] gets upstream error - no elements from this [[Source]] will be pulled.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#prepend]].
*/
def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -532,6 +546,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Attaches the given [[Sink]] to this [[Flow]], meaning that elements that passes
* through will also be sent to the [[Sink]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#alsoTo]]
*/
def alsoToMat[M2, M3](that: Graph[SinkShape[Out], M2],
@ -574,6 +591,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*
* If one of sources gets upstream error - stream completes with failure.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#interleave]].
*/
def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int,
@ -599,6 +619,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Merge the given [[Source]] to the current one, taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#merge]].
*/
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
@ -630,6 +653,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* waiting for elements, this merge will block when one of the inputs does not have more elements (and
* does not complete).
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#mergeSorted]].
*/
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: util.Comparator[U],
@ -653,6 +679,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
/**
* Combine the elements of current [[Source]] and the given one into a stream of tuples.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zip]].
*/
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
@ -679,6 +708,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* Put together the elements of current [[Source]] and the given one
* into a stream of combined elements using a combiner function.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*
* @see [[#zipWith]].
*/
def zipWithMat[Out2, Out3, M, M2](that: Graph[SourceShape[Out2], M],

View file

@ -6,6 +6,8 @@ package akka.stream
package object javadsl {
def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) M =
f match {
case x if x eq Keep.left scaladsl.Keep.left.asInstanceOf[(M1, M2) M]
case x if x eq Keep.right scaladsl.Keep.right.asInstanceOf[(M1, M2) M]
case s: Function2[_, _, _] s.asInstanceOf[(M1, M2) M]
case other other.apply _
}

View file

@ -30,6 +30,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]]
override def toString: String = s"Flow($shape, $module)"
override type Repr[+O] = Flow[In @uncheckedVariance, O, Mat @uncheckedVariance]
override type ReprMat[+O, +M] = Flow[In @uncheckedVariance, O, M]
@ -42,8 +44,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Flow[In, T, Mat3] =
if (this.isIdentity) {
Flow.fromGraph(flow.asInstanceOf[Graph[FlowShape[In, T], Mat2]])
.mapMaterializedValue(combine(NotUsed.asInstanceOf[Mat], _))
import Predef.Map.empty
import StreamLayout.{ CompositeModule, Ignore, IgnorableMatValComp, Transform, Atomic, Combine }
val m = flow.module
val mat =
if (combine == Keep.left) {
if (IgnorableMatValComp(m)) Ignore else Transform(_ => NotUsed, Atomic(m))
} else Combine(combine.asInstanceOf[(Any, Any) => Any], Ignore, Atomic(m))
new Flow(CompositeModule(Set(m), m.shape, empty, empty, mat, Attributes.none))
} else {
val flowCopy = flow.module.carbonCopy
new Flow(
@ -86,11 +94,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): Sink[In, Mat3] = {
if (isIdentity)
Sink.fromGraph(sink.asInstanceOf[Graph[SinkShape[In], Mat2]])
.mapMaterializedValue(combine(().asInstanceOf[Mat], _))
.mapMaterializedValue(combine(NotUsed.asInstanceOf[Mat], _))
else {
val sinkCopy = sink.module.carbonCopy
new Sink(
@ -132,6 +143,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] = {
val flowCopy = flow.module.carbonCopy
@ -176,6 +190,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) M): Flow[I2, O2, M] = {
val copy = bidi.module.carbonCopy
@ -261,8 +278,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
override def toString = s"""Flow(${module})"""
}
object Flow {
@ -410,23 +425,23 @@ trait FlowOps[+Out, +Mat] {
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = andThen(Recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
via(new RecoverWith(pf))
@ -466,27 +481,27 @@ trait FlowOps[+Out, +Mat] {
def mapConcat[T](f: Out immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[FlowOps.mapConcat]].
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.mapConcat]]
*/
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[FlowOps.mapConcat]].
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.mapConcat]]
*/
def statefulMapConcat[T](f: () Out immutable.Iterable[T]): Repr[T] =
via(new StatefulMapConcat(f))
@ -1813,6 +1828,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): ReprMat[T, Mat3]
@ -1831,6 +1849,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): ClosedMat[Mat3]
@ -1838,6 +1859,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* Combine the elements of current flow and the given [[Source]] into a stream of tuples.
*
* @see [[#zip]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def zipMat[U, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[(Out, U), Mat3] =
viaMat(zipGraph(that))(matF)
@ -1847,6 +1871,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* into a stream of combined elements using a combiner function.
*
* @see [[#zipWith]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def zipWithMat[Out2, Out3, Mat2, Mat3](that: Graph[SourceShape[Out2], Mat2])(combine: (Out, Out2) Out3)(matF: (Mat, Mat2) Mat3): ReprMat[Out3, Mat3] =
viaMat(zipWithGraph(that)(combine))(matF)
@ -1856,6 +1883,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* picking randomly when several elements ready.
*
* @see [[#merge]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(mergeGraph(that, eagerComplete))(matF)
@ -1870,6 +1900,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* If it gets error from one of upstreams - stream completes with failure.
*
* @see [[#interleave]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def interleaveMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], request: Int)(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(interleaveGraph(that, request))(matF)
@ -1882,6 +1915,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* does not complete).
*
* @see [[#mergeSorted]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3)(implicit ord: Ordering[U]): ReprMat[U, Mat3] =
viaMat(mergeSortedGraph(that))(matF)
@ -1897,6 +1933,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* If this [[Flow]] gets upstream error - no elements from the given [[Source]] will be pulled.
*
* @see [[#concat]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def concatMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(concatGraph(that))(matF)
@ -1912,6 +1951,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* If the given [[Source]] gets upstream error - no elements from this [[Flow]] will be pulled.
*
* @see [[#prepend]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def prependMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[U, Mat3] =
viaMat(prependGraph(that))(matF)
@ -1921,6 +1963,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* through will also be sent to the [[Sink]].
*
* @see [[#alsoTo]]
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF)
@ -1930,6 +1975,9 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
* The Future completes with success when received complete message from upstream or cancel
* from downstream. It fails with the same error when received error message from
* downstream.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) Mat2): ReprMat[Out, Mat2] =
viaMat(GraphStages.terminationWatcher)(matF)

View file

@ -8,7 +8,7 @@ import akka.stream._
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.Stages.{ DefaultAttributes, StageModule}
import akka.stream.impl.Stages.{ DefaultAttributes, StageModule }
import akka.stream.impl.StreamLayout._
import akka.stream.scaladsl.Partition.PartitionOutOfBoundsException
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
@ -862,6 +862,20 @@ object GraphDSL extends GraphApply {
* @return The outlet that will emit the materialized value.
*/
def materializedValue: Outlet[M @uncheckedVariance] = {
/*
* This brings the graph into a homogenous shape: if only one `add` has
* been performed so far, the moduleInProgress will be a CopiedModule
* that upon the next `composeNoMat` will be wrapped together with the
* MaterializedValueSource into a CompositeModule, leading to its
* relevant computation being an Atomic() for the CopiedModule. This is
* what we must reference, and we can only get this reference if we
* create that computation up-front: just making one up will not work
* because that computation node would not be part of the tree and
* the source would not be triggered.
*/
if (moduleInProgress.isInstanceOf[CopiedModule]) {
moduleInProgress = CompositeModule(moduleInProgress, moduleInProgress.shape)
}
val source = new MaterializedValueSource[M](moduleInProgress.materializedValueComputation)
moduleInProgress = moduleInProgress.composeNoMat(source.module)
source.out

View file

@ -27,6 +27,8 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]]
override def toString: String = s"Sink($shape, $module)"
/**
* Transform this Sink by applying a function to each *incoming* upstream element before
* it is passed to the [[Sink]]

View file

@ -38,6 +38,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
override val shape: SourceShape[Out] = module.shape.asInstanceOf[SourceShape[Out]]
override def toString: String = s"Source($shape, $module)"
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = {

View file

@ -710,7 +710,21 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"),
// #20009 internal and shouldn't have been public
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion"),
// #20015 simplify materialized value computation tree
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.subModules"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.downstreams"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.upstreams"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#DirectProcessor.toString"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.Stages$StageModule"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#GroupBy.toString"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"),
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule")
)
)
}