=str use OptionVal in Traversal building

This commit is contained in:
Konrad `ktoso` Malawski 2017-03-10 11:42:57 +01:00
parent 99705d1ccc
commit e2a15a2a58
7 changed files with 35 additions and 22 deletions

View file

@ -84,7 +84,7 @@ class MaterializationBenchmark {
var graphWithNestedImports: RunnableGraph[NotUsed] = _
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
@Param(Array("1", "10", "100", "1000"))
@Param(Array("1", "10"))
var complexity = 0
@Setup

View file

@ -47,8 +47,8 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[
* INTERNAL API
*/
private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates {
if (size < 1) throw new IllegalArgumentException(s"buffer size MSUT be positive (was: $size")
if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two")
if (size < 1) throw new IllegalArgumentException(s"buffer size must be positive (was: $size)")
if ((size & (size - 1)) != 0) throw new IllegalArgumentException(s"buffer size must be a power of two (was: $size)")
// TODO: buffer and batch sizing heuristics
private var upstream: Subscription = _

View file

@ -12,6 +12,7 @@ import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Promise
import akka.event.Logging
import akka.util.OptionVal
/**
* INTERNAL API

View file

@ -37,6 +37,7 @@ import scala.compat.java8.OptionConverters._
import java.util.Optional
import akka.event.Logging
import akka.util.OptionVal
/**
* INTERNAL API

View file

@ -353,7 +353,7 @@ final case class CompletedTraversalBuilder(
inSlots: Int,
inToOffset: Map[InPort, Int],
attributes: Attributes,
islandTag: Option[IslandTag] = None) extends TraversalBuilder {
islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder {
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder = {
val key = new BuilderKey
@ -367,8 +367,8 @@ final case class CompletedTraversalBuilder(
override def traversal: Traversal = {
val withIsland = islandTag match {
case Some(tag) EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland)
case None traversalSoFar
case OptionVal.Some(tag) EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland)
case _ traversalSoFar
}
if (attributes eq Attributes.none) withIsland
@ -392,8 +392,8 @@ final case class CompletedTraversalBuilder(
override def makeIsland(islandTag: IslandTag): TraversalBuilder =
this.islandTag match {
case None copy(islandTag = Some(islandTag))
case Some(_) this
case OptionVal.None copy(islandTag = OptionVal(islandTag))
case OptionVal.Some(_) this
}
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder =
@ -518,10 +518,10 @@ object LinearTraversalBuilder {
else empty().append(linear, combine)
case completed: CompletedTraversalBuilder
val inOpt = shape.inlets.headOption
val inOpt = OptionVal(shape.inlets.headOption.orNull)
val inOffs = inOpt match {
case Some(in) completed.offsetOf(in)
case None 0
case OptionVal.Some(in) completed.offsetOf(in)
case OptionVal.None 0
}
LinearTraversalBuilder(
@ -534,11 +534,11 @@ object LinearTraversalBuilder {
Attributes.none)
case composite
val inOpt = shape.inlets.headOption
val inOpt = OptionVal(shape.inlets.headOption.orNull)
val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder
val inOffs = inOpt match {
case Some(in) composite.offsetOf(in)
case None 0
case OptionVal.Some(in) composite.offsetOf(in)
case OptionVal.None 0
}
LinearTraversalBuilder(
@ -640,6 +640,11 @@ final case class LinearTraversalBuilder(
pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal)
case OptionVal.None
copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
case OptionVal.None
copy(
inPort = OptionVal.None,
outPort = OptionVal.None,
traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
}
} else
throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.")
@ -681,6 +686,9 @@ final case class LinearTraversalBuilder(
beforeBuilder = EmptyTraversal)
case OptionVal.None
copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
copy(
outPort = OptionVal.None,
traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
}
} else
throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder")
@ -977,7 +985,7 @@ final case class CompositeTraversalBuilder(
outOwners: Map[OutPort, BuilderKey] = Map.empty,
unwiredOuts: Int = 0,
attributes: Attributes,
islandTag: IslandTag = null) extends TraversalBuilder {
islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder {
override def toString: String =
s"""
@ -1020,7 +1028,10 @@ final case class CompositeTraversalBuilder(
remaining = remaining.tail
}
val finalTraversal = if (islandTag == null) traversal else EnterIsland(islandTag).concat(traversal).concat(ExitIsland)
val finalTraversal = islandTag match {
case OptionVal.None traversal
case OptionVal.Some(tag) EnterIsland(tag).concat(traversal).concat(ExitIsland)
}
// The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it
// to be embedded in a larger graph, making partial graph reuse much more efficient.
@ -1159,9 +1170,9 @@ final case class CompositeTraversalBuilder(
}
override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
if (this.islandTag eq null)
copy(islandTag = islandTag)
else
this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
this.islandTag match {
case OptionVal.None copy(islandTag = OptionVal(islandTag))
case _ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
}
}
}

View file

@ -8,7 +8,7 @@ import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.TLSProtocol._
import akka.stream.impl.{ TlsModuleIslandTag, TraversalBuilder }
import akka.util.ByteString
import akka.util.{ ByteString, OptionVal }
import scala.util.Try

View file

@ -14,7 +14,7 @@ import akka.japi.function.{ Effect, Procedure }
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource }
import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder }
import akka.stream.impl.{ EmptyTraversal, LinearTraversalBuilder, ReactiveStreamsCompliance, TraversalBuilder }
import scala.collection.mutable.ArrayBuffer
import scala.collection.{ immutable, mutable }