initiate new handshake after restart of receiving system, #20568

* we don't want to include the full origin address in each message,
  only the UID
* that means that the restarted receiving system can't initiate a
  new handshake immediately when it sees a message from an unknown origin
* instead we inject HandshakeReq from the sending system once in a while
  (1 per second) which will trigger the new handshake
* any messages that arrive before the HandshakeReq are dropped, but
  that is fine since the system was just restarted anyway
* note that the injected handshake is only done for active connections,
  when a message is sent
* also changed the UID to a Long, but there are more places in old remoting
  that must be changed before we actually can use a Long value

fix lost first message, #20566

* the first message was sometimes dropped by the InboundHandshake stage
  because it came from unknown origin, i.e. the handshake had not completed
* that happened because the ordinary message arrived before the
  first HandshakeReq, which may happen since we sent the HandshakeReq
  over the control stream
* this change sends the HandshakeReq over the same stream as the messages,
  not only over the control stream, so that the HandshakeReq will arrive
  before any other message on that stream
* always send HandshakeReq as first message
  * also when the handshake on sender side has been completed at startup
  * moved code from preStart to onPull
This commit is contained in:
Patrik Nordwall 2016-05-25 12:28:44 +02:00
parent 92404bc470
commit 7505393c89
17 changed files with 298 additions and 145 deletions

View file

@ -91,7 +91,6 @@ abstract class HandshakeRestartReceiverSpec
val (secondUid2, subject2) = identifyWithUid(secondRootPath, "subject2")
secondUid2 should !==(secondUid)
val secondUniqueRemoteAddress2 = Await.result(secondAssociation.associationState.uniqueRemoteAddress, 3.seconds)
println(s"# ${secondAssociation.associationState} secondUid $secondUid $secondUid2") // FIXME
secondUniqueRemoteAddress2.uid should ===(secondUid2)
secondUniqueRemoteAddress2.address should ===(secondAddress)
secondUniqueRemoteAddress2 should !==(secondUniqueRemoteAddress)

View file

@ -25,5 +25,12 @@ object AddressUidExtension extends ExtensionId[AddressUidExtension] with Extensi
}
class AddressUidExtension(val system: ExtendedActorSystem) extends Extension {
val addressUid: Int = ThreadLocalRandom.current.nextInt()
val longAddressUid: Long = {
// FIXME we should use a long here, but then we need to change in Cluster and RemoteWatcher also
//ThreadLocalRandom.current.nextLong()
ThreadLocalRandom.current.nextInt()
}
@deprecated("Use longAddressUid instead", "2.4.x")
val addressUid: Int = longAddressUid.toInt
}

View file

@ -6,12 +6,15 @@ package akka.remote
import akka.actor.Address
@SerialVersionUID(1L)
final case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] {
override def hashCode = uid
final case class UniqueAddress(address: Address, uid: Long) extends Ordered[UniqueAddress] {
override def hashCode = java.lang.Long.hashCode(uid)
def compare(that: UniqueAddress): Int = {
val result = Address.addressOrdering.compare(this.address, that.address)
if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1
else result
}
override def toString(): String =
address + "#" + uid
}

View file

@ -5,8 +5,6 @@ package akka.remote.artery
import java.io.File
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function JFunction }
import scala.concurrent.Future
import scala.concurrent.Promise
@ -79,7 +77,7 @@ private[akka] final case class InboundEnvelope(
recipientAddress: Address,
message: AnyRef,
senderOption: Option[ActorRef],
originAddress: UniqueAddress)
originUid: Long)
/**
* INTERNAL API
@ -103,6 +101,16 @@ private[akka] trait InboundContext {
*/
def association(remoteAddress: Address): OutboundContext
/**
* Lookup the outbound association for a given UID.
* Will return `null` if the UID is unknown, i.e.
* handshake not completed. `null` is used instead of `Optional`
* to avoid allocations.
*/
def association(uid: Long): OutboundContext
def completeHandshake(peer: UniqueAddress): Unit
}
/**
@ -150,7 +158,7 @@ private[akka] final class AssociationState(
}
def isQuarantined(uid: Long): Boolean = {
// FIXME does this mean boxing (allocation) because of Set[Long]? Use specialized Set. LongMap?
// FIXME does this mean boxing (allocation) because of Set[Long]? Use specialized Set. org.agrona.collections.LongHashSet?
quarantined(uid)
}
@ -183,8 +191,6 @@ private[akka] trait OutboundContext {
def associationState: AssociationState
def completeHandshake(peer: UniqueAddress): Unit
def quarantine(reason: String): Unit
/**
@ -236,6 +242,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val handshakeTimeout: FiniteDuration =
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero,
"handshake-timeout must be > 0")
private val injectHandshakeInterval: FiniteDuration = 1.second
private val giveUpSendAfter: FiniteDuration = 60.seconds
private val largeMessageDestinations =
@ -252,9 +259,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val largeStreamId = 4
private val taskRunner = new TaskRunner(system)
// FIXME: This does locking on putIfAbsent, we need something smarter
private[this] val associations = new ConcurrentHashMap[Address, Association]()
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
@ -266,6 +270,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// of having a global one
val compression = new Compression(system)
private val associationRegistry = new AssociationRegistry(
remoteAddress new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations))
override def start(): Unit = {
startMediaDriver()
startAeron()
@ -280,7 +287,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// TODO: Have a supervisor actor
_localAddress = UniqueAddress(
Address("artery", system.name, remoteSettings.ArteryHostname, port),
AddressUidExtension(system).addressUid)
AddressUidExtension(system).longAddressUid)
materializer = ActorMaterializer()(system)
messageDispatcher = new MessageDispatcher(system, provider)
@ -457,29 +464,28 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
a.send(message, senderOption, recipient)
}
override def association(remoteAddress: Address): Association = {
val current = associations.get(remoteAddress)
if (current ne null) current
else {
associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] {
override def apply(remoteAddress: Address): Association = {
val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, controlSubject, largeMessageDestinations)
newAssociation.associate() // This is a bit costly for this blocking method :(
newAssociation
}
})
}
override def association(remoteAddress: Address): Association =
associationRegistry.association(remoteAddress)
override def association(uid: Long): Association =
associationRegistry.association(uid)
override def completeHandshake(peer: UniqueAddress): Unit = {
val a = associationRegistry.setUID(peer)
a.completeHandshake(peer)
}
private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit =
eventPublisher.notifyListeners(event)
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit =
association(remoteAddress).quarantine(reason = "", uid) // FIXME change the method signature (old remoting) to include reason?
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = {
// FIXME change the method signature (old remoting) to include reason and use Long uid?
association(remoteAddress).quarantine(reason = "", uid.map(_.toLong))
}
def outbound(outboundContext: OutboundContext): Sink[Send, Future[Done]] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval))
.via(encoder)
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter))(Keep.right)
@ -487,7 +493,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval))
.via(createEncoder(largeEnvelopePool))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter))(Keep.right)
@ -495,7 +501,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval))
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize))
.viaMat(new OutboundControlJunction(outboundContext))(Keep.right)
.via(encoder)
@ -505,9 +511,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
}
// FIXME hack until real envelopes, encoding originAddress in sender :)
private val dummySender = system.systemActorOf(Props.empty, "dummy")
def createEncoder(pool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(localAddress, system, compression, pool))

View file

@ -3,8 +3,10 @@
*/
package akka.remote.artery
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.function.{ Function JFunction }
import scala.annotation.tailrec
import scala.concurrent.Future
@ -12,6 +14,7 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.util.Success
import akka.{ Done, NotUsed }
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
@ -19,8 +22,8 @@ import akka.actor.Address
import akka.actor.RootActorPath
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote.EndpointManager.Send
import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress }
import akka.remote.EndpointManager.Send
import akka.remote.artery.AeronSink.GaveUpSendingException
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
@ -99,7 +102,7 @@ private[akka] class Association(
def associationState: AssociationState =
Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]
override def completeHandshake(peer: UniqueAddress): Unit = {
def completeHandshake(peer: UniqueAddress): Unit = {
require(remoteAddress == peer.address,
s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}")
val current = associationState
@ -177,7 +180,7 @@ private[akka] class Association(
quarantine(reason, uid)
}
@tailrec final def quarantine(reason: String, uid: Option[Int]): Unit = {
@tailrec final def quarantine(reason: String, uid: Option[Long]): Unit = {
uid match {
case Some(u)
val current = associationState
@ -279,4 +282,42 @@ private[akka] class Association(
}
}
}
override def toString(): String =
s"Association($localAddress -> $remoteAddress with $associationState)"
}
/**
 * INTERNAL API
 *
 * Registry of outbound associations. Associations are always indexed by the
 * remote `Address`; once `setUID` has been called for a peer they can also be
 * looked up by that peer's UID.
 *
 * @param createAssociation factory invoked to create the association the first
 *                          time a given remote address is requested
 */
private[remote] class AssociationRegistry(createAssociation: Address => Association) {
  // FIXME: This does locking on putIfAbsent, we need something smarter
  private[this] val associationsByAddress = new ConcurrentHashMap[Address, Association]()
  private[this] val associationsByUid = new ConcurrentHashMap[Long, Association]()

  /**
   * Returns the association for `remoteAddress`. If none is registered yet a new
   * one is created with `createAssociation`, `associate()` is called on it and it
   * is stored for subsequent lookups.
   */
  def association(remoteAddress: Address): Association = {
    // plain `get` first, `computeIfAbsent` is only used when the entry is missing
    val current = associationsByAddress.get(remoteAddress)
    if (current ne null) current
    else {
      associationsByAddress.computeIfAbsent(remoteAddress, new JFunction[Address, Association] {
        override def apply(remoteAddress: Address): Association = {
          val newAssociation = createAssociation(remoteAddress)
          newAssociation.associate() // This is a bit costly for this blocking method :(
          newAssociation
        }
      })
    }
  }

  /**
   * Returns the association registered for `uid` via `setUID`, or `null` if
   * there is none.
   */
  def association(uid: Long): Association =
    associationsByUid.get(uid)

  /**
   * Registers the association of `peer.address` under `peer.uid` and returns it.
   *
   * Registering the same association again for the same UID is a no-op.
   *
   * @throws IllegalArgumentException if the UID is already registered for a
   *                                  different association; the existing
   *                                  registration is left untouched in that case
   */
  def setUID(peer: UniqueAddress): Association = {
    val a = association(peer.address)
    // putIfAbsent instead of put: a rejected registration must not overwrite
    // the mapping that is already in place for this UID
    val previous = associationsByUid.putIfAbsent(peer.uid, a)
    if ((previous ne null) && (previous ne a))
      throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]")
    a
  }
}

View file

@ -48,14 +48,14 @@ private[remote] object EnvelopeBuffer {
val TagTypeMask = 0xFF000000
val TagValueMask = 0x0000FFFF
val VersionOffset = 0
val UidOffset = 4
val SerializerOffset = 8
val SenderActorRefTagOffset = 12
val RecipientActorRefTagOffset = 16
val ClassManifestTagOffset = 20
val VersionOffset = 0 // Int
val UidOffset = 4 // Long
val SerializerOffset = 12 // Int
val SenderActorRefTagOffset = 16 // Int
val RecipientActorRefTagOffset = 20 // Int
val ClassManifestTagOffset = 24 // Int
val LiteralsSectionOffset = 32
val LiteralsSectionOffset = 28
val UsAscii = Charset.forName("US-ASCII")
@ -86,8 +86,8 @@ sealed trait HeaderBuilder {
def version_=(v: Int): Unit
def version: Int
def uid_=(u: Int): Unit
def uid: Int
def uid_=(u: Long): Unit
def uid: Long
def senderActorRef_=(ref: String): Unit
def senderActorRef: String
@ -109,7 +109,7 @@ sealed trait HeaderBuilder {
*/
private[remote] final class HeaderBuilderImpl(val compressionTable: LiteralCompressionTable) extends HeaderBuilder {
var version: Int = _
var uid: Int = _
var uid: Long = _
// Fields only available for EnvelopeBuffer
var _senderActorRef: String = null
@ -203,7 +203,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
// Write fixed length parts
byteBuffer.putInt(header.version)
byteBuffer.putInt(header.uid)
byteBuffer.putLong(header.uid)
byteBuffer.putInt(header.serializer)
// Write compressable, variable-length parts always to the actual position of the buffer
@ -234,7 +234,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
// Read fixed length parts
header.version = byteBuffer.getInt
header.uid = byteBuffer.getInt
header.uid = byteBuffer.getLong
header.serializer = byteBuffer.getInt
// Read compressable, variable-length parts always from the actual position of the buffer

View file

@ -167,7 +167,7 @@ class Decoder(
localAddress, // FIXME: Is this needed anymore? What should we do here?
deserializedMessage,
senderOption, // FIXME: No need for an option, decode simply to deadLetters instead
UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568
headerBuilder.uid)
push(out, decoded)
} catch {

View file

@ -4,7 +4,6 @@
package akka.remote.artery
import scala.concurrent.duration._
import scala.util.Success
import scala.util.control.NoStackTrace
import akka.remote.EndpointManager.Send
@ -41,13 +40,15 @@ private[akka] object OutboundHandshake {
private case object HandshakeTimeout
private case object HandshakeRetryTick
private case object InjectHandshakeTick
}
/**
* INTERNAL API
*/
private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration, retryInterval: FiniteDuration)
private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration,
retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration)
extends GraphStage[FlowShape[Send, Send]] {
val in: Inlet[Send] = Inlet("OutboundHandshake.in")
@ -59,12 +60,45 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
import OutboundHandshake._
private var handshakeState: HandshakeState = Start
private var pendingMessage: Send = null
private var injectHandshakeTickScheduled = false
override def preStart(): Unit = {
// InHandler
override def onPush(): Unit = {
if (handshakeState != Completed)
throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState]")
// inject a HandshakeReq once in a while to trigger a new handshake when destination
// system has been restarted
if (injectHandshakeTickScheduled) {
push(out, grab(in))
} else {
pushHandshakeReq()
pendingMessage = grab(in)
}
}
// OutHandler
override def onPull(): Unit = {
handshakeState match {
case Completed
if (pendingMessage eq null)
pull(in)
else {
push(out, pendingMessage)
pendingMessage = null
}
case Start
val uniqueRemoteAddress = outboundContext.associationState.uniqueRemoteAddress
if (uniqueRemoteAddress.isCompleted) {
handshakeState = Completed
} else {
// will pull when handshake reply is received (uniqueRemoteAddress completed)
handshakeState = ReqInProgress
scheduleOnce(HandshakeTimeout, timeout)
schedulePeriodically(HandshakeRetryTick, retryInterval)
// The InboundHandshake stage will complete the uniqueRemoteAddress future
// when it receives the HandshakeRsp reply
implicit val ec = materializer.executionContext
@ -78,31 +112,19 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
}.invoke
}
}
}
// InHandler
override def onPush(): Unit = {
if (handshakeState != Completed)
throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState]")
push(out, grab(in))
}
// always push a HandshakeReq as the first message
pushHandshakeReq()
// OutHandler
override def onPull(): Unit = {
handshakeState match {
case Completed pull(in)
case Start
// will pull when handshake reply is received (uniqueRemoteAddress completed)
handshakeState = ReqInProgress
scheduleOnce(HandshakeTimeout, timeout)
schedulePeriodically(HandshakeRetryTick, retryInterval)
sendHandshakeReq()
case ReqInProgress // will pull when handshake reply is received
}
}
private def sendHandshakeReq(): Unit =
outboundContext.sendControl(HandshakeReq(outboundContext.localAddress))
/**
 * Pushes a `HandshakeReq` carrying the local address downstream, wrapped in a
 * `Send` addressed to `outboundContext.dummyRecipient`.
 *
 * Also marks the inject tick as scheduled and starts a one-off
 * `InjectHandshakeTick` timer with `injectHandshakeInterval`; `onTimer` clears
 * the flag again when that tick fires.
 *
 * Calls `push(out, ...)` unconditionally, so the caller has to make sure `out`
 * can be pushed.
 */
private def pushHandshakeReq(): Unit = {
injectHandshakeTickScheduled = true
scheduleOnce(InjectHandshakeTick, injectHandshakeInterval)
push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None))
}
private def handshakeCompleted(): Unit = {
handshakeState = Completed
@ -112,10 +134,13 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case InjectHandshakeTick
// next onPush message will trigger sending of HandshakeReq
injectHandshakeTickScheduled = false
case HandshakeRetryTick
sendHandshakeReq()
if (isAvailable(out))
pushHandshakeReq()
case HandshakeTimeout
// FIXME would it make sense to retry a few times before failing?
failStage(new HandshakeTimeoutException(
s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms"))
}
@ -134,7 +159,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler {
new TimerGraphStageLogic(shape) with OutHandler with StageLogging {
import OutboundHandshake._
// InHandler
@ -143,11 +168,9 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
override def onPush(): Unit = {
grab(in) match {
case InboundEnvelope(_, _, HandshakeReq(from), _, _)
inboundContext.association(from.address).completeHandshake(from)
inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
pull(in)
onHandshakeReq(from)
case InboundEnvelope(_, _, HandshakeRsp(from), _, _)
inboundContext.association(from.address).completeHandshake(from)
inboundContext.completeHandshake(from)
pull(in)
case other onMessage(other)
}
@ -155,29 +178,42 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
})
else
setHandler(in, new InHandler {
override def onPush(): Unit = onMessage(grab(in))
override def onPush(): Unit = {
grab(in) match {
case InboundEnvelope(_, _, HandshakeReq(from), _, _)
onHandshakeReq(from)
case other onMessage(other)
}
}
})
/**
 * Handles an incoming `HandshakeReq` from the peer `from`: completes the
 * handshake for that peer in the inbound context, replies to the peer's address
 * with a `HandshakeRsp` carrying the local address via `sendControl`, and then
 * pulls the next element. The request itself is not pushed downstream.
 */
private def onHandshakeReq(from: UniqueAddress): Unit = {
inboundContext.completeHandshake(from)
inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
pull(in)
}
private def onMessage(env: InboundEnvelope): Unit = {
if (isKnownOrigin(env.originAddress))
if (isKnownOrigin(env.originUid))
push(out, env)
else {
inboundContext.sendControl(env.originAddress.address, HandshakeReq(inboundContext.localAddress))
// FIXME Note that we have the originAddress that would be needed to complete the handshake
// but it is not done here because the handshake might exchange more information.
// Is that a valid thought?
// drop message from unknown, this system was probably restarted
// FIXME remove, only debug
log.warning(s"Dropping message [{}] from unknown system with UID [{}]. " +
"This system with UID [{}] was probably restarted. " +
"Messages will be accepted when new handshake has been completed.",
env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
if (log.isDebugEnabled)
log.debug(s"Dropping message [{}] from unknown system with UID [{}]. " +
"This system with UID [{}] was probably restarted. " +
"Messages will be accepted when new handshake has been completed.",
env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
pull(in)
}
}
private def isKnownOrigin(originAddress: UniqueAddress): Boolean = {
private def isKnownOrigin(originUid: Long): Boolean = {
// FIXME these association lookups are probably too costly for each message, need local cache or something
val associationState = inboundContext.association(originAddress.address).associationState
associationState.uniqueRemoteAddressValue() match {
case Some(Success(a)) if a.uid == originAddress.uid true
case x false
}
(inboundContext.association(originUid) ne null)
}
// OutHandler

View file

@ -7,11 +7,11 @@ import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.remote.UniqueAddress
/**
* INTERNAL API
@ -27,14 +27,19 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
// InHandler
override def onPush(): Unit = {
val env = grab(in)
val association = inboundContext.association(env.originAddress.address)
if (association.associationState.isQuarantined(env.originAddress.uid)) {
inboundContext.sendControl(env.originAddress.address,
Quarantined(inboundContext.localAddress, env.originAddress))
inboundContext.association(env.originUid) match {
case null
// unknown, handshake not completed
push(out, env)
case association
if (association.associationState.isQuarantined(env.originUid)) {
inboundContext.sendControl(association.remoteAddress,
Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
pull(in)
} else
push(out, env)
}
}
// OutHandler
override def onPull(): Unit = pull(in)

View file

@ -4,13 +4,11 @@
package akka.remote.artery
import java.util.ArrayDeque
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.remote.EndpointManager.Send
import akka.remote.UniqueAddress
@ -24,6 +22,7 @@ import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.remote.artery.OutboundHandshake.HandshakeReq
/**
* INTERNAL API
@ -159,6 +158,10 @@ private[akka] class SystemMessageDelivery(
// InHandler
override def onPush(): Unit = {
grab(in) match {
case s @ Send(_: HandshakeReq, _, _, _)
// pass on HandshakeReq
if (isAvailable(out))
push(out, s)
case s @ Send(ClearSystemMessageDelivery, _, _, _)
clear()
pull(in)

View file

@ -111,7 +111,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
headerOut.manifest should ===("manifest1")
headerIn.version = 3
headerIn.uid = Int.MinValue
headerIn.uid = Long.MinValue
headerIn.serializer = -1
headerIn.senderActorRef = "uncompressable0"
headerIn.recipientActorRef = "reallylongcompressablestring"
@ -127,7 +127,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
envelope.parseHeader(headerOut)
headerOut.version should ===(3)
headerOut.uid should ===(Int.MinValue)
headerOut.uid should ===(Long.MinValue)
headerOut.serializer should ===(-1)
headerOut.senderActorRef should ===("uncompressable0")
headerOut.recipientActorRef should ===("reallylongcompressablestring")

View file

@ -42,7 +42,7 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
val recipient = null.asInstanceOf[InternalActorRef] // not used
val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef]
.map(msg InboundEnvelope(recipient, addressB.address, msg, None, addressA))
.map(msg InboundEnvelope(recipient, addressB.address, msg, None, addressA.uid))
.viaMat(new InboundControlJunction)(Keep.both)
.map { case InboundEnvelope(_, _, msg, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)

View file

@ -41,7 +41,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
val recipient = null.asInstanceOf[InternalActorRef] // not used
TestSource.probe[AnyRef]
.map(msg InboundEnvelope(recipient, addressB.address, msg, None, addressA))
.map(msg InboundEnvelope(recipient, addressB.address, msg, None, addressA.uid))
.via(new InboundHandshake(inboundContext, inControlStream = true))
.map { case InboundEnvelope(_, _, msg, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)
@ -77,7 +77,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
downstream.cancel()
}
"send HandshakeReq when receiving message from unknown (receiving system restarted)" in {
"drop message from unknown (receiving system restarted)" in {
val replyProbe = TestProbe()
val inboundContext = new TestInboundContext(addressB, controlProbe = Some(replyProbe.ref))
val (upstream, downstream) = setupStream(inboundContext)
@ -85,8 +85,16 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
downstream.request(10)
// no HandshakeReq
upstream.sendNext("msg17")
replyProbe.expectMsg(HandshakeReq(addressB))
downstream.expectNoMsg(200.millis) // messages from unknown are dropped
// and accept messages after handshake
upstream.sendNext(HandshakeReq(addressA))
upstream.sendNext("msg18")
replyProbe.expectMsg(HandshakeRsp(addressB))
downstream.expectNext("msg18")
upstream.sendNext("msg19")
downstream.expectNext("msg19")
downstream.cancel()
}

View file

@ -57,19 +57,19 @@ class LargeMessagesStreamSpec extends WordSpec with ShouldMatchers with ScalaFut
val senderProbeB = TestProbe()(systemB)
// start actor and make sure it is up and running
val large = systemB.actorOf(Props(new EchoSize), "regular")
large.tell(Ping(), senderProbeB.ref)
val regular = systemB.actorOf(Props(new EchoSize), "regular")
regular.tell(Ping(), senderProbeB.ref)
senderProbeB.expectMsg(Pong(0))
// communicate with it from the other system
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
largeRemote.tell(Ping(), senderProbeA.ref)
val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
regularRemote.tell(Ping(), senderProbeA.ref)
senderProbeA.expectMsg(Pong(0))
// flag should be cached now
largeRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
regularRemote.asInstanceOf[RemoteActorRef].cachedLargeMessageDestinationFlag should ===(RegularDestination)
} finally {
TestKit.shutdownActorSystem(systemA)

View file

@ -31,12 +31,15 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val addressA = UniqueAddress(Address("artery", "sysA", "hostA", 1001), 1)
val addressB = UniqueAddress(Address("artery", "sysB", "hostB", 1002), 2)
private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds,
retryInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
private def setupStream(
outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds,
retryInterval: FiniteDuration = 10.seconds,
injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
val destination = null.asInstanceOf[RemoteActorRef] // not used
TestSource.probe[String]
.map(msg Send(msg, None, destination, None))
.via(new OutboundHandshake(outboundContext, timeout, retryInterval))
.via(new OutboundHandshake(outboundContext, timeout, retryInterval, injectHandshakeInterval))
.map { case Send(msg, _, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)
.run()
@ -44,13 +47,25 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
"OutboundHandshake stage" must {
"send HandshakeReq when first pulled" in {
val controlProbe = TestProbe()
val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref))
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val (upstream, downstream) = setupStream(outboundContext)
downstream.request(10)
controlProbe.expectMsg(HandshakeReq(addressA))
downstream.expectNext(HandshakeReq(addressA))
downstream.cancel()
}
"send HandshakeReq also when uniqueRemoteAddress future completed at startup" in {
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
inboundContext.completeHandshake(addressB)
val (upstream, downstream) = setupStream(outboundContext)
upstream.sendNext("msg1")
downstream.request(10)
downstream.expectNext(HandshakeReq(addressA))
downstream.expectNext("msg1")
downstream.cancel()
}
@ -60,40 +75,63 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis)
downstream.request(1)
downstream.expectNext(HandshakeReq(addressA))
downstream.expectError().getClass should be(classOf[HandshakeTimeoutException])
}
"retry HandshakeReq" in {
val controlProbe = TestProbe()
val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref))
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val (upstream, downstream) = setupStream(outboundContext, retryInterval = 100.millis)
downstream.request(10)
controlProbe.expectMsg(HandshakeReq(addressA))
controlProbe.expectMsg(HandshakeReq(addressA))
controlProbe.expectMsg(HandshakeReq(addressA))
downstream.expectNext(HandshakeReq(addressA))
downstream.expectNext(HandshakeReq(addressA))
downstream.expectNext(HandshakeReq(addressA))
downstream.cancel()
}
// Verifies that ordinary messages are held back until the handshake completes:
// "msg1" is sent while the handshake is pending and must not appear downstream within
// 200 ms; once completeHandshake(addressB) is called it is delivered, and later messages
// pass straight through.
"not deliver messages from upstream until handshake completed" in {
  val inboundContext = new TestInboundContext(localAddress = addressA)
  val outboundContext = inboundContext.association(addressB.address)
  val (upstream, downstream) = setupStream(outboundContext)
  downstream.request(10)
  // The request is expected on the stream before any payload.
  downstream.expectNext(HandshakeReq(addressA))
  upstream.sendNext("msg1")
  downstream.expectNoMsg(200.millis)
  // InboundHandshake stage will complete the handshake when receiving HandshakeRsp
  inboundContext.completeHandshake(addressB)
  downstream.expectNext("msg1")
  upstream.sendNext("msg2")
  downstream.expectNext("msg2")
  downstream.cancel()
}
// Exercises re-injection of HandshakeReq after the handshake has completed.
// setupStream is given injectHandshakeInterval = 500.millis.
// NOTE(review): presumably the stage puts one extra HandshakeReq in front of the next
// message once that interval has elapsed -- confirm against setupStream / the stage.
"inject HandshakeReq" in {
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val (upstream, downstream) = setupStream(outboundContext, injectHandshakeInterval = 500.millis)
downstream.request(10)
upstream.sendNext("msg1")
// Initial handshake: request first, then "msg1" once the handshake is completed.
downstream.expectNext(HandshakeReq(addressA))
inboundContext.completeHandshake(addressB)
downstream.expectNext("msg1")
// Nothing is emitted while no message is sent, even though 600 ms exceeds the interval.
downstream.expectNoMsg(600.millis)
upstream.sendNext("msg2")
upstream.sendNext("msg3")
upstream.sendNext("msg4")
// Exactly one HandshakeReq is expected ahead of the burst, not one per message.
downstream.expectNext(HandshakeReq(addressA))
downstream.expectNext("msg2")
downstream.expectNext("msg3")
downstream.expectNext("msg4")
downstream.expectNoMsg(600.millis)
downstream.cancel()
}
}
}

View file

@ -77,7 +77,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
Flow[Send]
.map {
case Send(sysEnv: SystemMessageEnvelope, _, _, _)
InboundEnvelope(recipient, addressB.address, sysEnv, None, addressA)
InboundEnvelope(recipient, addressB.address, sysEnv, None, addressA.uid)
}
.async
.via(new SystemMessageAcker(inboundContext))

View file

@ -25,7 +25,8 @@ private[akka] class TestInboundContext(
val controlProbe: Option[ActorRef] = None,
val replyDropRate: Double = 0.0) extends InboundContext {
private val associations = new ConcurrentHashMap[Address, OutboundContext]
private val associationsByAddress = new ConcurrentHashMap[Address, OutboundContext]()
private val associationsByUid = new ConcurrentHashMap[Long, OutboundContext]()
override def sendControl(to: Address, message: ControlMessage) = {
if (ThreadLocalRandom.current().nextDouble() >= replyDropRate)
@ -33,17 +34,26 @@ private[akka] class TestInboundContext(
}
/**
 * Returns the outbound context registered for `remoteAddress`, creating and registering
 * one via `createAssociation` on first use.
 *
 * `putIfAbsent` decides the winner when two callers race on the same address: the
 * instance already in the map is returned and the freshly created one is discarded.
 */
override def association(remoteAddress: Address): OutboundContext =
  associationsByAddress.get(remoteAddress) match {
    case null =>
      val a = createAssociation(remoteAddress)
      associationsByAddress.putIfAbsent(remoteAddress, a) match {
        case null     => a // nothing was registered in the meantime, ours is now in the map
        case existing => existing
      }
    case existing => existing
  }
/**
 * Looks up the outbound context registered under `uid`.
 *
 * Entries are only added by `completeHandshake`; for any other uid the underlying
 * `ConcurrentHashMap.get` returns `null`, so callers must be prepared for that.
 */
override def association(uid: Long): OutboundContext =
  associationsByUid.get(uid)

/**
 * Completes the handshake on the association for `peer.address` and then registers that
 * association under `peer.uid`.
 */
override def completeHandshake(peer: UniqueAddress): Unit = {
  // createAssociation returns TestOutboundContext, which is what the address lookup
  // hands back here. NOTE(review): the cast fails if a subclass overrides `association`
  // to return some other OutboundContext -- confirm no test does that.
  val a = association(peer.address).asInstanceOf[TestOutboundContext]
  a.completeHandshake(peer)
  associationsByUid.put(peer.uid, a)
}

/** Factory for new associations; protected so a subclass can supply its own variant. */
protected def createAssociation(remoteAddress: Address): TestOutboundContext =
  new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe)
}
@ -60,7 +70,7 @@ private[akka] class TestOutboundContext(
_associationState
}
override def completeHandshake(peer: UniqueAddress): Unit = synchronized {
def completeHandshake(peer: UniqueAddress): Unit = synchronized {
_associationState.uniqueRemoteAddressPromise.trySuccess(peer)
_associationState.uniqueRemoteAddress.value match {
case Some(Success(`peer`)) // our value
@ -75,7 +85,7 @@ private[akka] class TestOutboundContext(
/**
 * Forwards `message` to the optional control probe and then hands it to `controlSubject`
 * wrapped in an `InboundEnvelope`. The envelope has a null recipient, `remoteAddress` as
 * the second argument and the local uid as the last.
 */
override def sendControl(message: ControlMessage) = {
  controlProbe.foreach(_ ! message)
  // Exactly one envelope per call, carrying the local uid rather than the full address.
  controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, None, localAddress.uid))
}
// FIXME we should be able to Send without a recipient ActorRef