From 5848485f3cc1bc0d3d92e785e9efff2afdadff07 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 7 Mar 2017 11:29:48 +0100 Subject: [PATCH] fix ReverseArrowSpec, #22463 --- .../stream/scaladsl/ReverseArrowSpec.scala | 2 +- .../akka/stream/impl/TraversalBuilder.scala | 71 +++++++------------ 2 files changed, 25 insertions(+), 48 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 35c1a287bb..19d8a61a45 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -161,7 +161,7 @@ class ReverseArrowSpec extends StreamSpec { val src = b.add(source) src ~> f sink2 <~ f - (the[IllegalArgumentException] thrownBy (s <~ f <~ src)).getMessage should include("already connected") + (the[UnsupportedOperationException] thrownBy (s <~ f <~ src)).getMessage should include("Cannot wire ports in a completed builder") ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 3d6cf357c9..3cd7caf294 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -169,16 +169,14 @@ object TraversalBuilder { traversalSoFar = MaterializeAtomic(module, Array.ofDim[Int](module.shape.outlets.size)), inSlots = module.shape.inlets.size, inToOffset = module.shape.inlets.map(in ⇒ in → in.id).toMap, - attributes - ) + attributes) b } else { AtomicTraversalBuilder( module, Array.ofDim[Int](module.shape.outlets.size), module.shape.outlets.size, - attributes - ) + attributes) } } @@ -352,8 +350,7 @@ final case class CompletedTraversalBuilder( traversalSoFar: Traversal, inSlots: Int, inToOffset: Map[InPort, Int], - attributes: Attributes -) extends TraversalBuilder { + attributes: Attributes) extends TraversalBuilder { override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { val key = new BuilderKey @@ -362,8 +359,7 @@ final case class CompletedTraversalBuilder( inSlots = inSlots, inOffsets = inToOffset, pendingBuilders = Map(key → this), - attributes = attributes - ).add(submodule, shape, combineMat) + attributes = attributes).add(submodule, shape, combineMat) } override def traversal: Traversal = @@ -378,7 +374,7 @@ final case class CompletedTraversalBuilder( override def isTraversalComplete: Boolean = true override def wire(out: OutPort, in: InPort): TraversalBuilder = - throw new UnsupportedOperationException("Cannot wire ports in a completed builder.") + throw new UnsupportedOperationException(s"Cannot wire ports in a completed builder. ${out.mappedTo} ~> ${in.mappedTo}") override def setAttributes(attributes: Attributes): TraversalBuilder = copy(attributes = attributes) @@ -406,8 +402,7 @@ final case class AtomicTraversalBuilder( module: AtomicModule[Shape, Any], outToSlot: Array[Int], unwiredOuts: Int, - attributes: Attributes -) extends TraversalBuilder { + attributes: Attributes) extends TraversalBuilder { override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { // TODO: Use automatically a linear builder if applicable @@ -445,8 +440,7 @@ final case class AtomicTraversalBuilder( inSlots = inSlots, // TODO Optimize Map creation inToOffset = module.shape.inlets.iterator.map(in ⇒ in → in.id).toMap, - attributes = attributes - ) + attributes = attributes) } else copy(outToSlot = newOutToSlot, unwiredOuts = newUnwiredOuts) } @@ -490,8 +484,7 @@ object LinearTraversalBuilder { if (inPortOpt.isDefined) 1 else 0, traversalSoFar = MaterializeAtomic(module, wiring), pendingBuilder = None, - attributes - ) + attributes) } def addMatCompose[A, B](t: Traversal, matCompose: (A, B) ⇒ Any): Traversal = { @@ -506,8 +499,7 @@ object LinearTraversalBuilder { def fromBuilder[A, B]( traversalBuilder: TraversalBuilder, shape: Shape, - combine: (A, B) ⇒ Any = Keep.right[A, B] - ): LinearTraversalBuilder = { + combine: (A, B) ⇒ Any = Keep.right[A, B]): LinearTraversalBuilder = { traversalBuilder match { case linear: LinearTraversalBuilder ⇒ if (combine eq Keep.right) linear @@ -527,8 +519,7 @@ object LinearTraversalBuilder { inSlots = completed.inSlots, completed.traversal.concat(addMatCompose(PushNotUsed, combine)), pendingBuilder = None, - Attributes.none - ) + Attributes.none) case composite ⇒ val inOpt = shape.inlets.headOption @@ -546,8 +537,7 @@ object LinearTraversalBuilder { addMatCompose(PushNotUsed, combine), pendingBuilder = Some(composite), Attributes.none, - beforeBuilder = if (inOpt.isDefined) PushAttributes(composite.attributes) else EmptyTraversal - ) + beforeBuilder = if (inOpt.isDefined) PushAttributes(composite.attributes) else EmptyTraversal) } } @@ -569,8 +559,7 @@ final case class LinearTraversalBuilder( traversalSoFar: Traversal, pendingBuilder: Option[TraversalBuilder], attributes: Attributes, - beforeBuilder: Traversal = EmptyTraversal -) extends TraversalBuilder { + beforeBuilder: Traversal = EmptyTraversal) extends TraversalBuilder { protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty @@ -622,8 +611,7 @@ final case class LinearTraversalBuilder( .assign(out, inOffset - composite.offsetOfModule(out)) .traversal .concat(traversalSoFar), - pendingBuilder = None - ) + pendingBuilder = None) case None ⇒ copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } @@ -661,8 +649,7 @@ final case class LinearTraversalBuilder( .assign(out, relativeSlot) .traversal .concat(traversalSoFar), - pendingBuilder = None - ) + pendingBuilder = None) case None ⇒ copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } @@ -686,12 +673,10 @@ final case class LinearTraversalBuilder( if (toAppend.isEmpty) { copy( - traversalSoFar = PushNotUsed.concat(LinearTraversalBuilder.addMatCompose(traversalSoFar, matCompose)) - ) + traversalSoFar = PushNotUsed.concat(LinearTraversalBuilder.addMatCompose(traversalSoFar, matCompose))) } else if (this.isEmpty) { toAppend.copy( - traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose)) - ) + traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose))) } else { if (outPort.nonEmpty) { require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " + @@ -704,8 +689,7 @@ final case class LinearTraversalBuilder( composite .assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset) .traversal - .concat(traversalSoFar) - ) + .concat(traversalSoFar)) case None ⇒ // No need to rewire if input port is at the expected position if (toAppend.inOffset == (toAppend.inSlots - 1)) @@ -734,8 +718,7 @@ final case class LinearTraversalBuilder( traversalSoFar = newTraversal, pendingBuilder = toAppend.pendingBuilder, attributes = Attributes.none, - beforeBuilder = if (toAppend.pendingBuilder.isEmpty) EmptyTraversal else PushAttributes(toAppend.attributes) - ) + beforeBuilder = if (toAppend.pendingBuilder.isEmpty) EmptyTraversal else PushAttributes(toAppend.attributes)) } else throw new Exception("should this happen?") } @@ -797,8 +780,7 @@ final case class CompositeTraversalBuilder( outOwners: Map[OutPort, BuilderKey] = Map.empty, unwiredOuts: Int = 0, attributes: Attributes, - islandTag: IslandTag = null -) extends TraversalBuilder { + islandTag: IslandTag = null) extends TraversalBuilder { override def toString: String = s""" @@ -849,8 +831,7 @@ final case class CompositeTraversalBuilder( traversalSoFar = finalTraversal, inSlots, inOffsets, - attributes - ) + attributes) } else this } @@ -876,15 +857,13 @@ final case class CompositeTraversalBuilder( // TODO Optimize Map access pendingBuilders = pendingBuilders.updated(builderKey, result), // pendingBuilders = pendingBuilders - builderKey, - unwiredOuts = unwiredOuts - 1 - ) + unwiredOuts = unwiredOuts - 1) } else { // Update structures with result copy( inBaseOffsetForOut = inBaseOffsetForOut - out, unwiredOuts = unwiredOuts - 1, - pendingBuilders = pendingBuilders.updated(builderKey, result) - ) + pendingBuilders = pendingBuilders.updated(builderKey, result)) } // If we have no more unconnected outputs, we can finally build the Traversal and shed most of the auxiliary data. @@ -934,8 +913,7 @@ final case class CompositeTraversalBuilder( reverseBuildSteps = newBuildSteps, inSlots = inSlots + submodule.inSlots, pendingBuilders = pendingBuilders.updated(builderKey, submodule), - inOffsets = newInOffsets - ) + inOffsets = newInOffsets) } else { // Added module have unwired outputs. @@ -969,8 +947,7 @@ final case class CompositeTraversalBuilder( inBaseOffsetForOut = newBaseOffsetsForOut, outOwners = newOutOwners, pendingBuilders = pendingBuilders.updated(builderKey, submodule), - unwiredOuts = unwiredOuts + submodule.unwiredOuts - ) + unwiredOuts = unwiredOuts + submodule.unwiredOuts) } added.completeIfPossible