From 78f3b10b45a2a7a98502e97e25c562dc7eabc617 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 27 Mar 2013 15:16:28 +0100 Subject: [PATCH 1/7] Turn "compile-time constant" into compile-time constant --- akka-actor/src/main/scala/akka/io/SelectionHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index b041b6d44e..e4a1bc2a45 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -67,7 +67,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler val sequenceNumber = Iterator.from(0) val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher) val selector = SelectorProvider.provider.openSelector - val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant + final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant def receive: Receive = { case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender)) From 1fa574b372ceeb0561b4cdad5c48d7b1559bf8ca Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 27 Mar 2013 15:16:56 +0100 Subject: [PATCH 2/7] Tone down logging in SelectionHandler --- akka-actor/src/main/scala/akka/io/SelectionHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index e4a1bc2a45..d41469f54c 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -113,7 +113,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler override def supervisorStrategy = SupervisorStrategy.stoppingStrategy def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = { - log.debug("Executing [{}]", cmd) + if (TraceLogging) log.debug("Executing [{}]", cmd) if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) { body } else { From 0f9598900941d523ce6bf560c89673c1e8136ccf Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 27 Mar 2013 15:18:18 +0100 Subject: [PATCH 3/7] Associate Tcp.CloseCommands with their respective confirmation events --- akka-actor/src/main/scala/akka/io/Tcp.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 8c5150cc83..bb2ea3adea 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -76,10 +76,18 @@ object Tcp extends ExtensionKey[TcpExt] { case class Register(handler: ActorRef) extends Command case object Unbind extends Command - sealed trait CloseCommand extends Command - case object Close extends CloseCommand - case object ConfirmedClose extends CloseCommand - case object Abort extends CloseCommand + sealed trait CloseCommand extends Command { + def event: ConnectionClosed + } + case object Close extends CloseCommand { + override def event = Closed + } + case object ConfirmedClose extends CloseCommand { + override def event = ConfirmedClosed + } + case object Abort extends CloseCommand { + override def event = Aborted + } case class NoAck(token: Any) object NoAck extends NoAck(null) From 854d0feef7c3f8b0c3ec7b3602504659e344a691 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 27 Mar 2013 15:20:17 +0100 Subject: [PATCH 4/7] Make use of new command.failureMessage --- akka-actor/src/main/scala/akka/io/TcpConnection.scala | 2 +- akka-actor/src/main/scala/akka/io/TcpListener.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index ec866ae75f..b5bbc9ab03 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -68,7 +68,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case write: Write if writePending ⇒ if (TraceLogging) log.debug("Dropping write because queue is full") - sender ! CommandFailed(write) + sender ! write.failureMessage case write: Write if write.data.isEmpty ⇒ if (write.wantsAck) diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 55e0efe640..c10019ebf1 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -50,7 +50,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef, try socket.bind(endpoint, backlog) catch { case NonFatal(e) ⇒ - bindCommander ! CommandFailed(bind) + bindCommander ! bind.failureMessage log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) context.stop(self) } From 4f1ee6a994096b9a8182d617153761ba65e5ed1e Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 27 Mar 2013 15:21:17 +0100 Subject: [PATCH 5/7] Clean up imports --- akka-actor/src/main/scala/akka/io/TcpListener.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index c10019ebf1..d8d93d214d 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -4,14 +4,11 @@ package akka.io -import java.net.InetSocketAddress import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel } import scala.annotation.tailrec -import scala.collection.immutable import scala.util.control.NonFatal import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.io.SelectionHandler._ -import akka.io.Inet.SocketOption import akka.io.Tcp._ import akka.io.IO.HasFailureMessage From 1790bc0e1a46696d243e447c96ee972a8f73d15e Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 27 Mar 2013 15:29:24 +0100 Subject: [PATCH 6/7] TcpOutgoingConnection: Respond with CommandFailed rather than ErrorClosed on failed connect Before, a Tcp.ErrorClosed event is generated when a connection attempt fails. For symmetry with the Tcp.Bind case and general usability of the API a Tcp.CommandFailed(connect) is the better choice. --- .../src/test/scala/akka/io/TcpConnectionSpec.scala | 10 +++++----- akka-actor/src/main/scala/akka/io/TcpConnection.scala | 2 +- .../src/main/scala/akka/io/TcpOutgoingConnection.scala | 5 ++++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index fceb271ab4..6b9883339d 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -5,7 +5,7 @@ package akka.io import java.io.IOException -import java.net.{ Socket, ConnectException, InetSocketAddress, SocketException } +import java.net.{ ConnectException, InetSocketAddress, SocketException } import java.nio.ByteBuffer import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel } import java.nio.channels.spi.SelectorProvider @@ -175,6 +175,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") buffer.flip() ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!") } + "write data after not acknowledged data" in withEstablishedConnection() { setup ⇒ import setup._ @@ -404,6 +405,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertThisConnectionActorTerminated() } + "report when peer closed the connection when trying to write" in withEstablishedConnection() { setup ⇒ import setup._ @@ -431,8 +433,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") EventFilter[SocketException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) - val err = userHandler.expectMsgType[ErrorClosed] - err.cause must be(ConnectionResetByPeerMessage) + userHandler.expectMsg(CommandFailed(Connect(serverAddress))) } verifyActorTermination(connectionActor) @@ -452,8 +453,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") key.isConnectable must be(true) EventFilter[ConnectException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) - val err = userHandler.expectMsgType[ErrorClosed] - err.cause.startsWith(ConnectionRefusedMessagePrefix) must be(true) + userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) } verifyActorTermination(connectionActor) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index b5bbc9ab03..c209421a87 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -318,5 +318,5 @@ private[io] object TcpConnection { */ case class CloseInformation( notificationsTo: Set[ActorRef], - closedEvent: ConnectionClosed) + closedEvent: Event) } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 098ab69b43..bbbe6cfc30 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -49,7 +49,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, log.debug("Connection established") completeConnect(commander, options) } catch { - case e: IOException ⇒ handleError(commander, e) + case e: IOException ⇒ + if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", e) + closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage) + throw e } } From c43ce95bd4fba8ea0eab8fa6d19b69cc7b17ea39 Mon Sep 17 00:00:00 2001 From: Mathias Date: Tue, 2 Apr 2013 16:39:21 +0200 Subject: [PATCH 7/7] Small simplification in TcpConnection --- akka-actor/src/main/scala/akka/io/TcpConnection.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index c209421a87..2353453505 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -51,7 +51,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, context.become(connected(handler)) case cmd: CloseCommand ⇒ - handleClose(commander, Some(sender), closeResponse(cmd)) + handleClose(commander, Some(sender), cmd.event) case ReceiveTimeout ⇒ // after sending `Register` user should watch this actor to make sure @@ -80,7 +80,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, case ChannelWritable ⇒ if (writePending) doWrite(handler) - case cmd: CloseCommand ⇒ handleClose(handler, Some(sender), closeResponse(cmd)) + case cmd: CloseCommand ⇒ handleClose(handler, Some(sender), cmd.event) } /** connection is closing but a write has to be finished first */ @@ -225,13 +225,6 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, context.stop(self) } - def closeResponse(closeCommand: CloseCommand): ConnectionClosed = - closeCommand match { - case Close ⇒ Closed - case Abort ⇒ Aborted - case ConfirmedClose ⇒ ConfirmedClosed - } - def handleError(handler: ActorRef, exception: IOException): Unit = { closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))