diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index b8ed3560f7..d31c4ea740 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -103,13 +103,13 @@ import scala.util.Success private var _currentActorThread: OptionVal[Thread] = OptionVal.None // context-shared timer needed to allow for nested timer usage - def timer: TimerSchedulerCrossDslSupport[T] = _timer match { - case OptionVal.Some(timer) => timer - case _ => + def timer: TimerSchedulerCrossDslSupport[T] = _timer.orNull match { + case null => checkCurrentActorThread() val timer = mkTimer() _timer = OptionVal.Some(timer) timer + case timer => timer } protected[this] def mkTimer(): TimerSchedulerCrossDslSupport[T] = new TimerSchedulerImpl[T](this) @@ -150,14 +150,14 @@ import scala.util.Success private def loggingContext(): LoggingContext = { // lazy init of logging setup - _logging match { - case OptionVal.Some(l) => l - case _ => + _logging.orNull match { + case null => val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]]) val logger = LoggerFactory.getLogger(logClass.getName) val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this) _logging = OptionVal.Some(l) l + case l => l } } @@ -183,11 +183,10 @@ import scala.util.Success // MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message override private[akka] def clearMdc(): Unit = { // avoid access to MDC ThreadLocal if not needed, see details in LoggingContext - _logging match { - case OptionVal.Some(ctx) if ctx.mdcUsed => - ActorMdc.clearMdc() - ctx.mdcUsed = false - case _ => + val ctx = _logging.orNull + if ((ctx ne null) && ctx.mdcUsed) { + ActorMdc.clearMdc() + ctx.mdcUsed = false } } @@ -295,14 +294,14 @@ import scala.util.Success val boxedMessageClass = BoxedType(messageClass).asInstanceOf[Class[U]] _messageAdapters = (boxedMessageClass, f.asInstanceOf[Any => T]) :: _messageAdapters.filterNot { case (cls, _) => cls == boxedMessageClass } - val ref = messageAdapterRef match { - case OptionVal.Some(ref) => ref.asInstanceOf[ActorRef[U]] - case _ => + val ref = messageAdapterRef.orNull match { + case null => // AdaptMessage is not really a T, but that is erased val ref = internalSpawnMessageAdapter[Any](msg => AdaptWithRegisteredMessageAdapter(msg).asInstanceOf[T], "adapter") messageAdapterRef = OptionVal.Some(ref) ref + case ref => ref.asInstanceOf[ActorRef[U]] } ref.asInstanceOf[ActorRef[U]] } @@ -316,13 +315,14 @@ import scala.util.Success * INTERNAL API */ @InternalApi private[akka] def setCurrentActorThread(): Unit = { - _currentActorThread match { - case OptionVal.Some(t) => + val callerThread = Thread.currentThread() + _currentActorThread.orNull match { + case null => + _currentActorThread = OptionVal.Some(callerThread) + case t => throw new IllegalStateException( s"Invalid access by thread from the outside of $self. " + - s"Current message is processed by $t, but also accessed from ${Thread.currentThread()}.") - case _ => - _currentActorThread = OptionVal.Some(Thread.currentThread()) + s"Current message is processed by $t, but also accessed from $callerThread.") } } @@ -338,17 +338,17 @@ import scala.util.Success */ @InternalApi private[akka] def checkCurrentActorThread(): Unit = { val callerThread = Thread.currentThread() - _currentActorThread match { - case OptionVal.Some(t) => + _currentActorThread.orNull match { + case null => + throw new UnsupportedOperationException( + s"Unsupported access to ActorContext from the outside of $self. " + + s"No message is currently processed by the actor, but ActorContext was called from $callerThread.") + case t => if (callerThread ne t) { throw new UnsupportedOperationException( s"Unsupported access to ActorContext operation from the outside of $self. " + s"Current message is processed by $t, but ActorContext was called from $callerThread.") } - case _ => - throw new UnsupportedOperationException( - s"Unsupported access to ActorContext from the outside of $self. " + - s"No message is currently processed by the actor, but ActorContext was called from $callerThread.") } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index ba32e2930d..5b263ec13e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -61,6 +61,7 @@ private[remote] class Encoder( extends GraphStageWithMaterializedValue[ FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.OutboundCompressionAccess] { + import Encoder._ val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in") @@ -81,12 +82,12 @@ private[remote] class Encoder( // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized private var _serialization: OptionVal[Serialization] = OptionVal.None - private def serialization: Serialization = _serialization match { - case OptionVal.Some(s) => s - case _ => + private def serialization: Serialization = _serialization.orNull match { + case null => val s = SerializationExtension(system) _serialization = OptionVal.Some(s) s + case s => s } private val instruments: RemoteInstruments = RemoteInstruments(system) @@ -129,14 +130,14 @@ private[remote] class Encoder( Serialization.currentTransportInformation.value = serialization.serializationInformation // internally compression is applied by the builder: - outboundEnvelope.recipient match { - case OptionVal.Some(r) => headerBuilder.setRecipientActorRef(r) - case _ => headerBuilder.setNoRecipient() + outboundEnvelope.recipient.orNull match { + case null => headerBuilder.setNoRecipient() + case r => headerBuilder.setRecipientActorRef(r) } - outboundEnvelope.sender match { - case OptionVal.Some(s) => headerBuilder.setSenderActorRef(s) - case _ => headerBuilder.setNoSender() + outboundEnvelope.sender.orNull match { + case null => headerBuilder.setNoSender() + case s => headerBuilder.setSenderActorRef(s) } val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 @@ -179,18 +180,18 @@ private[remote] class Encoder( new OversizedPayloadException(reasonText), "Failed to serialize oversized message [{}].", Logging.messageClassName(outboundEnvelope.message)) - system.eventStream.publish(outboundEnvelope.sender match { - case OptionVal.Some(msgSender) => + system.eventStream.publish(outboundEnvelope.sender.orNull match { + case null => + Dropped( + outboundEnvelope.message, + reasonText, + outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) + case msgSender => Dropped( outboundEnvelope.message, reasonText, msgSender, outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) - case _ => - Dropped( - outboundEnvelope.message, - reasonText, - outboundEnvelope.recipient.getOrElse(ActorRef.noSender)) }) pull(in) case _ => @@ -414,13 +415,14 @@ private[remote] class Decoder( val originUid = headerBuilder.uid val association = inboundContext.association(originUid) - val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match { - case OptionVal.Some(ref) => - OptionVal(ref.asInstanceOf[InternalActorRef]) - case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined => - resolveRecipient(headerBuilder.recipientActorRefPath.get) - case _ => - OptionVal.None + val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid).orNull match { + case null => + headerBuilder.recipientActorRefPath.orNull match { + case null => OptionVal.None + case path => resolveRecipient(path) + } + case ref => + OptionVal.Some(ref.asInstanceOf[InternalActorRef]) } catch { case NonFatal(e) => // probably version mismatch due to restarted system @@ -428,13 +430,14 @@ private[remote] class Decoder( OptionVal.None } - val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match { - case OptionVal.Some(ref) => - OptionVal(ref.asInstanceOf[InternalActorRef]) - case OptionVal.None if headerBuilder.senderActorRefPath.isDefined => - OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get)) - case _ => - OptionVal.None + val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid).orNull match { + case null => + headerBuilder.senderActorRefPath.orNull match { + case null => OptionVal.None + case path => OptionVal(actorRefResolver.resolve(path)) + } + case ref => + OptionVal.Some(ref.asInstanceOf[InternalActorRef]) } catch { case NonFatal(e) => // probably version mismatch due to restarted system @@ -472,27 +475,20 @@ private[remote] class Decoder( if ((messageCount & heavyHitterMask) == 0) { // --- hit refs and manifests for heavy-hitter counting - association match { - case OptionVal.Some(assoc) => - val remoteAddress = assoc.remoteAddress - sender match { - case OptionVal.Some(snd) => - compressions.hitActorRef(originUid, remoteAddress, snd, 1) - case _ => - } - - recipient match { - case OptionVal.Some(rcp) => - compressions.hitActorRef(originUid, remoteAddress, rcp, 1) - case _ => - } - - compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1) - - case _ => + association.orNull match { + case null => // we don't want to record hits for compression while handshake is still in progress. log.debug( "Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?") + case assoc => + val remoteAddress = assoc.remoteAddress + if (sender.isDefined) + compressions.hitActorRef(originUid, remoteAddress, sender.get, 1) + + if (recipient.isDefined) + compressions.hitActorRef(originUid, remoteAddress, recipient.get, 1) + + compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1) } // --- end of hit refs and manifests for heavy-hitter counting } @@ -511,7 +507,6 @@ private[remote] class Decoder( lane = 0) if (recipient.isEmpty && !headerBuilder.isNoRecipient) { - // The remote deployed actor might not be created yet when resolving the // recipient for the first message that is sent to it, best effort retry. // However, if the retried resolve isn't successful the ref is banned and @@ -521,19 +516,19 @@ private[remote] class Decoder( val recipientActorRefPath = headerBuilder.recipientActorRefPath.get if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) { - headerBuilder.recipientActorRefPath match { - case OptionVal.Some(path) => + headerBuilder.recipientActorRefPath.orNull match { + case null => + log.warning( + "Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].", + recipientActorRefPath) + pull(in) + case path => val ref = actorRefResolver.getOrCompute(path) if (ref.isInstanceOf[EmptyLocalActorRef]) log.warning( "Message for banned (terminated, unresolved) remote deployed recipient [{}].", recipientActorRefPath) push(out, decoded.withRecipient(ref)) - case _ => - log.warning( - "Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].", - recipientActorRefPath) - pull(in) } } else @@ -593,10 +588,8 @@ private[remote] class Decoder( .runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) => - resolveRecipient(recipientPath) match { - case OptionVal.Some(recipient) => - push(out, inboundEnvelope.withRecipient(recipient)) - case _ => + resolveRecipient(recipientPath).orNull match { + case null => if (attemptsLeft > 0) scheduleOnce( RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope), @@ -615,6 +608,8 @@ private[remote] class Decoder( val recipient = actorRefResolver.getOrCompute(recipientPath) push(out, inboundEnvelope.withRecipient(recipient)) } + case recipient => + push(out, inboundEnvelope.withRecipient(recipient)) } case unknown => throw new IllegalArgumentException(s"Unknown timer key: $unknown") @@ -648,12 +643,12 @@ private[remote] class Deserializer( // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized private var _serialization: OptionVal[Serialization] = OptionVal.None - private def serialization: Serialization = _serialization match { - case OptionVal.Some(s) => s - case _ => + private def serialization: Serialization = _serialization.orNull match { + case null => val s = SerializationExtension(system) _serialization = OptionVal.Some(s) s + case s => s } override protected def logSource = classOf[Deserializer] @@ -682,9 +677,9 @@ private[remote] class Deserializer( push(out, envelopeWithMessage) } catch { case NonFatal(e) => - val from = envelope.association match { - case OptionVal.Some(a) => a.remoteAddress - case _ => "unknown" + val from = envelope.association.orNull match { + case null => "unknown" + case assoc => assoc.remoteAddress } log.warning( "Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}", diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 9f09cf3f10..ee3698f1f7 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -17,7 +17,7 @@ import akka.annotation.InternalApi import akka.event.Logging import akka.japi.function import akka.stream.impl.TraversalBuilder -import akka.util.{ ByteString, OptionVal } +import akka.util.ByteString import akka.util.JavaDurationConverters._ import akka.util.LineNumbers @@ -122,17 +122,14 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { */ def getMandatoryAttribute[T <: MandatoryAttribute](c: Class[T]): T = { @tailrec - def find(list: List[Attribute]): OptionVal[Attribute] = list match { - case Nil => OptionVal.None + def find(list: List[Attribute]): T = list match { + case Nil => throw new IllegalStateException(s"Mandatory attribute [$c] not found") case head :: tail => - if (c.isInstance(head)) OptionVal.Some(head) + if (c.isInstance(head)) c.cast(head) else find(tail) } - find(attributeList) match { - case OptionVal.Some(t) => t.asInstanceOf[T] - case _ => throw new IllegalStateException(s"Mandatory attribute [$c] not found") - } + find(attributeList) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 44e6dcd892..0b41bce73d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -350,8 +350,8 @@ import akka.util.unused case _ => graph.traversalBuilder match { case l: LinearTraversalBuilder => - l.pendingBuilder match { - case OptionVal.Some(a: AtomicTraversalBuilder) => + l.pendingBuilder.orNull match { + case a: AtomicTraversalBuilder => a.module match { case m: GraphStageModule[_, _] => m.stage match { @@ -359,7 +359,8 @@ import akka.util.unused // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. if ((l.traversalSoFar eq EmptyTraversal) && !l.attributes.isAsync) OptionVal.Some(single) - else OptionVal.None + else + OptionVal.None case _ => OptionVal.None } case _ => OptionVal.None @@ -505,10 +506,9 @@ import akka.util.unused } override def traversal: Traversal = { - val withIsland = islandTag match { - case OptionVal.Some(tag) => EnterIsland(tag).concat(traversalSoFar).concat(ExitIsland) - case _ => traversalSoFar - } + val withIsland = + if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversalSoFar).concat(ExitIsland) + else traversalSoFar if (attributes eq Attributes.none) withIsland else PushAttributes(attributes).concat(withIsland).concat(PopAttributes) @@ -531,10 +531,8 @@ import akka.util.unused override def unwiredOuts: Int = 0 override def makeIsland(islandTag: IslandTag): TraversalBuilder = - this.islandTag match { - case OptionVal.None => copy(islandTag = OptionVal(islandTag)) - case _ => this - } + if (this.islandTag.isDefined) this + else copy(islandTag = OptionVal(islandTag)) override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = throw new UnsupportedOperationException("Cannot assign ports to slots in a completed builder.") @@ -685,13 +683,10 @@ import akka.util.unused case completed: CompletedTraversalBuilder => val inOpt = OptionVal(shape.inlets.headOption.orNull) - val inOffs = inOpt match { - case OptionVal.Some(in) => completed.offsetOf(in) - case _ => 0 - } + val inOffs = if (inOpt.isDefined) completed.offsetOf(inOpt.get) else 0 LinearTraversalBuilder( - inPort = OptionVal(inOpt.orNull), + inPort = inOpt, outPort = OptionVal.None, inOffset = inOffs, inSlots = completed.inSlots, @@ -702,13 +697,10 @@ import akka.util.unused case composite => val inOpt = OptionVal(shape.inlets.headOption.orNull) val out = shape.outlets.head // Cannot be empty, otherwise it would be a CompletedTraversalBuilder - val inOffs = inOpt match { - case OptionVal.Some(in) => composite.offsetOf(in) - case _ => 0 - } + val inOffs = if (inOpt.isDefined) composite.offsetOf(inOpt.get) else 0 LinearTraversalBuilder( - inPort = OptionVal(inOpt.orNull), + inPort = inOpt, outPort = OptionVal.Some(out), inOffset = inOffs, inSlots = composite.inSlots, @@ -770,10 +762,7 @@ import akka.util.unused } private def applyIslandAndAttributes(t: Traversal): Traversal = { - val withIslandTag = islandTag match { - case OptionVal.Some(tag) => EnterIsland(tag).concat(t).concat(ExitIsland) - case _ => t - } + val withIslandTag = if (islandTag.isDefined) EnterIsland(islandTag.get).concat(t).concat(ExitIsland) else t if (attributes eq Attributes.none) withIslandTag else PushAttributes(attributes).concat(withIslandTag).concat(PopAttributes) @@ -797,22 +786,22 @@ import akka.util.unused */ override def wire(out: OutPort, in: InPort): TraversalBuilder = { if (outPort.contains(out) && inPort.contains(in)) { - pendingBuilder match { - case OptionVal.Some(composite) => - copy( - inPort = OptionVal.None, - outPort = OptionVal.None, - traversalSoFar = applyIslandAndAttributes( - beforeBuilder - .concat(composite.assign(out, inOffset - composite.offsetOfModule(out)).traversal) - .concat(traversalSoFar)), - pendingBuilder = OptionVal.None, - beforeBuilder = EmptyTraversal) - case _ => - copy( - inPort = OptionVal.None, - outPort = OptionVal.None, - traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) + if (pendingBuilder.isDefined) { + val composite = pendingBuilder.get + copy( + inPort = OptionVal.None, + outPort = OptionVal.None, + traversalSoFar = applyIslandAndAttributes( + beforeBuilder + .concat(composite.assign(out, inOffset - composite.offsetOfModule(out)).traversal) + .concat(traversalSoFar)), + pendingBuilder = OptionVal.None, + beforeBuilder = EmptyTraversal) + } else { + copy( + inPort = OptionVal.None, + outPort = OptionVal.None, + traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) } } else throw new IllegalArgumentException(s"The ports $in and $out cannot be accessed in this builder.") @@ -820,10 +809,10 @@ import akka.util.unused override def offsetOfModule(out: OutPort): Int = { if (outPort.contains(out)) { - pendingBuilder match { - case OptionVal.Some(composite) => composite.offsetOfModule(out) - case _ => 0 // Output belongs to the last module, which will be materialized *first* - } + if (pendingBuilder.isDefined) + pendingBuilder.get.offsetOfModule(out) + else + 0 // Output belongs to the last module, which will be materialized *first* } else throw new IllegalArgumentException(s"Port $out cannot be accessed in this builder") } @@ -839,16 +828,16 @@ import akka.util.unused override def assign(out: OutPort, relativeSlot: Int): TraversalBuilder = { if (outPort.contains(out)) { - pendingBuilder match { - case OptionVal.Some(composite) => - copy( - outPort = OptionVal.None, - traversalSoFar = applyIslandAndAttributes( - beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))), - pendingBuilder = OptionVal.None, - beforeBuilder = EmptyTraversal) - case _ => - copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) + if (pendingBuilder.isDefined) { + val composite = pendingBuilder.get + copy( + outPort = OptionVal.None, + traversalSoFar = applyIslandAndAttributes( + beforeBuilder.concat(composite.assign(out, relativeSlot).traversal.concat(traversalSoFar))), + pendingBuilder = OptionVal.None, + beforeBuilder = EmptyTraversal) + } else { + copy(outPort = OptionVal.None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } } else throw new IllegalArgumentException(s"Port $out cannot be assigned in this builder") @@ -918,83 +907,82 @@ import akka.util.unused * Depending whether we have a composite builder as our last step or not, composition will be somewhat * different. */ - val assembledTraversalForThis = this.pendingBuilder match { - case OptionVal.Some(composite) => + val assembledTraversalForThis = if (this.pendingBuilder.isDefined) { + val composite = this.pendingBuilder.get + /* + * This is the case where our last module is a composite, and since it does not have its output port + * wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. + * + * Since we will wire now the output port, we can assemble everything together: + */ + val out = outPort.get + /* + * Since we will be visited second (and the appended toAppend first), we need to + * + * 1. go back to the start of the composite module, i.e. composite.offsetOfModule(out) steps. This + * is necessary because the composite might not have the internal module as the first visited + * module in the Traversal and hence not have a base offset of 0 in the composite + * 2. go backward toAppend.inSlots slots to reach the beginning offset of toAppend + * 3. now go forward toAppend.inOffset to reach the correct location + * + * <------- (-composite.offsetOfModule(out)) + * <-------------- (-toAppend.inSlots) + * -------> (+toAppend.inOffset) + * + * --------in----|-------[out module]---------- + * toAppend this + * + */ + val compositeTraversal = composite + .assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset) + .traversal // All output ports are finished, so we can finally call this now + + /* + * Now we can assemble the pieces for the final Traversal of _this_ builder. + * + * beforeBuilder ~ compositeTraversal ~ traversalSoFar + * + * (remember that this is the _reverse_ of the Flow DSL order) + */ + beforeBuilder.concat(compositeTraversal).concat(traversalSoFar) + } else { + /* + * This is the case where we are a pure linear builder (all composites have been already completed), + * which means that traversalSoFar contains everything already, except the final attributes and islands + * applied. + * + * Since the exposed output port has been wired optimistically to -1, we need to check if this is correct, + * and correct if necessary. This is the step below: + */ + if (toAppend.inOffset == (toAppend.inSlots - 1)) { /* - * This is the case where our last module is a composite, and since it does not have its output port - * wired yet, the traversal is split into the parts, traversalSoFar, pendingBuilder and beforeBuilder. + * if the builder we want to append (remember that is _prepend_ from the Traversal's perspective) + * has its exposed input port at the last location (which is toAppend.inSlots - 1 because input + * port offsets start with 0), then -1 is the correct wiring. I.e. * - * Since we will wire now the output port, we can assemble everything together: + * 1. Visit the appended module first in the traversal, its input port is the last + * 2. Visit this module second in the traversal, wire the output port back to the previous input port (-1) */ - val out = outPort.get + traversalSoFar + } else { /* + * The optimistic mapping to -1 is not correct, we need to unfold the Traversal to find our last module + * (which is the _first_ module in the Traversal) and rewire the output assignment to the correct offset. + * * Since we will be visited second (and the appended toAppend first), we need to * - * 1. go back to the start of the composite module, i.e. composite.offsetOfModule(out) steps. This - * is necessary because the composite might not have the internal module as the first visited - * module in the Traversal and hence not have a base offset of 0 in the composite - * 2. go backward toAppend.inSlots slots to reach the beginning offset of toAppend - * 3. now go forward toAppend.inOffset to reach the correct location + * 1. go backward toAppend.inSlots slots to reach the beginning offset of toAppend + * 2. now go forward toAppend.inOffset to reach the correct location * - * <------- (-composite.offsetOfModule(out)) - * <-------------- (-toAppend.inSlots) - * -------> (+toAppend.inOffset) + * <-------------- (-toAppend.inSlots) + * -------> (+toAppend.inOffset) * - * --------in----|-------[out module]---------- - * toAppend this + * --------in----|[out module]---------- + * toAppend this * */ - val compositeTraversal = composite - .assign(out, -composite.offsetOfModule(out) - toAppend.inSlots + toAppend.inOffset) - .traversal // All output ports are finished, so we can finally call this now - - /* - * Now we can assemble the pieces for the final Traversal of _this_ builder. - * - * beforeBuilder ~ compositeTraversal ~ traversalSoFar - * - * (remember that this is the _reverse_ of the Flow DSL order) - */ - beforeBuilder.concat(compositeTraversal).concat(traversalSoFar) - - case _ => - /* - * This is the case where we are a pure linear builder (all composites have been already completed), - * which means that traversalSoFar contains everything already, except the final attributes and islands - * applied. - * - * Since the exposed output port has been wired optimistically to -1, we need to check if this is correct, - * and correct if necessary. This is the step below: - */ - if (toAppend.inOffset == (toAppend.inSlots - 1)) { - /* - * if the builder we want to append (remember that is _prepend_ from the Traversal's perspective) - * has its exposed input port at the last location (which is toAppend.inSlots - 1 because input - * port offsets start with 0), then -1 is the correct wiring. I.e. - * - * 1. Visit the appended module first in the traversal, its input port is the last - * 2. Visit this module second in the traversal, wire the output port back to the previous input port (-1) - */ - traversalSoFar - } else { - /* - * The optimistic mapping to -1 is not correct, we need to unfold the Traversal to find our last module - * (which is the _first_ module in the Traversal) and rewire the output assignment to the correct offset. - * - * Since we will be visited second (and the appended toAppend first), we need to - * - * 1. go backward toAppend.inSlots slots to reach the beginning offset of toAppend - * 2. now go forward toAppend.inOffset to reach the correct location - * - * <-------------- (-toAppend.inSlots) - * -------> (+toAppend.inOffset) - * - * --------in----|[out module]---------- - * toAppend this - * - */ - rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots) - } + rewireLastOutTo(traversalSoFar, toAppend.inOffset - toAppend.inSlots) + } } /* @@ -1009,79 +997,75 @@ import akka.util.unused * We have finished "this" builder, now we need to construct the new builder as the result of appending. * There are two variants, depending whether toAppend is purely linear or if it has a composite at the end. */ - toAppend.pendingBuilder match { - case OptionVal.None => - /* - * This is the simple case, when the other is purely linear. We just concatenate the traversals - * and do some bookkeeping. - */ - LinearTraversalBuilder( - inPort = inPort, - outPort = toAppend.outPort, - inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before - // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ - inOffset = inOffset + toAppend.inSlots, - // Build in reverse so it yields a more efficient layout for left-to-right building - traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), - pendingBuilder = OptionVal.None, - attributes = Attributes.none, // attributes are none for the new enclosing builder - beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites - islandTag = OptionVal.None // islandTag is reset for the new enclosing builder - ) + if (toAppend.pendingBuilder.isEmpty) { + /* + * This is the simple case, when the other is purely linear. We just concatenate the traversals + * and do some bookkeeping. + */ + LinearTraversalBuilder( + inPort = inPort, + outPort = toAppend.outPort, + inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before + // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ + inOffset = inOffset + toAppend.inSlots, + // Build in reverse so it yields a more efficient layout for left-to-right building + traversalSoFar = toAppend.applyIslandAndAttributes(toAppend.traversalSoFar).concat(finalTraversalForThis), + pendingBuilder = OptionVal.None, + attributes = Attributes.none, // attributes are none for the new enclosing builder + beforeBuilder = EmptyTraversal, // no need for beforeBuilder as there are no composites + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder + ) + } else { + /* + * In this case we need to assemble as much as we can, and create a new "sandwich" of + * beforeBuilder ~ pendingBuilder ~ traversalSoFar + * + * We need to apply the attributes and islandTags of the appended builder, but we cannot do it in one + * step, instead we need to append half of the steps to traversalSoFar, and the other half to + * beforeBuilder. + */ + var newTraversalSoFar = finalTraversalForThis + var newBeforeTraversal = toAppend.beforeBuilder - case _ => // Some(pendingBuilder) - /* - * In this case we need to assemble as much as we can, and create a new "sandwich" of - * beforeBuilder ~ pendingBuilder ~ traversalSoFar - * - * We need to apply the attributes and islandTags of the appended builder, but we cannot do it in one - * step, instead we need to append half of the steps to traversalSoFar, and the other half to - * beforeBuilder. - */ - var newTraversalSoFar = finalTraversalForThis - var newBeforeTraversal = toAppend.beforeBuilder + // First prepare island enter and exit if tags are present + if (toAppend.islandTag.isDefined) { + // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) + newBeforeTraversal = EnterIsland(toAppend.islandTag.get).concat(newBeforeTraversal) + // Exit the island just after the appended builder (they should not applied to _this_ builder) + newTraversalSoFar = ExitIsland.concat(newTraversalSoFar) + } - // First prepare island enter and exit if tags are present - toAppend.islandTag match { - case OptionVal.Some(tag) => - // Enter the island just before the appended builder (keeping the toAppend.beforeBuilder steps) - newBeforeTraversal = EnterIsland(tag).concat(newBeforeTraversal) - // Exit the island just after the appended builder (they should not applied to _this_ builder) - newTraversalSoFar = ExitIsland.concat(newTraversalSoFar) - case _ => // Nothing changes - } + // Secondly, prepare attribute push and pop if Attributes are present + if (toAppend.attributes ne Attributes.none) { + // Push the attributes just before the appended builder. + newBeforeTraversal = PushAttributes(toAppend.attributes).concat(newBeforeTraversal) + // Pop the attributes immediately after the appended builder (they should not applied to _this_ builder) + newTraversalSoFar = PopAttributes.concat(newTraversalSoFar) + } - // Secondly, prepare attribute push and pop if Attributes are present - if (toAppend.attributes ne Attributes.none) { - // Push the attributes just before the appended builder. - newBeforeTraversal = PushAttributes(toAppend.attributes).concat(newBeforeTraversal) - // Pop the attributes immediately after the appended builder (they should not applied to _this_ builder) - newTraversalSoFar = PopAttributes.concat(newTraversalSoFar) - } + // This is roughly how things will look like in the end: + // + // newBeforeTraversal newTraversalSoFar + // [PushAttributes ~ EnterIsland] ~ composite ~ [toAppend.traversalSoFar ~ ExitIsland ~ PopAttributes ~ finalTraversalForThis] - // This is roughly how things will look like in the end: - // - // newBeforeTraversal newTraversalSoFar - // [PushAttributes ~ EnterIsland] ~ composite ~ [toAppend.traversalSoFar ~ ExitIsland ~ PopAttributes ~ finalTraversalForThis] + // Finally add the already completed part of toAppend to newTraversalSoFar + newTraversalSoFar = toAppend.traversalSoFar.concat(newTraversalSoFar) - // Finally add the already completed part of toAppend to newTraversalSoFar - newTraversalSoFar = toAppend.traversalSoFar.concat(newTraversalSoFar) - - LinearTraversalBuilder( - inPort = inPort, - outPort = toAppend.outPort, - inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before - // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ - inOffset = inOffset + toAppend.inSlots, - // Build in reverse so it yields a more efficient layout for left-to-right building. We cannot - // apply the full traversal, only the completed part of it - traversalSoFar = newTraversalSoFar, - // Last composite of toAppend is still pending - pendingBuilder = toAppend.pendingBuilder, - attributes = Attributes.none, // attributes are none for the new enclosing builder - beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites - islandTag = OptionVal.None // islandTag is reset for the new enclosing builder - ) + LinearTraversalBuilder( + inPort = inPort, + outPort = toAppend.outPort, + inSlots = inSlots + toAppend.inSlots, // we have now more input ports than before + // the inOffset of _this_ gets shifted by toAppend.inSlots, because the traversal of toAppend is _prepended_ + inOffset = inOffset + toAppend.inSlots, + // Build in reverse so it yields a more efficient layout for left-to-right building. We cannot + // apply the full traversal, only the completed part of it + traversalSoFar = newTraversalSoFar, + // Last composite of toAppend is still pending + pendingBuilder = toAppend.pendingBuilder, + attributes = Attributes.none, // attributes are none for the new enclosing builder + beforeBuilder = newBeforeTraversal, // no need for beforeBuilder as there are no composites + islandTag = OptionVal.None // islandTag is reset for the new enclosing builder + ) } } else throw new Exception("should this happen?") @@ -1098,11 +1082,9 @@ import akka.util.unused * between islands. */ override def makeIsland(islandTag: IslandTag): LinearTraversalBuilder = - this.islandTag match { - case OptionVal.Some(_) => - this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - case _ => copy(islandTag = OptionVal.Some(islandTag)) - } + if (this.islandTag.isDefined) + this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + else copy(islandTag = OptionVal.Some(islandTag)) } /** @@ -1206,10 +1188,9 @@ import akka.util.unused remaining = remaining.tail } - val finalTraversal = islandTag match { - case OptionVal.Some(tag) => EnterIsland(tag).concat(traversal).concat(ExitIsland) - case _ => traversal - } + val finalTraversal = + if (islandTag.isDefined) EnterIsland(islandTag.get).concat(traversal).concat(ExitIsland) + else traversal // The CompleteTraversalBuilder only keeps the minimum amount of necessary information that is needed for it // to be embedded in a larger graph, making partial graph reuse much more efficient. @@ -1344,10 +1325,8 @@ import akka.util.unused } override def makeIsland(islandTag: IslandTag): TraversalBuilder = { - this.islandTag match { - case OptionVal.None => copy(islandTag = OptionVal(islandTag)) - case _ => - this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted - } + if (this.islandTag.isDefined) + this // Wrapping with an island, then immediately re-wrapping makes the second island empty, so can be omitted + else copy(islandTag = OptionVal(islandTag)) } }