-str - Improvements and renames in internal streams classes

* Renames Module.grow -> Module.compose
 * Renames Module.connect -> Module.wire
 * Renames Module.growConnect -> Module.fuse
 * Renames Module.wrap -> Module.nest

 * Adds explicit identity equals and hashCode to InPort and OutPort

 * Reimplements many of the Source factories to avoid copying

 * Documents Module.compose, Module.fuse, Module.wire and Module.nest

 * Removes Attributes.nameLifted

 * Optimizes Attributes.nameOrDefault
This commit is contained in:
Viktor Klang 2015-07-06 22:00:21 +02:00
parent efc659b70a
commit 66a116d3d2
19 changed files with 333 additions and 290 deletions

View file

@ -5,8 +5,9 @@ package akka.stream.impl
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import org.reactivestreams.{ Processor, Subscription, Subscriber, Publisher }
import org.reactivestreams.{ Subscription, Subscriber, Publisher }
import akka.stream._
import akka.event.Logging.simpleName
class StreamLayoutSpec extends AkkaSpec {
import StreamLayout._
@ -40,7 +41,7 @@ class StreamLayoutSpec extends AkkaSpec {
stage1.isSource should be(false)
val stage2 = testStage()
val flow12 = stage1.grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head)
val flow12 = stage1.compose(stage2, Keep.none).wire(stage1.outPorts.head, stage2.inPorts.head)
flow12.inPorts should be(stage1.inPorts)
flow12.outPorts should be(stage2.outPorts)
@ -65,7 +66,7 @@ class StreamLayoutSpec extends AkkaSpec {
sink3.isSink should be(true)
sink3.isSource should be(false)
val source012 = source0.grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head)
val source012 = source0.compose(flow12, Keep.none).wire(source0.outPorts.head, flow12.inPorts.head)
source012.inPorts.size should be(0)
source012.outPorts should be(flow12.outPorts)
source012.isRunnable should be(false)
@ -73,7 +74,7 @@ class StreamLayoutSpec extends AkkaSpec {
source012.isSink should be(false)
source012.isSource should be(true)
val sink123 = flow12.grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head)
val sink123 = flow12.compose(sink3, Keep.none).wire(flow12.outPorts.head, sink3.inPorts.head)
sink123.inPorts should be(flow12.inPorts)
sink123.outPorts.size should be(0)
sink123.isRunnable should be(false)
@ -81,13 +82,13 @@ class StreamLayoutSpec extends AkkaSpec {
sink123.isSink should be(true)
sink123.isSource should be(false)
val runnable0123a = source0.grow(sink123, Keep.none).connect(source0.outPorts.head, sink123.inPorts.head)
val runnable0123b = source012.grow(sink3, Keep.none).connect(source012.outPorts.head, sink3.inPorts.head)
val runnable0123a = source0.compose(sink123, Keep.none).wire(source0.outPorts.head, sink123.inPorts.head)
val runnable0123b = source012.compose(sink3, Keep.none).wire(source012.outPorts.head, sink3.inPorts.head)
val runnable0123c =
source0
.grow(flow12, Keep.none).connect(source0.outPorts.head, flow12.inPorts.head)
.grow(sink3, Keep.none).connect(flow12.outPorts.head, sink3.inPorts.head)
.compose(flow12, Keep.none).wire(source0.outPorts.head, flow12.inPorts.head)
.compose(sink3, Keep.none).wire(flow12.outPorts.head, sink3.inPorts.head)
runnable0123a.inPorts.size should be(0)
runnable0123a.outPorts.size should be(0)
@ -111,9 +112,9 @@ class StreamLayoutSpec extends AkkaSpec {
val stage2 = testStage()
val sink = testSink()
val runnable = source.grow(stage1, Keep.none).connect(source.outPorts.head, stage1.inPorts.head)
.grow(stage2, Keep.none).connect(stage1.outPorts.head, stage2.inPorts.head)
.grow(sink, Keep.none).connect(stage2.outPorts.head, sink.inPorts.head)
val runnable = source.compose(stage1, Keep.none).wire(source.outPorts.head, stage1.inPorts.head)
.compose(stage2, Keep.none).wire(stage1.outPorts.head, stage2.inPorts.head)
.compose(sink, Keep.none).wire(stage2.outPorts.head, sink.inPorts.head)
checkMaterialized(runnable)
}

View file

@ -28,7 +28,7 @@ trait GraphApply {
def partial[S <: Shape]()(buildBlock: FlowGraph.Builder[Unit] ⇒ S): Graph[S, Unit] = {
val builder = new FlowGraph.Builder
val s = buildBlock(builder)
val mod = builder.module.wrap().replaceShape(s)
val mod = builder.module.nest().replaceShape(s)
new GraphApply.GraphImpl(s, mod)
}
@ -56,7 +56,7 @@ trait GraphApply {
val builder = new FlowGraph.Builder
val s1 = builder.add(g1)
val s = buildBlock(builder)(s1)
val mod = builder.module.wrap().replaceShape(s)
val mod = builder.module.nest().replaceShape(s)
new GraphApply.GraphImpl(s, mod)
}
@ -91,7 +91,7 @@ trait GraphApply {
[2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val s = buildBlock(builder)([#s1#])
val mod = builder.module.wrap().replaceShape(s)
val mod = builder.module.nest().replaceShape(s)
new GraphApply.GraphImpl(s, mod)
}#
@ -109,7 +109,7 @@ private[stream] object GraphApply {
extends Graph[S, Mat] {
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GraphImpl(shape, module.withAttributes(attr).wrap())
new GraphImpl(shape, module.withAttributes(attr).nest())
override def named(name: String): Graph[S, Mat] = withAttributes(Attributes.name(name))
}

View file

@ -31,7 +31,7 @@ class ZipWith1[[#A1#], O] private[stream] (override val shape: FanInShape1[[#A1#
extends Graph[FanInShape1[[#A1#], O], Unit] {
override def withAttributes(attr: Attributes): ZipWith1[[#A1#], O] =
new ZipWith1(shape, module.withAttributes(attr).wrap())
new ZipWith1(shape, module.withAttributes(attr).nest())
override def named(name: String): ZipWith1[[#A1#], O] = withAttributes(Attributes.name(name))
}

View file

@ -129,12 +129,11 @@ object ActorMaterializer {
* INTERNAL API
*/
private[akka] def downcast(materializer: Materializer): ActorMaterializer =
materializer match {
materializer match { //FIXME this method is going to cause trouble for other Materializer implementations
case m: ActorMaterializer m
case _ throw new IllegalArgumentException(s"required [${classOf[ActorMaterializer].getName}] " +
s"but got [${materializer.getClass.getName}]")
}
}
/**

View file

@ -5,6 +5,7 @@ package akka.stream
import akka.event.Logging
import scala.annotation.tailrec
import scala.collection.immutable
import akka.stream.impl.Stages.StageModule
import akka.japi.function
@ -63,38 +64,37 @@ final case class Attributes private (attributeList: immutable.Seq[Attributes.Att
/**
* INTERNAL API
*/
private[akka] def nameLifted: Option[String] =
if (attributeList.isEmpty)
None
else {
val sb = new java.lang.StringBuilder
val iter = attributeList.iterator
while (iter.hasNext) {
iter.next() match {
case Name(name)
if (sb.length == 0) sb.append(name)
else sb.append("-").append(name)
case _
private[akka] def nameLifted: Option[String] = Option(nameOrDefault(null))
/**
* INTERNAL API
*/
private[akka] def nameOrDefault(default: String = "unknown-operation"): String = {
@tailrec def concatNames(i: Iterator[Attribute], first: String, buf: StringBuilder): String =
if (i.hasNext)
i.next() match {
case Name(n)
if (buf ne null) concatNames(i, null, buf.append('-').append(n))
else if (first ne null) {
val b = new StringBuilder(
(first.length() + n.length()) match {
case x if x < 0 throw new IllegalStateException("Names too long to concatenate")
case y if y > Int.MaxValue / 2 Int.MaxValue
case z Math.max(Integer.highestOneBit(z) * 2, 32)
})
concatNames(i, null, b.append(first).append('-').append(n))
} else concatNames(i, n, null)
case _ concatNames(i, first, buf)
}
}
if (sb.length == 0) None
else Some(sb.toString)
else if (buf eq null) first
else buf.toString
concatNames(attributeList.iterator, null, null) match {
case null default
case some some
}
/**
* INTERNAL API
*/
private[akka] def nameOrDefault(default: String = "unknown-operation"): String = nameLifted match {
case Some(name) name
case _ default
}
/**
* INTERNAL API
*/
private[akka] def nameOption: Option[String] =
attributeList.collectFirst { case Name(name) name }
/**
* INTERNAL API
*/

View file

@ -3,6 +3,8 @@
*/
package akka.stream
import akka.util.Collections.EmptyImmutableSeq
import scala.collection.immutable
import scala.collection.JavaConverters._
@ -12,14 +14,20 @@ import scala.collection.JavaConverters._
* It is also used in the Java DSL for untyped Inlets as a work-around
* for otherwise unreasonable existential types.
*/
sealed abstract class InPort
sealed abstract class InPort { self: Inlet[_]
final override def hashCode: Int = System.identityHashCode(this)
final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef]
}
/**
* An output port of a StreamLayout.Module. This type logically belongs
* into the impl package but must live here due to how `sealed` works.
* It is also used in the Java DSL for untyped Outlets as a work-around
* for otherwise unreasonable existential types.
*/
sealed abstract class OutPort
sealed abstract class OutPort { self: Outlet[_]
final override def hashCode: Int = System.identityHashCode(this)
final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef]
}
/**
* An Inlet is a typed input to a Shape. Its partner in the Module view
@ -139,8 +147,8 @@ abstract class AbstractShape extends Shape {
*/
sealed abstract class ClosedShape extends Shape
object ClosedShape extends ClosedShape {
override val inlets: immutable.Seq[Inlet[_]] = Nil
override val outlets: immutable.Seq[Outlet[_]] = Nil
override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq
override def deepCopy() = this
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit ClosedShape")
@ -170,7 +178,7 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se
* of data.
*/
final case class SourceShape[+T](outlet: Outlet[T]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = Nil
override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
override def deepCopy(): SourceShape[T] = SourceShape(outlet.carbonCopy())
@ -203,7 +211,7 @@ final case class FlowShape[-I, +O](inlet: Inlet[I], outlet: Outlet[O]) extends S
*/
final case class SinkShape[-T](inlet: Inlet[T]) extends Shape {
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
override val outlets: immutable.Seq[Outlet[_]] = Nil
override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq
override def deepCopy(): SinkShape[T] = SinkShape(inlet.carbonCopy())
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {

View file

@ -68,7 +68,7 @@ private[akka] case class ActorMaterializerImpl(
override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
if (haveShutDown.get())
throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.")
if (StreamLayout.Debug) runnableGraph.module.validate()
if (StreamLayout.Debug) StreamLayout.validate(runnableGraph.module)
val session = new MaterializerSession(runnableGraph.module) {
private val flowName = createFlowName()
@ -299,8 +299,6 @@ private[akka] object ActorProcessorFactory {
val settings = materializer.effectiveSettings(att)
def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ())
op match {
case Identity(_) throw new AssertionError("Identity cannot end up in ActorProcessorFactory")
case Fused(ops, _) (ActorInterpreter.props(settings, ops, materializer, att), ())
case Map(f, _) interp(fusing.Map(f, settings.supervisionDecider))
case Filter(p, _) interp(fusing.Filter(p, settings.supervisionDecider))
case Drop(n, _) interp(fusing.Drop(n))
@ -328,6 +326,7 @@ private[akka] object ActorProcessorFactory {
val s_m = mkStageAndMat()
(ActorInterpreter.props(settings, List(s_m._1), materializer, att), s_m._2)
case DirectProcessor(p, m) throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
case Identity(_) throw new AssertionError("Identity cannot end up in ActorProcessorFactory")
}
}

View file

@ -3,17 +3,14 @@
*/
package akka.stream.impl
import java.io.{ InputStream, File }
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props }
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.impl.StreamLayout.Module
import akka.stream._
import akka.util.ByteString
import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import scala.concurrent.{ Promise }
import scala.util.{ Failure, Success }
/**
@ -34,13 +31,13 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out
override def subModules: Set[Module] = Set.empty
def amendShape(attr: Attributes): SourceShape[Out] =
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(outlet = Outlet(name + ".out"))
}
protected def amendShape(attr: Attributes): SourceShape[Out] = {
val thisN = attributes.nameOrDefault(null)
val thatN = attr.nameOrDefault(null)
if ((thatN eq null) || thisN == thatN) shape
else shape.copy(outlet = Outlet(thatN + ".out"))
}
}
/**

View file

@ -3,20 +3,12 @@
*/
package akka.stream.impl
import java.io.File
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ Deploy, ActorRef, Props }
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.impl.StreamLayout.Module
import akka.stream.Attributes
import akka.stream.{ Inlet, Shape, SinkShape }
import akka.util.ByteString
import akka.stream.{ Attributes, Inlet, Shape, SinkShape, MaterializationContext, ActorMaterializer }
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import akka.stream.MaterializationContext
import akka.stream.ActorMaterializer
/**
* INTERNAL API
@ -36,12 +28,12 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
override def subModules: Set[Module] = Set.empty
def amendShape(attr: Attributes): SinkShape[In] = {
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(inlet = Inlet(name + ".in"))
}
protected def amendShape(attr: Attributes): SinkShape[In] = {
val thisN = attributes.nameOrDefault(null)
val thatN = attr.nameOrDefault(null)
if ((thatN eq null) || thisN == thatN) shape
else shape.copy(inlet = Inlet(thatN + ".in"))
}
}

View file

@ -5,13 +5,12 @@ package akka.stream.impl
import akka.event.{ LoggingAdapter, Logging }
import akka.stream.impl.SplitDecision.SplitDecision
import akka.stream.{ OverflowStrategy, TimerTransformer }
import akka.stream.Attributes
import akka.stream.impl.StreamLayout._
import akka.stream.{ OverflowStrategy, TimerTransformer, Attributes }
import akka.stream.Attributes._
import akka.stream.stage.Stage
import org.reactivestreams.Processor
import StreamLayout._
import akka.event.Logging.simpleName
import scala.collection.immutable
import scala.concurrent.Future
@ -121,16 +120,6 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
object Fused {
def apply(ops: immutable.Seq[Stage[_, _]]): Fused =
Fused(ops, name(ops.iterator.map(x Logging.simpleName(x).toLowerCase).mkString("+")))
}
final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: Attributes = fused) extends StageModule {
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
final case class Map(f: Any Any, attributes: Attributes = map) extends StageModule {
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()

View file

@ -23,6 +23,76 @@ private[akka] object StreamLayout {
// compile-time constant
final val Debug = false
final def validate(m: Module, level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = {
val ids = Iterator from 1
def id(obj: AnyRef) = idMap get obj match {
case Some(x) x
case None
val x = ids.next()
idMap(obj) = x
x
}
def in(i: InPort) = s"${i.toString}@${id(i)}"
def out(o: OutPort) = s"${o.toString}@${id(o)}"
def ins(i: Iterable[InPort]) = i.map(in).mkString("In[", ",", "]")
def outs(o: Iterable[OutPort]) = o.map(out).mkString("Out[", ",", "]")
def pair(p: (OutPort, InPort)) = s"${in(p._2)}->${out(p._1)}"
def pairs(p: Iterable[(OutPort, InPort)]) = p.map(pair).mkString("[", ",", "]")
import m._
val inset: Set[InPort] = shape.inlets.toSet
val outset: Set[OutPort] = shape.outlets.toSet
var problems: List[String] = Nil
if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets)
if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}"
if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}"
if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets)
if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}"
if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}"
val ups = upstreams.toSet
val ups2 = ups.map(_.swap)
val downs = downstreams.toSet
val inter = ups2.intersect(downs)
if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}"
val (allIn, dupIn, allOut, dupOut) =
subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) {
case ((ai, di, ao, doo), m) (ai ++ m.inPorts, di ++ ai.intersect(m.inPorts), ao ++ m.outPorts, doo ++ ao.intersect(m.outPorts))
}
if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}"
if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}"
val unIn = allIn -- inset -- upstreams.keySet
if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}"
val unOut = allOut -- outset -- downstreams.keySet
if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}"
def atomics(n: MaterializedValueNode): Set[Module] =
n match {
case Ignore Set.empty
case Transform(f, dep) atomics(dep)
case Atomic(m) Set(m)
case Combine(f, left, right) atomics(left) ++ atomics(right)
}
val atomic = atomics(materializedValueComputation)
if ((atomic -- subModules - m).nonEmpty) problems ::= s"computation refers to non-existent modules [${atomic -- subModules - m mkString ","}]"
val print = doPrint || problems.nonEmpty
if (print) {
val indent = " " * (level * 2)
println(s"$indent${simpleName(this)}($shape): ${ins(inPorts)} ${outs(outPorts)}")
downstreams foreach { case (o, i) println(s"$indent ${out(o)} -> ${in(i)}") }
problems foreach (p println(s"$indent -!- $p"))
}
subModules foreach (sm validate(sm, level + 1, print, idMap))
if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems")
}
// TODO: Materialization order
// TODO: Special case linear composites
// TODO: Cycles
@ -34,6 +104,7 @@ private[akka] object StreamLayout {
case object Ignore extends MaterializedValueNode
trait Module {
def shape: Shape
/**
* Verify that the given Shape has the same ports and return a new module with that shape.
@ -52,14 +123,40 @@ private[akka] object StreamLayout {
def isAtomic: Boolean = subModules.isEmpty
def isCopied: Boolean = false
final def growConnect(that: Module, from: OutPort, to: InPort): Module =
growConnect(that, from, to, Keep.left)
/**
* Fuses this Module to `that` Module by wiring together `from` and `to`,
* retaining the materialized value of `this` in the result
* @param that a Module to fuse with
* @param from the data source to wire
* @param to the data sink to wire
* @return a Module representing the fusion of `this` and `that`
*/
final def fuse(that: Module, from: OutPort, to: InPort): Module =
fuse(that, from, to, Keep.left)
final def growConnect[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) C): Module =
this.grow(that, f).connect(from, to)
/**
* Fuses this Module to `that` Module by wiring together `from` and `to`,
* transforming the materialized values of `this` and `that` using the
* provided function `f`
* @param that a Module to fuse with
* @param from the data source to wire
* @param to the data sink to wire
* @param f the function to apply to the materialized values
* @return a Module representing the fusion of `this` and `that`
*/
final def fuse[A, B, C](that: Module, from: OutPort, to: InPort, f: (A, B) C): Module =
this.compose(that, f).wire(from, to)
final def connect[A, B](from: OutPort, to: InPort): Module = {
if (Debug) validate()
/**
* Creates a new Module based on the current Module but with
* the given OutPort wired to the given InPort.
*
* @param from the OutPort to wire
* @param to the InPort to wire
* @return a new Module with the ports wired
*/
final def wire(from: OutPort, to: InPort): Module = {
if (Debug) validate(this)
require(outPorts(from),
if (downstreams.contains(from)) s"The output port [$from] is already connected"
@ -78,7 +175,7 @@ private[akka] object StreamLayout {
}
final def transformMaterializedValue(f: Any Any): Module = {
if (Debug) validate()
if (Debug) validate(this)
CompositeModule(
subModules = if (this.isSealed) Set(this) else this.subModules,
@ -89,10 +186,27 @@ private[akka] object StreamLayout {
attributes)
}
def grow(that: Module): Module = grow(that, Keep.left)
/**
* Creates a new Module which is `this` Module composed with `that` Module.
*
* @param that a Module to be composed with (cannot be itself)
* @return a Module that represents the composition of `this` and `that`
*/
def compose(that: Module): Module = compose(that, Keep.left)
def grow[A, B, C](that: Module, f: (A, B) C): Module = {
if (Debug) validate()
/**
* Creates a new Module which is `this` Module composed with `that` Module,
* using the given function `f` to compose the materialized value of `this` with
* the materialized value of `that`.
* @param that a Module to be composed with (cannot be itself)
* @param f a function which combines the materialized values
* @tparam A the type of the materialized value of `this`
* @tparam B the type of the materialized value of `that`
* @tparam C the type of the materialized value of the returned Module
* @return a Module that represents the composition of `this` and `that`
*/
def compose[A, B, C](that: Module, f: (A, B) C): Module = {
if (Debug) validate(this)
require(that ne this, "A module cannot be added to itself. You should pass a separate instance to grow().")
require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")
@ -113,8 +227,12 @@ private[akka] object StreamLayout {
attributes)
}
def wrap(): Module = {
if (Debug) validate()
/**
* Creates a new Module which contains `this` Module
* @return a new Module
*/
def nest(): Module = {
if (Debug) validate(this)
CompositeModule(
subModules = Set(this),
@ -149,74 +267,6 @@ private[akka] object StreamLayout {
final override def hashCode(): Int = super.hashCode()
final override def equals(obj: scala.Any): Boolean = super.equals(obj)
final def validate(level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = {
val ids = Iterator from 1
def id(obj: AnyRef) = idMap get obj match {
case Some(x) x
case None
val x = ids.next()
idMap(obj) = x
x
}
def in(i: InPort) = s"${i.toString}@${id(i)}"
def out(o: OutPort) = s"${o.toString}@${id(o)}"
def ins(i: Iterable[InPort]) = i.map(in).mkString("In[", ",", "]")
def outs(o: Iterable[OutPort]) = o.map(out).mkString("Out[", ",", "]")
def pair(p: (OutPort, InPort)) = s"${in(p._2)}->${out(p._1)}"
def pairs(p: Iterable[(OutPort, InPort)]) = p.map(pair).mkString("[", ",", "]")
val inset: Set[InPort] = shape.inlets.toSet
val outset: Set[OutPort] = shape.outlets.toSet
var problems: List[String] = Nil
if (inset.size != shape.inlets.size) problems ::= "shape has duplicate inlets: " + ins(shape.inlets)
if (inset != inPorts) problems ::= s"shape has extra ${ins(inset -- inPorts)}, module has extra ${ins(inPorts -- inset)}"
if (inset.intersect(upstreams.keySet).nonEmpty) problems ::= s"found connected inlets ${inset.intersect(upstreams.keySet)}"
if (outset.size != shape.outlets.size) problems ::= "shape has duplicate outlets: " + outs(shape.outlets)
if (outset != outPorts) problems ::= s"shape has extra ${outs(outset -- outPorts)}, module has extra ${outs(outPorts -- outset)}"
if (outset.intersect(downstreams.keySet).nonEmpty) problems ::= s"found connected outlets ${outset.intersect(downstreams.keySet)}"
val ups = upstreams.toSet
val ups2 = ups.map(_.swap)
val downs = downstreams.toSet
val inter = ups2.intersect(downs)
if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}"
val (allIn, dupIn, allOut, dupOut) =
subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) {
case ((ai, di, ao, doo), m) (ai ++ m.inPorts, di ++ ai.intersect(m.inPorts), ao ++ m.outPorts, doo ++ ao.intersect(m.outPorts))
}
if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}"
if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}"
if (!isSealed && (inset -- allIn).nonEmpty) problems ::= s"foreign inlets ${ins(inset -- allIn)}"
if (!isSealed && (outset -- allOut).nonEmpty) problems ::= s"foreign outlets ${outs(outset -- allOut)}"
val unIn = allIn -- inset -- upstreams.keySet
if (unIn.nonEmpty && !isCopied) problems ::= s"unconnected inlets ${ins(unIn)}"
val unOut = allOut -- outset -- downstreams.keySet
if (unOut.nonEmpty && !isCopied) problems ::= s"unconnected outlets ${outs(unOut)}"
def atomics(n: MaterializedValueNode): Set[Module] =
n match {
case Ignore Set.empty
case Transform(f, dep) atomics(dep)
case Atomic(m) Set(m)
case Combine(f, left, right) atomics(left) ++ atomics(right)
}
val atomic = atomics(materializedValueComputation)
if ((atomic -- subModules - this).nonEmpty) problems ::= s"computation refers to non-existent modules [${atomic -- subModules - this mkString ","}]"
val print = doPrint || problems.nonEmpty
if (print) {
val indent = " " * (level * 2)
println(s"$indent${simpleName(this)}($shape): ${ins(inPorts)} ${outs(outPorts)}")
downstreams foreach { case (o, i) println(s"$indent ${out(o)} -> ${in(i)}") }
problems foreach (p println(s"$indent -!- $p"))
}
subModules foreach (_.validate(level + 1, print, idMap))
if (problems.nonEmpty && !doPrint) throw new IllegalStateException(s"module inconsistent, found ${problems.size} problems")
}
}
object EmptyModule extends Module {
@ -225,12 +275,12 @@ private[akka] object StreamLayout {
if (s == ClosedShape) this
else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
override def grow(that: Module): Module = that
override def compose(that: Module): Module = that
override def grow[A, B, C](that: Module, f: (A, B) C): Module =
override def compose[A, B, C](that: Module, f: (A, B) C): Module =
throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule")
override def wrap(): Module = this
override def nest(): Module = this
override def subModules: Set[Module] = Set.empty
@ -283,14 +333,13 @@ private[akka] object StreamLayout {
override def toString =
s"""
| Module: ${this.attributes.nameOption.getOrElse("unnamed")}
| Modules: ${subModules.toSeq.map(m " " + m.attributes.nameOption.getOrElse(m.getClass.getName)).mkString("\n")}
| Module: ${this.attributes.nameOrDefault("unnamed")}
| Modules: ${subModules.iterator.map(m " " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("\n")}
| Downstreams:
| ${downstreams.map { case (in, out) s" $in -> $out" }.mkString("\n")}
| ${downstreams.iterator.map { case (in, out) s" $in -> $out" }.mkString("\n")}
| Upstreams:
| ${upstreams.map { case (out, in) s" $out -> $in" }.mkString("\n")}
| ${upstreams.iterator.map { case (out, in) s" $out -> $in" }.mkString("\n")}
""".stripMargin
}
}
@ -409,14 +458,13 @@ private[stream] final case class MaterializedValueSource[M](
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource")
def amendShape(attr: Attributes): SourceShape[M] = {
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(outlet = Outlet(name + ".out"))
}
}
private def amendShape(attr: Attributes): SourceShape[M] = {
val thisN = attributes.nameOrDefault(null)
val thatN = attr.nameOrDefault(null)
if ((thatN eq null) || thisN == thatN) shape
else shape.copy(outlet = Outlet(thatN + ".out"))
}
}
/**

View file

@ -20,6 +20,7 @@ import scala.concurrent.{ Future, Promise }
private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
extends SourceModule[ByteString, Future[Long]](shape) {
override def create(context: MaterializationContext) = {
// FIXME rewrite to be based on AsyncStage rather than dangerous downcasts
val mat = ActorMaterializer.downcast(context.materializer)
val settings = mat.effectiveSettings(context.effectiveAttributes)

View file

@ -10,6 +10,7 @@ import javax.net.ssl._
import scala.annotation.varargs
import scala.collection.immutable
import java.security.cert.Certificate
import akka.event.Logging.simpleName
/**
* Stream cipher support based upon JSSE.

View file

@ -25,8 +25,7 @@ object SynchronousFileSource {
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def apply(f: File, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] =
new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource")))
.named(DefaultAttributes.nameOption.get)
new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource")).nest()) // TO DISCUSS: I had to add wrap() here to make the name available
/**
* Creates a synchronous (Java 6 compatible) Source from a Files contents.
@ -38,8 +37,7 @@ object SynchronousFileSource {
*
* It materializes a [[Future]] containing the number of bytes read from the source file upon completion.
*/
def create(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] =
create(f, DefaultChunkSize)
def create(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = create(f, DefaultChunkSize)
/**
* Creates a synchronous (Java 6 compatible) Source from a Files contents.
@ -53,5 +51,4 @@ object SynchronousFileSource {
*/
def create(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] =
apply(f, chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]]
}

View file

@ -56,9 +56,9 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
val ins = copy.shape.inlets
val outs = copy.shape.outlets
new BidiFlow(module
.grow(copy, combine)
.connect(shape.out1, ins(0))
.connect(outs(1), shape.in2)
.compose(copy, combine)
.wire(shape.out1, ins(0))
.wire(outs(1), shape.in2)
.replaceShape(BidiShape(shape.in1, outs(0), ins(1), shape.out2)))
}
@ -106,9 +106,9 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
val in = copy.shape.inlets.head
val out = copy.shape.outlets.head
new Flow(module
.grow(copy, combine)
.connect(shape.out1, in)
.connect(out, shape.in2)
.compose(copy, combine)
.wire(shape.out1, in)
.wire(out, shape.in2)
.replaceShape(FlowShape(shape.in1, shape.out2)))
}
@ -118,7 +118,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(module.replaceShape(shape.reversed))
override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(module.withAttributes(attr).wrap())
new BidiFlow(module.withAttributes(attr).nest())
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
withAttributes(Attributes.name(name))

View file

@ -3,23 +3,22 @@
*/
package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.impl.SplitDecision._
import scala.language.higherKinds
import akka.event.LoggingAdapter
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream._
import akka.stream.Attributes._
import akka.stream.stage._
import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.util.Collections.EmptyImmutableSeq
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import scala.annotation.implicitNotFound
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.Future
import scala.language.higherKinds
import akka.stream.stage._
import akka.stream.impl.{ Stages, StreamLayout, FlowModule }
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
@ -74,7 +73,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
val flowCopy = flow.module.carbonCopy
new Flow(
module
.growConnect(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine)
.fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine)
.replaceShape(FlowShape(shape.inlet, flowCopy.shape.outlets.head)))
}
}
@ -120,7 +119,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
val sinkCopy = sink.module.carbonCopy
new Sink(
module
.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)
.fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)
.replaceShape(SinkShape(shape.inlet)))
}
}
@ -162,9 +161,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
val flowCopy = flow.module.carbonCopy
RunnableGraph(
module
.grow(flowCopy, combine)
.connect(shape.outlet, flowCopy.shape.inlets.head)
.connect(flowCopy.shape.outlets.head, shape.inlet))
.compose(flowCopy, combine)
.wire(shape.outlet, flowCopy.shape.inlets.head)
.wire(flowCopy.shape.outlets.head, shape.inlet))
}
/**
@ -207,9 +206,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
val ins = copy.shape.inlets
val outs = copy.shape.outlets
new Flow(module
.grow(copy, combine)
.connect(shape.outlet, ins(0))
.connect(outs(1), shape.inlet)
.compose(copy, combine)
.wire(shape.outlet, ins(0))
.wire(outs(1), shape.inlet)
.replaceShape(FlowShape(ins(1), outs(0))))
}
@ -247,18 +246,18 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
//No need to copy here, op is a fresh instance
if (op.isInstanceOf[Stages.Identity]) this.asInstanceOf[Repr[U, Mat]]
else if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat]]
else new Flow(module.growConnect(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort)))
else new Flow(module.fuse(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort)))
}
private[stream] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] = {
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]]
else new Flow(module.growConnect(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort)))
else new Flow(module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort)))
}
private[akka] def andThenMat[U, Mat2, O >: Out](processorFactory: () (Processor[O, U], Mat2)): Repr[U, Mat2] = {
val op = Stages.DirectProcessor(processorFactory.asInstanceOf[() (Processor[Any, Any], Any)])
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U, Mat2]]
else new Flow[In, U, Mat2](module.growConnect(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort)))
else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort)))
}
/**
@ -266,10 +265,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out, Mat] = {
override def withAttributes(attr: Attributes): Repr[Out, Mat] =
if (this.module eq EmptyModule) this
else new Flow(module.withAttributes(attr).wrap())
}
else new Flow(module.withAttributes(attr).nest())
override def named(name: String): Repr[Out, Mat] = withAttributes(Attributes.name(name))
@ -278,9 +276,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a of a [[Source#subscriber]] and
* and `Publisher` of a [[Sink#publisher]].
*/
def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2) = {
def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2) =
Source.wrap(source).via(this).toMat(sink)(Keep.both).run()
}
/**
* Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]]
@ -304,7 +301,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
}
object Flow extends FlowApply {
@ -362,7 +358,7 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module)
def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
override def withAttributes(attr: Attributes): RunnableGraph[Mat] =
new RunnableGraph(module.withAttributes(attr).wrap)
new RunnableGraph(module.withAttributes(attr).nest)
override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name))

View file

@ -45,7 +45,7 @@ class Merge[T] private (inputPorts: Int,
extends Graph[UniformFanInShape[T, T], Unit] {
override def withAttributes(attr: Attributes): Merge[T] =
new Merge(inputPorts, shape, module.withAttributes(attr).wrap())
new Merge(inputPorts, shape, module.withAttributes(attr).nest())
override def named(name: String): Merge[T] = withAttributes(Attributes.name(name))
}
@ -94,7 +94,7 @@ class MergePreferred[T] private (secondaryPorts: Int,
extends Graph[MergePreferred.MergePreferredShape[T], Unit] {
override def withAttributes(attr: Attributes): MergePreferred[T] =
new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).wrap())
new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).nest())
override def named(name: String): MergePreferred[T] = withAttributes(Attributes.name(name))
}
@ -132,7 +132,7 @@ class Broadcast[T] private (outputPorts: Int,
extends Graph[UniformFanOutShape[T, T], Unit] {
override def withAttributes(attr: Attributes): Broadcast[T] =
new Broadcast(outputPorts, shape, module.withAttributes(attr).wrap())
new Broadcast(outputPorts, shape, module.withAttributes(attr).nest())
override def named(name: String): Broadcast[T] = withAttributes(Attributes.name(name))
}
@ -175,7 +175,7 @@ class Balance[T] private (outputPorts: Int,
extends Graph[UniformFanOutShape[T, T], Unit] {
override def withAttributes(attr: Attributes): Balance[T] =
new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).wrap())
new Balance(outputPorts, waitForAllDownstreams, shape, module.withAttributes(attr).nest())
override def named(name: String): Balance[T] = withAttributes(Attributes.name(name))
}
@ -208,7 +208,7 @@ class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)],
extends Graph[FanInShape2[A, B, (A, B)], Unit] {
override def withAttributes(attr: Attributes): Zip[A, B] =
new Zip(shape, module.withAttributes(attr).wrap())
new Zip(shape, module.withAttributes(attr).nest())
override def named(name: String): Zip[A, B] = withAttributes(Attributes.name(name))
}
@ -257,7 +257,7 @@ class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B],
extends Graph[FanOutShape2[(A, B), A, B], Unit] {
override def withAttributes(attr: Attributes): Unzip[A, B] =
new Unzip(shape, module.withAttributes(attr).wrap())
new Unzip(shape, module.withAttributes(attr).nest())
override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name))
}
@ -292,7 +292,7 @@ class Concat[T] private (override val shape: UniformFanInShape[T, T],
extends Graph[UniformFanInShape[T, T], Unit] {
override def withAttributes(attr: Attributes): Concat[T] =
new Concat(shape, module.withAttributes(attr).wrap())
new Concat(shape, module.withAttributes(attr).nest())
override def named(name: String): Concat[T] = withAttributes(Attributes.name(name))
}
@ -306,13 +306,13 @@ object FlowGraph extends GraphApply {
val flowCopy = via.module.carbonCopy
moduleInProgress =
moduleInProgress
.grow(flowCopy)
.connect(from, flowCopy.shape.inlets.head)
.connect(flowCopy.shape.outlets.head, to)
.compose(flowCopy)
.wire(from, flowCopy.shape.inlets.head)
.wire(flowCopy.shape.outlets.head, to)
}
def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = {
moduleInProgress = moduleInProgress.connect(from, to)
moduleInProgress = moduleInProgress.wire(from, to)
}
/**
@ -321,9 +321,9 @@ object FlowGraph extends GraphApply {
* connected.
*/
def add[S <: Shape](graph: Graph[S, _]): S = {
if (StreamLayout.Debug) graph.module.validate()
if (StreamLayout.Debug) StreamLayout.validate(graph.module)
val copy = graph.module.carbonCopy
moduleInProgress = moduleInProgress.grow(copy)
moduleInProgress = moduleInProgress.compose(copy)
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
}
@ -334,9 +334,9 @@ object FlowGraph extends GraphApply {
* Flow, Sink and Graph.
*/
private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) Any): S = {
if (StreamLayout.Debug) graph.module.validate()
if (StreamLayout.Debug) StreamLayout.validate(graph.module)
val copy = graph.module.carbonCopy
moduleInProgress = moduleInProgress.grow(copy.transformMaterializedValue(transform.asInstanceOf[Any Any]))
moduleInProgress = moduleInProgress.compose(copy.transformMaterializedValue(transform.asInstanceOf[Any Any]))
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
}
@ -347,9 +347,9 @@ object FlowGraph extends GraphApply {
* Flow, Sink and Graph.
*/
private[stream] def add[S <: Shape, A, B](graph: Graph[S, _], combine: (A, B) Any): S = {
if (StreamLayout.Debug) graph.module.validate()
if (StreamLayout.Debug) StreamLayout.validate(graph.module)
val copy = graph.module.carbonCopy
moduleInProgress = moduleInProgress.grow(copy, combine)
moduleInProgress = moduleInProgress.compose(copy, combine)
graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S]
}
@ -372,15 +372,15 @@ object FlowGraph extends GraphApply {
*/
def materializedValue: Outlet[M] = {
val module = new MaterializedValueSource[Any]
moduleInProgress = moduleInProgress.grow(module)
moduleInProgress = moduleInProgress.compose(module)
module.shape.outlet.asInstanceOf[Outlet[M]]
}
private[stream] def andThen(port: OutPort, op: StageModule): Unit = {
moduleInProgress =
moduleInProgress
.grow(op)
.connect(port, op.inPort)
.compose(op)
.wire(port, op.inPort)
}
private[stream] def buildRunnable[Mat](): RunnableGraph[Mat] = {
@ -389,7 +389,7 @@ object FlowGraph extends GraphApply {
"Cannot build the RunnableGraph because there are unconnected ports: " +
(moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", "))
}
new RunnableGraph(moduleInProgress.wrap())
new RunnableGraph(moduleInProgress.nest())
}
private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = {
@ -400,7 +400,7 @@ object FlowGraph extends GraphApply {
s"Cannot build Source with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.outPorts.head != outlet)
throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}")
new Source(moduleInProgress.replaceShape(SourceShape(outlet)).wrap())
new Source(moduleInProgress.replaceShape(SourceShape(outlet)).nest())
}
private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = {
@ -411,7 +411,7 @@ object FlowGraph extends GraphApply {
throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}")
if (moduleInProgress.inPorts.head != inlet)
throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}")
new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).wrap())
new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).nest())
}
private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
@ -422,7 +422,7 @@ object FlowGraph extends GraphApply {
throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the modules open Outlets [${moduleInProgress.outPorts.mkString(",")}]")
if (moduleInProgress.inPorts.toSet != shape.inlets.toSet)
throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the modules open Inlets [${moduleInProgress.inPorts.mkString(",")}]")
new BidiFlow(moduleInProgress.replaceShape(shape).wrap())
new BidiFlow(moduleInProgress.replaceShape(shape).nest())
}
private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = {
@ -433,7 +433,7 @@ object FlowGraph extends GraphApply {
s"Cannot build Sink with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.inPorts.head != inlet)
throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}")
new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).wrap())
new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).nest())
}
private[stream] def module: Module = moduleInProgress

View file

@ -36,7 +36,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
override def withAttributes(attr: Attributes): Sink[In, Mat] =
new Sink(module.withAttributes(attr).wrap())
new Sink(module.withAttributes(attr).nest())
override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name))

View file

@ -3,33 +3,19 @@
*/
package akka.stream.scaladsl
import scala.language.higherKinds
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.stream._
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule, DefaultAttributes }
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective }
import org.reactivestreams._
import akka.stream.{ SourceShape, Inlet, Outlet }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage }
import scala.annotation.unchecked.uncheckedVariance
import scala.language.higherKinds
import akka.actor.Props
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher }
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import scala.language.higherKinds
import scala.concurrent.{ ExecutionContext, Future }
import akka.stream.{ Materializer, Graph }
import akka.stream.impl._
import akka.actor.Cancellable
import akka.actor.ActorRef
import scala.concurrent.Promise
import akka.stream.stage.SyncDirective
import akka.stream.OverflowStrategy
import akka.stream.Attributes
/**
* A `Source` is a set of stream processing steps that has one open output. It can comprise
@ -58,8 +44,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
val flowCopy = flow.module.carbonCopy
new Source(
module
.growConnect(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine)
.replaceShape(SourceShape(flowCopy.shape.outlets.head)))
.fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine)
.replaceShape(SourceShape(flowCopy.shape.outlets.head))) // FIXME why is not .wrap() needed here?
}
}
@ -75,7 +61,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] = {
val sinkCopy = sink.module.carbonCopy
RunnableGraph(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine))
RunnableGraph(module.fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine))
}
/**
@ -89,14 +75,14 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
// No need to copy here, op is a fresh instance
new Source(
module
.growConnect(op, shape.outlet, op.inPort)
.fuse(op, shape.outlet, op.inPort)
.replaceShape(SourceShape(op.outPort)))
}
override private[scaladsl] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] = {
new Source(
module
.growConnect(op, shape.outlet, op.inPort, Keep.right)
.fuse(op, shape.outlet, op.inPort, Keep.right)
.replaceShape(SourceShape(op.outPort)))
}
@ -150,14 +136,18 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
*/
def ++[Out2 >: Out, M](second: Graph[SourceShape[Out2], M]): Source[Out2, (Mat, M)] = concat(second)
/**
* Nests the current Source and returns a Source with the given Attributes
* @param attr the attributes to add
* @return a new Source with the added attributes
*/
override def withAttributes(attr: Attributes): Repr[Out, Mat] =
new Source(module.withAttributes(attr).wrap())
new Source(module.withAttributes(attr).nest()) // User API
override def named(name: String): Repr[Out, Mat] = withAttributes(Attributes.name(name))
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)
}
object Source extends SourceApply {
@ -193,7 +183,10 @@ object Source extends SourceApply {
* from the downstream transformation steps.
*/
def apply[T](f: () Iterator[T]): Source[T, Unit] =
apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() })
apply(new immutable.Iterable[T] {
override def iterator: Iterator[T] = f()
override def toString: String = "() => Iterator"
})
/**
* A graph with the shape of a source logically is a source, this method makes
@ -223,7 +216,11 @@ object Source extends SourceApply {
* The stream terminates with a failure if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): Source[T, Unit] =
Source.single(future).mapAsyncUnordered(1)(id).withAttributes(DefaultAttributes.futureSource)
new Source(
new PublisherSource(
SingleElementPublisher(future, "FutureSource"),
DefaultAttributes.futureSource,
shape("FutureSource"))).mapAsyncUnordered(1)(id)
/**
* Elements are emitted periodically with the specified interval.
@ -240,22 +237,36 @@ object Source extends SourceApply {
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def single[T](element: T): Source[T, Unit] =
apply(SingleElementPublisher(element, "SingleSource")).withAttributes(DefaultAttributes.singleSource)
new Source(
new PublisherSource(
SingleElementPublisher(element, "SingleSource"),
DefaultAttributes.singleSource,
shape("SingleSource")))
/**
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, Unit] =
apply(new immutable.Iterable[T] {
override val iterator: Iterator[T] = Iterator.continually(element)
override def toString: String = "Iterable.continually(" + element + ")"
}).withAttributes(DefaultAttributes.repeat)
new Source(
new PublisherSource(
SingleElementPublisher(
new immutable.Iterable[T] {
override val iterator: Iterator[T] = Iterator.continually(element)
override def toString: String = "repeat(" + element + ")"
}, "RepeatSource"),
DefaultAttributes.repeat,
shape("RepeatSource"))).mapConcat(id)
/**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
*/
def empty[T]: Source[T, Unit] = _empty
private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher).withAttributes(DefaultAttributes.emptySource)
private[this] val _empty: Source[Nothing, Unit] =
new Source(
new PublisherSource[Nothing](
EmptyPublisher,
DefaultAttributes.emptySource,
shape("EmptySource")))
/**
* Create a `Source` with no elements, which does not complete its downstream,
@ -273,7 +284,11 @@ object Source extends SourceApply {
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T, Unit] =
apply(ErrorPublisher(cause, "FailedSource")).withAttributes(DefaultAttributes.failedSource)
new Source(
new PublisherSource(
ErrorPublisher(cause, "FailedSource")[T],
DefaultAttributes.failedSource,
shape("FailedSource")))
/**
* Concatenates two sources so that the first element