add unit tests for the handshake and control stages, #20313

This commit is contained in:
Patrik Nordwall 2016-05-12 11:42:09 +02:00
parent 97691d104f
commit 64b1007114
8 changed files with 340 additions and 23 deletions

View file

@ -142,7 +142,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val codec: AkkaPduCodec = AkkaPduProtobufCodec
private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
private val systemMessageResendInterval: FiniteDuration = 1.second // FIXME config
// FIXME config
private val systemMessageResendInterval: FiniteDuration = 1.second
private val handshakeTimeout: FiniteDuration = 10.seconds
// TODO support port 0
private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}"
@ -276,7 +279,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outbound(outboundContext: OutboundContext): Sink[Send, Any] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext))
.via(new OutboundHandshake(outboundContext, handshakeTimeout))
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner))
@ -284,7 +287,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext))
.via(new OutboundHandshake(outboundContext, handshakeTimeout))
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval))
.viaMat(new OutboundControlJunction(outboundContext))(Keep.right)
.via(encoder)

View file

@ -40,7 +40,7 @@ private[akka] object OutboundHandshake {
/**
* INTERNAL API
*/
private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends GraphStage[FlowShape[Send, Send]] {
private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: FiniteDuration) extends GraphStage[FlowShape[Send, Send]] {
val in: Inlet[Send] = Inlet("OutboundHandshake.in")
val out: Outlet[Send] = Outlet("OutboundHandshake.out")
override val shape: FlowShape[Send, Send] = FlowShape(in, out)
@ -49,7 +49,6 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends
new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver {
import OutboundHandshake._
private val timeout: FiniteDuration = 10.seconds // FIXME config
private var handshakeState: HandshakeState = Start
private def remoteAddress = outboundContext.remoteAddress

View file

@ -0,0 +1,73 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.Address
import akka.actor.InternalActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestProbe
object InboundControlJunctionSpec {
case object Control1 extends ControlMessage
case object Control2 extends ControlMessage
case object Control3 extends ControlMessage
}
class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
import InboundControlJunctionSpec._
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1)
val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2)
"Control messages" must {
"be emitted via side channel" in {
val observerProbe = TestProbe()
val inboundContext = new TestInboundContext(localAddress = addressB)
val recipient = null.asInstanceOf[InternalActorRef] // not used
val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef]
.map(msg InboundEnvelope(recipient, addressB.address, msg, None))
.viaMat(new InboundControlJunction)(Keep.both)
.map { case InboundEnvelope(_, _, msg, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)
.run()
controlSubject.attach(new ControlMessageObserver {
override def notify(env: InboundEnvelope) {
observerProbe.ref ! env.message
}
})
downstream.request(10)
upstream.sendNext("msg1")
downstream.expectNext("msg1")
upstream.sendNext(Control1)
upstream.sendNext(Control2)
observerProbe.expectMsg(Control1)
observerProbe.expectMsg(Control2)
upstream.sendNext("msg2")
downstream.expectNext("msg2")
upstream.sendNext(Control3)
observerProbe.expectMsg(Control3)
downstream.cancel()
}
}
}

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.Address
import akka.actor.InternalActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.remote.artery.OutboundHandshake.HandshakeRsp
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestProbe
object InboundHandshakeSpec {
case object Control1 extends ControlMessage
case object Control2 extends ControlMessage
case object Control3 extends ControlMessage
}
class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
import InboundHandshakeSpec._
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1)
val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2)
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
val recipient = null.asInstanceOf[InternalActorRef] // not used
TestSource.probe[AnyRef]
.map(msg InboundEnvelope(recipient, addressB.address, msg, None))
.via(new InboundHandshake(inboundContext))
.map { case InboundEnvelope(_, _, msg, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)
.run()
}
"InboundHandshake stage" must {
"send HandshakeRsp as reply to HandshakeReq" in {
val replyProbe = TestProbe()
val inboundContext = new ManualReplyInboundContext(replyProbe.ref, addressB, new TestControlMessageSubject)
val (upstream, downstream) = setupStream(inboundContext)
downstream.request(10)
upstream.sendNext(HandshakeReq(addressA))
upstream.sendNext("msg1")
replyProbe.expectMsg(HandshakeRsp(addressB))
downstream.expectNext("msg1")
downstream.cancel()
}
}
}

View file

@ -0,0 +1,70 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.Address
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
object OutboundControlJunctionSpec {
case object Control1 extends ControlMessage
case object Control2 extends ControlMessage
case object Control3 extends ControlMessage
}
class OutboundControlJunctionSpec extends AkkaSpec with ImplicitSender {
import OutboundControlJunctionSpec._
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1)
val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2)
"Control messages" must {
"be injected via side channel" in {
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val destination = null.asInstanceOf[RemoteActorRef] // not used
val ((upstream, controlIngress), downstream) = TestSource.probe[String]
.map(msg Send(msg, None, destination, None))
.viaMat(new OutboundControlJunction(outboundContext))(Keep.both)
.map { case Send(msg, _, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)
.run()
controlIngress.sendControlMessage(Control1)
downstream.request(1)
downstream.expectNext(Control1)
upstream.sendNext("msg1")
downstream.request(1)
downstream.expectNext("msg1")
upstream.sendNext("msg2")
downstream.request(1)
downstream.expectNext("msg2")
controlIngress.sendControlMessage(Control2)
upstream.sendNext("msg3")
downstream.request(10)
downstream.expectNextUnorderedN(List("msg3", Control2))
controlIngress.sendControlMessage(Control3)
downstream.expectNext(Control3)
downstream.cancel()
}
}
}

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.actor.Address
import akka.actor.InternalActorRef
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.remote.artery.OutboundHandshake.HandshakeRsp
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
val addressA = UniqueAddress(Address("akka.artery", "sysA", "hostA", 1001), 1)
val addressB = UniqueAddress(Address("akka.artery", "sysB", "hostB", 1002), 2)
private def setupStream(outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
val destination = null.asInstanceOf[RemoteActorRef] // not used
TestSource.probe[String]
.map(msg Send(msg, None, destination, None))
.via(new OutboundHandshake(outboundContext, timeout))
.map { case Send(msg, _, _, _) msg }
.toMat(TestSink.probe[Any])(Keep.both)
.run()
}
"OutboundHandshake stage" must {
"send HandshakeReq when first pulled" in {
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val (upstream, downstream) = setupStream(outboundContext)
downstream.request(10)
downstream.expectNext(HandshakeReq(addressA))
downstream.cancel()
}
"timeout if not receiving HandshakeRsp" in {
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis)
downstream.request(1)
downstream.expectNext(HandshakeReq(addressA))
downstream.expectError().getClass should be(classOf[TimeoutException])
}
"not deliver messages from upstream until handshake completed" in {
val controlSubject = new TestControlMessageSubject
val inboundContext = new TestInboundContext(localAddress = addressA, controlSubject)
val outboundContext = inboundContext.association(addressB.address)
val recipient = null.asInstanceOf[InternalActorRef] // not used
val (upstream, downstream) = setupStream(outboundContext)
downstream.request(10)
downstream.expectNext(HandshakeReq(addressA))
upstream.sendNext("msg1")
downstream.expectNoMsg(200.millis)
controlSubject.sendControl(InboundEnvelope(recipient, addressA.address, HandshakeRsp(addressB), None))
downstream.expectNext("msg1")
upstream.sendNext("msg2")
downstream.expectNext("msg2")
downstream.cancel()
}
"complete handshake via another sub-channel" in {
val inboundContext = new TestInboundContext(localAddress = addressA)
val outboundContext = inboundContext.association(addressB.address)
val (upstream, downstream) = setupStream(outboundContext)
downstream.request(10)
downstream.expectNext(HandshakeReq(addressA))
upstream.sendNext("msg1")
// handshake completed first by another sub-channel
outboundContext.completeRemoteAddress(addressB)
downstream.expectNext("msg1")
upstream.sendNext("msg2")
downstream.expectNext("msg2")
downstream.cancel()
}
}
}

View file

@ -54,24 +54,6 @@ object SystemMessageDeliverySpec {
val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB")
.withFallback(commonConfig)
class ManualReplyInboundContext(
replyProbe: ActorRef,
localAddress: UniqueAddress,
controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) {
private var lastReply: Option[(Address, ControlMessage)] = None
override def sendControl(to: Address, message: ControlMessage) = {
lastReply = Some((to, message))
replyProbe ! message
}
def deliverLastReply(): Unit = {
lastReply.foreach { case (to, message) super.sendControl(to, message) }
lastReply = None
}
}
}
class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commonConfig) with ImplicitSender {

View file

@ -14,6 +14,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageObserver
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ThreadLocalRandom
import akka.actor.ActorRef
private[akka] class TestInboundContext(
override val localAddress: UniqueAddress,
@ -76,4 +77,23 @@ private[akka] class TestControlMessageSubject extends ControlMessageSubject {
while (iter.hasNext())
iter.next().notify(env)
}
}
private[akka] class ManualReplyInboundContext(
replyProbe: ActorRef,
localAddress: UniqueAddress,
controlSubject: TestControlMessageSubject) extends TestInboundContext(localAddress, controlSubject) {
private var lastReply: Option[(Address, ControlMessage)] = None
override def sendControl(to: Address, message: ControlMessage) = {
lastReply = Some((to, message))
replyProbe ! message
}
def deliverLastReply(): Unit = {
lastReply.foreach { case (to, message) super.sendControl(to, message) }
lastReply = None
}
}