2013-02-01 12:38:13 +01:00
|
|
|
/**
|
2013-02-15 11:59:01 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2013-02-01 12:38:13 +01:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.io
|
|
|
|
|
|
|
|
|
|
import java.lang.Runnable
|
|
|
|
|
import java.nio.channels.spi.SelectorProvider
|
|
|
|
|
import java.nio.channels.{ SelectableChannel, SelectionKey }
|
|
|
|
|
import java.nio.channels.SelectionKey._
|
|
|
|
|
import scala.util.control.NonFatal
|
|
|
|
|
import scala.collection.immutable
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import com.typesafe.config.Config
|
|
|
|
|
import akka.actor.Terminated
|
2013-02-05 12:17:26 +01:00
|
|
|
import akka.io.IO.HasFailureMessage
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
abstract class SelectionHandlerSettings(config: Config) {
|
|
|
|
|
import config._
|
|
|
|
|
|
|
|
|
|
val MaxChannels = getString("max-channels") match {
|
|
|
|
|
case "unlimited" ⇒ -1
|
|
|
|
|
case _ ⇒ getInt("max-channels")
|
|
|
|
|
}
|
|
|
|
|
val SelectTimeout = getString("select-timeout") match {
|
|
|
|
|
case "infinite" ⇒ Duration.Inf
|
|
|
|
|
case x ⇒ Duration(x)
|
|
|
|
|
}
|
|
|
|
|
val SelectorAssociationRetries = getInt("selector-association-retries")
|
|
|
|
|
|
|
|
|
|
val SelectorDispatcher = getString("selector-dispatcher")
|
|
|
|
|
val WorkerDispatcher = getString("worker-dispatcher")
|
|
|
|
|
val TraceLogging = getBoolean("trace-logging")
|
|
|
|
|
|
|
|
|
|
require(MaxChannels == -1 || MaxChannels > 0, "max-channels must be > 0 or 'unlimited'")
|
|
|
|
|
require(SelectTimeout >= Duration.Zero, "select-timeout must not be negative")
|
|
|
|
|
require(SelectorAssociationRetries >= 0, "selector-association-retries must be >= 0")
|
|
|
|
|
|
|
|
|
|
def MaxChannelsPerSelector: Int
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private[io] object SelectionHandler {
|
2013-02-04 11:21:04 +01:00
|
|
|
|
2013-02-05 12:17:26 +01:00
|
|
|
case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: Props)
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
case class RegisterChannel(channel: SelectableChannel, initialOps: Int)
|
2013-02-05 13:26:27 +01:00
|
|
|
case object ChannelRegistered
|
2013-02-05 12:17:26 +01:00
|
|
|
case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) }
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
case object ChannelConnectable
|
|
|
|
|
case object ChannelAcceptable
|
|
|
|
|
case object ChannelReadable
|
|
|
|
|
case object ChannelWritable
|
|
|
|
|
case object AcceptInterest
|
|
|
|
|
case object ReadInterest
|
2013-02-05 13:38:27 +01:00
|
|
|
case object DisableReadInterest
|
2013-02-01 12:38:13 +01:00
|
|
|
case object WriteInterest
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandlerSettings) extends Actor with ActorLogging {
|
|
|
|
|
import SelectionHandler._
|
|
|
|
|
import settings._
|
|
|
|
|
|
|
|
|
|
@volatile var childrenKeys = immutable.HashMap.empty[String, SelectionKey]
|
|
|
|
|
val sequenceNumber = Iterator.from(0)
|
|
|
|
|
val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher)
|
|
|
|
|
val selector = SelectorProvider.provider.openSelector
|
2013-03-27 15:16:28 +01:00
|
|
|
final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
def receive: Receive = {
|
2013-02-05 15:48:29 +01:00
|
|
|
case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender))
|
|
|
|
|
case ReadInterest ⇒ execute(enableInterest(OP_READ, sender))
|
|
|
|
|
case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender))
|
2013-02-01 12:38:13 +01:00
|
|
|
|
2013-02-05 15:48:29 +01:00
|
|
|
case DisableReadInterest ⇒ execute(disableInterest(OP_READ, sender))
|
2013-02-01 12:38:13 +01:00
|
|
|
|
2013-02-05 12:17:26 +01:00
|
|
|
case cmd: WorkerForCommand ⇒
|
2013-02-05 13:26:27 +01:00
|
|
|
withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) }
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
case RegisterChannel(channel, initialOps) ⇒
|
|
|
|
|
execute(registerChannel(channel, sender, initialOps))
|
|
|
|
|
|
2013-02-05 12:17:26 +01:00
|
|
|
case Retry(WorkerForCommand(cmd, commander, _), 0) ⇒
|
|
|
|
|
commander ! cmd.failureMessage
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
case Retry(cmd, retriesLeft) ⇒
|
2013-02-05 13:26:27 +01:00
|
|
|
withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) }
|
2013-02-01 12:38:13 +01:00
|
|
|
|
|
|
|
|
case Terminated(child) ⇒
|
|
|
|
|
execute(unregister(child))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def postStop() {
|
|
|
|
|
try {
|
2013-02-10 13:52:52 +01:00
|
|
|
try {
|
|
|
|
|
val iterator = selector.keys.iterator
|
|
|
|
|
while (iterator.hasNext) {
|
|
|
|
|
val key = iterator.next()
|
|
|
|
|
try key.channel.close()
|
|
|
|
|
catch {
|
|
|
|
|
case NonFatal(e) ⇒ log.error(e, "Error closing channel")
|
|
|
|
|
}
|
2013-02-04 11:21:04 +01:00
|
|
|
}
|
2013-02-10 13:52:52 +01:00
|
|
|
} finally selector.close()
|
2013-02-01 12:38:13 +01:00
|
|
|
} catch {
|
2013-02-04 11:21:04 +01:00
|
|
|
case NonFatal(e) ⇒ log.error(e, "Error closing selector")
|
2013-02-01 12:38:13 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// we can never recover from failures of a connection or listener child
|
|
|
|
|
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
|
|
|
|
|
|
2013-02-05 12:17:26 +01:00
|
|
|
def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = {
|
2013-02-10 13:52:52 +01:00
|
|
|
log.debug("Executing [{}]", cmd)
|
2013-02-01 12:38:13 +01:00
|
|
|
if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) {
|
|
|
|
|
body
|
|
|
|
|
} else {
|
2013-02-10 13:52:52 +01:00
|
|
|
log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft)
|
2013-02-01 12:38:13 +01:00
|
|
|
context.parent forward Retry(cmd, retriesLeft - 1)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2013-02-01 13:11:17 +01:00
|
|
|
def spawnChild(props: Props): ActorRef =
|
2013-02-01 12:38:13 +01:00
|
|
|
context.watch {
|
|
|
|
|
context.actorOf(
|
|
|
|
|
props = props.withDispatcher(WorkerDispatcher),
|
|
|
|
|
name = sequenceNumber.next().toString)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//////////////// Management Tasks scheduled via the selectorManagementDispatcher /////////////
|
|
|
|
|
|
|
|
|
|
def execute(task: Task): Unit = {
|
|
|
|
|
selectorManagementDispatcher.execute(task)
|
|
|
|
|
selector.wakeup()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def updateKeyMap(child: ActorRef, key: SelectionKey): Unit =
|
|
|
|
|
childrenKeys = childrenKeys.updated(child.path.name, key)
|
|
|
|
|
|
|
|
|
|
def registerChannel(channel: SelectableChannel, channelActor: ActorRef, initialOps: Int): Task =
|
|
|
|
|
new Task {
|
|
|
|
|
def tryRun() {
|
|
|
|
|
updateKeyMap(channelActor, channel.register(selector, initialOps, channelActor))
|
2013-02-05 13:26:27 +01:00
|
|
|
channelActor ! ChannelRegistered
|
2013-02-01 12:38:13 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: evaluate whether we could run the following two tasks directly on the TcpSelector actor itself rather than
|
|
|
|
|
// on the selector-management-dispatcher. The trade-off would be using a ConcurrentHashMap
|
|
|
|
|
// rather than an unsynchronized one, but since switching interest ops is so frequent
|
|
|
|
|
// the change might be beneficial, provided the underlying implementation really is thread-safe
|
|
|
|
|
// and behaves consistently on all platforms.
|
|
|
|
|
def enableInterest(op: Int, connection: ActorRef) =
|
|
|
|
|
new Task {
|
|
|
|
|
def tryRun() {
|
|
|
|
|
val key = childrenKeys(connection.path.name)
|
|
|
|
|
key.interestOps(key.interestOps | op)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def disableInterest(op: Int, connection: ActorRef) =
|
|
|
|
|
new Task {
|
|
|
|
|
def tryRun() {
|
|
|
|
|
val key = childrenKeys(connection.path.name)
|
|
|
|
|
key.interestOps(key.interestOps & ~op)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def unregister(child: ActorRef) =
|
|
|
|
|
new Task {
|
|
|
|
|
def tryRun() {
|
|
|
|
|
childrenKeys = childrenKeys - child.path.name
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val select = new Task {
|
|
|
|
|
val doSelect: () ⇒ Int =
|
|
|
|
|
SelectTimeout match {
|
|
|
|
|
case Duration.Zero ⇒ () ⇒ selector.selectNow()
|
|
|
|
|
case Duration.Inf ⇒ () ⇒ selector.select()
|
|
|
|
|
case x ⇒ val millis = x.toMillis; () ⇒ selector.select(millis)
|
|
|
|
|
}
|
|
|
|
|
def tryRun() {
|
|
|
|
|
if (doSelect() > 0) {
|
|
|
|
|
val keys = selector.selectedKeys
|
|
|
|
|
val iterator = keys.iterator()
|
|
|
|
|
while (iterator.hasNext) {
|
|
|
|
|
val key = iterator.next
|
|
|
|
|
if (key.isValid) {
|
2013-02-07 17:54:42 +01:00
|
|
|
// Cache because the performance implications of calling this on different platforms are not clear
|
|
|
|
|
val readyOps = key.readyOps()
|
|
|
|
|
key.interestOps(key.interestOps & ~readyOps) // prevent immediate reselection by always clearing
|
2013-02-01 12:38:13 +01:00
|
|
|
val connection = key.attachment.asInstanceOf[ActorRef]
|
2013-02-07 17:54:42 +01:00
|
|
|
readyOps match {
|
2013-02-01 12:38:13 +01:00
|
|
|
case OP_READ ⇒ connection ! ChannelReadable
|
|
|
|
|
case OP_WRITE ⇒ connection ! ChannelWritable
|
|
|
|
|
case OP_READ_AND_WRITE ⇒ connection ! ChannelWritable; connection ! ChannelReadable
|
|
|
|
|
case x if (x & OP_ACCEPT) > 0 ⇒ connection ! ChannelAcceptable
|
|
|
|
|
case x if (x & OP_CONNECT) > 0 ⇒ connection ! ChannelConnectable
|
2013-02-10 13:52:52 +01:00
|
|
|
case x ⇒ log.warning("Invalid readyOps: [{}]", x)
|
2013-02-01 12:38:13 +01:00
|
|
|
}
|
2013-02-10 13:52:52 +01:00
|
|
|
} else log.warning("Invalid selection key: [{}]", key)
|
2013-02-01 12:38:13 +01:00
|
|
|
}
|
|
|
|
|
keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected
|
|
|
|
|
}
|
|
|
|
|
selectorManagementDispatcher.execute(this) // re-schedules select behind all currently queued tasks
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
selectorManagementDispatcher.execute(select) // start selection "loop"
|
|
|
|
|
|
2013-02-06 11:38:42 +01:00
|
|
|
// FIXME: Add possibility to signal failure of task to someone
|
2013-02-01 12:38:13 +01:00
|
|
|
abstract class Task extends Runnable {
|
|
|
|
|
def tryRun()
|
|
|
|
|
def run() {
|
|
|
|
|
try tryRun()
|
|
|
|
|
catch {
|
|
|
|
|
case _: java.nio.channels.ClosedSelectorException ⇒ // ok, expected during shutdown
|
2013-02-10 13:52:52 +01:00
|
|
|
case NonFatal(e) ⇒ log.error(e, "Error during selector management task: [{}]", e)
|
2013-02-01 12:38:13 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|