Option value class, to avoid allocations for optional sender

This commit is contained in:
Patrik Nordwall 2016-06-05 15:40:06 +02:00
parent c808522f6d
commit a814034342
25 changed files with 191 additions and 100 deletions

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
/**
* INTERNAL API
*/
private[akka] object OptionVal {
def apply[A >: Null](x: A): OptionVal[A] = new OptionVal(x)
object Some {
def apply[A >: Null](x: A): OptionVal[A] = new OptionVal(x)
def unapply[A >: Null](x: OptionVal[A]): OptionVal[A] = x
}
/**
* Represents non-existent values, `null` values.
*/
val None = new OptionVal[Null](null)
}
/**
* INTERNAL API
* Represents optional values similar to `scala.Option`, but
* as a value class to avoid allocations.
*
* Note that it can be used in pattern matching without allocations
* because it has name based extractor using methods `isEmpty` and `get`.
* See http://hseeberger.github.io/blog/2013/10/04/name-based-extractors-in-scala-2-dot-11/
*/
private[akka] final class OptionVal[+A >: Null](val x: A) extends AnyVal {
/**
* Returns true if the option is `OptionVal.None`, false otherwise.
*/
def isEmpty: Boolean =
x == null
/**
* Returns true if the option is `OptionVal.None`, false otherwise.
*/
def isDefined: Boolean = !isEmpty
/**
* Returns the option's value if the option is nonempty, otherwise
* return `default`.
*/
def getOrElse[B >: A](default: B): B =
if (x == null) default else x
/**
* Returns the option's value if it is nonempty, or `null` if it is empty.
*/
def orNull[A1 >: A](implicit ev: Null <:< A1): A1 = this getOrElse ev(null)
/**
* Returns the option's value.
* @note The option must be nonEmpty.
* @throws java.util.NoSuchElementException if the option is empty.
*/
def get: A =
if (x == null) throw new NoSuchElementException("OptionVal.None.get")
else x
override def toString: String =
if (x == null) "None" else s"Some($x)"
}

View file

@ -5,13 +5,10 @@ package akka.remote.artery
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.ByteOrder import java.nio.ByteOrder
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
@ -28,6 +25,7 @@ import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._ import org.openjdk.jmh.annotations._
import akka.util.OptionVal
@State(Scope.Benchmark) @State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS) @OutputTimeUnit(TimeUnit.MILLISECONDS)
@ -127,7 +125,7 @@ class CodecBenchmark {
Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool)) Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(_ Send(payload, None, remoteRefB, None)) .map(_ Send(payload, OptionVal.None, remoteRefB, None))
.via(encoder) .via(encoder)
.map(envelope => envelopePool.release(envelope)) .map(envelope => envelopePool.release(envelope))
.runWith(new LatchSink(N, latch))(materializer) .runWith(new LatchSink(N, latch))(materializer)
@ -193,7 +191,7 @@ class CodecBenchmark {
resolveActorRefWithLocalAddress, compression, envelopePool)) resolveActorRefWithLocalAddress, compression, envelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(_ Send(payload, None, remoteRefB, None)) .map(_ Send(payload, OptionVal.None, remoteRefB, None))
.via(encoder) .via(encoder)
.via(decoder) .via(decoder)
.runWith(new LatchSink(N, latch))(materializer) .runWith(new LatchSink(N, latch))(materializer)

View file

@ -34,7 +34,8 @@ class SendQueueBenchmark {
val config = ConfigFactory.parseString( val config = ConfigFactory.parseString(
""" """
""") """
)
implicit val system = ActorSystem("SendQueueBenchmark", config) implicit val system = ActorSystem("SendQueueBenchmark", config)

View file

@ -27,6 +27,8 @@ import scala.concurrent.duration.{ Deadline }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import java.util.concurrent.locks.LockSupport import java.util.concurrent.locks.LockSupport
import scala.concurrent.Future import scala.concurrent.Future
import akka.util.OptionVal
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -36,7 +38,7 @@ private[remote] trait InboundMessageDispatcher {
recipient: InternalActorRef, recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
serializedMessage: SerializedMessage, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]): Unit senderOption: OptionVal[ActorRef]): Unit
} }
/** /**
@ -53,7 +55,7 @@ private[remote] class DefaultMessageDispatcher(
recipient: InternalActorRef, recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
serializedMessage: SerializedMessage, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef]): Unit = { senderOption: OptionVal[ActorRef]): Unit = {
import provider.remoteSettings._ import provider.remoteSettings._

View file

@ -11,15 +11,14 @@ import akka.event.{ EventStream, Logging, LoggingAdapter }
import akka.event.Logging.Error import akka.event.Logging.Error
import akka.serialization.{ Serialization, SerializationExtension } import akka.serialization.{ Serialization, SerializationExtension }
import akka.pattern.pipe import akka.pattern.pipe
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }
import scala.util.control.Exception.Catcher import scala.util.control.Exception.Catcher
import scala.concurrent.Future import scala.concurrent.Future
import akka.ConfigurationException import akka.ConfigurationException
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.artery.ArteryTransport import akka.remote.artery.ArteryTransport
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -528,13 +527,13 @@ private[akka] class RemoteActorRef private[akka] (
//Unwatch has a different signature, need to pattern match arguments against InternalActorRef //Unwatch has a different signature, need to pattern match arguments against InternalActorRef
case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher) case Unwatch(watchee: InternalActorRef, watcher: InternalActorRef) if isWatchIntercepted(watchee, watcher)
provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher) provider.remoteWatcher ! RemoteWatcher.UnwatchRemote(watchee, watcher)
case _ remote.send(message, None, this) case _ remote.send(message, OptionVal.None, this)
} }
} catch handleException } catch handleException
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
if (message == null) throw new InvalidMessageException("Message is null") if (message == null) throw new InvalidMessageException("Message is null")
try remote.send(message, Option(sender), this) catch handleException try remote.send(message, OptionVal(sender), this) catch handleException
} }
override def provider: RemoteActorRefProvider = remote.provider override def provider: RemoteActorRefProvider = remote.provider

View file

@ -11,6 +11,7 @@ import akka.event.{ LoggingAdapter }
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.util.OptionVal
/** /**
* RemoteTransportException represents a general failure within a RemoteTransport, * RemoteTransportException represents a general failure within a RemoteTransport,
@ -68,7 +69,7 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va
/** /**
* Sends the given message to the recipient supplying the sender() if any * Sends the given message to the recipient supplying the sender() if any
*/ */
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit
/** /**
* Sends a management command to the underlying transport stack. The call returns with a Future that indicates * Sends a management command to the underlying transport stack. The call returns with a Future that indicates

View file

@ -24,6 +24,7 @@ import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.ByteString.UTF_8 import akka.util.ByteString.UTF_8
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -209,7 +210,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
} }
} }
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match { override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
case Some(manager) manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender) case Some(manager) manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
case None throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null) case None throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null)
} }
@ -249,7 +250,7 @@ private[remote] object EndpointManager {
final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand
case object StartupFinished extends RemotingCommand case object StartupFinished extends RemotingCommand
case object ShutdownAndFlush extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand
final case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None) final case class Send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef, seqOpt: Option[SeqNo] = None)
extends RemotingCommand with HasSequenceNumber { extends RemotingCommand with HasSequenceNumber {
override def toString = s"Remote message $senderOption -> $recipient" override def toString = s"Remote message $senderOption -> $recipient"

View file

@ -77,6 +77,7 @@ import akka.actor.Cancellable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -84,7 +85,7 @@ private[akka] final case class InboundEnvelope(
recipient: InternalActorRef, recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
message: AnyRef, message: AnyRef,
senderOption: Option[ActorRef], senderOption: OptionVal[ActorRef],
originUid: Long) originUid: Long)
/** /**
@ -111,11 +112,10 @@ private[akka] trait InboundContext {
/** /**
* Lookup the outbound association for a given UID. * Lookup the outbound association for a given UID.
* Will return `null` if the UID is unknown, i.e. * Will return `OptionVal.None` if the UID is unknown, i.e.
* handshake not completed. `null` is used instead of `Optional` * handshake not completed.
* to avoid allocations.
*/ */
def association(uid: Long): OutboundContext def association(uid: Long): OptionVal[OutboundContext]
def completeHandshake(peer: UniqueAddress): Unit def completeHandshake(peer: UniqueAddress): Unit
@ -603,7 +603,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def sendControl(to: Address, message: ControlMessage) = override def sendControl(to: Address, message: ControlMessage) =
association(to).sendControl(message) association(to).sendControl(message)
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = {
val cached = recipient.cachedAssociation val cached = recipient.cachedAssociation
val a = val a =
@ -620,7 +620,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def association(remoteAddress: Address): Association = override def association(remoteAddress: Address): Association =
associationRegistry.association(remoteAddress) associationRegistry.association(remoteAddress)
override def association(uid: Long): Association = override def association(uid: Long): OptionVal[Association] =
associationRegistry.association(uid) associationRegistry.association(uid)
override def completeHandshake(peer: UniqueAddress): Unit = { override def completeHandshake(peer: UniqueAddress): Unit = {

View file

@ -4,7 +4,6 @@
package akka.remote.artery package akka.remote.artery
import java.util.Queue import java.util.Queue
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
@ -36,6 +35,7 @@ import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.util.{ Unsafe, WildcardTree } import akka.util.{ Unsafe, WildcardTree }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -158,7 +158,7 @@ private[remote] class Association(
override def sendControl(message: ControlMessage): Unit = override def sendControl(message: ControlMessage): Unit =
outboundControlIngress.sendControlMessage(message) outboundControlIngress.sendControlMessage(message)
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = {
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
// FIXME where is that ActorSelectionMessage check in old remoting? // FIXME where is that ActorSelectionMessage check in old remoting?
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
@ -224,7 +224,7 @@ private[remote] class Association(
"Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
remoteAddress, u, reason) remoteAddress, u, reason)
// end delivery of system messages to that incarnation after this point // end delivery of system messages to that incarnation after this point
send(ClearSystemMessageDelivery, None, dummyRecipient) send(ClearSystemMessageDelivery, OptionVal.None, dummyRecipient)
// try to tell the other system that we have quarantined it // try to tell the other system that we have quarantined it
sendControl(Quarantined(localAddress, peer)) sendControl(Quarantined(localAddress, peer))
} else } else
@ -408,8 +408,8 @@ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Associa
} }
} }
def association(uid: Long): Association = def association(uid: Long): OptionVal[Association] =
associationsByUid.get(uid) OptionVal(associationsByUid.get(uid))
def setUID(peer: UniqueAddress): Association = { def setUID(peer: UniqueAddress): Association = {
val a = association(peer.address) val a = association(peer.address)

View file

@ -97,6 +97,7 @@ sealed trait HeaderBuilder {
def senderActorRef: String def senderActorRef: String
def setNoSender(): Unit def setNoSender(): Unit
def isNoSender: Boolean
def recipientActorRef_=(ref: String): Unit def recipientActorRef_=(ref: String): Unit
def recipientActorRef: String def recipientActorRef: String
@ -135,6 +136,9 @@ private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompr
_senderActorRefIdx = EnvelopeBuffer.DeadLettersCode _senderActorRefIdx = EnvelopeBuffer.DeadLettersCode
} }
def isNoSender: Boolean =
(_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode
def senderActorRef: String = { def senderActorRef: String = {
if (_senderActorRef ne null) _senderActorRef if (_senderActorRef ne null) _senderActorRef
else { else {

View file

@ -1,7 +1,6 @@
package akka.remote.artery package akka.remote.artery
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ ActorRef, InternalActorRef } import akka.actor.{ ActorRef, InternalActorRef }
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
@ -11,6 +10,7 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension } import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._ import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal
// TODO: Long UID // TODO: Long UID
class Encoder( class Encoder(
@ -34,7 +34,6 @@ class Encoder(
private val serialization = SerializationExtension(system) private val serialization = SerializationExtension(system)
private val serializationInfo = Serialization.Information(localAddress, system) private val serializationInfo = Serialization.Information(localAddress, system)
private val noSender = system.deadLetters.path.toSerializationFormatWithAddress(localAddress)
private val senderCache = new java.util.HashMap[ActorRef, String] private val senderCache = new java.util.HashMap[ActorRef, String]
private var recipientCache = new java.util.HashMap[ActorRef, String] private var recipientCache = new java.util.HashMap[ActorRef, String]
@ -57,7 +56,8 @@ class Encoder(
headerBuilder.recipientActorRef = recipientStr headerBuilder.recipientActorRef = recipientStr
send.senderOption match { send.senderOption match {
case Some(sender) case OptionVal.None => headerBuilder.setNoSender()
case OptionVal.Some(sender) =>
val senderStr = senderCache.get(sender) match { val senderStr = senderCache.get(sender) match {
case null case null
val s = sender.path.toSerializationFormatWithAddress(localAddress) val s = sender.path.toSerializationFormatWithAddress(localAddress)
@ -69,9 +69,6 @@ class Encoder(
case s s case s s
} }
headerBuilder.senderActorRef = senderStr headerBuilder.senderActorRef = senderStr
case None
//headerBuilder.setNoSender()
headerBuilder.senderActorRef = noSender
} }
try { try {
@ -124,7 +121,7 @@ class Decoder(
private val serialization = SerializationExtension(system) private val serialization = SerializationExtension(system)
private val recipientCache = new java.util.HashMap[String, InternalActorRef] private val recipientCache = new java.util.HashMap[String, InternalActorRef]
private val senderCache = new java.util.HashMap[String, Option[ActorRef]] private val senderCache = new java.util.HashMap[String, ActorRef]
override protected def logSource = classOf[Decoder] override protected def logSource = classOf[Decoder]
@ -146,16 +143,20 @@ class Decoder(
case ref ref case ref ref
} }
val senderOption: Option[ActorRef] = senderCache.get(headerBuilder.senderActorRef) match { val senderOption =
if (headerBuilder.isNoSender)
OptionVal.None
else {
senderCache.get(headerBuilder.senderActorRef) match {
case null case null
val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef) val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef)
// FIXME this cache will be replaced by compression table // FIXME this cache will be replaced by compression table
if (senderCache.size() >= 1000) if (senderCache.size() >= 1000)
senderCache.clear() senderCache.clear()
val refOpt = Some(ref) senderCache.put(headerBuilder.senderActorRef, ref)
senderCache.put(headerBuilder.senderActorRef, refOpt) OptionVal(ref)
refOpt case ref OptionVal(ref)
case refOpt refOpt }
} }
try { try {
@ -166,7 +167,7 @@ class Decoder(
recipient, recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here? localAddress, // FIXME: Is this needed anymore? What should we do here?
deserializedMessage, deserializedMessage,
senderOption, // FIXME: No need for an option, decode simply to deadLetters instead senderOption,
headerBuilder.uid) headerBuilder.uid)
push(out, decoded) push(out, decoded)

View file

@ -18,6 +18,7 @@ import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler import akka.stream.stage.OutHandler
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.util.OptionVal
/** /**
* INTERNAL API: Marker trait for reply messages * INTERNAL API: Marker trait for reply messages
@ -197,7 +198,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext)
} }
private def wrap(message: ControlMessage): Send = private def wrap(message: ControlMessage): Send =
Send(message, None, outboundContext.dummyRecipient, None) Send(message, OptionVal.None, outboundContext.dummyRecipient, None)
setHandlers(in, out, this) setHandlers(in, out, this)
} }

View file

@ -5,7 +5,6 @@ package akka.remote.artery
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.remote.EndpointManager.Send import akka.remote.EndpointManager.Send
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.stream.Attributes import akka.stream.Attributes
@ -17,6 +16,7 @@ import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic import akka.stream.stage.TimerGraphStageLogic
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -123,7 +123,7 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
private def pushHandshakeReq(): Unit = { private def pushHandshakeReq(): Unit = {
injectHandshakeTickScheduled = true injectHandshakeTickScheduled = true
scheduleOnce(InjectHandshakeTick, injectHandshakeInterval) scheduleOnce(InjectHandshakeTick, injectHandshakeInterval)
push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None)) push(out, Send(HandshakeReq(outboundContext.localAddress), OptionVal.None, outboundContext.dummyRecipient, None))
} }
private def handshakeCompleted(): Unit = { private def handshakeCompleted(): Unit = {
@ -215,7 +215,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
private def isKnownOrigin(originUid: Long): Boolean = { private def isKnownOrigin(originUid: Long): Boolean = {
// FIXME these association lookups are probably too costly for each message, need local cache or something // FIXME these association lookups are probably too costly for each message, need local cache or something
(inboundContext.association(originUid) ne null) inboundContext.association(originUid).isDefined
} }
// OutHandler // OutHandler

View file

@ -12,6 +12,7 @@ import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler import akka.stream.stage.OutHandler
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -28,10 +29,10 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
override def onPush(): Unit = { override def onPush(): Unit = {
val env = grab(in) val env = grab(in)
inboundContext.association(env.originUid) match { inboundContext.association(env.originUid) match {
case null case OptionVal.None =>
// unknown, handshake not completed // unknown, handshake not completed
push(out, env) push(out, env)
case association case OptionVal.Some(association) =>
if (association.associationState.isQuarantined(env.originUid)) { if (association.associationState.isQuarantined(env.originUid)) {
inboundContext.sendControl( inboundContext.sendControl(
association.remoteAddress, association.remoteAddress,

View file

@ -16,6 +16,7 @@ import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging import akka.event.Logging
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteRef import akka.remote.RemoteRef
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -31,7 +32,7 @@ private[akka] class MessageDispatcher(
recipient: InternalActorRef, recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
message: AnyRef, message: AnyRef,
senderOption: Option[ActorRef]): Unit = { senderOption: OptionVal[ActorRef]): Unit = {
import provider.remoteSettings._ import provider.remoteSettings._

View file

@ -10,6 +10,7 @@ import akka.remote._
import akka.util.ByteString import akka.util.ByteString
import akka.protobuf.InvalidProtocolBufferException import akka.protobuf.InvalidProtocolBufferException
import akka.protobuf.{ ByteString PByteString } import akka.protobuf.{ ByteString PByteString }
import akka.util.OptionVal
/** /**
* INTERNAL API * INTERNAL API
@ -38,7 +39,7 @@ private[remote] object AkkaPduCodec {
recipient: InternalActorRef, recipient: InternalActorRef,
recipientAddress: Address, recipientAddress: Address,
serializedMessage: SerializedMessage, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef], senderOption: OptionVal[ActorRef],
seqOption: Option[SeqNo]) extends HasSequenceNumber { seqOption: Option[SeqNo]) extends HasSequenceNumber {
def reliableDeliveryEnabled = seqOption.isDefined def reliableDeliveryEnabled = seqOption.isDefined
@ -97,7 +98,7 @@ private[remote] trait AkkaPduCodec {
localAddress: Address, localAddress: Address,
recipient: ActorRef, recipient: ActorRef,
serializedMessage: SerializedMessage, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef], senderOption: OptionVal[ActorRef],
seqOption: Option[SeqNo] = None, seqOption: Option[SeqNo] = None,
ackOption: Option[Ack] = None): ByteString ackOption: Option[Ack] = None): ByteString
@ -121,7 +122,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
localAddress: Address, localAddress: Address,
recipient: ActorRef, recipient: ActorRef,
serializedMessage: SerializedMessage, serializedMessage: SerializedMessage,
senderOption: Option[ActorRef], senderOption: OptionVal[ActorRef],
seqOption: Option[SeqNo] = None, seqOption: Option[SeqNo] = None,
ackOption: Option[Ack] = None): ByteString = { ackOption: Option[Ack] = None): ByteString = {
@ -130,7 +131,11 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
val envelopeBuilder = RemoteEnvelope.newBuilder val envelopeBuilder = RemoteEnvelope.newBuilder
envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient)) envelopeBuilder.setRecipient(serializeActorRef(recipient.path.address, recipient))
senderOption foreach { ref envelopeBuilder.setSender(serializeActorRef(localAddress, ref)) } senderOption match {
case OptionVal.Some(sender) => envelopeBuilder.setSender(serializeActorRef(localAddress, sender))
case OptionVal.None =>
}
seqOption foreach { seq envelopeBuilder.setSeq(seq.rawValue) } seqOption foreach { seq envelopeBuilder.setSeq(seq.rawValue) }
ackOption foreach { ack ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) } ackOption foreach { ack ackAndEnvelopeBuilder.setAck(ackBuilder(ack)) }
envelopeBuilder.setMessage(serializedMessage) envelopeBuilder.setMessage(serializedMessage)
@ -193,8 +198,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath), recipientAddress = AddressFromURIString(msgPdu.getRecipient.getPath),
serializedMessage = msgPdu.getMessage, serializedMessage = msgPdu.getMessage,
senderOption = senderOption =
if (msgPdu.hasSender) Some(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress)) if (msgPdu.hasSender) OptionVal(provider.resolveActorRefWithLocalAddress(msgPdu.getSender.getPath, localAddress))
else None, else OptionVal.None,
seqOption = seqOption =
if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None)) if (msgPdu.hasSeq) Some(SeqNo(msgPdu.getSeq)) else None))
} else None } else None

View file

@ -4,7 +4,6 @@
package akka.remote.artery package akka.remote.artery
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.actor.InternalActorRef import akka.actor.InternalActorRef
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
@ -18,6 +17,7 @@ import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.util.OptionVal
object InboundControlJunctionSpec { object InboundControlJunctionSpec {
case object Control1 extends ControlMessage case object Control1 extends ControlMessage
@ -42,7 +42,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
val recipient = null.asInstanceOf[InternalActorRef] // not used val recipient = null.asInstanceOf[InternalActorRef] // not used
val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef]
.map(msg InboundEnvelope(recipient, addressB.address, msg, None, addressA.uid)) .map(msg InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid))
.viaMat(new InboundControlJunction)(Keep.both) .viaMat(new InboundControlJunction)(Keep.both)
.map { case InboundEnvelope(_, _, msg, _, _) msg } .map { case InboundEnvelope(_, _, msg, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both) .toMat(TestSink.probe[Any])(Keep.both)

View file

@ -5,7 +5,6 @@ package akka.remote.artery
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.actor.InternalActorRef import akka.actor.InternalActorRef
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
@ -22,6 +21,7 @@ import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.util.OptionVal
object InboundHandshakeSpec { object InboundHandshakeSpec {
case object Control1 extends ControlMessage case object Control1 extends ControlMessage
@ -41,7 +41,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
val recipient = null.asInstanceOf[InternalActorRef] // not used val recipient = null.asInstanceOf[InternalActorRef] // not used
TestSource.probe[AnyRef] TestSource.probe[AnyRef]
.map(msg InboundEnvelope(recipient, addressB.address, msg, None, addressA.uid)) .map(msg InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid))
.via(new InboundHandshake(inboundContext, inControlStream = true)) .via(new InboundHandshake(inboundContext, inControlStream = true))
.map { case InboundEnvelope(_, _, msg, _, _) msg } .map { case InboundEnvelope(_, _, msg, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both) .toMat(TestSink.probe[Any])(Keep.both)

View file

@ -4,7 +4,6 @@
package akka.remote.artery package akka.remote.artery
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.remote.EndpointManager.Send import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef import akka.remote.RemoteActorRef
@ -17,6 +16,7 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.util.OptionVal
object OutboundControlJunctionSpec { object OutboundControlJunctionSpec {
case object Control1 extends ControlMessage case object Control1 extends ControlMessage
@ -41,7 +41,7 @@ class OutboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
val destination = null.asInstanceOf[RemoteActorRef] // not used val destination = null.asInstanceOf[RemoteActorRef] // not used
val ((upstream, controlIngress), downstream) = TestSource.probe[String] val ((upstream, controlIngress), downstream) = TestSource.probe[String]
.map(msg Send(msg, None, destination, None)) .map(msg Send(msg, OptionVal.None, destination, None))
.viaMat(new OutboundControlJunction(outboundContext))(Keep.both) .viaMat(new OutboundControlJunction(outboundContext))(Keep.both)
.map { case Send(msg, _, _, _) msg } .map { case Send(msg, _, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both) .toMat(TestSink.probe[Any])(Keep.both)

View file

@ -4,7 +4,6 @@
package akka.remote.artery package akka.remote.artery
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.remote.EndpointManager.Send import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef import akka.remote.RemoteActorRef
@ -22,6 +21,7 @@ import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
import akka.testkit.TestProbe import akka.testkit.TestProbe
import akka.util.OptionVal
class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
@ -38,7 +38,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val destination = null.asInstanceOf[RemoteActorRef] // not used val destination = null.asInstanceOf[RemoteActorRef] // not used
TestSource.probe[String] TestSource.probe[String]
.map(msg Send(msg, None, destination, None)) .map(msg Send(msg, OptionVal.None, destination, None))
.via(new OutboundHandshake(outboundContext, timeout, retryInterval, injectHandshakeInterval)) .via(new OutboundHandshake(outboundContext, timeout, retryInterval, injectHandshakeInterval))
.map { case Send(msg, _, _, _) msg } .map { case Send(msg, _, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both) .toMat(TestSink.probe[Any])(Keep.both)

View file

@ -4,10 +4,8 @@
package akka.remote.artery package akka.remote.artery
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorIdentity import akka.actor.ActorIdentity
import akka.actor.ActorSystem import akka.actor.ActorSystem
@ -33,6 +31,7 @@ import akka.testkit.ImplicitSender
import akka.testkit.TestActors import akka.testkit.TestActors
import akka.testkit.TestProbe import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.util.OptionVal
object SystemMessageDeliverySpec { object SystemMessageDeliverySpec {
@ -68,7 +67,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = { private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = {
val remoteRef = null.asInstanceOf[RemoteActorRef] // not used val remoteRef = null.asInstanceOf[RemoteActorRef] // not used
Source(1 to sendCount) Source(1 to sendCount)
.map(n Send("msg-" + n, None, remoteRef, None)) .map(n Send("msg-" + n, OptionVal.None, remoteRef, None))
.via(new SystemMessageDelivery(outboundContext, resendInterval, maxBufferSize = 1000)) .via(new SystemMessageDelivery(outboundContext, resendInterval, maxBufferSize = 1000))
} }
@ -77,7 +76,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
Flow[Send] Flow[Send]
.map { .map {
case Send(sysEnv: SystemMessageEnvelope, _, _, _) case Send(sysEnv: SystemMessageEnvelope, _, _, _)
InboundEnvelope(recipient, addressB.address, sysEnv, None, addressA.uid) InboundEnvelope(recipient, addressB.address, sysEnv, OptionVal.None, addressA.uid)
} }
.async .async
.via(new SystemMessageAcker(inboundContext)) .via(new SystemMessageAcker(inboundContext))

View file

@ -6,11 +6,9 @@ package akka.remote.artery
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.util.Success import scala.util.Success
import akka.Done import akka.Done
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.Address import akka.actor.Address
@ -18,6 +16,7 @@ import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageObserver
import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.util.OptionVal
private[akka] class TestInboundContext( private[akka] class TestInboundContext(
override val localAddress: UniqueAddress, override val localAddress: UniqueAddress,
@ -44,8 +43,8 @@ private[akka] class TestInboundContext(
case existing existing case existing existing
} }
override def association(uid: Long): OutboundContext = override def association(uid: Long): OptionVal[OutboundContext] =
associationsByUid.get(uid) OptionVal(associationsByUid.get(uid))
override def completeHandshake(peer: UniqueAddress): Unit = { override def completeHandshake(peer: UniqueAddress): Unit = {
val a = association(peer.address).asInstanceOf[TestOutboundContext] val a = association(peer.address).asInstanceOf[TestOutboundContext]
@ -85,7 +84,7 @@ private[akka] class TestOutboundContext(
override def sendControl(message: ControlMessage) = { override def sendControl(message: ControlMessage) = {
controlProbe.foreach(_ ! message) controlProbe.foreach(_ ! message)
controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, None, localAddress.uid)) controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, OptionVal.None, localAddress.uid))
} }
// FIXME we should be able to Send without a recipient ActorRef // FIXME we should be able to Send without a recipient ActorRef

View file

@ -14,6 +14,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, Promise } import scala.concurrent.{ Await, Promise }
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.util.OptionVal
object AkkaProtocolSpec { object AkkaProtocolSpec {
@ -66,7 +67,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
val codec = AkkaPduProtobufCodec val codec = AkkaPduProtobufCodec
val testMsg = WireFormats.SerializedMessage.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build val testMsg = WireFormats.SerializedMessage.newBuilder().setSerializerId(0).setMessage(PByteString.copyFromUtf8("foo")).build
val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, None) val testEnvelope = codec.constructMessage(localAkkaAddress, testActor, testMsg, OptionVal.None)
val testMsgPdu: ByteString = codec.constructPayload(testEnvelope) val testMsgPdu: ByteString = codec.constructPayload(testEnvelope)
def testHeartbeat = InboundPayload(codec.constructHeartbeat) def testHeartbeat = InboundPayload(codec.constructHeartbeat)

View file

@ -853,6 +853,14 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"),
// Use OptionVal in remote Send envelope
FilterAnyProblemStartingWith("akka.remote.EndpointManager"),
FilterAnyProblemStartingWith("akka.remote.Remoting"),
FilterAnyProblemStartingWith("akka.remote.RemoteTransport"),
FilterAnyProblemStartingWith("akka.remote.InboundMessageDispatcher"),
FilterAnyProblemStartingWith("akka.remote.DefaultMessageDispatcher"),
FilterAnyProblemStartingWith("akka.remote.transport"),
// internal api // internal api
FilterAnyProblemStartingWith("akka.stream.impl"), FilterAnyProblemStartingWith("akka.stream.impl"),