Merge pull request #1281 from spray/master

Smaller improvements to TCP side of new akka-io implementation
This commit is contained in:
Roland Kuhn 2013-04-04 02:47:35 -07:00
commit 22968a3ab8
6 changed files with 28 additions and 27 deletions

View file

@ -5,7 +5,7 @@
package akka.io package akka.io
import java.io.IOException import java.io.IOException
import java.net.{ Socket, ConnectException, InetSocketAddress, SocketException } import java.net.{ ConnectException, InetSocketAddress, SocketException }
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel } import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel }
import java.nio.channels.spi.SelectorProvider import java.nio.channels.spi.SelectorProvider
@ -176,6 +176,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
buffer.flip() buffer.flip()
ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!") ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!")
} }
"write data after not acknowledged data" in withEstablishedConnection() { setup "write data after not acknowledged data" in withEstablishedConnection() { setup
import setup._ import setup._
@ -405,6 +406,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
assertThisConnectionActorTerminated() assertThisConnectionActorTerminated()
} }
"report when peer closed the connection when trying to write" in withEstablishedConnection() { setup "report when peer closed the connection when trying to write" in withEstablishedConnection() { setup
import setup._ import setup._
@ -432,8 +434,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
EventFilter[SocketException](occurrences = 1) intercept { EventFilter[SocketException](occurrences = 1) intercept {
selector.send(connectionActor, ChannelConnectable) selector.send(connectionActor, ChannelConnectable)
val err = userHandler.expectMsgType[ErrorClosed] userHandler.expectMsg(CommandFailed(Connect(serverAddress)))
err.cause must be(ConnectionResetByPeerMessage)
} }
verifyActorTermination(connectionActor) verifyActorTermination(connectionActor)
@ -453,8 +454,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
key.isConnectable must be(true) key.isConnectable must be(true)
EventFilter[ConnectException](occurrences = 1) intercept { EventFilter[ConnectException](occurrences = 1) intercept {
selector.send(connectionActor, ChannelConnectable) selector.send(connectionActor, ChannelConnectable)
val err = userHandler.expectMsgType[ErrorClosed] userHandler.expectMsg(CommandFailed(Connect(UnboundAddress)))
err.cause.startsWith(ConnectionRefusedMessagePrefix) must be(true)
} }
verifyActorTermination(connectionActor) verifyActorTermination(connectionActor)

View file

@ -67,7 +67,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler
val sequenceNumber = Iterator.from(0) val sequenceNumber = Iterator.from(0)
val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher) val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher)
val selector = SelectorProvider.provider.openSelector val selector = SelectorProvider.provider.openSelector
val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant
def receive: Receive = { def receive: Receive = {
case WriteInterest execute(enableInterest(OP_WRITE, sender)) case WriteInterest execute(enableInterest(OP_WRITE, sender))
@ -113,7 +113,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: Unit): Unit = { def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: Unit): Unit = {
log.debug("Executing [{}]", cmd) if (TraceLogging) log.debug("Executing [{}]", cmd)
if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) { if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) {
body body
} else { } else {

View file

@ -76,10 +76,18 @@ object Tcp extends ExtensionKey[TcpExt] {
case class Register(handler: ActorRef) extends Command case class Register(handler: ActorRef) extends Command
case object Unbind extends Command case object Unbind extends Command
sealed trait CloseCommand extends Command sealed trait CloseCommand extends Command {
case object Close extends CloseCommand def event: ConnectionClosed
case object ConfirmedClose extends CloseCommand }
case object Abort extends CloseCommand case object Close extends CloseCommand {
override def event = Closed
}
case object ConfirmedClose extends CloseCommand {
override def event = ConfirmedClosed
}
case object Abort extends CloseCommand {
override def event = Aborted
}
case class NoAck(token: Any) case class NoAck(token: Any)
object NoAck extends NoAck(null) object NoAck extends NoAck(null)

View file

@ -51,7 +51,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
context.become(connected(handler)) context.become(connected(handler))
case cmd: CloseCommand case cmd: CloseCommand
handleClose(commander, Some(sender), closeResponse(cmd)) handleClose(commander, Some(sender), cmd.event)
case ReceiveTimeout case ReceiveTimeout
// after sending `Register` user should watch this actor to make sure // after sending `Register` user should watch this actor to make sure
@ -68,7 +68,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
case write: Write if writePending case write: Write if writePending
if (TraceLogging) log.debug("Dropping write because queue is full") if (TraceLogging) log.debug("Dropping write because queue is full")
sender ! CommandFailed(write) sender ! write.failureMessage
case write: Write if write.data.isEmpty case write: Write if write.data.isEmpty
if (write.wantsAck) if (write.wantsAck)
@ -80,7 +80,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
case ChannelWritable if (writePending) doWrite(handler) case ChannelWritable if (writePending) doWrite(handler)
case cmd: CloseCommand handleClose(handler, Some(sender), closeResponse(cmd)) case cmd: CloseCommand handleClose(handler, Some(sender), cmd.event)
} }
/** connection is closing but a write has to be finished first */ /** connection is closing but a write has to be finished first */
@ -225,13 +225,6 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
context.stop(self) context.stop(self)
} }
def closeResponse(closeCommand: CloseCommand): ConnectionClosed =
closeCommand match {
case Close Closed
case Abort Aborted
case ConfirmedClose ConfirmedClosed
}
def handleError(handler: ActorRef, exception: IOException): Unit = { def handleError(handler: ActorRef, exception: IOException): Unit = {
closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))) closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))
@ -318,5 +311,5 @@ private[io] object TcpConnection {
*/ */
case class CloseInformation( case class CloseInformation(
notificationsTo: Set[ActorRef], notificationsTo: Set[ActorRef],
closedEvent: ConnectionClosed) closedEvent: Event)
} }

View file

@ -4,14 +4,11 @@
package akka.io package akka.io
import java.net.InetSocketAddress
import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel } import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.actor.{ Props, ActorLogging, ActorRef, Actor }
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.io.Inet.SocketOption
import akka.io.Tcp._ import akka.io.Tcp._
import akka.io.IO.HasFailureMessage import akka.io.IO.HasFailureMessage
@ -50,7 +47,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef,
try socket.bind(endpoint, backlog) try socket.bind(endpoint, backlog)
catch { catch {
case NonFatal(e) case NonFatal(e)
bindCommander ! CommandFailed(bind) bindCommander ! bind.failureMessage
log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint)
context.stop(self) context.stop(self)
} }

View file

@ -49,7 +49,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
log.debug("Connection established") log.debug("Connection established")
completeConnect(commander, options) completeConnect(commander, options)
} catch { } catch {
case e: IOException handleError(commander, e) case e: IOException
if (tcp.Settings.TraceLogging) log.debug("Could not establish connection due to {}", e)
closedMessage = TcpConnection.CloseInformation(Set(commander), connect.failureMessage)
throw e
} }
} }