From 67fd0cc0b6ade4abaf686e303cb62280583bad77 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 9 Mar 2017 17:44:11 +0100 Subject: [PATCH] =str #22438 make use of OptionVal in materializer internals --- .../src/main/scala/akka/util/OptionVal.scala | 3 + .../akka/stream/GraphBuilderBenchmark.scala | 4 +- .../stream/MaterializationBenchmark.scala | 2 +- .../stream/impl/ActorMaterializerImpl.scala | 3 +- .../impl/PhasedFusingActorMaterializer.scala | 9 +- .../akka/stream/impl/TraversalBuilder.scala | 90 ++++++++++--------- .../impl/fusing/ActorGraphInterpreter.scala | 14 +-- 7 files changed, 65 insertions(+), 60 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/OptionVal.scala b/akka-actor/src/main/scala/akka/util/OptionVal.scala index 76750e1490..7e94641633 100644 --- a/akka-actor/src/main/scala/akka/util/OptionVal.scala +++ b/akka-actor/src/main/scala/akka/util/OptionVal.scala @@ -50,6 +50,9 @@ private[akka] final class OptionVal[+A >: Null](val x: A) extends AnyVal { def getOrElse[B >: A](default: B): B = if (x == null) default else x + def contains[B >: A](it: B): Boolean = + x != null && x == it + /** * Returns the option's value if it is nonempty, or `null` if it is empty. */ diff --git a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala index 84486a1aeb..3a8818906f 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala @@ -19,7 +19,7 @@ class GraphBuilderBenchmark { var complexity = 0 @Benchmark - def flow_with_map(): RunnableGraph[NotUsed] = + def flow_with_map(): RunnableGraph[NotUsed] = MaterializationBenchmark.flowWithMapBuilder(complexity) @Benchmark @@ -27,7 +27,7 @@ class GraphBuilderBenchmark { MaterializationBenchmark.graphWithJunctionsBuilder(complexity) @Benchmark - def graph_with_nested_imports(): RunnableGraph[NotUsed] = + def graph_with_nested_imports(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithNestedImportsBuilder(complexity) @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 32e8da50e7..1c589ed0d1 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -84,7 +84,7 @@ class MaterializationBenchmark { var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ - @Param(Array("1", "10", "100", "1000")) + @Param(Array("1", "10")) var complexity = 0 @Setup diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a12948ae57..a358cd999c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -11,6 +11,7 @@ import akka.event.LoggingAdapter import akka.pattern.ask import akka.stream._ import akka.stream.impl.fusing.GraphInterpreterShell +import akka.util.OptionVal import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Await, ExecutionContextExecutor } @@ -90,7 +91,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa val subFusingPhase = new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { - new GraphStageIsland(settings, materializer, islandName, Some(registerShell)).asInstanceOf[PhaseIsland[Any]] + new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]] } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 631eff49c9..0b06c647ac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -8,7 +8,7 @@ import java.util.ArrayList import java.util.concurrent.atomic.AtomicBoolean import akka.NotUsed -import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill, Props } +import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } import akka.stream.Attributes.InputBuffer @@ -35,7 +35,7 @@ object PhasedFusingActorMaterializer { val DefaultPhase: Phase[Any] = new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - new GraphStageIsland(settings, materializer, islandName, subflowFuser = None).asInstanceOf[PhaseIsland[Any]] + new GraphStageIsland(settings, materializer, islandName, subflowFuser = OptionVal.None).asInstanceOf[PhaseIsland[Any]] } val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]]( @@ -536,7 +536,7 @@ final class GraphStageIsland( effectiveSettings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String, - subflowFuser: Option[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { + subflowFuser: OptionVal[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { // TODO: remove these private val logicArrayType = Array.empty[GraphStageLogic] private[this] val logics = new ArrayList[GraphStageLogic](64) @@ -654,9 +654,8 @@ final class GraphStageIsland( shell.connections = finalConnections shell.logics = logics.toArray(logicArrayType) - // TODO make OptionVal subflowFuser match { - case Some(fuseIntoExistingInterperter) ⇒ + case OptionVal.Some(fuseIntoExistingInterperter) ⇒ fuseIntoExistingInterperter(shell) case _ ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 74da492efb..7b4f41d55d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -7,6 +7,8 @@ package akka.stream.impl import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.scaladsl.Keep +import akka.util.OptionVal +import scala.language.existentials /** * Graphs to be materialized are defined by their traversal. There is no explicit graph information tracked, instead @@ -464,14 +466,14 @@ final case class AtomicTraversalBuilder( object LinearTraversalBuilder { // TODO: Remove - private val cachedEmptyLinear = LinearTraversalBuilder(None, None, 0, 0, PushNotUsed, None, Attributes.none) + private val cachedEmptyLinear = LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, Attributes.none) private[this] final val wireBackward: Array[Int] = Array(-1) private[this] final val noWire: Array[Int] = Array() def empty(attributes: Attributes = Attributes.none): LinearTraversalBuilder = if (attributes eq Attributes.none) cachedEmptyLinear - else LinearTraversalBuilder(None, None, 0, 0, PushNotUsed, None, attributes, EmptyTraversal) + else LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, attributes, EmptyTraversal) /** * Create a traversal builder specialized for linear graphs. This is designed to be much faster and lightweight @@ -482,8 +484,8 @@ object LinearTraversalBuilder { require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.") TraversalBuilder.initShape(module.shape) - val inPortOpt = module.shape.inlets.headOption - val outPortOpt = module.shape.outlets.headOption + val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull) + val outPortOpt = OptionVal(module.shape.outlets.headOption.orNull) val wiring = if (outPortOpt.isDefined) wireBackward else noWire @@ -493,7 +495,7 @@ object LinearTraversalBuilder { inOffset = 0, if (inPortOpt.isDefined) 1 else 0, traversalSoFar = MaterializeAtomic(module, wiring), - pendingBuilder = None, + pendingBuilder = OptionVal.None, attributes) } @@ -523,12 +525,12 @@ object LinearTraversalBuilder { } LinearTraversalBuilder( - inPort = inOpt, - outPort = None, + inPort = OptionVal(inOpt.orNull), + outPort = OptionVal.None, inOffset = inOffs, inSlots = completed.inSlots, completed.traversal.concat(addMatCompose(PushNotUsed, combine)), - pendingBuilder = None, + pendingBuilder = OptionVal.None, Attributes.none) case composite ⇒ @@ -540,12 +542,12 @@ object LinearTraversalBuilder { } LinearTraversalBuilder( - inPort = inOpt, - outPort = Some(out), + inPort = OptionVal(inOpt.orNull), + outPort = OptionVal.Some(out), inOffset = inOffs, inSlots = composite.inSlots, addMatCompose(PushNotUsed, combine), - pendingBuilder = Some(composite), + pendingBuilder = OptionVal.Some(composite), Attributes.none, beforeBuilder = EmptyTraversal) @@ -562,15 +564,15 @@ object LinearTraversalBuilder { * -1 relative offset to something else (see rewireLastOutTo). */ final case class LinearTraversalBuilder( - inPort: Option[InPort], - outPort: Option[OutPort], + inPort: OptionVal[InPort], + outPort: OptionVal[OutPort], inOffset: Int, override val inSlots: Int, traversalSoFar: Traversal, - pendingBuilder: Option[TraversalBuilder], + pendingBuilder: OptionVal[TraversalBuilder], attributes: Attributes, - beforeBuilder: Traversal = EmptyTraversal, - islandTag: Option[IslandTag] = None) extends TraversalBuilder { + beforeBuilder: Traversal = EmptyTraversal, + islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty @@ -583,7 +585,7 @@ final case class LinearTraversalBuilder( * This builder can always return a traversal. */ override def traversal: Traversal = { - if (outPort.nonEmpty) + if (outPort.isDefined) throw new IllegalStateException("Traversal cannot be acquired until all output ports have been wired") applyIslandAndAttributes(traversalSoFar) } @@ -598,8 +600,8 @@ final case class LinearTraversalBuilder( private def applyIslandAndAttributes(t: Traversal): Traversal = { val withIslandTag = islandTag match { - case None ⇒ t - case Some(tag) ⇒ EnterIsland(tag).concat(t).concat(ExitIsland) + case OptionVal.None ⇒ t + case OptionVal.Some(tag) ⇒ EnterIsland(tag).concat(t).concat(ExitIsland) } if (attributes eq Attributes.none) withIslandTag @@ -625,19 +627,19 @@ final case class LinearTraversalBuilder( override def wire(out: OutPort, in: InPort): TraversalBuilder = { if (outPort.contains(out) && inPort.contains(in)) { pendingBuilder match { - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ copy( - inPort = None, - outPort = None, + inPort = OptionVal.None, + outPort = OptionVal.None, traversalSoFar = applyIslandAndAttributes( beforeBuilder.concat( composite .assign(out, inOffset - composite.offsetOfModule(out)) .traversal).concat(traversalSoFar)), - pendingBuilder = None, beforeBuilder = EmptyTraversal) - case None ⇒ - copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) + pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) + case OptionVal.None ⇒ + copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } } else throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") @@ -646,8 +648,8 @@ final case class LinearTraversalBuilder( override def offsetOfModule(out: OutPort): Int = { if (outPort.contains(out)) { pendingBuilder match { - case Some(composite) ⇒ composite.offsetOfModule(out) - case None ⇒ 0 // Output belongs to the last module, which will be materialized *first* + case OptionVal.Some(composite) ⇒ composite.offsetOfModule(out) + case OptionVal.None ⇒ 0 // Output belongs to the last module, which will be materialized *first* } } else throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder") @@ -665,9 +667,9 @@ final case class LinearTraversalBuilder( override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = { if (outPort.contains(out)) { pendingBuilder match { - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ copy( - outPort = None, + outPort = OptionVal.None, traversalSoFar = applyIslandAndAttributes( beforeBuilder.concat( @@ -675,10 +677,10 @@ final case class LinearTraversalBuilder( .assign(out, relativeSlot) .traversal .concat(traversalSoFar))), - pendingBuilder = None, + pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) - case None ⇒ - copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) + case OptionVal.None ⇒ + copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } } else throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") @@ -705,7 +707,7 @@ final case class LinearTraversalBuilder( toAppend.copy( traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose))) } else { - if (outPort.nonEmpty) { + if (outPort.isDefined) { require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " + "because there is a dangling output.") @@ -749,7 +751,7 @@ final case class LinearTraversalBuilder( * different. */ val assembledTraversalForThis = this.pendingBuilder match { - case None ⇒ + case OptionVal.None ⇒ /* * This is the case where we are a pure linear builder (all composites have been already completed), * which means that traversalSoFar contains everything already, except the final attributes and islands @@ -788,7 +790,7 @@ final case class LinearTraversalBuilder( rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots) } - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ /* * This is the case where our last module is a composite, and since it does not have its output port * wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. @@ -842,7 +844,7 @@ final case class LinearTraversalBuilder( * There are two variants, depending whether toAppend is purely linear or if it has a composite at the end. */ toAppend.pendingBuilder match { - case None ⇒ + case OptionVal.None ⇒ /* * This is the simple case, when the other is purely linear. We just concatenate the traversals * and do some bookkeeping. @@ -855,13 +857,13 @@ final case class LinearTraversalBuilder( inOffset = inOffset + toAppend.inSlots, // Build in reverse so it yields a more efficient layout for left-to-right building traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), - pendingBuilder = None, + pendingBuilder = OptionVal.None, attributes = Attributes.none, // attributes are none for the new enclosing builder beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites - islandTag = None // islandTag is reset for the new enclosing builder + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder ) - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ /* * In this case we need to assemble as much as we can, and create a new "sandwich" of * beforeBuilder ~ pendingBuilder ~ traversalSoFar @@ -875,9 +877,9 @@ final case class LinearTraversalBuilder( // First prepare island enter and exit if tags are present toAppend.islandTag match { - case None ⇒ + case OptionVal.None ⇒ // Nothing changes - case Some(tag) ⇒ + case OptionVal.Some(tag) ⇒ // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal) // Exit the island just after the appended builder (they should not applied to _this_ builder) @@ -908,7 +910,7 @@ final case class LinearTraversalBuilder( pendingBuilder = toAppend.pendingBuilder, attributes = Attributes.none, // attributes are none for the new enclosing builder beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites - islandTag = None // islandTag is reset for the new enclosing builder + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder ) } } else throw new Exception("should this happen?") @@ -927,8 +929,8 @@ final case class LinearTraversalBuilder( */ override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder = this.islandTag match { - case Some(tag) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - case None ⇒ copy(islandTag = Some(islandTag)) + case OptionVal.Some(tag) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + case OptionVal.None ⇒ copy(islandTag = OptionVal.Some(islandTag)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 7314a65b5b..ecf5f1d26b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -298,22 +298,22 @@ object ActorGraphInterpreter { } def shutdown(reason: Option[Throwable]): Unit = { - shutdownReason = reason + shutdownReason = OptionVal(reason.orNull) pendingSubscribers.getAndSet(null) match { case null ⇒ // already called earlier case pending ⇒ pending foreach reportSubscribeFailure } } - @volatile private var shutdownReason: Option[Throwable] = None + @volatile private var shutdownReason: OptionVal[Throwable] = OptionVal.None private def reportSubscribeFailure(subscriber: Subscriber[Any]): Unit = try shutdownReason match { - case Some(e: SpecViolation) ⇒ // ok, not allowed to call onError - case Some(e) ⇒ + case OptionVal.Some(e: SpecViolation) ⇒ // ok, not allowed to call onError + case OptionVal.Some(e) ⇒ tryOnSubscribe(subscriber, CancelledSubscription) tryOnError(subscriber, e) - case None ⇒ + case OptionVal.None ⇒ tryOnSubscribe(subscriber, CancelledSubscription) tryOnComplete(subscriber) } catch { @@ -341,7 +341,7 @@ object ActorGraphInterpreter { // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked) private var downstreamCompleted = false // when upstream failed before we got the exposed publisher - private var upstreamFailed: Option[Throwable] = None + private var upstreamFailed: OptionVal[Throwable] = OptionVal.None private var upstreamCompleted: Boolean = false private def onNext(elem: Any): Unit = { @@ -362,7 +362,7 @@ object ActorGraphInterpreter { // No need to fail if had already been cancelled, or we closed earlier if (!(downstreamCompleted || upstreamCompleted)) { upstreamCompleted = true - upstreamFailed = Some(e) + upstreamFailed = OptionVal.Some(e) publisher.shutdown(Some(e)) if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e) }