diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index 7296b5cfa1..ae66bd5c6a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -5,8 +5,9 @@ package akka.stream.impl import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec -import org.reactivestreams.{ Processor, Subscription, Subscriber, Publisher } +import org.reactivestreams.{ Subscription, Subscriber, Publisher } import akka.stream._ +import akka.event.Logging.simpleName class StreamLayoutSpec extends AkkaSpec { import StreamLayout._ @@ -40,7 +41,7 @@ class StreamLayoutSpec extends AkkaSpec { stage1.isSource should be(false) val stage2 = testStage() - val flow12 = stage1.grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head) + val flow12 = stage1.compose(stage2, Keep.none).wire(stage1.outPorts.head, stage2.inPorts.head) flow12.inPorts should be(stage1.inPorts) flow12.outPorts should be(stage2.outPorts) @@ -65,7 +66,7 @@ class StreamLayoutSpec extends AkkaSpec { sink3.isSink should be(true) sink3.isSource should be(false) - val source012 = source0.grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head) + val source012 = source0.compose(flow12, Keep.none).wire(source0.outPorts.head, flow12.inPorts.head) source012.inPorts.size should be(0) source012.outPorts should be(flow12.outPorts) source012.isRunnable should be(false) @@ -73,7 +74,7 @@ class StreamLayoutSpec extends AkkaSpec { source012.isSink should be(false) source012.isSource should be(true) - val sink123 = flow12.grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head) + val sink123 = flow12.compose(sink3, Keep.none).wire(flow12.outPorts.head, sink3.inPorts.head) sink123.inPorts should be(flow12.inPorts) sink123.outPorts.size should be(0) sink123.isRunnable should be(false) @@ -81,13 +82,13 @@ class StreamLayoutSpec extends AkkaSpec { sink123.isSink should be(true) sink123.isSource should be(false) - val runnable0123a = source0.grow(sink123, Keep.none).connect(source0.outPorts.head, sink123.inPorts.head) - val runnable0123b = source012.grow(sink3, Keep.none).connect(source012.outPorts.head, sink3.inPorts.head) + val runnable0123a = source0.compose(sink123, Keep.none).wire(source0.outPorts.head, sink123.inPorts.head) + val runnable0123b = source012.compose(sink3, Keep.none).wire(source012.outPorts.head, sink3.inPorts.head) val runnable0123c = source0 - .grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head) - .grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head) + .compose(flow12, Keep.none).wire(source0.outPorts.head, flow12.inPorts.head) + .compose(sink3, Keep.none).wire(flow12.outPorts.head, sink3.inPorts.head) runnable0123a.inPorts.size should be(0) runnable0123a.outPorts.size should be(0) @@ -111,9 +112,9 @@ class StreamLayoutSpec extends AkkaSpec { val stage2 = testStage() val sink = testSink() - val runnable = source.grow(stage1, Keep.none).connect(source.outPorts.head, stage1.inPorts.head) - .grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head) - .grow(sink, Keep.none).connect(stage2.outPorts.head, sink.inPorts.head) + val runnable = source.compose(stage1, Keep.none).wire(source.outPorts.head, stage1.inPorts.head) + .compose(stage2, Keep.none).wire(stage1.outPorts.head, stage2.inPorts.head) + .compose(sink, Keep.none).wire(stage2.outPorts.head, sink.inPorts.head) checkMaterialized(runnable) } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index d6ea9b1405..a703e5ed4b 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -28,7 +28,7 @@ trait GraphApply { def partial[S <: Shape]()(buildBlock: FlowGraph.Builder[Unit] ⇒ S): Graph[S, Unit] = { val builder = new FlowGraph.Builder val s = buildBlock(builder) - val mod = builder.module.wrap().replaceShape(s) + val mod = builder.module.nest().replaceShape(s) new GraphApply.GraphImpl(s, mod) } @@ -56,7 +56,7 @@ trait GraphApply { val builder = new FlowGraph.Builder val s1 = builder.add(g1) val s = buildBlock(builder)(s1) - val mod = builder.module.wrap().replaceShape(s) + val mod = builder.module.nest().replaceShape(s) new GraphApply.GraphImpl(s, mod) } @@ -91,7 +91,7 @@ trait GraphApply { [2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val s = buildBlock(builder)([#s1#]) - val mod = builder.module.wrap().replaceShape(s) + val mod = builder.module.nest().replaceShape(s) new GraphApply.GraphImpl(s, mod) }# @@ -109,7 +109,7 @@ private[stream] object GraphApply { extends Graph[S, Mat] { override def withAttributes(attr: Attributes): Graph[S, Mat] = - new GraphImpl(shape, module.withAttributes(attr).wrap()) + new GraphImpl(shape, module.withAttributes(attr).nest()) override def named(name: String): Graph[S, Mat] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index fdf25d28c7..821983f593 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -31,7 +31,7 @@ class ZipWith1[[#A1#], O] private[stream] (override val shape: FanInShape1[[#A1# extends Graph[FanInShape1[[#A1#], O], Unit] { override def withAttributes(attr: Attributes): ZipWith1[[#A1#], O] = - new ZipWith1(shape, module.withAttributes(attr).wrap()) + new ZipWith1(shape, module.withAttributes(attr).nest()) override def named(name: String): ZipWith1[[#A1#], O] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 26c3a7ce58..254ee46aac 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -129,12 +129,11 @@ object ActorMaterializer { * INTERNAL API */ private[akka] def downcast(materializer: Materializer): ActorMaterializer = - materializer match { + materializer match { //FIXME this method is going to cause trouble for other Materializer implementations case m: ActorMaterializer ⇒ m case _ ⇒ throw new IllegalArgumentException(s"required [${classOf[ActorMaterializer].getName}] " + s"but got [${materializer.getClass.getName}]") } - } /** diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 3eb0f7e76d..1732717140 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -5,6 +5,7 @@ package akka.stream import akka.event.Logging +import scala.annotation.tailrec import scala.collection.immutable import akka.stream.impl.Stages.StageModule import akka.japi.function @@ -63,38 +64,37 @@ final case class Attributes private (attributeList: immutable.Seq[Attributes.Att /** * INTERNAL API */ - private[akka] def nameLifted: Option[String] = - if (attributeList.isEmpty) - None - else { - val sb = new java.lang.StringBuilder - val iter = attributeList.iterator - while (iter.hasNext) { - iter.next() match { - case Name(name) ⇒ - if (sb.length == 0) sb.append(name) - else sb.append("-").append(name) - case _ ⇒ + private[akka] def nameLifted: Option[String] = Option(nameOrDefault(null)) + + /** + * INTERNAL API + */ + private[akka] def nameOrDefault(default: String = "unknown-operation"): String = { + @tailrec def concatNames(i: Iterator[Attribute], first: String, buf: StringBuilder): String = + if (i.hasNext) + i.next() match { + case Name(n) ⇒ + if (buf ne null) concatNames(i, null, buf.append('-').append(n)) + else if (first ne null) { + val b = new StringBuilder( + (first.length() + n.length()) match { + case x if x < 0 ⇒ throw new IllegalStateException("Names too long to concatenate") + case y if y > Int.MaxValue / 2 ⇒ Int.MaxValue + case z ⇒ Math.max(Integer.highestOneBit(z) * 2, 32) + }) + concatNames(i, null, b.append(first).append('-').append(n)) + } else concatNames(i, n, null) + case _ ⇒ concatNames(i, first, buf) } - } - if (sb.length == 0) None - else Some(sb.toString) + else if (buf eq null) first + else buf.toString + + concatNames(attributeList.iterator, null, null) match { + case null ⇒ default + case some ⇒ some } - - /** - * INTERNAL API - */ - private[akka] def nameOrDefault(default: String = "unknown-operation"): String = nameLifted match { - case Some(name) ⇒ name - case _ ⇒ default } - /** - * INTERNAL API - */ - private[akka] def nameOption: Option[String] = - attributeList.collectFirst { case Name(name) ⇒ name } - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index cc123c0714..63101e5689 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -3,6 +3,8 @@ */ package akka.stream +import akka.util.Collections.EmptyImmutableSeq + import scala.collection.immutable import scala.collection.JavaConverters._ @@ -12,14 +14,20 @@ import scala.collection.JavaConverters._ * It is also used in the Java DSL for “untyped Inlets” as a work-around * for otherwise unreasonable existential types. */ -sealed abstract class InPort +sealed abstract class InPort { self: Inlet[_] ⇒ + final override def hashCode: Int = System.identityHashCode(this) + final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef] +} /** * An output port of a StreamLayout.Module. This type logically belongs * into the impl package but must live here due to how `sealed` works. * It is also used in the Java DSL for “untyped Outlets” as a work-around * for otherwise unreasonable existential types. */ -sealed abstract class OutPort +sealed abstract class OutPort { self: Outlet[_] ⇒ + final override def hashCode: Int = System.identityHashCode(this) + final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef] +} /** * An Inlet is a typed input to a Shape. Its partner in the Module view @@ -139,8 +147,8 @@ abstract class AbstractShape extends Shape { */ sealed abstract class ClosedShape extends Shape object ClosedShape extends ClosedShape { - override val inlets: immutable.Seq[Inlet[_]] = Nil - override val outlets: immutable.Seq[Outlet[_]] = Nil + override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq + override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq override def deepCopy() = this override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit ClosedShape") @@ -170,7 +178,7 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se * of data. */ final case class SourceShape[+T](outlet: Outlet[T]) extends Shape { - override val inlets: immutable.Seq[Inlet[_]] = Nil + override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq override val outlets: immutable.Seq[Outlet[_]] = List(outlet) override def deepCopy(): SourceShape[T] = SourceShape(outlet.carbonCopy()) @@ -203,7 +211,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S */ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = List(inlet) - override val outlets: immutable.Seq[Outlet[_]] = Nil + override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq override def deepCopy(): SinkShape[T] = SinkShape(inlet.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a44840356e..1d652434c0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -68,7 +68,7 @@ private[akka] case class ActorMaterializerImpl( override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { if (haveShutDown.get()) throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.") - if (StreamLayout.Debug) runnableGraph.module.validate() + if (StreamLayout.Debug) StreamLayout.validate(runnableGraph.module) val session = new MaterializerSession(runnableGraph.module) { private val flowName = createFlowName() @@ -299,8 +299,6 @@ private[akka] object ActorProcessorFactory { val settings = materializer.effectiveSettings(att) def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ()) op match { - case Identity(_) ⇒ throw new AssertionError("Identity cannot end up in ActorProcessorFactory") - case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer, att), ()) case Map(f, _) ⇒ interp(fusing.Map(f, settings.supervisionDecider)) case Filter(p, _) ⇒ interp(fusing.Filter(p, settings.supervisionDecider)) case Drop(n, _) ⇒ interp(fusing.Drop(n)) @@ -328,6 +326,7 @@ private[akka] object ActorProcessorFactory { val s_m = mkStageAndMat() (ActorInterpreter.props(settings, List(s_m._1), materializer, att), s_m._2) case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") + case Identity(_) ⇒ throw new AssertionError("Identity cannot end up in ActorProcessorFactory") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 93000e9dc8..3776655de2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -3,17 +3,14 @@ */ package akka.stream.impl -import java.io.{ InputStream, File } import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props } -import akka.stream.ActorAttributes.Dispatcher import akka.stream.impl.StreamLayout.Module import akka.stream._ -import akka.util.ByteString import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Future, Promise } +import scala.concurrent.{ Promise } import scala.util.{ Failure, Success } /** @@ -34,13 +31,13 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out override def subModules: Set[Module] = Set.empty - def amendShape(attr: Attributes): SourceShape[Out] = - attr.nameOption match { - case None ⇒ shape - case s: Some[String] if s == attributes.nameOption ⇒ shape - case Some(name) ⇒ shape.copy(outlet = Outlet(name + ".out")) - } + protected def amendShape(attr: Attributes): SourceShape[Out] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(outlet = Outlet(thatN + ".out")) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 3649c7e312..f66d452b91 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -3,20 +3,12 @@ */ package akka.stream.impl -import java.io.File -import java.util.concurrent.atomic.AtomicReference import akka.actor.{ Deploy, ActorRef, Props } -import akka.stream.ActorAttributes.Dispatcher import akka.stream.impl.StreamLayout.Module -import akka.stream.Attributes -import akka.stream.{ Inlet, Shape, SinkShape } -import akka.util.ByteString +import akka.stream.{ Attributes, Inlet, Shape, SinkShape, MaterializationContext, ActorMaterializer } import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance -import scala.collection.immutable import scala.concurrent.{ Future, Promise } -import akka.stream.MaterializationContext -import akka.stream.ActorMaterializer /** * INTERNAL API @@ -36,12 +28,12 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte override def subModules: Set[Module] = Set.empty - def amendShape(attr: Attributes): SinkShape[In] = { - attr.nameOption match { - case None ⇒ shape - case s: Some[String] if s == attributes.nameOption ⇒ shape - case Some(name) ⇒ shape.copy(inlet = Inlet(name + ".in")) - } + protected def amendShape(attr: Attributes): SinkShape[In] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(inlet = Inlet(thatN + ".in")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 611b88cd4e..534241f7e0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -5,13 +5,12 @@ package akka.stream.impl import akka.event.{ LoggingAdapter, Logging } import akka.stream.impl.SplitDecision.SplitDecision -import akka.stream.{ OverflowStrategy, TimerTransformer } -import akka.stream.Attributes +import akka.stream.impl.StreamLayout._ +import akka.stream.{ OverflowStrategy, TimerTransformer, Attributes } import akka.stream.Attributes._ import akka.stream.stage.Stage import org.reactivestreams.Processor -import StreamLayout._ - +import akka.event.Logging.simpleName import scala.collection.immutable import scala.concurrent.Future @@ -121,16 +120,6 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } - object Fused { - def apply(ops: immutable.Seq[Stage[_, _]]): Fused = - Fused(ops, name(ops.iterator.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+"))) - } - - final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: Attributes = fused) extends StageModule { - def withAttributes(attributes: Attributes) = copy(attributes = attributes) - override protected def newInstance: StageModule = this.copy() - } - final case class Map(f: Any ⇒ Any, attributes: Attributes = map) extends StageModule { def withAttributes(attributes: Attributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 366a27302f..721c349c68 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -23,6 +23,76 @@ private[akka] object StreamLayout { // compile-time constant final val Debug = false + final def validate(m: Module, level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = { + val ids = Iterator from 1 + def id(obj: AnyRef) = idMap get obj match { + case Some(x) ⇒ x + case None ⇒ + val x = ids.next() + idMap(obj) = x + x + } + def in(i: InPort) = s"${i.toString}@${id(i)}" + def out(o: OutPort) = s"${o.toString}@${id(o)}" + def ins(i: Iterable[InPort]) = i.map(in).mkString("In[", ",", "]") + def outs(o: Iterable[OutPort]) = o.map(out).mkString("Out[", ",", "]") + def pair(p: (OutPort, InPort)) = s"${in(p._2)}->${out(p._1)}" + def pairs(p: Iterable[(OutPort, InPort)]) = p.map(pair).mkString("[", ",", "]") + + import m._ + + val inset: Set[InPort] = shape.inlets.toSet + val outset: Set[OutPort] = shape.outlets.toSet + var problems: List[String] = Nil + + if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets) + if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}" + if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}" + if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets) + if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}" + if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}" + val ups = upstreams.toSet + val ups2 = ups.map(_.swap) + val downs = downstreams.toSet + val inter = ups2.intersect(downs) + if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}" + val (allIn, dupIn, allOut, dupOut) = + subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) { + case ((ai, di, ao, doo), m) ⇒ (ai ++ m.inPorts, di ++ ai.intersect(m.inPorts), ao ++ m.outPorts, doo ++ ao.intersect(m.outPorts)) + } + if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}" + if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}" + if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}" + if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}" + val unIn = allIn -- inset -- upstreams.keySet + if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}" + val unOut = allOut -- outset -- downstreams.keySet + if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}" + + def atomics(n: MaterializedValueNode): Set[Module] = + n match { + case Ignore ⇒ Set.empty + case Transform(f, dep) ⇒ atomics(dep) + case Atomic(m) ⇒ Set(m) + case Combine(f, left, right) ⇒ atomics(left) ++ atomics(right) + } + val atomic = atomics(materializedValueComputation) + if ((atomic -- subModules - m).nonEmpty) problems ::= s"computation refers to non-existent modules [${atomic -- subModules - m mkString ","}]" + + val print = doPrint || problems.nonEmpty + + if (print) { + val indent = " " * (level * 2) + println(s"$indent${simpleName(this)}($shape): ${ins(inPorts)} ${outs(outPorts)}") + downstreams foreach { case (o, i) ⇒ println(s"$indent ${out(o)} -> ${in(i)}") } + problems foreach (p ⇒ println(s"$indent -!- $p")) + } + + subModules foreach (sm ⇒ validate(sm, level + 1, print, idMap)) + + if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems") + } + // TODO: Materialization order // TODO: Special case linear composites // TODO: Cycles @@ -34,6 +104,7 @@ private[akka] object StreamLayout { case object Ignore extends MaterializedValueNode trait Module { + def shape: Shape /** * Verify that the given Shape has the same ports and return a new module with that shape. @@ -52,14 +123,40 @@ private[akka] object StreamLayout { def isAtomic: Boolean = subModules.isEmpty def isCopied: Boolean = false - final def growConnect(that: Module, from: OutPort, to: InPort): Module = - growConnect(that, from, to, Keep.left) + /** + * Fuses this Module to `that` Module by wiring together `from` and `to`, + * retaining the materialized value of `this` in the result + * @param that a Module to fuse with + * @param from the data source to wire + * @param to the data sink to wire + * @return a Module representing the fusion of `this` and `that` + */ + final def fuse(that: Module, from: OutPort, to: InPort): Module = + fuse(that, from, to, Keep.left) - final def growConnect[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) ⇒ C): Module = - this.grow(that, f).connect(from, to) + /** + * Fuses this Module to `that` Module by wiring together `from` and `to`, + * transforming the materialized values of `this` and `that` using the + * provided function `f` + * @param that a Module to fuse with + * @param from the data source to wire + * @param to the data sink to wire + * @param f the function to apply to the materialized values + * @return a Module representing the fusion of `this` and `that` + */ + final def fuse[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) ⇒ C): Module = + this.compose(that, f).wire(from, to) - final def connect[A, B](from: OutPort, to: InPort): Module = { - if (Debug) validate() + /** + * Creates a new Module based on the current Module but with + * the given OutPort wired to the given InPort. + * + * @param from the OutPort to wire + * @param to the InPort to wire + * @return a new Module with the ports wired + */ + final def wire(from: OutPort, to: InPort): Module = { + if (Debug) validate(this) require(outPorts(from), if (downstreams.contains(from)) s"The output port [$from] is already connected" @@ -78,7 +175,7 @@ private[akka] object StreamLayout { } final def transformMaterializedValue(f: Any ⇒ Any): Module = { - if (Debug) validate() + if (Debug) validate(this) CompositeModule( subModules = if (this.isSealed) Set(this) else this.subModules, @@ -89,10 +186,27 @@ private[akka] object StreamLayout { attributes) } - def grow(that: Module): Module = grow(that, Keep.left) + /** + * Creates a new Module which is `this` Module composed with `that` Module. + * + * @param that a Module to be composed with (cannot be itself) + * @return a Module that represents the composition of `this` and `that` + */ + def compose(that: Module): Module = compose(that, Keep.left) - def grow[A, B, C](that: Module, f: (A, B) ⇒ C): Module = { - if (Debug) validate() + /** + * Creates a new Module which is `this` Module composed with `that` Module, + * using the given function `f` to compose the materialized value of `this` with + * the materialized value of `that`. + * @param that a Module to be composed with (cannot be itself) + * @param f a function which combines the materialized values + * @tparam A the type of the materialized value of `this` + * @tparam B the type of the materialized value of `that` + * @tparam C the type of the materialized value of the returned Module + * @return a Module that represents the composition of `this` and `that` + */ + def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = { + if (Debug) validate(this) require(that ne this, "A module cannot be added to itself. You should pass a separate instance to grow().") require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.") @@ -113,8 +227,12 @@ private[akka] object StreamLayout { attributes) } - def wrap(): Module = { - if (Debug) validate() + /** + * Creates a new Module which contains `this` Module + * @return a new Module + */ + def nest(): Module = { + if (Debug) validate(this) CompositeModule( subModules = Set(this), @@ -149,74 +267,6 @@ private[akka] object StreamLayout { final override def hashCode(): Int = super.hashCode() final override def equals(obj: scala.Any): Boolean = super.equals(obj) - - final def validate(level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = { - val ids = Iterator from 1 - def id(obj: AnyRef) = idMap get obj match { - case Some(x) ⇒ x - case None ⇒ - val x = ids.next() - idMap(obj) = x - x - } - def in(i: InPort) = s"${i.toString}@${id(i)}" - def out(o: OutPort) = s"${o.toString}@${id(o)}" - def ins(i: Iterable[InPort]) = i.map(in).mkString("In[", ",", "]") - def outs(o: Iterable[OutPort]) = o.map(out).mkString("Out[", ",", "]") - def pair(p: (OutPort, InPort)) = s"${in(p._2)}->${out(p._1)}" - def pairs(p: Iterable[(OutPort, InPort)]) = p.map(pair).mkString("[", ",", "]") - - val inset: Set[InPort] = shape.inlets.toSet - val outset: Set[OutPort] = shape.outlets.toSet - var problems: List[String] = Nil - - if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets) - if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}" - if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}" - if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets) - if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}" - if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}" - val ups = upstreams.toSet - val ups2 = ups.map(_.swap) - val downs = downstreams.toSet - val inter = ups2.intersect(downs) - if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}" - val (allIn, dupIn, allOut, dupOut) = - subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) { - case ((ai, di, ao, doo), m) ⇒ (ai ++ m.inPorts, di ++ ai.intersect(m.inPorts), ao ++ m.outPorts, doo ++ ao.intersect(m.outPorts)) - } - if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}" - if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}" - if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}" - if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}" - val unIn = allIn -- inset -- upstreams.keySet - if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}" - val unOut = allOut -- outset -- downstreams.keySet - if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}" - - def atomics(n: MaterializedValueNode): Set[Module] = - n match { - case Ignore ⇒ Set.empty - case Transform(f, dep) ⇒ atomics(dep) - case Atomic(m) ⇒ Set(m) - case Combine(f, left, right) ⇒ atomics(left) ++ atomics(right) - } - val atomic = atomics(materializedValueComputation) - if ((atomic -- subModules - this).nonEmpty) problems ::= s"computation refers to non-existent modules [${atomic -- subModules - this mkString ","}]" - - val print = doPrint || problems.nonEmpty - - if (print) { - val indent = " " * (level * 2) - println(s"$indent${simpleName(this)}($shape): ${ins(inPorts)} ${outs(outPorts)}") - downstreams foreach { case (o, i) ⇒ println(s"$indent ${out(o)} -> ${in(i)}") } - problems foreach (p ⇒ println(s"$indent -!- $p")) - } - - subModules foreach (_.validate(level + 1, print, idMap)) - - if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems") - } } object EmptyModule extends Module { @@ -225,12 +275,12 @@ private[akka] object StreamLayout { if (s == ClosedShape) this else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") - override def grow(that: Module): Module = that + override def compose(that: Module): Module = that - override def grow[A, B, C](that: Module, f: (A, B) ⇒ C): Module = + override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") - override def wrap(): Module = this + override def nest(): Module = this override def subModules: Set[Module] = Set.empty @@ -283,14 +333,13 @@ private[akka] object StreamLayout { override def toString = s""" - | Module: ${this.attributes.nameOption.getOrElse("unnamed")} - | Modules: ${subModules.toSeq.map(m ⇒ " " + m.attributes.nameOption.getOrElse(m.getClass.getName)).mkString("\n")} + | Module: ${this.attributes.nameOrDefault("unnamed")} + | Modules: ${subModules.iterator.map(m ⇒ " " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("\n")} | Downstreams: - | ${downstreams.map { case (in, out) ⇒ s" $in -> $out" }.mkString("\n")} + | ${downstreams.iterator.map { case (in, out) ⇒ s" $in -> $out" }.mkString("\n")} | Upstreams: - | ${upstreams.map { case (out, in) ⇒ s" $out -> $in" }.mkString("\n")} + | ${upstreams.iterator.map { case (out, in) ⇒ s" $out -> $in" }.mkString("\n")} """.stripMargin - } } @@ -409,14 +458,13 @@ private[stream] final case class MaterializedValueSource[M]( if (s == shape) this else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource") - def amendShape(attr: Attributes): SourceShape[M] = { - attr.nameOption match { - case None ⇒ shape - case s: Some[String] if s == attributes.nameOption ⇒ shape - case Some(name) ⇒ shape.copy(outlet = Outlet(name + ".out")) - } - } + private def amendShape(attr: Attributes): SourceShape[M] = { + val thisN = attributes.nameOrDefault(null) + val thatN = attr.nameOrDefault(null) + if ((thatN eq null) || thisN == thatN) shape + else shape.copy(outlet = Outlet(thatN + ".out")) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index b0ae3fd8b8..633e998050 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -20,6 +20,7 @@ import scala.concurrent.{ Future, Promise } private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[Long]](shape) { override def create(context: MaterializationContext) = { + // FIXME rewrite to be based on AsyncStage rather than dangerous downcasts val mat = ActorMaterializer.downcast(context.materializer) val settings = mat.effectiveSettings(context.effectiveAttributes) diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 4121679292..eb5438c6a4 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -10,6 +10,7 @@ import javax.net.ssl._ import scala.annotation.varargs import scala.collection.immutable import java.security.cert.Certificate +import akka.event.Logging.simpleName /** * Stream cipher support based upon JSSE. diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala index ab549a5b5c..31f55e2f5f 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala @@ -25,8 +25,7 @@ object SynchronousFileSource { * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ def apply(f: File, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] = - new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource"))) - .named(DefaultAttributes.nameOption.get) + new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource")).nest()) // TO DISCUSS: I had to add wrap() here to make the name available /** * Creates a synchronous (Java 6 compatible) Source from a Files contents. @@ -38,8 +37,7 @@ object SynchronousFileSource { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def create(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = - create(f, DefaultChunkSize) + def create(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = create(f, DefaultChunkSize) /** * Creates a synchronous (Java 6 compatible) Source from a Files contents. @@ -53,5 +51,4 @@ object SynchronousFileSource { */ def create(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = apply(f, chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]] - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 37efc4787f..f937cef6d4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -56,9 +56,9 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu val ins = copy.shape.inlets val outs = copy.shape.outlets new BidiFlow(module - .grow(copy, combine) - .connect(shape.out1, ins(0)) - .connect(outs(1), shape.in2) + .compose(copy, combine) + .wire(shape.out1, ins(0)) + .wire(outs(1), shape.in2) .replaceShape(BidiShape(shape.in1, outs(0), ins(1), shape.out2))) } @@ -106,9 +106,9 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu val in = copy.shape.inlets.head val out = copy.shape.outlets.head new Flow(module - .grow(copy, combine) - .connect(shape.out1, in) - .connect(out, shape.in2) + .compose(copy, combine) + .wire(shape.out1, in) + .wire(out, shape.in2) .replaceShape(FlowShape(shape.in1, shape.out2))) } @@ -118,7 +118,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(module.replaceShape(shape.reversed)) override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = - new BidiFlow(module.withAttributes(attr).wrap()) + new BidiFlow(module.withAttributes(attr).nest()) override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = withAttributes(Attributes.name(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d76daa8ed7..6f25244e16 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,23 +3,22 @@ */ package akka.stream.scaladsl -import akka.actor.ActorSystem -import akka.stream.impl.SplitDecision._ +import scala.language.higherKinds + import akka.event.LoggingAdapter -import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } -import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream._ import akka.stream.Attributes._ +import akka.stream.stage._ +import akka.stream.impl.{ Stages, StreamLayout } +import akka.stream.impl.SplitDecision._ +import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } +import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.util.Collections.EmptyImmutableSeq import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } -import scala.annotation.implicitNotFound import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.Future -import scala.language.higherKinds -import akka.stream.stage._ -import akka.stream.impl.{ Stages, StreamLayout, FlowModule } /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -74,7 +73,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val flowCopy = flow.module.carbonCopy new Flow( module - .growConnect(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine) + .fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine) .replaceShape(FlowShape(shape.inlet, flowCopy.shape.outlets.head))) } } @@ -120,7 +119,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val sinkCopy = sink.module.carbonCopy new Sink( module - .growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine) + .fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine) .replaceShape(SinkShape(shape.inlet))) } } @@ -162,9 +161,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val flowCopy = flow.module.carbonCopy RunnableGraph( module - .grow(flowCopy, combine) - .connect(shape.outlet, flowCopy.shape.inlets.head) - .connect(flowCopy.shape.outlets.head, shape.inlet)) + .compose(flowCopy, combine) + .wire(shape.outlet, flowCopy.shape.inlets.head) + .wire(flowCopy.shape.outlets.head, shape.inlet)) } /** @@ -207,9 +206,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val ins = copy.shape.inlets val outs = copy.shape.outlets new Flow(module - .grow(copy, combine) - .connect(shape.outlet, ins(0)) - .connect(outs(1), shape.inlet) + .compose(copy, combine) + .wire(shape.outlet, ins(0)) + .wire(outs(1), shape.inlet) .replaceShape(FlowShape(ins(1), outs(0)))) } @@ -247,18 +246,18 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) //No need to copy here, op is a fresh instance if (op.isInstanceOf[Stages.Identity]) this.asInstanceOf[Repr[U, Mat]] else if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]] - else new Flow(module.growConnect(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort))) + else new Flow(module.fuse(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort))) } private[stream] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] = { if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]] - else new Flow(module.growConnect(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) + else new Flow(module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) } private[akka] def andThenMat[U, Mat2, O >: Out](processorFactory: () ⇒ (Processor[O, U], Mat2)): Repr[U, Mat2] = { val op = Stages.DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)]) if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]] - else new Flow[In, U, Mat2](module.growConnect(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) + else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) } /** @@ -266,10 +265,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). */ - override def withAttributes(attr: Attributes): Repr[Out, Mat] = { + override def withAttributes(attr: Attributes): Repr[Out, Mat] = if (this.module eq EmptyModule) this - else new Flow(module.withAttributes(attr).wrap()) - } + else new Flow(module.withAttributes(attr).nest()) override def named(name: String): Repr[Out, Mat] = withAttributes(Attributes.name(name)) @@ -278,9 +276,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a of a [[Source#subscriber]] and * and `Publisher` of a [[Sink#publisher]]. */ - def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2) = { + def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2) = Source.wrap(source).via(this).toMat(sink)(Keep.both).run() - } /** * Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]] @@ -304,7 +301,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) - } object Flow extends FlowApply { @@ -362,7 +358,7 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) override def withAttributes(attr: Attributes): RunnableGraph[Mat] = - new RunnableGraph(module.withAttributes(attr).wrap) + new RunnableGraph(module.withAttributes(attr).nest) override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index dba25efe9c..1a09e1d4f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -45,7 +45,7 @@ class Merge[T] private (inputPorts: Int, extends Graph[UniformFanInShape[T, T], Unit] { override def withAttributes(attr: Attributes): Merge[T] = - new Merge(inputPorts, shape, module.withAttributes(attr).wrap()) + new Merge(inputPorts, shape, module.withAttributes(attr).nest()) override def named(name: String): Merge[T] = withAttributes(Attributes.name(name)) } @@ -94,7 +94,7 @@ class MergePreferred[T] private (secondaryPorts: Int, extends Graph[MergePreferred.MergePreferredShape[T], Unit] { override def withAttributes(attr: Attributes): MergePreferred[T] = - new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).wrap()) + new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).nest()) override def named(name: String): MergePreferred[T] = withAttributes(Attributes.name(name)) } @@ -132,7 +132,7 @@ class Broadcast[T] private (outputPorts: Int, extends Graph[UniformFanOutShape[T, T], Unit] { override def withAttributes(attr: Attributes): Broadcast[T] = - new Broadcast(outputPorts, shape, module.withAttributes(attr).wrap()) + new Broadcast(outputPorts, shape, module.withAttributes(attr).nest()) override def named(name: String): Broadcast[T] = withAttributes(Attributes.name(name)) } @@ -175,7 +175,7 @@ class Balance[T] private (outputPorts: Int, extends Graph[UniformFanOutShape[T, T], Unit] { override def withAttributes(attr: Attributes): Balance[T] = - new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).wrap()) + new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).nest()) override def named(name: String): Balance[T] = withAttributes(Attributes.name(name)) } @@ -208,7 +208,7 @@ class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)], extends Graph[FanInShape2[A, B, (A, B)], Unit] { override def withAttributes(attr: Attributes): Zip[A, B] = - new Zip(shape, module.withAttributes(attr).wrap()) + new Zip(shape, module.withAttributes(attr).nest()) override def named(name: String): Zip[A, B] = withAttributes(Attributes.name(name)) } @@ -257,7 +257,7 @@ class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B], extends Graph[FanOutShape2[(A, B), A, B], Unit] { override def withAttributes(attr: Attributes): Unzip[A, B] = - new Unzip(shape, module.withAttributes(attr).wrap()) + new Unzip(shape, module.withAttributes(attr).nest()) override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name)) } @@ -292,7 +292,7 @@ class Concat[T] private (override val shape: UniformFanInShape[T, T], extends Graph[UniformFanInShape[T, T], Unit] { override def withAttributes(attr: Attributes): Concat[T] = - new Concat(shape, module.withAttributes(attr).wrap()) + new Concat(shape, module.withAttributes(attr).nest()) override def named(name: String): Concat[T] = withAttributes(Attributes.name(name)) } @@ -306,13 +306,13 @@ object FlowGraph extends GraphApply { val flowCopy = via.module.carbonCopy moduleInProgress = moduleInProgress - .grow(flowCopy) - .connect(from, flowCopy.shape.inlets.head) - .connect(flowCopy.shape.outlets.head, to) + .compose(flowCopy) + .wire(from, flowCopy.shape.inlets.head) + .wire(flowCopy.shape.outlets.head, to) } def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = { - moduleInProgress = moduleInProgress.connect(from, to) + moduleInProgress = moduleInProgress.wire(from, to) } /** @@ -321,9 +321,9 @@ object FlowGraph extends GraphApply { * connected. */ def add[S <: Shape](graph: Graph[S, _]): S = { - if (StreamLayout.Debug) graph.module.validate() + if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.grow(copy) + moduleInProgress = moduleInProgress.compose(copy) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } @@ -334,9 +334,9 @@ object FlowGraph extends GraphApply { * Flow, Sink and Graph. */ private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) ⇒ Any): S = { - if (StreamLayout.Debug) graph.module.validate() + if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.grow(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any])) + moduleInProgress = moduleInProgress.compose(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any])) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } @@ -347,9 +347,9 @@ object FlowGraph extends GraphApply { * Flow, Sink and Graph. */ private[stream] def add[S <: Shape, A, B](graph: Graph[S, _], combine: (A, B) ⇒ Any): S = { - if (StreamLayout.Debug) graph.module.validate() + if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.grow(copy, combine) + moduleInProgress = moduleInProgress.compose(copy, combine) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } @@ -372,15 +372,15 @@ object FlowGraph extends GraphApply { */ def materializedValue: Outlet[M] = { val module = new MaterializedValueSource[Any] - moduleInProgress = moduleInProgress.grow(module) + moduleInProgress = moduleInProgress.compose(module) module.shape.outlet.asInstanceOf[Outlet[M]] } private[stream] def andThen(port: OutPort, op: StageModule): Unit = { moduleInProgress = moduleInProgress - .grow(op) - .connect(port, op.inPort) + .compose(op) + .wire(port, op.inPort) } private[stream] def buildRunnable[Mat](): RunnableGraph[Mat] = { @@ -389,7 +389,7 @@ object FlowGraph extends GraphApply { "Cannot build the RunnableGraph because there are unconnected ports: " + (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")) } - new RunnableGraph(moduleInProgress.wrap()) + new RunnableGraph(moduleInProgress.nest()) } private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = { @@ -400,7 +400,7 @@ object FlowGraph extends GraphApply { s"Cannot build Source with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") if (moduleInProgress.outPorts.head != outlet) throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the module’s open Outlet ${moduleInProgress.outPorts.head}") - new Source(moduleInProgress.replaceShape(SourceShape(outlet)).wrap()) + new Source(moduleInProgress.replaceShape(SourceShape(outlet)).nest()) } private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = { @@ -411,7 +411,7 @@ object FlowGraph extends GraphApply { throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the module’s open Outlet ${moduleInProgress.outPorts.head}") if (moduleInProgress.inPorts.head != inlet) throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the module’s open Inlet ${moduleInProgress.inPorts.head}") - new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).wrap()) + new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).nest()) } private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { @@ -422,7 +422,7 @@ object FlowGraph extends GraphApply { throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the module’s open Outlets [${moduleInProgress.outPorts.mkString(",")}]") if (moduleInProgress.inPorts.toSet != shape.inlets.toSet) throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the module’s open Inlets [${moduleInProgress.inPorts.mkString(",")}]") - new BidiFlow(moduleInProgress.replaceShape(shape).wrap()) + new BidiFlow(moduleInProgress.replaceShape(shape).nest()) } private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = { @@ -433,7 +433,7 @@ object FlowGraph extends GraphApply { s"Cannot build Sink with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") if (moduleInProgress.inPorts.head != inlet) throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the module’s open Inlet ${moduleInProgress.inPorts.head}") - new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).wrap()) + new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).nest()) } private[stream] def module: Module = moduleInProgress diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 3804ceb495..d90c014dc2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -36,7 +36,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) override def withAttributes(attr: Attributes): Sink[In, Mat] = - new Sink(module.withAttributes(attr).wrap()) + new Sink(module.withAttributes(attr).nest()) override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 6691d7831c..7b5d2a4db5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,33 +3,19 @@ */ package akka.stream.scaladsl +import scala.language.higherKinds + import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream._ -import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule, DefaultAttributes } -import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } -import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective } -import org.reactivestreams._ -import akka.stream.{ SourceShape, Inlet, Outlet } -import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage } -import scala.annotation.unchecked.uncheckedVariance -import scala.language.higherKinds -import akka.actor.Props -import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } +import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl._ +import org.reactivestreams.{ Publisher, Subscriber } import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } -import scala.language.higherKinds -import scala.concurrent.{ ExecutionContext, Future } -import akka.stream.{ Materializer, Graph } -import akka.stream.impl._ -import akka.actor.Cancellable -import akka.actor.ActorRef -import scala.concurrent.Promise -import akka.stream.stage.SyncDirective -import akka.stream.OverflowStrategy -import akka.stream.Attributes /** * A `Source` is a set of stream processing steps that has one open output. It can comprise @@ -58,8 +44,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) val flowCopy = flow.module.carbonCopy new Source( module - .growConnect(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine) - .replaceShape(SourceShape(flowCopy.shape.outlets.head))) + .fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine) + .replaceShape(SourceShape(flowCopy.shape.outlets.head))) // FIXME why is not .wrap() needed here? } } @@ -75,7 +61,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = { val sinkCopy = sink.module.carbonCopy - RunnableGraph(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) + RunnableGraph(module.fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) } /** @@ -89,14 +75,14 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) // No need to copy here, op is a fresh instance new Source( module - .growConnect(op, shape.outlet, op.inPort) + .fuse(op, shape.outlet, op.inPort) .replaceShape(SourceShape(op.outPort))) } override private[scaladsl] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] = { new Source( module - .growConnect(op, shape.outlet, op.inPort, Keep.right) + .fuse(op, shape.outlet, op.inPort, Keep.right) .replaceShape(SourceShape(op.outPort))) } @@ -150,14 +136,18 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) */ def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concat(second) + /** + * Nests the current Source and returns a Source with the given Attributes + * @param attr the attributes to add + * @return a new Source with the added attributes + */ override def withAttributes(attr: Attributes): Repr[Out, Mat] = - new Source(module.withAttributes(attr).wrap()) + new Source(module.withAttributes(attr).nest()) // User API override def named(name: String): Repr[Out, Mat] = withAttributes(Attributes.name(name)) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) - } object Source extends SourceApply { @@ -193,7 +183,10 @@ object Source extends SourceApply { * from the downstream transformation steps. */ def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] = - apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() }) + apply(new immutable.Iterable[T] { + override def iterator: Iterator[T] = f() + override def toString: String = "() => Iterator" + }) /** * A graph with the shape of a source logically is a source, this method makes @@ -223,7 +216,11 @@ object Source extends SourceApply { * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T, Unit] = - Source.single(future).mapAsyncUnordered(1)(id).withAttributes(DefaultAttributes.futureSource) + new Source( + new PublisherSource( + SingleElementPublisher(future, "FutureSource"), + DefaultAttributes.futureSource, + shape("FutureSource"))).mapAsyncUnordered(1)(id) /** * Elements are emitted periodically with the specified interval. @@ -240,22 +237,36 @@ object Source extends SourceApply { * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, Unit] = - apply(SingleElementPublisher(element, "SingleSource")).withAttributes(DefaultAttributes.singleSource) + new Source( + new PublisherSource( + SingleElementPublisher(element, "SingleSource"), + DefaultAttributes.singleSource, + shape("SingleSource"))) /** * Create a `Source` that will continually emit the given element. */ def repeat[T](element: T): Source[T, Unit] = - apply(new immutable.Iterable[T] { - override val iterator: Iterator[T] = Iterator.continually(element) - override def toString: String = "Iterable.continually(" + element + ")" - }).withAttributes(DefaultAttributes.repeat) + new Source( + new PublisherSource( + SingleElementPublisher( + new immutable.Iterable[T] { + override val iterator: Iterator[T] = Iterator.continually(element) + override def toString: String = "repeat(" + element + ")" + }, "RepeatSource"), + DefaultAttributes.repeat, + shape("RepeatSource"))).mapConcat(id) /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ def empty[T]: Source[T, Unit] = _empty - private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher).withAttributes(DefaultAttributes.emptySource) + private[this] val _empty: Source[Nothing, Unit] = + new Source( + new PublisherSource[Nothing]( + EmptyPublisher, + DefaultAttributes.emptySource, + shape("EmptySource"))) /** * Create a `Source` with no elements, which does not complete its downstream, @@ -273,7 +284,11 @@ object Source extends SourceApply { * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. */ def failed[T](cause: Throwable): Source[T, Unit] = - apply(ErrorPublisher(cause, "FailedSource")).withAttributes(DefaultAttributes.failedSource) + new Source( + new PublisherSource( + ErrorPublisher(cause, "FailedSource")[T], + DefaultAttributes.failedSource, + shape("FailedSource"))) /** * Concatenates two sources so that the first element