ensure that graph attributes are not lost

- this entails making Module.isSealed==true if attributes are set
- also removed Module.nest(), which implied fixing replaceShape to form
  a CompositeModule where CopiedModule was used before (GraphModule and
  TlsModule)
This commit is contained in:
Roland Kuhn 2016-02-12 08:28:16 +01:00
parent 4e49d75ad8
commit 7b7647435b
18 changed files with 87 additions and 97 deletions

View file

@ -8,4 +8,5 @@ akka {
akka.actor.warn-about-java-serializer-usage = false
stream.materializer.debug.fuzzing-mode = on
stream.secret-test-fuzzing-warning-disable = 42
}

View file

@ -511,7 +511,9 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
EventFilter[IllegalArgumentException](pattern = ".*Cannot pull closed port.*", occurrences = 1).intercept {
upstream.onComplete()
}
val ev = lastEvents()
ev.nonEmpty should be(true)
ev.forall {

View file

@ -4,7 +4,6 @@
package akka.stream.scaladsl
import akka.NotUsed
import scala.collection.immutable
import scala.concurrent.duration._
import akka.stream.ActorMaterializer
@ -14,6 +13,7 @@ import akka.stream.testkit.Utils._
import org.reactivestreams.Subscription
import akka.testkit.TestProbe
import org.reactivestreams.Subscriber
import akka.testkit.EventFilter
class FlowIteratorSpec extends AbstractFlowIteratorSpec {
override def testName = "A Flow based on an iterator producing function"
@ -40,7 +40,9 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec {
sub.request(1)
c.expectNext(1)
c.expectNoMsg(100.millis)
EventFilter[IllegalStateException](message = "not two", occurrences = 1).intercept {
sub.request(2)
}
c.expectError().getMessage should be("not two")
sub.request(2)
c.expectNoMsg(100.millis)

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber }
import org.reactivestreams.Publisher
import scala.concurrent.duration._
import akka.testkit.EventFilter
class FlowZipWithSpec extends BaseTwoStreamsSetup {
@ -46,7 +47,9 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup {
probe.expectNext(1 / -2)
probe.expectNext(2 / -1)
EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
probe.expectError() match {
case a: java.lang.ArithmeticException a.getMessage should be("/ by zero")
}

View file

@ -8,9 +8,9 @@ import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.reactivestreams.Publisher
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.testkit.EventFilter
class GraphUnzipWithSpec extends AkkaSpec {
@ -174,7 +174,9 @@ class GraphUnzipWithSpec extends AkkaSpec {
leftProbe.expectNext(1 / -1)
rightProbe.expectNext("1/-1")
EventFilter[ArithmeticException](occurrences = 1).intercept {
requestFromBoth()
}
leftProbe.expectError() match {
case a: java.lang.ArithmeticException a.getMessage should be("/ by zero")

View file

@ -3,6 +3,7 @@ package akka.stream.scaladsl
import akka.stream.testkit._
import scala.concurrent.duration._
import akka.stream._
import akka.testkit.EventFilter
class GraphZipWithSpec extends TwoStreamsSetup {
import GraphDSL.Implicits._
@ -65,7 +66,9 @@ class GraphZipWithSpec extends TwoStreamsSetup {
probe.expectNext(1 / -2)
probe.expectNext(2 / -1)
EventFilter[ArithmeticException](occurrences = 1).intercept {
subscription.request(2)
}
probe.expectError() match {
case a: java.lang.ArithmeticException a.getMessage should be("/ by zero")
}

View file

@ -13,6 +13,7 @@ import scala.util.control.NoStackTrace
import akka.stream._
import akka.stream.testkit._
import akka.NotUsed
import akka.testkit.EventFilter
class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
@ -235,6 +236,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
"terminate with a failure if there is an exception thrown" in {
val t = new RuntimeException("expected")
EventFilter[RuntimeException](message = "expected", occurrences = 1) intercept
whenReady(
Source.unfold((0, 1)) {
case (a, _) if a > 10000000 throw t

View file

@ -15,7 +15,7 @@ trait GraphApply {
def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] ⇒ S): Graph[S, NotUsed] = {
val builder = new GraphDSL.Builder
val s = buildBlock(builder)
val mod = builder.module.nest().replaceShape(s)
val mod = builder.module.replaceShape(s)
new GraphApply.GraphImpl(s, mod)
}
@ -28,7 +28,7 @@ trait GraphApply {
val builder = new GraphDSL.Builder
val s1 = builder.add(g1)
val s = buildBlock(builder)(s1)
val mod = builder.module.nest().replaceShape(s)
val mod = builder.module.replaceShape(s)
new GraphApply.GraphImpl(s, mod)
}
@ -47,7 +47,7 @@ trait GraphApply {
[2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))#
]
val s = buildBlock(builder)([#s1#])
val mod = builder.module.nest().replaceShape(s)
val mod = builder.module.replaceShape(s)
new GraphApply.GraphImpl(s, mod)
}#
@ -63,7 +63,7 @@ private[stream] object GraphApply {
extends Graph[S, Mat] {
override def withAttributes(attr: Attributes): Graph[S, Mat] =
new GraphImpl(shape, module.withAttributes(attr).nest())
new GraphImpl(shape, module.withAttributes(attr))
override def named(name: String): Graph[S, Mat] = withAttributes(Attributes.name(name))
}

View file

@ -36,7 +36,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
private val _logger = Logging.getLogger(system, this)
override def logger = _logger
if (settings.fuzzingMode) {
if (settings.fuzzingMode && !system.settings.config.hasPath("akka.stream.secret-test-fuzzing-warning-disable")) {
_logger.warning("Fuzzing mode is enabled on this system. If you see this warning on your production system then " +
"set akka.stream.materializer.debug.fuzzing-mode to off.")
}

View file

@ -136,6 +136,10 @@ object StreamLayout {
/**
* Verify that the given Shape has the same ports and return a new module with that shape.
* Concrete implementations may throw UnsupportedOperationException where applicable.
*
* Please note that this method MUST NOT be implemented using a CopiedModule since
* the purpose of replaceShape can also be to rearrange the ports (as in BidiFlow.reversed)
* and that purpose would be defeated.
*/
def replaceShape(s: Shape): Module
@ -199,7 +203,7 @@ object StreamLayout {
downstreams.updated(from, to),
upstreams.updated(to, from),
materializedValueComputation,
attributes)
if (isSealed) Attributes.none else attributes)
}
final def transformMaterializedValue(f: Any Any): Module = {
@ -289,39 +293,20 @@ object StreamLayout {
Attributes.none)
}
/**
* Creates a new Module which contains `this` Module
* @return a new Module
*/
def nest(): Module = {
if (Debug) validate(this)
CompositeModule(
Set(this),
shape,
/*
* Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the
* layout information of all the nested modules). Copied modules break the nesting, scoping them to the
* copied module. The MaterializerSession will take care of propagating the necessary Publishers and Subscribers
* from the enclosed scope to the outer scope.
*/
downstreams,
upstreams,
/*
* Wrapping like this shields the outer module from the details of the
* materialized value computation of its submodules.
*/
Atomic(this),
Attributes.none)
}
def subModules: Set[Module]
final def isSealed: Boolean = isAtomic || isCopied || isFused
final def isSealed: Boolean = isAtomic || isCopied || isFused || attributes.attributeList.nonEmpty
def downstreams: Map[OutPort, InPort] = Map.empty
def upstreams: Map[InPort, OutPort] = Map.empty
def materializedValueComputation: MaterializedValueNode = Atomic(this)
/**
* The purpose of this method is to create a copy to be included in a larger
* graph such that port identity clashes are avoided. Where a full copy is not
* possible or desirable, use a CopiedModule. The shape of the resulting
* module MUST NOT contain the same ports as this modules shape.
*/
def carbonCopy: Module
def attributes: Attributes
@ -342,8 +327,6 @@ object StreamLayout {
override def compose[A, B, C](that: Module, f: (A, B) C): Module =
throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule")
override def nest(): Module = this
override def subModules: Set[Module] = Set.empty
override def withAttributes(attributes: Attributes): Module =
@ -368,7 +351,7 @@ object StreamLayout {
override def replaceShape(s: Shape): Module = {
shape.requireSamePortsAs(s)
copy(shape = s)
CompositeModule(this, s)
}
override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf)
@ -404,6 +387,10 @@ object StreamLayout {
|""".stripMargin
}
object CompositeModule {
def apply(m: Module, s: Shape): CompositeModule = CompositeModule(Set(m), s, Map.empty, Map.empty, Atomic(m), Attributes.none)
}
final case class FusedModule(
override val subModules: Set[Module],
override val shape: Shape,

View file

@ -8,7 +8,7 @@ import akka.actor._
import akka.event.Logging
import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.impl.StreamLayout.{ CopiedModule, Module }
import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module }
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly }
import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
@ -29,13 +29,9 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at
override def subModules: Set[Module] = Set.empty
override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr)
override final def carbonCopy: Module = {
val newShape = shape.deepCopy()
replaceShape(newShape)
}
override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
override final def replaceShape(newShape: Shape): Module =
CopiedModule(newShape, attributes, copyOf = this)
override final def replaceShape(newShape: Shape): Module = CompositeModule(this, newShape)
override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes"
}

View file

@ -246,7 +246,7 @@ private[stream] object Fusing {
struct: BuildStructuralInfo,
openGroup: ju.Set[Module],
indent: Int): List[(Module, MaterializedValueNode)] = {
def log(msg: String): Unit = println(indent + msg)
def log(msg: String): Unit = println(" " * indent + msg)
val async = m match {
case _: GraphStageModule m.attributes.contains(AsyncBoundary)
case _: GraphModule m.attributes.contains(AsyncBoundary)
@ -275,7 +275,7 @@ private[stream] object Fusing {
* - we need to register the contained modules but take care to not include the internal
* wirings into the final result, see also `struct.removeInternalWires()`
*/
if (Debug) log(s"graph module ${m.toString.replace("\n", "\n" + indent)}")
if (Debug) log(s"graph module ${m.toString.replace("\n", "\n" + " " * indent)}")
// storing the old Shape in arrays for in-place updating as we clone the contained GraphStages
val oldIns = oldShape.inlets.toArray
@ -356,7 +356,7 @@ private[stream] object Fusing {
subMatBuilder ++= res
}
val subMat = subMatBuilder.result()
if (Debug) log(subMat.map(p s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + indent, "\n " + indent, ""))
if (Debug) log(subMat.map(p s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, ""))
// we need to remove all wirings that this module copied from nested modules so that we
// dont do wirings twice
val oldDownstreams = m match {

View file

@ -25,10 +25,9 @@ import scala.util.Try
private[akka] final case class GraphStageModule(shape: Shape,
attributes: Attributes,
stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module {
def carbonCopy: Module = replaceShape(shape.deepCopy())
def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
def replaceShape(s: Shape): Module =
CopiedModule(s, Attributes.none, this)
def replaceShape(s: Shape): Module = CompositeModule(this, s)
def subModules: Set[Module] = Set.empty

View file

@ -9,7 +9,7 @@ import java.util.Optional
import akka.{ NotUsed, japi }
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.StreamLayout.{ Module, CompositeModule }
import akka.util.ByteString
import javax.net.ssl._
@ -128,7 +128,7 @@ object SslTls {
override def replaceShape(s: Shape) =
if (s == shape) this
else if (shape.hasSamePortsAs(s)) copy(shape = s)
else if (shape.hasSamePortsAs(s)) CompositeModule(this, s)
else throw new IllegalArgumentException("trying to replace shape with different ports")
}

View file

@ -116,12 +116,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
/**
* Turn this BidiFlow around by 180 degrees, logically flipping it upside down in a protocol stack.
*/
def reversed: BidiFlow[I2, O2, I1, O1, Mat] = {
BidiFlow.fromGraph(GraphDSL.create(this) { implicit b
reversed
BidiShape(reversed.in2, reversed.out2, reversed.in1, reversed.out1)
})
}
def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(module.replaceShape(BidiShape(shape.in2, shape.out2, shape.in1, shape.out1)))
/**
* Transform only the materialized value of this BidiFlow, leaving all other properties as they were.
@ -137,7 +132,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(module.withAttributes(attr).nest())
new BidiFlow(module.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`

View file

@ -354,7 +354,7 @@ final case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Mo
def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
override def withAttributes(attr: Attributes): RunnableGraph[Mat] =
new RunnableGraph(module.withAttributes(attr).nest())
new RunnableGraph(module.withAttributes(attr))
override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name))
}
@ -928,8 +928,6 @@ trait FlowOps[+Out, +Mat] {
def conflateWithSeed[S](seed: Out S)(aggregate: (S, Out) S): Repr[S] =
via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the

View file

@ -58,7 +58,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Sink[In, Mat] =
new Sink(module.withAttributes(attr).nest())
new Sink(module.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`

View file

@ -132,7 +132,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
new Source(module.withAttributes(attr).nest())
new Source(module.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`