diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 32e8da50e7..1c589ed0d1 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -84,7 +84,7 @@ class MaterializationBenchmark { var graphWithNestedImports: RunnableGraph[NotUsed] = _ var graphWithImportedFlow: RunnableGraph[NotUsed] = _ - @Param(Array("1", "10", "100", "1000")) + @Param(Array("1", "10")) var complexity = 0 @Setup diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index d9aec3c41f..6dd7366fe3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -47,8 +47,8 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[ * INTERNAL API */ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { - if (size < 1) throw new IllegalArgumentException(s"buffer size MSUT be positive (was: $size") - if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two") + if (size < 1) throw new IllegalArgumentException(s"buffer size must be positive (was: $size)") + if ((size & (size - 1)) != 0) throw new IllegalArgumentException(s"buffer size must be a power of two (was: $size)") // TODO: buffer and batch sizing heuristics private var upstream: Subscription = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 3f2966e751..cf6d853bb3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -12,6 +12,7 @@ import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.Promise import akka.event.Logging +import akka.util.OptionVal /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index e9e8476e4b..1d7fca3d27 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -37,6 +37,7 @@ import scala.compat.java8.OptionConverters._ import java.util.Optional import akka.event.Logging +import akka.util.OptionVal /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 38fbd5eff4..3e001afee1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -353,7 +353,7 @@ final case class CompletedTraversalBuilder( inSlots: Int, inToOffset: Map[InPort, Int], attributes: Attributes, - islandTag: Option[IslandTag] = None) extends TraversalBuilder { + islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) ⇒ C): TraversalBuilder = { val key = new BuilderKey @@ -367,8 +367,8 @@ final case class CompletedTraversalBuilder( override def traversal: Traversal = { val withIsland = islandTag match { - case Some(tag) ⇒ EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) - case None ⇒ traversalSoFar + case OptionVal.Some(tag) ⇒ EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) + case _ ⇒ traversalSoFar } if (attributes eq Attributes.none) withIsland @@ -392,8 +392,8 @@ final case class CompletedTraversalBuilder( override def makeIsland(islandTag: IslandTag): TraversalBuilder = this.islandTag match { - case None ⇒ copy(islandTag = Some(islandTag)) - case Some(_) ⇒ this + case OptionVal.None ⇒ copy(islandTag = OptionVal(islandTag)) + case OptionVal.Some(_) ⇒ this } override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = @@ -518,10 +518,10 @@ object LinearTraversalBuilder { else empty().append(linear, combine) case completed: CompletedTraversalBuilder ⇒ - val inOpt = shape.inlets.headOption + val inOpt = OptionVal(shape.inlets.headOption.orNull) val inOffs = inOpt match { - case Some(in) ⇒ completed.offsetOf(in) - case None ⇒ 0 + case OptionVal.Some(in) ⇒ completed.offsetOf(in) + case OptionVal.None ⇒ 0 } LinearTraversalBuilder( @@ -534,11 +534,11 @@ object LinearTraversalBuilder { Attributes.none) case composite ⇒ - val inOpt = shape.inlets.headOption + val inOpt = OptionVal(shape.inlets.headOption.orNull) val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder val inOffs = inOpt match { - case Some(in) ⇒ composite.offsetOf(in) - case None ⇒ 0 + case OptionVal.Some(in) ⇒ composite.offsetOf(in) + case OptionVal.None ⇒ 0 } LinearTraversalBuilder( @@ -640,6 +640,11 @@ final case class LinearTraversalBuilder( pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal) case OptionVal.None ⇒ copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) + case OptionVal.None ⇒ + copy( + inPort = OptionVal.None, + outPort = OptionVal.None, + traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } } else throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") @@ -681,6 +686,9 @@ final case class LinearTraversalBuilder( beforeBuilder = EmptyTraversal) case OptionVal.None ⇒ copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) + copy( + outPort = OptionVal.None, + traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } } else throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") @@ -977,7 +985,7 @@ final case class CompositeTraversalBuilder( outOwners: Map[OutPort, BuilderKey] = Map.empty, unwiredOuts: Int = 0, attributes: Attributes, - islandTag: IslandTag = null) extends TraversalBuilder { + islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder { override def toString: String = s""" @@ -1020,7 +1028,10 @@ final case class CompositeTraversalBuilder( remaining = remaining.tail } - val finalTraversal = if (islandTag == null) traversal else EnterIsland(islandTag).concat(traversal).concat(ExitIsland) + val finalTraversal = islandTag match { + case OptionVal.None ⇒ traversal + case OptionVal.Some(tag) ⇒ EnterIsland(tag).concat(traversal).concat(ExitIsland) + } // The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it // to be embedded in a larger graph, making partial graph reuse much more efficient. @@ -1159,9 +1170,9 @@ final case class CompositeTraversalBuilder( } override def makeIsland(islandTag: IslandTag): TraversalBuilder = { - if (this.islandTag eq null) - copy(islandTag = islandTag) - else - this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + this.islandTag match { + case OptionVal.None ⇒ copy(islandTag = OptionVal(islandTag)) + case _ ⇒ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 0bf03b351f..3817a0fe1a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -8,7 +8,7 @@ import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.TLSProtocol._ import akka.stream.impl.{ TlsModuleIslandTag, TraversalBuilder } -import akka.util.ByteString +import akka.util.{ ByteString, OptionVal } import scala.util.Try diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 68a101b1f8..5b3df91a2a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -14,7 +14,7 @@ import akka.japi.function.{ Effect, Procedure } import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } -import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder } +import akka.stream.impl.{ EmptyTraversal, LinearTraversalBuilder, ReactiveStreamsCompliance, TraversalBuilder } import scala.collection.mutable.ArrayBuffer import scala.collection.{ immutable, mutable }