Wip optimize allocations optionval #30264

* Optimizes retrieval of mandatoryAttributes by removing potential allocation of OptionVal
* Optimizes ActorContextImpl by avoiding allocations inserted by Scala 2.13
* Optimizes Artery Codecs to avoid allocations associated with Scala 2.13 pattern-matching
* Avoid pattern-matching on OptionVal since Scala 2.13 allocates when checking the pattern
This commit is contained in:
Viktor Klang (√) 2021-06-01 06:55:36 +00:00 committed by GitHub
parent e09de4fc84
commit 364cb06905
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 274 additions and 303 deletions

View file

@ -103,13 +103,13 @@ import scala.util.Success
private var _currentActorThread: OptionVal[Thread] = OptionVal.None private var _currentActorThread: OptionVal[Thread] = OptionVal.None
// context-shared timer needed to allow for nested timer usage // context-shared timer needed to allow for nested timer usage
def timer: TimerSchedulerCrossDslSupport[T] = _timer match { def timer: TimerSchedulerCrossDslSupport[T] = _timer.orNull match {
case OptionVal.Some(timer) => timer case null =>
case _ =>
checkCurrentActorThread() checkCurrentActorThread()
val timer = mkTimer() val timer = mkTimer()
_timer = OptionVal.Some(timer) _timer = OptionVal.Some(timer)
timer timer
case timer => timer
} }
protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this) protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this)
@ -150,14 +150,14 @@ import scala.util.Success
private def loggingContext(): LoggingContext = { private def loggingContext(): LoggingContext = {
// lazy init of logging setup // lazy init of logging setup
_logging match { _logging.orNull match {
case OptionVal.Some(l) => l case null =>
case _ =>
val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]]) val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]])
val logger = LoggerFactory.getLogger(logClass.getName) val logger = LoggerFactory.getLogger(logClass.getName)
val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this) val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this)
_logging = OptionVal.Some(l) _logging = OptionVal.Some(l)
l l
case l => l
} }
} }
@ -183,11 +183,10 @@ import scala.util.Success
// MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message // MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message
override private[akka] def clearMdc(): Unit = { override private[akka] def clearMdc(): Unit = {
// avoid access to MDC ThreadLocal if not needed, see details in LoggingContext // avoid access to MDC ThreadLocal if not needed, see details in LoggingContext
_logging match { val ctx = _logging.orNull
case OptionVal.Some(ctx) if ctx.mdcUsed => if ((ctx ne null) && ctx.mdcUsed) {
ActorMdc.clearMdc() ActorMdc.clearMdc()
ctx.mdcUsed = false ctx.mdcUsed = false
case _ =>
} }
} }
@ -295,14 +294,14 @@ import scala.util.Success
val boxedMessageClass = BoxedType(messageClass).asInstanceOf[Class[U]] val boxedMessageClass = BoxedType(messageClass).asInstanceOf[Class[U]]
_messageAdapters = (boxedMessageClass, f.asInstanceOf[Any => T]) :: _messageAdapters = (boxedMessageClass, f.asInstanceOf[Any => T]) ::
_messageAdapters.filterNot { case (cls, _) => cls == boxedMessageClass } _messageAdapters.filterNot { case (cls, _) => cls == boxedMessageClass }
val ref = messageAdapterRef match { val ref = messageAdapterRef.orNull match {
case OptionVal.Some(ref) => ref.asInstanceOf[ActorRef[U]] case null =>
case _ =>
// AdaptMessage is not really a T, but that is erased // AdaptMessage is not really a T, but that is erased
val ref = val ref =
internalSpawnMessageAdapter[Any](msg => AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter") internalSpawnMessageAdapter[Any](msg => AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter")
messageAdapterRef = OptionVal.Some(ref) messageAdapterRef = OptionVal.Some(ref)
ref ref
case ref => ref.asInstanceOf[ActorRef[U]]
} }
ref.asInstanceOf[ActorRef[U]] ref.asInstanceOf[ActorRef[U]]
} }
@ -316,13 +315,14 @@ import scala.util.Success
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] def setCurrentActorThread(): Unit = { @InternalApi private[akka] def setCurrentActorThread(): Unit = {
_currentActorThread match { val callerThread = Thread.currentThread()
case OptionVal.Some(t) => _currentActorThread.orNull match {
case null =>
_currentActorThread = OptionVal.Some(callerThread)
case t =>
throw new IllegalStateException( throw new IllegalStateException(
s"Invalid access by thread from the outside of $self. " + s"Invalid access by thread from the outside of $self. " +
s"Current message is processed by $t, but also accessed from ${Thread.currentThread()}.") s"Current message is processed by $t, but also accessed from $callerThread.")
case _ =>
_currentActorThread = OptionVal.Some(Thread.currentThread())
} }
} }
@ -338,17 +338,17 @@ import scala.util.Success
*/ */
@InternalApi private[akka] def checkCurrentActorThread(): Unit = { @InternalApi private[akka] def checkCurrentActorThread(): Unit = {
val callerThread = Thread.currentThread() val callerThread = Thread.currentThread()
_currentActorThread match { _currentActorThread.orNull match {
case OptionVal.Some(t) => case null =>
throw new UnsupportedOperationException(
s"Unsupported access to ActorContext from the outside of $self. " +
s"No message is currently processed by the actor, but ActorContext was called from $callerThread.")
case t =>
if (callerThread ne t) { if (callerThread ne t) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
s"Unsupported access to ActorContext operation from the outside of $self. " + s"Unsupported access to ActorContext operation from the outside of $self. " +
s"Current message is processed by $t, but ActorContext was called from $callerThread.") s"Current message is processed by $t, but ActorContext was called from $callerThread.")
} }
case _ =>
throw new UnsupportedOperationException(
s"Unsupported access to ActorContext from the outside of $self. " +
s"No message is currently processed by the actor, but ActorContext was called from $callerThread.")
} }
} }
} }

View file

@ -61,6 +61,7 @@ private[remote] class Encoder(
extends GraphStageWithMaterializedValue[ extends GraphStageWithMaterializedValue[
FlowShape[OutboundEnvelope, EnvelopeBuffer], FlowShape[OutboundEnvelope, EnvelopeBuffer],
Encoder.OutboundCompressionAccess] { Encoder.OutboundCompressionAccess] {
import Encoder._ import Encoder._
val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in") val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in")
@ -81,12 +82,12 @@ private[remote] class Encoder(
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None private var _serialization: OptionVal[Serialization] = OptionVal.None
private def serialization: Serialization = _serialization match { private def serialization: Serialization = _serialization.orNull match {
case OptionVal.Some(s) => s case null =>
case _ =>
val s = SerializationExtension(system) val s = SerializationExtension(system)
_serialization = OptionVal.Some(s) _serialization = OptionVal.Some(s)
s s
case s => s
} }
private val instruments: RemoteInstruments = RemoteInstruments(system) private val instruments: RemoteInstruments = RemoteInstruments(system)
@ -129,14 +130,14 @@ private[remote] class Encoder(
Serialization.currentTransportInformation.value = serialization.serializationInformation Serialization.currentTransportInformation.value = serialization.serializationInformation
// internally compression is applied by the builder: // internally compression is applied by the builder:
outboundEnvelope.recipient match { outboundEnvelope.recipient.orNull match {
case OptionVal.Some(r) => headerBuilder.setRecipientActorRef(r) case null => headerBuilder.setNoRecipient()
case _ => headerBuilder.setNoRecipient() case r => headerBuilder.setRecipientActorRef(r)
} }
outboundEnvelope.sender match { outboundEnvelope.sender.orNull match {
case OptionVal.Some(s) => headerBuilder.setSenderActorRef(s) case null => headerBuilder.setNoSender()
case _ => headerBuilder.setNoSender() case s => headerBuilder.setSenderActorRef(s)
} }
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
@ -179,18 +180,18 @@ private[remote] class Encoder(
new OversizedPayloadException(reasonText), new OversizedPayloadException(reasonText),
"Failed to serialize oversized message [{}].", "Failed to serialize oversized message [{}].",
Logging.messageClassName(outboundEnvelope.message)) Logging.messageClassName(outboundEnvelope.message))
system.eventStream.publish(outboundEnvelope.sender match { system.eventStream.publish(outboundEnvelope.sender.orNull match {
case OptionVal.Some(msgSender) => case null =>
Dropped(
outboundEnvelope.message,
reasonText,
outboundEnvelope.recipient.getOrElse(ActorRef.noSender))
case msgSender =>
Dropped( Dropped(
outboundEnvelope.message, outboundEnvelope.message,
reasonText, reasonText,
msgSender, msgSender,
outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) outboundEnvelope.recipient.getOrElse(ActorRef.noSender))
case _ =>
Dropped(
outboundEnvelope.message,
reasonText,
outboundEnvelope.recipient.getOrElse(ActorRef.noSender))
}) })
pull(in) pull(in)
case _ => case _ =>
@ -414,13 +415,14 @@ private[remote] class Decoder(
val originUid = headerBuilder.uid val originUid = headerBuilder.uid
val association = inboundContext.association(originUid) val association = inboundContext.association(originUid)
val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match { val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid).orNull match {
case OptionVal.Some(ref) => case null =>
OptionVal(ref.asInstanceOf[InternalActorRef]) headerBuilder.recipientActorRefPath.orNull match {
case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined => case null => OptionVal.None
resolveRecipient(headerBuilder.recipientActorRefPath.get) case path => resolveRecipient(path)
case _ => }
OptionVal.None case ref =>
OptionVal.Some(ref.asInstanceOf[InternalActorRef])
} catch { } catch {
case NonFatal(e) => case NonFatal(e) =>
// probably version mismatch due to restarted system // probably version mismatch due to restarted system
@ -428,13 +430,14 @@ private[remote] class Decoder(
OptionVal.None OptionVal.None
} }
val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match { val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid).orNull match {
case OptionVal.Some(ref) => case null =>
OptionVal(ref.asInstanceOf[InternalActorRef]) headerBuilder.senderActorRefPath.orNull match {
case OptionVal.None if headerBuilder.senderActorRefPath.isDefined => case null => OptionVal.None
OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get)) case path => OptionVal(actorRefResolver.resolve(path))
case _ => }
OptionVal.None case ref =>
OptionVal.Some(ref.asInstanceOf[InternalActorRef])
} catch { } catch {
case NonFatal(e) => case NonFatal(e) =>
// probably version mismatch due to restarted system // probably version mismatch due to restarted system
@ -472,27 +475,20 @@ private[remote] class Decoder(
if ((messageCount & heavyHitterMask) == 0) { if ((messageCount & heavyHitterMask) == 0) {
// --- hit refs and manifests for heavy-hitter counting // --- hit refs and manifests for heavy-hitter counting
association match { association.orNull match {
case OptionVal.Some(assoc) => case null =>
val remoteAddress = assoc.remoteAddress
sender match {
case OptionVal.Some(snd) =>
compressions.hitActorRef(originUid, remoteAddress, snd, 1)
case _ =>
}
recipient match {
case OptionVal.Some(rcp) =>
compressions.hitActorRef(originUid, remoteAddress, rcp, 1)
case _ =>
}
compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1)
case _ =>
// we don't want to record hits for compression while handshake is still in progress. // we don't want to record hits for compression while handshake is still in progress.
log.debug( log.debug(
"Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") "Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
case assoc =>
val remoteAddress = assoc.remoteAddress
if (sender.isDefined)
compressions.hitActorRef(originUid, remoteAddress, sender.get, 1)
if (recipient.isDefined)
compressions.hitActorRef(originUid, remoteAddress, recipient.get, 1)
compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1)
} }
// --- end of hit refs and manifests for heavy-hitter counting // --- end of hit refs and manifests for heavy-hitter counting
} }
@ -511,7 +507,6 @@ private[remote] class Decoder(
lane = 0) lane = 0)
if (recipient.isEmpty && !headerBuilder.isNoRecipient) { if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
// The remote deployed actor might not be created yet when resolving the // The remote deployed actor might not be created yet when resolving the
// recipient for the first message that is sent to it, best effort retry. // recipient for the first message that is sent to it, best effort retry.
// However, if the retried resolve isn't successful the ref is banned and // However, if the retried resolve isn't successful the ref is banned and
@ -521,19 +516,19 @@ private[remote] class Decoder(
val recipientActorRefPath = headerBuilder.recipientActorRefPath.get val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) { if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
headerBuilder.recipientActorRefPath match { headerBuilder.recipientActorRefPath.orNull match {
case OptionVal.Some(path) => case null =>
log.warning(
"Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].",
recipientActorRefPath)
pull(in)
case path =>
val ref = actorRefResolver.getOrCompute(path) val ref = actorRefResolver.getOrCompute(path)
if (ref.isInstanceOf[EmptyLocalActorRef]) if (ref.isInstanceOf[EmptyLocalActorRef])
log.warning( log.warning(
"Message for banned (terminated, unresolved) remote deployed recipient [{}].", "Message for banned (terminated, unresolved) remote deployed recipient [{}].",
recipientActorRefPath) recipientActorRefPath)
push(out, decoded.withRecipient(ref)) push(out, decoded.withRecipient(ref))
case _ =>
log.warning(
"Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].",
recipientActorRefPath)
pull(in)
} }
} else } else
@ -593,10 +588,8 @@ private[remote] class Decoder(
.runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath .runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath
case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) => case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) =>
resolveRecipient(recipientPath) match { resolveRecipient(recipientPath).orNull match {
case OptionVal.Some(recipient) => case null =>
push(out, inboundEnvelope.withRecipient(recipient))
case _ =>
if (attemptsLeft > 0) if (attemptsLeft > 0)
scheduleOnce( scheduleOnce(
RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope), RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope),
@ -615,6 +608,8 @@ private[remote] class Decoder(
val recipient = actorRefResolver.getOrCompute(recipientPath) val recipient = actorRefResolver.getOrCompute(recipientPath)
push(out, inboundEnvelope.withRecipient(recipient)) push(out, inboundEnvelope.withRecipient(recipient))
} }
case recipient =>
push(out, inboundEnvelope.withRecipient(recipient))
} }
case unknown => throw new IllegalArgumentException(s"Unknown timer key: $unknown") case unknown => throw new IllegalArgumentException(s"Unknown timer key: $unknown")
@ -648,12 +643,12 @@ private[remote] class Deserializer(
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None private var _serialization: OptionVal[Serialization] = OptionVal.None
private def serialization: Serialization = _serialization match { private def serialization: Serialization = _serialization.orNull match {
case OptionVal.Some(s) => s case null =>
case _ =>
val s = SerializationExtension(system) val s = SerializationExtension(system)
_serialization = OptionVal.Some(s) _serialization = OptionVal.Some(s)
s s
case s => s
} }
override protected def logSource = classOf[Deserializer] override protected def logSource = classOf[Deserializer]
@ -682,9 +677,9 @@ private[remote] class Deserializer(
push(out, envelopeWithMessage) push(out, envelopeWithMessage)
} catch { } catch {
case NonFatal(e) => case NonFatal(e) =>
val from = envelope.association match { val from = envelope.association.orNull match {
case OptionVal.Some(a) => a.remoteAddress case null => "unknown"
case _ => "unknown" case assoc => assoc.remoteAddress
} }
log.warning( log.warning(
"Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}", "Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}",

View file

@ -17,7 +17,7 @@ import akka.annotation.InternalApi
import akka.event.Logging import akka.event.Logging
import akka.japi.function import akka.japi.function
import akka.stream.impl.TraversalBuilder import akka.stream.impl.TraversalBuilder
import akka.util.{ ByteString, OptionVal } import akka.util.ByteString
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
import akka.util.LineNumbers import akka.util.LineNumbers
@ -122,17 +122,14 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
*/ */
def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = { def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = {
@tailrec @tailrec
def find(list: List[Attribute]): OptionVal[Attribute] = list match { def find(list: List[Attribute]): T = list match {
case Nil => OptionVal.None case Nil => throw new IllegalStateException(s"Mandatory attribute [$c] not found")
case head :: tail => case head :: tail =>
if (c.isInstance(head)) OptionVal.Some(head) if (c.isInstance(head)) c.cast(head)
else find(tail) else find(tail)
} }
find(attributeList) match { find(attributeList)
case OptionVal.Some(t) => t.asInstanceOf[T]
case _ => throw new IllegalStateException(s"Mandatory attribute [$c] not found")
}
} }
/** /**

View file

@ -350,8 +350,8 @@ import akka.util.unused
case _ => case _ =>
graph.traversalBuilder match { graph.traversalBuilder match {
case l: LinearTraversalBuilder => case l: LinearTraversalBuilder =>
l.pendingBuilder match { l.pendingBuilder.orNull match {
case OptionVal.Some(a: AtomicTraversalBuilder) => case a: AtomicTraversalBuilder =>
a.module match { a.module match {
case m: GraphStageModule[_, _] => case m: GraphStageModule[_, _] =>
m.stage match { m.stage match {
@ -359,7 +359,8 @@ import akka.util.unused
// It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync) if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync)
OptionVal.Some(single) OptionVal.Some(single)
else OptionVal.None else
OptionVal.None
case _ => OptionVal.None case _ => OptionVal.None
} }
case _ => OptionVal.None case _ => OptionVal.None
@ -505,10 +506,9 @@ import akka.util.unused
} }
override def traversal: Traversal = { override def traversal: Traversal = {
val withIsland = islandTag match { val withIsland =
case OptionVal.Some(tag) => EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversalSoFar).concat(ExitIsland)
case _ => traversalSoFar else traversalSoFar
}
if (attributes eq Attributes.none) withIsland if (attributes eq Attributes.none) withIsland
else PushAttributes(attributes).concat(withIsland).concat(PopAttributes) else PushAttributes(attributes).concat(withIsland).concat(PopAttributes)
@ -531,10 +531,8 @@ import akka.util.unused
override def unwiredOuts: Int = 0 override def unwiredOuts: Int = 0
override def makeIsland(islandTag: IslandTag): TraversalBuilder = override def makeIsland(islandTag: IslandTag): TraversalBuilder =
this.islandTag match { if (this.islandTag.isDefined) this
case OptionVal.None => copy(islandTag = OptionVal(islandTag)) else copy(islandTag = OptionVal(islandTag))
case _ => this
}
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder =
throw new UnsupportedOperationException("Cannot assign ports to slots in a completed builder.") throw new UnsupportedOperationException("Cannot assign ports to slots in a completed builder.")
@ -685,13 +683,10 @@ import akka.util.unused
case completed: CompletedTraversalBuilder => case completed: CompletedTraversalBuilder =>
val inOpt = OptionVal(shape.inlets.headOption.orNull) val inOpt = OptionVal(shape.inlets.headOption.orNull)
val inOffs = inOpt match { val inOffs = if (inOpt.isDefined) completed.offsetOf(inOpt.get) else 0
case OptionVal.Some(in) => completed.offsetOf(in)
case _ => 0
}
LinearTraversalBuilder( LinearTraversalBuilder(
inPort = OptionVal(inOpt.orNull), inPort = inOpt,
outPort = OptionVal.None, outPort = OptionVal.None,
inOffset = inOffs, inOffset = inOffs,
inSlots = completed.inSlots, inSlots = completed.inSlots,
@ -702,13 +697,10 @@ import akka.util.unused
case composite => case composite =>
val inOpt = OptionVal(shape.inlets.headOption.orNull) val inOpt = OptionVal(shape.inlets.headOption.orNull)
val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder
val inOffs = inOpt match { val inOffs = if (inOpt.isDefined) composite.offsetOf(inOpt.get) else 0
case OptionVal.Some(in) => composite.offsetOf(in)
case _ => 0
}
LinearTraversalBuilder( LinearTraversalBuilder(
inPort = OptionVal(inOpt.orNull), inPort = inOpt,
outPort = OptionVal.Some(out), outPort = OptionVal.Some(out),
inOffset = inOffs, inOffset = inOffs,
inSlots = composite.inSlots, inSlots = composite.inSlots,
@ -770,10 +762,7 @@ import akka.util.unused
} }
private def applyIslandAndAttributes(t: Traversal): Traversal = { private def applyIslandAndAttributes(t: Traversal): Traversal = {
val withIslandTag = islandTag match { val withIslandTag = if (islandTag.isDefined) EnterIsland(islandTag.get).concat(t).concat(ExitIsland) else t
case OptionVal.Some(tag) => EnterIsland(tag).concat(t).concat(ExitIsland)
case _ => t
}
if (attributes eq Attributes.none) withIslandTag if (attributes eq Attributes.none) withIslandTag
else PushAttributes(attributes).concat(withIslandTag).concat(PopAttributes) else PushAttributes(attributes).concat(withIslandTag).concat(PopAttributes)
@ -797,22 +786,22 @@ import akka.util.unused
*/ */
override def wire(out: OutPort, in: InPort): TraversalBuilder = { override def wire(out: OutPort, in: InPort): TraversalBuilder = {
if (outPort.contains(out) && inPort.contains(in)) { if (outPort.contains(out) && inPort.contains(in)) {
pendingBuilder match { if (pendingBuilder.isDefined) {
case OptionVal.Some(composite) => val composite = pendingBuilder.get
copy( copy(
inPort = OptionVal.None, inPort = OptionVal.None,
outPort = OptionVal.None, outPort = OptionVal.None,
traversalSoFar = applyIslandAndAttributes( traversalSoFar = applyIslandAndAttributes(
beforeBuilder beforeBuilder
.concat(composite.assign(out, inOffset - composite.offsetOfModule(out)).traversal) .concat(composite.assign(out, inOffset - composite.offsetOfModule(out)).traversal)
.concat(traversalSoFar)), .concat(traversalSoFar)),
pendingBuilder = OptionVal.None, pendingBuilder = OptionVal.None,
beforeBuilder = EmptyTraversal) beforeBuilder = EmptyTraversal)
case _ => } else {
copy( copy(
inPort = OptionVal.None, inPort = OptionVal.None,
outPort = OptionVal.None, outPort = OptionVal.None,
traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
} }
} else } else
throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.")
@ -820,10 +809,10 @@ import akka.util.unused
override def offsetOfModule(out: OutPort): Int = { override def offsetOfModule(out: OutPort): Int = {
if (outPort.contains(out)) { if (outPort.contains(out)) {
pendingBuilder match { if (pendingBuilder.isDefined)
case OptionVal.Some(composite) => composite.offsetOfModule(out) pendingBuilder.get.offsetOfModule(out)
case _ => 0 // Output belongs to the last module, which will be materialized *first* else
} 0 // Output belongs to the last module, which will be materialized *first*
} else } else
throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder") throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder")
} }
@ -839,16 +828,16 @@ import akka.util.unused
override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = { override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = {
if (outPort.contains(out)) { if (outPort.contains(out)) {
pendingBuilder match { if (pendingBuilder.isDefined) {
case OptionVal.Some(composite) => val composite = pendingBuilder.get
copy( copy(
outPort = OptionVal.None, outPort = OptionVal.None,
traversalSoFar = applyIslandAndAttributes( traversalSoFar = applyIslandAndAttributes(
beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))), beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))),
pendingBuilder = OptionVal.None, pendingBuilder = OptionVal.None,
beforeBuilder = EmptyTraversal) beforeBuilder = EmptyTraversal)
case _ => } else {
copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
} }
} else } else
throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder")
@ -918,83 +907,82 @@ import akka.util.unused
* Depending whether we have a composite builder as our last step or not, composition will be somewhat * Depending whether we have a composite builder as our last step or not, composition will be somewhat
* different. * different.
*/ */
val assembledTraversalForThis = this.pendingBuilder match { val assembledTraversalForThis = if (this.pendingBuilder.isDefined) {
case OptionVal.Some(composite) => val composite = this.pendingBuilder.get
/*
* This is the case where our last module is a composite, and since it does not have its output port
* wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder.
*
* Since we will wire now the output port, we can assemble everything together:
*/
val out = outPort.get
/*
* Since we will be visited second (and the appended toAppend first), we need to
*
* 1. go back to the start of the composite module, i.e. composite.offsetOfModule(out) steps. This
* is necessary because the composite might not have the internal module as the first visited
* module in the Traversal and hence not have a base offset of 0 in the composite
* 2. go backward toAppend.inSlots slots to reach the beginning offset of toAppend
* 3. now go forward toAppend.inOffset to reach the correct location
*
* <------- (-composite.offsetOfModule(out))
* <-------------- (-toAppend.inSlots)
* -------> (+toAppend.inOffset)
*
* --------in----|-------[out module]----------
* toAppend this
*
*/
val compositeTraversal = composite
.assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset)
.traversal // All output ports are finished, so we can finally call this now
/*
* Now we can assemble the pieces for the final Traversal of _this_ builder.
*
* beforeBuilder ~ compositeTraversal ~ traversalSoFar
*
* (remember that this is the _reverse_ of the Flow DSL order)
*/
beforeBuilder.concat(compositeTraversal).concat(traversalSoFar)
} else {
/*
* This is the case where we are a pure linear builder (all composites have been already completed),
* which means that traversalSoFar contains everything already, except the final attributes and islands
* applied.
*
* Since the exposed output port has been wired optimistically to -1, we need to check if this is correct,
* and correct if necessary. This is the step below:
*/
if (toAppend.inOffset == (toAppend.inSlots - 1)) {
/* /*
* This is the case where our last module is a composite, and since it does not have its output port * if the builder we want to append (remember that is _prepend_ from the Traversal's perspective)
* wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. * has its exposed input port at the last location (which is toAppend.inSlots - 1 because input
* port offsets start with 0), then -1 is the correct wiring. I.e.
* *
* Since we will wire now the output port, we can assemble everything together: * 1. Visit the appended module first in the traversal, its input port is the last
* 2. Visit this module second in the traversal, wire the output port back to the previous input port (-1)
*/ */
val out = outPort.get traversalSoFar
} else {
/* /*
* The optimistic mapping to -1 is not correct, we need to unfold the Traversal to find our last module
* (which is the _first_ module in the Traversal) and rewire the output assignment to the correct offset.
*
* Since we will be visited second (and the appended toAppend first), we need to * Since we will be visited second (and the appended toAppend first), we need to
* *
* 1. go back to the start of the composite module, i.e. composite.offsetOfModule(out) steps. This * 1. go backward toAppend.inSlots slots to reach the beginning offset of toAppend
* is necessary because the composite might not have the internal module as the first visited * 2. now go forward toAppend.inOffset to reach the correct location
* module in the Traversal and hence not have a base offset of 0 in the composite
* 2. go backward toAppend.inSlots slots to reach the beginning offset of toAppend
* 3. now go forward toAppend.inOffset to reach the correct location
* *
* <------- (-composite.offsetOfModule(out)) * <-------------- (-toAppend.inSlots)
* <-------------- (-toAppend.inSlots) * -------> (+toAppend.inOffset)
* -------> (+toAppend.inOffset)
* *
* --------in----|-------[out module]---------- * --------in----|[out module]----------
* toAppend this * toAppend this
* *
*/ */
val compositeTraversal = composite rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots)
.assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset) }
.traversal // All output ports are finished, so we can finally call this now
/*
* Now we can assemble the pieces for the final Traversal of _this_ builder.
*
* beforeBuilder ~ compositeTraversal ~ traversalSoFar
*
* (remember that this is the _reverse_ of the Flow DSL order)
*/
beforeBuilder.concat(compositeTraversal).concat(traversalSoFar)
case _ =>
/*
* This is the case where we are a pure linear builder (all composites have been already completed),
* which means that traversalSoFar contains everything already, except the final attributes and islands
* applied.
*
* Since the exposed output port has been wired optimistically to -1, we need to check if this is correct,
* and correct if necessary. This is the step below:
*/
if (toAppend.inOffset == (toAppend.inSlots - 1)) {
/*
* if the builder we want to append (remember that is _prepend_ from the Traversal's perspective)
* has its exposed input port at the last location (which is toAppend.inSlots - 1 because input
* port offsets start with 0), then -1 is the correct wiring. I.e.
*
* 1. Visit the appended module first in the traversal, its input port is the last
* 2. Visit this module second in the traversal, wire the output port back to the previous input port (-1)
*/
traversalSoFar
} else {
/*
* The optimistic mapping to -1 is not correct, we need to unfold the Traversal to find our last module
* (which is the _first_ module in the Traversal) and rewire the output assignment to the correct offset.
*
* Since we will be visited second (and the appended toAppend first), we need to
*
* 1. go backward toAppend.inSlots slots to reach the beginning offset of toAppend
* 2. now go forward toAppend.inOffset to reach the correct location
*
* <-------------- (-toAppend.inSlots)
* -------> (+toAppend.inOffset)
*
* --------in----|[out module]----------
* toAppend this
*
*/
rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots)
}
} }
/* /*
@ -1009,79 +997,75 @@ import akka.util.unused
* We have finished "this" builder, now we need to construct the new builder as the result of appending. * We have finished "this" builder, now we need to construct the new builder as the result of appending.
* There are two variants, depending whether toAppend is purely linear or if it has a composite at the end. * There are two variants, depending whether toAppend is purely linear or if it has a composite at the end.
*/ */
toAppend.pendingBuilder match { if (toAppend.pendingBuilder.isEmpty) {
case OptionVal.None => /*
/* * This is the simple case, when the other is purely linear. We just concatenate the traversals
* This is the simple case, when the other is purely linear. We just concatenate the traversals * and do some bookkeeping.
* and do some bookkeeping. */
*/ LinearTraversalBuilder(
LinearTraversalBuilder( inPort = inPort,
inPort = inPort, outPort = toAppend.outPort,
outPort = toAppend.outPort, inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before
inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_
// the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ inOffset = inOffset + toAppend.inSlots,
inOffset = inOffset + toAppend.inSlots, // Build in reverse so it yields a more efficient layout for left-to-right building
// Build in reverse so it yields a more efficient layout for left-to-right building traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis),
traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), pendingBuilder = OptionVal.None,
pendingBuilder = OptionVal.None, attributes = Attributes.none, // attributes are none for the new enclosing builder
attributes = Attributes.none, // attributes are none for the new enclosing builder beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites
beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder )
) } else {
/*
* In this case we need to assemble as much as we can, and create a new "sandwich" of
* beforeBuilder ~ pendingBuilder ~ traversalSoFar
*
* We need to apply the attributes and islandTags of the appended builder, but we cannot do it in one
* step, instead we need to append half of the steps to traversalSoFar, and the other half to
* beforeBuilder.
*/
var newTraversalSoFar = finalTraversalForThis
var newBeforeTraversal = toAppend.beforeBuilder
case _ => // Some(pendingBuilder) // First prepare island enter and exit if tags are present
/* if (toAppend.islandTag.isDefined) {
* In this case we need to assemble as much as we can, and create a new "sandwich" of // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps)
* beforeBuilder ~ pendingBuilder ~ traversalSoFar newBeforeTraversal = EnterIsland(toAppend.islandTag.get).concat(newBeforeTraversal)
* // Exit the island just after the appended builder (they should not applied to _this_ builder)
* We need to apply the attributes and islandTags of the appended builder, but we cannot do it in one newTraversalSoFar = ExitIsland.concat(newTraversalSoFar)
* step, instead we need to append half of the steps to traversalSoFar, and the other half to }
* beforeBuilder.
*/
var newTraversalSoFar = finalTraversalForThis
var newBeforeTraversal = toAppend.beforeBuilder
// First prepare island enter and exit if tags are present // Secondly, prepare attribute push and pop if Attributes are present
toAppend.islandTag match { if (toAppend.attributes ne Attributes.none) {
case OptionVal.Some(tag) => // Push the attributes just before the appended builder.
// Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) newBeforeTraversal = PushAttributes(toAppend.attributes).concat(newBeforeTraversal)
newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal) // Pop the attributes immediately after the appended builder (they should not applied to _this_ builder)
// Exit the island just after the appended builder (they should not applied to _this_ builder) newTraversalSoFar = PopAttributes.concat(newTraversalSoFar)
newTraversalSoFar = ExitIsland.concat(newTraversalSoFar) }
case _ => // Nothing changes
}
// Secondly, prepare attribute push and pop if Attributes are present // This is roughly how things will look like in the end:
if (toAppend.attributes ne Attributes.none) { //
// Push the attributes just before the appended builder. // newBeforeTraversal newTraversalSoFar
newBeforeTraversal = PushAttributes(toAppend.attributes).concat(newBeforeTraversal) // [PushAttributes ~ EnterIsland] ~ composite ~ [toAppend.traversalSoFar ~ ExitIsland ~ PopAttributes ~ finalTraversalForThis]
// Pop the attributes immediately after the appended builder (they should not applied to _this_ builder)
newTraversalSoFar = PopAttributes.concat(newTraversalSoFar)
}
// This is roughly how things will look like in the end: // Finally add the already completed part of toAppend to newTraversalSoFar
// newTraversalSoFar = toAppend.traversalSoFar.concat(newTraversalSoFar)
// newBeforeTraversal newTraversalSoFar
// [PushAttributes ~ EnterIsland] ~ composite ~ [toAppend.traversalSoFar ~ ExitIsland ~ PopAttributes ~ finalTraversalForThis]
// Finally add the already completed part of toAppend to newTraversalSoFar LinearTraversalBuilder(
newTraversalSoFar = toAppend.traversalSoFar.concat(newTraversalSoFar) inPort = inPort,
outPort = toAppend.outPort,
LinearTraversalBuilder( inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before
inPort = inPort, // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_
outPort = toAppend.outPort, inOffset = inOffset + toAppend.inSlots,
inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before // Build in reverse so it yields a more efficient layout for left-to-right building. We cannot
// the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ // apply the full traversal, only the completed part of it
inOffset = inOffset + toAppend.inSlots, traversalSoFar = newTraversalSoFar,
// Build in reverse so it yields a more efficient layout for left-to-right building. We cannot // Last composite of toAppend is still pending
// apply the full traversal, only the completed part of it pendingBuilder = toAppend.pendingBuilder,
traversalSoFar = newTraversalSoFar, attributes = Attributes.none, // attributes are none for the new enclosing builder
// Last composite of toAppend is still pending beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites
pendingBuilder = toAppend.pendingBuilder, islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
attributes = Attributes.none, // attributes are none for the new enclosing builder )
beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites
islandTag = OptionVal.None // islandTag is reset for the new enclosing builder
)
} }
} else throw new Exception("should this happen?") } else throw new Exception("should this happen?")
@ -1098,11 +1082,9 @@ import akka.util.unused
* between islands. * between islands.
*/ */
override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder = override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder =
this.islandTag match { if (this.islandTag.isDefined)
case OptionVal.Some(_) => this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted else copy(islandTag = OptionVal.Some(islandTag))
case _ => copy(islandTag = OptionVal.Some(islandTag))
}
} }
/** /**
@ -1206,10 +1188,9 @@ import akka.util.unused
remaining = remaining.tail remaining = remaining.tail
} }
val finalTraversal = islandTag match { val finalTraversal =
case OptionVal.Some(tag) => EnterIsland(tag).concat(traversal).concat(ExitIsland) if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversal).concat(ExitIsland)
case _ => traversal else traversal
}
// The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it // The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it
// to be embedded in a larger graph, making partial graph reuse much more efficient. // to be embedded in a larger graph, making partial graph reuse much more efficient.
@ -1344,10 +1325,8 @@ import akka.util.unused
} }
override def makeIsland(islandTag: IslandTag): TraversalBuilder = { override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
this.islandTag match { if (this.islandTag.isDefined)
case OptionVal.None => copy(islandTag = OptionVal(islandTag)) this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
case _ => else copy(islandTag = OptionVal(islandTag))
this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted
}
} }
} }