From 62139a22207038dde9dabd373b0bda4e43522254 Mon Sep 17 00:00:00 2001 From: Muskan Gupta Date: Fri, 22 Oct 2021 19:57:44 +0530 Subject: [PATCH] Remove OptionVal workaround (#30789) * Remove OptionVal workaround * Revert "Avoid pattern-matching on OptionVal since Scala 2.13 allocates when checking the pattern" This reverts commit f0194bbc1ad43ac2c79bf156bfe91adf7fd5e538. * Revert "Optimizes retrieval of mandatoryAttributes by removing potential allocation of OptionVal" This reverts commit 165b0e0d5c057965e37418299061bdf48c33fc44. --- .../typed/internal/ActorContextImpl.scala | 50 +-- .../scala/akka/remote/artery/Codecs.scala | 108 +++-- .../main/scala/akka/stream/Attributes.scala | 13 +- .../akka/stream/impl/TraversalBuilder.scala | 387 +++++++++--------- 4 files changed, 290 insertions(+), 268 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index c113fb07ce..546a18ed32 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -103,13 +103,13 @@ import scala.util.Success private var _currentActorThread: OptionVal[Thread] = OptionVal.None // context-shared timer needed to allow for nested timer usage - def timer: TimerSchedulerCrossDslSupport[T] = _timer.orNull match { - case null => + def timer: TimerSchedulerCrossDslSupport[T] = _timer match { + case OptionVal.Some(timer) => timer + case _ => checkCurrentActorThread() val timer = mkTimer() _timer = OptionVal.Some(timer) timer - case timer => timer } protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this) @@ -150,14 +150,14 @@ import scala.util.Success private def loggingContext(): LoggingContext = { // lazy init of logging setup - _logging.orNull match { - case null => + _logging match { + case OptionVal.Some(l) => l + case _ => val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]]) val logger = LoggerFactory.getLogger(logClass.getName) val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this) _logging = OptionVal.Some(l) l - case l => l } } @@ -183,10 +183,11 @@ import scala.util.Success // MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message override private[akka] def clearMdc(): Unit = { // avoid access to MDC ThreadLocal if not needed, see details in LoggingContext - val ctx = _logging.orNull - if ((ctx ne null) && ctx.mdcUsed) { - ActorMdc.clearMdc() - ctx.mdcUsed = false + _logging match { + case OptionVal.Some(ctx) if ctx.mdcUsed => + ActorMdc.clearMdc() + ctx.mdcUsed = false + case _ => } } @@ -296,14 +297,14 @@ import scala.util.Success val boxedMessageClass = BoxedType(messageClass).asInstanceOf[Class[U]] _messageAdapters = (boxedMessageClass, f.asInstanceOf[Any => T]) :: _messageAdapters.filterNot { case (cls, _) => cls == boxedMessageClass } - val ref = messageAdapterRef.orNull match { - case null => + val ref = messageAdapterRef match { + case OptionVal.Some(ref) => ref.asInstanceOf[ActorRef[U]] + case _ => // AdaptMessage is not really a T, but that is erased val ref = internalSpawnMessageAdapter[Any](msg => AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter") messageAdapterRef = OptionVal.Some(ref) ref - case ref => ref.asInstanceOf[ActorRef[U]] } ref.asInstanceOf[ActorRef[U]] } @@ -317,14 +318,13 @@ import scala.util.Success * INTERNAL API */ @InternalApi private[akka] def setCurrentActorThread(): Unit = { - val callerThread = Thread.currentThread() - _currentActorThread.orNull match { - case null => - _currentActorThread = OptionVal.Some(callerThread) - case t => + _currentActorThread match { + case OptionVal.Some(t) => throw new IllegalStateException( s"Invalid access by thread from the outside of $self. " + - s"Current message is processed by $t, but also accessed from $callerThread.") + s"Current message is processed by $t, but also accessed from ${Thread.currentThread()}.") + case _ => + _currentActorThread = OptionVal.Some(Thread.currentThread()) } } @@ -340,17 +340,17 @@ import scala.util.Success */ @InternalApi private[akka] def checkCurrentActorThread(): Unit = { val callerThread = Thread.currentThread() - _currentActorThread.orNull match { - case null => - throw new UnsupportedOperationException( - s"Unsupported access to ActorContext from the outside of $self. " + - s"No message is currently processed by the actor, but ActorContext was called from $callerThread.") - case t => + _currentActorThread match { + case OptionVal.Some(t) => if (callerThread ne t) { throw new UnsupportedOperationException( s"Unsupported access to ActorContext operation from the outside of $self. " + s"Current message is processed by $t, but ActorContext was called from $callerThread.") } + case _ => + throw new UnsupportedOperationException( + s"Unsupported access to ActorContext from the outside of $self. " + + s"No message is currently processed by the actor, but ActorContext was called from $callerThread.") } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 711ffafc55..698b9ccbf0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -82,12 +82,12 @@ private[remote] class Encoder( // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized private var _serialization: OptionVal[Serialization] = OptionVal.None - private def serialization: Serialization = _serialization.orNull match { - case null => + private def serialization: Serialization = _serialization match { + case OptionVal.Some(s) => s + case _ => val s = SerializationExtension(system) _serialization = OptionVal.Some(s) s - case s => s } private val instruments: RemoteInstruments = RemoteInstruments(system) @@ -130,14 +130,14 @@ private[remote] class Encoder( Serialization.currentTransportInformation.value = serialization.serializationInformation // internally compression is applied by the builder: - outboundEnvelope.recipient.orNull match { - case null => headerBuilder.setNoRecipient() - case r => headerBuilder.setRecipientActorRef(r) + outboundEnvelope.recipient match { + case OptionVal.Some(r) => headerBuilder.setRecipientActorRef(r) + case _ => headerBuilder.setNoRecipient() } - outboundEnvelope.sender.orNull match { - case null => headerBuilder.setNoSender() - case s => headerBuilder.setSenderActorRef(s) + outboundEnvelope.sender match { + case OptionVal.Some(s) => headerBuilder.setSenderActorRef(s) + case _ => headerBuilder.setNoSender() } val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 @@ -180,18 +180,18 @@ private[remote] class Encoder( new OversizedPayloadException(reasonText), "Failed to serialize oversized message [{}].", Logging.messageClassName(outboundEnvelope.message)) - system.eventStream.publish(outboundEnvelope.sender.orNull match { - case null => - Dropped( - outboundEnvelope.message, - reasonText, - outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) - case msgSender => + system.eventStream.publish(outboundEnvelope.sender match { + case OptionVal.Some(msgSender) => Dropped( outboundEnvelope.message, reasonText, msgSender, outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) + case _ => + Dropped( + outboundEnvelope.message, + reasonText, + outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) }) pull(in) case _ => @@ -415,14 +415,13 @@ private[remote] class Decoder( val originUid = headerBuilder.uid val association = inboundContext.association(originUid) - val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid).orNull match { - case null => - headerBuilder.recipientActorRefPath.orNull match { - case null => OptionVal.None - case path => resolveRecipient(path) - } - case ref => - OptionVal.Some(ref.asInstanceOf[InternalActorRef]) + val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match { + case OptionVal.Some(ref) => + OptionVal(ref.asInstanceOf[InternalActorRef]) + case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined => + resolveRecipient(headerBuilder.recipientActorRefPath.get) + case _ => + OptionVal.None } catch { case NonFatal(e) => // probably version mismatch due to restarted system @@ -430,14 +429,13 @@ private[remote] class Decoder( OptionVal.None } - val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid).orNull match { - case null => - headerBuilder.senderActorRefPath.orNull match { - case null => OptionVal.None - case path => OptionVal(actorRefResolver.resolve(path)) - } - case ref => - OptionVal.Some(ref.asInstanceOf[InternalActorRef]) + val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match { + case OptionVal.Some(ref) => + OptionVal(ref.asInstanceOf[InternalActorRef]) + case OptionVal.None if headerBuilder.senderActorRefPath.isDefined => + OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get)) + case _ => + OptionVal.None } catch { case NonFatal(e) => // probably version mismatch due to restarted system @@ -475,12 +473,8 @@ private[remote] class Decoder( if ((messageCount & heavyHitterMask) == 0) { // --- hit refs and manifests for heavy-hitter counting - association.orNull match { - case null => - // we don't want to record hits for compression while handshake is still in progress. - log.debug( - "Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") - case assoc => + association match { + case OptionVal.Some(assoc) => val remoteAddress = assoc.remoteAddress if (sender.isDefined) compressions.hitActorRef(originUid, remoteAddress, sender.get, 1) @@ -489,6 +483,10 @@ private[remote] class Decoder( compressions.hitActorRef(originUid, remoteAddress, recipient.get, 1) compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1) + case _ => + // we don't want to record hits for compression while handshake is still in progress. + log.debug( + "Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") } // --- end of hit refs and manifests for heavy-hitter counting } @@ -516,19 +514,19 @@ private[remote] class Decoder( val recipientActorRefPath = headerBuilder.recipientActorRefPath.get if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) { - headerBuilder.recipientActorRefPath.orNull match { - case null => - log.warning( - "Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].", - recipientActorRefPath) - pull(in) - case path => + headerBuilder.recipientActorRefPath match { + case OptionVal.Some(path) => val ref = actorRefResolver.getOrCompute(path) if (ref.isInstanceOf[EmptyLocalActorRef]) log.warning( "Message for banned (terminated, unresolved) remote deployed recipient [{}].", recipientActorRefPath) push(out, decoded.withRecipient(ref)) + case _ => + log.warning( + "Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].", + recipientActorRefPath) + pull(in) } } else @@ -588,8 +586,10 @@ private[remote] class Decoder( .runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) => - resolveRecipient(recipientPath).orNull match { - case null => + resolveRecipient(recipientPath) match { + case OptionVal.Some(recipient) => + push(out, inboundEnvelope.withRecipient(recipient)) + case _ => if (attemptsLeft > 0) scheduleOnce( RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope), @@ -608,8 +608,6 @@ private[remote] class Decoder( val recipient = actorRefResolver.getOrCompute(recipientPath) push(out, inboundEnvelope.withRecipient(recipient)) } - case recipient => - push(out, inboundEnvelope.withRecipient(recipient)) } case unknown => throw new IllegalArgumentException(s"Unknown timer key: $unknown") @@ -643,12 +641,12 @@ private[remote] class Deserializer( // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized private var _serialization: OptionVal[Serialization] = OptionVal.None - private def serialization: Serialization = _serialization.orNull match { - case null => + private def serialization: Serialization = _serialization match { + case OptionVal.Some(s) => s + case _ => val s = SerializationExtension(system) _serialization = OptionVal.Some(s) s - case s => s } override protected def logSource = classOf[Deserializer] @@ -677,9 +675,9 @@ private[remote] class Deserializer( push(out, envelopeWithMessage) } catch { case NonFatal(e) => - val from = envelope.association.orNull match { - case null => "unknown" - case assoc => assoc.remoteAddress + val from = envelope.association match { + case OptionVal.Some(a) => a.remoteAddress + case _ => "unknown" } log.warning( "Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}", diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index efa549001f..8841ba4312 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -17,7 +17,7 @@ import akka.annotation.InternalApi import akka.event.Logging import akka.japi.function import akka.stream.impl.TraversalBuilder -import akka.util.ByteString +import akka.util.{ ByteString, OptionVal } import akka.util.JavaDurationConverters._ import akka.util.LineNumbers @@ -122,14 +122,17 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { */ def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = { @tailrec - def find(list: List[Attribute]): T = list match { - case Nil => throw new IllegalStateException(s"Mandatory attribute [$c] not found") + def find(list: List[Attribute]): OptionVal[Attribute] = list match { + case Nil => OptionVal.None case head :: tail => - if (c.isInstance(head)) c.cast(head) + if (c.isInstance(head)) OptionVal.Some(head) else find(tail) } - find(attributeList) + find(attributeList) match { + case OptionVal.Some(t) => t.asInstanceOf[T] + case _ => throw new IllegalStateException(s"Mandatory attribute [$c] not found") + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 0b41bce73d..44e6dcd892 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -350,8 +350,8 @@ import akka.util.unused case _ => graph.traversalBuilder match { case l: LinearTraversalBuilder => - l.pendingBuilder.orNull match { - case a: AtomicTraversalBuilder => + l.pendingBuilder match { + case OptionVal.Some(a: AtomicTraversalBuilder) => a.module match { case m: GraphStageModule[_, _] => m.stage match { @@ -359,8 +359,7 @@ import akka.util.unused // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync) OptionVal.Some(single) - else - OptionVal.None + else OptionVal.None case _ => OptionVal.None } case _ => OptionVal.None @@ -506,9 +505,10 @@ import akka.util.unused } override def traversal: Traversal = { - val withIsland = - if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversalSoFar).concat(ExitIsland) - else traversalSoFar + val withIsland = islandTag match { + case OptionVal.Some(tag) => EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) + case _ => traversalSoFar + } if (attributes eq Attributes.none) withIsland else PushAttributes(attributes).concat(withIsland).concat(PopAttributes) @@ -531,8 +531,10 @@ import akka.util.unused override def unwiredOuts: Int = 0 override def makeIsland(islandTag: IslandTag): TraversalBuilder = - if (this.islandTag.isDefined) this - else copy(islandTag = OptionVal(islandTag)) + this.islandTag match { + case OptionVal.None => copy(islandTag = OptionVal(islandTag)) + case _ => this + } override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = throw new UnsupportedOperationException("Cannot assign ports to slots in a completed builder.") @@ -683,10 +685,13 @@ import akka.util.unused case completed: CompletedTraversalBuilder => val inOpt = OptionVal(shape.inlets.headOption.orNull) - val inOffs = if (inOpt.isDefined) completed.offsetOf(inOpt.get) else 0 + val inOffs = inOpt match { + case OptionVal.Some(in) => completed.offsetOf(in) + case _ => 0 + } LinearTraversalBuilder( - inPort = inOpt, + inPort = OptionVal(inOpt.orNull), outPort = OptionVal.None, inOffset = inOffs, inSlots = completed.inSlots, @@ -697,10 +702,13 @@ import akka.util.unused case composite => val inOpt = OptionVal(shape.inlets.headOption.orNull) val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder - val inOffs = if (inOpt.isDefined) composite.offsetOf(inOpt.get) else 0 + val inOffs = inOpt match { + case OptionVal.Some(in) => composite.offsetOf(in) + case _ => 0 + } LinearTraversalBuilder( - inPort = inOpt, + inPort = OptionVal(inOpt.orNull), outPort = OptionVal.Some(out), inOffset = inOffs, inSlots = composite.inSlots, @@ -762,7 +770,10 @@ import akka.util.unused } private def applyIslandAndAttributes(t: Traversal): Traversal = { - val withIslandTag = if (islandTag.isDefined) EnterIsland(islandTag.get).concat(t).concat(ExitIsland) else t + val withIslandTag = islandTag match { + case OptionVal.Some(tag) => EnterIsland(tag).concat(t).concat(ExitIsland) + case _ => t + } if (attributes eq Attributes.none) withIslandTag else PushAttributes(attributes).concat(withIslandTag).concat(PopAttributes) @@ -786,22 +797,22 @@ import akka.util.unused */ override def wire(out: OutPort, in: InPort): TraversalBuilder = { if (outPort.contains(out) && inPort.contains(in)) { - if (pendingBuilder.isDefined) { - val composite = pendingBuilder.get - copy( - inPort = OptionVal.None, - outPort = OptionVal.None, - traversalSoFar = applyIslandAndAttributes( - beforeBuilder - .concat(composite.assign(out, inOffset - composite.offsetOfModule(out)).traversal) - .concat(traversalSoFar)), - pendingBuilder = OptionVal.None, - beforeBuilder = EmptyTraversal) - } else { - copy( - inPort = OptionVal.None, - outPort = OptionVal.None, - traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) + pendingBuilder match { + case OptionVal.Some(composite) => + copy( + inPort = OptionVal.None, + outPort = OptionVal.None, + traversalSoFar = applyIslandAndAttributes( + beforeBuilder + .concat(composite.assign(out, inOffset - composite.offsetOfModule(out)).traversal) + .concat(traversalSoFar)), + pendingBuilder = OptionVal.None, + beforeBuilder = EmptyTraversal) + case _ => + copy( + inPort = OptionVal.None, + outPort = OptionVal.None, + traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } } else throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") @@ -809,10 +820,10 @@ import akka.util.unused override def offsetOfModule(out: OutPort): Int = { if (outPort.contains(out)) { - if (pendingBuilder.isDefined) - pendingBuilder.get.offsetOfModule(out) - else - 0 // Output belongs to the last module, which will be materialized *first* + pendingBuilder match { + case OptionVal.Some(composite) => composite.offsetOfModule(out) + case _ => 0 // Output belongs to the last module, which will be materialized *first* + } } else throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder") } @@ -828,16 +839,16 @@ import akka.util.unused override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = { if (outPort.contains(out)) { - if (pendingBuilder.isDefined) { - val composite = pendingBuilder.get - copy( - outPort = OptionVal.None, - traversalSoFar = applyIslandAndAttributes( - beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))), - pendingBuilder = OptionVal.None, - beforeBuilder = EmptyTraversal) - } else { - copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) + pendingBuilder match { + case OptionVal.Some(composite) => + copy( + outPort = OptionVal.None, + traversalSoFar = applyIslandAndAttributes( + beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))), + pendingBuilder = OptionVal.None, + beforeBuilder = EmptyTraversal) + case _ => + copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } } else throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") @@ -907,82 +918,83 @@ import akka.util.unused * Depending whether we have a composite builder as our last step or not, composition will be somewhat * different. */ - val assembledTraversalForThis = if (this.pendingBuilder.isDefined) { - val composite = this.pendingBuilder.get - /* - * This is the case where our last module is a composite, and since it does not have its output port - * wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. - * - * Since we will wire now the output port, we can assemble everything together: - */ - val out = outPort.get - /* - * Since we will be visited second (and the appended toAppend first), we need to - * - * 1. go back to the start of the composite module, i.e. composite.offsetOfModule(out) steps. This - * is necessary because the composite might not have the internal module as the first visited - * module in the Traversal and hence not have a base offset of 0 in the composite - * 2. go backward toAppend.inSlots slots to reach the beginning offset of toAppend - * 3. now go forward toAppend.inOffset to reach the correct location - * - * <------- (-composite.offsetOfModule(out)) - * <-------------- (-toAppend.inSlots) - * -------> (+toAppend.inOffset) - * - * --------in----|-------[out module]---------- - * toAppend this - * - */ - val compositeTraversal = composite - .assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset) - .traversal // All output ports are finished, so we can finally call this now - - /* - * Now we can assemble the pieces for the final Traversal of _this_ builder. - * - * beforeBuilder ~ compositeTraversal ~ traversalSoFar - * - * (remember that this is the _reverse_ of the Flow DSL order) - */ - beforeBuilder.concat(compositeTraversal).concat(traversalSoFar) - } else { - /* - * This is the case where we are a pure linear builder (all composites have been already completed), - * which means that traversalSoFar contains everything already, except the final attributes and islands - * applied. - * - * Since the exposed output port has been wired optimistically to -1, we need to check if this is correct, - * and correct if necessary. This is the step below: - */ - if (toAppend.inOffset == (toAppend.inSlots - 1)) { + val assembledTraversalForThis = this.pendingBuilder match { + case OptionVal.Some(composite) => /* - * if the builder we want to append (remember that is _prepend_ from the Traversal's perspective) - * has its exposed input port at the last location (which is toAppend.inSlots - 1 because input - * port offsets start with 0), then -1 is the correct wiring. I.e. + * This is the case where our last module is a composite, and since it does not have its output port + * wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. * - * 1. Visit the appended module first in the traversal, its input port is the last - * 2. Visit this module second in the traversal, wire the output port back to the previous input port (-1) + * Since we will wire now the output port, we can assemble everything together: */ - traversalSoFar - } else { + val out = outPort.get /* - * The optimistic mapping to -1 is not correct, we need to unfold the Traversal to find our last module - * (which is the _first_ module in the Traversal) and rewire the output assignment to the correct offset. - * * Since we will be visited second (and the appended toAppend first), we need to * - * 1. go backward toAppend.inSlots slots to reach the beginning offset of toAppend - * 2. now go forward toAppend.inOffset to reach the correct location + * 1. go back to the start of the composite module, i.e. composite.offsetOfModule(out) steps. This + * is necessary because the composite might not have the internal module as the first visited + * module in the Traversal and hence not have a base offset of 0 in the composite + * 2. go backward toAppend.inSlots slots to reach the beginning offset of toAppend + * 3. now go forward toAppend.inOffset to reach the correct location * - * <-------------- (-toAppend.inSlots) - * -------> (+toAppend.inOffset) + * <------- (-composite.offsetOfModule(out)) + * <-------------- (-toAppend.inSlots) + * -------> (+toAppend.inOffset) * - * --------in----|[out module]---------- - * toAppend this + * --------in----|-------[out module]---------- + * toAppend this * */ - rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots) - } + val compositeTraversal = composite + .assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset) + .traversal // All output ports are finished, so we can finally call this now + + /* + * Now we can assemble the pieces for the final Traversal of _this_ builder. + * + * beforeBuilder ~ compositeTraversal ~ traversalSoFar + * + * (remember that this is the _reverse_ of the Flow DSL order) + */ + beforeBuilder.concat(compositeTraversal).concat(traversalSoFar) + + case _ => + /* + * This is the case where we are a pure linear builder (all composites have been already completed), + * which means that traversalSoFar contains everything already, except the final attributes and islands + * applied. + * + * Since the exposed output port has been wired optimistically to -1, we need to check if this is correct, + * and correct if necessary. This is the step below: + */ + if (toAppend.inOffset == (toAppend.inSlots - 1)) { + /* + * if the builder we want to append (remember that is _prepend_ from the Traversal's perspective) + * has its exposed input port at the last location (which is toAppend.inSlots - 1 because input + * port offsets start with 0), then -1 is the correct wiring. I.e. + * + * 1. Visit the appended module first in the traversal, its input port is the last + * 2. Visit this module second in the traversal, wire the output port back to the previous input port (-1) + */ + traversalSoFar + } else { + /* + * The optimistic mapping to -1 is not correct, we need to unfold the Traversal to find our last module + * (which is the _first_ module in the Traversal) and rewire the output assignment to the correct offset. + * + * Since we will be visited second (and the appended toAppend first), we need to + * + * 1. go backward toAppend.inSlots slots to reach the beginning offset of toAppend + * 2. now go forward toAppend.inOffset to reach the correct location + * + * <-------------- (-toAppend.inSlots) + * -------> (+toAppend.inOffset) + * + * --------in----|[out module]---------- + * toAppend this + * + */ + rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots) + } } /* @@ -997,75 +1009,79 @@ import akka.util.unused * We have finished "this" builder, now we need to construct the new builder as the result of appending. * There are two variants, depending whether toAppend is purely linear or if it has a composite at the end. */ - if (toAppend.pendingBuilder.isEmpty) { - /* - * This is the simple case, when the other is purely linear. We just concatenate the traversals - * and do some bookkeeping. - */ - LinearTraversalBuilder( - inPort = inPort, - outPort = toAppend.outPort, - inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before - // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ - inOffset = inOffset + toAppend.inSlots, - // Build in reverse so it yields a more efficient layout for left-to-right building - traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), - pendingBuilder = OptionVal.None, - attributes = Attributes.none, // attributes are none for the new enclosing builder - beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites - islandTag = OptionVal.None // islandTag is reset for the new enclosing builder - ) - } else { - /* - * In this case we need to assemble as much as we can, and create a new "sandwich" of - * beforeBuilder ~ pendingBuilder ~ traversalSoFar - * - * We need to apply the attributes and islandTags of the appended builder, but we cannot do it in one - * step, instead we need to append half of the steps to traversalSoFar, and the other half to - * beforeBuilder. - */ - var newTraversalSoFar = finalTraversalForThis - var newBeforeTraversal = toAppend.beforeBuilder + toAppend.pendingBuilder match { + case OptionVal.None => + /* + * This is the simple case, when the other is purely linear. We just concatenate the traversals + * and do some bookkeeping. + */ + LinearTraversalBuilder( + inPort = inPort, + outPort = toAppend.outPort, + inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before + // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ + inOffset = inOffset + toAppend.inSlots, + // Build in reverse so it yields a more efficient layout for left-to-right building + traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), + pendingBuilder = OptionVal.None, + attributes = Attributes.none, // attributes are none for the new enclosing builder + beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder + ) - // First prepare island enter and exit if tags are present - if (toAppend.islandTag.isDefined) { - // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) - newBeforeTraversal = EnterIsland(toAppend.islandTag.get).concat(newBeforeTraversal) - // Exit the island just after the appended builder (they should not applied to _this_ builder) - newTraversalSoFar = ExitIsland.concat(newTraversalSoFar) - } + case _ => // Some(pendingBuilder) + /* + * In this case we need to assemble as much as we can, and create a new "sandwich" of + * beforeBuilder ~ pendingBuilder ~ traversalSoFar + * + * We need to apply the attributes and islandTags of the appended builder, but we cannot do it in one + * step, instead we need to append half of the steps to traversalSoFar, and the other half to + * beforeBuilder. + */ + var newTraversalSoFar = finalTraversalForThis + var newBeforeTraversal = toAppend.beforeBuilder - // Secondly, prepare attribute push and pop if Attributes are present - if (toAppend.attributes ne Attributes.none) { - // Push the attributes just before the appended builder. - newBeforeTraversal = PushAttributes(toAppend.attributes).concat(newBeforeTraversal) - // Pop the attributes immediately after the appended builder (they should not applied to _this_ builder) - newTraversalSoFar = PopAttributes.concat(newTraversalSoFar) - } + // First prepare island enter and exit if tags are present + toAppend.islandTag match { + case OptionVal.Some(tag) => + // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) + newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal) + // Exit the island just after the appended builder (they should not applied to _this_ builder) + newTraversalSoFar = ExitIsland.concat(newTraversalSoFar) + case _ => // Nothing changes + } - // This is roughly how things will look like in the end: - // - // newBeforeTraversal newTraversalSoFar - // [PushAttributes ~ EnterIsland] ~ composite ~ [toAppend.traversalSoFar ~ ExitIsland ~ PopAttributes ~ finalTraversalForThis] + // Secondly, prepare attribute push and pop if Attributes are present + if (toAppend.attributes ne Attributes.none) { + // Push the attributes just before the appended builder. + newBeforeTraversal = PushAttributes(toAppend.attributes).concat(newBeforeTraversal) + // Pop the attributes immediately after the appended builder (they should not applied to _this_ builder) + newTraversalSoFar = PopAttributes.concat(newTraversalSoFar) + } - // Finally add the already completed part of toAppend to newTraversalSoFar - newTraversalSoFar = toAppend.traversalSoFar.concat(newTraversalSoFar) + // This is roughly how things will look like in the end: + // + // newBeforeTraversal newTraversalSoFar + // [PushAttributes ~ EnterIsland] ~ composite ~ [toAppend.traversalSoFar ~ ExitIsland ~ PopAttributes ~ finalTraversalForThis] - LinearTraversalBuilder( - inPort = inPort, - outPort = toAppend.outPort, - inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before - // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ - inOffset = inOffset + toAppend.inSlots, - // Build in reverse so it yields a more efficient layout for left-to-right building. We cannot - // apply the full traversal, only the completed part of it - traversalSoFar = newTraversalSoFar, - // Last composite of toAppend is still pending - pendingBuilder = toAppend.pendingBuilder, - attributes = Attributes.none, // attributes are none for the new enclosing builder - beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites - islandTag = OptionVal.None // islandTag is reset for the new enclosing builder - ) + // Finally add the already completed part of toAppend to newTraversalSoFar + newTraversalSoFar = toAppend.traversalSoFar.concat(newTraversalSoFar) + + LinearTraversalBuilder( + inPort = inPort, + outPort = toAppend.outPort, + inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before + // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ + inOffset = inOffset + toAppend.inSlots, + // Build in reverse so it yields a more efficient layout for left-to-right building. We cannot + // apply the full traversal, only the completed part of it + traversalSoFar = newTraversalSoFar, + // Last composite of toAppend is still pending + pendingBuilder = toAppend.pendingBuilder, + attributes = Attributes.none, // attributes are none for the new enclosing builder + beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder + ) } } else throw new Exception("should this happen?") @@ -1082,9 +1098,11 @@ import akka.util.unused * between islands. */ override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder = - if (this.islandTag.isDefined) - this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - else copy(islandTag = OptionVal.Some(islandTag)) + this.islandTag match { + case OptionVal.Some(_) => + this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + case _ => copy(islandTag = OptionVal.Some(islandTag)) + } } /** @@ -1188,9 +1206,10 @@ import akka.util.unused remaining = remaining.tail } - val finalTraversal = - if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversal).concat(ExitIsland) - else traversal + val finalTraversal = islandTag match { + case OptionVal.Some(tag) => EnterIsland(tag).concat(traversal).concat(ExitIsland) + case _ => traversal + } // The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it // to be embedded in a larger graph, making partial graph reuse much more efficient. @@ -1325,8 +1344,10 @@ import akka.util.unused } override def makeIsland(islandTag: IslandTag): TraversalBuilder = { - if (this.islandTag.isDefined) - this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - else copy(islandTag = OptionVal(islandTag)) + this.islandTag match { + case OptionVal.None => copy(islandTag = OptionVal(islandTag)) + case _ => + this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + } } }