Remove OptionVal workaround (#30789)

* Remove OptionVal workaround
* Revert "Avoid pattern-matching on OptionVal since Scala 2.13 allocates when checking the pattern"
  This reverts commit f0194bbc1ad43ac2c79bf156bfe91adf7fd5e538.
* Revert "Optimizes retrieval of mandatoryAttributes by removing potential allocation of OptionVal"
  This reverts commit 165b0e0d5c057965e37418299061bdf48c33fc44.
This commit is contained in:
Muskan Gupta 2021-10-22 19:57:44 +05:30 committed by GitHub
parent 0bc96eaa93
commit 62139a2220
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 290 additions and 268 deletions

View file

@ -103,13 +103,13 @@ import scala.util.Success
private var _currentActorThread: OptionVal[Thread] = OptionVal.None
// context-shared timer needed to allow for nested timer usage
def timer: TimerSchedulerCrossDslSupport[T] = _timer.orNull match {
case null =>
def timer: TimerSchedulerCrossDslSupport[T] = _timer match {
case OptionVal.Some(timer) => timer
case _ =>
checkCurrentActorThread()
val timer = mkTimer()
_timer = OptionVal.Some(timer)
timer
case timer => timer
}
protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this)
@ -150,14 +150,14 @@ import scala.util.Success
private def loggingContext(): LoggingContext = {
// lazy init of logging setup
_logging.orNull match {
case null =>
_logging match {
case OptionVal.Some(l) => l
case _ =>
val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]])
val logger = LoggerFactory.getLogger(logClass.getName)
val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this)
_logging = OptionVal.Some(l)
l
case l => l
}
}
@ -183,10 +183,11 @@ import scala.util.Success
// MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message
override private[akka] def clearMdc(): Unit = {
// avoid access to MDC ThreadLocal if not needed, see details in LoggingContext
val ctx = _logging.orNull
if ((ctx ne null) && ctx.mdcUsed) {
_logging match {
case OptionVal.Some(ctx) if ctx.mdcUsed =>
ActorMdc.clearMdc()
ctx.mdcUsed = false
case _ =>
}
}
@ -296,14 +297,14 @@ import scala.util.Success
val boxedMessageClass = BoxedType(messageClass).asInstanceOf[Class[U]]
_messageAdapters = (boxedMessageClass, f.asInstanceOf[Any => T]) ::
_messageAdapters.filterNot { case (cls, _) => cls == boxedMessageClass }
val ref = messageAdapterRef.orNull match {
case null =>
val ref = messageAdapterRef match {
case OptionVal.Some(ref) => ref.asInstanceOf[ActorRef[U]]
case _ =>
// AdaptMessage is not really a T, but that is erased
val ref =
internalSpawnMessageAdapter[Any](msg => AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter")
messageAdapterRef = OptionVal.Some(ref)
ref
case ref => ref.asInstanceOf[ActorRef[U]]
}
ref.asInstanceOf[ActorRef[U]]
}
@ -317,14 +318,13 @@ import scala.util.Success
* INTERNAL API
*/
@InternalApi private[akka] def setCurrentActorThread(): Unit = {
val callerThread = Thread.currentThread()
_currentActorThread.orNull match {
case null =>
_currentActorThread = OptionVal.Some(callerThread)
case t =>
_currentActorThread match {
case OptionVal.Some(t) =>
throw new IllegalStateException(
s"Invalid access by thread from the outside of $self. " +
s"Current message is processed by $t, but also accessed from $callerThread.")
s"Current message is processed by $t, but also accessed from ${Thread.currentThread()}.")
case _ =>
_currentActorThread = OptionVal.Some(Thread.currentThread())
}
}
@ -340,17 +340,17 @@ import scala.util.Success
*/
@InternalApi private[akka] def checkCurrentActorThread(): Unit = {
val callerThread = Thread.currentThread()
_currentActorThread.orNull match {
case null =>
throw new UnsupportedOperationException(
s"Unsupported access to ActorContext from the outside of $self. " +
s"No message is currently processed by the actor, but ActorContext was called from $callerThread.")
case t =>
_currentActorThread match {
case OptionVal.Some(t) =>
if (callerThread ne t) {
throw new UnsupportedOperationException(
s"Unsupported access to ActorContext operation from the outside of $self. " +
s"Current message is processed by $t, but ActorContext was called from $callerThread.")
}
case _ =>
throw new UnsupportedOperationException(
s"Unsupported access to ActorContext from the outside of $self. " +
s"No message is currently processed by the actor, but ActorContext was called from $callerThread.")
}
}
}

View file

@ -82,12 +82,12 @@ private[remote] class Encoder(
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None
private def serialization: Serialization = _serialization.orNull match {
case null =>
private def serialization: Serialization = _serialization match {
case OptionVal.Some(s) => s
case _ =>
val s = SerializationExtension(system)
_serialization = OptionVal.Some(s)
s
case s => s
}
private val instruments: RemoteInstruments = RemoteInstruments(system)
@ -130,14 +130,14 @@ private[remote] class Encoder(
Serialization.currentTransportInformation.value = serialization.serializationInformation
// internally compression is applied by the builder:
outboundEnvelope.recipient.orNull match {
case null => headerBuilder.setNoRecipient()
case r => headerBuilder.setRecipientActorRef(r)
outboundEnvelope.recipient match {
case OptionVal.Some(r) => headerBuilder.setRecipientActorRef(r)
case _ => headerBuilder.setNoRecipient()
}
outboundEnvelope.sender.orNull match {
case null => headerBuilder.setNoSender()
case s => headerBuilder.setSenderActorRef(s)
outboundEnvelope.sender match {
case OptionVal.Some(s) => headerBuilder.setSenderActorRef(s)
case _ => headerBuilder.setNoSender()
}
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
@ -180,18 +180,18 @@ private[remote] class Encoder(
new OversizedPayloadException(reasonText),
"Failed to serialize oversized message [{}].",
Logging.messageClassName(outboundEnvelope.message))
system.eventStream.publish(outboundEnvelope.sender.orNull match {
case null =>
Dropped(
outboundEnvelope.message,
reasonText,
outboundEnvelope.recipient.getOrElse(ActorRef.noSender))
case msgSender =>
system.eventStream.publish(outboundEnvelope.sender match {
case OptionVal.Some(msgSender) =>
Dropped(
outboundEnvelope.message,
reasonText,
msgSender,
outboundEnvelope.recipient.getOrElse(ActorRef.noSender))
case _ =>
Dropped(
outboundEnvelope.message,
reasonText,
outboundEnvelope.recipient.getOrElse(ActorRef.noSender))
})
pull(in)
case _ =>
@ -415,14 +415,13 @@ private[remote] class Decoder(
val originUid = headerBuilder.uid
val association = inboundContext.association(originUid)
val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid).orNull match {
case null =>
headerBuilder.recipientActorRefPath.orNull match {
case null => OptionVal.None
case path => resolveRecipient(path)
}
case ref =>
OptionVal.Some(ref.asInstanceOf[InternalActorRef])
val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match {
case OptionVal.Some(ref) =>
OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined =>
resolveRecipient(headerBuilder.recipientActorRefPath.get)
case _ =>
OptionVal.None
} catch {
case NonFatal(e) =>
// probably version mismatch due to restarted system
@ -430,14 +429,13 @@ private[remote] class Decoder(
OptionVal.None
}
val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid).orNull match {
case null =>
headerBuilder.senderActorRefPath.orNull match {
case null => OptionVal.None
case path => OptionVal(actorRefResolver.resolve(path))
}
case ref =>
OptionVal.Some(ref.asInstanceOf[InternalActorRef])
val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match {
case OptionVal.Some(ref) =>
OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None if headerBuilder.senderActorRefPath.isDefined =>
OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get))
case _ =>
OptionVal.None
} catch {
case NonFatal(e) =>
// probably version mismatch due to restarted system
@ -475,12 +473,8 @@ private[remote] class Decoder(
if ((messageCount & heavyHitterMask) == 0) {
// --- hit refs and manifests for heavy-hitter counting
association.orNull match {
case null =>
// we don't want to record hits for compression while handshake is still in progress.
log.debug(
"Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
case assoc =>
association match {
case OptionVal.Some(assoc) =>
val remoteAddress = assoc.remoteAddress
if (sender.isDefined)
compressions.hitActorRef(originUid, remoteAddress, sender.get, 1)
@ -489,6 +483,10 @@ private[remote] class Decoder(
compressions.hitActorRef(originUid, remoteAddress, recipient.get, 1)
compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1)
case _ =>
// we don't want to record hits for compression while handshake is still in progress.
log.debug(
"Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
}
// --- end of hit refs and manifests for heavy-hitter counting
}
@ -516,19 +514,19 @@ private[remote] class Decoder(
val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
headerBuilder.recipientActorRefPath.orNull match {
case null =>
log.warning(
"Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].",
recipientActorRefPath)
pull(in)
case path =>
headerBuilder.recipientActorRefPath match {
case OptionVal.Some(path) =>
val ref = actorRefResolver.getOrCompute(path)
if (ref.isInstanceOf[EmptyLocalActorRef])
log.warning(
"Message for banned (terminated, unresolved) remote deployed recipient [{}].",
recipientActorRefPath)
push(out, decoded.withRecipient(ref))
case _ =>
log.warning(
"Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].",
recipientActorRefPath)
pull(in)
}
} else
@ -588,8 +586,10 @@ private[remote] class Decoder(
.runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath
case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) =>
resolveRecipient(recipientPath).orNull match {
case null =>
resolveRecipient(recipientPath) match {
case OptionVal.Some(recipient) =>
push(out, inboundEnvelope.withRecipient(recipient))
case _ =>
if (attemptsLeft > 0)
scheduleOnce(
RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope),
@ -608,8 +608,6 @@ private[remote] class Decoder(
val recipient = actorRefResolver.getOrCompute(recipientPath)
push(out, inboundEnvelope.withRecipient(recipient))
}
case recipient =>
push(out, inboundEnvelope.withRecipient(recipient))
}
case unknown => throw new IllegalArgumentException(s"Unknown timer key: $unknown")
@ -643,12 +641,12 @@ private[remote] class Deserializer(
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None
private def serialization: Serialization = _serialization.orNull match {
case null =>
private def serialization: Serialization = _serialization match {
case OptionVal.Some(s) => s
case _ =>
val s = SerializationExtension(system)
_serialization = OptionVal.Some(s)
s
case s => s
}
override protected def logSource = classOf[Deserializer]
@ -677,9 +675,9 @@ private[remote] class Deserializer(
push(out, envelopeWithMessage)
} catch {
case NonFatal(e) =>
val from = envelope.association.orNull match {
case null => "unknown"
case assoc => assoc.remoteAddress
val from = envelope.association match {
case OptionVal.Some(a) => a.remoteAddress
case _ => "unknown"
}
log.warning(
"Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}",

View file

@ -17,7 +17,7 @@ import akka.annotation.InternalApi
import akka.event.Logging
import akka.japi.function
import akka.stream.impl.TraversalBuilder
import akka.util.ByteString
import akka.util.{ ByteString, OptionVal }
import akka.util.JavaDurationConverters._
import akka.util.LineNumbers
@ -122,14 +122,17 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
*/
def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = {
@tailrec
def find(list: List[Attribute]): T = list match {
case Nil => throw new IllegalStateException(s"Mandatory attribute [$c] not found")
def find(list: List[Attribute]): OptionVal[Attribute] = list match {
case Nil => OptionVal.None
case head :: tail =>
if (c.isInstance(head)) c.cast(head)
if (c.isInstance(head)) OptionVal.Some(head)
else find(tail)
}
find(attributeList)
find(attributeList) match {
case OptionVal.Some(t) => t.asInstanceOf[T]
case _ => throw new IllegalStateException(s"Mandatory attribute [$c] not found")
}
}
/**

View file

@ -350,8 +350,8 @@ import akka.util.unused
case _ =>
graph.traversalBuilder match {
case l: LinearTraversalBuilder =>
l.pendingBuilder.orNull match {
case a: AtomicTraversalBuilder =>
l.pendingBuilder match {
case OptionVal.Some(a: AtomicTraversalBuilder) =>
a.module match {
case m: GraphStageModule[_, _] =>
m.stage match {
@ -359,8 +359,7 @@ import akka.util.unused
// It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync)
OptionVal.Some(single)
else
OptionVal.None
else OptionVal.None
case _ => OptionVal.None
}
case _ => OptionVal.None
@ -506,9 +505,10 @@ import akka.util.unused
}
override def traversal: Traversal = {
val withIsland =
if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversalSoFar).concat(ExitIsland)
else traversalSoFar
val withIsland = islandTag match {
case OptionVal.Some(tag) => EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland)
case _ => traversalSoFar
}
if (attributes eq Attributes.none) withIsland
else PushAttributes(attributes).concat(withIsland).concat(PopAttributes)
@ -531,8 +531,10 @@ import akka.util.unused
override def unwiredOuts: Int = 0
override def makeIsland(islandTag: IslandTag): TraversalBuilder =
if (this.islandTag.isDefined) this
else copy(islandTag = OptionVal(islandTag))
this.islandTag match {
case OptionVal.None => copy(islandTag = OptionVal(islandTag))
case _ => this
}
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder =
throw new UnsupportedOperationException("Cannot assign ports to slots in a completed builder.")
@ -683,10 +685,13 @@ import akka.util.unused
case completed: CompletedTraversalBuilder =>
val inOpt = OptionVal(shape.inlets.headOption.orNull)
val inOffs = if (inOpt.isDefined) completed.offsetOf(inOpt.get) else 0
val inOffs = inOpt match {
case OptionVal.Some(in) => completed.offsetOf(in)
case _ => 0
}
LinearTraversalBuilder(
inPort = inOpt,
inPort = OptionVal(inOpt.orNull),
outPort = OptionVal.None,
inOffset = inOffs,
inSlots = completed.inSlots,
@ -697,10 +702,13 @@ import akka.util.unused
case composite =>
val inOpt = OptionVal(shape.inlets.headOption.orNull)
val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder
val inOffs = if (inOpt.isDefined) composite.offsetOf(inOpt.get) else 0
val inOffs = inOpt match {
case OptionVal.Some(in) => composite.offsetOf(in)
case _ => 0
}
LinearTraversalBuilder(
inPort = inOpt,
inPort = OptionVal(inOpt.orNull),
outPort = OptionVal.Some(out),
inOffset = inOffs,
inSlots = composite.inSlots,
@ -762,7 +770,10 @@ import akka.util.unused
}
private def applyIslandAndAttributes(t: Traversal): Traversal = {
val withIslandTag = if (islandTag.isDefined) EnterIsland(islandTag.get).concat(t).concat(ExitIsland) else t
val withIslandTag = islandTag match {
case OptionVal.Some(tag) => EnterIsland(tag).concat(t).concat(ExitIsland)
case _ => t
}
if (attributes eq Attributes.none) withIslandTag
else PushAttributes(attributes).concat(withIslandTag).concat(PopAttributes)
@ -786,8 +797,8 @@ import akka.util.unused
*/
override def wire(out: OutPort, in: InPort): TraversalBuilder = {
if (outPort.contains(out) && inPort.contains(in)) {
if (pendingBuilder.isDefined) {
val composite = pendingBuilder.get
pendingBuilder match {
case OptionVal.Some(composite) =>
copy(
inPort = OptionVal.None,
outPort = OptionVal.None,
@ -797,7 +808,7 @@ import akka.util.unused
.concat(traversalSoFar)),
pendingBuilder = OptionVal.None,
beforeBuilder = EmptyTraversal)
} else {
case _ =>
copy(
inPort = OptionVal.None,
outPort = OptionVal.None,
@ -809,10 +820,10 @@ import akka.util.unused
override def offsetOfModule(out: OutPort): Int = {
if (outPort.contains(out)) {
if (pendingBuilder.isDefined)
pendingBuilder.get.offsetOfModule(out)
else
0 // Output belongs to the last module, which will be materialized *first*
pendingBuilder match {
case OptionVal.Some(composite) => composite.offsetOfModule(out)
case _ => 0 // Output belongs to the last module, which will be materialized *first*
}
} else
throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder")
}
@ -828,15 +839,15 @@ import akka.util.unused
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = {
if (outPort.contains(out)) {
if (pendingBuilder.isDefined) {
val composite = pendingBuilder.get
pendingBuilder match {
case OptionVal.Some(composite) =>
copy(
outPort = OptionVal.None,
traversalSoFar = applyIslandAndAttributes(
beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))),
pendingBuilder = OptionVal.None,
beforeBuilder = EmptyTraversal)
} else {
case _ =>
copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
}
} else
@ -907,8 +918,8 @@ import akka.util.unused
* Depending whether we have a composite builder as our last step or not, composition will be somewhat
* different.
*/
val assembledTraversalForThis = if (this.pendingBuilder.isDefined) {
val composite = this.pendingBuilder.get
val assembledTraversalForThis = this.pendingBuilder match {
case OptionVal.Some(composite) =>
/*
* This is the case where our last module is a composite, and since it does not have its output port
* wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder.
@ -945,7 +956,8 @@ import akka.util.unused
* (remember that this is the _reverse_ of the Flow DSL order)
*/
beforeBuilder.concat(compositeTraversal).concat(traversalSoFar)
} else {
case _ =>
/*
* This is the case where we are a pure linear builder (all composites have been already completed),
* which means that traversalSoFar contains everything already, except the final attributes and islands
@ -997,7 +1009,8 @@ import akka.util.unused
* We have finished "this" builder, now we need to construct the new builder as the result of appending.
* There are two variants, depending whether toAppend is purely linear or if it has a composite at the end.
*/
if (toAppend.pendingBuilder.isEmpty) {
toAppend.pendingBuilder match {
case OptionVal.None =>
/*
* This is the simple case, when the other is purely linear. We just concatenate the traversals
* and do some bookkeeping.
@ -1015,7 +1028,8 @@ import akka.util.unused
beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
)
} else {
case _ => // Some(pendingBuilder)
/*
* In this case we need to assemble as much as we can, and create a new "sandwich" of
* beforeBuilder ~ pendingBuilder ~ traversalSoFar
@ -1028,11 +1042,13 @@ import akka.util.unused
var newBeforeTraversal = toAppend.beforeBuilder
// First prepare island enter and exit if tags are present
if (toAppend.islandTag.isDefined) {
toAppend.islandTag match {
case OptionVal.Some(tag) =>
// Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps)
newBeforeTraversal = EnterIsland(toAppend.islandTag.get).concat(newBeforeTraversal)
newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal)
// Exit the island just after the appended builder (they should not applied to _this_ builder)
newTraversalSoFar = ExitIsland.concat(newTraversalSoFar)
case _ => // Nothing changes
}
// Secondly, prepare attribute push and pop if Attributes are present
@ -1082,9 +1098,11 @@ import akka.util.unused
* between islands.
*/
override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder =
if (this.islandTag.isDefined)
this.islandTag match {
case OptionVal.Some(_) =>
this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
else copy(islandTag = OptionVal.Some(islandTag))
case _ => copy(islandTag = OptionVal.Some(islandTag))
}
}
/**
@ -1188,9 +1206,10 @@ import akka.util.unused
remaining = remaining.tail
}
val finalTraversal =
if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversal).concat(ExitIsland)
else traversal
val finalTraversal = islandTag match {
case OptionVal.Some(tag) => EnterIsland(tag).concat(traversal).concat(ExitIsland)
case _ => traversal
}
// The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it
// to be embedded in a larger graph, making partial graph reuse much more efficient.
@ -1325,8 +1344,10 @@ import akka.util.unused
}
override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
if (this.islandTag.isDefined)
this.islandTag match {
case OptionVal.None => copy(islandTag = OptionVal(islandTag))
case _ =>
this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
else copy(islandTag = OptionVal(islandTag))
}
}
}