diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index b94f300db3..564afef79d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -46,7 +46,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { case "shutdown" ⇒ context.system.terminate() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid, self) + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala index 72238ed791..6358af88a1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala @@ -11,8 +11,8 @@ import akka.stream.impl.StreamLayout.Module */ private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module { override def replaceShape(s: Shape) = - if (s == shape) this - else throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule") + else this val inPort = Inlet[In]("Flow.in") val outPort = Outlet[Out]("Flow.out") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 4109caa5c7..cb73fde8cb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -21,8 +21,8 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat) override def replaceShape(s: Shape): Module = - if (s == shape) this - else throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that") + if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that") + else this // This is okay since the only caller of this method is right below. protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat] diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 6e50d56fa9..7053e982d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -31,8 +31,8 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte def create(context: MaterializationContext): (Subscriber[In] @uncheckedVariance, Mat) override def replaceShape(s: Shape): Module = - if (s == shape) this - else throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that") + if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that") + else this // This is okay since we the only caller of this method is right below. protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat] diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 56fa6817e4..921144b73d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -23,6 +23,7 @@ private[stream] object Stages { object DefaultAttributes { val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") + val inputBufferOne = inputBuffer(initial = 1, max = 1) val fused = name("fused") val materializedValueSource = name("matValueSource") @@ -98,8 +99,8 @@ private[stream] object Stages { val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") - val headSink = name("headSink") and inputBuffer(initial = 1, max = 1) - val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1) + val headSink = name("headSink") and inputBufferOne + val headOptionSink = name("headOptionSink") and inputBufferOne val lastSink = name("lastSink") val lastOptionSink = name("lastOptionSink") val seqSink = name("seqSink") diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 8252f3838c..e2f5f369b8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -319,22 +319,20 @@ object StreamLayout { object EmptyModule extends Module { override def shape = ClosedShape override def replaceShape(s: Shape) = - if (s == ClosedShape) this - else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") + if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") + else this override def compose(that: Module): Module = that override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") - override def subModules: Set[Module] = Set.empty - override def withAttributes(attributes: Attributes): Module = throw new UnsupportedOperationException("EmptyModule cannot carry attributes") + + override def subModules: Set[Module] = Set.empty override def attributes = Attributes.none - override def carbonCopy: Module = this - override def isRunnable: Boolean = false override def isAtomic: Boolean = false override def materializedValueComputation: MaterializedValueNode = Ignore @@ -345,20 +343,23 @@ object StreamLayout { copyOf: Module) extends Module { override val subModules: Set[Module] = Set(copyOf) - override def withAttributes(attr: Attributes): Module = this.copy(attributes = attr) + override def withAttributes(attr: Attributes): Module = + if (attr ne attributes) this.copy(attributes = attr) + else this override def carbonCopy: Module = this.copy(shape = shape.deepCopy()) - override def replaceShape(s: Shape): Module = { - shape.requireSamePortsAs(s) - CompositeModule(this, s) - } + override def replaceShape(s: Shape): Module = + if (s != shape) { + shape.requireSamePortsAs(s) + CompositeModule(this, s) + } else this override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf) override def isCopied: Boolean = true - override def toString: String = "copy of " + copyOf.toString + override def toString: String = s"$copyOf (copy)" } final case class CompositeModule( @@ -369,10 +370,11 @@ object StreamLayout { override val materializedValueComputation: MaterializedValueNode, override val attributes: Attributes) extends Module { - override def replaceShape(s: Shape): Module = { - shape.requireSamePortsAs(s) - copy(shape = s) - } + override def replaceShape(s: Shape): Module = + if (s != shape) { + shape.requireSamePortsAs(s) + copy(shape = s) + } else this override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this) @@ -380,11 +382,12 @@ object StreamLayout { override def toString = s""" - | Module: ${this.attributes.nameOrDefault("unnamed")} - | Modules: ${subModules.iterator.map(m ⇒ "\n " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("")} - | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} - | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} - |""".stripMargin + | Name: ${this.attributes.nameOrDefault("unnamed")} + | Modules: + | ${subModules.iterator.map(m ⇒ m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} + | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} + | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} + |""".stripMargin } object CompositeModule { @@ -402,10 +405,11 @@ object StreamLayout { override def isFused: Boolean = true - override def replaceShape(s: Shape): Module = { - shape.requireSamePortsAs(s) - copy(shape = s) - } + override def replaceShape(s: Shape): Module = + if (s != shape) { + shape.requireSamePortsAs(s) + copy(shape = s) + } else this override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this) @@ -413,11 +417,11 @@ object StreamLayout { override def toString = s""" - | Module: ${this.attributes.nameOrDefault("unnamed")} + | Name: ${this.attributes.nameOrDefault("unnamed")} | Modules: - | ${subModules.iterator.map(m ⇒ m.toString.split("\n").mkString("\n ")).mkString("\n ")} - | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} - | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} + | ${subModules.iterator.map(m ⇒ m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")} + | Downstreams: ${downstreams.iterator.map { case (in, out) ⇒ s"\n $in -> $out" }.mkString("")} + | Upstreams: ${upstreams.iterator.map { case (out, in) ⇒ s"\n $out -> $in" }.mkString("")} |""".stripMargin } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 1f82637e73..3e8f3799f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -31,7 +31,9 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) - override final def replaceShape(newShape: Shape): Module = CompositeModule(this, newShape) + override final def replaceShape(newShape: Shape): Module = + if (newShape != shape) CompositeModule(this, newShape) + else this override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index c51f059e2a..933a143e55 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -25,13 +25,19 @@ import scala.util.Try private[akka] final case class GraphStageModule(shape: Shape, attributes: Attributes, stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module { - def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) + override def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) - def replaceShape(s: Shape): Module = CompositeModule(this, s) + override def replaceShape(s: Shape): Module = + if (s != shape) CompositeModule(this, s) + else this - def subModules: Set[Module] = Set.empty + override def subModules: Set[Module] = Set.empty - def withAttributes(attributes: Attributes): Module = new GraphStageModule(shape, attributes, stage) + override def withAttributes(attributes: Attributes): Module = + if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage) + else this + + override def toString: String = stage.toString } /** diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index cc8c5e4488..781ef0942b 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -127,9 +127,10 @@ object SslTls { TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) override def replaceShape(s: Shape) = - if (s == shape) this - else if (shape.hasSamePortsAs(s)) CompositeModule(this, s) - else throw new IllegalArgumentException("trying to replace shape with different ports") + if (s != shape) { + shape.requireSamePortsAs(s) + CompositeModule(this, s) + } else this } /**