TcpOutgoingConnection: Respond with CommandFailed rather than ErrorClosed on failed connect
Before, a Tcp.ErrorClosed event is generated when a connection attempt fails. For symmetry with the Tcp.Bind case and general usability of the API a Tcp.CommandFailed(connect) is the better choice.
This commit is contained in:
parent
4f1ee6a994
commit
1790bc0e1a
3 changed files with 10 additions and 7 deletions
|
|
@ -5,7 +5,7 @@
|
||||||
package akka.io
|
package akka.io
|
||||||
|
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.net.{ Socket, ConnectException, InetSocketAddress, SocketException }
|
import java.net.{ ConnectException, InetSocketAddress, SocketException }
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel }
|
import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel }
|
||||||
import java.nio.channels.spi.SelectorProvider
|
import java.nio.channels.spi.SelectorProvider
|
||||||
|
|
@ -175,6 +175,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
buffer.flip()
|
buffer.flip()
|
||||||
ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!")
|
ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!")
|
||||||
}
|
}
|
||||||
|
|
||||||
"write data after not acknowledged data" in withEstablishedConnection() { setup ⇒
|
"write data after not acknowledged data" in withEstablishedConnection() { setup ⇒
|
||||||
import setup._
|
import setup._
|
||||||
|
|
||||||
|
|
@ -404,6 +405,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
|
|
||||||
assertThisConnectionActorTerminated()
|
assertThisConnectionActorTerminated()
|
||||||
}
|
}
|
||||||
|
|
||||||
"report when peer closed the connection when trying to write" in withEstablishedConnection() { setup ⇒
|
"report when peer closed the connection when trying to write" in withEstablishedConnection() { setup ⇒
|
||||||
import setup._
|
import setup._
|
||||||
|
|
||||||
|
|
@ -431,8 +433,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
|
|
||||||
EventFilter[SocketException](occurrences = 1) intercept {
|
EventFilter[SocketException](occurrences = 1) intercept {
|
||||||
selector.send(connectionActor, ChannelConnectable)
|
selector.send(connectionActor, ChannelConnectable)
|
||||||
val err = userHandler.expectMsgType[ErrorClosed]
|
userHandler.expectMsg(CommandFailed(Connect(serverAddress)))
|
||||||
err.cause must be(ConnectionResetByPeerMessage)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyActorTermination(connectionActor)
|
verifyActorTermination(connectionActor)
|
||||||
|
|
@ -452,8 +453,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
key.isConnectable must be(true)
|
key.isConnectable must be(true)
|
||||||
EventFilter[ConnectException](occurrences = 1) intercept {
|
EventFilter[ConnectException](occurrences = 1) intercept {
|
||||||
selector.send(connectionActor, ChannelConnectable)
|
selector.send(connectionActor, ChannelConnectable)
|
||||||
val err = userHandler.expectMsgType[ErrorClosed]
|
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress)))
|
||||||
err.cause.startsWith(ConnectionRefusedMessagePrefix) must be(true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
verifyActorTermination(connectionActor)
|
verifyActorTermination(connectionActor)
|
||||||
|
|
|
||||||
|
|
@ -318,5 +318,5 @@ private[io] object TcpConnection {
|
||||||
*/
|
*/
|
||||||
case class CloseInformation(
|
case class CloseInformation(
|
||||||
notificationsTo: Set[ActorRef],
|
notificationsTo: Set[ActorRef],
|
||||||
closedEvent: ConnectionClosed)
|
closedEvent: Event)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
|
||||||
log.debug("Connection established")
|
log.debug("Connection established")
|
||||||
completeConnect(commander, options)
|
completeConnect(commander, options)
|
||||||
} catch {
|
} catch {
|
||||||
case e: IOException ⇒ handleError(commander, e)
|
case e: IOException ⇒
|
||||||
|
if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", e)
|
||||||
|
closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage)
|
||||||
|
throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue