Merge with upstream + minor edits

This commit is contained in:
Viktor Klang 2012-05-21 11:00:45 +02:00
commit 22f0062f23
63 changed files with 569 additions and 710 deletions

View file

@ -19,7 +19,7 @@ object EventStreamSpec {
loglevel = INFO
event-handlers = ["akka.event.EventStreamSpec$MyLog", "%s"]
}
""".format(Logging.StandardOutLoggerName))
""".format(Logging.StandardOutLogger.getClass.getName))
val configUnhandled = ConfigFactory.parseString("""
akka {

View file

@ -7,11 +7,9 @@ package akka.pattern
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorTimeoutException
import akka.util.Duration
import akka.util.duration._
import akka.dispatch.{ Future, Promise, Await }
import java.lang.IllegalStateException
object PatternSpec {
case class Work(duration: Duration)
@ -41,13 +39,10 @@ class PatternSpec extends AkkaSpec {
Await.ready(gracefulStop(target, 1 millis), 1 second)
}
"complete Future with ActorTimeoutException when actor not terminated within timeout" in {
"complete Future with AskTimeoutException when actor not terminated within timeout" in {
val target = system.actorOf(Props[TargetActor])
target ! Work(250 millis)
val result = gracefulStop(target, 10 millis)
intercept[ActorTimeoutException] {
Await.result(result, 200 millis)
}
intercept[AskTimeoutException] { Await.result(gracefulStop(target, 10 millis), 200 millis) }
}
}

View file

@ -18,7 +18,7 @@
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package org.jboss.netty.akka.util.internal;
package akka.util.internal;
import java.util.AbstractCollection;
import java.util.AbstractMap;

View file

@ -13,12 +13,10 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.akka.util;
package akka.util.internal;
import akka.event.LoggingAdapter;
import akka.util.Duration;
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap;
import org.jboss.netty.akka.util.internal.ReusableIterator;
import java.util.*;
import java.util.concurrent.ThreadFactory;
@ -34,7 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* <h3>Tick Duration</h3>
*
* As described with 'approximated', this timer does not execute the scheduled
* {@link TimerTask} on time. {@link org.jboss.netty.akka.util.HashedWheelTimer}, on every tick, will
* {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
* check if there are any {@link TimerTask}s behind the schedule and execute
* them.
* <p>
@ -46,7 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* <h3>Ticks per Wheel (Wheel Size)</h3>
*
* {@link org.jboss.netty.akka.util.HashedWheelTimer} maintains a data structure called 'wheel'.
* {@link HashedWheelTimer} maintains a data structure called 'wheel'.
* To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
* function is 'dead line of the task'. The default number of ticks per wheel
* (i.e. the size of the wheel) is 512. You could specify a larger value
@ -54,7 +52,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* <h3>Do not create many instances.</h3>
*
* {@link org.jboss.netty.akka.util.HashedWheelTimer} creates a new thread whenever it is instantiated and
* {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
* started. Therefore, you should make sure to create only one instance and
* share it across your application. One of the common mistakes, that makes
* your application unresponsive, is to create a new instance in
@ -63,7 +61,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* <h3>Implementation Details</h3>
*
* {@link org.jboss.netty.akka.util.HashedWheelTimer} is based on
* {@link HashedWheelTimer} is based on
* <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
* Tony Lauck's paper,
* <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.akka.util.internal;
package akka.util.internal;
import java.util.Iterator;

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.akka.util.internal;
package akka.util.internal;
import java.util.regex.Pattern;

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.akka.util;
package akka.util.internal;
/**
* A handle associated with a {@link TimerTask} that is returned by a

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.akka.util;
package akka.util.internal;
import akka.util.Duration;
import java.util.Set;
@ -45,7 +45,7 @@ public interface Timer {
Timeout newTimeout(TimerTask task, Duration delay);
/**
* Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all
* Releases all resources acquired by this {@link Timer} and cancels all
* tasks which were scheduled but not executed yet.
*
* @return the handles associated with the tasks which were canceled by

View file

@ -13,11 +13,11 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.akka.util;
package akka.util.internal;
/**
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(org.jboss.netty.akka.util.TimerTask, long, java.util.concurrent.TimeUnit)}
* {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}
* .
*
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
@ -28,7 +28,7 @@ public interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(org.jboss.netty.akka.util.TimerTask, long, java.util.concurrent.TimeUnit)}
* {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}
* .
*
* @param timeout

View file

@ -5,19 +5,26 @@
package akka
object AkkaException {
//FIXME DOC
def toStringWithStackTrace(throwable: Throwable): String = throwable match {
case null "Unknown Throwable: was 'null'"
case ae: AkkaException ae.toLongString
case e "%s:%s\n%s" format (e.getClass.getName, e.getMessage, stackTraceToString(e))
}
def stackTraceToString(throwable: Throwable): String = {
val trace = throwable.getStackTrace
val sb = new StringBuilder
for (i 0 until trace.length)
sb.append("\tat %s\n" format trace(i))
sb.toString
/**
* Returns the given Throwables stack trace as a String, or the empty String if no trace is found
* @param throwable
* @return
*/
def stackTraceToString(throwable: Throwable): String = throwable.getStackTrace match {
case null ""
case x if x.length == 0 ""
case trace
val sb = new StringBuilder
for (i 0 until trace.length)
sb.append("\tat %s\n" format trace(i))
sb.toString
}
}
@ -32,22 +39,20 @@ object AkkaException {
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
class AkkaException(message: String = "", cause: Throwable = null) extends RuntimeException(message, cause) with Serializable {
lazy val uuid = java.util.UUID.randomUUID().toString
override lazy val toString =
"%s:%s\n[%s]".format(getClass.getName, message, uuid)
lazy val toLongString =
"%s:%s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString)
def this(msg: String) = this(msg, null)
def stackTraceToString = AkkaException.stackTraceToString(this)
lazy val uuid: String = java.util.UUID.randomUUID().toString
override def toString: String = "%s:%s\n[%s]".format(getClass.getName, message, uuid)
def toLongString: String = "%s:%s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString)
def stackTraceToString: String = AkkaException.stackTraceToString(this)
}
/**
* This exception is thrown when Akka detects a problem with the provided configuration
*/
class ConfigurationException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
class ConfigurationException(message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}

View file

@ -129,12 +129,6 @@ case class ActorInitializationException private[akka] (actor: ActorRef, message:
def this(msg: String) = this(null, msg, null)
}
//FIXME: Only used by gracefulStop we should remove this if possible
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null)
extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
/**
* InvalidMessageException is thrown when an invalid message is sent to an Actor.
* Technically it's only "null" which is an InvalidMessageException but who knows,

View file

@ -350,7 +350,7 @@ class LocalActorRefProvider(
private val tempNode = rootPath / "temp"
override def tempPath() = tempNode / tempName()
override def tempPath(): ActorPath = tempNode / tempName()
/**
* Top-level anchor for the supervision hierarchy of this actor system. Will

View file

@ -7,16 +7,15 @@ package akka.actor
import akka.event._
import akka.dispatch._
import akka.pattern.ask
import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.TimeUnit.MILLISECONDS
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
import java.io.Closeable
import akka.dispatch.Await.{ Awaitable, CanAwait }
import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import collection.immutable.Stack
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
object ActorSystem {

View file

@ -5,12 +5,13 @@
package akka.actor
import akka.util.Duration
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer, Timeout HWTimeout, Timer }
import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout HWTimeout, Timer }
import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher
import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import akka.util.internal._
//#scheduler
/**

View file

@ -93,11 +93,17 @@ import akka.japi.{ Creator }
abstract class UntypedActor extends Actor {
/**
* To be implemented by concrete UntypedActor. Defines the message handler.
* To be implemented by concrete UntypedActor, this defines the behavior of the
* UntypedActor.
*/
@throws(classOf[Exception])
def onReceive(message: Any): Unit
/**
* Returns this UntypedActor's UntypedActorContext
* The UntypedActorContext is not thread safe so do not expose it outside of the
* UntypedActor.
*/
def getContext(): UntypedActorContext = context.asInstanceOf[UntypedActorContext]
/**
@ -150,9 +156,7 @@ abstract class UntypedActor extends Actor {
*/
override def postRestart(reason: Throwable): Unit = super.postRestart(reason)
final protected def receive = {
case msg onReceive(msg)
}
final protected def receive = { case msg onReceive(msg) }
}
/**

View file

@ -87,7 +87,7 @@ class BalancingDispatcher(
@tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit =
if (messageQueue.hasMessages
&& i.hasNext
&& (executorService.get().executor match {
&& (executorService match {
case lm: LoadMetrics lm.atFullThrottle == false
case other true
})

View file

@ -33,16 +33,20 @@ class Dispatcher(
val shutdownTimeout: Duration)
extends MessageDispatcher(_prerequisites) {
protected val executorServiceFactory: ExecutorServiceFactory =
executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)
private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate {
lazy val executor: ExecutorService = factory.createExecutorService
def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory)
}
protected val executorService = new AtomicReference[ExecutorServiceDelegate](
new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService })
@volatile private var executorServiceDelegate: LazyExecutorServiceDelegate =
new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory))
protected final def executorService: ExecutorService = executorServiceDelegate
/**
* INTERNAL USE ONLY
*/
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
@ -51,7 +55,7 @@ class Dispatcher(
/**
* INTERNAL USE ONLY
*/
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) = {
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
val mbox = receiver.mailbox
mbox.systemEnqueue(receiver.self, invocation)
registerForExecution(mbox, false, true)
@ -62,11 +66,11 @@ class Dispatcher(
*/
protected[akka] def executeTask(invocation: TaskInvocation) {
try {
executorService.get() execute invocation
executorService execute invocation
} catch {
case e: RejectedExecutionException
try {
executorService.get() execute invocation
executorService execute invocation
} catch {
case e2: RejectedExecutionException
prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!"))
@ -83,10 +87,15 @@ class Dispatcher(
/**
* INTERNAL USE ONLY
*/
protected[akka] def shutdown: Unit =
Option(executorService.getAndSet(new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService
})) foreach { _.shutdown() }
protected[akka] def shutdown: Unit = {
val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy
val es = synchronized { // FIXME getAndSet using ARFU or Unsafe
val service = executorServiceDelegate
executorServiceDelegate = newDelegate // just a quick getAndSet
service
}
es.shutdown()
}
/**
* Returns if it was registered
@ -97,12 +106,12 @@ class Dispatcher(
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService.get() execute mbox
executorService execute mbox
true
} catch {
case e: RejectedExecutionException
try {
executorService.get() execute mbox
executorService execute mbox
true
} catch { //Retry once
case e: RejectedExecutionException
@ -115,7 +124,7 @@ class Dispatcher(
} else false
}
override val toString = Logging.simpleName(this) + "[" + id + "]"
override val toString: String = Logging.simpleName(this) + "[" + id + "]"
}
object PriorityGenerator {

View file

@ -97,6 +97,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
}
}
//INTERNAL API
private def config(id: String): Config = {
import scala.collection.JavaConverters._
def simpleName = id.substring(id.lastIndexOf('.') + 1)
@ -106,6 +107,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
.withFallback(defaultDispatcherConfig)
}
//INTERNAL API
private def idConfig(id: String): Config = {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(Map("id" -> id).asJava)
@ -123,9 +125,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
*
* INTERNAL USE ONLY
*/
private[akka] def from(cfg: Config): MessageDispatcher = {
configuratorFrom(cfg).dispatcher()
}
private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher()
/**
* Creates a MessageDispatcherConfigurator from a Config.

View file

@ -20,7 +20,7 @@ import akka.actor.ActorSystem
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] object Mailbox {
@ -46,6 +46,7 @@ private[akka] object Mailbox {
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
* INTERNAL API
*/
private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
@ -253,6 +254,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
/**
* A MessageQueue is one of the core components in forming an Akka Mailbox.
* The MessageQueue is where the normal messages that are sent to Actors will be enqueued (and subsequently dequeued)
* It needs to atleast support N producers and 1 consumer thread-safely.
*/
trait MessageQueue {
/**
@ -336,7 +338,7 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
}
/**
* A QueueBasedMessageQueue is a MessageQueue which is backed by a java.util.Queue
* A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue
*/
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
@ -354,7 +356,8 @@ trait QueueBasedMessageQueue extends MessageQueue {
}
/**
* UnboundedMessageQueueSemantics adds the enqueue/dequeue operations for unbounded java.util.Queues
* UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue,
* i.e. a non-blocking enqueue and dequeue.
*/
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
@ -362,8 +365,8 @@ trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
}
/**
* BoundedMessageQueueSemantics adds the enqueue/dequeue operations for bounded java.util.Queues,
* and it also forces the java.util.Queue to extend java.util.BlockingQueue
* BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue,
* i.e. blocking enqueue with timeout
*/
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
@ -381,7 +384,7 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
}
/**
* DequeBasedMessageQueue forces the underlying java.util.Queue extend java.util.Deque
* DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java.util.Deque
*/
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue: Deque[Envelope]
@ -389,7 +392,8 @@ trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
}
/**
* UnboundedMessageQueueSemantics adds the enqueue/dequeue operations for unbounded java.util.Deque
* UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue,
* i.e. a non-blocking enqueue and dequeue.
*/
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
@ -398,8 +402,8 @@ trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
}
/**
* BoundedMessageQueueSemantics adds the enqueue/dequeue operations for bounded java.util.Deque,
* and it also forces the java.util.Queue to extend java.util.BlockingQueue
* BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue,
* i.e. blocking enqueue with timeout
*/
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def pushTimeOut: Duration
@ -423,14 +427,14 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
}
/**
* MailboxType is used to construct a Messagequeue given an optional ActorContext owner.
* MailboxType is a factory to create MessageQueues for an optionally provided ActorContext
*/
trait MailboxType {
def create(owner: Option[ActorContext]): MessageQueue
}
/**
* UnboundedMailbox is the standard issue Akka Mailbox as it is unbounded and has quite good performance
* UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
*/
case class UnboundedMailbox() extends MailboxType {
@ -443,7 +447,7 @@ case class UnboundedMailbox() extends MailboxType {
}
/**
* BoundedMailbox is the default bounded mailbox
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
*/
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
@ -461,17 +465,20 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
}
/**
* Extend me to provide the comparator
* UnboundedPriorityMailbox is an unbounded mailbox that allows for priorization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorContext]): MessageQueue =
new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
/**
* Extend me to provide the comparator
* BoundedPriorityMailbox is a bounded mailbox that allows for priorization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
@ -486,7 +493,7 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap
}
/**
* This is the default mailbox for Deques, which is unbounded
* UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque.
*/
case class UnboundedDequeBasedMailbox() extends MailboxType {
@ -499,7 +506,7 @@ case class UnboundedDequeBasedMailbox() extends MailboxType {
}
/**
* This is the default mailbox for Deques, which is bounded
* BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque.
*/
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {

View file

@ -12,7 +12,7 @@ import akka.actor._
* A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down
* See LocalDeathWatch for semantics
*/
trait DeathWatch extends ActorEventBus with ActorClassifier {
abstract class DeathWatch extends ActorEventBus with ActorClassifier {
type Event = Terminated
protected final def classify(event: Event): Classifier = event.actor

View file

@ -182,10 +182,9 @@ trait SubchannelClassification { this: EventBus ⇒
*/
trait ScanningClassification { self: EventBus
protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] {
def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = {
val cM = compareClassifiers(a._1, b._1)
if (cM != 0) cM
else compareSubscribers(a._2, b._2)
def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = compareClassifiers(a._1, b._1) match {
case 0 compareSubscribers(a._2, b._2)
case other other
}
})
@ -238,7 +237,7 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
private val empty = TreeSet.empty[ActorRef]
protected val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize)
private val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize)
@tailrec
protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = {
@ -320,9 +319,9 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒
*/
protected def mapSize: Int
def publish(event: Event): Unit = {
val receivers = mappings.get(classify(event))
if (receivers ne null) receivers foreach { _ ! event }
def publish(event: Event): Unit = mappings.get(classify(event)) match {
case null ()
case some some foreach { _ ! event }
}
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber)

View file

@ -29,7 +29,7 @@ trait LoggingBus extends ActorEventBus {
import Logging._
private val guard = new ReentrantGuard
private val guard = new ReentrantGuard //Switch to ReentrantReadWrite
private var loggers = Seq.empty[ActorRef]
private var _logLevel: LogLevel = _
@ -97,7 +97,7 @@ trait LoggingBus extends ActorEventBus {
val myloggers =
for {
loggerName defaultLoggers
if loggerName != StandardOutLoggerName
if loggerName != StandardOutLogger.getClass.getName
} yield {
try {
system.dynamicAccess.getClassFor[Actor](loggerName) match {
@ -129,7 +129,7 @@ trait LoggingBus extends ActorEventBus {
case _: InvalidActorNameException // ignore if it is already running
}
publish(Debug(logName, this.getClass, "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLoggerName)) {
if (!(defaultLoggers contains StandardOutLogger.getClass.getName)) {
unsubscribe(StandardOutLogger)
}
} catch {
@ -163,6 +163,9 @@ trait LoggingBus extends ActorEventBus {
publish(Debug(simpleName(this), this.getClass, "all default loggers stopped"))
}
/**
* INTERNAL API
*/
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name)
@ -361,17 +364,33 @@ object LogSource {
*/
object Logging {
/**
* Returns a 'safe' getSimpleName for the provided object's Class
* @param obj
* @return the simple name of the given object's Class
*/
def simpleName(obj: AnyRef): String = simpleName(obj.getClass)
/**
* Returns a 'safe' getSimpleName for the provided Class
* @param obj
* @return the simple name of the given Class
*/
def simpleName(clazz: Class[_]): String = {
val n = clazz.getName
val i = n.lastIndexOf('.')
n.substring(i + 1)
}
object Extension extends ExtensionKey[LogExt]
/**
* INTERNAL API
*/
private[akka] object Extension extends ExtensionKey[LogExt]
class LogExt(system: ExtendedActorSystem) extends Extension {
/**
* INTERNAL API
*/
private[akka] class LogExt(system: ExtendedActorSystem) extends Extension {
private val loggerId = new AtomicInteger
def id() = loggerId.incrementAndGet()
}
@ -431,12 +450,6 @@ object Logging {
// these type ascriptions/casts are necessary to avoid CCEs during construction while retaining correct type
val AllLogLevels = Seq(ErrorLevel: AnyRef, WarningLevel, InfoLevel, DebugLevel).asInstanceOf[Seq[LogLevel]]
val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern
val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern
val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern
val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern
/**
* Obtain LoggingAdapter for the given actor system and source object. This
* will use the systems event stream and include the systems address in the
@ -624,27 +637,34 @@ object Logging {
// weird return type due to binary compatibility
def loggerInitialized(): LoggerInitialized.type = LoggerInitialized
/**
* LoggerInitializationException is thrown to indicate that there was a problem initializing a logger
* @param msg
*/
class LoggerInitializationException(msg: String) extends AkkaException(msg)
trait StdOutLogger {
import java.text.SimpleDateFormat
import java.util.Date
val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S")
private val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S")
private val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
private val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern
private val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern
private val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern
private val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern
def timestamp = dateFormat.format(new Date)
def timestamp(): String = synchronized { dateFormat.format(new Date) } // SDF isn't threadsafe
def print(event: Any) {
event match {
case e: Error error(e)
case e: Warning warning(e)
case e: Info info(e)
case e: Debug debug(e)
case e warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e))
}
def print(event: Any): Unit = event match {
case e: Error error(e)
case e: Warning warning(e)
case e: Info info(e)
case e: Debug debug(e)
case e warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e))
}
def error(event: Error) = {
def error(event: Error): Unit = {
val f = if (event.cause == Error.NoCause) errorFormatWithoutCause else errorFormat
println(f.format(
timestamp,
@ -654,21 +674,21 @@ object Logging {
stackTraceFor(event.cause)))
}
def warning(event: Warning) =
def warning(event: Warning): Unit =
println(warningFormat.format(
timestamp,
event.thread.getName,
event.logSource,
event.message))
def info(event: Info) =
def info(event: Info): Unit =
println(infoFormat.format(
timestamp,
event.thread.getName,
event.logSource,
event.message))
def debug(event: Debug) =
def debug(event: Debug): Unit =
println(debugFormat.format(
timestamp,
event.thread.getName,
@ -689,8 +709,8 @@ object Logging {
override val toString = "StandardOutLogger"
override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message)
}
val StandardOutLogger = new StandardOutLogger
val StandardOutLoggerName = StandardOutLogger.getClass.getName
/**
* Actor wrapper around the standard output logger. If
@ -708,7 +728,7 @@ object Logging {
* Returns the StackTrace for the given Throwable as a String
*/
def stackTraceFor(e: Throwable): String = e match {
case null | Error.NoCause ""
case null | Error.NoCause | _: NoStackTrace ""
case other
val sw = new java.io.StringWriter
val pw = new java.io.PrintWriter(sw)
@ -752,51 +772,51 @@ trait LoggingAdapter {
* These actually implement the passing on of the messages to be logged.
* Will not be called if is...Enabled returned false.
*/
protected def notifyError(message: String)
protected def notifyError(cause: Throwable, message: String)
protected def notifyWarning(message: String)
protected def notifyInfo(message: String)
protected def notifyDebug(message: String)
protected def notifyError(message: String): Unit
protected def notifyError(cause: Throwable, message: String): Unit
protected def notifyWarning(message: String): Unit
protected def notifyInfo(message: String): Unit
protected def notifyDebug(message: String): Unit
/*
* The rest is just the widening of the API for the user's convenience.
*/
def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format1(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(cause: Throwable, message: String): Unit = { if (isErrorEnabled) notifyError(cause, message) }
def error(cause: Throwable, template: String, arg1: Any): Unit = { if (isErrorEnabled) notifyError(cause, format1(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(message: String) { if (isErrorEnabled) notifyError(message) }
def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format1(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) }
def error(message: String): Unit = { if (isErrorEnabled) notifyError(message) }
def error(template: String, arg1: Any): Unit = { if (isErrorEnabled) notifyError(format1(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any): Unit = { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) }
def warning(message: String) { if (isWarningEnabled) notifyWarning(message) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format1(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) }
def warning(message: String): Unit = { if (isWarningEnabled) notifyWarning(message) }
def warning(template: String, arg1: Any): Unit = { if (isWarningEnabled) notifyWarning(format1(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any): Unit = { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) }
def info(message: String) { if (isInfoEnabled) notifyInfo(message) }
def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format1(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) }
def info(template: String, arg1: Any): Unit = { if (isInfoEnabled) notifyInfo(format1(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any): Unit = { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) }
def debug(message: String) { if (isDebugEnabled) notifyDebug(message) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format1(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) }
def debug(template: String, arg1: Any): Unit = { if (isDebugEnabled) notifyDebug(format1(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any): Unit = { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) }
def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) }
def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format1(template, arg1)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) }
def log(level: Logging.LogLevel, template: String, arg1: Any): Unit = { if (isEnabled(level)) notifyLog(level, format1(template, arg1)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any): Unit = { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) }
final def isEnabled(level: Logging.LogLevel): Boolean = level match {
case Logging.ErrorLevel isErrorEnabled
@ -812,14 +832,14 @@ trait LoggingAdapter {
case Logging.DebugLevel if (isDebugEnabled) notifyDebug(message)
}
private def format1(t: String, arg: Any) = arg match {
private def format1(t: String, arg: Any): String = arg match {
case a: Array[_] if !a.getClass.getComponentType.isPrimitive format(t, a: _*)
case a: Array[_] format(t, (a map (_.asInstanceOf[AnyRef]): _*))
case x format(t, x)
}
def format(t: String, arg: Any*) = {
val sb = new StringBuilder
def format(t: String, arg: Any*): String = {
val sb = new StringBuilder //FIXME add some decent size hint here
var p = 0
var rest = t
while (p < arg.length) {
@ -829,17 +849,15 @@ trait LoggingAdapter {
rest = ""
p = arg.length
} else {
sb.append(rest.substring(0, index))
sb.append(arg(p))
sb.append(rest.substring(0, index)).append(arg(p))
rest = rest.substring(index + 2)
p += 1
}
}
sb.append(rest)
sb.toString
sb.append(rest).toString
}
}
//FIXME DOCUMENT
class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter {
import Logging._
@ -849,14 +867,9 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class
def isInfoEnabled = bus.logLevel >= InfoLevel
def isDebugEnabled = bus.logLevel >= DebugLevel
protected def notifyError(message: String) { bus.publish(Error(logSource, logClass, message)) }
protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, logClass, message)) }
protected def notifyWarning(message: String) { bus.publish(Warning(logSource, logClass, message)) }
protected def notifyInfo(message: String) { bus.publish(Info(logSource, logClass, message)) }
protected def notifyDebug(message: String) { bus.publish(Debug(logSource, logClass, message)) }
protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, message))
protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, message))
protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, message))
protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message))
protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message))
}

View file

@ -26,9 +26,7 @@ object LoggingReceive {
*/
def apply(r: Receive)(implicit context: ActorContext): Receive = r match {
case _: LoggingReceive r
case _
if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r)
else r
case _ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) else r
}
}
@ -37,7 +35,7 @@ object LoggingReceive {
* @param source the log source, if not defined the actor of the context will be used
*/
class LoggingReceive(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) extends Receive {
def isDefinedAt(o: Any) = {
def isDefinedAt(o: Any): Boolean = {
val handled = r.isDefinedAt(o)
val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor)
context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o))

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import annotation.target._
/**
* This annotation marks a feature which is not yet considered stable and may
* change or be removed in a future release.
*
* @since 1.2
*/
@getter
@setter
@beanGetter
@beanSetter
final class experimental(since: String) extends annotation.StaticAnnotation

View file

@ -24,28 +24,14 @@ trait Function2[T1, T2, R] {
* A Procedure is like a Function, but it doesn't produce a return value.
*/
trait Procedure[T] {
def apply(param: T)
}
/**
* A Procedure is like a Function, but it doesn't produce a return value.
*/
trait Procedure2[T1, T2] {
def apply(param: T1, param2: T2)
}
/**
* An executable piece of code that takes no parameters and doesn't return any value.
*/
trait SideEffect {
def apply()
def apply(param: T): Unit
}
/**
* An executable piece of code that takes no parameters and doesn't return any value.
*/
trait Effect {
def apply()
def apply(): Unit
}
/**
@ -67,9 +53,9 @@ sealed abstract class Option[A] extends java.lang.Iterable[A] {
def get: A
def isEmpty: Boolean
def isDefined = !isEmpty
def isDefined: Boolean = !isEmpty
def asScala: scala.Option[A]
def iterator = if (isEmpty) Iterator.empty else Iterator.single(get)
def iterator: java.util.Iterator[A] = if (isEmpty) Iterator.empty else Iterator.single(get)
}
object Option {
@ -102,18 +88,18 @@ object Option {
* <code>A</code>.
*/
final case class Some[A](v: A) extends Option[A] {
def get = v
def isEmpty = false
def asScala = scala.Some(v)
def get: A = v
def isEmpty: Boolean = false
def asScala: scala.Some[A] = scala.Some(v)
}
/**
* This case object represents non-existent values.
*/
private case object None extends Option[Nothing] {
def get = throw new NoSuchElementException("None.get")
def isEmpty = true
def asScala = scala.None
def get: Nothing = throw new NoSuchElementException("None.get")
def isEmpty: Boolean = true
def asScala: scala.None.type = scala.None
}
implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala

View file

@ -46,7 +46,7 @@ trait AskSupport {
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
@ -96,7 +96,7 @@ trait AskSupport {
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
@ -126,7 +126,7 @@ trait AskSupport {
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
@ -157,6 +157,8 @@ trait AskSupport {
/**
* Akka private optimized representation of the temporary actor spawned to
* receive the reply to an "ask" operation.
*
* INTERNAL API
*/
private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any])
extends MinimalActorRef {
@ -182,14 +184,12 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)
@inline
private def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
@inline
private def setState(newState: AnyRef): Unit =
Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
private def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
override def getParent = provider.tempContainer
override def getParent: InternalActorRef = provider.tempContainer
/**
* Contract of this method:
@ -234,7 +234,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
case _
}
override def isTerminated = state match {
override def isTerminated: Boolean = state match {
case Stopped | _: StoppedWithPath true
case _ false
}
@ -263,6 +263,9 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
}
}
/**
* INTERNAL API
*/
private[akka] object PromiseActorRef {
private case object Registering
private case object Stopped
@ -272,9 +275,7 @@ private[akka] object PromiseActorRef {
val result = Promise[Any]()(provider.dispatcher)
val a = new PromiseActorRef(provider, result)
val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) }
result onComplete { _
try a.stop() finally f.cancel()
}
result onComplete { _ try a.stop() finally f.cancel() }
a
}
}

View file

@ -4,9 +4,9 @@
package akka.pattern
import akka.actor.{ ActorRef, Actor, ActorSystem, Props, PoisonPill, Terminated, ReceiveTimeout, ActorTimeoutException }
import akka.dispatch.{ Promise, Future }
import akka.util.Duration
import akka.actor._
import akka.util.{ Timeout, Duration }
trait GracefulStopSupport {
/**
@ -14,34 +14,26 @@ trait GracefulStopSupport {
* existing messages of the target actor has been processed and the actor has been
* terminated.
*
* Useful when you need to wait for termination or compose ordered termination of several actors.
* Useful when you need to wait for termination or compose ordered termination of several actors,
* which should only be done outside of the ActorSystem as blocking inside Actors is discouraged.
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* is completed with failure [[akka.actor.ActorTimeoutException]].
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) {
Promise.successful(true)
} else {
val result = Promise[Boolean]()
system.actorOf(Props(new Actor {
// Terminated will be received when target has been stopped
context watch target
} else system match {
case e: ExtendedActorSystem
val ref = PromiseActorRef(e.provider, Timeout(timeout))
e.deathWatch.subscribe(ref, target)
ref.result onComplete {
case Right(Terminated(`target`)) () // Ignore
case _ e.deathWatch.unsubscribe(ref, target)
} // Just making sure we're not leaking here
target ! PoisonPill
// ReceiveTimeout will be received if nothing else is received within the timeout
context setReceiveTimeout timeout
def receive = {
case Terminated(a) if a == target
result success true
context stop self
case ReceiveTimeout
result failure new ActorTimeoutException(
"Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))
context stop self
}
}))
result
ref.result map { case Terminated(`target`) true }
case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")
}
}
}

View file

@ -18,7 +18,7 @@ object Patterns {
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
@ -49,7 +49,7 @@ object Patterns {
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
* holding the eventual reply message; this means that the target actor
* needs to send the result to the `sender` reference provided. The Future
* will be completed with an [[akka.actor.AskTimeoutException]] after the
* will be completed with an [[akka.pattern.AskTimeoutException]] after the
* given timeout has expired; this is independent from any timeout applied
* while awaiting a result for this future (i.e. in
* `Await.result(..., timeout)`).
@ -100,7 +100,7 @@ object Patterns {
* Useful when you need to wait for termination or compose ordered termination of several actors.
*
* If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]]
* is completed with failure [[akka.actor.ActorTimeoutException]].
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] =
scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]]

View file

@ -10,10 +10,8 @@ import akka.actor._
* An Iterable that also contains a version.
*/
trait VersionedIterable[A] {
val version: Long
def version: Long
def iterable: Iterable[A]
def apply(): Iterable[A] = iterable
}
@ -42,7 +40,7 @@ trait ConnectionManager {
/**
* Shuts the connection manager down, which stops all managed actors
*/
def shutdown()
def shutdown(): Unit
/**
* Returns a VersionedIterator containing all connected ActorRefs at some moment in time. Since there is
@ -59,5 +57,5 @@ trait ConnectionManager {
*
* @param ref the dead
*/
def remove(deadRef: ActorRef)
def remove(deadRef: ActorRef): Unit
}

View file

@ -23,7 +23,7 @@ class ConsistentHash[T](nodes: Seq[T], replicas: Int) {
nodes.foreach(this += _)
def +=(node: T) {
def +=(node: T): Unit = {
cluster += node
(1 to replicas) foreach { replica
val key = hashFor((node + ":" + replica).getBytes("UTF-8"))
@ -32,7 +32,7 @@ class ConsistentHash[T](nodes: Seq[T], replicas: Int) {
}
}
def -=(node: T) {
def -=(node: T): Unit = {
cluster -= node
(1 to replicas) foreach { replica
val key = hashFor((node + ":" + replica).getBytes("UTF-8"))
@ -96,7 +96,7 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T
private var hashvalue = h
/** Begin a new hash using the same seed. */
def reset() {
def reset(): Unit = {
h = startHash(seed)
c = hiddenMagicA
k = hiddenMagicB
@ -104,7 +104,7 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T
}
/** Incorporate the hash value of one item. */
def apply(t: T) {
def apply(t: T): Unit = {
h = extendHash(h, t.##, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
@ -112,7 +112,7 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T
}
/** Incorporate a known hash value. */
def append(i: Int) {
def append(i: Int): Unit = {
h = extendHash(h, i, c, k)
c = nextMagicA(c)
k = nextMagicB(k)
@ -120,14 +120,15 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T
}
/** Retrieve the hash value */
def hash = {
def hash: Int = {
if (!hashed) {
hashvalue = finalizeHash(h)
hashed = true
}
hashvalue
}
override def hashCode = hash
override def hashCode: Int = hash
}
/**
@ -143,35 +144,35 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T
object MurmurHash {
// Magic values used for MurmurHash's 32 bit hash.
// Don't change these without consulting a hashing expert!
final private val visibleMagic = 0x971e137b
final private val hiddenMagicA = 0x95543787
final private val hiddenMagicB = 0x2ad7eb25
final private val visibleMixer = 0x52dce729
final private val hiddenMixerA = 0x7b7d159c
final private val hiddenMixerB = 0x6bce6396
final private val finalMixer1 = 0x85ebca6b
final private val finalMixer2 = 0xc2b2ae35
final private val visibleMagic: Int = 0x971e137b
final private val hiddenMagicA: Int = 0x95543787
final private val hiddenMagicB: Int = 0x2ad7eb25
final private val visibleMixer: Int = 0x52dce729
final private val hiddenMixerA: Int = 0x7b7d159c
final private val hiddenMixerB: Int = 0x6bce6396
final private val finalMixer1: Int = 0x85ebca6b
final private val finalMixer2: Int = 0xc2b2ae35
// Arbitrary values used for hashing certain classes
final private val seedString = 0xf7ca7fd2
final private val seedArray = 0x3c074a61
final private val seedString: Int = 0xf7ca7fd2
final private val seedArray: Int = 0x3c074a61
/** The first 23 magic integers from the first stream are stored here */
val storedMagicA =
val storedMagicA: Array[Int] =
Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray
/** The first 23 magic integers from the second stream are stored here */
val storedMagicB =
val storedMagicB: Array[Int] =
Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray
/** Begin a new hash with a seed value. */
def startHash(seed: Int) = seed ^ visibleMagic
def startHash(seed: Int): Int = seed ^ visibleMagic
/** The initial magic integers in the first stream. */
def startMagicA = hiddenMagicA
def startMagicA: Int = hiddenMagicA
/** The initial magic integer in the second stream. */
def startMagicB = hiddenMagicB
def startMagicB: Int = hiddenMagicB
/**
* Incorporates a new value into an existing hash.
@ -182,18 +183,17 @@ object MurmurHash {
* @param magicB a magic integer from a different stream
* @return the updated hash value
*/
def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int) = {
def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int =
(hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer
}
/** Given a magic integer from the first stream, compute the next */
def nextMagicA(magicA: Int) = magicA * 5 + hiddenMixerA
def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA
/** Given a magic integer from the second stream, compute the next */
def nextMagicB(magicB: Int) = magicB * 5 + hiddenMixerB
def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB
/** Once all hashes have been incorporated, this performs a final mixing */
def finalizeHash(hash: Int) = {
def finalizeHash(hash: Int): Int = {
var i = (hash ^ (hash >>> 16))
i *= finalMixer1
i ^= (i >>> 13)
@ -203,7 +203,7 @@ object MurmurHash {
}
/** Compute a high-quality hash of an array */
def arrayHash[@specialized T](a: Array[T]) = {
def arrayHash[@specialized T](a: Array[T]): Int = {
var h = startHash(a.length * seedArray)
var c = hiddenMagicA
var k = hiddenMagicB
@ -218,7 +218,7 @@ object MurmurHash {
}
/** Compute a high-quality hash of a string */
def stringHash(s: String) = {
def stringHash(s: String): Int = {
var h = startHash(s.length * seedString)
var c = hiddenMagicA
var k = hiddenMagicB
@ -239,7 +239,7 @@ object MurmurHash {
* where the order of appearance of elements does not matter.
* This is useful for hashing sets, for example.
*/
def symmetricHash[T](xs: TraversableOnce[T], seed: Int) = {
def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = {
var a, b, n = 0
var c = 1
xs.foreach(i {

View file

@ -5,8 +5,7 @@
package akka.routing
import akka.actor.{ Actor, ActorRef }
import java.util.concurrent.ConcurrentSkipListSet
import scala.collection.JavaConversions._
import java.util.{ Set, TreeSet }
sealed trait ListenerMessage
case class Listen(listener: ActorRef) extends ListenerMessage
@ -25,13 +24,29 @@ case class WithListeners(f: (ActorRef) ⇒ Unit) extends ListenerMessage
* Send <code>WithListeners(fun)</code> to traverse the current listeners.
*/
trait Listeners { self: Actor
protected val listeners = new ConcurrentSkipListSet[ActorRef]
protected val listeners: Set[ActorRef] = new TreeSet[ActorRef]
/**
* Chain this into the receive function.
*
* {{ def receive = listenerManagement orElse }}
*/
protected def listenerManagement: Actor.Receive = {
case Listen(l) listeners add l
case Deafen(l) listeners remove l
case WithListeners(f) listeners foreach f
case Listen(l) listeners add l
case Deafen(l) listeners remove l
case WithListeners(f)
val i = listeners.iterator
while (i.hasNext) f(i.next)
}
protected def gossip(msg: Any) = listeners foreach (_ ! msg)
/**
* Sends the supplied message to all current listeners using the provided sender as sender.
*
* @param msg
* @param sender
*/
protected def gossip(msg: Any)(implicit sender: ActorRef = null): Unit = {
val i = listeners.iterator
while (i.hasNext) i.next ! msg
}
}

View file

@ -8,12 +8,10 @@ import akka.util.Duration
import akka.util.duration._
import akka.ConfigurationException
import akka.pattern.pipe
import akka.pattern.AskSupport
import com.typesafe.config.Config
import scala.collection.JavaConversions.iterableAsScalaIterable
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import akka.jsr166y.ThreadLocalRandom
import akka.util.Unsafe
import akka.dispatch.Dispatchers
@ -49,12 +47,11 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
ref: InternalActorRef,
props: Props,
supervisor: InternalActorRef,
receiveTimeout: Option[Duration]): ActorCell =
{
val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout)
Unsafe.instance.monitorEnter(cell)
cell
}
receiveTimeout: Option[Duration]): ActorCell = {
val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout)
Unsafe.instance.monitorEnter(cell)
cell
}
private[akka] val routerConfig = _props.routerConfig
private[akka] val routeeProps = _props.copy(routerConfig = NoRouter)
@ -305,8 +302,8 @@ trait Router extends Actor {
final def receive = ({
case Router.Resize
try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider))
finally assert(ref.resizeInProgress.getAndSet(false))
val ab = ref.resizeInProgress
if (ab.get) try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) finally ab.set(false)
case Terminated(child)
ref.removeRoutees(IndexedSeq(child))
@ -321,6 +318,9 @@ trait Router extends Actor {
}
}
/**
* INTERNAL API
*/
private object Router {
case object Resize
@ -374,9 +374,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
abstract class NoRouter extends RouterConfig
case object NoRouter extends NoRouter {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null // FIXME, null, really??
def routerDispatcher: String = ""
def supervisorStrategy = null
def supervisorStrategy = null // FIXME null, really??
override def withFallback(other: RouterConfig): RouterConfig = other
/**
@ -406,9 +406,7 @@ case object FromConfig extends FromConfig {
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig
with Product
with Serializable
with Equals {
with Serializable {
def this() = this(Dispatchers.DefaultDispatcherId)
@ -416,38 +414,6 @@ class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)")
def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy
// open-coded case class to preserve binary compatibility, all deprecated for 2.1
@deprecated("FromConfig does not make sense as case class", "2.0.1")
override def productPrefix = "FromConfig"
@deprecated("FromConfig does not make sense as case class", "2.0.1")
def productArity = 1
@deprecated("FromConfig does not make sense as case class", "2.0.1")
def productElement(x: Int) = x match {
case 0 routerDispatcher
case _ throw new IndexOutOfBoundsException(x.toString)
}
@deprecated("FromConfig does not make sense as case class", "2.0.1")
def copy(d: String = Dispatchers.DefaultDispatcherId): FromConfig = new FromConfig(d)
@deprecated("FromConfig does not make sense as case class", "2.0.1")
def canEqual(o: Any) = o.isInstanceOf[FromConfig]
@deprecated("FromConfig does not make sense as case class", "2.0.1")
override def hashCode = ScalaRunTime._hashCode(this)
@deprecated("FromConfig does not make sense as case class", "2.0.1")
override def toString = "FromConfig(" + routerDispatcher + ")"
@deprecated("FromConfig does not make sense as case class", "2.0.1")
override def equals(other: Any): Boolean = other match {
case FromConfig(x) x == routerDispatcher
case _ false
}
}
object RoundRobinRouter {
@ -512,9 +478,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = {
this(nrOfInstances = nr)
}
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Constructor that sets the routees to be used.
@ -522,9 +486,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))
/**
* Constructor that sets the resizer to be used.
@ -535,13 +497,13 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
def withDispatcher(dispatcherId: String): RoundRobinRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
def withSupervisorStrategy(strategy: SupervisorStrategy): RoundRobinRouter = copy(supervisorStrategy = strategy)
}
trait RoundRobinLike { this: RouterConfig
@ -632,9 +594,7 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = {
this(nrOfInstances = nr)
}
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Constructor that sets the routees to be used.
@ -642,9 +602,7 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))
/**
* Constructor that sets the resizer to be used.
@ -655,13 +613,13 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
def withDispatcher(dispatcherId: String): RandomRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
def withSupervisorStrategy(strategy: SupervisorStrategy): RandomRouter = copy(supervisorStrategy = strategy)
}
trait RandomLike { this: RouterConfig
@ -758,9 +716,7 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = {
this(nrOfInstances = nr)
}
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Constructor that sets the routees to be used.
@ -768,9 +724,7 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))
/**
* Constructor that sets the resizer to be used.
@ -781,19 +735,16 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
def withDispatcher(dispatcherId: String): SmallestMailboxRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
def withSupervisorStrategy(strategy: SupervisorStrategy): SmallestMailboxRouter = copy(supervisorStrategy = strategy)
}
trait SmallestMailboxLike { this: RouterConfig
import java.security.SecureRandom
def nrOfInstances: Int
def routees: Iterable[String]
@ -956,9 +907,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int) = {
this(nrOfInstances = nr)
}
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Constructor that sets the routees to be used.
@ -966,9 +915,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = {
this(routees = iterableAsScalaIterable(routeePaths))
}
def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))
/**
* Constructor that sets the resizer to be used.
@ -979,13 +926,13 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
def withDispatcher(dispatcherId: String): BroadcastRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy)
def withSupervisorStrategy(strategy: SupervisorStrategy): BroadcastRouter = copy(supervisorStrategy = strategy)
}
trait BroadcastLike { this: RouterConfig
@ -1071,9 +1018,7 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* Constructor that sets nrOfInstances to be created.
* Java API
*/
def this(nr: Int, w: Duration) = {
this(nrOfInstances = nr, within = w)
}
def this(nr: Int, w: Duration) = this(nrOfInstances = nr, within = w)
/**
* Constructor that sets the routees to be used.
@ -1081,9 +1026,8 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String], w: Duration) = {
def this(routeePaths: java.lang.Iterable[String], w: Duration) =
this(routees = iterableAsScalaIterable(routeePaths), within = w)
}
/**
* Constructor that sets the resizer to be used.
@ -1152,7 +1096,7 @@ trait Resizer {
* This method is invoked only in the context of the Router actor in order to safely
* create/stop children.
*/
def resize(props: Props, routeeProvider: RouteeProvider)
def resize(props: Props, routeeProvider: RouteeProvider): Unit
}
case object DefaultResizer {
@ -1168,6 +1112,7 @@ case object DefaultResizer {
messagesPerResize = resizerConfig.getInt("messages-per-resize"))
}
//FIXME DOCUMENT ME
case class DefaultResizer(
/**
* The fewest number of routees the router should ever have.
@ -1242,7 +1187,7 @@ case class DefaultResizer(
def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0)
def resize(props: Props, routeeProvider: RouteeProvider) {
def resize(props: Props, routeeProvider: RouteeProvider): Unit = {
val currentRoutees = routeeProvider.routees
val requestedCapacity = capacity(currentRoutees)
@ -1260,7 +1205,7 @@ case class DefaultResizer(
* Give concurrent messages a chance to be placed in mailbox before
* sending PoisonPill.
*/
protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]) {
protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]): Unit = {
if (abandon.nonEmpty) {
if (stopDelay <= Duration.Zero) {
abandon foreach (_ ! PoisonPill)
@ -1329,9 +1274,7 @@ case class DefaultResizer(
* @param capacity current number of routees
* @return proposed change in the capacity
*/
def filter(pressure: Int, capacity: Int): Int = {
rampup(pressure, capacity) + backoff(pressure, capacity)
}
def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity) + backoff(pressure, capacity)
/**
* Computes a proposed positive (or zero) capacity delta using

View file

@ -14,8 +14,6 @@ import akka.util.NonFatal
import scala.collection.mutable.ArrayBuffer
import java.io.NotSerializableException
case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization {
/**
@ -120,9 +118,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
possibilities(0)._2
}
serializerMap.putIfAbsent(clazz, ser) match {
case null
log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName)
ser
case null log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName); ser
case some some
}
case ser ser
@ -140,10 +136,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer
*/
private val serializers: Map[String, Serializer] = {
for ((k: String, v: String) settings.Serializers)
yield k -> serializerOf(v).fold(throw _, identity)
}
private val serializers: Map[String, Serializer] =
for ((k: String, v: String) settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity)
/**
* bindings is a Seq of tuple representing the mapping from Class to Serializer.

View file

@ -6,7 +6,6 @@ package akka.serialization
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream }
import akka.util.ClassLoaderObjectInputStream
import akka.actor.DynamicAccess
import akka.actor.ExtendedActorSystem
import scala.util.DynamicVariable

View file

@ -8,6 +8,12 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
/**
* BoundedBlockingQueue wraps any Queue and turns the result into a BlockingQueue with a limited capacity
* @param maxCapacity - the maximum capacity of this Queue, needs to be > 0
* @param backing - the backing Queue
* @tparam E - The type of the contents of this Queue
*/
class BoundedBlockingQueue[E <: AnyRef](
val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
@ -22,7 +28,7 @@ class BoundedBlockingQueue[E <: AnyRef](
require(maxCapacity > 0)
}
protected val lock = new ReentrantLock(false)
protected val lock = new ReentrantLock(false) // TODO might want to switch to ReentrantReadWriteLock
private val notEmpty = lock.newCondition()
private val notFull = lock.newCondition()

View file

@ -11,6 +11,7 @@ import scala.collection.mutable.{ Builder, WrappedArray }
import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
import scala.collection.generic.CanBuildFrom
//FIXME MORE DOCS
object ByteString {
/**
@ -53,15 +54,16 @@ object ByteString {
val empty: ByteString = CompactByteString(Array.empty[Byte])
def newBuilder = new ByteStringBuilder
def newBuilder: ByteStringBuilder = new ByteStringBuilder
implicit def canBuildFrom = new CanBuildFrom[TraversableOnce[Byte], Byte, ByteString] {
def apply(from: TraversableOnce[Byte]) = newBuilder
def apply() = newBuilder
}
implicit val canBuildFrom: CanBuildFrom[TraversableOnce[Byte], Byte, ByteString] =
new CanBuildFrom[TraversableOnce[Byte], Byte, ByteString] {
def apply(ignore: TraversableOnce[Byte]): ByteStringBuilder = newBuilder
def apply(): ByteStringBuilder = newBuilder
}
private[akka] object ByteString1C {
def apply(bytes: Array[Byte]) = new ByteString1C(bytes)
def apply(bytes: Array[Byte]): ByteString1C = new ByteString1C(bytes)
}
/**
@ -71,7 +73,7 @@ object ByteString {
final class ByteString1C private (private val bytes: Array[Byte]) extends CompactByteString {
def apply(idx: Int): Byte = bytes(idx)
override def length = bytes.length
override def length: Int = bytes.length
def toArray: Array[Byte] = bytes.clone
@ -81,13 +83,11 @@ object ByteString {
def compact: ByteString1C = this
def asByteBuffer: ByteBuffer =
toByteString1.asByteBuffer
def asByteBuffer: ByteBuffer = toByteString1.asByteBuffer
def decodeString(charset: String): String = new String(bytes, charset)
def ++(that: ByteString): ByteString =
if (!that.isEmpty) toByteString1 ++ that else this
def ++(that: ByteString): ByteString = if (!that.isEmpty) toByteString1 ++ that else this
override def slice(from: Int, until: Int): ByteString =
if ((from != 0) || (until != length)) toByteString1.slice(from, until)
@ -96,12 +96,11 @@ object ByteString {
override def copyToArray[A >: Byte](xs: Array[A], start: Int, len: Int): Unit =
toByteString1.copyToArray(xs, start, len)
def copyToBuffer(buffer: ByteBuffer): Int =
toByteString1.copyToBuffer(buffer)
def copyToBuffer(buffer: ByteBuffer): Int = toByteString1.copyToBuffer(buffer)
}
private[akka] object ByteString1 {
def apply(bytes: Array[Byte]) = new ByteString1(bytes)
def apply(bytes: Array[Byte]): ByteString1 = new ByteString1(bytes)
}
/**
@ -113,7 +112,7 @@ object ByteString {
def apply(idx: Int): Byte = bytes(checkRangeConvert(idx))
private def checkRangeConvert(index: Int) = {
private def checkRangeConvert(index: Int): Int = {
if (0 <= index && length > index)
index + startIndex
else
@ -128,8 +127,7 @@ object ByteString {
override def clone: CompactByteString = ByteString1C(toArray)
def compact: CompactByteString =
if (length == bytes.length) ByteString1C(bytes) else clone
def compact: CompactByteString = if (length == bytes.length) ByteString1C(bytes) else clone
def asByteBuffer: ByteBuffer = {
val buffer = ByteBuffer.wrap(bytes, startIndex, length).asReadOnlyBuffer
@ -161,7 +159,6 @@ object ByteString {
if (copyLength > 0) buffer.put(bytes, startIndex, copyLength)
copyLength
}
}
private[akka] object ByteStrings {
@ -198,10 +195,11 @@ object ByteString {
}
// 0: both empty, 1: 2nd empty, 2: 1st empty, 3: neither empty
// Using length to check emptiness is prohibited by law
def compare(b1: ByteString, b2: ByteString): Int =
if (b1.length == 0)
if (b2.length == 0) 0 else 2
else if (b2.length == 0) 1 else 3
if (b1.isEmpty)
if (b2.isEmpty) 0 else 2
else if (b2.isEmpty) 1 else 3
}
@ -439,7 +437,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
private var _tempLength = 0
private var _tempCapacity = 0
private def clearTemp() {
private def clearTemp(): Unit = {
if (_tempLength > 0) {
val arr = new Array[Byte](_tempLength)
Array.copy(_temp, 0, arr, 0, _tempLength)
@ -448,14 +446,14 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
}
}
private def resizeTemp(size: Int) {
private def resizeTemp(size: Int): Unit = {
val newtemp = new Array[Byte](size)
if (_tempLength > 0) Array.copy(_temp, 0, newtemp, 0, _tempLength)
_temp = newtemp
_tempCapacity = _temp.length
}
private def ensureTempSize(size: Int) {
private def ensureTempSize(size: Int): Unit = {
if (_tempCapacity < size || _tempCapacity == 0) {
var newSize = if (_tempCapacity == 0) 16 else _tempCapacity * 2
while (newSize < size) newSize *= 2
@ -498,7 +496,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
this
}
def clear() {
def clear(): Unit = {
_builder.clear
_length = 0
_tempLength = 0

View file

@ -6,6 +6,13 @@ package akka.util
import java.io.{ InputStream, ObjectInputStream, ObjectStreamClass }
/**
* ClassLoaderObjectInputStream tries to utilize the provided ClassLoader to load Classes and falls
* back to ObjectInputStreams resolver.
*
* @param classLoader - the ClassLoader which is to be used primarily
* @param is - the InputStream that is wrapped
*/
class ClassLoaderObjectInputStream(classLoader: ClassLoader, is: InputStream) extends ObjectInputStream(is) {
override protected def resolveClass(objectStreamClass: ObjectStreamClass): Class[_] =
try Class.forName(objectStreamClass.getName, false, classLoader) catch {

View file

@ -3,7 +3,7 @@
*/
package akka.util
//FIXME DOCS!
object Convert {
def intToBytes(value: Int): Array[Byte] = {

View file

@ -5,7 +5,7 @@
package akka.util
import java.security.{ MessageDigest, SecureRandom }
//FIXME DOCS
object Crypt {
val hex = "0123456789ABCDEF"
val lineSeparator = System.getProperty("line.separator")
@ -32,7 +32,7 @@ object Crypt {
}
def hexify(bytes: Array[Byte]): String = {
val builder = new StringBuilder
val builder = new StringBuilder(bytes.length * 2)
bytes.foreach { byte builder.append(hex.charAt((byte & 0xF0) >> 4)).append(hex.charAt(byte & 0xF)) }
builder.toString
}

View file

@ -110,6 +110,7 @@ object Duration {
}
val Zero: FiniteDuration = new FiniteDuration(0, NANOSECONDS)
val Undefined: Duration = new Duration with Infinite {
override def toString = "Duration.Undefined"
override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this
@ -166,8 +167,8 @@ object Duration {
* including itself.
*/
val Inf: Duration = new Duration with Infinite {
override def toString = "Duration.Inf"
def compare(other: Duration) = if (other eq this) 0 else 1
override def toString: String = "Duration.Inf"
def compare(other: Duration): Int = if (other eq this) 0 else 1
def unary_- : Duration = MinusInf
}
@ -177,7 +178,7 @@ object Duration {
*/
val MinusInf: Duration = new Duration with Infinite {
override def toString = "Duration.MinusInf"
def compare(other: Duration) = if (other eq this) 0 else -1
def compare(other: Duration): Int = if (other eq this) 0 else -1
def unary_- : Duration = Inf
}
@ -188,7 +189,7 @@ object Duration {
def parse(s: String): Duration = unapply(s).get
implicit object DurationIsOrdered extends Ordering[Duration] {
def compare(a: Duration, b: Duration) = a compare b
def compare(a: Duration, b: Duration): Int = a compare b
}
}
@ -263,17 +264,17 @@ abstract class Duration extends Serializable with Ordered[Duration] {
def fromNow: Deadline = Deadline.now + this
// Java API
def lt(other: Duration) = this < other
def lteq(other: Duration) = this <= other
def gt(other: Duration) = this > other
def gteq(other: Duration) = this >= other
def plus(other: Duration) = this + other
def minus(other: Duration) = this - other
def mul(factor: Double) = this * factor
def div(factor: Double) = this / factor
def div(other: Duration) = this / other
def neg() = -this
def isFinite() = finite_?
def lt(other: Duration): Boolean = this < other
def lteq(other: Duration): Boolean = this <= other
def gt(other: Duration): Boolean = this > other
def gteq(other: Duration): Boolean = this >= other
def plus(other: Duration): Duration = this + other
def minus(other: Duration): Duration = this - other
def mul(factor: Double): Duration = this * factor
def div(factor: Double): Duration = this / factor
def div(other: Duration): Double = this / other
def neg(): Duration = -this
def isFinite(): Boolean = finite_?
}
object FiniteDuration {
@ -349,31 +350,19 @@ class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
else c
}
def +(other: Duration) = {
if (!other.finite_?) {
other
} else {
fromNanos(add(toNanos, other.toNanos))
}
}
def +(other: Duration): Duration = if (!other.finite_?) other else fromNanos(add(toNanos, other.toNanos))
def -(other: Duration) = {
if (!other.finite_?) {
other
} else {
fromNanos(add(toNanos, -other.toNanos))
}
}
def -(other: Duration): Duration = if (!other.finite_?) other else fromNanos(add(toNanos, -other.toNanos))
def *(factor: Double) = fromNanos(long2double(toNanos) * factor)
def *(factor: Double): FiniteDuration = fromNanos(long2double(toNanos) * factor)
def /(factor: Double) = fromNanos(long2double(toNanos) / factor)
def /(factor: Double): FiniteDuration = fromNanos(long2double(toNanos) / factor)
def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0
def /(other: Duration): Double = if (other.finite_?) long2double(toNanos) / other.toNanos else 0
def unary_- = Duration(-length, unit)
def unary_- : FiniteDuration = Duration(-length, unit)
def finite_? = true
def finite_? : Boolean = true
override def equals(other: Any) =
(other.asInstanceOf[AnyRef] eq this) || other.isInstanceOf[FiniteDuration] &&
@ -385,178 +374,74 @@ class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration {
}
}
class DurationInt(n: Int) {
private[akka] trait DurationOps {
import duration.Classifier
protected def from(timeUnit: TimeUnit): FiniteDuration
def nanoseconds: FiniteDuration = from(NANOSECONDS)
def nanos: FiniteDuration = from(NANOSECONDS)
def nanosecond: FiniteDuration = from(NANOSECONDS)
def nano: FiniteDuration = from(NANOSECONDS)
def nanoseconds = Duration(n, NANOSECONDS)
def nanos = Duration(n, NANOSECONDS)
def nanosecond = Duration(n, NANOSECONDS)
def nano = Duration(n, NANOSECONDS)
def microseconds: FiniteDuration = from(MICROSECONDS)
def micros: FiniteDuration = from(MICROSECONDS)
def microsecond: FiniteDuration = from(MICROSECONDS)
def micro: FiniteDuration = from(MICROSECONDS)
def microseconds = Duration(n, MICROSECONDS)
def micros = Duration(n, MICROSECONDS)
def microsecond = Duration(n, MICROSECONDS)
def micro = Duration(n, MICROSECONDS)
def milliseconds: FiniteDuration = from(MILLISECONDS)
def millis: FiniteDuration = from(MILLISECONDS)
def millisecond: FiniteDuration = from(MILLISECONDS)
def milli: FiniteDuration = from(MILLISECONDS)
def milliseconds = Duration(n, MILLISECONDS)
def millis = Duration(n, MILLISECONDS)
def millisecond = Duration(n, MILLISECONDS)
def milli = Duration(n, MILLISECONDS)
def seconds: FiniteDuration = from(SECONDS)
def second: FiniteDuration = from(SECONDS)
def seconds = Duration(n, SECONDS)
def second = Duration(n, SECONDS)
def minutes: FiniteDuration = from(MINUTES)
def minute: FiniteDuration = from(MINUTES)
def minutes = Duration(n, MINUTES)
def minute = Duration(n, MINUTES)
def hours: FiniteDuration = from(HOURS)
def hour: FiniteDuration = from(HOURS)
def hours = Duration(n, HOURS)
def hour = Duration(n, HOURS)
def days: FiniteDuration = from(DAYS)
def day: FiniteDuration = from(DAYS)
def days = Duration(n, DAYS)
def day = Duration(n, DAYS)
def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS))
def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS))
def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS))
def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS))
def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS))
def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS))
def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS))
def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS))
def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS))
def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS))
def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS))
def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS))
def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(SECONDS))
def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(SECONDS))
def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS))
def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS))
def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MINUTES))
def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MINUTES))
def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES))
def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES))
def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(HOURS))
def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(HOURS))
def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS))
def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS))
def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS))
def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS))
def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(DAYS))
def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(DAYS))
}
class DurationLong(n: Long) {
import duration.Classifier
def nanoseconds = Duration(n, NANOSECONDS)
def nanos = Duration(n, NANOSECONDS)
def nanosecond = Duration(n, NANOSECONDS)
def nano = Duration(n, NANOSECONDS)
def microseconds = Duration(n, MICROSECONDS)
def micros = Duration(n, MICROSECONDS)
def microsecond = Duration(n, MICROSECONDS)
def micro = Duration(n, MICROSECONDS)
def milliseconds = Duration(n, MILLISECONDS)
def millis = Duration(n, MILLISECONDS)
def millisecond = Duration(n, MILLISECONDS)
def milli = Duration(n, MILLISECONDS)
def seconds = Duration(n, SECONDS)
def second = Duration(n, SECONDS)
def minutes = Duration(n, MINUTES)
def minute = Duration(n, MINUTES)
def hours = Duration(n, HOURS)
def hour = Duration(n, HOURS)
def days = Duration(n, DAYS)
def day = Duration(n, DAYS)
def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS))
def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS))
def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS))
def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS))
def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS))
def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES))
def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES))
def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS))
def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS))
def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS))
def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS))
class DurationInt(n: Int) extends DurationOps {
override protected def from(timeUnit: TimeUnit): FiniteDuration = Duration(n, timeUnit)
}
class DurationDouble(d: Double) {
import duration.Classifier
class DurationLong(n: Long) extends DurationOps {
override protected def from(timeUnit: TimeUnit): FiniteDuration = Duration(n, timeUnit)
}
def nanoseconds = Duration(d, NANOSECONDS)
def nanos = Duration(d, NANOSECONDS)
def nanosecond = Duration(d, NANOSECONDS)
def nano = Duration(d, NANOSECONDS)
def microseconds = Duration(d, MICROSECONDS)
def micros = Duration(d, MICROSECONDS)
def microsecond = Duration(d, MICROSECONDS)
def micro = Duration(d, MICROSECONDS)
def milliseconds = Duration(d, MILLISECONDS)
def millis = Duration(d, MILLISECONDS)
def millisecond = Duration(d, MILLISECONDS)
def milli = Duration(d, MILLISECONDS)
def seconds = Duration(d, SECONDS)
def second = Duration(d, SECONDS)
def minutes = Duration(d, MINUTES)
def minute = Duration(d, MINUTES)
def hours = Duration(d, HOURS)
def hour = Duration(d, HOURS)
def days = Duration(d, DAYS)
def day = Duration(d, DAYS)
def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS))
def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS))
def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS))
def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS))
def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS))
def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS))
def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS))
def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS))
def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS))
def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS))
def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS))
def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS))
def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, SECONDS))
def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, SECONDS))
def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MINUTES))
def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MINUTES))
def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, HOURS))
def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, HOURS))
def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, DAYS))
def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, DAYS))
class DurationDouble(d: Double) extends DurationOps {
override protected def from(timeUnit: TimeUnit): FiniteDuration = Duration(d, timeUnit)
}
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@ -565,24 +450,27 @@ case class Timeout(duration: Duration) {
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
/**
* A Timeout is a wrapper on top of Duration to be more specific about what the duration means.
*/
object Timeout {
/**
* A timeout with zero duration, will cause most requests to always timeout.
*/
val zero = new Timeout(Duration.Zero)
val zero: Timeout = new Timeout(Duration.Zero)
/**
* A Timeout with infinite duration. Will never timeout. Use extreme caution with this
* as it may cause memory leaks, blocked threads, or may not even be supported by
* the receiver, which would result in an exception.
*/
val never = new Timeout(Duration.Inf)
val never: Timeout = new Timeout(Duration.Inf)
def apply(timeout: Long) = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
def apply(timeout: Long): Timeout = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit): Timeout = new Timeout(length, unit)
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
implicit def durationToTimeout(duration: Duration): Timeout = new Timeout(duration)
implicit def intToTimeout(timeout: Int): Timeout = new Timeout(timeout)
implicit def longToTimeout(timeout: Long): Timeout = new Timeout(timeout)
}

View file

@ -45,18 +45,13 @@ object Helpers {
else base64(next, sb)
}
def ignore[E: Manifest](body: Unit) {
try {
body
} catch {
case e if manifest[E].erasure.isAssignableFrom(e.getClass) ()
}
}
//FIXME docs
def ignore[E: Manifest](body: Unit): Unit =
try body catch { case e if manifest[E].erasure.isAssignableFrom(e.getClass) () }
def withPrintStackTraceOnError(body: Unit) {
try {
body
} catch {
//FIXME docs
def withPrintStackTraceOnError(body: Unit): Unit = {
try body catch {
case e: Throwable
val sw = new java.io.StringWriter()
var root = e

View file

@ -91,7 +91,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {
/**
* Applies the supplied function to all keys and their values
*/
def foreach(fun: (K, V) Unit) {
def foreach(fun: (K, V) Unit): Unit = {
import scala.collection.JavaConversions._
container.entrySet foreach { e e.getValue.foreach(fun(e.getKey, _)) }
}
@ -112,7 +112,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) {
/**
* Returns the key set.
*/
def keys = scala.collection.JavaConversions.collectionAsScalaIterable(container.keySet)
def keys: Iterable[K] = scala.collection.JavaConversions.collectionAsScalaIterable(container.keySet)
/**
* Disassociates the value of type V from the key of type K

View file

@ -7,17 +7,12 @@ package akka.util
import java.util.concurrent.locks.{ ReentrantLock }
import java.util.concurrent.atomic.{ AtomicBoolean }
final class ReentrantGuard {
final val lock = new ReentrantLock
final class ReentrantGuard extends ReentrantLock {
@inline
final def withGuard[T](body: T): T = {
lock.lock
try {
body
} finally {
lock.unlock
}
lock()
try body finally unlock()
}
}
@ -29,9 +24,7 @@ class Switch(startAsOn: Boolean = false) {
protected def transcend(from: Boolean, action: Unit): Boolean = synchronized {
if (switch.compareAndSet(from, !from)) {
try {
action
} catch {
try action catch {
case e
switch.compareAndSet(!from, from) // revert status
throw e
@ -67,18 +60,12 @@ class Switch(startAsOn: Boolean = false) {
/**
* Executes the provided action and returns its value if the switch is IMMEDIATELY on (i.e. no lock involved)
*/
def ifOnYield[T](action: T): Option[T] = {
if (switch.get) Some(action)
else None
}
def ifOnYield[T](action: T): Option[T] = if (switch.get) Some(action) else None
/**
* Executes the provided action and returns its value if the switch is IMMEDIATELY off (i.e. no lock involved)
*/
def ifOffYield[T](action: T): Option[T] = {
if (!switch.get) Some(action)
else None
}
def ifOffYield[T](action: T): Option[T] = if (!switch.get) Some(action) else None
/**
* Executes the provided action and returns if the action was executed or not, if the switch is IMMEDIATELY on (i.e. no lock involved)
@ -104,19 +91,13 @@ class Switch(startAsOn: Boolean = false) {
* Executes the provided action and returns its value if the switch is on, waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOnYield[T](action: T): Option[T] = synchronized {
if (switch.get) Some(action)
else None
}
def whileOnYield[T](action: T): Option[T] = synchronized { if (switch.get) Some(action) else None }
/**
* Executes the provided action and returns its value if the switch is off, waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def whileOffYield[T](action: T): Option[T] = synchronized {
if (!switch.get) Some(action)
else None
}
def whileOffYield[T](action: T): Option[T] = synchronized { if (!switch.get) Some(action) else None }
/**
* Executes the provided action and returns if the action was executed or not, if the switch is on, waiting for any pending changes to happen before (locking)
@ -144,22 +125,20 @@ class Switch(startAsOn: Boolean = false) {
* Executes the provided callbacks depending on if the switch is either on or off waiting for any pending changes to happen before (locking)
* Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance
*/
def fold[T](on: T)(off: T) = synchronized {
if (switch.get) on else off
}
def fold[T](on: T)(off: T): T = synchronized { if (switch.get) on else off }
/**
* Executes the given code while holding this switchs lock, i.e. protected from concurrent modification of the switch status.
*/
def locked[T](code: T) = synchronized { code }
def locked[T](code: T): T = synchronized { code }
/**
* Returns whether the switch is IMMEDIATELY on (no locking)
*/
def isOn = switch.get
def isOn: Boolean = switch.get
/**
* Returns whether the switch is IMMEDDIATELY off (no locking)
*/
def isOff = !isOn
def isOff: Boolean = !isOn
}

View file

@ -6,8 +6,10 @@ package akka.util
/**
* Collection of internal reflection utilities which may or may not be
* available (most services specific to HotSpot, but fails gracefully).
*
* INTERNAL API
*/
object Reflect {
private[akka] object Reflect {
/**
* This optionally holds a function which looks N levels above itself

View file

@ -7,6 +7,9 @@ package akka.util;
import java.lang.reflect.Field;
/**
* INTERNAL API
*/
public final class Unsafe {
public final static sun.misc.Unsafe instance;
static {

View file

@ -7,6 +7,7 @@ package akka.util
import scala.util.continuations._
import akka.dispatch.MessageDispatcher
//FIXME Needs docs
package object cps {
def matchC[A, B, C, D](in: A)(pf: PartialFunction[A, B @cpsParam[C, D]]): B @cpsParam[C, D] = pf(in)

View file

@ -5,7 +5,7 @@
package akka.util
import java.util.concurrent.TimeUnit
//FIXME Needs docs
package object duration {
trait Classifier[C] {
type R
@ -15,38 +15,32 @@ package object duration {
object span
implicit object spanConvert extends Classifier[span.type] {
type R = FiniteDuration
def convert(d: FiniteDuration) = d
def convert(d: FiniteDuration): FiniteDuration = d
}
object fromNow
implicit object fromNowConvert extends Classifier[fromNow.type] {
type R = Deadline
def convert(d: FiniteDuration) = Deadline.now + d
def convert(d: FiniteDuration): Deadline = Deadline.now + d
}
implicit def intToDurationInt(n: Int) = new DurationInt(n)
implicit def longToDurationLong(n: Long) = new DurationLong(n)
implicit def doubleToDurationDouble(d: Double) = new DurationDouble(d)
implicit def intToDurationInt(n: Int): DurationInt = new DurationInt(n)
implicit def longToDurationLong(n: Long): DurationLong = new DurationLong(n)
implicit def doubleToDurationDouble(d: Double): DurationDouble = new DurationDouble(d)
implicit def pairIntToDuration(p: (Int, TimeUnit)) = Duration(p._1, p._2)
implicit def pairLongToDuration(p: (Long, TimeUnit)) = Duration(p._1, p._2)
implicit def durationToPair(d: Duration) = (d.length, d.unit)
implicit def pairIntToDuration(p: (Int, TimeUnit)): FiniteDuration = Duration(p._1, p._2)
implicit def pairLongToDuration(p: (Long, TimeUnit)): FiniteDuration = Duration(p._1, p._2)
implicit def durationToPair(d: Duration): (Long, TimeUnit) = (d.length, d.unit)
/*
* avoid reflection based invocation by using non-duck type
*/
class IntMult(i: Int) {
def *(d: Duration) = d * i
}
implicit def intMult(i: Int) = new IntMult(i)
class IntMult(i: Int) { def *(d: Duration): Duration = d * i }
implicit def intMult(i: Int): IntMult = new IntMult(i)
class LongMult(l: Long) {
def *(d: Duration) = d * l
}
implicit def longMult(l: Long) = new LongMult(l)
class LongMult(l: Long) { def *(d: Duration): Duration = d * l }
implicit def longMult(l: Long): LongMult = new LongMult(l)
class DoubleMult(f: Double) {
def *(d: Duration) = d * f
}
implicit def doubleMult(f: Double) = new DoubleMult(f)
class DoubleMult(f: Double) { def *(d: Duration): Duration = d * f }
implicit def doubleMult(f: Double): DoubleMult = new DoubleMult(f)
}

View file

@ -247,6 +247,30 @@ Summary: ``actorOf`` vs. ``actorFor``
- ``actorFor`` only ever looks up an existing actor, i.e. does not create
one.
Reusing Actor Paths
-------------------
When an actor is terminated, its path will point to the dead letter mailbox,
DeathWatch will publish its final transition and in general it is not expected
to come back to life again (since the actor life cycle does not allow this).
While it is possible to create an actor at a later time with an identical
path—simply due to it being impossible to enforce the opposite without keeping
the set of all actors ever created available—this is not good practice: remote
actor references which “died” suddenly start to work again, but without any
guarantee of ordering between this transition and any other event, hence the
new inhabitant of the path may receive messages which were destined for the
previous tenant.
It may be the right thing to do in very specific circumstances, but make sure
to confine the handling of this precisely to the actors supervisor, because
that is the only actor which can reliably detect proper deregistration of the
name, before which creation of the new child will fail.
It may also be required during testing, when the test subject depends on being
instantiated at a specific path. In that case it is best to mock its supervisor
so that it will forward the Terminated message to the appropriate point in the
test procedure, enabling the latter to await proper deregistration of the name.
The Interplay with Remote Deployment
------------------------------------

View file

@ -36,7 +36,7 @@ import static akka.pattern.Patterns.gracefulStop;
import akka.dispatch.Future;
import akka.dispatch.Await;
import akka.util.Duration;
import akka.actor.ActorTimeoutException;
import akka.pattern.AskTimeoutException;
//#import-gracefulStop
//#import-askPipe
@ -207,7 +207,7 @@ public class UntypedActorDocTestBase {
Future<Boolean> stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (ActorTimeoutException e) {
} catch (AskTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
//#gracefulStop

View file

@ -4,6 +4,10 @@
Microkernel (Java)
==================
The purpose of the Akka Microkernel is to offer a bundling mechanism so that you can distribute
an Akka application as a single payload, without the need to run in a Java Application Server or manually
having to create a launcher script.
The Akka Microkernel is included in the Akka download found at `downloads`_.
.. _downloads: http://akka.io/downloads

View file

@ -506,6 +506,18 @@ termination of several actors:
.. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java
:include: import-gracefulStop,gracefulStop
When ``gracefulStop()`` returns successfully, the actors ``postStop()`` hook
will have been executed: there exists a happens-before edge between the end of
``postStop()`` and the return of ``gracefulStop()``.
.. warning::
Keep in mind that an actor stopping and its name being deregistered are
separate events which happen asynchronously from each other. Therefore it may
be that you will find the name still in use after ``gracefulStop()``
returned. In order to guarantee proper deregistration, only reuse names from
within a supervisor you control and only in response to a :class:`Terminated`
message, i.e. not for top-level actors.
.. _UntypedActor.HotSwap:

View file

@ -550,6 +550,18 @@ termination of several actors:
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#gracefulStop
When ``gracefulStop()`` returns successfully, the actors ``postStop()`` hook
will have been executed: there exists a happens-before edge between the end of
``postStop()`` and the return of ``gracefulStop()``.
.. warning::
Keep in mind that an actor stopping and its name being deregistered are
separate events which happen asynchronously from each other. Therefore it may
be that you will find the name still in use after ``gracefulStop()``
returned. In order to guarantee proper deregistration, only reuse names from
within a supervisor you control and only in response to a :class:`Terminated`
message, i.e. not for top-level actors.
.. _Actor.HotSwap:

View file

@ -326,14 +326,13 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
//#gracefulStop
import akka.pattern.gracefulStop
import akka.dispatch.Await
import akka.actor.ActorTimeoutException
try {
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system)
Await.result(stopped, 6 seconds)
// the actor has been stopped
} catch {
case e: ActorTimeoutException // the actor wasn't stopped within 5 seconds
case e: akka.pattern.AskTimeoutException // the actor wasn't stopped within 5 seconds
}
//#gracefulStop
}

View file

@ -4,6 +4,10 @@
Microkernel (Scala)
===================
The purpose of the Akka Microkernel is to offer a bundling mechanism so that you can distribute
an Akka application as a single payload, without the need to run in a Java Application Server or manually
having to create a launcher script.
The Akka Microkernel is included in the Akka download found at `downloads`_.
.. _downloads: http://akka.io/downloads

View file

@ -12,7 +12,7 @@ import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import scala.concurrent.stm._
import akka.pattern.ask
import akka.pattern.{ AskTimeoutException, ask }
object CoordinatedIncrement {
@ -96,7 +96,7 @@ class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) wit
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[ActorTimeoutException]())
EventFilter[AskTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, failer) = actorOfs
val coordinated = Coordinated()

View file

@ -15,7 +15,7 @@ import akka.testkit.TestEvent.Mute
import scala.concurrent.stm._
import scala.util.Random.{ nextInt random }
import java.util.concurrent.CountDownLatch
import akka.pattern.ask
import akka.pattern.{ AskTimeoutException, ask }
object FickleFriends {
case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch)
@ -120,7 +120,7 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[ActorTimeoutException]())
EventFilter[AskTimeoutException]())
system.eventStream.publish(Mute(ignoreExceptions))
val (counters, coordinator) = actorOfs
val latch = new CountDownLatch(1)

View file

@ -10,7 +10,7 @@ import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
import scala.concurrent.stm._
import akka.pattern.ask
import akka.pattern.{ AskTimeoutException, ask }
object TransactorIncrement {
case class Increment(friends: Seq[ActorRef], latch: TestLatch)
@ -105,7 +105,7 @@ class TransactorSpec extends AkkaSpec {
val ignoreExceptions = Seq(
EventFilter[ExpectedFailureException](),
EventFilter[CoordinatedTransactionException](),
EventFilter[ActorTimeoutException]())
EventFilter[AskTimeoutException]())
filterEvents(ignoreExceptions) {
val (counters, failer) = createTransactors
val failLatch = TestLatch(numCounters)

View file

View file

BIN
file-based/mailbox_user__c Normal file

Binary file not shown.

BIN
file-based/mailbox_user__d Normal file

Binary file not shown.

BIN
file-based/mailbox_user__e Normal file

Binary file not shown.

BIN
file-based/mailbox_user__f Normal file

Binary file not shown.