diff --git a/akka-actor-tests/src/test/scala/akka/TestUtils.scala b/akka-actor-tests/src/test/scala/akka/TestUtils.scala index 9cf777ddb5..ad173f0585 100644 --- a/akka-actor-tests/src/test/scala/akka/TestUtils.scala +++ b/akka-actor-tests/src/test/scala/akka/TestUtils.scala @@ -23,16 +23,14 @@ object TestUtils { temporaryServerAddresses(1, address, udp).head def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1", udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress] = { - val sockets = for (_ ← 1 to numberOfAddresses) yield { + Vector.fill(numberOfAddresses) { val serverSocket: GeneralSocket = if (udp) DatagramChannel.open().socket() else ServerSocketChannel.open().socket() serverSocket.bind(new InetSocketAddress(hostname, 0)) (serverSocket, new InetSocketAddress(hostname, serverSocket.getLocalPort)) - } - - sockets collect { case (socket, address) ⇒ socket.close(); address } + } collect { case (socket, address) ⇒ socket.close(); address } } def verifyActorTermination(actor: ActorRef)(implicit system: ActorSystem): Unit = { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 516dcd292e..cd66fc5ac1 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -452,15 +452,6 @@ akka { # here. Must be an integer > 0 or "unlimited". max-channels = 256000 - # The select loop can be used in two modes: - # - setting "infinite" will select without a timeout, hogging a thread - # - setting a positive timeout will do a bounded select call, - # enabling sharing of a single thread between multiple selectors - # (in this case you will have to use a different configuration for the - # selector-dispatcher, e.g. using "type=Dispatcher" with size 1) - # - setting it to zero means polling, i.e. calling selectNow() - select-timeout = infinite - # When trying to assign a new connection to a selector and the chosen # selector is at full capacity, retry selector choosing and assignment # this many times before giving up diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index 15fe42c710..22c8014f2c 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -18,6 +18,7 @@ object IO { def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager + // What is this? It's public API so I think it deserves a mention trait HasFailureMessage { def failureMessage: Any } @@ -27,18 +28,11 @@ object IO { override def supervisorStrategy = connectionSupervisorStrategy val selectorPool = context.actorOf( - props = Props(classOf[SelectionHandler], self, selectorSettings).withRouter(RandomRouter(nrOfSelectors)), + props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)), name = "selectors") - private def createWorkerMessage(pf: PartialFunction[HasFailureMessage, Props]): PartialFunction[HasFailureMessage, WorkerForCommand] = { - case cmd ⇒ - val props = pf(cmd) - val commander = sender - WorkerForCommand(cmd, commander, props) - } - - def workerForCommandHandler(pf: PartialFunction[Any, Props]): Receive = { - case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! createWorkerMessage(pf)(cmd) + def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, Props]): Receive = { + case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd)) } } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index fc718acd8a..950b5fa63f 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -4,18 +4,21 @@ package akka.io -import java.lang.Runnable -import java.nio.channels.spi.SelectorProvider -import java.nio.channels.{ SelectableChannel, SelectionKey, CancelledKeyException, ClosedSelectorException } +import java.util.{ Set ⇒ JSet, Iterator ⇒ JIterator } +import java.util.concurrent.atomic.AtomicBoolean +import java.nio.channels.{ Selector, SelectableChannel, SelectionKey, CancelledKeyException, ClosedSelectorException, ClosedChannelException } import java.nio.channels.SelectionKey._ +import java.nio.channels.spi.{ AbstractSelector, SelectorProvider } +import scala.annotation.{ tailrec, switch } import scala.util.control.NonFatal import scala.collection.immutable import scala.concurrent.duration._ import akka.actor._ import com.typesafe.config.Config -import akka.actor.Terminated import akka.io.IO.HasFailureMessage import akka.util.Helpers.Requiring +import akka.event.LoggingAdapter +import akka.util.SerializedSuspendableExecutionContext abstract class SelectionHandlerSettings(config: Config) { import config._ @@ -24,11 +27,6 @@ abstract class SelectionHandlerSettings(config: Config) { case "unlimited" ⇒ -1 case _ ⇒ getInt("max-channels") requiring (_ > 0, "max-channels must be > 0 or 'unlimited'") } - val SelectTimeout: Duration = getString("select-timeout") match { - case "infinite" ⇒ Duration.Inf - case _ ⇒ Duration(getMilliseconds("select-timeout"), MILLISECONDS) requiring ( - _ >= Duration.Zero, "select-timeout must not be negative") - } val SelectorAssociationRetries: Int = getInt("selector-association-retries") requiring ( _ >= 0, "selector-association-retries must be >= 0") @@ -58,135 +56,118 @@ private[io] object SelectionHandler { case object WriteInterest } -private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandlerSettings) extends Actor with ActorLogging { +private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends Actor with ActorLogging { import SelectionHandler._ import settings._ - @volatile var childrenKeys = immutable.HashMap.empty[String, SelectionKey] - val sequenceNumber = Iterator.from(0) - val selectorManagementDispatcher = context.system.dispatchers.lookup(SelectorDispatcher) - val selector = SelectorProvider.provider.openSelector final val OP_READ_AND_WRITE = OP_READ | OP_WRITE // compile-time constant + private val wakeUp = new AtomicBoolean(false) + @volatile var childrenKeys = immutable.HashMap.empty[String, SelectionKey] + var sequenceNumber = 0 + val selectorManagementEC = { + val dispatcher = context.system.dispatchers.lookup(SelectorDispatcher) + SerializedSuspendableExecutionContext(dispatcher.throughput)(dispatcher) + } + + val selector = SelectorProvider.provider.openSelector + def receive: Receive = { - case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender)) - case ReadInterest ⇒ execute(enableInterest(OP_READ, sender)) - case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender)) + case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender)) + case ReadInterest ⇒ execute(enableInterest(OP_READ, sender)) + case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender)) - case DisableReadInterest ⇒ execute(disableInterest(OP_READ, sender)) + case DisableReadInterest ⇒ execute(disableInterest(OP_READ, sender)) - case cmd: WorkerForCommand ⇒ - withCapacityProtection(cmd, SelectorAssociationRetries) { spawnChild(cmd.childProps) } + case cmd: WorkerForCommand ⇒ spawnChildWithCapacityProtection(cmd, SelectorAssociationRetries) - case RegisterChannel(channel, initialOps) ⇒ - execute(registerChannel(channel, sender, initialOps)) + case RegisterChannel(channel, initialOps) ⇒ execute(registerChannel(channel, sender, initialOps)) - case Retry(WorkerForCommand(cmd, commander, _), 0) ⇒ - commander ! cmd.failureMessage + case Retry(cmd, retriesLeft) ⇒ spawnChildWithCapacityProtection(cmd, retriesLeft) - case Retry(cmd, retriesLeft) ⇒ - withCapacityProtection(cmd, retriesLeft) { spawnChild(cmd.childProps) } - - case Terminated(child) ⇒ - execute(unregister(child)) + case Terminated(child) ⇒ execute(unregister(child)) } - override def postStop() { - try { - try { - val iterator = selector.keys.iterator - while (iterator.hasNext) { - val key = iterator.next() - try key.channel.close() - catch { - case NonFatal(e) ⇒ log.error(e, "Error closing channel") - } - } - } finally selector.close() - } catch { - case NonFatal(e) ⇒ log.error(e, "Error closing selector") - } - } + override def postStop(): Unit = execute(terminate()) // we can never recover from failures of a connection or listener child override def supervisorStrategy = SupervisorStrategy.stoppingStrategy - def withCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int)(body: ⇒ Unit): Unit = { + def spawnChildWithCapacityProtection(cmd: WorkerForCommand, retriesLeft: Int): Unit = { if (TraceLogging) log.debug("Executing [{}]", cmd) if (MaxChannelsPerSelector == -1 || childrenKeys.size < MaxChannelsPerSelector) { - body + val newName = sequenceNumber.toString + sequenceNumber += 1 + context watch context.actorOf(props = cmd.childProps.withDispatcher(WorkerDispatcher), name = newName) } else { - log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft) - context.parent forward Retry(cmd, retriesLeft - 1) + if (retriesLeft >= 1) { + log.warning("Rejecting [{}] with [{}] retries left, retrying...", cmd, retriesLeft) + context.parent forward Retry(cmd, retriesLeft - 1) + } else { + log.warning("Rejecting [{}] with no retries left, aborting...", cmd) + cmd.commander ! cmd.apiCommand.failureMessage // I can't do it, Captain! + } } } - def spawnChild(props: Props): ActorRef = - context.watch { - context.actorOf( - props = props.withDispatcher(WorkerDispatcher), - name = sequenceNumber.next().toString) - } - - //////////////// Management Tasks scheduled via the selectorManagementDispatcher ///////////// + //////////////// Management Tasks scheduled via the selectorManagementEC ///////////// def execute(task: Task): Unit = { - selectorManagementDispatcher.execute(task) - selector.wakeup() + selectorManagementEC.execute(task) + if (wakeUp.compareAndSet(false, true)) selector.wakeup() // Avoiding syscall and trade off with LOCK CMPXCHG } - def updateKeyMap(child: ActorRef, key: SelectionKey): Unit = - childrenKeys = childrenKeys.updated(child.path.name, key) - def registerChannel(channel: SelectableChannel, channelActor: ActorRef, initialOps: Int): Task = new Task { def tryRun() { - updateKeyMap(channelActor, channel.register(selector, initialOps, channelActor)) + childrenKeys = childrenKeys.updated(channelActor.path.name, channel.register(selector, initialOps, channelActor)) channelActor ! ChannelRegistered } } - // TODO: evaluate whether we could run the following two tasks directly on the TcpSelector actor itself rather than - // on the selector-management-dispatcher. The trade-off would be using a ConcurrentHashMap - // rather than an unsynchronized one, but since switching interest ops is so frequent - // the change might be beneficial, provided the underlying implementation really is thread-safe - // and behaves consistently on all platforms. - def enableInterest(op: Int, connection: ActorRef) = + // Always set the interest keys on the selector thread according to benchmark + def enableInterest(ops: Int, connection: ActorRef) = new Task { def tryRun() { val key = childrenKeys(connection.path.name) - key.interestOps(key.interestOps | op) + val currentOps = key.interestOps + val newOps = currentOps | ops + if (newOps != currentOps) key.interestOps(newOps) } } - def disableInterest(op: Int, connection: ActorRef) = + def disableInterest(ops: Int, connection: ActorRef) = new Task { def tryRun() { val key = childrenKeys(connection.path.name) - key.interestOps(key.interestOps & ~op) + val currentOps = key.interestOps + val newOps = currentOps & ~ops + if (newOps != currentOps) key.interestOps(newOps) } } def unregister(child: ActorRef) = - new Task { - def tryRun() { - childrenKeys = childrenKeys - child.path.name + new Task { def tryRun() { childrenKeys = childrenKeys - child.path.name } } + + def terminate() = new Task { + def tryRun() { + // Thorough 'close' of the Selector + @tailrec def closeNextChannel(it: JIterator[SelectionKey]): Unit = if (it.hasNext) { + try it.next().channel.close() catch { case NonFatal(e) ⇒ log.error(e, "Error closing channel") } + closeNextChannel(it) } + try closeNextChannel(selector.keys.iterator) finally selector.close() } + } val select = new Task { - val doSelect: () ⇒ Int = - SelectTimeout match { - case Duration.Zero ⇒ () ⇒ selector.selectNow() - case Duration.Inf ⇒ () ⇒ selector.select() - case x ⇒ { val millis = x.toMillis; () ⇒ selector.select(millis) } - } - def tryRun() { - if (doSelect() > 0) { + def tryRun(): Unit = { + wakeUp.set(false) // Reset early, worst-case we do a double-wakeup, but it's supposed to be idempotent so it's just an extra syscall + if (selector.select() > 0) { // This assumes select return value == selectedKeys.size val keys = selector.selectedKeys val iterator = keys.iterator() while (iterator.hasNext) { - val key = iterator.next + val key = iterator.next() if (key.isValid) { try { // Cache because the performance implications of calling this on different platforms are not clear @@ -210,11 +191,13 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler } keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected } - selectorManagementDispatcher.execute(this) // re-schedules select behind all currently queued tasks + + // FIXME what is the appropriate error-handling here, shouldn't this task be resubmitted in case of exception? + selectorManagementEC.execute(this) // re-schedules select behind all currently queued tasks } } - selectorManagementDispatcher.execute(select) // start selection "loop" + selectorManagementEC.execute(select) // start selection "loop" // FIXME: Add possibility to signal failure of task to someone abstract class Task extends Runnable { diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index 63a6f3213d..05dde136bb 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -48,12 +48,8 @@ import akka.io.IO.SelectorBasedManager private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { def receive = workerForCommandHandler { - case c: Connect ⇒ - val commander = sender - Props(classOf[TcpOutgoingConnection], tcp, commander, c) - case b: Bind ⇒ - val commander = sender - Props(classOf[TcpListener], selectorPool, tcp, commander, b) + case c: Connect ⇒ Props(classOf[TcpOutgoingConnection], tcp, sender, c) + case b: Bind ⇒ Props(classOf[TcpListener], selectorPool, tcp, sender, b) } }