handle stream failures by restarting, #20317
* allow X restarts within Y seconds * and handle handshake timeout
This commit is contained in:
parent
8e316d06e9
commit
96b697d92f
9 changed files with 425 additions and 52 deletions
|
|
@ -4,18 +4,23 @@
|
|||
package akka.remote.artery
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
|
||||
import akka.Done
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.Inlet
|
||||
import akka.stream.SinkShape
|
||||
import akka.stream.stage.AsyncCallback
|
||||
import akka.stream.stage.GraphStage
|
||||
import akka.stream.stage.GraphStageLogic
|
||||
import akka.stream.stage.GraphStageWithMaterializedValue
|
||||
import akka.stream.stage.InHandler
|
||||
import io.aeron.Aeron
|
||||
import io.aeron.Publication
|
||||
|
|
@ -51,19 +56,23 @@ object AeronSink {
|
|||
/**
|
||||
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
|
||||
*/
|
||||
class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner) extends GraphStage[SinkShape[AeronSink.Bytes]] {
|
||||
class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner)
|
||||
extends GraphStageWithMaterializedValue[SinkShape[AeronSink.Bytes], Future[Done]] {
|
||||
import AeronSink._
|
||||
import TaskRunner._
|
||||
|
||||
val in: Inlet[Bytes] = Inlet("AeronSink")
|
||||
override val shape: SinkShape[Bytes] = SinkShape(in)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler {
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
|
||||
val completed = Promise[Done]()
|
||||
val logic = new GraphStageLogic(shape) with InHandler {
|
||||
|
||||
private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024))
|
||||
private val pub = aeron.addPublication(channel, streamId)
|
||||
|
||||
private var completedValue: Try[Done] = Success(Done)
|
||||
|
||||
private val spinning = 1000
|
||||
private var backoffCount = spinning
|
||||
private var lastMsgSize = 0
|
||||
|
|
@ -80,6 +89,7 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu
|
|||
override def postStop(): Unit = {
|
||||
taskRunner.command(Remove(addOfferTask.task))
|
||||
pub.close()
|
||||
completed.complete(completedValue)
|
||||
}
|
||||
|
||||
// InHandler
|
||||
|
|
@ -124,6 +134,15 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu
|
|||
super.onUpstreamFinish()
|
||||
}
|
||||
|
||||
override def onUpstreamFailure(cause: Throwable): Unit = {
|
||||
completedValue = Failure(cause)
|
||||
super.onUpstreamFailure(cause)
|
||||
}
|
||||
|
||||
setHandler(in, this)
|
||||
}
|
||||
|
||||
(logic, completed.future)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import scala.concurrent.Promise
|
|||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
import akka.Done
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorRef
|
||||
|
|
@ -39,6 +40,7 @@ import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
|
|||
import akka.remote.transport.AkkaPduCodec
|
||||
import akka.remote.transport.AkkaPduProtobufCodec
|
||||
import akka.serialization.Serialization
|
||||
import akka.stream.AbruptTerminationException
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.KillSwitches
|
||||
import akka.stream.Materializer
|
||||
|
|
@ -50,6 +52,8 @@ import akka.stream.scaladsl.Sink
|
|||
import akka.stream.scaladsl.Source
|
||||
import akka.util.ByteString
|
||||
import akka.util.ByteStringBuilder
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.util.Helpers.Requiring
|
||||
import io.aeron.Aeron
|
||||
import io.aeron.AvailableImageHandler
|
||||
import io.aeron.Image
|
||||
|
|
@ -58,7 +62,6 @@ import io.aeron.driver.MediaDriver
|
|||
import io.aeron.exceptions.ConductorServiceTimeoutException
|
||||
import org.agrona.ErrorHandler
|
||||
import org.agrona.IoUtil
|
||||
import scala.util.Try
|
||||
import java.io.File
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.DatagramChannel
|
||||
|
|
@ -156,6 +159,7 @@ private[akka] final class AssociationState(
|
|||
}
|
||||
s"AssociationState($incarnation, $a)"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -220,10 +224,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
|
||||
private val codec: AkkaPduCodec = AkkaPduProtobufCodec
|
||||
private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
|
||||
@volatile private[this] var _shutdown = false
|
||||
|
||||
// FIXME config
|
||||
private val systemMessageResendInterval: FiniteDuration = 1.second
|
||||
private val handshakeTimeout: FiniteDuration = 10.seconds
|
||||
private val handshakeRetryInterval: FiniteDuration = 1.second
|
||||
private val handshakeTimeout: FiniteDuration =
|
||||
system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero,
|
||||
"handshake-timeout must be > 0")
|
||||
|
||||
private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}"
|
||||
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
|
||||
|
|
@ -234,6 +242,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
// FIXME: This does locking on putIfAbsent, we need something smarter
|
||||
private[this] val associations = new ConcurrentHashMap[Address, Association]()
|
||||
|
||||
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
|
||||
private val maxRestarts = 5 // FIXME config
|
||||
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
|
||||
|
||||
override def start(): Unit = {
|
||||
startMediaDriver()
|
||||
startAeron()
|
||||
|
|
@ -252,7 +264,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
|
||||
messageDispatcher = new MessageDispatcher(system, provider)
|
||||
|
||||
runInboundFlows()
|
||||
runInboundStreams()
|
||||
}
|
||||
|
||||
private def startMediaDriver(): Unit = {
|
||||
|
|
@ -298,14 +310,19 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
aeron = Aeron.connect(ctx)
|
||||
}
|
||||
|
||||
private def runInboundFlows(): Unit = {
|
||||
// control stream
|
||||
controlSubject = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner))
|
||||
private def runInboundStreams(): Unit = {
|
||||
runInboundControlStream()
|
||||
runInboundOrdinaryMessagesStream()
|
||||
}
|
||||
|
||||
private def runInboundControlStream(): Unit = {
|
||||
val (c, completed) = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner))
|
||||
.async // FIXME measure
|
||||
.map(ByteString.apply) // TODO we should use ByteString all the way
|
||||
.viaMat(inboundControlFlow)(Keep.right)
|
||||
.to(Sink.ignore)
|
||||
.toMat(Sink.ignore)(Keep.both)
|
||||
.run()(materializer)
|
||||
controlSubject = c
|
||||
|
||||
controlSubject.attach(new ControlMessageObserver {
|
||||
override def notify(inboundEnvelope: InboundEnvelope): Unit = {
|
||||
|
|
@ -321,14 +338,51 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
})
|
||||
|
||||
// ordinary messages stream
|
||||
Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner))
|
||||
controlSubject.attach(new ControlMessageObserver {
|
||||
override def notify(inboundEnvelope: InboundEnvelope): Unit = {
|
||||
inboundEnvelope.message match {
|
||||
case Quarantined(from, to) if to == localAddress ⇒
|
||||
val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address)
|
||||
publishLifecycleEvent(lifecycleEvent)
|
||||
// quarantine the other system from here
|
||||
association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid))
|
||||
case _ ⇒ // not interesting
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream())
|
||||
}
|
||||
|
||||
private def runInboundOrdinaryMessagesStream(): Unit = {
|
||||
val completed = Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner))
|
||||
.async // FIXME measure
|
||||
.map(ByteString.apply) // TODO we should use ByteString all the way
|
||||
.via(inboundFlow)
|
||||
.runWith(Sink.ignore)(materializer)
|
||||
|
||||
attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream())
|
||||
}
|
||||
|
||||
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = {
|
||||
implicit val ec = materializer.executionContext
|
||||
streamCompleted.onFailure {
|
||||
case _: AbruptTerminationException ⇒ // ActorSystem shutdown
|
||||
case cause ⇒
|
||||
if (!isShutdown)
|
||||
if (restartCounter.restart()) {
|
||||
log.error(cause, "{} failed. Restarting it.", streamName)
|
||||
restart()
|
||||
} else {
|
||||
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.",
|
||||
streamName, maxRestarts, restartTimeout.toSeconds)
|
||||
system.terminate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def shutdown(): Future[Done] = {
|
||||
_shutdown = true
|
||||
killSwitch.shutdown()
|
||||
if (taskRunner != null) taskRunner.stop()
|
||||
if (aeron != null) aeron.close()
|
||||
|
|
@ -340,6 +394,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
Future.successful(Done)
|
||||
}
|
||||
|
||||
private[remote] def isShutdown(): Boolean = _shutdown
|
||||
|
||||
// InboundContext
|
||||
override def sendControl(to: Address, message: ControlMessage) =
|
||||
association(to).outboundControlIngress.sendControlMessage(message)
|
||||
|
|
@ -375,22 +431,24 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit =
|
||||
association(remoteAddress).quarantine(reason = "", uid) // FIXME change the method signature (old remoting) to include reason?
|
||||
|
||||
def outbound(outboundContext: OutboundContext): Sink[Send, Any] = {
|
||||
def outbound(outboundContext: OutboundContext): Sink[Send, Future[Done]] = {
|
||||
Flow.fromGraph(killSwitch.flow[Send])
|
||||
.via(new OutboundHandshake(outboundContext, handshakeTimeout))
|
||||
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
|
||||
.via(encoder)
|
||||
.map(_.toArray) // TODO we should use ByteString all the way
|
||||
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner))
|
||||
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner))(Keep.right)
|
||||
}
|
||||
|
||||
def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = {
|
||||
def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = {
|
||||
Flow.fromGraph(killSwitch.flow[Send])
|
||||
.via(new OutboundHandshake(outboundContext, handshakeTimeout))
|
||||
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
|
||||
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize))
|
||||
.viaMat(new OutboundControlJunction(outboundContext))(Keep.right)
|
||||
.via(encoder)
|
||||
.map(_.toArray) // TODO we should use ByteString all the way
|
||||
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner))
|
||||
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner))(Keep.both)
|
||||
|
||||
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
|
||||
|
||||
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,10 +3,19 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.util.Success
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSelectionMessage
|
||||
import akka.actor.Address
|
||||
import akka.actor.RootActorPath
|
||||
import akka.dispatch.sysmsg.SystemMessage
|
||||
|
|
@ -16,17 +25,15 @@ import akka.remote.RemoteActorRef
|
|||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
|
||||
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
|
||||
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
|
||||
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
|
||||
import akka.stream.AbruptTerminationException
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.OverflowStrategy
|
||||
import akka.stream.scaladsl.Keep
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.scaladsl.SourceQueueWithComplete
|
||||
import akka.util.Unsafe
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.actor.ActorSelectionMessage
|
||||
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -44,10 +51,14 @@ private[akka] class Association(
|
|||
private val log = Logging(transport.system, getClass.getName)
|
||||
private val controlQueueSize = transport.provider.remoteSettings.SysMsgBufferSize
|
||||
|
||||
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
|
||||
private val maxRestarts = 5 // FIXME config
|
||||
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
|
||||
|
||||
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
|
||||
@volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _
|
||||
@volatile private[this] var _outboundControlIngress: OutboundControlIngress = _
|
||||
private val materializing = new CountDownLatch(1)
|
||||
@volatile private[this] var materializing = new CountDownLatch(1)
|
||||
|
||||
def outboundControlIngress: OutboundControlIngress = {
|
||||
if (_outboundControlIngress ne null)
|
||||
|
|
@ -115,7 +126,7 @@ private[akka] class Association(
|
|||
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
|
||||
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
|
||||
// FIXME where is that ActorSelectionMessage check in old remoting?
|
||||
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message.isInstanceOf[ClearSystemMessageDelivery.type]) {
|
||||
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
|
||||
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
|
||||
message match {
|
||||
case _: SystemMessage | ClearSystemMessageDelivery ⇒
|
||||
|
|
@ -179,22 +190,55 @@ private[akka] class Association(
|
|||
|
||||
// Idempotent
|
||||
def associate(): Unit = {
|
||||
// FIXME detect and handle stream failure, e.g. handshake timeout
|
||||
|
||||
// it's important to materialize the outboundControl stream first,
|
||||
// so that outboundControlIngress is ready when stages for all streams start
|
||||
if (controlQueue eq null) {
|
||||
val (q, control) = Source.queue(controlQueueSize, OverflowStrategy.backpressure)
|
||||
.toMat(transport.outboundControl(this))(Keep.both)
|
||||
.run()(materializer)
|
||||
controlQueue = q
|
||||
_outboundControlIngress = control
|
||||
// stage in the control stream may access the outboundControlIngress before returned here
|
||||
// using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
|
||||
materializing.countDown()
|
||||
// it's important to materialize the outboundControl stream first,
|
||||
// so that outboundControlIngress is ready when stages for all streams start
|
||||
runOutboundControlStream()
|
||||
runOutboundOrdinaryMessagesStream()
|
||||
}
|
||||
}
|
||||
|
||||
queue = Source.queue(256, OverflowStrategy.dropBuffer)
|
||||
.to(transport.outbound(this)).run()(materializer)
|
||||
private def runOutboundControlStream(): Unit = {
|
||||
// stage in the control stream may access the outboundControlIngress before returned here
|
||||
// using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
|
||||
materializing = new CountDownLatch(1)
|
||||
val (q, (control, completed)) = Source.queue(controlQueueSize, OverflowStrategy.backpressure)
|
||||
.toMat(transport.outboundControl(this))(Keep.both)
|
||||
.run()(materializer)
|
||||
controlQueue = q
|
||||
_outboundControlIngress = control
|
||||
materializing.countDown()
|
||||
attachStreamRestart("Outbound control stream", completed, cause ⇒ {
|
||||
runOutboundControlStream()
|
||||
cause match {
|
||||
case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID
|
||||
case _ ⇒ quarantine("Outbound control stream restarted")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private def runOutboundOrdinaryMessagesStream(): Unit = {
|
||||
val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer)
|
||||
.toMat(transport.outbound(this))(Keep.both)
|
||||
.run()(materializer)
|
||||
queue = q
|
||||
attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream())
|
||||
}
|
||||
|
||||
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = {
|
||||
implicit val ec = materializer.executionContext
|
||||
streamCompleted.onFailure {
|
||||
case _: AbruptTerminationException ⇒ // ActorSystem shutdown
|
||||
case cause ⇒
|
||||
if (!transport.isShutdown)
|
||||
if (restartCounter.restart()) {
|
||||
log.error(cause, "{} failed. Restarting it.", streamName)
|
||||
restart(cause)
|
||||
} else {
|
||||
log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.",
|
||||
streamName, maxRestarts, restartTimeout.toSeconds)
|
||||
transport.system.terminate()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,10 +3,9 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Success
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.remote.UniqueAddress
|
||||
|
|
@ -24,6 +23,13 @@ import akka.stream.stage.TimerGraphStageLogic
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object OutboundHandshake {
|
||||
|
||||
/**
|
||||
* Stream is failed with this exception if the handshake is not completed
|
||||
* within the handshake timeout.
|
||||
*/
|
||||
class HandshakeTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace
|
||||
|
||||
// FIXME serialization for these messages
|
||||
final case class HandshakeReq(from: UniqueAddress) extends ControlMessage
|
||||
final case class HandshakeRsp(from: UniqueAddress) extends Reply
|
||||
|
|
@ -34,13 +40,16 @@ private[akka] object OutboundHandshake {
|
|||
private case object Completed extends HandshakeState
|
||||
|
||||
private case object HandshakeTimeout
|
||||
private case object HandshakeRetryTick
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration) extends GraphStage[FlowShape[Send, Send]] {
|
||||
private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration, retryInterval: FiniteDuration)
|
||||
extends GraphStage[FlowShape[Send, Send]] {
|
||||
|
||||
val in: Inlet[Send] = Inlet("OutboundHandshake.in")
|
||||
val out: Outlet[Send] = Outlet("OutboundHandshake.out")
|
||||
override val shape: FlowShape[Send, Send] = FlowShape(in, out)
|
||||
|
|
@ -68,8 +77,6 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
|
|||
}
|
||||
}.invoke
|
||||
}
|
||||
|
||||
scheduleOnce(HandshakeTimeout, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -87,21 +94,29 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout:
|
|||
case Start ⇒
|
||||
// will pull when handshake reply is received (uniqueRemoteAddress completed)
|
||||
handshakeState = ReqInProgress
|
||||
outboundContext.sendControl(HandshakeReq(outboundContext.localAddress))
|
||||
scheduleOnce(HandshakeTimeout, timeout)
|
||||
schedulePeriodically(HandshakeRetryTick, retryInterval)
|
||||
sendHandshakeReq()
|
||||
case ReqInProgress ⇒ // will pull when handshake reply is received
|
||||
}
|
||||
}
|
||||
|
||||
private def sendHandshakeReq(): Unit =
|
||||
outboundContext.sendControl(HandshakeReq(outboundContext.localAddress))
|
||||
|
||||
private def handshakeCompleted(): Unit = {
|
||||
handshakeState = Completed
|
||||
cancelTimer(HandshakeRetryTick)
|
||||
cancelTimer(HandshakeTimeout)
|
||||
}
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit =
|
||||
timerKey match {
|
||||
case HandshakeRetryTick ⇒
|
||||
sendHandshakeReq()
|
||||
case HandshakeTimeout ⇒
|
||||
// FIXME would it make sense to retry a few times before failing?
|
||||
failStage(new TimeoutException(
|
||||
failStage(new HandshakeTimeoutException(
|
||||
s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms"))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration.Deadline
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.annotation.tailrec
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object RestartCounter {
|
||||
final case class State(count: Int, deadline: Deadline)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API: Thread safe "restarts with duration" counter
|
||||
*/
|
||||
private[akka] class RestartCounter(maxRestarts: Int, restartTimeout: FiniteDuration) {
|
||||
import RestartCounter._
|
||||
|
||||
private val state = new AtomicReference[State](State(0, Deadline.now + restartTimeout))
|
||||
|
||||
/**
|
||||
* Current number of restarts.
|
||||
*/
|
||||
def count(): Int = state.get.count
|
||||
|
||||
/**
|
||||
* Increment the restart counter, or reset the counter to 1 if the
|
||||
* `restartTimeout` has elapsed. The latter also resets the timeout.
|
||||
* @return `true` if number of restarts, including this one, is less
|
||||
* than or equal to `maxRestarts`
|
||||
*/
|
||||
@tailrec final def restart(): Boolean = {
|
||||
val s = state.get
|
||||
|
||||
val newState =
|
||||
if (s.deadline.hasTimeLeft())
|
||||
s.copy(count = s.count + 1)
|
||||
else
|
||||
State(1, Deadline.now + restartTimeout)
|
||||
|
||||
if (state.compareAndSet(s, newState))
|
||||
newState.count <= maxRestarts
|
||||
else
|
||||
restart() // recur
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.{ ActorIdentity, ActorSystem, Identify }
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||
import akka.testkit.SocketUtil
|
||||
import akka.testkit.TestActors
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object HandshakeFailureSpec {
|
||||
|
||||
val Seq(portA, portB) = SocketUtil.temporaryServerAddresses(2, "localhost", udp = true).map(_.getPort)
|
||||
|
||||
val commonConfig = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.artery.enabled = on
|
||||
remote.artery.hostname = localhost
|
||||
remote.artery.port = $portA
|
||||
remote.handshake-timeout = 2s
|
||||
}
|
||||
""")
|
||||
|
||||
val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB")
|
||||
.withFallback(commonConfig)
|
||||
|
||||
}
|
||||
|
||||
class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender {
|
||||
import HandshakeFailureSpec._
|
||||
|
||||
var systemB: ActorSystem = null
|
||||
|
||||
"Artery handshake" must {
|
||||
|
||||
"allow for timeout and later connect" in {
|
||||
def sel = system.actorSelection(s"akka.artery://systemB@localhost:$portB/user/echo")
|
||||
sel ! "hello"
|
||||
expectNoMsg(3.seconds) // longer than handshake-timeout
|
||||
|
||||
systemB = ActorSystem("systemB", HandshakeFailureSpec.configB)
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
|
||||
within(10.seconds) {
|
||||
awaitAssert {
|
||||
println(s"# identify $sel") // FIXME
|
||||
sel ! "hello2"
|
||||
expectMsg(1.second, "hello2")
|
||||
}
|
||||
}
|
||||
|
||||
sel ! Identify(None)
|
||||
val remoteRef = expectMsgType[ActorIdentity].ref.get
|
||||
|
||||
remoteRef ! "ping"
|
||||
expectMsg("ping")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit =
|
||||
if (systemB != null) shutdown(systemB)
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.{ ActorIdentity, ActorSystem, Identify }
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||
import akka.testkit.SocketUtil
|
||||
import akka.testkit.TestActors
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object HandshakeRetrySpec {
|
||||
|
||||
val Seq(portA, portB) = SocketUtil.temporaryServerAddresses(2, "localhost", udp = true).map(_.getPort)
|
||||
|
||||
val commonConfig = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.artery.enabled = on
|
||||
remote.artery.hostname = localhost
|
||||
remote.artery.port = $portA
|
||||
remote.handshake-timeout = 10s
|
||||
}
|
||||
""")
|
||||
|
||||
val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB")
|
||||
.withFallback(commonConfig)
|
||||
|
||||
}
|
||||
|
||||
class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender {
|
||||
import HandshakeRetrySpec._
|
||||
|
||||
var systemB: ActorSystem = null
|
||||
|
||||
"Artery handshake" must {
|
||||
|
||||
"be retried during handshake-timeout (no message loss)" in {
|
||||
def sel = system.actorSelection(s"akka.artery://systemB@localhost:$portB/user/echo")
|
||||
sel ! "hello"
|
||||
expectNoMsg(1.second)
|
||||
|
||||
systemB = ActorSystem("systemB", HandshakeRetrySpec.configB)
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
|
||||
expectMsg("hello")
|
||||
|
||||
sel ! Identify(None)
|
||||
val remoteRef = expectMsgType[ActorIdentity].ref.get
|
||||
|
||||
remoteRef ! "ping"
|
||||
expectMsg("ping")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit =
|
||||
if (systemB != null) shutdown(systemB)
|
||||
|
||||
}
|
||||
|
|
@ -3,8 +3,6 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.TimeoutException
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.Address
|
||||
|
|
@ -12,6 +10,7 @@ import akka.remote.EndpointManager.Send
|
|||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.OutboundHandshake.HandshakeReq
|
||||
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
|
||||
import akka.remote.artery.SystemMessageDelivery._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
|
|
@ -32,11 +31,12 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
|||
val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1)
|
||||
val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2)
|
||||
|
||||
private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
|
||||
private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds,
|
||||
retryInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
|
||||
val destination = null.asInstanceOf[RemoteActorRef] // not used
|
||||
TestSource.probe[String]
|
||||
.map(msg ⇒ Send(msg, None, destination, None))
|
||||
.via(new OutboundHandshake(outboundContext, timeout))
|
||||
.via(new OutboundHandshake(outboundContext, timeout, retryInterval))
|
||||
.map { case Send(msg, _, _, _) ⇒ msg }
|
||||
.toMat(TestSink.probe[Any])(Keep.both)
|
||||
.run()
|
||||
|
|
@ -60,7 +60,20 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
|
|||
val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis)
|
||||
|
||||
downstream.request(1)
|
||||
downstream.expectError().getClass should be(classOf[TimeoutException])
|
||||
downstream.expectError().getClass should be(classOf[HandshakeTimeoutException])
|
||||
}
|
||||
|
||||
"retry HandshakeReq" in {
|
||||
val controlProbe = TestProbe()
|
||||
val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref))
|
||||
val outboundContext = inboundContext.association(addressB.address)
|
||||
val (upstream, downstream) = setupStream(outboundContext, retryInterval = 100.millis)
|
||||
|
||||
downstream.request(10)
|
||||
controlProbe.expectMsg(HandshakeReq(addressA))
|
||||
controlProbe.expectMsg(HandshakeReq(addressA))
|
||||
controlProbe.expectMsg(HandshakeReq(addressA))
|
||||
downstream.cancel()
|
||||
}
|
||||
|
||||
"not deliver messages from upstream until handshake completed" in {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
|
||||
class RestartCounterSpec extends WordSpec with Matchers {
|
||||
|
||||
"RestartCounter" must {
|
||||
|
||||
"count max restarts within duration" in {
|
||||
val counter = new RestartCounter(3, 3.seconds)
|
||||
counter.restart() should ===(true)
|
||||
counter.restart() should ===(true)
|
||||
counter.restart() should ===(true)
|
||||
counter.restart() should ===(false)
|
||||
counter.count() should ===(4)
|
||||
}
|
||||
|
||||
"allow sporadic restarts" in {
|
||||
val counter = new RestartCounter(3, 10.millis)
|
||||
for (_ ← 1 to 10) {
|
||||
counter.restart() should ===(true)
|
||||
Thread.sleep(20)
|
||||
}
|
||||
}
|
||||
|
||||
"reset count after timeout" in {
|
||||
val counter = new RestartCounter(3, 500.millis)
|
||||
counter.restart()
|
||||
counter.restart()
|
||||
counter.count() should ===(2)
|
||||
Thread.sleep(600)
|
||||
counter.restart()
|
||||
counter.count() should ===(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue