diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 9ff7ee86fb..b57a0687e3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -350,17 +350,21 @@ abstract class ActorModelSpec extends AkkaSpec { assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) } catch { case e ⇒ - val buddies = dispatcher.asInstanceOf[BalancingDispatcher].buddies - val mq = dispatcher.asInstanceOf[BalancingDispatcher].messageQueue + dispatcher match { + case dispatcher: BalancingDispatcher ⇒ + val buddies = dispatcher.buddies + val mq = dispatcher.messageQueue - System.err.println("Buddies left: ") - buddies.toArray foreach { - case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isShutdown + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + System.err.println("Buddies left: ") + buddies.toArray foreach { + case cell: ActorCell ⇒ + System.err.println(" - " + cell.self.path + " " + cell.isShutdown + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + } + + System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") + case _ => } - System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") - throw e } assertCountDown(stopLatch, waitTime, "Expected all children to stop") diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java index 0f37dec003..80cc4c9675 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java @@ -4,15 +4,18 @@ package akka.dispatch; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import akka.util.Unsafe; -abstract class AbstractMailbox { - private volatile int _status; // not initialized because this is faster: 0 == Open - protected final static AtomicIntegerFieldUpdater updater = - AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status"); +final class AbstractMailbox { + final static long mailboxStatusOffset; + final static long systemMessageOffset; - private volatile SystemMessage _systemQueue; // not initialized because this is faster - protected final static AtomicReferenceFieldUpdater systemQueueUpdater = - AtomicReferenceFieldUpdater.newUpdater(AbstractMailbox.class, SystemMessage.class, "_systemQueue"); + static { + try { + mailboxStatusOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_status")); + systemMessageOffset = Unsafe.instance.objectFieldOffset(Mailbox.class.getDeclaredField("_systemQueue")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 95fbffe81c..2c5d43c906 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -36,11 +36,17 @@ object Mailbox { /** * @author Jonas Bonér */ -abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with MessageQueue with SystemMessageQueue with Runnable { +abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { import Mailbox._ + @volatile + protected var _status: Status = _ //0 by default + + @volatile + protected var _systemQueue: SystemMessage = _ //null by default + @inline - final def status: Mailbox.Status = AbstractMailbox.updater.get(this) + final def status: Mailbox.Status = _status @inline final def shouldProcessMessage: Boolean = (status & 3) == Open @@ -56,11 +62,11 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag @inline protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = - AbstractMailbox.updater.compareAndSet(this, oldStatus, newStatus) + Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus) + //AbstractMailbox.updater.compareAndSet(this, oldStatus, newStatus) @inline - protected final def setStatus(newStatus: Status): Unit = - AbstractMailbox.updater.set(this, newStatus) + protected final def setStatus(newStatus: Status): Unit = _status = newStatus /** * set new primary status Open. Caller does not need to worry about whether @@ -125,8 +131,9 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag /* * AtomicReferenceFieldUpdater for system queue */ - protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this) - protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) + protected final def systemQueueGet: SystemMessage = _systemQueue + protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = //AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) + Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages diff --git a/akka-actor/src/main/scala/akka/util/Unsafe.java b/akka-actor/src/main/scala/akka/util/Unsafe.java new file mode 100644 index 0000000000..4449f045be --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/Unsafe.java @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + + +package akka.util; + +import java.lang.reflect.Field; + +public final class Unsafe { + public final static sun.misc.Unsafe instance; + static { + try { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + instance = (sun.misc.Unsafe) field.get(null); + } catch(Throwable t) { + throw new ExceptionInInitializerError(t); + } + } +}