From 99705d1ccce820174ce301a57dfd06b4750725fa Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Fri, 10 Mar 2017 10:40:49 +0100 Subject: [PATCH] =str #22437 replace require() with if calls to avoid fn allocs --- .../main/scala/akka/stream/MaterializationBenchmark.scala | 2 +- .../scala/akka/stream/impl/ActorMaterializerImpl.scala | 2 -- .../src/main/scala/akka/stream/impl/ActorProcessor.scala | 7 ++++--- .../main/scala/akka/stream/impl/TraversalBuilder.scala | 8 ++++---- .../akka/stream/impl/fusing/ActorGraphInterpreter.scala | 8 ++++---- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 1c589ed0d1..32e8da50e7 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -84,7 +84,7 @@ class MaterializationBenchmark { var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ - @Param(Array("1", "10")) + @Param(Array("1", "10", "100", "1000")) var complexity = 0 @Setup diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a358cd999c..061ee56cc6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -87,8 +87,6 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { * The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph. */ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { - require(registerShell ne null, "When using SubFusing the subflowFuser MUST NOT be null.") // FIXME remove check? - val subFusingPhase = new Phase[Any] { override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = { new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index bee34ac91c..d9aec3c41f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -47,8 +47,9 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[ * INTERNAL API */ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { - require(size > 0, "buffer size cannot be zero") - require((size & (size - 1)) == 0, "buffer size must be a power of two") + if (size < 1) throw new IllegalArgumentException(s"buffer size MSUT be positive (was: $size") + if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two") + // TODO: buffer and batch sizing heuristics private var upstream: Subscription = _ private val inputBuffer = Array.ofDim[AnyRef](size) @@ -114,7 +115,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) } protected def onSubscribe(subscription: Subscription): Unit = { - require(subscription != null) + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) if (upstreamCompleted) subscription.cancel() else { upstream = subscription diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 7b4f41d55d..38fbd5eff4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -480,8 +480,8 @@ object LinearTraversalBuilder { * than its generic counterpart. It can be freely mixed with the generic builder in both ways. */ def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = { - require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.") - require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.") + if (module.shape.inlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.") + if (module.shape.outlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.") TraversalBuilder.initShape(module.shape) val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull) @@ -708,8 +708,8 @@ final case class LinearTraversalBuilder( traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose))) } else { if (outPort.isDefined) { - require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " + - "because there is a dangling output.") + if (toAppend.inPort.isEmpty) + throw new IllegalArgumentException("Appended linear module must have an unwired input port because there is a dangling output.") /* * To understand how append work, first the general structure of the LinearTraversalBuilder must be diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index ecf5f1d26b..8521504a92 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -89,8 +89,8 @@ object ActorGraphInterpreter { override def logic: GraphStageLogic = BatchingActorInputBoundary.this } - require(size > 0, "buffer size cannot be zero") - require((size & (size - 1)) == 0, "buffer size must be a power of two") + if (size <= 0) throw new IllegalArgumentException("buffer size cannot be zero") + if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two") private var actor: ActorRef = ActorRef.noSender private var upstream: Subscription = _ @@ -136,7 +136,7 @@ object ActorGraphInterpreter { private def dequeue(): Any = { val elem = inputBuffer(nextInputElementCursor) - require(elem ne null, "Internal queue must never contain a null") + if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null") inputBuffer(nextInputElementCursor) = null batchRemaining -= 1 @@ -196,7 +196,7 @@ object ActorGraphInterpreter { } def onSubscribe(subscription: Subscription): Unit = { - require(subscription != null, "Subscription cannot be null") + ReactiveStreamsCompliance.requireNonNullSubscription(subscription) if (upstreamCompleted) { tryCancel(subscription) } else if (downstreamCanceled) {