diff --git a/akka-actor/src/main/scala/akka/util/OptionVal.scala b/akka-actor/src/main/scala/akka/util/OptionVal.scala new file mode 100644 index 0000000000..7238c54233 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/OptionVal.scala @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.util + +/** + * INTERNAL API + */ +private[akka] object OptionVal { + + def apply[A >: Null](x: A): OptionVal[A] = new OptionVal(x) + + object Some { + def apply[A >: Null](x: A): OptionVal[A] = new OptionVal(x) + def unapply[A >: Null](x: OptionVal[A]): OptionVal[A] = x + } + + /** + * Represents non-existent values, `null` values. + */ + val None = new OptionVal[Null](null) +} + +/** + * INTERNAL API + * Represents optional values similar to `scala.Option`, but + * as a value class to avoid allocations. + * + * Note that it can be used in pattern matching without allocations + * because it has name based extractor using methods `isEmpty` and `get`. + * See http://hseeberger.github.io/blog/2013/10/04/name-based-extractors-in-scala-2-dot-11/ + */ +private[akka] final class OptionVal[+A >: Null](val x: A) extends AnyVal { + + /** + * Returns true if the option is `OptionVal.None`, false otherwise. + */ + def isEmpty: Boolean = + x == null + + /** + * Returns true if the option is `OptionVal.None`, false otherwise. + */ + def isDefined: Boolean = !isEmpty + + /** + * Returns the option's value if the option is nonempty, otherwise + * return `default`. + */ + def getOrElse[B >: A](default: B): B = + if (x == null) default else x + + /** + * Returns the option's value if it is nonempty, or `null` if it is empty. + */ + def orNull[A1 >: A](implicit ev: Null <:< A1): A1 = this getOrElse ev(null) + + /** + * Returns the option's value. + * @note The option must be nonEmpty. + * @throws java.util.NoSuchElementException if the option is empty. + */ + def get: A = + if (x == null) throw new NoSuchElementException("OptionVal.None.get") + else x + + override def toString: String = + if (x == null) "None" else s"Some($x)" +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 82f22e1c89..6e96859ecf 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -5,13 +5,10 @@ package akka.remote.artery import java.nio.ByteBuffer import java.nio.ByteOrder - import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit - import scala.concurrent.Await import scala.concurrent.duration._ - import akka.NotUsed import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem @@ -28,6 +25,7 @@ import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl._ import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ +import akka.util.OptionVal @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -127,7 +125,7 @@ class CodecBenchmark { Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) - .map(_ ⇒ Send(payload, None, remoteRefB, None)) + .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) .via(encoder) .map(envelope => envelopePool.release(envelope)) .runWith(new LatchSink(N, latch))(materializer) @@ -193,7 +191,7 @@ class CodecBenchmark { resolveActorRefWithLocalAddress, compression, envelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) - .map(_ ⇒ Send(payload, None, remoteRefB, None)) + .map(_ ⇒ Send(payload, OptionVal.None, remoteRefB, None)) .via(encoder) .via(decoder) .runWith(new LatchSink(N, latch))(materializer) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala index d66a6814d0..6f1d2d539b 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala @@ -37,7 +37,7 @@ class LatchSink(countDownAfter: Int, latch: CountDownLatch) extends GraphStage[S } class BarrierSink(countDownAfter: Int, latch: CountDownLatch, barrierAfter: Int, barrier: CyclicBarrier) - extends GraphStage[SinkShape[Any]] { + extends GraphStage[SinkShape[Any]] { val in: Inlet[Any] = Inlet("BarrierSink") override val shape: SinkShape[Any] = SinkShape(in) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala index f54ca42f0e..2b131923f3 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala @@ -34,7 +34,8 @@ class SendQueueBenchmark { val config = ConfigFactory.parseString( """ - """) + """ + ) implicit val system = ActorSystem("SendQueueBenchmark", config) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index c92f1b1032..1f65987741 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -27,6 +27,8 @@ import scala.concurrent.duration.{ Deadline } import scala.util.control.NonFatal import java.util.concurrent.locks.LockSupport import scala.concurrent.Future +import akka.util.OptionVal +import akka.util.OptionVal /** * INTERNAL API @@ -36,7 +38,7 @@ private[remote] trait InboundMessageDispatcher { recipient: InternalActorRef, recipientAddress: Address, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef]): Unit + senderOption: OptionVal[ActorRef]): Unit } /** @@ -53,7 +55,7 @@ private[remote] class DefaultMessageDispatcher( recipient: InternalActorRef, recipientAddress: Address, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef]): Unit = { + senderOption: OptionVal[ActorRef]): Unit = { import provider.remoteSettings._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f8ca7a5e34..119b406d8f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -11,15 +11,14 @@ import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe - import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } - import scala.util.control.Exception.Catcher import scala.concurrent.Future import akka.ConfigurationException import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.artery.ArteryTransport +import akka.util.OptionVal /** * INTERNAL API @@ -528,13 +527,13 @@ private[akka] class RemoteActorRef private[akka] ( //Unwatch has a different signature, need to pattern match arguments against InternalActorRef case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) ⇒ provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) - case _ ⇒ remote.send(message, None, this) + case _ ⇒ remote.send(message, OptionVal.None, this) } } catch handleException override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { if (message == null) throw new InvalidMessageException("Message is null") - try remote.send(message, Option(sender), this) catch handleException + try remote.send(message, OptionVal(sender), this) catch handleException } override def provider: RemoteActorRefProvider = remote.provider diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 0797e2d2cf..c865aa508d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -11,6 +11,7 @@ import akka.event.{ LoggingAdapter } import scala.collection.immutable import scala.concurrent.Future import scala.util.control.NoStackTrace +import akka.util.OptionVal /** * RemoteTransportException represents a general failure within a RemoteTransport, @@ -68,7 +69,7 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va /** * Sends the given message to the recipient supplying the sender() if any */ - def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit + def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit /** * Sends a management command to the underlying transport stack. The call returns with a Future that indicates diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index c742b58aa6..8a35a63126 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -24,6 +24,7 @@ import akka.remote.transport.AkkaPduCodec.Message import java.util.concurrent.ConcurrentHashMap import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString.UTF_8 +import akka.util.OptionVal /** * INTERNAL API @@ -209,7 +210,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc } } - override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match { + override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match { case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender) case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null) } @@ -249,7 +250,7 @@ private[remote] object EndpointManager { final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand case object StartupFinished extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand - final case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None) + final case class Send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None) extends RemotingCommand with HasSequenceNumber { override def toString = s"Remote message $senderOption -> $recipient" diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index a48d91e28a..1e61bc2468 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -77,6 +77,7 @@ import akka.actor.Cancellable import scala.collection.JavaConverters._ import akka.stream.ActorMaterializerSettings import scala.annotation.tailrec +import akka.util.OptionVal /** * INTERNAL API */ @@ -84,7 +85,7 @@ private[akka] final case class InboundEnvelope( recipient: InternalActorRef, recipientAddress: Address, message: AnyRef, - senderOption: Option[ActorRef], + senderOption: OptionVal[ActorRef], originUid: Long) /** @@ -111,11 +112,10 @@ private[akka] trait InboundContext { /** * Lookup the outbound association for a given UID. - * Will return `null` if the UID is unknown, i.e. - * handshake not completed. `null` is used instead of `Optional` - * to avoid allocations. + * Will return `OptionVal.None` if the UID is unknown, i.e. + * handshake not completed. */ - def association(uid: Long): OutboundContext + def association(uid: Long): OptionVal[OutboundContext] def completeHandshake(peer: UniqueAddress): Unit @@ -603,7 +603,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def sendControl(to: Address, message: ControlMessage) = association(to).sendControl(message) - override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation val a = @@ -620,7 +620,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def association(remoteAddress: Address): Association = associationRegistry.association(remoteAddress) - override def association(uid: Long): Association = + override def association(uid: Long): OptionVal[Association] = associationRegistry.association(uid) override def completeHandshake(peer: UniqueAddress): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 2edd74cdbd..b46a479b76 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -4,7 +4,6 @@ package akka.remote.artery import java.util.Queue - import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch @@ -36,6 +35,7 @@ import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.util.{ Unsafe, WildcardTree } import org.agrona.concurrent.ManyToOneConcurrentArrayQueue +import akka.util.OptionVal /** * INTERNAL API @@ -158,7 +158,7 @@ private[remote] class Association( override def sendControl(message: ControlMessage): Unit = outboundControlIngress.sendControlMessage(message) - def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = { // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { @@ -224,7 +224,7 @@ private[remote] class Association( "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", remoteAddress, u, reason) // end delivery of system messages to that incarnation after this point - send(ClearSystemMessageDelivery, None, dummyRecipient) + send(ClearSystemMessageDelivery, OptionVal.None, dummyRecipient) // try to tell the other system that we have quarantined it sendControl(Quarantined(localAddress, peer)) } else @@ -408,8 +408,8 @@ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Associa } } - def association(uid: Long): Association = - associationsByUid.get(uid) + def association(uid: Long): OptionVal[Association] = + OptionVal(associationsByUid.get(uid)) def setUID(peer: UniqueAddress): Association = { val a = association(peer.address) diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 450099c99f..8842d548b6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -97,6 +97,7 @@ sealed trait HeaderBuilder { def senderActorRef: String def setNoSender(): Unit + def isNoSender: Boolean def recipientActorRef_=(ref: String): Unit def recipientActorRef: String @@ -135,6 +136,9 @@ private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompr _senderActorRefIdx = EnvelopeBuffer.DeadLettersCode } + def isNoSender: Boolean = + (_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode + def senderActorRef: String = { if (_senderActorRef ne null) _senderActorRef else { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index eb6470e73b..299cd7e5fd 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -1,7 +1,6 @@ package akka.remote.artery import scala.util.control.NonFatal - import akka.actor.{ ActorRef, InternalActorRef } import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem @@ -11,13 +10,14 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.util.OptionVal // TODO: Long UID class Encoder( uniqueLocalAddress: UniqueAddress, - system: ActorSystem, - compressionTable: LiteralCompressionTable, - pool: EnvelopeBufferPool) + system: ActorSystem, + compressionTable: LiteralCompressionTable, + pool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { val in: Inlet[Send] = Inlet("Artery.Encoder.in") @@ -34,7 +34,6 @@ class Encoder( private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) - private val noSender = system.deadLetters.path.toSerializationFormatWithAddress(localAddress) private val senderCache = new java.util.HashMap[ActorRef, String] private var recipientCache = new java.util.HashMap[ActorRef, String] @@ -57,7 +56,8 @@ class Encoder( headerBuilder.recipientActorRef = recipientStr send.senderOption match { - case Some(sender) ⇒ + case OptionVal.None => headerBuilder.setNoSender() + case OptionVal.Some(sender) => val senderStr = senderCache.get(sender) match { case null ⇒ val s = sender.path.toSerializationFormatWithAddress(localAddress) @@ -69,9 +69,6 @@ class Encoder( case s ⇒ s } headerBuilder.senderActorRef = senderStr - case None ⇒ - //headerBuilder.setNoSender() - headerBuilder.senderActorRef = noSender } try { @@ -108,11 +105,11 @@ class Encoder( } class Decoder( - uniqueLocalAddress: UniqueAddress, - system: ExtendedActorSystem, + uniqueLocalAddress: UniqueAddress, + system: ExtendedActorSystem, resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, - compressionTable: LiteralCompressionTable, - pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { + compressionTable: LiteralCompressionTable, + pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) @@ -124,7 +121,7 @@ class Decoder( private val serialization = SerializationExtension(system) private val recipientCache = new java.util.HashMap[String, InternalActorRef] - private val senderCache = new java.util.HashMap[String, Option[ActorRef]] + private val senderCache = new java.util.HashMap[String, ActorRef] override protected def logSource = classOf[Decoder] @@ -146,17 +143,21 @@ class Decoder( case ref ⇒ ref } - val senderOption: Option[ActorRef] = senderCache.get(headerBuilder.senderActorRef) match { - case null ⇒ - val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef) - // FIXME this cache will be replaced by compression table - if (senderCache.size() >= 1000) - senderCache.clear() - val refOpt = Some(ref) - senderCache.put(headerBuilder.senderActorRef, refOpt) - refOpt - case refOpt ⇒ refOpt - } + val senderOption = + if (headerBuilder.isNoSender) + OptionVal.None + else { + senderCache.get(headerBuilder.senderActorRef) match { + case null ⇒ + val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef) + // FIXME this cache will be replaced by compression table + if (senderCache.size() >= 1000) + senderCache.clear() + senderCache.put(headerBuilder.senderActorRef, ref) + OptionVal(ref) + case ref ⇒ OptionVal(ref) + } + } try { val deserializedMessage = MessageSerializer.deserializeForArtery( @@ -166,7 +167,7 @@ class Decoder( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? deserializedMessage, - senderOption, // FIXME: No need for an option, decode simply to deadLetters instead + senderOption, headerBuilder.uid) push(out, decoded) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 7d32d0889b..6d68ee0825 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -18,6 +18,7 @@ import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.remote.UniqueAddress +import akka.util.OptionVal /** * INTERNAL API: Marker trait for reply messages @@ -197,7 +198,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) } private def wrap(message: ControlMessage): Send = - Send(message, None, outboundContext.dummyRecipient, None) + Send(message, OptionVal.None, outboundContext.dummyRecipient, None) setHandlers(in, out, this) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index d1f96ce91b..84dd9871f1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -5,7 +5,6 @@ package akka.remote.artery import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress import akka.stream.Attributes @@ -17,6 +16,7 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic +import akka.util.OptionVal /** * INTERNAL API @@ -123,7 +123,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: private def pushHandshakeReq(): Unit = { injectHandshakeTickScheduled = true scheduleOnce(InjectHandshakeTick, injectHandshakeInterval) - push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None)) + push(out, Send(HandshakeReq(outboundContext.localAddress), OptionVal.None, outboundContext.dummyRecipient, None)) } private def handshakeCompleted(): Unit = { @@ -215,7 +215,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt private def isKnownOrigin(originUid: Long): Boolean = { // FIXME these association lookups are probably too costly for each message, need local cache or something - (inboundContext.association(originUid) ne null) + inboundContext.association(originUid).isDefined } // OutHandler diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala index 531e9c4aff..1a4454c58b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala @@ -12,6 +12,7 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.remote.UniqueAddress +import akka.util.OptionVal /** * INTERNAL API @@ -28,10 +29,10 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten override def onPush(): Unit = { val env = grab(in) inboundContext.association(env.originUid) match { - case null ⇒ + case OptionVal.None => // unknown, handshake not completed push(out, env) - case association ⇒ + case OptionVal.Some(association) => if (association.associationState.isQuarantined(env.originUid)) { inboundContext.sendControl( association.remoteAddress, diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala index 8e871c3f33..7525473479 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -16,6 +16,7 @@ import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging import akka.remote.RemoteActorRefProvider import akka.remote.RemoteRef +import akka.util.OptionVal /** * INTERNAL API @@ -31,7 +32,7 @@ private[akka] class MessageDispatcher( recipient: InternalActorRef, recipientAddress: Address, message: AnyRef, - senderOption: Option[ActorRef]): Unit = { + senderOption: OptionVal[ActorRef]): Unit = { import provider.remoteSettings._ diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index ed8e3d5ad7..b0c35bc1f4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -10,6 +10,7 @@ import akka.remote._ import akka.util.ByteString import akka.protobuf.InvalidProtocolBufferException import akka.protobuf.{ ByteString ⇒ PByteString } +import akka.util.OptionVal /** * INTERNAL API @@ -35,11 +36,11 @@ private[remote] object AkkaPduCodec { final case class Payload(bytes: ByteString) extends AkkaPdu final case class Message( - recipient: InternalActorRef, - recipientAddress: Address, + recipient: InternalActorRef, + recipientAddress: Address, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef], - seqOption: Option[SeqNo]) extends HasSequenceNumber { + senderOption: OptionVal[ActorRef], + seqOption: Option[SeqNo]) extends HasSequenceNumber { def reliableDeliveryEnabled = seqOption.isDefined @@ -94,12 +95,12 @@ private[remote] trait AkkaPduCodec { def decodeMessage(raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message]) def constructMessage( - localAddress: Address, - recipient: ActorRef, + localAddress: Address, + recipient: ActorRef, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef], - seqOption: Option[SeqNo] = None, - ackOption: Option[Ack] = None): ByteString + senderOption: OptionVal[ActorRef], + seqOption: Option[SeqNo] = None, + ackOption: Option[Ack] = None): ByteString def constructPureAck(ack: Ack): ByteString } @@ -118,19 +119,23 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { } override def constructMessage( - localAddress: Address, - recipient: ActorRef, + localAddress: Address, + recipient: ActorRef, serializedMessage: SerializedMessage, - senderOption: Option[ActorRef], - seqOption: Option[SeqNo] = None, - ackOption: Option[Ack] = None): ByteString = { + senderOption: OptionVal[ActorRef], + seqOption: Option[SeqNo] = None, + ackOption: Option[Ack] = None): ByteString = { val ackAndEnvelopeBuilder = AckAndEnvelopeContainer.newBuilder val envelopeBuilder = RemoteEnvelope.newBuilder envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) - senderOption foreach { ref ⇒ envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } + senderOption match { + case OptionVal.Some(sender) => envelopeBuilder.setSender(serializeActorRef(localAddress, sender)) + case OptionVal.None => + } + seqOption foreach { seq ⇒ envelopeBuilder.setSeq(seq.rawValue) } ackOption foreach { ack ⇒ ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } envelopeBuilder.setMessage(serializedMessage) @@ -176,8 +181,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { } override def decodeMessage( - raw: ByteString, - provider: RemoteActorRefProvider, + raw: ByteString, + provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message]) = { val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray) @@ -193,8 +198,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath), serializedMessage = msgPdu.getMessage, senderOption = - if (msgPdu.hasSender) Some(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress)) - else None, + if (msgPdu.hasSender) OptionVal(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress)) + else OptionVal.None, seqOption = if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None)) } else None @@ -226,7 +231,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { Address(encodedAddress.getProtocol, encodedAddress.getSystem, encodedAddress.getHostname, encodedAddress.getPort) private def constructControlMessagePdu( - code: WireFormats.CommandType, + code: WireFormats.CommandType, handshakeInfo: Option[AkkaHandshakeInfo.Builder]): ByteString = { val controlMessageBuilder = AkkaControlMessage.newBuilder() diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index 0981ade341..efa0a10b54 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -4,7 +4,6 @@ package akka.remote.artery import scala.concurrent.duration._ - import akka.actor.Address import akka.actor.InternalActorRef import akka.remote.UniqueAddress @@ -18,6 +17,7 @@ import akka.stream.testkit.scaladsl.TestSource import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.TestProbe +import akka.util.OptionVal object InboundControlJunctionSpec { case object Control1 extends ControlMessage @@ -42,7 +42,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender { val recipient = null.asInstanceOf[InternalActorRef] // not used val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] - .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None, addressA.uid)) + .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid)) .viaMat(new InboundControlJunction)(Keep.both) .map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index 4a1774a7bc..5aa42f93ce 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -5,7 +5,6 @@ package akka.remote.artery import scala.concurrent.Await import scala.concurrent.duration._ - import akka.actor.Address import akka.actor.InternalActorRef import akka.remote.UniqueAddress @@ -22,6 +21,7 @@ import akka.stream.testkit.scaladsl.TestSource import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.TestProbe +import akka.util.OptionVal object InboundHandshakeSpec { case object Control1 extends ControlMessage @@ -41,7 +41,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { val recipient = null.asInstanceOf[InternalActorRef] // not used TestSource.probe[AnyRef] - .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None, addressA.uid)) + .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid)) .via(new InboundHandshake(inboundContext, inControlStream = true)) .map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala index 206a355ece..7b79fdaced 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundControlJunctionSpec.scala @@ -4,7 +4,6 @@ package akka.remote.artery import scala.concurrent.duration._ - import akka.actor.Address import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef @@ -17,6 +16,7 @@ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSource import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender +import akka.util.OptionVal object OutboundControlJunctionSpec { case object Control1 extends ControlMessage @@ -41,7 +41,7 @@ class OutboundControlJunctionSpec extends AkkaSpec with ImplicitSender { val destination = null.asInstanceOf[RemoteActorRef] // not used val ((upstream, controlIngress), downstream) = TestSource.probe[String] - .map(msg ⇒ Send(msg, None, destination, None)) + .map(msg ⇒ Send(msg, OptionVal.None, destination, None)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.both) .map { case Send(msg, _, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index 27bbd3e13b..1b3adef27d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -4,7 +4,6 @@ package akka.remote.artery import scala.concurrent.duration._ - import akka.actor.Address import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef @@ -22,6 +21,7 @@ import akka.stream.testkit.scaladsl.TestSource import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender import akka.testkit.TestProbe +import akka.util.OptionVal class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { @@ -38,7 +38,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val destination = null.asInstanceOf[RemoteActorRef] // not used TestSource.probe[String] - .map(msg ⇒ Send(msg, None, destination, None)) + .map(msg ⇒ Send(msg, OptionVal.None, destination, None)) .via(new OutboundHandshake(outboundContext, timeout, retryInterval, injectHandshakeInterval)) .map { case Send(msg, _, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index f9173f04e3..a6d309ebf2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -4,10 +4,8 @@ package akka.remote.artery import java.util.concurrent.ThreadLocalRandom - import scala.concurrent.Await import scala.concurrent.duration._ - import akka.NotUsed import akka.actor.ActorIdentity import akka.actor.ActorSystem @@ -33,6 +31,7 @@ import akka.testkit.ImplicitSender import akka.testkit.TestActors import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory +import akka.util.OptionVal object SystemMessageDeliverySpec { @@ -68,7 +67,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = { val remoteRef = null.asInstanceOf[RemoteActorRef] // not used Source(1 to sendCount) - .map(n ⇒ Send("msg-" + n, None, remoteRef, None)) + .map(n ⇒ Send("msg-" + n, OptionVal.None, remoteRef, None)) .via(new SystemMessageDelivery(outboundContext, resendInterval, maxBufferSize = 1000)) } @@ -77,7 +76,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi Flow[Send] .map { case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒ - InboundEnvelope(recipient, addressB.address, sysEnv, None, addressA.uid) + InboundEnvelope(recipient, addressB.address, sysEnv, OptionVal.None, addressA.uid) } .async .via(new SystemMessageAcker(inboundContext)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index bbc9cefaa2..0ec5454ab8 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -6,11 +6,9 @@ package akka.remote.artery import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ThreadLocalRandom - import scala.concurrent.Future import scala.concurrent.Promise import scala.util.Success - import akka.Done import akka.actor.ActorRef import akka.actor.Address @@ -18,6 +16,7 @@ import akka.remote.RemoteActorRef import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject +import akka.util.OptionVal private[akka] class TestInboundContext( override val localAddress: UniqueAddress, @@ -44,8 +43,8 @@ private[akka] class TestInboundContext( case existing ⇒ existing } - override def association(uid: Long): OutboundContext = - associationsByUid.get(uid) + override def association(uid: Long): OptionVal[OutboundContext] = + OptionVal(associationsByUid.get(uid)) override def completeHandshake(peer: UniqueAddress): Unit = { val a = association(peer.address).asInstanceOf[TestOutboundContext] @@ -85,7 +84,7 @@ private[akka] class TestOutboundContext( override def sendControl(message: ControlMessage) = { controlProbe.foreach(_ ! message) - controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, None, localAddress.uid)) + controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, OptionVal.None, localAddress.uid)) } // FIXME we should be able to Send without a recipient ActorRef diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index e67e2ab486..3825bba924 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -14,6 +14,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.{ Await, Promise } import java.util.concurrent.TimeoutException +import akka.util.OptionVal object AkkaProtocolSpec { @@ -66,7 +67,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re val codec = AkkaPduProtobufCodec val testMsg = WireFormats.SerializedMessage.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build - val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, None) + val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, OptionVal.None) val testMsgPdu: ByteString = codec.constructPayload(testEnvelope) def testHeartbeat = InboundPayload(codec.constructHeartbeat) diff --git a/project/MiMa.scala b/project/MiMa.scala index 0a96b84a24..a375eec6ba 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -851,7 +851,15 @@ object MiMa extends AutoPlugin { // Remove useUntrustedMode which is an internal API and not used anywhere anymore ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), + + // Use OptionVal in remote Send envelope + FilterAnyProblemStartingWith("akka.remote.EndpointManager"), + FilterAnyProblemStartingWith("akka.remote.Remoting"), + FilterAnyProblemStartingWith("akka.remote.RemoteTransport"), + FilterAnyProblemStartingWith("akka.remote.InboundMessageDispatcher"), + FilterAnyProblemStartingWith("akka.remote.DefaultMessageDispatcher"), + FilterAnyProblemStartingWith("akka.remote.transport"), // internal api FilterAnyProblemStartingWith("akka.stream.impl"),