=act #3645 Make TcpOutgoingConnection actor handle UnresolvedAddressExceptions

This commit is contained in:
Mathias 2013-10-07 12:54:02 +02:00
parent e05d30aeaa
commit bb1b22bffd
2 changed files with 27 additions and 10 deletions

View file

@ -544,6 +544,16 @@ class TcpConnectionSpec extends AkkaSpec("""
}
}
"report failed connection attempt when target cannot be resolved" in
new UnacceptedConnectionTest() {
val address = new InetSocketAddress("notthere.local", 666)
override lazy val connectionActor = createConnectionActorWithoutRegistration(serverAddress = address)
run {
connectionActor ! newChannelRegistration
userHandler.expectMsg(30.seconds, CommandFailed(Connect(address)))
}
}
"report failed connection attempt when timing out" in
new UnacceptedConnectionTest() {
override lazy val connectionActor = createConnectionActor(serverAddress = UnboundAddress, timeout = Option(100.millis))
@ -758,16 +768,24 @@ class TcpConnectionSpec extends AkkaSpec("""
def createConnectionActor(serverAddress: InetSocketAddress = serverAddress,
options: immutable.Seq[SocketOption] = Nil,
timeout: Option[FiniteDuration] = None): TestActorRef[TcpOutgoingConnection] = {
val ref = TestActorRef(
new TcpOutgoingConnection(Tcp(system), this, userHandler.ref, Connect(serverAddress, options = options, timeout = timeout)) {
override def postRestart(reason: Throwable): Unit = context.stop(self) // ensure we never restart
})
ref ! new ChannelRegistration {
val ref = createConnectionActorWithoutRegistration(serverAddress, options, timeout)
ref ! newChannelRegistration
ref
}
def newChannelRegistration: ChannelRegistration =
new ChannelRegistration {
def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op
def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op
}
ref
}
def createConnectionActorWithoutRegistration(serverAddress: InetSocketAddress = serverAddress,
options: immutable.Seq[SocketOption] = Nil,
timeout: Option[FiniteDuration] = None): TestActorRef[TcpOutgoingConnection] =
TestActorRef(
new TcpOutgoingConnection(Tcp(system), this, userHandler.ref, Connect(serverAddress, options = options, timeout = timeout)) {
override def postRestart(reason: Throwable): Unit = context.stop(self) // ensure we never restart
})
}
trait SmallRcvBuffer { _: LocalServerTest

View file

@ -4,9 +4,8 @@
package akka.io
import java.io.IOException
import java.nio.channels.{ SelectionKey, SocketChannel }
import java.net.ConnectException
import scala.util.control.NonFatal
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.{ ReceiveTimeout, ActorRef }
@ -42,7 +41,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
try {
thunk
} catch {
case e: IOException
case NonFatal(e)
log.debug("Could not establish connection to [{}] due to {}", remoteAddress, e)
stop()
}