From 4c0d6ddb7a587f163f165f899f27ecd624165fae Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 9 Mar 2017 10:46:10 +0100 Subject: [PATCH 1/5] =ben,str make sure to return values in benchmarks --- .../scala/akka/stream/GraphBuilderBenchmark.scala | 15 +++++++-------- .../akka/stream/MaterializationBenchmark.scala | 8 ++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala index aa9f2837a2..84486a1aeb 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala @@ -5,6 +5,9 @@ package akka.stream import java.util.concurrent.TimeUnit + +import akka.NotUsed +import akka.stream.scaladsl.RunnableGraph import org.openjdk.jmh.annotations._ @State(Scope.Benchmark) @@ -16,22 +19,18 @@ class GraphBuilderBenchmark { var complexity = 0 @Benchmark - def flow_with_map(): Unit = { + def flow_with_map(): RunnableGraph[NotUsed] = MaterializationBenchmark.flowWithMapBuilder(complexity) - } @Benchmark - def graph_with_junctions(): Unit = { + def graph_with_junctions(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithJunctionsBuilder(complexity) - } @Benchmark - def graph_with_nested_imports(): Unit = { + def graph_with_nested_imports(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithNestedImportsBuilder(complexity) - } @Benchmark - def graph_with_imported_flow(): Unit = { + def graph_with_imported_flow(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithImportedFlowBuilder(complexity) - } } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 37892f36ad..32e8da50e7 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -101,14 +101,14 @@ class MaterializationBenchmark { } @Benchmark - def flow_with_map(): Unit = flowWithMap.run() + def flow_with_map(): NotUsed = flowWithMap.run() @Benchmark - def graph_with_junctions(): Unit = graphWithJunctions.run() + def graph_with_junctions(): NotUsed = graphWithJunctions.run() @Benchmark - def graph_with_nested_imports(): Unit = graphWithNestedImports.run() + def graph_with_nested_imports(): NotUsed = graphWithNestedImports.run() @Benchmark - def graph_with_imported_flow(): Unit = graphWithImportedFlow.run() + def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run() } From 67fd0cc0b6ade4abaf686e303cb62280583bad77 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 9 Mar 2017 17:44:11 +0100 Subject: [PATCH 2/5] =str #22438 make use of OptionVal in materializer internals --- .../src/main/scala/akka/util/OptionVal.scala | 3 + .../akka/stream/GraphBuilderBenchmark.scala | 4 +- .../stream/MaterializationBenchmark.scala | 2 +- .../stream/impl/ActorMaterializerImpl.scala | 3 +- .../impl/PhasedFusingActorMaterializer.scala | 9 +- .../akka/stream/impl/TraversalBuilder.scala | 90 ++++++++++--------- .../impl/fusing/ActorGraphInterpreter.scala | 14 +-- 7 files changed, 65 insertions(+), 60 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/OptionVal.scala b/akka-actor/src/main/scala/akka/util/OptionVal.scala index 76750e1490..7e94641633 100644 --- a/akka-actor/src/main/scala/akka/util/OptionVal.scala +++ b/akka-actor/src/main/scala/akka/util/OptionVal.scala @@ -50,6 +50,9 @@ private[akka] final class OptionVal[+A >: Null](val x: A) extends AnyVal { def getOrElse[B >: A](default: B): B = if (x == null) default else x + def contains[B >: A](it: B): Boolean = + x != null && x == it + /** * Returns the option's value if it is nonempty, or `null` if it is empty. */ diff --git a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala index 84486a1aeb..3a8818906f 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/GraphBuilderBenchmark.scala @@ -19,7 +19,7 @@ class GraphBuilderBenchmark { var complexity = 0 @Benchmark - def flow_with_map(): RunnableGraph[NotUsed] = + def flow_with_map(): RunnableGraph[NotUsed] = MaterializationBenchmark.flowWithMapBuilder(complexity) @Benchmark @@ -27,7 +27,7 @@ class GraphBuilderBenchmark { MaterializationBenchmark.graphWithJunctionsBuilder(complexity) @Benchmark - def graph_with_nested_imports(): RunnableGraph[NotUsed] = + def graph_with_nested_imports(): RunnableGraph[NotUsed] = MaterializationBenchmark.graphWithNestedImportsBuilder(complexity) @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 32e8da50e7..1c589ed0d1 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -84,7 +84,7 @@ class MaterializationBenchmark { var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ - @Param(Array("1", "10", "100", "1000")) + @Param(Array("1", "10")) var complexity = 0 @Setup diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a12948ae57..a358cd999c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -11,6 +11,7 @@ import akka.event.LoggingAdapter import akka.pattern.ask import akka.stream._ import akka.stream.impl.fusing.GraphInterpreterShell +import akka.util.OptionVal import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Await, ExecutionContextExecutor } @@ -90,7 +91,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa val subFusingPhase = new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { - new GraphStageIsland(settings, materializer, islandName, Some(registerShell)).asInstanceOf[PhaseIsland[Any]] + new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]] } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 631eff49c9..0b06c647ac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -8,7 +8,7 @@ import java.util.ArrayList import java.util.concurrent.atomic.AtomicBoolean import akka.NotUsed -import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill, Props } +import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } import akka.stream.Attributes.InputBuffer @@ -35,7 +35,7 @@ object PhasedFusingActorMaterializer { val DefaultPhase: Phase[Any] = new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = - new GraphStageIsland(settings, materializer, islandName, subflowFuser = None).asInstanceOf[PhaseIsland[Any]] + new GraphStageIsland(settings, materializer, islandName, subflowFuser = OptionVal.None).asInstanceOf[PhaseIsland[Any]] } val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]]( @@ -536,7 +536,7 @@ final class GraphStageIsland( effectiveSettings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String, - subflowFuser: Option[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { + subflowFuser: OptionVal[GraphInterpreterShell ⇒ ActorRef]) extends PhaseIsland[GraphStageLogic] { // TODO: remove these private val logicArrayType = Array.empty[GraphStageLogic] private[this] val logics = new ArrayList[GraphStageLogic](64) @@ -654,9 +654,8 @@ final class GraphStageIsland( shell.connections = finalConnections shell.logics = logics.toArray(logicArrayType) - // TODO make OptionVal subflowFuser match { - case Some(fuseIntoExistingInterperter) ⇒ + case OptionVal.Some(fuseIntoExistingInterperter) ⇒ fuseIntoExistingInterperter(shell) case _ ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 74da492efb..7b4f41d55d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -7,6 +7,8 @@ package akka.stream.impl import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.scaladsl.Keep +import akka.util.OptionVal +import scala.language.existentials /** * Graphs to be materialized are defined by their traversal. There is no explicit graph information tracked, instead @@ -464,14 +466,14 @@ final case class AtomicTraversalBuilder( object LinearTraversalBuilder { // TODO: Remove - private val cachedEmptyLinear = LinearTraversalBuilder(None, None, 0, 0, PushNotUsed, None, Attributes.none) + private val cachedEmptyLinear = LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, Attributes.none) private[this] final val wireBackward: Array[Int] = Array(-1) private[this] final val noWire: Array[Int] = Array() def empty(attributes: Attributes = Attributes.none): LinearTraversalBuilder = if (attributes eq Attributes.none) cachedEmptyLinear - else LinearTraversalBuilder(None, None, 0, 0, PushNotUsed, None, attributes, EmptyTraversal) + else LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, attributes, EmptyTraversal) /** * Create a traversal builder specialized for linear graphs. This is designed to be much faster and lightweight @@ -482,8 +484,8 @@ object LinearTraversalBuilder { require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.") TraversalBuilder.initShape(module.shape) - val inPortOpt = module.shape.inlets.headOption - val outPortOpt = module.shape.outlets.headOption + val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull) + val outPortOpt = OptionVal(module.shape.outlets.headOption.orNull) val wiring = if (outPortOpt.isDefined) wireBackward else noWire @@ -493,7 +495,7 @@ object LinearTraversalBuilder { inOffset = 0, if (inPortOpt.isDefined) 1 else 0, traversalSoFar = MaterializeAtomic(module, wiring), - pendingBuilder = None, + pendingBuilder = OptionVal.None, attributes) } @@ -523,12 +525,12 @@ object LinearTraversalBuilder { } LinearTraversalBuilder( - inPort = inOpt, - outPort = None, + inPort = OptionVal(inOpt.orNull), + outPort = OptionVal.None, inOffset = inOffs, inSlots = completed.inSlots, completed.traversal.concat(addMatCompose(PushNotUsed, combine)), - pendingBuilder = None, + pendingBuilder = OptionVal.None, Attributes.none) case composite ⇒ @@ -540,12 +542,12 @@ object LinearTraversalBuilder { } LinearTraversalBuilder( - inPort = inOpt, - outPort = Some(out), + inPort = OptionVal(inOpt.orNull), + outPort = OptionVal.Some(out), inOffset = inOffs, inSlots = composite.inSlots, addMatCompose(PushNotUsed, combine), - pendingBuilder = Some(composite), + pendingBuilder = OptionVal.Some(composite), Attributes.none, beforeBuilder = EmptyTraversal) @@ -562,15 +564,15 @@ object LinearTraversalBuilder { * -1 relative offset to something else (see rewireLastOutTo). */ final case class LinearTraversalBuilder( - inPort: Option[InPort], - outPort: Option[OutPort], + inPort: OptionVal[InPort], + outPort: OptionVal[OutPort], inOffset: Int, override val inSlots: Int, traversalSoFar: Traversal, - pendingBuilder: Option[TraversalBuilder], + pendingBuilder: OptionVal[TraversalBuilder], attributes: Attributes, - beforeBuilder: Traversal = EmptyTraversal, - islandTag: Option[IslandTag] = None) extends TraversalBuilder { + beforeBuilder: Traversal = EmptyTraversal, + islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty @@ -583,7 +585,7 @@ final case class LinearTraversalBuilder( * This builder can always return a traversal. */ override def traversal: Traversal = { - if (outPort.nonEmpty) + if (outPort.isDefined) throw new IllegalStateException("Traversal cannot be acquired until all output ports have been wired") applyIslandAndAttributes(traversalSoFar) } @@ -598,8 +600,8 @@ final case class LinearTraversalBuilder( private def applyIslandAndAttributes(t: Traversal): Traversal = { val withIslandTag = islandTag match { - case None ⇒ t - case Some(tag) ⇒ EnterIsland(tag).concat(t).concat(ExitIsland) + case OptionVal.None ⇒ t + case OptionVal.Some(tag) ⇒ EnterIsland(tag).concat(t).concat(ExitIsland) } if (attributes eq Attributes.none) withIslandTag @@ -625,19 +627,19 @@ final case class LinearTraversalBuilder( override def wire(out: OutPort, in: InPort): TraversalBuilder = { if (outPort.contains(out) && inPort.contains(in)) { pendingBuilder match { - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ copy( - inPort = None, - outPort = None, + inPort = OptionVal.None, + outPort = OptionVal.None, traversalSoFar = applyIslandAndAttributes( beforeBuilder.concat( composite .assign(out, inOffset - composite.offsetOfModule(out)) .traversal).concat(traversalSoFar)), - pendingBuilder = None, beforeBuilder = EmptyTraversal) - case None ⇒ - copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) + pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) + case OptionVal.None ⇒ + copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } } else throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") @@ -646,8 +648,8 @@ final case class LinearTraversalBuilder( override def offsetOfModule(out: OutPort): Int = { if (outPort.contains(out)) { pendingBuilder match { - case Some(composite) ⇒ composite.offsetOfModule(out) - case None ⇒ 0 // Output belongs to the last module, which will be materialized *first* + case OptionVal.Some(composite) ⇒ composite.offsetOfModule(out) + case OptionVal.None ⇒ 0 // Output belongs to the last module, which will be materialized *first* } } else throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder") @@ -665,9 +667,9 @@ final case class LinearTraversalBuilder( override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = { if (outPort.contains(out)) { pendingBuilder match { - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ copy( - outPort = None, + outPort = OptionVal.None, traversalSoFar = applyIslandAndAttributes( beforeBuilder.concat( @@ -675,10 +677,10 @@ final case class LinearTraversalBuilder( .assign(out, relativeSlot) .traversal .concat(traversalSoFar))), - pendingBuilder = None, + pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) - case None ⇒ - copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) + case OptionVal.None ⇒ + copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } } else throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") @@ -705,7 +707,7 @@ final case class LinearTraversalBuilder( toAppend.copy( traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose))) } else { - if (outPort.nonEmpty) { + if (outPort.isDefined) { require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " + "because there is a dangling output.") @@ -749,7 +751,7 @@ final case class LinearTraversalBuilder( * different. */ val assembledTraversalForThis = this.pendingBuilder match { - case None ⇒ + case OptionVal.None ⇒ /* * This is the case where we are a pure linear builder (all composites have been already completed), * which means that traversalSoFar contains everything already, except the final attributes and islands @@ -788,7 +790,7 @@ final case class LinearTraversalBuilder( rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots) } - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ /* * This is the case where our last module is a composite, and since it does not have its output port * wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. @@ -842,7 +844,7 @@ final case class LinearTraversalBuilder( * There are two variants, depending whether toAppend is purely linear or if it has a composite at the end. */ toAppend.pendingBuilder match { - case None ⇒ + case OptionVal.None ⇒ /* * This is the simple case, when the other is purely linear. We just concatenate the traversals * and do some bookkeeping. @@ -855,13 +857,13 @@ final case class LinearTraversalBuilder( inOffset = inOffset + toAppend.inSlots, // Build in reverse so it yields a more efficient layout for left-to-right building traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), - pendingBuilder = None, + pendingBuilder = OptionVal.None, attributes = Attributes.none, // attributes are none for the new enclosing builder beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites - islandTag = None // islandTag is reset for the new enclosing builder + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder ) - case Some(composite) ⇒ + case OptionVal.Some(composite) ⇒ /* * In this case we need to assemble as much as we can, and create a new "sandwich" of * beforeBuilder ~ pendingBuilder ~ traversalSoFar @@ -875,9 +877,9 @@ final case class LinearTraversalBuilder( // First prepare island enter and exit if tags are present toAppend.islandTag match { - case None ⇒ + case OptionVal.None ⇒ // Nothing changes - case Some(tag) ⇒ + case OptionVal.Some(tag) ⇒ // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal) // Exit the island just after the appended builder (they should not applied to _this_ builder) @@ -908,7 +910,7 @@ final case class LinearTraversalBuilder( pendingBuilder = toAppend.pendingBuilder, attributes = Attributes.none, // attributes are none for the new enclosing builder beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites - islandTag = None // islandTag is reset for the new enclosing builder + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder ) } } else throw new Exception("should this happen?") @@ -927,8 +929,8 @@ final case class LinearTraversalBuilder( */ override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder = this.islandTag match { - case Some(tag) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - case None ⇒ copy(islandTag = Some(islandTag)) + case OptionVal.Some(tag) ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + case OptionVal.None ⇒ copy(islandTag = OptionVal.Some(islandTag)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 7314a65b5b..ecf5f1d26b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -298,22 +298,22 @@ object ActorGraphInterpreter { } def shutdown(reason: Option[Throwable]): Unit = { - shutdownReason = reason + shutdownReason = OptionVal(reason.orNull) pendingSubscribers.getAndSet(null) match { case null ⇒ // already called earlier case pending ⇒ pending foreach reportSubscribeFailure } } - @volatile private var shutdownReason: Option[Throwable] = None + @volatile private var shutdownReason: OptionVal[Throwable] = OptionVal.None private def reportSubscribeFailure(subscriber: Subscriber[Any]): Unit = try shutdownReason match { - case Some(e: SpecViolation) ⇒ // ok, not allowed to call onError - case Some(e) ⇒ + case OptionVal.Some(e: SpecViolation) ⇒ // ok, not allowed to call onError + case OptionVal.Some(e) ⇒ tryOnSubscribe(subscriber, CancelledSubscription) tryOnError(subscriber, e) - case None ⇒ + case OptionVal.None ⇒ tryOnSubscribe(subscriber, CancelledSubscription) tryOnComplete(subscriber) } catch { @@ -341,7 +341,7 @@ object ActorGraphInterpreter { // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked) private var downstreamCompleted = false // when upstream failed before we got the exposed publisher - private var upstreamFailed: Option[Throwable] = None + private var upstreamFailed: OptionVal[Throwable] = OptionVal.None private var upstreamCompleted: Boolean = false private def onNext(elem: Any): Unit = { @@ -362,7 +362,7 @@ object ActorGraphInterpreter { // No need to fail if had already been cancelled, or we closed earlier if (!(downstreamCompleted || upstreamCompleted)) { upstreamCompleted = true - upstreamFailed = Some(e) + upstreamFailed = OptionVal.Some(e) publisher.shutdown(Some(e)) if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e) } From 99705d1ccce820174ce301a57dfd06b4750725fa Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 10 Mar 2017 10:40:49 +0100 Subject: [PATCH 3/5] =str #22437 replace require() with if calls to avoid fn allocs --- .../main/scala/akka/stream/MaterializationBenchmark.scala | 2 +- .../scala/akka/stream/impl/ActorMaterializerImpl.scala | 2 -- .../src/main/scala/akka/stream/impl/ActorProcessor.scala | 7 ++++--- .../main/scala/akka/stream/impl/TraversalBuilder.scala | 8 ++++---- .../akka/stream/impl/fusing/ActorGraphInterpreter.scala | 8 ++++---- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 1c589ed0d1..32e8da50e7 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -84,7 +84,7 @@ class MaterializationBenchmark { var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ - @Param(Array("1", "10")) + @Param(Array("1", "10", "100", "1000")) var complexity = 0 @Setup diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a358cd999c..061ee56cc6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -87,8 +87,6 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { * The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph. */ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { - require(registerShell ne null, "When using SubFusing the subflowFuser MUST NOT be null.") // FIXME remove check? - val subFusingPhase = new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index bee34ac91c..d9aec3c41f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -47,8 +47,9 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[ * INTERNAL API */ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { - require(size > 0, "buffer size cannot be zero") - require((size & (size - 1)) == 0, "buffer size must be a power of two") + if (size < 1) throw new IllegalArgumentException(s"buffer size MSUT be positive (was: $size") + if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two") + // TODO: buffer and batch sizing heuristics private var upstream: Subscription = _ private val inputBuffer = Array.ofDim[AnyRef](size) @@ -114,7 +115,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) } protected def onSubscribe(subscription: Subscription): Unit = { - require(subscription != null) + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) if (upstreamCompleted) subscription.cancel() else { upstream = subscription diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 7b4f41d55d..38fbd5eff4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -480,8 +480,8 @@ object LinearTraversalBuilder { * than its generic counterpart. It can be freely mixed with the generic builder in both ways. */ def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = { - require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.") - require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.") + if (module.shape.inlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.") + if (module.shape.outlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.") TraversalBuilder.initShape(module.shape) val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull) @@ -708,8 +708,8 @@ final case class LinearTraversalBuilder( traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose))) } else { if (outPort.isDefined) { - require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " + - "because there is a dangling output.") + if (toAppend.inPort.isEmpty) + throw new IllegalArgumentException("Appended linear module must have an unwired input port because there is a dangling output.") /* * To understand how append work, first the general structure of the LinearTraversalBuilder must be diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index ecf5f1d26b..8521504a92 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -89,8 +89,8 @@ object ActorGraphInterpreter { override def logic: GraphStageLogic = BatchingActorInputBoundary.this } - require(size > 0, "buffer size cannot be zero") - require((size & (size - 1)) == 0, "buffer size must be a power of two") + if (size <= 0) throw new IllegalArgumentException("buffer size cannot be zero") + if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two") private var actor: ActorRef = ActorRef.noSender private var upstream: Subscription = _ @@ -136,7 +136,7 @@ object ActorGraphInterpreter { private def dequeue(): Any = { val elem = inputBuffer(nextInputElementCursor) - require(elem ne null, "Internal queue must never contain a null") + if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null") inputBuffer(nextInputElementCursor) = null batchRemaining -= 1 @@ -196,7 +196,7 @@ object ActorGraphInterpreter { } def onSubscribe(subscription: Subscription): Unit = { - require(subscription != null, "Subscription cannot be null") + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) if (upstreamCompleted) { tryCancel(subscription) } else if (downstreamCanceled) { From e2a15a2a580125ce615f8fec2e9dd98d15ce4118 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 10 Mar 2017 11:42:57 +0100 Subject: [PATCH 4/5] =str use OptionVal in Traversal building --- .../stream/MaterializationBenchmark.scala | 2 +- .../akka/stream/impl/ActorProcessor.scala | 4 +- .../main/scala/akka/stream/impl/Modules.scala | 1 + .../main/scala/akka/stream/impl/Sinks.scala | 1 + .../akka/stream/impl/TraversalBuilder.scala | 45 ++++++++++++------- .../scala/akka/stream/impl/io/TlsModule.scala | 2 +- .../scala/akka/stream/stage/GraphStage.scala | 2 +- 7 files changed, 35 insertions(+), 22 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 32e8da50e7..1c589ed0d1 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -84,7 +84,7 @@ class MaterializationBenchmark { var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ - @Param(Array("1", "10", "100", "1000")) + @Param(Array("1", "10")) var complexity = 0 @Setup diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index d9aec3c41f..6dd7366fe3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -47,8 +47,8 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[ * INTERNAL API */ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { - if (size < 1) throw new IllegalArgumentException(s"buffer size MSUT be positive (was: $size") - if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two") + if (size < 1) throw new IllegalArgumentException(s"buffer size must be positive (was: $size)") + if ((size & (size - 1)) != 0) throw new IllegalArgumentException(s"buffer size must be a power of two (was: $size)") // TODO: buffer and batch sizing heuristics private var upstream: Subscription = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 3f2966e751..cf6d853bb3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -12,6 +12,7 @@ import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Promise import akka.event.Logging +import akka.util.OptionVal /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index e9e8476e4b..1d7fca3d27 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -37,6 +37,7 @@ import scala.compat.java8.OptionConverters._ import java.util.Optional import akka.event.Logging +import akka.util.OptionVal /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 38fbd5eff4..3e001afee1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -353,7 +353,7 @@ final case class CompletedTraversalBuilder( inSlots: Int, inToOffset: Map[InPort, Int], attributes: Attributes, - islandTag: Option[IslandTag] = None) extends TraversalBuilder { + islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { val key = new BuilderKey @@ -367,8 +367,8 @@ final case class CompletedTraversalBuilder( override def traversal: Traversal = { val withIsland = islandTag match { - case Some(tag) ⇒ EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) - case None ⇒ traversalSoFar + case OptionVal.Some(tag) ⇒ EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) + case _ ⇒ traversalSoFar } if (attributes eq Attributes.none) withIsland @@ -392,8 +392,8 @@ final case class CompletedTraversalBuilder( override def makeIsland(islandTag: IslandTag): TraversalBuilder = this.islandTag match { - case None ⇒ copy(islandTag = Some(islandTag)) - case Some(_) ⇒ this + case OptionVal.None ⇒ copy(islandTag = OptionVal(islandTag)) + case OptionVal.Some(_) ⇒ this } override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = @@ -518,10 +518,10 @@ object LinearTraversalBuilder { else empty().append(linear, combine) case completed: CompletedTraversalBuilder ⇒ - val inOpt = shape.inlets.headOption + val inOpt = OptionVal(shape.inlets.headOption.orNull) val inOffs = inOpt match { - case Some(in) ⇒ completed.offsetOf(in) - case None ⇒ 0 + case OptionVal.Some(in) ⇒ completed.offsetOf(in) + case OptionVal.None ⇒ 0 } LinearTraversalBuilder( @@ -534,11 +534,11 @@ object LinearTraversalBuilder { Attributes.none) case composite ⇒ - val inOpt = shape.inlets.headOption + val inOpt = OptionVal(shape.inlets.headOption.orNull) val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder val inOffs = inOpt match { - case Some(in) ⇒ composite.offsetOf(in) - case None ⇒ 0 + case OptionVal.Some(in) ⇒ composite.offsetOf(in) + case OptionVal.None ⇒ 0 } LinearTraversalBuilder( @@ -640,6 +640,11 @@ final case class LinearTraversalBuilder( pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) case OptionVal.None ⇒ copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) + case OptionVal.None ⇒ + copy( + inPort = OptionVal.None, + outPort = OptionVal.None, + traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } } else throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") @@ -681,6 +686,9 @@ final case class LinearTraversalBuilder( beforeBuilder = EmptyTraversal) case OptionVal.None ⇒ copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) + copy( + outPort = OptionVal.None, + traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } } else throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") @@ -977,7 +985,7 @@ final case class CompositeTraversalBuilder( outOwners: Map[OutPort, BuilderKey] = Map.empty, unwiredOuts: Int = 0, attributes: Attributes, - islandTag: IslandTag = null) extends TraversalBuilder { + islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { override def toString: String = s""" @@ -1020,7 +1028,10 @@ final case class CompositeTraversalBuilder( remaining = remaining.tail } - val finalTraversal = if (islandTag == null) traversal else EnterIsland(islandTag).concat(traversal).concat(ExitIsland) + val finalTraversal = islandTag match { + case OptionVal.None ⇒ traversal + case OptionVal.Some(tag) ⇒ EnterIsland(tag).concat(traversal).concat(ExitIsland) + } // The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it // to be embedded in a larger graph, making partial graph reuse much more efficient. @@ -1159,9 +1170,9 @@ final case class CompositeTraversalBuilder( } override def makeIsland(islandTag: IslandTag): TraversalBuilder = { - if (this.islandTag eq null) - copy(islandTag = islandTag) - else - this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + this.islandTag match { + case OptionVal.None ⇒ copy(islandTag = OptionVal(islandTag)) + case _ ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 0bf03b351f..3817a0fe1a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -8,7 +8,7 @@ import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.TLSProtocol._ import akka.stream.impl.{ TlsModuleIslandTag, TraversalBuilder } -import akka.util.ByteString +import akka.util.{ ByteString, OptionVal } import scala.util.Try diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 68a101b1f8..5b3df91a2a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -14,7 +14,7 @@ import akka.japi.function.{ Effect, Procedure } import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } -import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } +import akka.stream.impl.{ EmptyTraversal, LinearTraversalBuilder, ReactiveStreamsCompliance, TraversalBuilder } import scala.collection.mutable.ArrayBuffer import scala.collection.{ immutable, mutable } From f7b7c3642dbd943efd55bb3e2b63a8a1549e0c91 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 10 Mar 2017 14:21:37 +0100 Subject: [PATCH 5/5] =pro silence MiMa warning in testkit --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 4 ++-- project/MiMa.scala | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 14db4c577b..ce14ca82d3 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -228,7 +228,7 @@ private[akka] class Shard( def passivate(entity: ActorRef, stopMessage: Any): Unit = { idByRef.get(entity) match { - case Some(id) => if (!messageBuffers.contains(id)) { + case Some(id) ⇒ if (!messageBuffers.contains(id)) { log.debug("Passivating started on entity {}", id) passivating = passivating + entity @@ -237,7 +237,7 @@ private[akka] class Shard( } else { log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity.", entity) } - case None => log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity) + case None ⇒ log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity) } } diff --git a/project/MiMa.scala b/project/MiMa.scala index 3ef35a24de..e93a160578 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -540,7 +540,11 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageWithMaterializedValue.module"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.ModuleExtractor"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.ModuleExtractor$"), - ProblemFilters.excludePackage("akka.stream.impl") + ProblemFilters.excludePackage("akka.stream.impl"), + + // small changes in attributes + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSource.withAttributes"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSink.withAttributes") // NOTE: filters that will be backported to 2.4 should go to the latest 2.4 version below )