Merge pull request #22521 from ktoso/wip-use-optionval-materializer

OptionVals and no requires() in materializer
This commit is contained in:
Patrik Nordwall 2017-03-10 17:00:47 +01:00 committed by GitHub
commit 4743a6f1e2
14 changed files with 125 additions and 105 deletions

View file

@ -50,6 +50,9 @@ private[akka] final class OptionVal[+A >: Null](val x: A) extends AnyVal {
def getOrElse[B >: A](default: B): B =
if (x == null) default else x
def contains[B >: A](it: B): Boolean =
x != null && x == it
/**
* Returns the option's value if it is nonempty, or `null` if it is empty.
*/

View file

@ -5,6 +5,9 @@
package akka.stream
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.stream.scaladsl.RunnableGraph
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark)
@ -16,22 +19,18 @@ class GraphBuilderBenchmark {
var complexity = 0
@Benchmark
def flow_with_map(): Unit = {
def flow_with_map(): RunnableGraph[NotUsed] =
MaterializationBenchmark.flowWithMapBuilder(complexity)
}
@Benchmark
def graph_with_junctions(): Unit = {
def graph_with_junctions(): RunnableGraph[NotUsed] =
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
}
@Benchmark
def graph_with_nested_imports(): Unit = {
def graph_with_nested_imports(): RunnableGraph[NotUsed] =
MaterializationBenchmark.graphWithNestedImportsBuilder(complexity)
}
@Benchmark
def graph_with_imported_flow(): Unit = {
def graph_with_imported_flow(): RunnableGraph[NotUsed] =
MaterializationBenchmark.graphWithImportedFlowBuilder(complexity)
}
}

View file

@ -84,7 +84,7 @@ class MaterializationBenchmark {
var graphWithNestedImports: RunnableGraph[NotUsed] = _
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
@Param(Array("1", "10", "100", "1000"))
@Param(Array("1", "10"))
var complexity = 0
@Setup
@ -101,14 +101,14 @@ class MaterializationBenchmark {
}
@Benchmark
def flow_with_map(): Unit = flowWithMap.run()
def flow_with_map(): NotUsed = flowWithMap.run()
@Benchmark
def graph_with_junctions(): Unit = graphWithJunctions.run()
def graph_with_junctions(): NotUsed = graphWithJunctions.run()
@Benchmark
def graph_with_nested_imports(): Unit = graphWithNestedImports.run()
def graph_with_nested_imports(): NotUsed = graphWithNestedImports.run()
@Benchmark
def graph_with_imported_flow(): Unit = graphWithImportedFlow.run()
def graph_with_imported_flow(): NotUsed = graphWithImportedFlow.run()
}

View file

@ -228,7 +228,7 @@ private[akka] class Shard(
def passivate(entity: ActorRef, stopMessage: Any): Unit = {
idByRef.get(entity) match {
case Some(id) => if (!messageBuffers.contains(id)) {
case Some(id) if (!messageBuffers.contains(id)) {
log.debug("Passivating started on entity {}", id)
passivating = passivating + entity
@ -237,7 +237,7 @@ private[akka] class Shard(
} else {
log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity.", entity)
}
case None => log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity)
case None log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity)
}
}

View file

@ -11,6 +11,7 @@ import akka.event.LoggingAdapter
import akka.pattern.ask
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreterShell
import akka.util.OptionVal
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContextExecutor }
@ -86,11 +87,9 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
* The default phases are left in-tact since we still respect `.async` and other tags that were marked within a sub-fused graph.
*/
private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ActorRef) extends Materializer {
require(registerShell ne null, "When using SubFusing the subflowFuser MUST NOT be null.") // FIXME remove check?
val subFusingPhase = new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] = {
new GraphStageIsland(settings, materializer, islandName, Some(registerShell)).asInstanceOf[PhaseIsland[Any]]
new GraphStageIsland(settings, materializer, islandName, OptionVal(registerShell)).asInstanceOf[PhaseIsland[Any]]
}
}

View file

@ -47,8 +47,9 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[
* INTERNAL API
*/
private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates {
require(size > 0, "buffer size cannot be zero")
require((size & (size - 1)) == 0, "buffer size must be a power of two")
if (size < 1) throw new IllegalArgumentException(s"buffer size must be positive (was: $size)")
if ((size & (size - 1)) != 0) throw new IllegalArgumentException(s"buffer size must be a power of two (was: $size)")
// TODO: buffer and batch sizing heuristics
private var upstream: Subscription = _
private val inputBuffer = Array.ofDim[AnyRef](size)
@ -114,7 +115,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
}
protected def onSubscribe(subscription: Subscription): Unit = {
require(subscription != null)
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
if (upstreamCompleted) subscription.cancel()
else {
upstream = subscription

View file

@ -12,6 +12,7 @@ import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Promise
import akka.event.Logging
import akka.util.OptionVal
/**
* INTERNAL API

View file

@ -8,7 +8,7 @@ import java.util.ArrayList
import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill, Props }
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill }
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.stream.Attributes.InputBuffer
@ -35,7 +35,7 @@ object PhasedFusingActorMaterializer {
val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[Any] =
new GraphStageIsland(settings, materializer, islandName, subflowFuser = None).asInstanceOf[PhaseIsland[Any]]
new GraphStageIsland(settings, materializer, islandName, subflowFuser = OptionVal.None).asInstanceOf[PhaseIsland[Any]]
}
val DefaultPhases: Map[IslandTag, Phase[Any]] = Map[IslandTag, Phase[Any]](
@ -536,7 +536,7 @@ final class GraphStageIsland(
effectiveSettings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer,
islandName: String,
subflowFuser: Option[GraphInterpreterShell ActorRef]) extends PhaseIsland[GraphStageLogic] {
subflowFuser: OptionVal[GraphInterpreterShell ActorRef]) extends PhaseIsland[GraphStageLogic] {
// TODO: remove these
private val logicArrayType = Array.empty[GraphStageLogic]
private[this] val logics = new ArrayList[GraphStageLogic](64)
@ -654,9 +654,8 @@ final class GraphStageIsland(
shell.connections = finalConnections
shell.logics = logics.toArray(logicArrayType)
// TODO make OptionVal
subflowFuser match {
case Some(fuseIntoExistingInterperter)
case OptionVal.Some(fuseIntoExistingInterperter)
fuseIntoExistingInterperter(shell)
case _

View file

@ -37,6 +37,7 @@ import scala.compat.java8.OptionConverters._
import java.util.Optional
import akka.event.Logging
import akka.util.OptionVal
/**
* INTERNAL API

View file

@ -7,6 +7,8 @@ package akka.stream.impl
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.scaladsl.Keep
import akka.util.OptionVal
import scala.language.existentials
/**
* Graphs to be materialized are defined by their traversal. There is no explicit graph information tracked, instead
@ -351,7 +353,7 @@ final case class CompletedTraversalBuilder(
inSlots: Int,
inToOffset: Map[InPort, Int],
attributes: Attributes,
islandTag: Option[IslandTag] = None) extends TraversalBuilder {
islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder {
override def add[A, B, C](submodule: TraversalBuilder, shape: Shape, combineMat: (A, B) C): TraversalBuilder = {
val key = new BuilderKey
@ -365,8 +367,8 @@ final case class CompletedTraversalBuilder(
override def traversal: Traversal = {
val withIsland = islandTag match {
case Some(tag) EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland)
case None traversalSoFar
case OptionVal.Some(tag) EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland)
case _ traversalSoFar
}
if (attributes eq Attributes.none) withIsland
@ -390,8 +392,8 @@ final case class CompletedTraversalBuilder(
override def makeIsland(islandTag: IslandTag): TraversalBuilder =
this.islandTag match {
case None copy(islandTag = Some(islandTag))
case Some(_) this
case OptionVal.None copy(islandTag = OptionVal(islandTag))
case OptionVal.Some(_) this
}
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder =
@ -464,26 +466,26 @@ final case class AtomicTraversalBuilder(
object LinearTraversalBuilder {
// TODO: Remove
private val cachedEmptyLinear = LinearTraversalBuilder(None, None, 0, 0, PushNotUsed, None, Attributes.none)
private val cachedEmptyLinear = LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, Attributes.none)
private[this] final val wireBackward: Array[Int] = Array(-1)
private[this] final val noWire: Array[Int] = Array()
def empty(attributes: Attributes = Attributes.none): LinearTraversalBuilder =
if (attributes eq Attributes.none) cachedEmptyLinear
else LinearTraversalBuilder(None, None, 0, 0, PushNotUsed, None, attributes, EmptyTraversal)
else LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, attributes, EmptyTraversal)
/**
* Create a traversal builder specialized for linear graphs. This is designed to be much faster and lightweight
* than its generic counterpart. It can be freely mixed with the generic builder in both ways.
*/
def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = {
require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.")
require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.")
if (module.shape.inlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.")
if (module.shape.outlets.size > 1) throw new IllegalStateException("Modules with more than one input port cannot be linear.")
TraversalBuilder.initShape(module.shape)
val inPortOpt = module.shape.inlets.headOption
val outPortOpt = module.shape.outlets.headOption
val inPortOpt = OptionVal(module.shape.inlets.headOption.orNull)
val outPortOpt = OptionVal(module.shape.outlets.headOption.orNull)
val wiring = if (outPortOpt.isDefined) wireBackward else noWire
@ -493,7 +495,7 @@ object LinearTraversalBuilder {
inOffset = 0,
if (inPortOpt.isDefined) 1 else 0,
traversalSoFar = MaterializeAtomic(module, wiring),
pendingBuilder = None,
pendingBuilder = OptionVal.None,
attributes)
}
@ -516,36 +518,36 @@ object LinearTraversalBuilder {
else empty().append(linear, combine)
case completed: CompletedTraversalBuilder
val inOpt = shape.inlets.headOption
val inOpt = OptionVal(shape.inlets.headOption.orNull)
val inOffs = inOpt match {
case Some(in) completed.offsetOf(in)
case None 0
case OptionVal.Some(in) completed.offsetOf(in)
case OptionVal.None 0
}
LinearTraversalBuilder(
inPort = inOpt,
outPort = None,
inPort = OptionVal(inOpt.orNull),
outPort = OptionVal.None,
inOffset = inOffs,
inSlots = completed.inSlots,
completed.traversal.concat(addMatCompose(PushNotUsed, combine)),
pendingBuilder = None,
pendingBuilder = OptionVal.None,
Attributes.none)
case composite
val inOpt = shape.inlets.headOption
val inOpt = OptionVal(shape.inlets.headOption.orNull)
val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder
val inOffs = inOpt match {
case Some(in) composite.offsetOf(in)
case None 0
case OptionVal.Some(in) composite.offsetOf(in)
case OptionVal.None 0
}
LinearTraversalBuilder(
inPort = inOpt,
outPort = Some(out),
inPort = OptionVal(inOpt.orNull),
outPort = OptionVal.Some(out),
inOffset = inOffs,
inSlots = composite.inSlots,
addMatCompose(PushNotUsed, combine),
pendingBuilder = Some(composite),
pendingBuilder = OptionVal.Some(composite),
Attributes.none,
beforeBuilder = EmptyTraversal)
@ -562,15 +564,15 @@ object LinearTraversalBuilder {
* -1 relative offset to something else (see rewireLastOutTo).
*/
final case class LinearTraversalBuilder(
inPort: Option[InPort],
outPort: Option[OutPort],
inPort: OptionVal[InPort],
outPort: OptionVal[OutPort],
inOffset: Int,
override val inSlots: Int,
traversalSoFar: Traversal,
pendingBuilder: Option[TraversalBuilder],
pendingBuilder: OptionVal[TraversalBuilder],
attributes: Attributes,
beforeBuilder: Traversal = EmptyTraversal,
islandTag: Option[IslandTag] = None) extends TraversalBuilder {
beforeBuilder: Traversal = EmptyTraversal,
islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder {
protected def isEmpty: Boolean = inSlots == 0 && outPort.isEmpty
@ -583,7 +585,7 @@ final case class LinearTraversalBuilder(
* This builder can always return a traversal.
*/
override def traversal: Traversal = {
if (outPort.nonEmpty)
if (outPort.isDefined)
throw new IllegalStateException("Traversal cannot be acquired until all output ports have been wired")
applyIslandAndAttributes(traversalSoFar)
}
@ -598,8 +600,8 @@ final case class LinearTraversalBuilder(
private def applyIslandAndAttributes(t: Traversal): Traversal = {
val withIslandTag = islandTag match {
case None t
case Some(tag) EnterIsland(tag).concat(t).concat(ExitIsland)
case OptionVal.None t
case OptionVal.Some(tag) EnterIsland(tag).concat(t).concat(ExitIsland)
}
if (attributes eq Attributes.none) withIslandTag
@ -625,19 +627,24 @@ final case class LinearTraversalBuilder(
override def wire(out: OutPort, in: InPort): TraversalBuilder = {
if (outPort.contains(out) && inPort.contains(in)) {
pendingBuilder match {
case Some(composite)
case OptionVal.Some(composite)
copy(
inPort = None,
outPort = None,
inPort = OptionVal.None,
outPort = OptionVal.None,
traversalSoFar =
applyIslandAndAttributes(
beforeBuilder.concat(
composite
.assign(out, inOffset - composite.offsetOfModule(out))
.traversal).concat(traversalSoFar)),
pendingBuilder = None, beforeBuilder = EmptyTraversal)
case None
copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
pendingBuilder = OptionVal.None, beforeBuilder = EmptyTraversal)
case OptionVal.None
copy(inPort = OptionVal.None, outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
case OptionVal.None
copy(
inPort = OptionVal.None,
outPort = OptionVal.None,
traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
}
} else
throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.")
@ -646,8 +653,8 @@ final case class LinearTraversalBuilder(
override def offsetOfModule(out: OutPort): Int = {
if (outPort.contains(out)) {
pendingBuilder match {
case Some(composite) composite.offsetOfModule(out)
case None 0 // Output belongs to the last module, which will be materialized *first*
case OptionVal.Some(composite) composite.offsetOfModule(out)
case OptionVal.None 0 // Output belongs to the last module, which will be materialized *first*
}
} else
throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder")
@ -665,9 +672,9 @@ final case class LinearTraversalBuilder(
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = {
if (outPort.contains(out)) {
pendingBuilder match {
case Some(composite)
case OptionVal.Some(composite)
copy(
outPort = None,
outPort = OptionVal.None,
traversalSoFar =
applyIslandAndAttributes(
beforeBuilder.concat(
@ -675,10 +682,13 @@ final case class LinearTraversalBuilder(
.assign(out, relativeSlot)
.traversal
.concat(traversalSoFar))),
pendingBuilder = None,
pendingBuilder = OptionVal.None,
beforeBuilder = EmptyTraversal)
case None
copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
case OptionVal.None
copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
copy(
outPort = OptionVal.None,
traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
}
} else
throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder")
@ -705,9 +715,9 @@ final case class LinearTraversalBuilder(
toAppend.copy(
traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose)))
} else {
if (outPort.nonEmpty) {
require(toAppend.inPort.isDefined, "Appended linear module must have an unwired input port " +
"because there is a dangling output.")
if (outPort.isDefined) {
if (toAppend.inPort.isEmpty)
throw new IllegalArgumentException("Appended linear module must have an unwired input port because there is a dangling output.")
/*
* To understand how append work, first the general structure of the LinearTraversalBuilder must be
@ -749,7 +759,7 @@ final case class LinearTraversalBuilder(
* different.
*/
val assembledTraversalForThis = this.pendingBuilder match {
case None
case OptionVal.None
/*
* This is the case where we are a pure linear builder (all composites have been already completed),
* which means that traversalSoFar contains everything already, except the final attributes and islands
@ -788,7 +798,7 @@ final case class LinearTraversalBuilder(
rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots)
}
case Some(composite)
case OptionVal.Some(composite)
/*
* This is the case where our last module is a composite, and since it does not have its output port
* wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder.
@ -842,7 +852,7 @@ final case class LinearTraversalBuilder(
* There are two variants, depending whether toAppend is purely linear or if it has a composite at the end.
*/
toAppend.pendingBuilder match {
case None
case OptionVal.None
/*
* This is the simple case, when the other is purely linear. We just concatenate the traversals
* and do some bookkeeping.
@ -855,13 +865,13 @@ final case class LinearTraversalBuilder(
inOffset = inOffset + toAppend.inSlots,
// Build in reverse so it yields a more efficient layout for left-to-right building
traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis),
pendingBuilder = None,
pendingBuilder = OptionVal.None,
attributes = Attributes.none, // attributes are none for the new enclosing builder
beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites
islandTag = None // islandTag is reset for the new enclosing builder
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
)
case Some(composite)
case OptionVal.Some(composite)
/*
* In this case we need to assemble as much as we can, and create a new "sandwich" of
* beforeBuilder ~ pendingBuilder ~ traversalSoFar
@ -875,9 +885,9 @@ final case class LinearTraversalBuilder(
// First prepare island enter and exit if tags are present
toAppend.islandTag match {
case None
case OptionVal.None
// Nothing changes
case Some(tag)
case OptionVal.Some(tag)
// Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps)
newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal)
// Exit the island just after the appended builder (they should not applied to _this_ builder)
@ -908,7 +918,7 @@ final case class LinearTraversalBuilder(
pendingBuilder = toAppend.pendingBuilder,
attributes = Attributes.none, // attributes are none for the new enclosing builder
beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites
islandTag = None // islandTag is reset for the new enclosing builder
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
)
}
} else throw new Exception("should this happen?")
@ -927,8 +937,8 @@ final case class LinearTraversalBuilder(
*/
override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder =
this.islandTag match {
case Some(tag) this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
case None copy(islandTag = Some(islandTag))
case OptionVal.Some(tag) this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
case OptionVal.None copy(islandTag = OptionVal.Some(islandTag))
}
}
@ -975,7 +985,7 @@ final case class CompositeTraversalBuilder(
outOwners: Map[OutPort, BuilderKey] = Map.empty,
unwiredOuts: Int = 0,
attributes: Attributes,
islandTag: IslandTag = null) extends TraversalBuilder {
islandTag: OptionVal[IslandTag] = OptionVal.None) extends TraversalBuilder {
override def toString: String =
s"""
@ -1018,7 +1028,10 @@ final case class CompositeTraversalBuilder(
remaining = remaining.tail
}
val finalTraversal = if (islandTag == null) traversal else EnterIsland(islandTag).concat(traversal).concat(ExitIsland)
val finalTraversal = islandTag match {
case OptionVal.None traversal
case OptionVal.Some(tag) EnterIsland(tag).concat(traversal).concat(ExitIsland)
}
// The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it
// to be embedded in a larger graph, making partial graph reuse much more efficient.
@ -1157,9 +1170,9 @@ final case class CompositeTraversalBuilder(
}
override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
if (this.islandTag eq null)
copy(islandTag = islandTag)
else
this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
this.islandTag match {
case OptionVal.None copy(islandTag = OptionVal(islandTag))
case _ this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
}
}
}

View file

@ -89,8 +89,8 @@ object ActorGraphInterpreter {
override def logic: GraphStageLogic = BatchingActorInputBoundary.this
}
require(size > 0, "buffer size cannot be zero")
require((size & (size - 1)) == 0, "buffer size must be a power of two")
if (size <= 0) throw new IllegalArgumentException("buffer size cannot be zero")
if ((size & (size - 1)) != 0) throw new IllegalArgumentException("buffer size must be a power of two")
private var actor: ActorRef = ActorRef.noSender
private var upstream: Subscription = _
@ -136,7 +136,7 @@ object ActorGraphInterpreter {
private def dequeue(): Any = {
val elem = inputBuffer(nextInputElementCursor)
require(elem ne null, "Internal queue must never contain a null")
if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null")
inputBuffer(nextInputElementCursor) = null
batchRemaining -= 1
@ -196,7 +196,7 @@ object ActorGraphInterpreter {
}
def onSubscribe(subscription: Subscription): Unit = {
require(subscription != null, "Subscription cannot be null")
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
if (upstreamCompleted) {
tryCancel(subscription)
} else if (downstreamCanceled) {
@ -298,22 +298,22 @@ object ActorGraphInterpreter {
}
def shutdown(reason: Option[Throwable]): Unit = {
shutdownReason = reason
shutdownReason = OptionVal(reason.orNull)
pendingSubscribers.getAndSet(null) match {
case null // already called earlier
case pending pending foreach reportSubscribeFailure
}
}
@volatile private var shutdownReason: Option[Throwable] = None
@volatile private var shutdownReason: OptionVal[Throwable] = OptionVal.None
private def reportSubscribeFailure(subscriber: Subscriber[Any]): Unit =
try shutdownReason match {
case Some(e: SpecViolation) // ok, not allowed to call onError
case Some(e)
case OptionVal.Some(e: SpecViolation) // ok, not allowed to call onError
case OptionVal.Some(e)
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnError(subscriber, e)
case None
case OptionVal.None
tryOnSubscribe(subscriber, CancelledSubscription)
tryOnComplete(subscriber)
} catch {
@ -341,7 +341,7 @@ object ActorGraphInterpreter {
// interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked)
private var downstreamCompleted = false
// when upstream failed before we got the exposed publisher
private var upstreamFailed: Option[Throwable] = None
private var upstreamFailed: OptionVal[Throwable] = OptionVal.None
private var upstreamCompleted: Boolean = false
private def onNext(elem: Any): Unit = {
@ -362,7 +362,7 @@ object ActorGraphInterpreter {
// No need to fail if had already been cancelled, or we closed earlier
if (!(downstreamCompleted || upstreamCompleted)) {
upstreamCompleted = true
upstreamFailed = Some(e)
upstreamFailed = OptionVal.Some(e)
publisher.shutdown(Some(e))
if ((subscriber ne null) && !e.isInstanceOf[SpecViolation]) tryOnError(subscriber, e)
}

View file

@ -8,7 +8,7 @@ import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.TLSProtocol._
import akka.stream.impl.{ TlsModuleIslandTag, TraversalBuilder }
import akka.util.ByteString
import akka.util.{ ByteString, OptionVal }
import scala.util.Try

View file

@ -14,7 +14,7 @@ import akka.japi.function.{ Effect, Procedure }
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource }
import akka.stream.impl.{ ReactiveStreamsCompliance, TraversalBuilder }
import akka.stream.impl.{ EmptyTraversal, LinearTraversalBuilder, ReactiveStreamsCompliance, TraversalBuilder }
import scala.collection.mutable.ArrayBuffer
import scala.collection.{ immutable, mutable }

View file

@ -540,7 +540,11 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageWithMaterializedValue.module"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.ModuleExtractor"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.ModuleExtractor$"),
ProblemFilters.excludePackage("akka.stream.impl")
ProblemFilters.excludePackage("akka.stream.impl"),
// small changes in attributes
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSource.withAttributes"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#ProbeSink.withAttributes")
// NOTE: filters that will be backported to 2.4 should go to the latest 2.4 version below
)