diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
index 36093f86a0..6820b815fe 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
@@ -99,7 +99,6 @@ object ActorModelSpec {
 
   trait MessageDispatcherInterceptor extends MessageDispatcher {
     val stats = new ConcurrentHashMap[ActorRef, InterceptorStats]
-    val starts = new AtomicLong(0)
     val stops = new AtomicLong(0)
 
     def getStats(actorRef: ActorRef) = {
@@ -135,11 +134,6 @@ object ActorModelSpec {
       super.dispatch(receiver, invocation)
     }
 
-    protected[akka] abstract override def start() {
-      starts.incrementAndGet()
-      super.start()
-    }
-
     protected[akka] abstract override def shutdown() {
       stops.incrementAndGet()
       super.shutdown()
@@ -147,16 +141,14 @@ object ActorModelSpec {
   }
 
   def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
-    starts: Long = dispatcher.starts.get(),
     stops: Long = dispatcher.stops.get())(implicit app: ActorSystem) {
     val deadline = System.currentTimeMillis + dispatcher.timeoutMs * 5
     try {
-      await(deadline)(starts == dispatcher.starts.get)
       await(deadline)(stops == dispatcher.stops.get)
     } catch {
       case e ⇒
-        app.eventStream.publish(Error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get +
-          " required: starts=" + starts + ",stops=" + stops))
+        app.eventStream.publish(Error(e, dispatcher, "actual: stops=" + dispatcher.stops.get +
+          " required: stops=" + stops))
         throw e
     }
   }
@@ -247,11 +239,11 @@ abstract class ActorModelSpec extends AkkaSpec {
 
     "must dynamically handle its own life cycle" in {
       implicit val dispatcher = newInterceptedDispatcher
-      assertDispatcher(dispatcher)(starts = 0, stops = 0)
+      assertDispatcher(dispatcher)(stops = 0)
       val a = newTestActor(dispatcher)
-      assertDispatcher(dispatcher)(starts = 1, stops = 0)
+      assertDispatcher(dispatcher)(stops = 0)
       a.stop()
-      assertDispatcher(dispatcher)(starts = 1, stops = 1)
+      assertDispatcher(dispatcher)(stops = 1)
       assertRef(a, dispatcher)(
         suspensions = 0,
         resumes = 0,
@@ -264,15 +256,15 @@ abstract class ActorModelSpec extends AkkaSpec {
      val futures = for (i ← 1 to 10) yield Future { i }
-      assertDispatcher(dispatcher)(starts = 2, stops = 2)
+      assertDispatcher(dispatcher)(stops = 2)
 
       val a2 = newTestActor(dispatcher)
       val futures2 = for (i ← 1 to 10) yield Future { i }
 
-      assertDispatcher(dispatcher)(starts = 3, stops = 2)
+      assertDispatcher(dispatcher)(stops = 2)
 
       a2.stop
-      assertDispatcher(dispatcher)(starts = 3, stops = 3)
+      assertDispatcher(dispatcher)(stops = 3)
     }
 
     "process messages one at a time" in {
@@ -363,7 +355,7 @@ abstract class ActorModelSpec extends AkkaSpec {
       }
       for (run ← 1 to 3) {
         flood(50000)
-        assertDispatcher(dispatcher)(starts = run, stops = run)
+        assertDispatcher(dispatcher)(stops = run)
       }
     }
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
index 256bb4f9ca..0261b8fc7e 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -436,12 +436,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
         f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") }
         f3.await
         assert(f3.get === "SUCCESS")
-
-        // give time for all callbacks to execute
-        Thread sleep 100
-
-        // make sure all futures are completed in dispatcher
-        assert(implicitly[MessageDispatcher].tasks === 0)
       }
     }
@@ -559,10 +553,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
       assert(a.get === 5)
       assert(b.get === 3)
       assert(result2.get === 50)
-      Thread.sleep(100)
-
-      // make sure all futures are completed in dispatcher
-      assert(implicitly[MessageDispatcher].tasks === 0)
     }
 
     "shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry" in {
@@ -771,18 +761,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
     }
 
-    "ticket812FutureDispatchCleanup" in {
-      filterException[FutureTimeoutException] {
-        implicit val dispatcher = app.dispatcherFactory.newDispatcher("ticket812FutureDispatchCleanup").build
-        assert(dispatcher.tasks === 0)
-        val future = Future({ Thread.sleep(100); "Done" }, 10)
-        intercept[FutureTimeoutException] { future.await }
-        assert(dispatcher.tasks === 1)
-        Thread.sleep(200)
-        assert(dispatcher.tasks === 0)
-      }
-    }
-
     "run callbacks async" in {
       val latch = Vector.fill(10)(new StandardLatch)
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index e250ef066b..72fb681495 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -5,13 +5,14 @@
 package akka.dispatch
 
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicLong
 import akka.event.Logging.Error
 import akka.config.Configuration
 import akka.util.{ Duration, Switch, ReentrantGuard }
+import atomic.{ AtomicInteger, AtomicLong }
 import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
 import akka.actor._
 import akka.actor.ActorSystem
+import locks.ReentrantLock
 import scala.annotation.tailrec
 
 /**
@@ -87,12 +88,9 @@ object MessageDispatcher {
 abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
 
   import MessageDispatcher._
 
-  protected val _tasks = new AtomicLong(0L)
-  protected val _actors = new AtomicLong(0L)
-  protected val guard = new ReentrantGuard
-  protected val active = new Switch(false)
+  protected val _inhabitants = new AtomicLong(0L)
 
-  private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
+  private val shutdownSchedule = new AtomicInteger(UNSCHEDULED)
 
   /**
    * Creates and returns a mailbox for the given actor.
@@ -113,91 +111,65 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
    * Attaches the specified actor instance to this dispatcher
    */
   final def attach(actor: ActorCell) {
-    guard.lock.lock()
-    try {
-      startIfUnstarted()
-      register(actor)
-    } finally {
-      guard.lock.unlock()
-    }
+    register(actor)
   }
 
   /**
    * Detaches the specified actor instance from this dispatcher
    */
   final def detach(actor: ActorCell) {
-    guard.lock.lock()
-    try {
-      unregister(actor)
-      if (_tasks.get == 0 && _actors.get == 0) {
-        shutdownSchedule match {
-          case UNSCHEDULED ⇒
-            shutdownSchedule = SCHEDULED
-            app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
-          case SCHEDULED ⇒
-            shutdownSchedule = RESCHEDULED
-          case RESCHEDULED ⇒ //Already marked for reschedule
-        }
-      }
-    } finally { guard.lock.unlock() }
-  }
-
-  protected final def startIfUnstarted() {
-    if (active.isOff) {
-      guard.lock.lock()
-      try { active.switchOn { start() } }
-      finally { guard.lock.unlock() }
-    }
+    unregister(actor)
+    ifSensibleToDoSoThenScheduleShutdown()
   }
 
   protected[akka] final def dispatchTask(block: () ⇒ Unit) {
-    _tasks.getAndIncrement()
+    val invocation = TaskInvocation(app, block, taskCleanup)
+    _inhabitants.getAndIncrement()
     try {
-      startIfUnstarted()
-      executeTask(TaskInvocation(app, block, taskCleanup))
+      executeTask(invocation)
     } catch {
       case e ⇒
-        _tasks.decrementAndGet
+        _inhabitants.decrementAndGet
         throw e
     }
   }
 
-  private val taskCleanup: () ⇒ Unit =
-    () ⇒ if (_tasks.decrementAndGet() == 0) {
-      guard.lock.lock()
-      try {
-        if (_tasks.get == 0 && _actors.get == 0) {
-          shutdownSchedule match {
-            case UNSCHEDULED ⇒
-              shutdownSchedule = SCHEDULED
-              app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
-            case SCHEDULED ⇒
-              shutdownSchedule = RESCHEDULED
-            case RESCHEDULED ⇒ //Already marked for reschedule
-          }
-        }
-      } finally { guard.lock.unlock() }
-    }
+  @tailrec
+  private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = _inhabitants.get() match {
+    case 0 ⇒
+      shutdownSchedule.get match {
+        case UNSCHEDULED ⇒
+          if (shutdownSchedule.compareAndSet(UNSCHEDULED, SCHEDULED)) {
+            app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
+            ()
+          } else ifSensibleToDoSoThenScheduleShutdown()
+        case SCHEDULED ⇒
+          if (shutdownSchedule.compareAndSet(SCHEDULED, RESCHEDULED)) ()
+          else ifSensibleToDoSoThenScheduleShutdown()
+        case RESCHEDULED ⇒ ()
+      }
+    case _ ⇒ ()
+  }
+
+  private val taskCleanup: () ⇒ Unit = () ⇒ if (_inhabitants.decrementAndGet() == 0) ifSensibleToDoSoThenScheduleShutdown()
 
   /**
-   * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
-   * and only call it under the dispatcher-guard, see "attach" for the only invocation
+   * Don't call this, this calls you. See "attach" for the only invocation
    */
   protected[akka] def register(actor: ActorCell) {
-    _actors.incrementAndGet()
+    _inhabitants.incrementAndGet()
     // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
     systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
   }
 
   /**
-   * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
-   * and only call it under the dispatcher-guard, see "detach" for the only invocation
+   * Don't call this, this calls you. See "detach" for the only invocation
    */
   protected[akka] def unregister(actor: ActorCell) {
-    _actors.decrementAndGet()
+    _inhabitants.decrementAndGet()
     val mailBox = actor.mailbox
+    mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
     actor.mailbox = deadLetterMailbox
-    mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
     cleanUpMailboxFor(actor, mailBox)
     mailBox.cleanUp()
   }
@@ -229,23 +201,22 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
   }
 
   private val shutdownAction = new Runnable {
-    def run() {
-      guard.lock.lock()
-      try {
-        shutdownSchedule match {
-          case RESCHEDULED ⇒
-            shutdownSchedule = SCHEDULED
+    @tailrec
+    final def run() {
+      shutdownSchedule.get match {
+        case UNSCHEDULED ⇒ ()
+        case SCHEDULED ⇒
+          try {
+            if (_inhabitants.get == 0) //Warning, racy
+              shutdown()
+          } finally {
+            shutdownSchedule.getAndSet(UNSCHEDULED) //TODO perhaps check if it was modified since we checked?
+          }
+        case RESCHEDULED ⇒
+          if (shutdownSchedule.compareAndSet(RESCHEDULED, SCHEDULED))
            app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
-          case SCHEDULED ⇒
-            if (_tasks.get == 0 && _actors.get() == 0) {
-              active switchOff {
-                shutdown() // shut down in the dispatcher's references is zero
-              }
-            }
-            shutdownSchedule = UNSCHEDULED
-          case UNSCHEDULED ⇒ //Do nothing
-        }
-      } finally { guard.lock.unlock() }
+          else run()
       }
     }
   }
@@ -301,13 +272,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
 
   protected[akka] def executeTask(invocation: TaskInvocation)
 
-  /**
-   * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
-   */
-  protected[akka] def start(): Unit
-
   /**
    * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
+   * Must be idempotent
    */
   protected[akka] def shutdown(): Unit
 
@@ -320,11 +287,6 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
   /**
    * Returns the "current" emptiness status of the mailbox for the specified actor
    */
   def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages
-
-  /**
-   * Returns the amount of tasks queued for execution
-   */
-  def tasks: Long = _tasks.get
 }
 
 /**
diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
index c311067270..32a8268c29 100644
--- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
@@ -77,11 +77,6 @@ class BalancingDispatcher(
     buddies.remove(actor)
   }
 
-  protected[akka] override def shutdown() {
-    super.shutdown()
-    buddies.clear()
-  }
-
   protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
     if (mailBox.hasSystemMessages) {
       var messages = mailBox.systemDrain()
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index 2d19f7a518..fda365badb 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -93,15 +93,18 @@ class Dispatcher(
       executorService.get() execute invocation
     } catch {
       case e: RejectedExecutionException ⇒
-        app.eventStream.publish(Warning(this, e.toString))
-        throw e
+        try {
+          executorService.get() execute invocation
+        } catch {
+          case e2: RejectedExecutionException ⇒
+            app.eventStream.publish(Warning(this, e2.toString))
+            throw e2
+        }
     }
   }
 
   protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor)
 
-  protected[akka] def start {}
-
   protected[akka] def shutdown {
     executorService.getAndSet(new ExecutorServiceDelegate {
       lazy val executor = executorServiceFactory.createExecutorService
diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala
index b5009d1c3f..a31f4434d1 100644
--- a/akka-actor/src/main/scala/akka/util/LockUtil.scala
+++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala
@@ -4,15 +4,16 @@
 
 package akka.util
 
-import java.util.concurrent.locks.{ ReentrantReadWriteLock, ReentrantLock }
+import java.util.concurrent.locks.{ ReentrantLock }
 import java.util.concurrent.atomic.{ AtomicBoolean }
 
 /**
  * @author Jonas Bonér
 */
 final class ReentrantGuard {
-  val lock = new ReentrantLock
+  final val lock = new ReentrantLock
 
+  @inline
   final def withGuard[T](body: ⇒ T): T = {
     lock.lock
     try {
@@ -23,78 +24,6 @@ final class ReentrantGuard {
   }
 }
 
-/**
- * @author Jonas Bonér
- */
-class ReadWriteGuard {
-  private val rwl = new ReentrantReadWriteLock
-  val readLock = rwl.readLock
-  val writeLock = rwl.writeLock
-
-  def withWriteGuard[T](body: ⇒ T): T = {
-    writeLock.lock
-    try {
-      body
-    } finally {
-      writeLock.unlock
-    }
-  }
-
-  def withReadGuard[T](body: ⇒ T): T = {
-    readLock.lock
-    try {
-      body
-    } finally {
-      readLock.unlock
-    }
-  }
-}
-
-/**
- * A very simple lock that uses CAS (Compare-And-Swap)
- * Does not keep track of the owner and isn't Reentrant, so don't nest and try to stick to the if*-methods
- */
-class SimpleLock(startLocked: Boolean = false) extends AtomicBoolean(startLocked) {
-  def ifPossible(perform: () ⇒ Unit): Boolean = {
-    if (tryLock()) {
-      try {
-        perform
-      } finally {
-        unlock()
-      }
-      true
-    } else false
-  }
-
-  def ifPossibleYield[T](perform: () ⇒ T): Option[T] = {
-    if (tryLock()) {
-      try {
-        Some(perform())
-      } finally {
-        unlock()
-      }
-    } else None
-  }
-
-  def ifPossibleApply[T, R](value: T)(function: (T) ⇒ R): Option[R] = {
-    if (tryLock()) {
-      try {
-        Some(function(value))
-      } finally {
-        unlock()
-      }
-    } else None
-  }
-
-  def tryLock() = compareAndSet(false, true)
-
-  def tryUnlock() = compareAndSet(true, false)
-
-  def locked = get
-
-  def unlock(): Unit = set(false)
-}
-
 /**
  * An atomic switch that can be either on or off
  */
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index d8f5e49716..7643a0bd31 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -283,7 +283,7 @@ class TransactionLog private (
    * If already closed, the call is ignored.
    */
   def close() {
-    if (isOpen.switchOff) {
+    isOpen switchOff {
       EventHandler.debug(this, "Closing transaction log [%s]".format(logId))
       try {
         if (isAsync) {
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index d02283bb37..7545a45f1c 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic._
 import akka.AkkaException
 import akka.actor.ActorSystem
 import akka.event.Logging
+import locks.ReentrantReadWriteLock
 import org.jboss.netty.channel._
 
 class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
@@ -354,7 +355,7 @@ class NettyRemoteSupport(_app: ActorSystem) extends RemoteSupport(_app) with Rem
   val clientSettings = new RemoteClientSettings(app.config, app.AkkaConfig.DefaultTimeUnit)
 
   private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
-  private val clientsLock = new ReadWriteGuard
+  private val clientsLock = new ReentrantReadWriteLock
 
   protected[akka] def send(message: Any,
                            senderOption: Option[ActorRef],
@@ -392,32 +393,50 @@ class NettyRemoteSupport(_app: ActorSystem) extends RemoteSupport(_app) with Rem
     }
   }
 
-  def bindClient(remoteAddress: RemoteAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = clientsLock withWriteGuard {
-    if (putIfAbsent && remoteClients.contains(remoteAddress)) false
-    else {
-      client.connect()
-      remoteClients.put(remoteAddress, client).foreach(_.shutdown())
-      true
+  def bindClient(remoteAddress: RemoteAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = {
+    clientsLock.writeLock().lock()
+    try {
+      if (putIfAbsent && remoteClients.contains(remoteAddress)) false
+      else {
+        client.connect()
+        remoteClients.put(remoteAddress, client).foreach(_.shutdown())
+        true
+      }
+    } finally {
+      clientsLock.writeLock().unlock()
     }
   }
 
-  def unbindClient(remoteAddress: RemoteAddress): Unit = clientsLock withWriteGuard {
-    remoteClients.foreach {
-      case (k, v) ⇒ if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) }
+  def unbindClient(remoteAddress: RemoteAddress): Unit = {
+    clientsLock.writeLock().lock()
+    try {
+      remoteClients.foreach { case (k, v) ⇒ if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) } }
+    } finally {
+      clientsLock.writeLock().unlock()
     }
   }
 
-  def shutdownClientConnection(remoteAddress: RemoteAddress): Boolean = clientsLock withWriteGuard {
-    remoteClients.remove(remoteAddress) match {
-      case Some(client) ⇒ client.shutdown()
-      case None ⇒ false
+  def shutdownClientConnection(remoteAddress: RemoteAddress): Boolean = {
+    clientsLock.writeLock().lock()
+    try {
+      remoteClients.remove(remoteAddress) match {
+        case Some(client) ⇒ client.shutdown()
+        case None ⇒ false
+      }
+    } finally {
+      clientsLock.writeLock().unlock()
     }
   }
 
-  def restartClientConnection(remoteAddress: RemoteAddress): Boolean = clientsLock withReadGuard {
-    remoteClients.get(remoteAddress) match {
-      case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true)
-      case None ⇒ false
+  def restartClientConnection(remoteAddress: RemoteAddress): Boolean = {
+    clientsLock.readLock().lock()
+    try {
+      remoteClients.get(remoteAddress) match {
+        case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true)
+        case None ⇒ false
+      }
+    } finally {
+      clientsLock.readLock().unlock()
     }
   }
 
@@ -448,9 +467,12 @@ class NettyRemoteSupport(_app: ActorSystem) extends RemoteSupport(_app) with Rem
    */
   def shutdown(): Unit = _isRunning switchOff {
-    clientsLock withWriteGuard {
+    clientsLock.writeLock().lock()
+    try {
       remoteClients foreach { case (_, client) ⇒ client.shutdown() }
       remoteClients.clear()
+    } finally {
+      clientsLock.writeLock().unlock()
     }
     currentServer.getAndSet(None) foreach { _.shutdown() }
   }
@@ -636,23 +658,29 @@ class RemoteServerHandler(
 }
 
 class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {
-  protected val guard = new ReadWriteGuard
+  protected val guard = new ReentrantReadWriteLock
   protected val open = new AtomicBoolean(true)
 
-  override def add(channel: Channel): Boolean = guard withReadGuard {
-    if (open.get) {
-      super.add(channel)
-    } else {
-      channel.close
-      false
+  override def add(channel: Channel): Boolean = {
+    guard.readLock().lock()
+    try {
+      if (open.get) {
+        super.add(channel)
+      } else {
+        channel.close
+        false
+      }
+    } finally {
+      guard.readLock().unlock()
     }
   }
 
-  override def close(): ChannelGroupFuture = guard withWriteGuard {
-    if (open.getAndSet(false)) {
-      super.close
-    } else {
-      throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
+  override def close(): ChannelGroupFuture = {
+    guard.writeLock().lock()
+    try {
+      if (open.getAndSet(false)) super.close else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
+    } finally {
+      guard.writeLock().unlock()
     }
   }
 }
diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
index fc01045e79..32705ae58e 100644
--- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
@@ -113,8 +113,6 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr
     case _ ⇒ None
   }
 
-  protected[akka] override def start() {}
-
   protected[akka] override def shutdown() {}
 
   protected[akka] override def throughput = 0
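As an illustration only (not part of the patch): below is a minimal, self-contained Scala sketch of the idle-shutdown scheme the AbstractDispatcher hunks introduce. A single inhabitants counter covers both attached actors and in-flight tasks, and an UNSCHEDULED/SCHEDULED/RESCHEDULED state machine advanced with compareAndSet replaces the old guard lock and the start()/active switch. The object and member names (IdleShutdownSketch, enter, leave, doShutdown) and the plain ScheduledExecutorService are illustrative stand-ins, not Akka API.

import java.util.concurrent.{ Executors, TimeUnit }
import java.util.concurrent.atomic.{ AtomicInteger, AtomicLong }
import scala.annotation.tailrec

object IdleShutdownSketch {
  // Lifecycle states, mirroring the constants used in MessageDispatcher.
  final val UNSCHEDULED = 0
  final val SCHEDULED = 1
  final val RESCHEDULED = 2

  private val inhabitants = new AtomicLong(0L) // attached actors + in-flight tasks
  private val schedule = new AtomicInteger(UNSCHEDULED)
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val timeoutMs = 100L

  def enter(): Unit = inhabitants.incrementAndGet() // register / dispatchTask path

  def leave(): Unit = // unregister / taskCleanup path
    if (inhabitants.decrementAndGet() == 0) scheduleShutdownIfIdle()

  // Lock-free replacement for the old guard-protected bookkeeping: only the
  // thread that wins the CAS arms the timer; losers retry against the new state.
  @tailrec private def scheduleShutdownIfIdle(): Unit = inhabitants.get match {
    case 0L ⇒
      schedule.get match {
        case UNSCHEDULED ⇒
          if (schedule.compareAndSet(UNSCHEDULED, SCHEDULED)) {
            timer.schedule(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
            ()
          } else scheduleShutdownIfIdle()
        case SCHEDULED ⇒
          if (schedule.compareAndSet(SCHEDULED, RESCHEDULED)) ()
          else scheduleShutdownIfIdle()
        case RESCHEDULED ⇒ () // a re-check is already queued
      }
    case _ ⇒ () // someone is still using the dispatcher
  }

  private val shutdownAction: Runnable = new Runnable {
    @tailrec final def run(): Unit = schedule.get match {
      case UNSCHEDULED ⇒ ()
      case SCHEDULED ⇒
        // Racy by design, which is why the patch requires shutdown() to be idempotent.
        try { if (inhabitants.get == 0) doShutdown() } finally { schedule.set(UNSCHEDULED) }
      case RESCHEDULED ⇒
        if (schedule.compareAndSet(RESCHEDULED, SCHEDULED)) {
          timer.schedule(this, timeoutMs, TimeUnit.MILLISECONDS)
          ()
        } else run()
    }
  }

  private def doShutdown(): Unit = println("releasing dispatcher resources (idempotent)")

  def main(args: Array[String]): Unit = {
    enter(); leave()            // last one out arms the idle-shutdown timer
    Thread.sleep(3 * timeoutMs) // doShutdown() fires once the timeout elapses
    timer.shutdown()
  }
}

The last caller to leave() arms the timer. If anything enters and leaves again while a run is pending, the state is bumped to RESCHEDULED so the timer re-checks later instead of shutting down underneath a new inhabitant, which matches the "Must be idempotent" note the patch adds to shutdown().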