init new handshake for unknown origin, receiver restarted, #20313
* handle UID incarnations, shared association state that can be swapped
for new handshakes
* detect that message comes from unknown origin and then initiate new
handshake (handled by InboundHandshake stage)
* simplify the OutboundHandshake stage
* doesn't have to listen for HandshakeRsp replies, it can just listen
to when the uniqueRemoteAddress future is completed, InboundHandshake
stage completes the handshake when it receives HandshakeRsp
* send the HandshakeReq via the control message ingress, instead of
pushing it downstreams, than also means that HandshakeReq is only sent
on the control stream, which is good
* materialization race condition
This commit is contained in:
parent
1296f9986f
commit
67325da722
13 changed files with 317 additions and 175 deletions
|
|
@ -3,21 +3,18 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit.NANOSECONDS
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor._
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.remote.AddressUidExtension
|
||||
import akka.remote.RARP
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import akka.testkit._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import java.net.InetAddress
|
||||
import scala.concurrent.Await
|
||||
import akka.remote.RARP
|
||||
import akka.remote.AddressUidExtension
|
||||
|
||||
object HandshakeRestartReceiverSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -77,7 +74,7 @@ abstract class HandshakeRestartReceiverSpec
|
|||
|
||||
val secondAddress = node(second).address
|
||||
val secondAssociation = RARP(system).provider.transport.asInstanceOf[ArteryTransport].association(secondAddress)
|
||||
val secondUniqueRemoteAddress = Await.result(secondAssociation.uniqueRemoteAddress, 3.seconds)
|
||||
val secondUniqueRemoteAddress = Await.result(secondAssociation.associationState.uniqueRemoteAddress, 3.seconds)
|
||||
secondUniqueRemoteAddress.address should ===(secondAddress)
|
||||
secondUniqueRemoteAddress.uid should ===(secondUid)
|
||||
|
||||
|
|
@ -93,7 +90,10 @@ abstract class HandshakeRestartReceiverSpec
|
|||
}
|
||||
val (secondUid2, subject2) = identifyWithUid(secondRootPath, "subject2")
|
||||
secondUid2 should !==(secondUid)
|
||||
// FIXME verify that UID in association was replaced (not implemented yet)
|
||||
val secondUniqueRemoteAddress2 = Await.result(secondAssociation.associationState.uniqueRemoteAddress, 3.seconds)
|
||||
secondUniqueRemoteAddress2.uid should ===(secondUid2)
|
||||
secondUniqueRemoteAddress2.address should ===(secondAddress)
|
||||
secondUniqueRemoteAddress2 should !==(secondUniqueRemoteAddress)
|
||||
|
||||
subject2 ! "shutdown"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery;
|
||||
|
||||
import akka.util.Unsafe;
|
||||
|
||||
class AbstractAssociation {
|
||||
protected final static long sharedStateOffset;
|
||||
|
||||
static {
|
||||
try {
|
||||
sharedStateOffset = Unsafe.instance.objectFieldOffset(Association.class.getDeclaredField("_sharedStateDoNotCallMeDirectly"));
|
||||
} catch(Throwable t){
|
||||
throw new ExceptionInInitializerError(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3,18 +3,24 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.io.File
|
||||
import java.nio.ByteOrder
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.function.{ Function ⇒ JFunction }
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.Done
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Address
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.InternalActorRef
|
||||
import akka.actor.Props
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.remote.AddressUidExtension
|
||||
|
|
@ -23,8 +29,10 @@ import akka.remote.MessageSerializer
|
|||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.remote.RemoteTransport
|
||||
import akka.remote.SeqNo
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
|
||||
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
|
||||
import akka.remote.transport.AkkaPduCodec
|
||||
import akka.remote.transport.AkkaPduProtobufCodec
|
||||
import akka.serialization.Serialization
|
||||
|
|
@ -60,7 +68,8 @@ private[akka] final case class InboundEnvelope(
|
|||
recipient: InternalActorRef,
|
||||
recipientAddress: Address,
|
||||
message: AnyRef,
|
||||
senderOption: Option[ActorRef])
|
||||
senderOption: Option[ActorRef],
|
||||
originAddress: UniqueAddress)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -75,7 +84,7 @@ private[akka] trait InboundContext {
|
|||
|
||||
/**
|
||||
* An inbound stage can send control message, e.g. a reply, to the origin
|
||||
* address with this method.
|
||||
* address with this method. It will be sent over the control sub-channel.
|
||||
*/
|
||||
def sendControl(to: Address, message: ControlMessage): Unit
|
||||
|
||||
|
|
@ -85,6 +94,26 @@ private[akka] trait InboundContext {
|
|||
def association(remoteAddress: Address): OutboundContext
|
||||
}
|
||||
|
||||
final class AssociationState(
|
||||
val incarnation: Int,
|
||||
val uniqueRemoteAddressPromise: Promise[UniqueAddress]) {
|
||||
|
||||
/**
|
||||
* Full outbound address with UID for this association.
|
||||
* Completed when by the handshake.
|
||||
*/
|
||||
def uniqueRemoteAddress: Future[UniqueAddress] = uniqueRemoteAddressPromise.future
|
||||
|
||||
override def toString(): String = {
|
||||
val a = uniqueRemoteAddressPromise.future.value match {
|
||||
case Some(Success(a)) ⇒ a
|
||||
case Some(Failure(e)) ⇒ s"Failure(${e.getMessage})"
|
||||
case None ⇒ "unknown"
|
||||
}
|
||||
s"AssociationState($incarnation, $a)"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Outbound association API that is used by the stream stages.
|
||||
|
|
@ -101,17 +130,15 @@ private[akka] trait OutboundContext {
|
|||
*/
|
||||
def remoteAddress: Address
|
||||
|
||||
/**
|
||||
* Full outbound address with UID for this association.
|
||||
* Completed when by the handshake.
|
||||
*/
|
||||
def uniqueRemoteAddress: Future[UniqueAddress]
|
||||
def associationState: AssociationState
|
||||
|
||||
def completeHandshake(peer: UniqueAddress): Unit
|
||||
|
||||
/**
|
||||
* Set the outbound address with UID when the
|
||||
* handshake is completed.
|
||||
* An inbound stage can send control message, e.g. a HandshakeReq, to the remote
|
||||
* address of this association. It will be sent over the control sub-channel.
|
||||
*/
|
||||
def completeRemoteAddress(a: UniqueAddress): Unit
|
||||
def sendControl(message: ControlMessage): Unit
|
||||
|
||||
/**
|
||||
* An outbound stage can listen to control messages
|
||||
|
|
@ -139,7 +166,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
@volatile private[this] var driver: MediaDriver = _
|
||||
@volatile private[this] var aeron: Aeron = _
|
||||
|
||||
override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
|
||||
override val log: LoggingAdapter = Logging(system, getClass.getName)
|
||||
override def defaultAddress: Address = localAddress.address
|
||||
override def addresses: Set[Address] = Set(defaultAddress)
|
||||
override def localAddressForRemote(remote: Address): Address = defaultAddress
|
||||
|
|
@ -280,9 +307,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
}
|
||||
}
|
||||
|
||||
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = {
|
||||
???
|
||||
}
|
||||
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit =
|
||||
association(remoteAddress).quarantine(uid)
|
||||
|
||||
def outbound(outboundContext: OutboundContext): Sink[Send, Any] = {
|
||||
Flow.fromGraph(killSwitch.flow[Send])
|
||||
|
|
@ -302,6 +328,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner))
|
||||
}
|
||||
|
||||
// FIXME hack until real envelopes, encoding originAddress in sender :)
|
||||
private val dummySender = system.systemActorOf(Props.empty, "dummy")
|
||||
|
||||
// TODO: Try out parallelized serialization (mapAsync) for performance
|
||||
val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒
|
||||
val pdu: ByteString = codec.constructMessage(
|
||||
|
|
@ -310,8 +339,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress.address, system)) {
|
||||
MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef])
|
||||
},
|
||||
sendEnvelope.senderOption,
|
||||
seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec
|
||||
if (sendEnvelope.senderOption.isDefined) sendEnvelope.senderOption else Some(dummySender), // FIXME: hack until real envelopes
|
||||
seqOption = Some(SeqNo(localAddress.uid)), // FIXME: hack until real envelopes
|
||||
ackOption = None)
|
||||
|
||||
// TODO: Drop unserializable messages
|
||||
|
|
@ -337,14 +366,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
m.recipient,
|
||||
m.recipientAddress,
|
||||
MessageSerializer.deserialize(system, m.serializedMessage),
|
||||
m.senderOption)
|
||||
if (m.senderOption.get.path.name == "dummy") None else m.senderOption, // FIXME hack until real envelopes
|
||||
UniqueAddress(m.senderOption.get.path.address, m.seq.rawValue.toInt)) // FIXME hack until real envelopes
|
||||
}
|
||||
|
||||
val inboundFlow: Flow[ByteString, ByteString, NotUsed] = {
|
||||
Flow.fromSinkAndSource(
|
||||
decoder
|
||||
.via(deserializer)
|
||||
.via(new InboundHandshake(this))
|
||||
.via(new InboundHandshake(this, inControlStream = false))
|
||||
.to(messageDispatcherSink),
|
||||
Source.maybe[ByteString].via(killSwitch.flow))
|
||||
}
|
||||
|
|
@ -353,9 +383,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
Flow.fromSinkAndSourceMat(
|
||||
decoder
|
||||
.via(deserializer)
|
||||
.via(new InboundHandshake(this))
|
||||
.via(new SystemMessageAcker(this))
|
||||
.via(new InboundHandshake(this, inControlStream = true))
|
||||
.viaMat(new InboundControlJunction)(Keep.right)
|
||||
.via(new SystemMessageAcker(this))
|
||||
.to(messageDispatcherSink),
|
||||
Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,22 +3,27 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.util.Success
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Address
|
||||
import akka.actor.RootActorPath
|
||||
import akka.dispatch.sysmsg.SystemMessage
|
||||
import akka.event.Logging
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
|
||||
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.stream.scaladsl.Keep
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
|
||||
import akka.stream.scaladsl.Keep
|
||||
import akka.util.Unsafe
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -30,35 +35,87 @@ private[akka] class Association(
|
|||
val transport: ArteryTransport,
|
||||
val materializer: Materializer,
|
||||
override val remoteAddress: Address,
|
||||
override val controlSubject: ControlMessageSubject) extends OutboundContext {
|
||||
override val controlSubject: ControlMessageSubject)
|
||||
extends AbstractAssociation with OutboundContext {
|
||||
|
||||
private val log = Logging(transport.system, getClass.getName)
|
||||
|
||||
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
|
||||
@volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _
|
||||
@volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _
|
||||
@volatile private[this] var _outboundControlIngress: OutboundControlIngress = _
|
||||
private val materializing = new CountDownLatch(1)
|
||||
|
||||
def outboundControlIngress: OutboundControlIngress = {
|
||||
if (_outboundControlIngress eq null)
|
||||
throw new IllegalStateException("outboundControlIngress not initialized yet")
|
||||
_outboundControlIngress
|
||||
if (_outboundControlIngress ne null)
|
||||
_outboundControlIngress
|
||||
else {
|
||||
// materialization not completed yet
|
||||
materializing.await(10, TimeUnit.SECONDS)
|
||||
if (_outboundControlIngress eq null)
|
||||
throw new IllegalStateException("outboundControlIngress not initialized yet")
|
||||
_outboundControlIngress
|
||||
}
|
||||
}
|
||||
|
||||
override def localAddress: UniqueAddress = transport.localAddress
|
||||
|
||||
// FIXME we also need to be able to switch to new uid
|
||||
private val _uniqueRemoteAddress = Promise[UniqueAddress]()
|
||||
override def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future
|
||||
override def completeRemoteAddress(a: UniqueAddress): Unit = {
|
||||
require(a.address == remoteAddress, s"Wrong UniqueAddress got [$a.address], expected [$remoteAddress]")
|
||||
_uniqueRemoteAddress.trySuccess(a)
|
||||
/**
|
||||
* Holds reference to shared state of Association - *access only via helper methods*
|
||||
*/
|
||||
@volatile
|
||||
private[this] var _sharedStateDoNotCallMeDirectly: AssociationState =
|
||||
new AssociationState(incarnation = 1, uniqueRemoteAddressPromise = Promise())
|
||||
|
||||
/**
|
||||
* Helper method for access to underlying state via Unsafe
|
||||
*
|
||||
* @param oldState Previous state
|
||||
* @param newState Next state on transition
|
||||
* @return Whether the previous state matched correctly
|
||||
*/
|
||||
@inline
|
||||
private[this] def swapState(oldState: AssociationState, newState: AssociationState): Boolean =
|
||||
Unsafe.instance.compareAndSwapObject(this, AbstractAssociation.sharedStateOffset, oldState, newState)
|
||||
|
||||
/**
|
||||
* @return Reference to current shared state
|
||||
*/
|
||||
def associationState: AssociationState =
|
||||
Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]
|
||||
|
||||
override def completeHandshake(peer: UniqueAddress): Unit = {
|
||||
require(remoteAddress == peer.address,
|
||||
s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}")
|
||||
val current = associationState
|
||||
current.uniqueRemoteAddressPromise.trySuccess(peer)
|
||||
current.uniqueRemoteAddress.value match {
|
||||
case Some(Success(`peer`)) ⇒ // our value
|
||||
case _ ⇒
|
||||
val newState = new AssociationState(incarnation = current.incarnation + 1, Promise.successful(peer))
|
||||
if (swapState(current, newState)) {
|
||||
current.uniqueRemoteAddress.value match {
|
||||
case Some(Success(old)) ⇒
|
||||
log.debug("Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
|
||||
newState.incarnation, peer.address, peer.uid, old.uid)
|
||||
quarantine(Some(old.uid))
|
||||
case _ ⇒ // Failed, nothing to do
|
||||
}
|
||||
// if swap failed someone else completed before us, and that is fine
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OutboundContext
|
||||
override def sendControl(message: ControlMessage): Unit =
|
||||
outboundControlIngress.sendControlMessage(message)
|
||||
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
|
||||
// TODO: lookup subchannel
|
||||
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
|
||||
message match {
|
||||
case _: SystemMessage ⇒
|
||||
implicit val ec = materializer.executionContext
|
||||
systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
|
||||
controlQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
|
||||
case e ⇒
|
||||
// FIXME proper error handling, and quarantining
|
||||
println(s"# System message dropped, due to $e") // FIXME
|
||||
|
|
@ -72,20 +129,30 @@ private[akka] class Association(
|
|||
override val dummyRecipient: RemoteActorRef =
|
||||
transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef]
|
||||
|
||||
def quarantine(uid: Option[Int]): Unit = ()
|
||||
def quarantine(uid: Option[Int]): Unit = {
|
||||
// FIXME implement
|
||||
log.error("Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address.",
|
||||
remoteAddress, uid.getOrElse("unknown"))
|
||||
}
|
||||
|
||||
// Idempotent
|
||||
def associate(): Unit = {
|
||||
// FIXME detect and handle stream failure, e.g. handshake timeout
|
||||
if (queue eq null)
|
||||
queue = Source.queue(256, OverflowStrategy.dropBuffer)
|
||||
.to(transport.outbound(this)).run()(materializer)
|
||||
if (systemMessageQueue eq null) {
|
||||
|
||||
// it's important to materialize the outboundControl stream first,
|
||||
// so that outboundControlIngress is ready when stages for all streams start
|
||||
if (controlQueue eq null) {
|
||||
val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer)
|
||||
.toMat(transport.outboundControl(this))(Keep.both)
|
||||
.run()(materializer)
|
||||
systemMessageQueue = q
|
||||
controlQueue = q
|
||||
_outboundControlIngress = control
|
||||
// stage in the control stream may access the outboundControlIngress before returned here
|
||||
// using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
|
||||
materializing.countDown()
|
||||
|
||||
queue = Source.queue(256, OverflowStrategy.dropBuffer)
|
||||
.to(transport.outbound(this)).run()(materializer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,20 +3,22 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.util.ArrayDeque
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
|
||||
import akka.Done
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.FlowShape
|
||||
import akka.stream.Inlet
|
||||
import akka.stream.Outlet
|
||||
import akka.stream.stage.CallbackWrapper
|
||||
import akka.stream.stage.GraphStageLogic
|
||||
import akka.stream.stage.GraphStageWithMaterializedValue
|
||||
import akka.stream.stage.InHandler
|
||||
import akka.stream.stage.OutHandler
|
||||
import akka.remote.EndpointManager.Send
|
||||
import java.util.ArrayDeque
|
||||
import akka.stream.stage.CallbackWrapper
|
||||
|
||||
/**
|
||||
* Marker trait for reply messages
|
||||
|
|
@ -97,7 +99,7 @@ private[akka] class InboundControlJunction
|
|||
// InHandler
|
||||
override def onPush(): Unit = {
|
||||
grab(in) match {
|
||||
case env @ InboundEnvelope(_, _, _: ControlMessage, _) ⇒
|
||||
case env @ InboundEnvelope(_, _, _: ControlMessage, _, _) ⇒
|
||||
observers.foreach(_.notify(env))
|
||||
pull(in)
|
||||
case env ⇒
|
||||
|
|
|
|||
|
|
@ -4,11 +4,12 @@
|
|||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.Done
|
||||
import scala.util.Success
|
||||
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.FlowShape
|
||||
import akka.stream.Inlet
|
||||
|
|
@ -29,7 +30,6 @@ private[akka] object OutboundHandshake {
|
|||
|
||||
private sealed trait HandshakeState
|
||||
private case object Start extends HandshakeState
|
||||
private case object ControlMessageObserverAttached extends HandshakeState
|
||||
private case object ReqInProgress extends HandshakeState
|
||||
private case object Completed extends HandshakeState
|
||||
|
||||
|
|
@ -46,34 +46,24 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
|
|||
override val shape: FlowShape[Send, Send] = FlowShape(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver {
|
||||
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
|
||||
import OutboundHandshake._
|
||||
|
||||
private var handshakeState: HandshakeState = Start
|
||||
|
||||
private def remoteAddress = outboundContext.remoteAddress
|
||||
|
||||
override def preStart(): Unit = {
|
||||
if (outboundContext.uniqueRemoteAddress.isCompleted) {
|
||||
val uniqueRemoteAddress = outboundContext.associationState.uniqueRemoteAddress
|
||||
if (uniqueRemoteAddress.isCompleted) {
|
||||
handshakeState = Completed
|
||||
} else {
|
||||
// The InboundHandshake stage will complete the uniqueRemoteAddress future
|
||||
// when it receives the HandshakeRsp reply
|
||||
implicit val ec = materializer.executionContext
|
||||
outboundContext.controlSubject.attach(this).foreach {
|
||||
getAsyncCallback[Done] { _ ⇒
|
||||
if (handshakeState != Completed) {
|
||||
if (isAvailable(out))
|
||||
pushHandshakeReq()
|
||||
else
|
||||
handshakeState = ControlMessageObserverAttached
|
||||
}
|
||||
}.invoke
|
||||
}
|
||||
|
||||
outboundContext.uniqueRemoteAddress.foreach {
|
||||
uniqueRemoteAddress.foreach {
|
||||
getAsyncCallback[UniqueAddress] { a ⇒
|
||||
if (handshakeState != Completed) {
|
||||
handshakeCompleted()
|
||||
if (isAvailable(out) && !hasBeenPulled(in))
|
||||
if (isAvailable(out))
|
||||
pull(in)
|
||||
}
|
||||
}.invoke
|
||||
|
|
@ -83,10 +73,6 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
|
|||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
outboundContext.controlSubject.detach(this)
|
||||
}
|
||||
|
||||
// InHandler
|
||||
override def onPush(): Unit = {
|
||||
if (handshakeState != Completed)
|
||||
|
|
@ -98,50 +84,27 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
|
|||
override def onPull(): Unit = {
|
||||
handshakeState match {
|
||||
case Completed ⇒ pull(in)
|
||||
case ControlMessageObserverAttached ⇒
|
||||
pushHandshakeReq()
|
||||
case Start ⇒ // will push HandshakeReq when ControlMessageObserver is attached
|
||||
case Start ⇒
|
||||
// will pull when handshake reply is received (uniqueRemoteAddress completed)
|
||||
handshakeState = ReqInProgress
|
||||
outboundContext.sendControl(HandshakeReq(outboundContext.localAddress))
|
||||
case ReqInProgress ⇒ // will pull when handshake reply is received
|
||||
}
|
||||
}
|
||||
|
||||
private def pushHandshakeReq(): Unit = {
|
||||
handshakeState = ReqInProgress
|
||||
// FIXME we should be able to Send without recipient ActorRef
|
||||
push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None))
|
||||
}
|
||||
|
||||
private def handshakeCompleted(): Unit = {
|
||||
handshakeState = Completed
|
||||
cancelTimer(HandshakeTimeout)
|
||||
outboundContext.controlSubject.detach(this)
|
||||
}
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit =
|
||||
timerKey match {
|
||||
case HandshakeTimeout ⇒
|
||||
// FIXME would it make sense to retry a few times before failing?
|
||||
failStage(new TimeoutException(
|
||||
s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms"))
|
||||
s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms"))
|
||||
}
|
||||
|
||||
// ControlMessageObserver, external call
|
||||
override def notify(inboundEnvelope: InboundEnvelope): Unit = {
|
||||
inboundEnvelope.message match {
|
||||
case rsp: HandshakeRsp ⇒
|
||||
if (rsp.from.address == remoteAddress) {
|
||||
getAsyncCallback[HandshakeRsp] { reply ⇒
|
||||
if (handshakeState != Completed) {
|
||||
handshakeCompleted()
|
||||
outboundContext.completeRemoteAddress(reply.from)
|
||||
if (isAvailable(out) && !hasBeenPulled(in))
|
||||
pull(in)
|
||||
}
|
||||
}.invoke(rsp)
|
||||
}
|
||||
case _ ⇒ // not interested
|
||||
}
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
|
|
@ -150,31 +113,62 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class InboundHandshake(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
||||
private[akka] class InboundHandshake(inboundContext: InboundContext, inControlStream: Boolean) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
||||
val in: Inlet[InboundEnvelope] = Inlet("InboundHandshake.in")
|
||||
val out: Outlet[InboundEnvelope] = Outlet("InboundHandshake.out")
|
||||
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
|
||||
new TimerGraphStageLogic(shape) with OutHandler {
|
||||
import OutboundHandshake._
|
||||
|
||||
// InHandler
|
||||
override def onPush(): Unit = {
|
||||
grab(in) match {
|
||||
case InboundEnvelope(_, _, HandshakeReq(from), _) ⇒
|
||||
inboundContext.association(from.address).completeRemoteAddress(from)
|
||||
inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
|
||||
pull(in)
|
||||
case other ⇒
|
||||
push(out, other)
|
||||
if (inControlStream)
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
grab(in) match {
|
||||
case InboundEnvelope(_, _, HandshakeReq(from), _, _) ⇒
|
||||
inboundContext.association(from.address).completeHandshake(from)
|
||||
inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
|
||||
pull(in)
|
||||
case InboundEnvelope(_, _, HandshakeRsp(from), _, _) ⇒
|
||||
inboundContext.association(from.address).completeHandshake(from)
|
||||
pull(in)
|
||||
case other ⇒ onMessage(other)
|
||||
}
|
||||
}
|
||||
})
|
||||
else
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = onMessage(grab(in))
|
||||
})
|
||||
|
||||
private def onMessage(env: InboundEnvelope): Unit = {
|
||||
if (isKnownOrigin(env.originAddress))
|
||||
push(out, env)
|
||||
else {
|
||||
inboundContext.sendControl(env.originAddress.address, HandshakeReq(inboundContext.localAddress))
|
||||
// FIXME Note that we have the originAddress that would be needed to complete the handshake
|
||||
// but it is not done here because the handshake might exchange more information.
|
||||
// Is that a valid thought?
|
||||
// drop message from unknown, this system was probably restarted
|
||||
pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
private def isKnownOrigin(originAddress: UniqueAddress): Boolean = {
|
||||
// FIXME these association lookups are probably too costly for each message, need local cache or something
|
||||
val associationState = inboundContext.association(originAddress.address).associationState
|
||||
associationState.uniqueRemoteAddress.value match {
|
||||
case Some(Success(a)) if a.uid == originAddress.uid ⇒ true
|
||||
case x ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
// OutHandler
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
setHandler(out, this)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ private[akka] class MessageDispatcher(
|
|||
provider: RemoteActorRefProvider) {
|
||||
|
||||
private val remoteDaemon = provider.remoteDaemon
|
||||
private val log = Logging(system.eventStream, getClass.getName)
|
||||
private val log = Logging(system, getClass.getName)
|
||||
|
||||
def dispatch(recipient: InternalActorRef,
|
||||
recipientAddress: Address,
|
||||
|
|
|
|||
|
|
@ -199,7 +199,7 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G
|
|||
// InHandler
|
||||
override def onPush(): Unit = {
|
||||
grab(in) match {
|
||||
case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _) ⇒
|
||||
case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _, _) ⇒
|
||||
if (n == seqNo) {
|
||||
inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress))
|
||||
seqNo += 1
|
||||
|
|
|
|||
|
|
@ -42,9 +42,9 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
|
|||
val recipient = null.asInstanceOf[InternalActorRef] // not used
|
||||
|
||||
val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef]
|
||||
.map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None))
|
||||
.map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None, addressA))
|
||||
.viaMat(new InboundControlJunction)(Keep.both)
|
||||
.map { case InboundEnvelope(_, _, msg, _) ⇒ msg }
|
||||
.map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg }
|
||||
.toMat(TestSink.probe[Any])(Keep.both)
|
||||
.run()
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.Address
|
||||
|
|
@ -40,9 +41,9 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
|||
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
|
||||
val recipient = null.asInstanceOf[InternalActorRef] // not used
|
||||
TestSource.probe[AnyRef]
|
||||
.map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None))
|
||||
.via(new InboundHandshake(inboundContext))
|
||||
.map { case InboundEnvelope(_, _, msg, _) ⇒ msg }
|
||||
.map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None, addressA))
|
||||
.via(new InboundHandshake(inboundContext, inControlStream = true))
|
||||
.map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg }
|
||||
.toMat(TestSink.probe[Any])(Keep.both)
|
||||
.run()
|
||||
}
|
||||
|
|
@ -62,6 +63,33 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
|||
downstream.cancel()
|
||||
}
|
||||
|
||||
"complete remoteUniqueAddress when receiving HandshakeReq" in {
|
||||
val inboundContext = new TestInboundContext(addressB)
|
||||
val (upstream, downstream) = setupStream(inboundContext)
|
||||
|
||||
downstream.request(10)
|
||||
upstream.sendNext(HandshakeReq(addressA))
|
||||
upstream.sendNext("msg1")
|
||||
downstream.expectNext("msg1")
|
||||
val uniqueRemoteAddress = Await.result(
|
||||
inboundContext.association(addressA.address).associationState.uniqueRemoteAddress, remainingOrDefault)
|
||||
uniqueRemoteAddress should ===(addressA)
|
||||
downstream.cancel()
|
||||
}
|
||||
|
||||
"send HandshakeReq as when receiving message from unknown (receiving system restarted)" in {
|
||||
val replyProbe = TestProbe()
|
||||
val inboundContext = new ManualReplyInboundContext(replyProbe.ref, addressB, new TestControlMessageSubject)
|
||||
val (upstream, downstream) = setupStream(inboundContext)
|
||||
|
||||
downstream.request(10)
|
||||
// no HandshakeReq
|
||||
upstream.sendNext("msg17")
|
||||
replyProbe.expectMsg(HandshakeReq(addressB))
|
||||
downstream.expectNoMsg(200.millis) // messages from unknown are dropped
|
||||
downstream.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,12 +8,10 @@ import java.util.concurrent.TimeoutException
|
|||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.actor.InternalActorRef
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.OutboundHandshake.HandshakeReq
|
||||
import akka.remote.artery.OutboundHandshake.HandshakeRsp
|
||||
import akka.remote.artery.SystemMessageDelivery._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
|
|
@ -24,6 +22,7 @@ import akka.stream.testkit.scaladsl.TestSink
|
|||
import akka.stream.testkit.scaladsl.TestSource
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
||||
|
||||
|
|
@ -45,53 +44,37 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
|||
|
||||
"OutboundHandshake stage" must {
|
||||
"send HandshakeReq when first pulled" in {
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA)
|
||||
val controlProbe = TestProbe()
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref))
|
||||
val outboundContext = inboundContext.association(addressB.address)
|
||||
val (upstream, downstream) = setupStream(outboundContext)
|
||||
|
||||
downstream.request(10)
|
||||
downstream.expectNext(HandshakeReq(addressA))
|
||||
controlProbe.expectMsg(HandshakeReq(addressA))
|
||||
downstream.cancel()
|
||||
}
|
||||
|
||||
"timeout if not receiving HandshakeRsp" in {
|
||||
"timeout if handshake not completed" in {
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA)
|
||||
val outboundContext = inboundContext.association(addressB.address)
|
||||
val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis)
|
||||
|
||||
downstream.request(1)
|
||||
downstream.expectNext(HandshakeReq(addressA))
|
||||
downstream.expectError().getClass should be(classOf[TimeoutException])
|
||||
}
|
||||
|
||||
"not deliver messages from upstream until handshake completed" in {
|
||||
val controlSubject = new TestControlMessageSubject
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA, controlSubject)
|
||||
val controlProbe = TestProbe()
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref))
|
||||
val outboundContext = inboundContext.association(addressB.address)
|
||||
val recipient = null.asInstanceOf[InternalActorRef] // not used
|
||||
val (upstream, downstream) = setupStream(outboundContext)
|
||||
|
||||
downstream.request(10)
|
||||
downstream.expectNext(HandshakeReq(addressA))
|
||||
controlProbe.expectMsg(HandshakeReq(addressA))
|
||||
upstream.sendNext("msg1")
|
||||
downstream.expectNoMsg(200.millis)
|
||||
controlSubject.sendControl(InboundEnvelope(recipient, addressA.address, HandshakeRsp(addressB), None))
|
||||
downstream.expectNext("msg1")
|
||||
upstream.sendNext("msg2")
|
||||
downstream.expectNext("msg2")
|
||||
downstream.cancel()
|
||||
}
|
||||
|
||||
"complete handshake via another sub-channel" in {
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA)
|
||||
val outboundContext = inboundContext.association(addressB.address)
|
||||
val (upstream, downstream) = setupStream(outboundContext)
|
||||
|
||||
downstream.request(10)
|
||||
downstream.expectNext(HandshakeReq(addressA))
|
||||
upstream.sendNext("msg1")
|
||||
// handshake completed first by another sub-channel
|
||||
outboundContext.completeRemoteAddress(addressB)
|
||||
// InboundHandshake stage will complete the handshake when receiving HandshakeRsp
|
||||
inboundContext.association(addressB.address).completeHandshake(addressB)
|
||||
downstream.expectNext("msg1")
|
||||
upstream.sendNext("msg2")
|
||||
downstream.expectNext("msg2")
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ import scala.concurrent.duration._
|
|||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Identify
|
||||
import akka.actor.InternalActorRef
|
||||
|
|
@ -84,7 +82,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo
|
|||
Flow[Send]
|
||||
.map {
|
||||
case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒
|
||||
InboundEnvelope(recipient, addressB.address, sysEnv, None)
|
||||
InboundEnvelope(recipient, addressB.address, sysEnv, None, addressA)
|
||||
}
|
||||
.async
|
||||
.via(new SystemMessageAcker(inboundContext))
|
||||
|
|
|
|||
|
|
@ -3,35 +3,39 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.actor.Address
|
||||
import scala.concurrent.Future
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
|
||||
import akka.remote.RemoteActorRef
|
||||
import scala.concurrent.Promise
|
||||
import akka.Done
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.util.Success
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Address
|
||||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
|
||||
|
||||
private[akka] class TestInboundContext(
|
||||
override val localAddress: UniqueAddress,
|
||||
val controlSubject: TestControlMessageSubject = new TestControlMessageSubject,
|
||||
replyDropRate: Double = 0.0) extends InboundContext {
|
||||
val controlProbe: Option[ActorRef] = None,
|
||||
val replyDropRate: Double = 0.0) extends InboundContext {
|
||||
|
||||
private val associations = new ConcurrentHashMap[Address, OutboundContext]
|
||||
|
||||
def sendControl(to: Address, message: ControlMessage) = {
|
||||
override def sendControl(to: Address, message: ControlMessage) = {
|
||||
if (ThreadLocalRandom.current().nextDouble() >= replyDropRate)
|
||||
controlSubject.sendControl(InboundEnvelope(null, to, message, None))
|
||||
association(to).sendControl(message)
|
||||
}
|
||||
|
||||
def association(remoteAddress: Address): OutboundContext =
|
||||
override def association(remoteAddress: Address): OutboundContext =
|
||||
associations.get(remoteAddress) match {
|
||||
case null ⇒
|
||||
val a = new TestOutboundContext(localAddress, remoteAddress, controlSubject)
|
||||
val a = createAssociation(remoteAddress)
|
||||
associations.putIfAbsent(remoteAddress, a) match {
|
||||
case null ⇒ a
|
||||
case existing ⇒ existing
|
||||
|
|
@ -40,20 +44,38 @@ private[akka] class TestInboundContext(
|
|||
}
|
||||
|
||||
protected def createAssociation(remoteAddress: Address): OutboundContext =
|
||||
new TestOutboundContext(localAddress, remoteAddress, controlSubject)
|
||||
new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe)
|
||||
}
|
||||
|
||||
private[akka] class TestOutboundContext(
|
||||
override val localAddress: UniqueAddress,
|
||||
override val remoteAddress: Address,
|
||||
override val controlSubject: TestControlMessageSubject) extends OutboundContext {
|
||||
override val controlSubject: TestControlMessageSubject,
|
||||
val controlProbe: Option[ActorRef] = None) extends OutboundContext {
|
||||
|
||||
private val _uniqueRemoteAddress = Promise[UniqueAddress]()
|
||||
def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future
|
||||
def completeRemoteAddress(a: UniqueAddress): Unit = _uniqueRemoteAddress.trySuccess(a)
|
||||
// access to this is synchronized (it's a test utility)
|
||||
private var _associationState = new AssociationState(1, Promise())
|
||||
|
||||
override def associationState: AssociationState = synchronized {
|
||||
_associationState
|
||||
}
|
||||
|
||||
override def completeHandshake(peer: UniqueAddress): Unit = synchronized {
|
||||
_associationState.uniqueRemoteAddressPromise.trySuccess(peer)
|
||||
_associationState.uniqueRemoteAddress.value match {
|
||||
case Some(Success(`peer`)) ⇒ // our value
|
||||
case _ ⇒
|
||||
_associationState = new AssociationState(incarnation = _associationState.incarnation + 1, Promise.successful(peer))
|
||||
}
|
||||
}
|
||||
|
||||
override def sendControl(message: ControlMessage) = {
|
||||
controlProbe.foreach(_ ! message)
|
||||
controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, None, localAddress))
|
||||
}
|
||||
|
||||
// FIXME we should be able to Send without a recipient ActorRef
|
||||
def dummyRecipient: RemoteActorRef = null
|
||||
override def dummyRecipient: RemoteActorRef = null
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue