Merge pull request #1355 from akka/wip-3176-work-around-epoll-bug-√

#3176 - Adding an AkkaSelector that will work around the EPoll bug, as ...
Roland Kuhn 2013-05-03 01:35:33 -07:00
commit f0ef1684e7
5 changed files with 77 additions and 115 deletions

View file

@@ -23,16 +23,14 @@ object TestUtils {
temporaryServerAddresses(1, address, udp).head
def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1", udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress] = {
val sockets = for (_ ← 1 to numberOfAddresses) yield {
Vector.fill(numberOfAddresses) {
val serverSocket: GeneralSocket =
if (udp) DatagramChannel.open().socket()
else ServerSocketChannel.open().socket()
serverSocket.bind(new InetSocketAddress(hostname, 0))
(serverSocket, new InetSocketAddress(hostname, serverSocket.getLocalPort))
}
sockets collect { case (socket, address) ⇒ socket.close(); address }
} collect { case (socket, address) ⇒ socket.close(); address }
}
def verifyActorTermination(actor: ActorRef)(implicit system: ActorSystem): Unit = {

View file

@@ -452,15 +452,6 @@ akka {
# here. Must be an integer > 0 or "unlimited".
max-channels = 256000
# The select loop can be used in two modes:
# - setting "infinite" will select without a timeout, hogging a thread
# - setting a positive timeout will do a bounded select call,
# enabling sharing of a single thread between multiple selectors
# (in this case you will have to use a different configuration for the
# selector-dispatcher, e.g. using "type=Dispatcher" with size 1)
# - setting it to zero means polling, i.e. calling selectNow()
select-timeout = infinite
# When trying to assign a new connection to a selector and the chosen
# selector is at full capacity, retry selector choosing and assignment
# this many times before giving up
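The removed comment block above documents the three select-timeout modes. For context, a minimal sketch of the shared-thread variant it mentions, assuming the pre-change akka.io.tcp settings path and standard Akka dispatcher keys (this commit deletes the option, so the snippet only applies to earlier versions):

import com.typesafe.config.ConfigFactory

object SharedSelectorThreadConfig {
  // Sketch only: a bounded select-timeout lets several selectors take turns on one
  // thread, provided the selector-dispatcher is a regular Dispatcher with pool size 1.
  val config = ConfigFactory.parseString("""
    akka.io.tcp {
      select-timeout = 10ms
      selector-dispatcher {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          core-pool-size-min = 1
          core-pool-size-max = 1
        }
      }
    }
    """)
}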

View file

@@ -18,6 +18,7 @@ object IO {
def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager
// What is this? It's public API so I think it deserves a mention
trait HasFailureMessage {
def failureMessage: Any
}
@@ -27,18 +28,11 @@ object IO {
override def supervisorStrategy = connectionSupervisorStrategy
val selectorPool = context.actorOf(
props = Props(classOf[SelectionHandler], self, selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
name = "selectors")
private def createWorkerMessage(pf: PartialFunction[HasFailureMessage, Props]): PartialFunction[HasFailureMessage, WorkerForCommand] = {
case cmd ⇒
val props = pf(cmd)
val commander = sender
WorkerForCommand(cmd, commander, props)
}
def workerForCommandHandler(pf: PartialFunction[Any, Props]): Receive = {
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! createWorkerMessage(pf)(cmd)
def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, Props]): Receive = {
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd))
}
}
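The HasFailureMessage contract that workerForCommandHandler now requires is what lets the selector pool answer a commander generically when it cannot create a worker (see the Retry handling in SelectionHandler below). A hypothetical illustration, with invented names that are not part of this commit:

import akka.actor.ActorRef
import akka.io.IO

// Hypothetical manager command: it carries the reply to send back if no worker
// can be spawned for it, which is exactly what failureMessage is used for.
case class HypotheticalBind(handler: ActorRef, port: Int) extends IO.HasFailureMessage {
  def failureMessage: Any = HypotheticalBindFailed(port)
}
case class HypotheticalBindFailed(port: Int)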

View file

@@ -4,18 +4,21 @@
package akka.io
import java.lang.Runnable
import java.nio.channels.spi.SelectorProvider
import java.nio.channels.{ SelectableChannel, SelectionKey, CancelledKeyException, ClosedSelectorException }
import java.util.{ Set ⇒ JSet, Iterator ⇒ JIterator }
import java.util.concurrent.atomic.AtomicBoolean
import java.nio.channels.{ Selector, SelectableChannel, SelectionKey, CancelledKeyException, ClosedSelectorException, ClosedChannelException }
import java.nio.channels.SelectionKey._
import java.nio.channels.spi.{ AbstractSelector, SelectorProvider }
import scala.annotation.{ tailrec, switch }
import scala.util.control.NonFatal
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
import com.typesafe.config.Config
import akka.actor.Terminated
import akka.io.IO.HasFailureMessage
import akka.util.Helpers.Requiring
import akka.event.LoggingAdapter
import akka.util.SerializedSuspendableExecutionContext
abstract class SelectionHandlerSettings(config: Config) {
import config._
@@ -24,11 +27,6 @@ abstract class SelectionHandlerSettings(config: Config) {
case "unlimited" -1
case _ getInt("max-channels") requiring (_ > 0, "max-channels must be > 0 or 'unlimited'")
}
val SelectTimeout: Duration = getString("select-timeout") match {
case "infinite" Duration.Inf
case _ Duration(getMilliseconds("select-timeout"), MILLISECONDS) requiring (
_ >= Duration.Zero, "select-timeout must not be negative")
}
val SelectorAssociationRetries: Int = getInt("selector-association-retries") requiring (
_ >= 0, "selector-association-retries must be >= 0")
@@ -58,135 +56,118 @@ private[io] object SelectionHandler {
case object WriteInterest
}
private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandlerSettings) extends Actor with ActorLogging {
private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends Actor with ActorLogging {
import SelectionHandler._
import settings._
@volatile var childrenKeys = immutable.HashMap.empty[String, SelectionKey]
val sequenceNumber = Iterator.from(0)
val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher)
val selector = SelectorProvider.provider.openSelector
final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant
private val wakeUp = new AtomicBoolean(false)
@volatile var childrenKeys = immutable.HashMap.empty[String, SelectionKey]
var sequenceNumber = 0
val selectorManagementEC = {
val dispatcher = context.system.dispatchers.lookup(SelectorDispatcher)
SerializedSuspendableExecutionContext(dispatcher.throughput)(dispatcher)
}
val selector = SelectorProvider.provider.openSelector
def receive: Receive = {
case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender))
case ReadInterest ⇒ execute(enableInterest(OP_READ, sender))
case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender))
case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender))
case ReadInterest ⇒ execute(enableInterest(OP_READ, sender))
case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender))
case DisableReadInterest ⇒ execute(disableInterest(OP_READ, sender))
case DisableReadInterest ⇒ execute(disableInterest(OP_READ, sender))
case cmd: WorkerForCommand ⇒
withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) }
case cmd: WorkerForCommand ⇒ spawnChildWithCapacityProtection(cmd, SelectorAssociationRetries)
case RegisterChannel(channel, initialOps) ⇒
execute(registerChannel(channel, sender, initialOps))
case RegisterChannel(channel, initialOps) ⇒ execute(registerChannel(channel, sender, initialOps))
case Retry(WorkerForCommand(cmd, commander, _), 0) ⇒
commander ! cmd.failureMessage
case Retry(cmd, retriesLeft) ⇒ spawnChildWithCapacityProtection(cmd, retriesLeft)
case Retry(cmd, retriesLeft) ⇒
withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) }
case Terminated(child) ⇒
execute(unregister(child))
case Terminated(child) ⇒ execute(unregister(child))
}
override def postStop() {
try {
try {
val iterator = selector.keys.iterator
while (iterator.hasNext) {
val key = iterator.next()
try key.channel.close()
catch {
case NonFatal(e) ⇒ log.error(e, "Error closing channel")
}
}
} finally selector.close()
} catch {
case NonFatal(e) ⇒ log.error(e, "Error closing selector")
}
}
override def postStop(): Unit = execute(terminate())
// we can never recover from failures of a connection or listener child
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = {
def spawnChildWithCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int): Unit = {
if (TraceLogging) log.debug("Executing [{}]", cmd)
if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) {
body
val newName = sequenceNumber.toString
sequenceNumber += 1
context watch context.actorOf(props = cmd.childProps.withDispatcher(WorkerDispatcher), name = newName)
} else {
log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft)
context.parent forward Retry(cmd, retriesLeft - 1)
if (retriesLeft >= 1) {
log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft)
context.parent forward Retry(cmd, retriesLeft - 1)
} else {
log.warning("Rejecting [{}] with no retries left, aborting...", cmd)
cmd.commander ! cmd.apiCommand.failureMessage // I can't do it, Captain!
}
}
}
def spawnChild(props: Props): ActorRef =
context.watch {
context.actorOf(
props = props.withDispatcher(WorkerDispatcher),
name = sequenceNumber.next().toString)
}
//////////////// Management Tasks scheduled via the selectorManagementDispatcher /////////////
//////////////// Management Tasks scheduled via the selectorManagementEC /////////////
def execute(task: Task): Unit = {
selectorManagementDispatcher.execute(task)
selector.wakeup()
selectorManagementEC.execute(task)
if (wakeUp.compareAndSet(false, true)) selector.wakeup() // Avoiding syscall and trade off with LOCK CMPXCHG
}
def updateKeyMap(child: ActorRef, key: SelectionKey): Unit =
childrenKeys = childrenKeys.updated(child.path.name, key)
def registerChannel(channel: SelectableChannel, channelActor: ActorRef, initialOps: Int): Task =
new Task {
def tryRun() {
updateKeyMap(channelActor, channel.register(selector, initialOps, channelActor))
childrenKeys = childrenKeys.updated(channelActor.path.name, channel.register(selector, initialOps, channelActor))
channelActor ! ChannelRegistered
}
}
// TODO: evaluate whether we could run the following two tasks directly on the TcpSelector actor itself rather than
// on the selector-management-dispatcher. The trade-off would be using a ConcurrentHashMap
// rather than an unsynchronized one, but since switching interest ops is so frequent
// the change might be beneficial, provided the underlying implementation really is thread-safe
// and behaves consistently on all platforms.
def enableInterest(op: Int, connection: ActorRef) =
// Always set the interest keys on the selector thread according to benchmark
def enableInterest(ops: Int, connection: ActorRef) =
new Task {
def tryRun() {
val key = childrenKeys(connection.path.name)
key.interestOps(key.interestOps | op)
val currentOps = key.interestOps
val newOps = currentOps | ops
if (newOps != currentOps) key.interestOps(newOps)
}
}
def disableInterest(op: Int, connection: ActorRef) =
def disableInterest(ops: Int, connection: ActorRef) =
new Task {
def tryRun() {
val key = childrenKeys(connection.path.name)
key.interestOps(key.interestOps & ~op)
val currentOps = key.interestOps
val newOps = currentOps & ~ops
if (newOps != currentOps) key.interestOps(newOps)
}
}
def unregister(child: ActorRef) =
new Task {
def tryRun() {
childrenKeys = childrenKeys - child.path.name
new Task { def tryRun() { childrenKeys = childrenKeys - child.path.name } }
def terminate() = new Task {
def tryRun() {
// Thorough 'close' of the Selector
@tailrec def closeNextChannel(it: JIterator[SelectionKey]): Unit = if (it.hasNext) {
try it.next().channel.close() catch { case NonFatal(e) ⇒ log.error(e, "Error closing channel") }
closeNextChannel(it)
}
try closeNextChannel(selector.keys.iterator) finally selector.close()
}
}
val select = new Task {
val doSelect: () ⇒ Int =
SelectTimeout match {
case Duration.Zero ⇒ () ⇒ selector.selectNow()
case Duration.Inf ⇒ () ⇒ selector.select()
case x ⇒ { val millis = x.toMillis; () ⇒ selector.select(millis) }
}
def tryRun() {
if (doSelect() > 0) {
def tryRun(): Unit = {
wakeUp.set(false) // Reset early, worst-case we do a double-wakeup, but it's supposed to be idempotent so it's just an extra syscall
if (selector.select() > 0) { // This assumes select return value == selectedKeys.size
val keys = selector.selectedKeys
val iterator = keys.iterator()
while (iterator.hasNext) {
val key = iterator.next
val key = iterator.next()
if (key.isValid) {
try {
// Cache because the performance implications of calling this on different platforms are not clear
@@ -210,11 +191,13 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler
}
keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected
}
selectorManagementDispatcher.execute(this) // re-schedules select behind all currently queued tasks
// FIXME what is the appropriate error-handling here, shouldn't this task be resubmitted in case of exception?
selectorManagementEC.execute(this) // re-schedules select behind all currently queued tasks
}
}
selectorManagementDispatcher.execute(select) // start selection "loop"
selectorManagementEC.execute(select) // start selection "loop"
// FIXME: Add possibility to signal failure of task to someone
abstract class Task extends Runnable {
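The new select loop above rests on two ideas: all selector-management tasks and the blocking select() run on one serialized execution context, and a CAS-guarded flag elides redundant Selector.wakeup() syscalls. A stripped-down, hypothetical sketch of just that pattern, using a plain JDK single-thread executor instead of Akka's SerializedSuspendableExecutionContext (names invented, not the actual SelectionHandler code):

import java.nio.channels.spi.SelectorProvider
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: schedule work and the blocking select() on one
// single-threaded executor while paying for wakeup() at most once per round.
object WakeupElisionSketch {
  private val selector = SelectorProvider.provider.openSelector()
  private val wakeUp = new AtomicBoolean(false)
  private val executor = Executors.newSingleThreadExecutor()

  def execute(task: Runnable): Unit = {
    executor.execute(task)
    // only the first submitter after a select round issues the wakeup syscall
    if (wakeUp.compareAndSet(false, true)) selector.wakeup()
  }

  val selectTask: Runnable = new Runnable {
    def run(): Unit = {
      wakeUp.set(false) // reset before blocking; a spurious extra wakeup is harmless
      if (selector.select() > 0) {
        val keys = selector.selectedKeys()
        // ... hand each ready key to its owner here ...
        keys.clear() // selected keys must be removed from the set manually
      }
      executor.execute(this) // re-schedule behind all currently queued tasks
    }
  }

  def start(): Unit = executor.execute(selectTask)
}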

View file

@@ -48,12 +48,8 @@ import akka.io.IO.SelectorBasedManager
private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging {
def receive = workerForCommandHandler {
case c: Connect ⇒
val commander = sender
Props(classOf[TcpOutgoingConnection], tcp, commander, c)
case b: Bind ⇒
val commander = sender
Props(classOf[TcpListener], selectorPool, tcp, commander, b)
case c: Connect ⇒ Props(classOf[TcpOutgoingConnection], tcp, sender, c)
case b: Bind ⇒ Props(classOf[TcpListener], selectorPool, tcp, sender, b)
}
}