Failed commands in SelectionHandler are now notified in a nicer way.

This commit is contained in:
Endre Sándor Varga 2013-02-05 12:17:26 +01:00
parent 1ec065b0cd
commit 3505a7e76b
8 changed files with 58 additions and 46 deletions

View file

@ -20,7 +20,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
"register its ServerSocketChannel with its selector" in new TestSetup
"let the Bind commander know when binding is completed" in new TestSetup {
listener ! KickStartDone
listener ! WorkerForCommandDone
bindCommander.expectMsg(Bound)
}
@ -36,13 +36,13 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
parent.expectMsg(AcceptInterest)
// FIXME: ugly stuff here
selectorRouter.expectMsgType[KickStartCommand]
selectorRouter.expectMsgType[KickStartCommand]
selectorRouter.expectMsgType[WorkerForCommand]
selectorRouter.expectMsgType[WorkerForCommand]
selectorRouter.expectNoMsg(100.millis)
// and pick up the last remaining connection on the next ChannelAcceptable
listener ! ChannelAcceptable
selectorRouter.expectMsgType[KickStartCommand]
selectorRouter.expectMsgType[WorkerForCommand]
}
"react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup {
@ -61,7 +61,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
attemptConnectionToEndpoint()
listener ! ChannelAcceptable
val props = selectorRouter.expectMsgType[KickStartCommand].childProps
val props = selectorRouter.expectMsgType[WorkerForCommand].childProps
// FIXME: need to instantiate propss
//selectorRouter.expectMsgType[RegisterChannel].channel.isOpen must be(true)
@ -87,7 +87,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
parent.expectMsgType[RegisterChannel]
def bindListener() {
listener ! KickStartDone
listener ! WorkerForCommandDone
bindCommander.expectMsg(Bound)
}

View file

@ -6,7 +6,7 @@ package akka.io
import akka.actor._
import akka.routing.RandomRouter
import akka.io.SelectionHandler.KickStartCommand
import akka.io.SelectionHandler.WorkerForCommand
object IO {
@ -16,22 +16,25 @@ object IO {
def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager
trait HasFailureMessage {
def failureMessage: Any
}
abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
val selectorPool = context.actorOf(
props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)),
name = "selectors")
def createKickStart(pf: PartialFunction[Any, Props], cmd: Any): PartialFunction[Any, KickStartCommand] = {
pf.andThen { props
private def createKickStart(pf: PartialFunction[HasFailureMessage, Props]): PartialFunction[HasFailureMessage, WorkerForCommand] = {
case cmd
val props = pf(cmd)
val commander = sender
KickStartCommand(cmd, commander, props)
}
WorkerForCommand(cmd, commander, props)
}
def kickStartReceive(pf: PartialFunction[Any, Props]): Receive = {
//case KickStartFailed =
case cmd if pf.isDefinedAt(cmd) selectorPool ! createKickStart(pf, cmd)(cmd)
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) selectorPool ! createKickStart(pf)(cmd)
}
}

View file

@ -14,6 +14,7 @@ import scala.concurrent.duration._
import akka.actor._
import com.typesafe.config.Config
import akka.actor.Terminated
import akka.io.IO.HasFailureMessage
abstract class SelectionHandlerSettings(config: Config) {
import config._
@ -42,14 +43,12 @@ abstract class SelectionHandlerSettings(config: Config) {
private[io] object SelectionHandler {
//FIXME: temporary
case class KickStartCommand(apiCommand: Any, commander: ActorRef, childProps: Props)
case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: Props)
// FIXME: all actors should listen to this
case object KickStartDone
case class KickStartFailed(apiCommand: Any, commander: ActorRef)
case object WorkerForCommandDone
case class RegisterChannel(channel: SelectableChannel, initialOps: Int)
case class Retry(command: KickStartCommand, retriesLeft: Int) { require(retriesLeft >= 0) }
case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) }
case object ChannelConnectable
case object ChannelAcceptable
@ -75,21 +74,21 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler
case ReadInterest execute(enableInterest(OP_READ, sender))
case AcceptInterest execute(enableInterest(OP_ACCEPT, sender))
// FIXME: provide StopReading functionality
//case StopReading execute(disableInterest(OP_READ, sender))
case cmd: KickStartCommand
case cmd: WorkerForCommand
// FIXME: factor out to common
withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) ! KickStartDone }
withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) ! WorkerForCommandDone }
case RegisterChannel(channel, initialOps)
execute(registerChannel(channel, sender, initialOps))
case Retry(cmd, 0)
// FIXME: extractors
manager ! KickStartFailed(cmd.apiCommand, cmd.commander)
case Retry(WorkerForCommand(cmd, commander, _), 0)
commander ! cmd.failureMessage
case Retry(cmd, retriesLeft)
withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) ! KickStartDone }
withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) ! WorkerForCommandDone }
case Terminated(child)
execute(unregister(child))
@ -114,7 +113,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler
// we can never recover from failures of a connection or listener child
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def withCapacityProtection(cmd: KickStartCommand, retriesLeft: Int)(body: Unit): Unit = {
def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: Unit): Unit = {
log.debug("Executing {}", cmd)
if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) {
body

View file

@ -6,7 +6,7 @@ package akka.io
import java.net.InetSocketAddress
import java.net.Socket
import akka.io.Inet.SocketOption
import akka.io.Inet._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
@ -58,7 +58,9 @@ object Tcp extends ExtensionKey[TcpExt] {
}
/// COMMANDS
trait Command
trait Command extends IO.HasFailureMessage {
def failureMessage = CommandFailed(this)
}
case class Connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,

View file

@ -5,7 +5,7 @@
package akka.io
import java.net.InetSocketAddress
import java.nio.channels.{ SelectionKey, ServerSocketChannel }
import java.nio.channels.{ SocketChannel, SelectionKey, ServerSocketChannel }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
@ -13,6 +13,17 @@ import akka.actor.{ Props, ActorLogging, ActorRef, Actor }
import akka.io.SelectionHandler._
import akka.io.Inet.SocketOption
import akka.io.Tcp._
import akka.io.IO.HasFailureMessage
private[io] object TcpListener {
case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage {
def failureMessage = FailedRegisterIncoming(channel)
}
case class FailedRegisterIncoming(channel: SocketChannel)
}
private[io] class TcpListener(selectorRouter: ActorRef,
handler: ActorRef,
@ -23,6 +34,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
options: immutable.Traversable[SocketOption]) extends Actor with ActorLogging {
def selector: ActorRef = context.parent
import TcpListener._
import tcp.Settings._
context.watch(handler) // sign death pact
@ -38,7 +50,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
log.debug("Successfully bound to {}", endpoint)
def receive: Receive = {
case KickStartDone
case WorkerForCommandDone
bindCommander ! Bound
context.become(bound)
}
@ -47,12 +59,12 @@ private[io] class TcpListener(selectorRouter: ActorRef,
case ChannelAcceptable
acceptAllPending(BatchAcceptLimit)
// case CommandFailed(RegisterIncomingConnection(socketChannel, _, _))
// log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection")
// try socketChannel.close()
// catch {
// case NonFatal(e) log.error(e, "Error closing channel")
// }
case FailedRegisterIncoming(socketChannel)
log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection")
try socketChannel.close()
catch {
case NonFatal(e) log.error(e, "Error closing channel")
}
case Unbind
log.debug("Unbinding endpoint {}", endpoint)
@ -72,9 +84,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
if (socketChannel != null) {
log.debug("New connection accepted")
socketChannel.configureBlocking(false)
//selectorRouter ! RegisterIncomingConnection(socketChannel, handler, options)
// FIXME null is not nice. There is no explicit API command here
selectorRouter ! KickStartCommand(null, context.system.deadLetters, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options)))
selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options)))
acceptAllPending(limit - 1)
}
} else context.parent ! AcceptInterest

View file

@ -4,10 +4,8 @@
package akka.io
import akka.actor.{ ActorLogging, Actor, Props }
import akka.routing.RandomRouter
import Tcp._
import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand }
import akka.actor.{ ActorLogging, Props }
import akka.io.IO.SelectorBasedManager
/**

View file

@ -15,7 +15,9 @@ object UdpFF extends ExtensionKey[UdpFFExt] {
// Java API
override def get(system: ActorSystem): UdpFFExt = system.extension(this)
trait Command
trait Command extends IO.HasFailureMessage {
def failureMessage = CommandFailed(this)
}
case object NoAck
case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command {

View file

@ -3,11 +3,9 @@
*/
package akka.io
import akka.actor.{ ActorRef, Props, Actor }
import akka.io.UdpFF._
import akka.routing.RandomRouter
import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand }
import akka.actor.{ ActorRef, Props }
import akka.io.IO.SelectorBasedManager
import akka.io.UdpFF._
/**
* UdpFFManager is a facade for simple fire-and-forget style UDP operations