diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 50add9d95b..9fafc202d2 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -5,7 +5,7 @@ package akka.io import java.io.IOException -import java.net.{ Socket, ConnectException, InetSocketAddress, SocketException } +import java.net.{ ConnectException, InetSocketAddress, SocketException } import java.nio.ByteBuffer import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel } import java.nio.channels.spi.SelectorProvider @@ -176,6 +176,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") buffer.flip() ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!") } + "write data after not acknowledged data" in withEstablishedConnection() { setup ⇒ import setup._ @@ -405,6 +406,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertThisConnectionActorTerminated() } + "report when peer closed the connection when trying to write" in withEstablishedConnection() { setup ⇒ import setup._ @@ -432,8 +434,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") EventFilter[SocketException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) - val err = userHandler.expectMsgType[ErrorClosed] - err.cause must be(ConnectionResetByPeerMessage) + userHandler.expectMsg(CommandFailed(Connect(serverAddress))) } verifyActorTermination(connectionActor) @@ -453,8 +454,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") key.isConnectable must be(true) EventFilter[ConnectException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) - val err = userHandler.expectMsgType[ErrorClosed] - err.cause.startsWith(ConnectionRefusedMessagePrefix) must be(true) + userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) } verifyActorTermination(connectionActor) diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index b041b6d44e..d41469f54c 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -67,7 +67,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler val sequenceNumber = Iterator.from(0) val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher) val selector = SelectorProvider.provider.openSelector - val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant + final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant def receive: Receive = { case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender)) @@ -113,7 +113,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler override def supervisorStrategy = SupervisorStrategy.stoppingStrategy def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = { - log.debug("Executing [{}]", cmd) + if (TraceLogging) log.debug("Executing [{}]", cmd) if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) { body } else { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 8c5150cc83..bb2ea3adea 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -76,10 +76,18 @@ object Tcp extends ExtensionKey[TcpExt] { case class Register(handler: ActorRef) extends Command case object Unbind extends Command - sealed trait CloseCommand extends Command - case object Close extends CloseCommand - case object ConfirmedClose extends CloseCommand - case object Abort extends CloseCommand + sealed trait CloseCommand extends Command { + def event: ConnectionClosed + } + case object Close extends CloseCommand { + override def event = Closed + } + case object ConfirmedClose extends CloseCommand { + override def event = ConfirmedClosed + } + case object Abort extends CloseCommand { + override def event = Aborted + } case class NoAck(token: Any) object NoAck extends NoAck(null) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index adfa7a2358..235cb8d3ef 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -51,7 +51,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, context.become(connected(handler)) case cmd: CloseCommand ⇒ - handleClose(commander, Some(sender), closeResponse(cmd)) + handleClose(commander, Some(sender), cmd.event) case ReceiveTimeout ⇒ // after sending `Register` user should watch this actor to make sure @@ -68,7 +68,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case write: Write if writePending ⇒ if (TraceLogging) log.debug("Dropping write because queue is full") - sender ! CommandFailed(write) + sender ! write.failureMessage case write: Write if write.data.isEmpty ⇒ if (write.wantsAck) @@ -80,7 +80,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case ChannelWritable ⇒ if (writePending) doWrite(handler) - case cmd: CloseCommand ⇒ handleClose(handler, Some(sender), closeResponse(cmd)) + case cmd: CloseCommand ⇒ handleClose(handler, Some(sender), cmd.event) } /** connection is closing but a write has to be finished first */ @@ -225,13 +225,6 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, context.stop(self) } - def closeResponse(closeCommand: CloseCommand): ConnectionClosed = - closeCommand match { - case Close ⇒ Closed - case Abort ⇒ Aborted - case ConfirmedClose ⇒ ConfirmedClosed - } - def handleError(handler: ActorRef, exception: IOException): Unit = { closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))) @@ -318,5 +311,5 @@ private[io] object TcpConnection { */ case class CloseInformation( notificationsTo: Set[ActorRef], - closedEvent: ConnectionClosed) + closedEvent: Event) } diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 55e0efe640..d8d93d214d 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -4,14 +4,11 @@ package akka.io -import java.net.InetSocketAddress import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel } import scala.annotation.tailrec -import scala.collection.immutable import scala.util.control.NonFatal import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.io.SelectionHandler._ -import akka.io.Inet.SocketOption import akka.io.Tcp._ import akka.io.IO.HasFailureMessage @@ -50,7 +47,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef, try socket.bind(endpoint, backlog) catch { case NonFatal(e) ⇒ - bindCommander ! CommandFailed(bind) + bindCommander ! bind.failureMessage log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) context.stop(self) } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 098ab69b43..bbbe6cfc30 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -49,7 +49,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, log.debug("Connection established") completeConnect(commander, options) } catch { - case e: IOException ⇒ handleError(commander, e) + case e: IOException ⇒ + if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", e) + closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage) + throw e } }