Reset pendingSend on address resolution failure #22076

This commit is contained in:
Andrey Baidarov 2017-02-11 21:20:41 +03:00 committed by Johan Andrén
parent 11ecf03980
commit 9e1d6774e5
2 changed files with 32 additions and 10 deletions

View file

@ -16,7 +16,7 @@ class UdpIntegrationSpec extends AkkaSpec("""
akka.loglevel = INFO
akka.actor.serialize-creators = on""") with ImplicitSender {
val addresses = temporaryServerAddresses(6, udp = true)
val addresses = temporaryServerAddresses(7, udp = true)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
@ -25,7 +25,7 @@ class UdpIntegrationSpec extends AkkaSpec("""
commander.sender()
}
val simpleSender: ActorRef = {
def createSimpleSender(): ActorRef = {
val commander = TestProbe()
commander.send(IO(Udp), SimpleSender)
commander.expectMsg(SimpleSenderReady)
@ -38,15 +38,30 @@ class UdpIntegrationSpec extends AkkaSpec("""
val serverAddress = addresses(0)
val server = bindUdp(serverAddress, testActor)
val data = ByteString("To infinity and beyond!")
val simpleSender = createSimpleSender()
simpleSender ! Send(data, serverAddress)
expectMsgType[Received].data should ===(data)
}
"be able to send several packet back and forth with binding" in {
"be able to deliver subsequent messages after address resolution failure" in {
val unresolvableServerAddress = new InetSocketAddress("some-unresolvable-host", 10000)
val cmd = Send(ByteString("Can't be delivered"), unresolvableServerAddress)
val simpleSender = createSimpleSender()
simpleSender ! cmd
expectMsgType[CommandFailed].cmd should ===(cmd)
val serverAddress = addresses(1)
val clientAddress = addresses(2)
val server = bindUdp(serverAddress, testActor)
val data = ByteString("To infinity and beyond!")
simpleSender ! Send(data, serverAddress)
expectMsgType[Received].data should ===(data)
}
"be able to send several packet back and forth with binding" in {
val serverAddress = addresses(2)
val clientAddress = addresses(3)
val server = bindUdp(serverAddress, testActor)
val client = bindUdp(clientAddress, testActor)
val data = ByteString("Fly little packet!")
@ -79,24 +94,24 @@ class UdpIntegrationSpec extends AkkaSpec("""
"call SocketOption.beforeBind method before bind." in {
val commander = TestProbe()
val assertOption = AssertBeforeBind()
commander.send(IO(Udp), Bind(testActor, addresses(3), options = List(assertOption)))
commander.expectMsg(Bound(addresses(3)))
commander.send(IO(Udp), Bind(testActor, addresses(4), options = List(assertOption)))
commander.expectMsg(Bound(addresses(4)))
assert(assertOption.beforeCalled === 1)
}
"call SocketOption.afterConnect method after binding." in {
val commander = TestProbe()
val assertOption = AssertAfterChannelBind()
commander.send(IO(Udp), Bind(testActor, addresses(4), options = List(assertOption)))
commander.expectMsg(Bound(addresses(4)))
commander.send(IO(Udp), Bind(testActor, addresses(5), options = List(assertOption)))
commander.expectMsg(Bound(addresses(5)))
assert(assertOption.afterCalled === 1)
}
"call DatagramChannelCreator.create method when opening channel" in {
val commander = TestProbe()
val assertOption = AssertOpenDatagramChannel()
commander.send(IO(Udp), Bind(testActor, addresses(5), options = List(assertOption)))
commander.expectMsg(Bound(addresses(5)))
commander.send(IO(Udp), Bind(testActor, addresses(6), options = List(assertOption)))
commander.expectMsg(Bound(addresses(6)))
assert(assertOption.openCalled === 1)
}
}

View file

@ -59,6 +59,13 @@ private[io] trait WithUdpSend {
pendingCommander = null
}
case None
sender() ! CommandFailed(send)
log.debug(
"Name resolution failed for remote address [{}]",
send.target)
retriedSend = false
pendingSend = null
pendingCommander = null
}
} else {
doSend(registration)