Merge pull request #1333 from akka/wip-3230-bound-addr-∂π
include bound localAddress in Bound message, see #3230
This commit is contained in:
commit
5005c17f1f
10 changed files with 27 additions and 17 deletions
|
|
@ -22,7 +22,7 @@ class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max
|
||||||
val commander = TestProbe()
|
val commander = TestProbe()
|
||||||
val addresses = temporaryServerAddresses(2)
|
val addresses = temporaryServerAddresses(2)
|
||||||
commander.send(IO(Tcp), Bind(bindHandler.ref, addresses(0)))
|
commander.send(IO(Tcp), Bind(bindHandler.ref, addresses(0)))
|
||||||
commander.expectMsg(Bound)
|
commander.expectMsg(Bound(addresses(0)))
|
||||||
|
|
||||||
// we are now at the configured max-channel capacity of 4
|
// we are now at the configured max-channel capacity of 4
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -456,8 +456,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
assertThisConnectionActorTerminated()
|
assertThisConnectionActorTerminated()
|
||||||
}
|
}
|
||||||
|
|
||||||
// This tets is disabled on windows, as the assumption that not calling accept on a server socket means that
|
// This test is disabled on windows, as the assumption that not calling accept on a server socket means that
|
||||||
// no TCP level connection has been established with the client does not hold.
|
// no TCP level connection has been established with the client does not hold.
|
||||||
|
// RK: I think Windows is no different than any other OS in this regard, there was just a sleep() missing.
|
||||||
"report failed connection attempt while not accepted" in withUnacceptedConnection() { setup ⇒
|
"report failed connection attempt while not accepted" in withUnacceptedConnection() { setup ⇒
|
||||||
import setup._
|
import setup._
|
||||||
ignoreIfWindows
|
ignoreIfWindows
|
||||||
|
|
@ -465,6 +466,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
// close instead of accept
|
// close instead of accept
|
||||||
localServer.close()
|
localServer.close()
|
||||||
|
|
||||||
|
// must give the OS some time to send RST from server to client
|
||||||
|
Thread.sleep(100)
|
||||||
|
|
||||||
EventFilter[SocketException](occurrences = 1) intercept {
|
EventFilter[SocketException](occurrences = 1) intercept {
|
||||||
selector.send(connectionActor, ChannelConnectable)
|
selector.send(connectionActor, ChannelConnectable)
|
||||||
userHandler.expectMsg(CommandFailed(Connect(serverAddress)))
|
userHandler.expectMsg(CommandFailed(Connect(serverAddress)))
|
||||||
|
|
@ -679,6 +683,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
val userHandler = TestProbe()
|
val userHandler = TestProbe()
|
||||||
val selector = TestProbe()
|
val selector = TestProbe()
|
||||||
val connectionActor = connectionActorCons(selector.ref, userHandler.ref)
|
val connectionActor = connectionActorCons(selector.ref, userHandler.ref)
|
||||||
|
// calling .underlyingActor ensures that the actor is actually created at this point
|
||||||
val clientSideChannel = connectionActor.underlyingActor.channel
|
val clientSideChannel = connectionActor.underlyingActor.channel
|
||||||
|
|
||||||
selector.expectMsg(RegisterChannel(clientSideChannel, OP_CONNECT))
|
selector.expectMsg(RegisterChannel(clientSideChannel, OP_CONNECT))
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒
|
||||||
def bindServer(): Unit = {
|
def bindServer(): Unit = {
|
||||||
val bindCommander = TestProbe()
|
val bindCommander = TestProbe()
|
||||||
bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
|
bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
|
||||||
bindCommander.expectMsg(Bound)
|
bindCommander.expectMsg(Bound(endpoint))
|
||||||
}
|
}
|
||||||
|
|
||||||
def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
|
def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
||||||
|
|
||||||
"let the Bind commander know when binding is completed" in new TestSetup {
|
"let the Bind commander know when binding is completed" in new TestSetup {
|
||||||
listener ! ChannelRegistered
|
listener ! ChannelRegistered
|
||||||
bindCommander.expectMsg(Bound)
|
bindCommander.expectMsgType[Bound]
|
||||||
}
|
}
|
||||||
|
|
||||||
"accept acceptable connections and register them with its parent" in new TestSetup {
|
"accept acceptable connections and register them with its parent" in new TestSetup {
|
||||||
|
|
@ -102,7 +102,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
||||||
|
|
||||||
def bindListener() {
|
def bindListener() {
|
||||||
listener ! ChannelRegistered
|
listener ! ChannelRegistered
|
||||||
bindCommander.expectMsg(Bound)
|
bindCommander.expectMsgType[Bound]
|
||||||
}
|
}
|
||||||
|
|
||||||
def attemptConnectionToEndpoint(): Unit = new Socket(endpoint.getHostName, endpoint.getPort)
|
def attemptConnectionToEndpoint(): Unit = new Socket(endpoint.getHostName, endpoint.getPort)
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with
|
||||||
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
|
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
|
||||||
val commander = TestProbe()
|
val commander = TestProbe()
|
||||||
commander.send(IO(Udp), Udp.Bind(handler, address))
|
commander.send(IO(Udp), Udp.Bind(handler, address))
|
||||||
commander.expectMsg(Udp.Bound)
|
commander.expectMsg(Udp.Bound(address))
|
||||||
commander.sender
|
commander.sender
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitS
|
||||||
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
|
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
|
||||||
val commander = TestProbe()
|
val commander = TestProbe()
|
||||||
commander.send(IO(Udp), Bind(handler, address))
|
commander.send(IO(Udp), Bind(handler, address))
|
||||||
commander.expectMsg(Bound)
|
commander.expectMsg(Bound(address))
|
||||||
commander.sender
|
commander.sender
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -117,8 +117,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event
|
case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event
|
||||||
case class CommandFailed(cmd: Command) extends Event
|
case class CommandFailed(cmd: Command) extends Event
|
||||||
|
|
||||||
sealed trait Bound extends Event
|
case class Bound(localAddress: InetSocketAddress) extends Event
|
||||||
case object Bound extends Bound
|
|
||||||
sealed trait Unbound extends Event
|
sealed trait Unbound extends Event
|
||||||
case object Unbound extends Unbound
|
case object Unbound extends Unbound
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import akka.actor.{ Props, ActorLogging, ActorRef, Actor }
|
||||||
import akka.io.SelectionHandler._
|
import akka.io.SelectionHandler._
|
||||||
import akka.io.Tcp._
|
import akka.io.Tcp._
|
||||||
import akka.io.IO.HasFailureMessage
|
import akka.io.IO.HasFailureMessage
|
||||||
|
import java.net.InetSocketAddress
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -42,8 +43,11 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
|
||||||
serverSocketChannel.configureBlocking(false)
|
serverSocketChannel.configureBlocking(false)
|
||||||
val socket = serverSocketChannel.socket
|
val socket = serverSocketChannel.socket
|
||||||
options.foreach(_.beforeServerSocketBind(socket))
|
options.foreach(_.beforeServerSocketBind(socket))
|
||||||
try socket.bind(endpoint, backlog)
|
try {
|
||||||
catch {
|
socket.bind(endpoint, backlog)
|
||||||
|
require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress],
|
||||||
|
s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]")
|
||||||
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
bindCommander ! bind.failureMessage
|
bindCommander ! bind.failureMessage
|
||||||
log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint)
|
log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint)
|
||||||
|
|
@ -58,7 +62,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
|
||||||
|
|
||||||
def receive: Receive = {
|
def receive: Receive = {
|
||||||
case ChannelRegistered ⇒
|
case ChannelRegistered ⇒
|
||||||
bindCommander ! Bound
|
bindCommander ! Bound(channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])
|
||||||
context.become(bound)
|
context.become(bound)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -54,8 +54,7 @@ object Udp extends ExtensionKey[UdpExt] {
|
||||||
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
|
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
|
||||||
case class CommandFailed(cmd: Command) extends Event
|
case class CommandFailed(cmd: Command) extends Event
|
||||||
|
|
||||||
sealed trait Bound extends Event
|
case class Bound(localAddress: InetSocketAddress) extends Event
|
||||||
case object Bound extends Bound
|
|
||||||
|
|
||||||
sealed trait SimpleSendReady extends Event
|
sealed trait SimpleSendReady extends Event
|
||||||
case object SimpleSendReady extends SimpleSendReady
|
case object SimpleSendReady extends SimpleSendReady
|
||||||
|
|
|
||||||
|
|
@ -34,8 +34,11 @@ private[io] class UdpListener(val udp: UdpExt,
|
||||||
datagramChannel.configureBlocking(false)
|
datagramChannel.configureBlocking(false)
|
||||||
val socket = datagramChannel.socket
|
val socket = datagramChannel.socket
|
||||||
options.foreach(_.beforeDatagramBind(socket))
|
options.foreach(_.beforeDatagramBind(socket))
|
||||||
try socket.bind(endpoint)
|
try {
|
||||||
catch {
|
socket.bind(endpoint)
|
||||||
|
require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress],
|
||||||
|
s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]")
|
||||||
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
bindCommander ! CommandFailed(bind)
|
bindCommander ! CommandFailed(bind)
|
||||||
log.error(e, "Failed to bind UDP channel to endpoint [{}]", endpoint)
|
log.error(e, "Failed to bind UDP channel to endpoint [{}]", endpoint)
|
||||||
|
|
@ -48,7 +51,7 @@ private[io] class UdpListener(val udp: UdpExt,
|
||||||
|
|
||||||
def receive: Receive = {
|
def receive: Receive = {
|
||||||
case ChannelRegistered ⇒
|
case ChannelRegistered ⇒
|
||||||
bindCommander ! Bound
|
bindCommander ! Bound(channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])
|
||||||
context.become(readHandlers orElse sendHandlers, discardOld = true)
|
context.become(readHandlers orElse sendHandlers, discardOld = true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue