= - Minor cleanups, will get an appropriate commit message soon

This commit is contained in:
Viktor Klang 2016-02-15 10:37:19 +01:00
parent e1afb62cab
commit 365e6243b3
9 changed files with 60 additions and 46 deletions

View file

@ -46,7 +46,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig {
class Subject extends Actor {
def receive = {
case "shutdown" context.system.terminate()
case "identify" sender() ! (AddressUidExtension(context.system).addressUid, self)
case "identify" sender() ! (AddressUidExtension(context.system).addressUid -> self)
}
}

View file

@ -11,8 +11,8 @@ import akka.stream.impl.StreamLayout.Module
*/
private[stream] trait FlowModule[In, Out, Mat] extends StreamLayout.Module {
override def replaceShape(s: Shape) =
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a FlowModule")
else this
val inPort = Inlet[In]("Flow.in")
val outPort = Outlet[Out]("Flow.out")

View file

@ -21,8 +21,8 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out
def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat)
override def replaceShape(s: Shape): Module =
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that")
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Source, you need to wrap it in a Graph for that")
else this
// This is okay since the only caller of this method is right below.
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]

View file

@ -31,8 +31,8 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
def create(context: MaterializationContext): (Subscriber[In] @uncheckedVariance, Mat)
override def replaceShape(s: Shape): Module =
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that")
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of a Sink, you need to wrap it in a Graph for that")
else this
// This is okay since we the only caller of this method is right below.
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]

View file

@ -23,6 +23,7 @@ private[stream] object Stages {
object DefaultAttributes {
val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
val inputBufferOne = inputBuffer(initial = 1, max = 1)
val fused = name("fused")
val materializedValueSource = name("matValueSource")
@ -98,8 +99,8 @@ private[stream] object Stages {
val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink")
val headSink = name("headSink") and inputBuffer(initial = 1, max = 1)
val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1)
val headSink = name("headSink") and inputBufferOne
val headOptionSink = name("headOptionSink") and inputBufferOne
val lastSink = name("lastSink")
val lastOptionSink = name("lastOptionSink")
val seqSink = name("seqSink")

View file

@ -319,22 +319,20 @@ object StreamLayout {
object EmptyModule extends Module {
override def shape = ClosedShape
override def replaceShape(s: Shape) =
if (s == ClosedShape) this
else throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule")
else this
override def compose(that: Module): Module = that
override def compose[A, B, C](that: Module, f: (A, B) C): Module =
throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule")
override def subModules: Set[Module] = Set.empty
override def withAttributes(attributes: Attributes): Module =
throw new UnsupportedOperationException("EmptyModule cannot carry attributes")
override def subModules: Set[Module] = Set.empty
override def attributes = Attributes.none
override def carbonCopy: Module = this
override def isRunnable: Boolean = false
override def isAtomic: Boolean = false
override def materializedValueComputation: MaterializedValueNode = Ignore
@ -345,20 +343,23 @@ object StreamLayout {
copyOf: Module) extends Module {
override val subModules: Set[Module] = Set(copyOf)
override def withAttributes(attr: Attributes): Module = this.copy(attributes = attr)
override def withAttributes(attr: Attributes): Module =
if (attr ne attributes) this.copy(attributes = attr)
else this
override def carbonCopy: Module = this.copy(shape = shape.deepCopy())
override def replaceShape(s: Shape): Module = {
override def replaceShape(s: Shape): Module =
if (s != shape) {
shape.requireSamePortsAs(s)
CompositeModule(this, s)
}
} else this
override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf)
override def isCopied: Boolean = true
override def toString: String = "copy of " + copyOf.toString
override def toString: String = s"$copyOf (copy)"
}
final case class CompositeModule(
@ -369,10 +370,11 @@ object StreamLayout {
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes) extends Module {
override def replaceShape(s: Shape): Module = {
override def replaceShape(s: Shape): Module =
if (s != shape) {
shape.requireSamePortsAs(s)
copy(shape = s)
}
} else this
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this)
@ -380,8 +382,9 @@ object StreamLayout {
override def toString =
s"""
| Module: ${this.attributes.nameOrDefault("unnamed")}
| Modules: ${subModules.iterator.map(m "\n " + m.attributes.nameOrDefault(m.getClass.getName)).mkString("")}
| Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules:
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin
@ -402,10 +405,11 @@ object StreamLayout {
override def isFused: Boolean = true
override def replaceShape(s: Shape): Module = {
override def replaceShape(s: Shape): Module =
if (s != shape) {
shape.requireSamePortsAs(s)
copy(shape = s)
}
} else this
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this)
@ -413,9 +417,9 @@ object StreamLayout {
override def toString =
s"""
| Module: ${this.attributes.nameOrDefault("unnamed")}
| Name: ${this.attributes.nameOrDefault("unnamed")}
| Modules:
| ${subModules.iterator.map(m m.toString.split("\n").mkString("\n ")).mkString("\n ")}
| ${subModules.iterator.map(m m.attributes.nameLifted.getOrElse(m.toString.replaceAll("\n", "\n "))).mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin

View file

@ -31,7 +31,9 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at
override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
override final def replaceShape(newShape: Shape): Module = CompositeModule(this, newShape)
override final def replaceShape(newShape: Shape): Module =
if (newShape != shape) CompositeModule(this, newShape)
else this
override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes"
}

View file

@ -25,13 +25,19 @@ import scala.util.Try
private[akka] final case class GraphStageModule(shape: Shape,
attributes: Attributes,
stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module {
def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this)
def replaceShape(s: Shape): Module = CompositeModule(this, s)
override def replaceShape(s: Shape): Module =
if (s != shape) CompositeModule(this, s)
else this
def subModules: Set[Module] = Set.empty
override def subModules: Set[Module] = Set.empty
def withAttributes(attributes: Attributes): Module = new GraphStageModule(shape, attributes, stage)
override def withAttributes(attributes: Attributes): Module =
if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage)
else this
override def toString: String = stage.toString
}
/**

View file

@ -127,9 +127,10 @@ object SslTls {
TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo)
override def replaceShape(s: Shape) =
if (s == shape) this
else if (shape.hasSamePortsAs(s)) CompositeModule(this, s)
else throw new IllegalArgumentException("trying to replace shape with different ports")
if (s != shape) {
shape.requireSamePortsAs(s)
CompositeModule(this, s)
} else this
}
/**