=str #22437 replace require() with if calls to avoid fn allocs
This commit is contained in:
parent
67fd0cc0b6
commit
99705d1ccc
5 changed files with 13 additions and 14 deletions
|
|
@ -84,7 +84,7 @@ class MaterializationBenchmark {
|
|||
var graphWithNestedImports: RunnableGraph[NotUsed] = _
|
||||
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
|
||||
|
||||
@Param(Array("1", "10"))
|
||||
@Param(Array("1", "10", "100", "1000"))
|
||||
var complexity = 0
|
||||
|
||||
@Setup
|
||||
|
|
|
|||
|
|
@ -87,8 +87,6 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
|
|||
* The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph.
|
||||
*/
|
||||
private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer {
|
||||
require(registerShell ne null, "When using SubFusing the subflowFuser MUST NOT be null.") // FIXME remove check?
|
||||
|
||||
val subFusingPhase = new Phase[Any] {
|
||||
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = {
|
||||
new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]]
|
||||
|
|
|
|||
|
|
@ -47,8 +47,9 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates {
|
||||
require(size > 0, "buffer size cannot be zero")
|
||||
require((size & (size - 1)) == 0, "buffer size must be a power of two")
|
||||
if (size < 1) throw new IllegalArgumentException(s"buffer size MSUT be positive (was: $size")
|
||||
if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two")
|
||||
|
||||
// TODO: buffer and batch sizing heuristics
|
||||
private var upstream: Subscription = _
|
||||
private val inputBuffer = Array.ofDim[AnyRef](size)
|
||||
|
|
@ -114,7 +115,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
|
|||
}
|
||||
|
||||
protected def onSubscribe(subscription: Subscription): Unit = {
|
||||
require(subscription != null)
|
||||
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
|
||||
if (upstreamCompleted) subscription.cancel()
|
||||
else {
|
||||
upstream = subscription
|
||||
|
|
|
|||
|
|
@ -480,8 +480,8 @@ object LinearTraversalBuilder {
|
|||
* than its generic counterpart. It can be freely mixed with the generic builder in both ways.
|
||||
*/
|
||||
def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = {
|
||||
require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.")
|
||||
require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.")
|
||||
if (module.shape.inlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.")
|
||||
if (module.shape.outlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.")
|
||||
TraversalBuilder.initShape(module.shape)
|
||||
|
||||
val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull)
|
||||
|
|
@ -708,8 +708,8 @@ final case class LinearTraversalBuilder(
|
|||
traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose)))
|
||||
} else {
|
||||
if (outPort.isDefined) {
|
||||
require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " +
|
||||
"because there is a dangling output.")
|
||||
if (toAppend.inPort.isEmpty)
|
||||
throw new IllegalArgumentException("Appended linear module must have an unwired input port because there is a dangling output.")
|
||||
|
||||
/*
|
||||
* To understand how append work, first the general structure of the LinearTraversalBuilder must be
|
||||
|
|
|
|||
|
|
@ -89,8 +89,8 @@ object ActorGraphInterpreter {
|
|||
override def logic: GraphStageLogic = BatchingActorInputBoundary.this
|
||||
}
|
||||
|
||||
require(size > 0, "buffer size cannot be zero")
|
||||
require((size & (size - 1)) == 0, "buffer size must be a power of two")
|
||||
if (size <= 0) throw new IllegalArgumentException("buffer size cannot be zero")
|
||||
if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two")
|
||||
|
||||
private var actor: ActorRef = ActorRef.noSender
|
||||
private var upstream: Subscription = _
|
||||
|
|
@ -136,7 +136,7 @@ object ActorGraphInterpreter {
|
|||
|
||||
private def dequeue(): Any = {
|
||||
val elem = inputBuffer(nextInputElementCursor)
|
||||
require(elem ne null, "Internal queue must never contain a null")
|
||||
if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null")
|
||||
inputBuffer(nextInputElementCursor) = null
|
||||
|
||||
batchRemaining -= 1
|
||||
|
|
@ -196,7 +196,7 @@ object ActorGraphInterpreter {
|
|||
}
|
||||
|
||||
def onSubscribe(subscription: Subscription): Unit = {
|
||||
require(subscription != null, "Subscription cannot be null")
|
||||
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
|
||||
if (upstreamCompleted) {
|
||||
tryCancel(subscription)
|
||||
} else if (downstreamCanceled) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue