IOManager connect and accept failure handling. Fixes #3035
* Cherry-picked from release-2.1. * Small changes: channel.socket -> sock
This commit is contained in:
parent
b002bda23f
commit
acadb9e7cb
2 changed files with 54 additions and 20 deletions
|
|
@ -380,6 +380,28 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
|
|||
if (!s.isClosed) s.close()
|
||||
}
|
||||
}
|
||||
|
||||
"fail when listening on an invalid address" in {
|
||||
implicit val self = testActor
|
||||
val address = new InetSocketAddress("irate.elephant", 9999)
|
||||
IOManager(system).listen(address)
|
||||
expectMsgType[Status.Failure](1 seconds)
|
||||
}
|
||||
|
||||
"fail when listening on a privileged port" in {
|
||||
implicit val self = testActor
|
||||
val address = new InetSocketAddress("localhost", 80) // Assumes test not run as root
|
||||
IOManager(system).listen(address)
|
||||
expectMsgType[Status.Failure](1 seconds)
|
||||
}
|
||||
|
||||
"fail when connecting to an invalid address" in {
|
||||
implicit val self = testActor
|
||||
val address = new InetSocketAddress("irate.elephant", 80)
|
||||
IOManager(system).connect(address)
|
||||
expectMsgType[Status.Failure](1 seconds)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1016,31 +1016,43 @@ final class IOManagerActor(val settings: Settings) extends Actor with ActorLoggi
|
|||
|
||||
case IO.Listen(server, address, options) ⇒
|
||||
val channel = ServerSocketChannel open ()
|
||||
channel configureBlocking false
|
||||
|
||||
var backlog = defaultBacklog
|
||||
val sock = channel.socket
|
||||
options foreach {
|
||||
case IO.ReceiveBufferSize(size) ⇒ forwardFailure(sock.setReceiveBufferSize(size))
|
||||
case IO.ReuseAddress(on) ⇒ forwardFailure(sock.setReuseAddress(on))
|
||||
case IO.PerformancePreferences(connTime, latency, bandwidth) ⇒
|
||||
forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
|
||||
case IO.Backlog(number) ⇒ backlog = number
|
||||
try {
|
||||
channel configureBlocking false
|
||||
var backlog = defaultBacklog
|
||||
val sock = channel.socket
|
||||
options foreach {
|
||||
case IO.ReceiveBufferSize(size) ⇒ forwardFailure(sock.setReceiveBufferSize(size))
|
||||
case IO.ReuseAddress(on) ⇒ forwardFailure(sock.setReuseAddress(on))
|
||||
case IO.PerformancePreferences(connTime, latency, bandwidth) ⇒
|
||||
forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth))
|
||||
case IO.Backlog(number) ⇒ backlog = number
|
||||
}
|
||||
sock bind (address, backlog)
|
||||
channels update (server, channel)
|
||||
channel register (selector, OP_ACCEPT, server)
|
||||
server.owner ! IO.Listening(server, sock.getLocalSocketAddress())
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ {
|
||||
channel close ()
|
||||
sender ! Status.Failure(e)
|
||||
}
|
||||
}
|
||||
|
||||
channel.socket bind (address, backlog)
|
||||
channels update (server, channel)
|
||||
channel register (selector, OP_ACCEPT, server)
|
||||
server.owner ! IO.Listening(server, sock.getLocalSocketAddress())
|
||||
run()
|
||||
|
||||
case IO.Connect(socket, address, options) ⇒
|
||||
val channel = SocketChannel open ()
|
||||
channel configureBlocking false
|
||||
channel connect address
|
||||
setSocketOptions(channel.socket, options)
|
||||
channels update (socket, channel)
|
||||
channel register (selector, OP_CONNECT | OP_READ, socket)
|
||||
try {
|
||||
channel configureBlocking false
|
||||
channel connect address
|
||||
setSocketOptions(channel.socket, options)
|
||||
channels update (socket, channel)
|
||||
channel register (selector, OP_CONNECT | OP_READ, socket)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ {
|
||||
channel close ()
|
||||
sender ! Status.Failure(e)
|
||||
}
|
||||
}
|
||||
run()
|
||||
|
||||
case IO.Accept(socket, server, options) ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue