From 96b697d92f9eedc133111af345b031cf15021cdd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 17 May 2016 17:34:57 +0200 Subject: [PATCH] handle stream failures by restarting, #20317 * allow X restarts within Y seconds * and handle handshake timeout --- .../scala/akka/remote/artery/AeronSink.scala | 29 +++++-- .../akka/remote/artery/ArteryTransport.scala | 86 ++++++++++++++++--- .../akka/remote/artery/Association.scala | 86 ++++++++++++++----- .../scala/akka/remote/artery/Handshake.scala | 29 +++++-- .../akka/remote/artery/RestartCounter.scala | 52 +++++++++++ .../remote/artery/HandshakeFailureSpec.scala | 68 +++++++++++++++ .../remote/artery/HandshakeRetrySpec.scala | 62 +++++++++++++ .../remote/artery/OutboundHandshakeSpec.scala | 23 +++-- .../remote/artery/RestartCounterSpec.scala | 42 +++++++++ 9 files changed, 425 insertions(+), 52 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/artery/RestartCounter.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/RestartCounterSpec.scala diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index 590035ea01..9807591bc1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -4,18 +4,23 @@ package akka.remote.artery import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec +import scala.concurrent.Future +import scala.concurrent.Promise import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import akka.Done import akka.stream.Attributes import akka.stream.Inlet import akka.stream.SinkShape import akka.stream.stage.AsyncCallback -import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic +import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import io.aeron.Aeron import io.aeron.Publication @@ -51,19 +56,23 @@ object AeronSink { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner) extends GraphStage[SinkShape[AeronSink.Bytes]] { +class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner) + extends GraphStageWithMaterializedValue[SinkShape[AeronSink.Bytes], Future[Done]] { import AeronSink._ import TaskRunner._ val in: Inlet[Bytes] = Inlet("AeronSink") override val shape: SinkShape[Bytes] = SinkShape(in) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val completed = Promise[Done]() + val logic = new GraphStageLogic(shape) with InHandler { private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024)) private val pub = aeron.addPublication(channel, streamId) + private var completedValue: Try[Done] = Success(Done) + private val spinning = 1000 private var backoffCount = spinning private var lastMsgSize = 0 @@ -80,6 +89,7 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu override def postStop(): Unit = { taskRunner.command(Remove(addOfferTask.task)) pub.close() + completed.complete(completedValue) } // InHandler @@ -124,6 +134,15 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu super.onUpstreamFinish() } + override def onUpstreamFailure(cause: Throwable): Unit = { + completedValue = Failure(cause) + super.onUpstreamFailure(cause) + } + setHandler(in, this) } + + (logic, completed.future) + } + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 9952a82c3a..a9473a9d9f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -13,6 +13,7 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success +import scala.util.Try import akka.Done import akka.NotUsed import akka.actor.ActorRef @@ -39,6 +40,7 @@ import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization +import akka.stream.AbruptTerminationException import akka.stream.ActorMaterializer import akka.stream.KillSwitches import akka.stream.Materializer @@ -50,6 +52,8 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.util.ByteString import akka.util.ByteStringBuilder +import akka.util.Helpers.ConfigOps +import akka.util.Helpers.Requiring import io.aeron.Aeron import io.aeron.AvailableImageHandler import io.aeron.Image @@ -58,7 +62,6 @@ import io.aeron.driver.MediaDriver import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil -import scala.util.Try import java.io.File import java.net.InetSocketAddress import java.nio.channels.DatagramChannel @@ -156,6 +159,7 @@ private[akka] final class AssociationState( } s"AssociationState($incarnation, $a)" } + } /** @@ -220,10 +224,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val codec: AkkaPduCodec = AkkaPduProtobufCodec private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") + @volatile private[this] var _shutdown = false // FIXME config private val systemMessageResendInterval: FiniteDuration = 1.second - private val handshakeTimeout: FiniteDuration = 10.seconds + private val handshakeRetryInterval: FiniteDuration = 1.second + private val handshakeTimeout: FiniteDuration = + system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero, + "handshake-timeout must be > 0") private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" @@ -234,6 +242,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME: This does locking on putIfAbsent, we need something smarter private[this] val associations = new ConcurrentHashMap[Address, Association]() + private val restartTimeout: FiniteDuration = 5.seconds // FIXME config + private val maxRestarts = 5 // FIXME config + private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) + override def start(): Unit = { startMediaDriver() startAeron() @@ -252,7 +264,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R messageDispatcher = new MessageDispatcher(system, provider) - runInboundFlows() + runInboundStreams() } private def startMediaDriver(): Unit = { @@ -298,14 +310,19 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R aeron = Aeron.connect(ctx) } - private def runInboundFlows(): Unit = { - // control stream - controlSubject = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner)) + private def runInboundStreams(): Unit = { + runInboundControlStream() + runInboundOrdinaryMessagesStream() + } + + private def runInboundControlStream(): Unit = { + val (c, completed) = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way .viaMat(inboundControlFlow)(Keep.right) - .to(Sink.ignore) + .toMat(Sink.ignore)(Keep.both) .run()(materializer) + controlSubject = c controlSubject.attach(new ControlMessageObserver { override def notify(inboundEnvelope: InboundEnvelope): Unit = { @@ -321,14 +338,51 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R }) // ordinary messages stream - Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner)) + controlSubject.attach(new ControlMessageObserver { + override def notify(inboundEnvelope: InboundEnvelope): Unit = { + inboundEnvelope.message match { + case Quarantined(from, to) if to == localAddress ⇒ + val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address) + publishLifecycleEvent(lifecycleEvent) + // quarantine the other system from here + association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid)) + case _ ⇒ // not interesting + } + } + }) + + attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream()) + } + + private def runInboundOrdinaryMessagesStream(): Unit = { + val completed = Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way .via(inboundFlow) .runWith(Sink.ignore)(materializer) + + attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream()) + } + + private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { + implicit val ec = materializer.executionContext + streamCompleted.onFailure { + case _: AbruptTerminationException ⇒ // ActorSystem shutdown + case cause ⇒ + if (!isShutdown) + if (restartCounter.restart()) { + log.error(cause, "{} failed. Restarting it.", streamName) + restart() + } else { + log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.", + streamName, maxRestarts, restartTimeout.toSeconds) + system.terminate() + } + } } override def shutdown(): Future[Done] = { + _shutdown = true killSwitch.shutdown() if (taskRunner != null) taskRunner.stop() if (aeron != null) aeron.close() @@ -340,6 +394,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Future.successful(Done) } + private[remote] def isShutdown(): Boolean = _shutdown + // InboundContext override def sendControl(to: Address, message: ControlMessage) = association(to).outboundControlIngress.sendControlMessage(message) @@ -375,22 +431,24 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = association(remoteAddress).quarantine(reason = "", uid) // FIXME change the method signature (old remoting) to include reason? - def outbound(outboundContext: OutboundContext): Sink[Send, Any] = { + def outbound(outboundContext: OutboundContext): Sink[Send, Future[Done]] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext, handshakeTimeout)) + .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner))(Keep.right) } - def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = { + def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = { Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext, handshakeTimeout)) + .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner)) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner))(Keep.both) + + // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 4c0b427441..219dc15906 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -3,10 +3,19 @@ */ package akka.remote.artery +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + import scala.annotation.tailrec +import scala.concurrent.Future import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.concurrent.duration.FiniteDuration import scala.util.Success + +import akka.Done import akka.actor.ActorRef +import akka.actor.ActorSelectionMessage import akka.actor.Address import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage @@ -16,17 +25,15 @@ import akka.remote.RemoteActorRef import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress +import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException +import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery +import akka.stream.AbruptTerminationException import akka.stream.Materializer import akka.stream.OverflowStrategy import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete import akka.util.Unsafe -import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import akka.actor.ActorSelectionMessage -import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery /** * INTERNAL API @@ -44,10 +51,14 @@ private[akka] class Association( private val log = Logging(transport.system, getClass.getName) private val controlQueueSize = transport.provider.remoteSettings.SysMsgBufferSize + private val restartTimeout: FiniteDuration = 5.seconds // FIXME config + private val maxRestarts = 5 // FIXME config + private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) + @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _ @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ - private val materializing = new CountDownLatch(1) + @volatile private[this] var materializing = new CountDownLatch(1) def outboundControlIngress: OutboundControlIngress = { if (_outboundControlIngress ne null) @@ -115,7 +126,7 @@ private[akka] class Association( def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system // FIXME where is that ActorSelectionMessage check in old remoting? - if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message.isInstanceOf[ClearSystemMessageDelivery.type]) { + if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { case _: SystemMessage | ClearSystemMessageDelivery ⇒ @@ -179,22 +190,55 @@ private[akka] class Association( // Idempotent def associate(): Unit = { - // FIXME detect and handle stream failure, e.g. handshake timeout - - // it's important to materialize the outboundControl stream first, - // so that outboundControlIngress is ready when stages for all streams start if (controlQueue eq null) { - val (q, control) = Source.queue(controlQueueSize, OverflowStrategy.backpressure) - .toMat(transport.outboundControl(this))(Keep.both) - .run()(materializer) - controlQueue = q - _outboundControlIngress = control - // stage in the control stream may access the outboundControlIngress before returned here - // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress - materializing.countDown() + // it's important to materialize the outboundControl stream first, + // so that outboundControlIngress is ready when stages for all streams start + runOutboundControlStream() + runOutboundOrdinaryMessagesStream() + } + } - queue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outbound(this)).run()(materializer) + private def runOutboundControlStream(): Unit = { + // stage in the control stream may access the outboundControlIngress before returned here + // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress + materializing = new CountDownLatch(1) + val (q, (control, completed)) = Source.queue(controlQueueSize, OverflowStrategy.backpressure) + .toMat(transport.outboundControl(this))(Keep.both) + .run()(materializer) + controlQueue = q + _outboundControlIngress = control + materializing.countDown() + attachStreamRestart("Outbound control stream", completed, cause ⇒ { + runOutboundControlStream() + cause match { + case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID + case _ ⇒ quarantine("Outbound control stream restarted") + } + }) + } + + private def runOutboundOrdinaryMessagesStream(): Unit = { + val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer) + .toMat(transport.outbound(this))(Keep.both) + .run()(materializer) + queue = q + attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) + } + + private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = { + implicit val ec = materializer.executionContext + streamCompleted.onFailure { + case _: AbruptTerminationException ⇒ // ActorSystem shutdown + case cause ⇒ + if (!transport.isShutdown) + if (restartCounter.restart()) { + log.error(cause, "{} failed. Restarting it.", streamName) + restart(cause) + } else { + log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.", + streamName, maxRestarts, restartTimeout.toSeconds) + transport.system.terminate() + } } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index a889acac38..cb2fe64f24 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -3,10 +3,9 @@ */ package akka.remote.artery -import java.util.concurrent.TimeoutException - import scala.concurrent.duration._ import scala.util.Success +import scala.util.control.NoStackTrace import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress @@ -24,6 +23,13 @@ import akka.stream.stage.TimerGraphStageLogic * INTERNAL API */ private[akka] object OutboundHandshake { + + /** + * Stream is failed with this exception if the handshake is not completed + * within the handshake timeout. + */ + class HandshakeTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace + // FIXME serialization for these messages final case class HandshakeReq(from: UniqueAddress) extends ControlMessage final case class HandshakeRsp(from: UniqueAddress) extends Reply @@ -34,13 +40,16 @@ private[akka] object OutboundHandshake { private case object Completed extends HandshakeState private case object HandshakeTimeout + private case object HandshakeRetryTick } /** * INTERNAL API */ -private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration) extends GraphStage[FlowShape[Send, Send]] { +private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration, retryInterval: FiniteDuration) + extends GraphStage[FlowShape[Send, Send]] { + val in: Inlet[Send] = Inlet("OutboundHandshake.in") val out: Outlet[Send] = Outlet("OutboundHandshake.out") override val shape: FlowShape[Send, Send] = FlowShape(in, out) @@ -68,8 +77,6 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: } }.invoke } - - scheduleOnce(HandshakeTimeout, timeout) } } @@ -87,21 +94,29 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: case Start ⇒ // will pull when handshake reply is received (uniqueRemoteAddress completed) handshakeState = ReqInProgress - outboundContext.sendControl(HandshakeReq(outboundContext.localAddress)) + scheduleOnce(HandshakeTimeout, timeout) + schedulePeriodically(HandshakeRetryTick, retryInterval) + sendHandshakeReq() case ReqInProgress ⇒ // will pull when handshake reply is received } } + private def sendHandshakeReq(): Unit = + outboundContext.sendControl(HandshakeReq(outboundContext.localAddress)) + private def handshakeCompleted(): Unit = { handshakeState = Completed + cancelTimer(HandshakeRetryTick) cancelTimer(HandshakeTimeout) } override protected def onTimer(timerKey: Any): Unit = timerKey match { + case HandshakeRetryTick ⇒ + sendHandshakeReq() case HandshakeTimeout ⇒ // FIXME would it make sense to retry a few times before failing? - failStage(new TimeoutException( + failStage(new HandshakeTimeoutException( s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms")) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/RestartCounter.scala b/akka-remote/src/main/scala/akka/remote/artery/RestartCounter.scala new file mode 100644 index 0000000000..6f23b239d7 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/RestartCounter.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration.Deadline +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.duration.FiniteDuration +import scala.annotation.tailrec + +/** + * INTERNAL API + */ +private[akka] object RestartCounter { + final case class State(count: Int, deadline: Deadline) +} + +/** + * INTERNAL API: Thread safe "restarts with duration" counter + */ +private[akka] class RestartCounter(maxRestarts: Int, restartTimeout: FiniteDuration) { + import RestartCounter._ + + private val state = new AtomicReference[State](State(0, Deadline.now + restartTimeout)) + + /** + * Current number of restarts. + */ + def count(): Int = state.get.count + + /** + * Increment the restart counter, or reset the counter to 1 if the + * `restartTimeout` has elapsed. The latter also resets the timeout. + * @return `true` if number of restarts, including this one, is less + * than or equal to `maxRestarts` + */ + @tailrec final def restart(): Boolean = { + val s = state.get + + val newState = + if (s.deadline.hasTimeLeft()) + s.copy(count = s.count + 1) + else + State(1, Deadline.now + restartTimeout) + + if (state.compareAndSet(s, newState)) + newState.count <= maxRestarts + else + restart() // recur + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala new file mode 100644 index 0000000000..08475939fd --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeFailureSpec.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.{ ActorIdentity, ActorSystem, Identify } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.testkit.SocketUtil +import akka.testkit.TestActors +import com.typesafe.config.ConfigFactory + +object HandshakeFailureSpec { + + val Seq(portA, portB) = SocketUtil.temporaryServerAddresses(2, "localhost", udp = true).map(_.getPort) + + val commonConfig = ConfigFactory.parseString(s""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.hostname = localhost + remote.artery.port = $portA + remote.handshake-timeout = 2s + } + """) + + val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB") + .withFallback(commonConfig) + +} + +class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender { + import HandshakeFailureSpec._ + + var systemB: ActorSystem = null + + "Artery handshake" must { + + "allow for timeout and later connect" in { + def sel = system.actorSelection(s"akka.artery://systemB@localhost:$portB/user/echo") + sel ! "hello" + expectNoMsg(3.seconds) // longer than handshake-timeout + + systemB = ActorSystem("systemB", HandshakeFailureSpec.configB) + systemB.actorOf(TestActors.echoActorProps, "echo") + + within(10.seconds) { + awaitAssert { + println(s"# identify $sel") // FIXME + sel ! "hello2" + expectMsg(1.second, "hello2") + } + } + + sel ! Identify(None) + val remoteRef = expectMsgType[ActorIdentity].ref.get + + remoteRef ! "ping" + expectMsg("ping") + } + + } + + override def afterTermination(): Unit = + if (systemB != null) shutdown(systemB) + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala new file mode 100644 index 0000000000..df22b2cb84 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.{ ActorIdentity, ActorSystem, Identify } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.testkit.SocketUtil +import akka.testkit.TestActors +import com.typesafe.config.ConfigFactory + +object HandshakeRetrySpec { + + val Seq(portA, portB) = SocketUtil.temporaryServerAddresses(2, "localhost", udp = true).map(_.getPort) + + val commonConfig = ConfigFactory.parseString(s""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.hostname = localhost + remote.artery.port = $portA + remote.handshake-timeout = 10s + } + """) + + val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB") + .withFallback(commonConfig) + +} + +class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender { + import HandshakeRetrySpec._ + + var systemB: ActorSystem = null + + "Artery handshake" must { + + "be retried during handshake-timeout (no message loss)" in { + def sel = system.actorSelection(s"akka.artery://systemB@localhost:$portB/user/echo") + sel ! "hello" + expectNoMsg(1.second) + + systemB = ActorSystem("systemB", HandshakeRetrySpec.configB) + systemB.actorOf(TestActors.echoActorProps, "echo") + + expectMsg("hello") + + sel ! Identify(None) + val remoteRef = expectMsgType[ActorIdentity].ref.get + + remoteRef ! "ping" + expectMsg("ping") + } + + } + + override def afterTermination(): Unit = + if (systemB != null) shutdown(systemB) + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index 3e9d73e0f5..2580a18dec 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -3,8 +3,6 @@ */ package akka.remote.artery -import java.util.concurrent.TimeoutException - import scala.concurrent.duration._ import akka.actor.Address @@ -12,6 +10,7 @@ import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress import akka.remote.artery.OutboundHandshake.HandshakeReq +import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings @@ -32,11 +31,12 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1) val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2) - private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { + private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds, + retryInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { val destination = null.asInstanceOf[RemoteActorRef] // not used TestSource.probe[String] .map(msg ⇒ Send(msg, None, destination, None)) - .via(new OutboundHandshake(outboundContext, timeout)) + .via(new OutboundHandshake(outboundContext, timeout, retryInterval)) .map { case Send(msg, _, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) .run() @@ -60,7 +60,20 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis) downstream.request(1) - downstream.expectError().getClass should be(classOf[TimeoutException]) + downstream.expectError().getClass should be(classOf[HandshakeTimeoutException]) + } + + "retry HandshakeReq" in { + val controlProbe = TestProbe() + val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref)) + val outboundContext = inboundContext.association(addressB.address) + val (upstream, downstream) = setupStream(outboundContext, retryInterval = 100.millis) + + downstream.request(10) + controlProbe.expectMsg(HandshakeReq(addressA)) + controlProbe.expectMsg(HandshakeReq(addressA)) + controlProbe.expectMsg(HandshakeReq(addressA)) + downstream.cancel() } "not deliver messages from upstream until handshake completed" in { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RestartCounterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RestartCounterSpec.scala new file mode 100644 index 0000000000..c0ac272975 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RestartCounterSpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class RestartCounterSpec extends WordSpec with Matchers { + + "RestartCounter" must { + + "count max restarts within duration" in { + val counter = new RestartCounter(3, 3.seconds) + counter.restart() should ===(true) + counter.restart() should ===(true) + counter.restart() should ===(true) + counter.restart() should ===(false) + counter.count() should ===(4) + } + + "allow sporadic restarts" in { + val counter = new RestartCounter(3, 10.millis) + for (_ ← 1 to 10) { + counter.restart() should ===(true) + Thread.sleep(20) + } + } + + "reset count after timeout" in { + val counter = new RestartCounter(3, 500.millis) + counter.restart() + counter.restart() + counter.count() should ===(2) + Thread.sleep(600) + counter.restart() + counter.count() should ===(1) + } + } +}