From 4256522fc8053e6f2fdf36a312f29f72881c59c9 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 16 May 2012 13:43:00 +0200 Subject: [PATCH 01/12] clarify semantics of name reuse/gracefulStop, see #2073 --- akka-docs/general/addressing.rst | 24 ++++++++++++++++++++++++ akka-docs/java/untyped-actors.rst | 12 ++++++++++++ akka-docs/scala/actors.rst | 12 ++++++++++++ 3 files changed, 48 insertions(+) diff --git a/akka-docs/general/addressing.rst b/akka-docs/general/addressing.rst index 94e571264d..424140ab96 100644 --- a/akka-docs/general/addressing.rst +++ b/akka-docs/general/addressing.rst @@ -247,6 +247,30 @@ Summary: ``actorOf`` vs. ``actorFor`` - ``actorFor`` only ever looks up an existing actor, i.e. does not create one. +Reusing Actor Paths +------------------- + +When an actor is terminated, its path will point to the dead letter mailbox, +DeathWatch will publish its final transition and in general it is not expected +to come back to life again (since the actor life cycle does not allow this). +While it is possible to create an actor at a later time with an identical +path—simply due to it being impossible to enforce the opposite without keeping +the set of all actors ever created available—this is not good practice: remote +actor references which “died” suddenly start to work again, but without any +guarantee of ordering between this transition and any other event, hence the +new inhabitant of the path may receive messages which were destined for the +previous tenant. + +It may be the right thing to do in very specific circumstances, but make sure +to confine the handling of this precisely to the actor’s supervisor, because +that is the only actor which can reliably detect proper deregistration of the +name, before which creation of the new child will fail. + +It may also be required during testing, when the test subject depends on being +instantiated at a specific path. In that case it is best to mock its supervisor +so that it will forward the Terminated message to the appropriate point in the +test procedure, enabling the latter to await proper deregistration of the name. + The Interplay with Remote Deployment ------------------------------------ diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 89553f091a..38d2d4c430 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -506,6 +506,18 @@ termination of several actors: .. includecode:: code/akka/docs/actor/UntypedActorDocTestBase.java :include: import-gracefulStop,gracefulStop +When ``gracefulStop()`` returns successfully, the actor’s ``postStop()`` hook +will have been executed: there exists a happens-before edge between the end of +``postStop()`` and the return of ``gracefulStop()``. + +.. warning:: + + Keep in mind that an actor stopping and its name being deregistered are + separate events which happen asynchronously from each other. Therefore it may + be that you will find the name still in use after ``gracefulStop()`` + returned. In order to guarantee proper deregistration, only reuse names from + within a supervisor you control and only in response to a :class:`Terminated` + message, i.e. not for top-level actors. .. _UntypedActor.HotSwap: diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index fae84c080f..5374c8a37c 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -550,6 +550,18 @@ termination of several actors: .. includecode:: code/akka/docs/actor/ActorDocSpec.scala#gracefulStop +When ``gracefulStop()`` returns successfully, the actor’s ``postStop()`` hook +will have been executed: there exists a happens-before edge between the end of +``postStop()`` and the return of ``gracefulStop()``. + +.. warning:: + + Keep in mind that an actor stopping and its name being deregistered are + separate events which happen asynchronously from each other. Therefore it may + be that you will find the name still in use after ``gracefulStop()`` + returned. In order to guarantee proper deregistration, only reuse names from + within a supervisor you control and only in response to a :class:`Terminated` + message, i.e. not for top-level actors. .. _Actor.HotSwap: From 5eba9fceef884b16c8764a78d0d8bb274dfbc830 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 13:37:26 +0200 Subject: [PATCH 02/12] Saving the planet and shufflin' --- .../scala/akka/event/EventStreamSpec.scala | 2 +- .../main/scala/akka/actor/UntypedActor.scala | 12 +- .../scala/akka/dispatch/Dispatchers.scala | 6 +- .../main/scala/akka/dispatch/Mailbox.scala | 56 +++++- .../main/scala/akka/event/DeathWatch.scala | 2 +- .../src/main/scala/akka/event/EventBus.scala | 15 +- .../src/main/scala/akka/event/Logging.scala | 167 ++++++++++-------- .../scala/akka/event/LoggingReceive.scala | 6 +- .../src/main/scala/akka/japi/JavaAPI.scala | 34 ++-- .../src/main/scala/akka/util/LockUtil.scala | 25 +-- 10 files changed, 177 insertions(+), 148 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index d2497c4a69..a8cd32f5d3 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -19,7 +19,7 @@ object EventStreamSpec { loglevel = INFO event-handlers = ["akka.event.EventStreamSpec$MyLog", "%s"] } - """.format(Logging.StandardOutLoggerName)) + """.format(Logging.StandardOutLogger.getClass.getName)) val configUnhandled = ConfigFactory.parseString(""" akka { diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index a5ebeb851c..c56a2a0167 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -93,11 +93,17 @@ import akka.japi.{ Creator } abstract class UntypedActor extends Actor { /** - * To be implemented by concrete UntypedActor. Defines the message handler. + * To be implemented by concrete UntypedActor, this defines the behavior of the + * UntypedActor. */ @throws(classOf[Exception]) def onReceive(message: Any): Unit + /** + * Returns this UntypedActor's UntypedActorContext + * The UntypedActorContext is not thread safe so do not expose it outside of the + * UntypedActor. + */ def getContext(): UntypedActorContext = context.asInstanceOf[UntypedActorContext] /** @@ -150,9 +156,7 @@ abstract class UntypedActor extends Actor { */ override def postRestart(reason: Throwable): Unit = super.postRestart(reason) - final protected def receive = { - case msg ⇒ onReceive(msg) - } + final protected def receive = { case msg ⇒ onReceive(msg) } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 54173b8460..9fae624e66 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -97,6 +97,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } } + //INTERNAL API private def config(id: String): Config = { import scala.collection.JavaConverters._ def simpleName = id.substring(id.lastIndexOf('.') + 1) @@ -106,6 +107,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc .withFallback(defaultDispatcherConfig) } + //INTERNAL API private def idConfig(id: String): Config = { import scala.collection.JavaConverters._ ConfigFactory.parseMap(Map("id" -> id).asJava) @@ -123,9 +125,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * * INTERNAL USE ONLY */ - private[akka] def from(cfg: Config): MessageDispatcher = { - configuratorFrom(cfg).dispatcher() - } + private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher() /** * Creates a MessageDispatcherConfigurator from a Config. diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 2e3a98e8d9..1bb882d497 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -16,7 +16,10 @@ import akka.actor.ActorSystem class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) -object Mailbox { +/** + * INTERNAL API + */ +private[akka] object Mailbox { type Status = Int @@ -40,6 +43,7 @@ object Mailbox { * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation, * but can't be exposed to user defined mailbox subclasses. * + * INTERNAL API */ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable { @@ -244,6 +248,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes } } +/** + * A MessageQueue is the user-message "lane" of an Akka Mailbox. + * It needs to atleast support N producers and 1 consumer thread-safely. + */ trait MessageQueue { /** * Try to enqueue the message to this queue, or throw an exception. @@ -325,6 +333,9 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ def hasSystemMessages: Boolean = systemQueueGet ne null } +/** + * A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue + */ trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] def numberOfMessages = queue.size @@ -340,11 +351,19 @@ trait QueueBasedMessageQueue extends MessageQueue { } } +/** + * UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue, + * i.e. a non-blocking enqueue and dequeue. + */ trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle def dequeue(): Envelope = queue.poll() } +/** + * BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue, + * i.e. blocking enqueue with timeout + */ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] @@ -360,17 +379,28 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def dequeue(): Envelope = queue.poll() } +/** + * DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java.util.Deque + */ trait DequeBasedMessageQueue extends QueueBasedMessageQueue { def queue: Deque[Envelope] def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit } +/** + * UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue, + * i.e. a non-blocking enqueue and dequeue. + */ trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle def dequeue(): Envelope = queue.poll() } +/** + * BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue, + * i.e. blocking enqueue with timeout + */ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingDeque[Envelope] @@ -393,14 +423,14 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { } /** - * Mailbox configuration. + * MailboxType is a factory to create MessageQueues for an optionally provided ActorContext */ trait MailboxType { def create(owner: Option[ActorContext]): MessageQueue } /** - * It's a case class for Java (new UnboundedMailbox) + * UnboundedMailbox is the default unbounded MailboxType used by Akka Actors. */ case class UnboundedMailbox() extends MailboxType { @@ -412,6 +442,9 @@ case class UnboundedMailbox() extends MailboxType { } } +/** + * BoundedMailbox is the default bounded MailboxType used by Akka Actors. + */ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), @@ -428,17 +461,20 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat } /** - * Extend me to provide the comparator + * UnboundedPriorityMailbox is an unbounded mailbox that allows for priorization of its contents. + * Extend this class and provide the Comparator in the constructor. */ -class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { +class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType { + def this(cmp: Comparator[Envelope]) = this(cmp, 11) final override def create(owner: Option[ActorContext]): MessageQueue = - new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final def queue: Queue[Envelope] = this } } /** - * Extend me to provide the comparator + * BoundedPriorityMailbox is a bounded mailbox that allows for priorization of its contents. + * Extend this class and provide the Comparator in the constructor. */ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { @@ -452,6 +488,9 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap } } +/** + * UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque. + */ case class UnboundedDequeBasedMailbox() extends MailboxType { def this(settings: ActorSystem.Settings, config: Config) = this() @@ -462,6 +501,9 @@ case class UnboundedDequeBasedMailbox() extends MailboxType { } } +/** + * BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque. + */ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala index 7469f6609f..8bf6935619 100644 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -12,7 +12,7 @@ import akka.actor._ * A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down * See LocalDeathWatch for semantics */ -trait DeathWatch extends ActorEventBus with ActorClassifier { +abstract class DeathWatch extends ActorEventBus with ActorClassifier { type Event = Terminated protected final def classify(event: Event): Classifier = event.actor diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 2dd22b3b54..6a5cc67cc4 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -182,10 +182,9 @@ trait SubchannelClassification { this: EventBus ⇒ */ trait ScanningClassification { self: EventBus ⇒ protected final val subscribers = new ConcurrentSkipListSet[(Classifier, Subscriber)](new Comparator[(Classifier, Subscriber)] { - def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = { - val cM = compareClassifiers(a._1, b._1) - if (cM != 0) cM - else compareSubscribers(a._2, b._2) + def compare(a: (Classifier, Subscriber), b: (Classifier, Subscriber)): Int = compareClassifiers(a._1, b._1) match { + case 0 ⇒ compareSubscribers(a._2, b._2) + case other ⇒ other } }) @@ -238,7 +237,7 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒ import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec private val empty = TreeSet.empty[ActorRef] - protected val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize) + private val mappings = new ConcurrentHashMap[ActorRef, TreeSet[ActorRef]](mapSize) @tailrec protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = { @@ -320,9 +319,9 @@ trait ActorClassification { this: ActorEventBus with ActorClassifier ⇒ */ protected def mapSize: Int - def publish(event: Event): Unit = { - val receivers = mappings.get(classify(event)) - if (receivers ne null) receivers foreach { _ ! event } + def publish(event: Event): Unit = mappings.get(classify(event)) match { + case null ⇒ () + case some ⇒ some foreach { _ ! event } } def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber) diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index ac31b133b3..1230756517 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -29,7 +29,7 @@ trait LoggingBus extends ActorEventBus { import Logging._ - private val guard = new ReentrantGuard + private val guard = new ReentrantGuard //Switch to ReentrantReadWrite private var loggers = Seq.empty[ActorRef] private var _logLevel: LogLevel = _ @@ -97,7 +97,7 @@ trait LoggingBus extends ActorEventBus { val myloggers = for { loggerName ← defaultLoggers - if loggerName != StandardOutLoggerName + if loggerName != StandardOutLogger.getClass.getName } yield { try { system.dynamicAccess.getClassFor[Actor](loggerName) match { @@ -129,7 +129,7 @@ trait LoggingBus extends ActorEventBus { case _: InvalidActorNameException ⇒ // ignore if it is already running } publish(Debug(logName, this.getClass, "Default Loggers started")) - if (!(defaultLoggers contains StandardOutLoggerName)) { + if (!(defaultLoggers contains StandardOutLogger.getClass.getName)) { unsubscribe(StandardOutLogger) } } catch { @@ -163,6 +163,9 @@ trait LoggingBus extends ActorEventBus { publish(Debug(simpleName(this), this.getClass, "all default loggers stopped")) } + /** + * INTERNAL API + */ private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) @@ -361,17 +364,33 @@ object LogSource { */ object Logging { + /** + * Returns a 'safe' getSimpleName for the provided object's Class + * @param obj + * @return the simple name of the given object's Class + */ def simpleName(obj: AnyRef): String = simpleName(obj.getClass) + /** + * Returns a 'safe' getSimpleName for the provided Class + * @param obj + * @return the simple name of the given Class + */ def simpleName(clazz: Class[_]): String = { val n = clazz.getName val i = n.lastIndexOf('.') n.substring(i + 1) } - object Extension extends ExtensionKey[LogExt] + /** + * INTERNAL API + */ + private[akka] object Extension extends ExtensionKey[LogExt] - class LogExt(system: ExtendedActorSystem) extends Extension { + /** + * INTERNAL API + */ + private[akka] class LogExt(system: ExtendedActorSystem) extends Extension { private val loggerId = new AtomicInteger def id() = loggerId.incrementAndGet() } @@ -431,12 +450,6 @@ object Logging { // these type ascriptions/casts are necessary to avoid CCEs during construction while retaining correct type val AllLogLevels = Seq(ErrorLevel: AnyRef, WarningLevel, InfoLevel, DebugLevel).asInstanceOf[Seq[LogLevel]] - val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern - val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern - val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern - val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern - val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern - /** * Obtain LoggingAdapter for the given actor system and source object. This * will use the system’s event stream and include the system’s address in the @@ -624,27 +637,34 @@ object Logging { // weird return type due to binary compatibility def loggerInitialized(): LoggerInitialized.type = LoggerInitialized + /** + * LoggerInitializationException is thrown to indicate that there was a problem initializing a logger + * @param msg + */ class LoggerInitializationException(msg: String) extends AkkaException(msg) trait StdOutLogger { import java.text.SimpleDateFormat import java.util.Date - val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S") + private val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S") + private val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + private val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern + private val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern + private val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern + private val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern - def timestamp = dateFormat.format(new Date) + def timestamp(): String = synchronized { dateFormat.format(new Date) } // SDF isn't threadsafe - def print(event: Any) { - event match { - case e: Error ⇒ error(e) - case e: Warning ⇒ warning(e) - case e: Info ⇒ info(e) - case e: Debug ⇒ debug(e) - case e ⇒ warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e)) - } + def print(event: Any): Unit = event match { + case e: Error ⇒ error(e) + case e: Warning ⇒ warning(e) + case e: Info ⇒ info(e) + case e: Debug ⇒ debug(e) + case e ⇒ warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e)) } - def error(event: Error) = { + def error(event: Error): Unit = { val f = if (event.cause == Error.NoCause) errorFormatWithoutCause else errorFormat println(f.format( timestamp, @@ -654,21 +674,21 @@ object Logging { stackTraceFor(event.cause))) } - def warning(event: Warning) = + def warning(event: Warning): Unit = println(warningFormat.format( timestamp, event.thread.getName, event.logSource, event.message)) - def info(event: Info) = + def info(event: Info): Unit = println(infoFormat.format( timestamp, event.thread.getName, event.logSource, event.message)) - def debug(event: Debug) = + def debug(event: Debug): Unit = println(debugFormat.format( timestamp, event.thread.getName, @@ -689,8 +709,8 @@ object Logging { override val toString = "StandardOutLogger" override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message) } + val StandardOutLogger = new StandardOutLogger - val StandardOutLoggerName = StandardOutLogger.getClass.getName /** * Actor wrapper around the standard output logger. If @@ -708,7 +728,7 @@ object Logging { * Returns the StackTrace for the given Throwable as a String */ def stackTraceFor(e: Throwable): String = e match { - case null | Error.NoCause ⇒ "" + case null | Error.NoCause | _: NoStackTrace ⇒ "" case other ⇒ val sw = new java.io.StringWriter val pw = new java.io.PrintWriter(sw) @@ -752,51 +772,51 @@ trait LoggingAdapter { * These actually implement the passing on of the messages to be logged. * Will not be called if is...Enabled returned false. */ - protected def notifyError(message: String) - protected def notifyError(cause: Throwable, message: String) - protected def notifyWarning(message: String) - protected def notifyInfo(message: String) - protected def notifyDebug(message: String) + protected def notifyError(message: String): Unit + protected def notifyError(cause: Throwable, message: String): Unit + protected def notifyWarning(message: String): Unit + protected def notifyInfo(message: String): Unit + protected def notifyDebug(message: String): Unit /* * The rest is just the widening of the API for the user's convenience. */ - def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) } - def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format1(template, arg1)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) } + def error(cause: Throwable, message: String): Unit = { if (isErrorEnabled) notifyError(cause, message) } + def error(cause: Throwable, template: String, arg1: Any): Unit = { if (isErrorEnabled) notifyError(cause, format1(template, arg1)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) } - def error(message: String) { if (isErrorEnabled) notifyError(message) } - def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format1(template, arg1)) } - def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) } - def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) } - def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) } + def error(message: String): Unit = { if (isErrorEnabled) notifyError(message) } + def error(template: String, arg1: Any): Unit = { if (isErrorEnabled) notifyError(format1(template, arg1)) } + def error(template: String, arg1: Any, arg2: Any): Unit = { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) } - def warning(message: String) { if (isWarningEnabled) notifyWarning(message) } - def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format1(template, arg1)) } - def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) } - def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) } - def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) } + def warning(message: String): Unit = { if (isWarningEnabled) notifyWarning(message) } + def warning(template: String, arg1: Any): Unit = { if (isWarningEnabled) notifyWarning(format1(template, arg1)) } + def warning(template: String, arg1: Any, arg2: Any): Unit = { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) } def info(message: String) { if (isInfoEnabled) notifyInfo(message) } - def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format1(template, arg1)) } - def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) } - def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) } - def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) } + def info(template: String, arg1: Any): Unit = { if (isInfoEnabled) notifyInfo(format1(template, arg1)) } + def info(template: String, arg1: Any, arg2: Any): Unit = { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) } def debug(message: String) { if (isDebugEnabled) notifyDebug(message) } - def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format1(template, arg1)) } - def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) } - def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) } - def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) } + def debug(template: String, arg1: Any): Unit = { if (isDebugEnabled) notifyDebug(format1(template, arg1)) } + def debug(template: String, arg1: Any, arg2: Any): Unit = { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) } def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) } - def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format1(template, arg1)) } - def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) } - def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) } - def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) } + def log(level: Logging.LogLevel, template: String, arg1: Any): Unit = { if (isEnabled(level)) notifyLog(level, format1(template, arg1)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any): Unit = { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any): Unit = { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) } final def isEnabled(level: Logging.LogLevel): Boolean = level match { case Logging.ErrorLevel ⇒ isErrorEnabled @@ -812,14 +832,14 @@ trait LoggingAdapter { case Logging.DebugLevel ⇒ if (isDebugEnabled) notifyDebug(message) } - private def format1(t: String, arg: Any) = arg match { + private def format1(t: String, arg: Any): String = arg match { case a: Array[_] if !a.getClass.getComponentType.isPrimitive ⇒ format(t, a: _*) case a: Array[_] ⇒ format(t, (a map (_.asInstanceOf[AnyRef]): _*)) case x ⇒ format(t, x) } - def format(t: String, arg: Any*) = { - val sb = new StringBuilder + def format(t: String, arg: Any*): String = { + val sb = new StringBuilder //FIXME add some decent size hint here var p = 0 var rest = t while (p < arg.length) { @@ -829,17 +849,15 @@ trait LoggingAdapter { rest = "" p = arg.length } else { - sb.append(rest.substring(0, index)) - sb.append(arg(p)) + sb.append(rest.substring(0, index)).append(arg(p)) rest = rest.substring(index + 2) p += 1 } } - sb.append(rest) - sb.toString + sb.append(rest).toString } } - +//FIXME DOCUMENT class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter { import Logging._ @@ -849,14 +867,9 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class def isInfoEnabled = bus.logLevel >= InfoLevel def isDebugEnabled = bus.logLevel >= DebugLevel - protected def notifyError(message: String) { bus.publish(Error(logSource, logClass, message)) } - - protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, logClass, message)) } - - protected def notifyWarning(message: String) { bus.publish(Warning(logSource, logClass, message)) } - - protected def notifyInfo(message: String) { bus.publish(Info(logSource, logClass, message)) } - - protected def notifyDebug(message: String) { bus.publish(Debug(logSource, logClass, message)) } - + protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, message)) + protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, message)) + protected def notifyWarning(message: String): Unit = bus.publish(Warning(logSource, logClass, message)) + protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message)) + protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message)) } diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala index 452b2b6b19..337815eed1 100644 --- a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala +++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala @@ -26,9 +26,7 @@ object LoggingReceive { */ def apply(r: Receive)(implicit context: ActorContext): Receive = r match { case _: LoggingReceive ⇒ r - case _ ⇒ - if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) - else r + case _ ⇒ if (context.system.settings.AddLoggingReceive) new LoggingReceive(None, r) else r } } @@ -37,7 +35,7 @@ object LoggingReceive { * @param source the log source, if not defined the actor of the context will be used */ class LoggingReceive(source: Option[AnyRef], r: Receive)(implicit context: ActorContext) extends Receive { - def isDefinedAt(o: Any) = { + def isDefinedAt(o: Any): Boolean = { val handled = r.isDefinedAt(o) val (str, clazz) = LogSource.fromAnyRef(source getOrElse context.asInstanceOf[ActorCell].actor) context.system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 47ce667759..5bd38ad52a 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -24,28 +24,14 @@ trait Function2[T1, T2, R] { * A Procedure is like a Function, but it doesn't produce a return value. */ trait Procedure[T] { - def apply(param: T) -} - -/** - * A Procedure is like a Function, but it doesn't produce a return value. - */ -trait Procedure2[T1, T2] { - def apply(param: T1, param2: T2) -} - -/** - * An executable piece of code that takes no parameters and doesn't return any value. - */ -trait SideEffect { - def apply() + def apply(param: T): Unit } /** * An executable piece of code that takes no parameters and doesn't return any value. */ trait Effect { - def apply() + def apply(): Unit } /** @@ -67,9 +53,9 @@ sealed abstract class Option[A] extends java.lang.Iterable[A] { def get: A def isEmpty: Boolean - def isDefined = !isEmpty + def isDefined: Boolean = !isEmpty def asScala: scala.Option[A] - def iterator = if (isEmpty) Iterator.empty else Iterator.single(get) + def iterator: java.util.Iterator[A] = if (isEmpty) Iterator.empty else Iterator.single(get) } object Option { @@ -102,18 +88,18 @@ object Option { * A. */ final case class Some[A](v: A) extends Option[A] { - def get = v - def isEmpty = false - def asScala = scala.Some(v) + def get: A = v + def isEmpty: Boolean = false + def asScala: scala.Some[A] = scala.Some(v) } /** * This case object represents non-existent values. */ private case object None extends Option[Nothing] { - def get = throw new NoSuchElementException("None.get") - def isEmpty = true - def asScala = scala.None + def get: Nothing = throw new NoSuchElementException("None.get") + def isEmpty: Boolean = true + def asScala: scala.None.type = scala.None } implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index c3295d4b52..14c787d3f6 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -7,17 +7,12 @@ package akka.util import java.util.concurrent.locks.{ ReentrantLock } import java.util.concurrent.atomic.{ AtomicBoolean } -final class ReentrantGuard { - final val lock = new ReentrantLock +final class ReentrantGuard extends ReentrantLock { @inline final def withGuard[T](body: ⇒ T): T = { - lock.lock - try { - body - } finally { - lock.unlock - } + lock() + try body finally unlock() } } @@ -104,19 +99,13 @@ class Switch(startAsOn: Boolean = false) { * Executes the provided action and returns its value if the switch is on, waiting for any pending changes to happen before (locking) * Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance */ - def whileOnYield[T](action: ⇒ T): Option[T] = synchronized { - if (switch.get) Some(action) - else None - } + def whileOnYield[T](action: ⇒ T): Option[T] = synchronized { if (switch.get) Some(action) else None } /** * Executes the provided action and returns its value if the switch is off, waiting for any pending changes to happen before (locking) * Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance */ - def whileOffYield[T](action: ⇒ T): Option[T] = synchronized { - if (!switch.get) Some(action) - else None - } + def whileOffYield[T](action: ⇒ T): Option[T] = synchronized { if (!switch.get) Some(action) else None } /** * Executes the provided action and returns if the action was executed or not, if the switch is on, waiting for any pending changes to happen before (locking) @@ -144,9 +133,7 @@ class Switch(startAsOn: Boolean = false) { * Executes the provided callbacks depending on if the switch is either on or off waiting for any pending changes to happen before (locking) * Be careful of longrunning or blocking within the provided action as it can lead to deadlocks or bad performance */ - def fold[T](on: ⇒ T)(off: ⇒ T) = synchronized { - if (switch.get) on else off - } + def fold[T](on: ⇒ T)(off: ⇒ T): T = synchronized { if (switch.get) on else off } /** * Executes the given code while holding this switch’s lock, i.e. protected from concurrent modification of the switch status. From 5afe6601ff3ca7168ee8fe1aeb3d52bd8a3cfbbd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 14:55:38 +0200 Subject: [PATCH 03/12] Removing ActorTimeoutException since it was only used in GracefulStop, and changed GracefulStop to use PromiseActorRef instead of spawning a toplevel actor --- .../test/scala/akka/pattern/PatternSpec.scala | 9 +---- .../src/main/scala/akka/actor/Actor.scala | 6 --- .../main/scala/akka/pattern/AskSupport.scala | 4 +- .../akka/pattern/GracefulStopSupport.scala | 37 ++++++++----------- 4 files changed, 18 insertions(+), 38 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index 2776beabce..68e6d40824 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -7,11 +7,9 @@ package akka.pattern import akka.testkit.AkkaSpec import akka.actor.Props import akka.actor.Actor -import akka.actor.ActorTimeoutException import akka.util.Duration import akka.util.duration._ import akka.dispatch.{ Future, Promise, Await } -import java.lang.IllegalStateException object PatternSpec { case class Work(duration: Duration) @@ -41,13 +39,10 @@ class PatternSpec extends AkkaSpec { Await.ready(gracefulStop(target, 1 millis), 1 second) } - "complete Future with ActorTimeoutException when actor not terminated within timeout" in { + "complete Future with AskTimeoutException when actor not terminated within timeout" in { val target = system.actorOf(Props[TargetActor]) target ! Work(250 millis) - val result = gracefulStop(target, 10 millis) - intercept[ActorTimeoutException] { - Await.result(result, 200 millis) - } + intercept[AskTimeoutException] { Await.result(gracefulStop(target, 10 millis), 200 millis) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 3e233a2056..7c020925eb 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -129,12 +129,6 @@ case class ActorInitializationException private[akka] (actor: ActorRef, message: def this(msg: String) = this(null, msg, null) } -//FIXME: Only used by gracefulStop we should remove this if possible -class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) - extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null) -} - /** * InvalidMessageException is thrown when an invalid message is sent to an Actor. * Technically it's only "null" which is an InvalidMessageException but who knows, diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index ef4217039d..ede65b17da 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -272,9 +272,7 @@ private[akka] object PromiseActorRef { val result = Promise[Any]()(provider.dispatcher) val a = new PromiseActorRef(provider, result) val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) } - result onComplete { _ ⇒ - try a.stop() finally f.cancel() - } + result onComplete { _ ⇒ try a.stop() finally f.cancel() } a } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index d6fbd31c1e..8b441f3d5b 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -4,9 +4,9 @@ package akka.pattern -import akka.actor.{ ActorRef, Actor, ActorSystem, Props, PoisonPill, Terminated, ReceiveTimeout, ActorTimeoutException } import akka.dispatch.{ Promise, Future } -import akka.util.Duration +import akka.actor._ +import akka.util.{ Timeout, Duration } trait GracefulStopSupport { /** @@ -14,7 +14,8 @@ trait GracefulStopSupport { * existing messages of the target actor has been processed and the actor has been * terminated. * - * Useful when you need to wait for termination or compose ordered termination of several actors. + * Useful when you need to wait for termination or compose ordered termination of several actors, + * which should only be done outside of the ActorSystem as blocking inside Actors is discouraged. * * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] * is completed with failure [[akka.actor.ActorTimeoutException]]. @@ -22,26 +23,18 @@ trait GracefulStopSupport { def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { if (target.isTerminated) { Promise.successful(true) - } else { - val result = Promise[Boolean]() - system.actorOf(Props(new Actor { - // Terminated will be received when target has been stopped - context watch target + } else system match { + case e: ExtendedActorSystem ⇒ + val ref = PromiseActorRef(e.provider, Timeout(timeout)) + e.deathWatch.subscribe(ref, target) + ref.result onComplete { case x ⇒ println(x) } + ref.result onComplete { + case Right(Terminated(`target`)) ⇒ () // Ignore + case _ ⇒ e.deathWatch.unsubscribe(ref, target) + } // Just making sure we're not leaking here target ! PoisonPill - // ReceiveTimeout will be received if nothing else is received within the timeout - context setReceiveTimeout timeout - - def receive = { - case Terminated(a) if a == target ⇒ - result success true - context stop self - case ReceiveTimeout ⇒ - result failure new ActorTimeoutException( - "Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout)) - context stop self - } - })) - result + ref.result map { case Terminated(`target`) ⇒ true } + case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") } } } \ No newline at end of file From 2bb255b480d64aa60b5540705d4b0d62114595a3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 15:04:08 +0200 Subject: [PATCH 04/12] Removing ActorTimeoutException everywhere... I swear it... --- akka-actor/src/main/scala/akka/pattern/AskSupport.scala | 6 +++--- .../src/main/scala/akka/pattern/GracefulStopSupport.scala | 2 +- akka-actor/src/main/scala/akka/pattern/Patterns.scala | 6 +++--- .../java/code/akka/docs/actor/UntypedActorDocTestBase.java | 4 ++-- akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala | 3 +-- .../scala/akka/transactor/CoordinatedIncrementSpec.scala | 4 ++-- .../src/test/scala/akka/transactor/FickleFriendsSpec.scala | 4 ++-- .../src/test/scala/akka/transactor/TransactorSpec.scala | 4 ++-- 8 files changed, 16 insertions(+), 17 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index ede65b17da..cfaa0a182b 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -46,7 +46,7 @@ trait AskSupport { * Sends a message asynchronously and returns a [[akka.dispatch.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the + * will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in * `Await.result(..., timeout)`). @@ -96,7 +96,7 @@ trait AskSupport { * Sends a message asynchronously and returns a [[akka.dispatch.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the + * will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in * `Await.result(..., timeout)`). @@ -126,7 +126,7 @@ trait AskSupport { * Sends a message asynchronously and returns a [[akka.dispatch.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the + * will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in * `Await.result(..., timeout)`). diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 8b441f3d5b..9c8b6ae5ff 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -18,7 +18,7 @@ trait GracefulStopSupport { * which should only be done outside of the ActorSystem as blocking inside Actors is discouraged. * * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] - * is completed with failure [[akka.actor.ActorTimeoutException]]. + * is completed with failure [[akka.pattern.AskTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { if (target.isTerminated) { diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index b58e9a8fc1..853b46e318 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -18,7 +18,7 @@ object Patterns { * Sends a message asynchronously and returns a [[akka.dispatch.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the + * will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in * `Await.result(..., timeout)`). @@ -49,7 +49,7 @@ object Patterns { * Sends a message asynchronously and returns a [[akka.dispatch.Future]] * holding the eventual reply message; this means that the target actor * needs to send the result to the `sender` reference provided. The Future - * will be completed with an [[akka.actor.AskTimeoutException]] after the + * will be completed with an [[akka.pattern.AskTimeoutException]] after the * given timeout has expired; this is independent from any timeout applied * while awaiting a result for this future (i.e. in * `Await.result(..., timeout)`). @@ -100,7 +100,7 @@ object Patterns { * Useful when you need to wait for termination or compose ordered termination of several actors. * * If the target actor isn't terminated within the timeout the [[akka.dispatch.Future]] - * is completed with failure [[akka.actor.ActorTimeoutException]]. + * is completed with failure [[akka.pattern.AskTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration, system: ActorSystem): Future[java.lang.Boolean] = scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index 65ff37c10e..146131f61e 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -36,7 +36,7 @@ import static akka.pattern.Patterns.gracefulStop; import akka.dispatch.Future; import akka.dispatch.Await; import akka.util.Duration; -import akka.actor.ActorTimeoutException; +import akka.pattern.AskTimeoutException; //#import-gracefulStop //#import-askPipe @@ -207,7 +207,7 @@ public class UntypedActorDocTestBase { Future stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system); Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); // the actor has been stopped - } catch (ActorTimeoutException e) { + } catch (AskTimeoutException e) { // the actor wasn't stopped within 5 seconds } //#gracefulStop diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 0bc540f970..8aed17605c 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -326,14 +326,13 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#gracefulStop import akka.pattern.gracefulStop import akka.dispatch.Await - import akka.actor.ActorTimeoutException try { val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system) Await.result(stopped, 6 seconds) // the actor has been stopped } catch { - case e: ActorTimeoutException ⇒ // the actor wasn't stopped within 5 seconds + case e: akka.pattern.AskTimeoutException ⇒ // the actor wasn't stopped within 5 seconds } //#gracefulStop } diff --git a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala index 9c019a56a5..c76a5a701c 100644 --- a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala @@ -12,7 +12,7 @@ import akka.util.duration._ import akka.util.Timeout import akka.testkit._ import scala.concurrent.stm._ -import akka.pattern.ask +import akka.pattern.{ AskTimeoutException, ask } object CoordinatedIncrement { @@ -96,7 +96,7 @@ class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) wit val ignoreExceptions = Seq( EventFilter[ExpectedFailureException](), EventFilter[CoordinatedTransactionException](), - EventFilter[ActorTimeoutException]()) + EventFilter[AskTimeoutException]()) filterEvents(ignoreExceptions) { val (counters, failer) = actorOfs val coordinated = Coordinated() diff --git a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala index 4f7fc89c14..9deee7b9cc 100644 --- a/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/FickleFriendsSpec.scala @@ -15,7 +15,7 @@ import akka.testkit.TestEvent.Mute import scala.concurrent.stm._ import scala.util.Random.{ nextInt ⇒ random } import java.util.concurrent.CountDownLatch -import akka.pattern.ask +import akka.pattern.{ AskTimeoutException, ask } object FickleFriends { case class FriendlyIncrement(friends: Seq[ActorRef], timeout: Timeout, latch: CountDownLatch) @@ -120,7 +120,7 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { val ignoreExceptions = Seq( EventFilter[ExpectedFailureException](), EventFilter[CoordinatedTransactionException](), - EventFilter[ActorTimeoutException]()) + EventFilter[AskTimeoutException]()) system.eventStream.publish(Mute(ignoreExceptions)) val (counters, coordinator) = actorOfs val latch = new CountDownLatch(1) diff --git a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala index 1954c9a13b..df9723ffd2 100644 --- a/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/TransactorSpec.scala @@ -10,7 +10,7 @@ import akka.util.duration._ import akka.util.Timeout import akka.testkit._ import scala.concurrent.stm._ -import akka.pattern.ask +import akka.pattern.{ AskTimeoutException, ask } object TransactorIncrement { case class Increment(friends: Seq[ActorRef], latch: TestLatch) @@ -105,7 +105,7 @@ class TransactorSpec extends AkkaSpec { val ignoreExceptions = Seq( EventFilter[ExpectedFailureException](), EventFilter[CoordinatedTransactionException](), - EventFilter[ActorTimeoutException]()) + EventFilter[AskTimeoutException]()) filterEvents(ignoreExceptions) { val (counters, failer) = createTransactors val failLatch = TestLatch(numCounters) From 07bf11d326132745fa1215570b26c12f7e49dee7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 15:40:51 +0200 Subject: [PATCH 05/12] Removing debug equipment left inside the patient. --- akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 9c8b6ae5ff..5f78e8ba27 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -27,7 +27,6 @@ trait GracefulStopSupport { case e: ExtendedActorSystem ⇒ val ref = PromiseActorRef(e.provider, Timeout(timeout)) e.deathWatch.subscribe(ref, target) - ref.result onComplete { case x ⇒ println(x) } ref.result onComplete { case Right(Terminated(`target`)) ⇒ () // Ignore case _ ⇒ e.deathWatch.unsubscribe(ref, target) From 4fb4903225e9b7f0d770d8812fb0a12f63c1bd77 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 16:41:19 +0200 Subject: [PATCH 06/12] Further work on binary compatibility --- .../main/scala/akka/pattern/AskSupport.scala | 15 +++-- .../akka/routing/ConnectionManager.scala | 8 +-- .../scala/akka/routing/ConsistentHash.scala | 60 +++++++++---------- .../src/main/scala/akka/routing/Routing.scala | 2 - 4 files changed, 42 insertions(+), 43 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index cfaa0a182b..a20baaf533 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -157,6 +157,8 @@ trait AskSupport { /** * Akka private optimized representation of the temporary actor spawned to * receive the reply to an "ask" operation. + * + * INTERNAL API */ private[akka] final class PromiseActorRef private (val provider: ActorRefProvider, val result: Promise[Any]) extends MinimalActorRef { @@ -182,14 +184,12 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset) @inline - private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = - Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) + private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) @inline - private def setState(newState: AnyRef): Unit = - Unsafe.instance.putObjectVolatile(this, stateOffset, newState) + private def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState) - override def getParent = provider.tempContainer + override def getParent: InternalActorRef = provider.tempContainer /** * Contract of this method: @@ -234,7 +234,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide case _ ⇒ } - override def isTerminated = state match { + override def isTerminated: Boolean = state match { case Stopped | _: StoppedWithPath ⇒ true case _ ⇒ false } @@ -263,6 +263,9 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide } } +/** + * INTERNAL API + */ private[akka] object PromiseActorRef { private case object Registering private case object Stopped diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index 3136a2342d..9029c1f78b 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -10,10 +10,8 @@ import akka.actor._ * An Iterable that also contains a version. */ trait VersionedIterable[A] { - val version: Long - + def version: Long def iterable: Iterable[A] - def apply(): Iterable[A] = iterable } @@ -42,7 +40,7 @@ trait ConnectionManager { /** * Shuts the connection manager down, which stops all managed actors */ - def shutdown() + def shutdown(): Unit /** * Returns a VersionedIterator containing all connected ActorRefs at some moment in time. Since there is @@ -59,5 +57,5 @@ trait ConnectionManager { * * @param ref the dead */ - def remove(deadRef: ActorRef) + def remove(deadRef: ActorRef): Unit } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 130db2be3e..afa321d07d 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -23,7 +23,7 @@ class ConsistentHash[T](nodes: Seq[T], replicas: Int) { nodes.foreach(this += _) - def +=(node: T) { + def +=(node: T): Unit = { cluster += node (1 to replicas) foreach { replica ⇒ val key = hashFor((node + ":" + replica).getBytes("UTF-8")) @@ -32,7 +32,7 @@ class ConsistentHash[T](nodes: Seq[T], replicas: Int) { } } - def -=(node: T) { + def -=(node: T): Unit = { cluster -= node (1 to replicas) foreach { replica ⇒ val key = hashFor((node + ":" + replica).getBytes("UTF-8")) @@ -96,7 +96,7 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T private var hashvalue = h /** Begin a new hash using the same seed. */ - def reset() { + def reset(): Unit = { h = startHash(seed) c = hiddenMagicA k = hiddenMagicB @@ -104,7 +104,7 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T } /** Incorporate the hash value of one item. */ - def apply(t: T) { + def apply(t: T): Unit = { h = extendHash(h, t.##, c, k) c = nextMagicA(c) k = nextMagicB(k) @@ -112,7 +112,7 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T } /** Incorporate a known hash value. */ - def append(i: Int) { + def append(i: Int): Unit = { h = extendHash(h, i, c, k) c = nextMagicA(c) k = nextMagicB(k) @@ -120,14 +120,15 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T } /** Retrieve the hash value */ - def hash = { + def hash: Int = { if (!hashed) { hashvalue = finalizeHash(h) hashed = true } hashvalue } - override def hashCode = hash + + override def hashCode: Int = hash } /** @@ -143,35 +144,35 @@ class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T object MurmurHash { // Magic values used for MurmurHash's 32 bit hash. // Don't change these without consulting a hashing expert! - final private val visibleMagic = 0x971e137b - final private val hiddenMagicA = 0x95543787 - final private val hiddenMagicB = 0x2ad7eb25 - final private val visibleMixer = 0x52dce729 - final private val hiddenMixerA = 0x7b7d159c - final private val hiddenMixerB = 0x6bce6396 - final private val finalMixer1 = 0x85ebca6b - final private val finalMixer2 = 0xc2b2ae35 + final private val visibleMagic: Int = 0x971e137b + final private val hiddenMagicA: Int = 0x95543787 + final private val hiddenMagicB: Int = 0x2ad7eb25 + final private val visibleMixer: Int = 0x52dce729 + final private val hiddenMixerA: Int = 0x7b7d159c + final private val hiddenMixerB: Int = 0x6bce6396 + final private val finalMixer1: Int = 0x85ebca6b + final private val finalMixer2: Int = 0xc2b2ae35 // Arbitrary values used for hashing certain classes - final private val seedString = 0xf7ca7fd2 - final private val seedArray = 0x3c074a61 + final private val seedString: Int = 0xf7ca7fd2 + final private val seedArray: Int = 0x3c074a61 /** The first 23 magic integers from the first stream are stored here */ - val storedMagicA = + val storedMagicA: Array[Int] = Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray /** The first 23 magic integers from the second stream are stored here */ - val storedMagicB = + val storedMagicB: Array[Int] = Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray /** Begin a new hash with a seed value. */ - def startHash(seed: Int) = seed ^ visibleMagic + def startHash(seed: Int): Int = seed ^ visibleMagic /** The initial magic integers in the first stream. */ - def startMagicA = hiddenMagicA + def startMagicA: Int = hiddenMagicA /** The initial magic integer in the second stream. */ - def startMagicB = hiddenMagicB + def startMagicB: Int = hiddenMagicB /** * Incorporates a new value into an existing hash. @@ -182,18 +183,17 @@ object MurmurHash { * @param magicB a magic integer from a different stream * @return the updated hash value */ - def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int) = { + def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = (hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer - } /** Given a magic integer from the first stream, compute the next */ - def nextMagicA(magicA: Int) = magicA * 5 + hiddenMixerA + def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA /** Given a magic integer from the second stream, compute the next */ - def nextMagicB(magicB: Int) = magicB * 5 + hiddenMixerB + def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB /** Once all hashes have been incorporated, this performs a final mixing */ - def finalizeHash(hash: Int) = { + def finalizeHash(hash: Int): Int = { var i = (hash ^ (hash >>> 16)) i *= finalMixer1 i ^= (i >>> 13) @@ -203,7 +203,7 @@ object MurmurHash { } /** Compute a high-quality hash of an array */ - def arrayHash[@specialized T](a: Array[T]) = { + def arrayHash[@specialized T](a: Array[T]): Int = { var h = startHash(a.length * seedArray) var c = hiddenMagicA var k = hiddenMagicB @@ -218,7 +218,7 @@ object MurmurHash { } /** Compute a high-quality hash of a string */ - def stringHash(s: String) = { + def stringHash(s: String): Int = { var h = startHash(s.length * seedString) var c = hiddenMagicA var k = hiddenMagicB @@ -239,7 +239,7 @@ object MurmurHash { * where the order of appearance of elements does not matter. * This is useful for hashing sets, for example. */ - def symmetricHash[T](xs: TraversableOnce[T], seed: Int) = { + def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = { var a, b, n = 0 var c = 1 xs.foreach(i ⇒ { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 211ef202f7..c3db8293d2 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -8,12 +8,10 @@ import akka.util.Duration import akka.util.duration._ import akka.ConfigurationException import akka.pattern.pipe -import akka.pattern.AskSupport import com.typesafe.config.Config import scala.collection.JavaConversions.iterableAsScalaIterable import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.ReentrantLock import akka.jsr166y.ThreadLocalRandom import akka.util.Unsafe import akka.dispatch.Dispatchers From 134f1a19a50331acabac6b84eb3599d6edc24303 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 16:41:39 +0200 Subject: [PATCH 07/12] Reworking Listeners so that senders can be supplied --- .../main/scala/akka/routing/Listeners.scala | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/Listeners.scala b/akka-actor/src/main/scala/akka/routing/Listeners.scala index 39fbf6355d..5ac02e2945 100644 --- a/akka-actor/src/main/scala/akka/routing/Listeners.scala +++ b/akka-actor/src/main/scala/akka/routing/Listeners.scala @@ -5,8 +5,7 @@ package akka.routing import akka.actor.{ Actor, ActorRef } -import java.util.concurrent.ConcurrentSkipListSet -import scala.collection.JavaConversions._ +import java.util.{ Set, TreeSet } sealed trait ListenerMessage case class Listen(listener: ActorRef) extends ListenerMessage @@ -25,13 +24,29 @@ case class WithListeners(f: (ActorRef) ⇒ Unit) extends ListenerMessage * Send WithListeners(fun) to traverse the current listeners. */ trait Listeners { self: Actor ⇒ - protected val listeners = new ConcurrentSkipListSet[ActorRef] + protected val listeners: Set[ActorRef] = new TreeSet[ActorRef] + /** + * Chain this into the receive function. + * + * {{ def receive = listenerManagement orElse … }} + */ protected def listenerManagement: Actor.Receive = { - case Listen(l) ⇒ listeners add l - case Deafen(l) ⇒ listeners remove l - case WithListeners(f) ⇒ listeners foreach f + case Listen(l) ⇒ listeners add l + case Deafen(l) ⇒ listeners remove l + case WithListeners(f) ⇒ + val i = listeners.iterator + while (i.hasNext) f(i.next) } - protected def gossip(msg: Any) = listeners foreach (_ ! msg) + /** + * Sends the supplied message to all current listeners using the provided sender as sender. + * + * @param msg + * @param sender + */ + protected def gossip(msg: Any)(implicit sender: ActorRef = null): Unit = { + val i = listeners.iterator + while (i.hasNext) i.next ! msg + } } From 94e71b7a18f05eb85bbe0a30f3364d999c3575b6 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 18 May 2012 19:25:43 +0200 Subject: [PATCH 08/12] Huge refactor + preparing for binary compatibility, last stretch for akka-actor.jar... --- .../src/main/scala/akka/AkkaException.scala | 37 ++- .../src/main/scala/akka/experimental.scala | 19 -- .../src/main/scala/akka/routing/Routing.scala | 127 +++----- .../akka/serialization/Serialization.scala | 12 +- .../scala/akka/serialization/Serializer.scala | 1 - .../akka/util/BoundedBlockingQueue.scala | 8 +- .../src/main/scala/akka/util/ByteString.scala | 48 ++- .../util/ClassLoaderObjectInputStream.scala | 7 + .../src/main/scala/akka/util/Convert.scala | 2 +- .../src/main/scala/akka/util/Crypt.scala | 4 +- .../src/main/scala/akka/util/Duration.scala | 276 ++++++------------ .../src/main/scala/akka/util/Helpers.scala | 17 +- .../src/main/scala/akka/util/Index.scala | 4 +- .../src/main/scala/akka/util/LockUtil.scala | 20 +- .../src/main/scala/akka/util/Reflect.scala | 4 +- .../src/main/scala/akka/util/Unsafe.java | 3 + .../main/scala/akka/util/cps/package.scala | 1 + .../scala/akka/util/duration/package.scala | 36 +-- 18 files changed, 218 insertions(+), 408 deletions(-) delete mode 100644 akka-actor/src/main/scala/akka/experimental.scala diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 79d78b9d39..e5b0cb6c80 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -5,19 +5,26 @@ package akka object AkkaException { - + //FIXME DOC def toStringWithStackTrace(throwable: Throwable): String = throwable match { case null ⇒ "Unknown Throwable: was 'null'" case ae: AkkaException ⇒ ae.toLongString case e ⇒ "%s:%s\n%s" format (e.getClass.getName, e.getMessage, stackTraceToString(e)) } - def stackTraceToString(throwable: Throwable): String = { - val trace = throwable.getStackTrace - val sb = new StringBuilder - for (i ← 0 until trace.length) - sb.append("\tat %s\n" format trace(i)) - sb.toString + /** + * Returns the given Throwables stack trace as a String, or the empty String if no trace is found + * @param throwable + * @return + */ + def stackTraceToString(throwable: Throwable): String = throwable.getStackTrace match { + case null ⇒ "" + case x if x.length == 0 ⇒ "" + case trace ⇒ + val sb = new StringBuilder + for (i ← 0 until trace.length) + sb.append("\tat %s\n" format trace(i)) + sb.toString } } @@ -32,17 +39,15 @@ object AkkaException { */ //TODO add @SerialVersionUID(1L) when SI-4804 is fixed class AkkaException(message: String = "", cause: Throwable = null) extends RuntimeException(message, cause) with Serializable { - lazy val uuid = java.util.UUID.randomUUID().toString - - override lazy val toString = - "%s:%s\n[%s]".format(getClass.getName, message, uuid) - - lazy val toLongString = - "%s:%s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString) - def this(msg: String) = this(msg, null) - def stackTraceToString = AkkaException.stackTraceToString(this) + lazy val uuid = java.util.UUID.randomUUID().toString + + override def toString: String = "%s:%s\n[%s]".format(getClass.getName, message, uuid) + + def toLongString: String = "%s:%s\n[%s]\n%s".format(getClass.getName, message, uuid, stackTraceToString) + + def stackTraceToString: String = AkkaException.stackTraceToString(this) } /** diff --git a/akka-actor/src/main/scala/akka/experimental.scala b/akka-actor/src/main/scala/akka/experimental.scala deleted file mode 100644 index aef3cb5c85..0000000000 --- a/akka-actor/src/main/scala/akka/experimental.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka - -import annotation.target._ - -/** - * This annotation marks a feature which is not yet considered stable and may - * change or be removed in a future release. - * - * @since 1.2 - */ -@getter -@setter -@beanGetter -@beanSetter -final class experimental(since: String) extends annotation.StaticAnnotation diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c3db8293d2..f27919d316 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -47,12 +47,11 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup ref: InternalActorRef, props: Props, supervisor: InternalActorRef, - receiveTimeout: Option[Duration]): ActorCell = - { - val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout) - Unsafe.instance.monitorEnter(cell) - cell - } + receiveTimeout: Option[Duration]): ActorCell = { + val cell = super.newActorCell(system, ref, props, supervisor, receiveTimeout) + Unsafe.instance.monitorEnter(cell) + cell + } private[akka] val routerConfig = _props.routerConfig private[akka] val routeeProps = _props.copy(routerConfig = NoRouter) @@ -303,8 +302,8 @@ trait Router extends Actor { final def receive = ({ case Router.Resize ⇒ - try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) - finally assert(ref.resizeInProgress.getAndSet(false)) + val ab = ref.resizeInProgress + if (ab.get) try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) finally ab.set(false) case Terminated(child) ⇒ ref.removeRoutees(IndexedSeq(child)) @@ -319,6 +318,9 @@ trait Router extends Actor { } } +/** + * INTERNAL API + */ private object Router { case object Resize @@ -372,9 +374,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef) //TODO add @SerialVersionUID(1L) when SI-4804 is fixed abstract class NoRouter extends RouterConfig case object NoRouter extends NoRouter { - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null + def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null // FIXME, null, really?? def routerDispatcher: String = "" - def supervisorStrategy = null + def supervisorStrategy = null // FIXME null, really?? override def withFallback(other: RouterConfig): RouterConfig = other /** @@ -404,9 +406,7 @@ case object FromConfig extends FromConfig { //TODO add @SerialVersionUID(1L) when SI-4804 is fixed class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends RouterConfig - with Product - with Serializable - with Equals { + with Serializable { def this() = this(Dispatchers.DefaultDispatcherId) @@ -414,38 +414,6 @@ class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) throw new ConfigurationException("router " + routeeProvider.context.self + " needs external configuration from file (e.g. application.conf)") def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy - - // open-coded case class to preserve binary compatibility, all deprecated for 2.1 - @deprecated("FromConfig does not make sense as case class", "2.0.1") - override def productPrefix = "FromConfig" - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - def productArity = 1 - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - def productElement(x: Int) = x match { - case 0 ⇒ routerDispatcher - case _ ⇒ throw new IndexOutOfBoundsException(x.toString) - } - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - def copy(d: String = Dispatchers.DefaultDispatcherId): FromConfig = new FromConfig(d) - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - def canEqual(o: Any) = o.isInstanceOf[FromConfig] - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - override def hashCode = ScalaRunTime._hashCode(this) - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - override def toString = "FromConfig(" + routerDispatcher + ")" - - @deprecated("FromConfig does not make sense as case class", "2.0.1") - override def equals(other: Any): Boolean = other match { - case FromConfig(x) ⇒ x == routerDispatcher - case _ ⇒ false - } - } object RoundRobinRouter { @@ -510,9 +478,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int) = { - this(nrOfInstances = nr) - } + def this(nr: Int) = this(nrOfInstances = nr) /** * Constructor that sets the routees to be used. @@ -520,9 +486,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = * @param routeePaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(routeePaths: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(routeePaths)) - } + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) /** * Constructor that sets the resizer to be used. @@ -533,13 +497,13 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = /** * Java API for setting routerDispatcher */ - def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + def withDispatcher(dispatcherId: String): RoundRobinRouter = copy(routerDispatcher = dispatcherId) /** * Java API for setting the supervisor strategy to be used for the “head” * Router actor. */ - def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) + def withSupervisorStrategy(strategy: SupervisorStrategy): RoundRobinRouter = copy(supervisorStrategy = strategy) } trait RoundRobinLike { this: RouterConfig ⇒ @@ -630,9 +594,7 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int) = { - this(nrOfInstances = nr) - } + def this(nr: Int) = this(nrOfInstances = nr) /** * Constructor that sets the routees to be used. @@ -640,9 +602,7 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, * @param routeePaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(routeePaths: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(routeePaths)) - } + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) /** * Constructor that sets the resizer to be used. @@ -653,13 +613,13 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, /** * Java API for setting routerDispatcher */ - def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + def withDispatcher(dispatcherId: String): RandomRouter = copy(routerDispatcher = dispatcherId) /** * Java API for setting the supervisor strategy to be used for the “head” * Router actor. */ - def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) + def withSupervisorStrategy(strategy: SupervisorStrategy): RandomRouter = copy(supervisorStrategy = strategy) } trait RandomLike { this: RouterConfig ⇒ @@ -756,9 +716,7 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int) = { - this(nrOfInstances = nr) - } + def this(nr: Int) = this(nrOfInstances = nr) /** * Constructor that sets the routees to be used. @@ -766,9 +724,7 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin * @param routeePaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(routeePaths: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(routeePaths)) - } + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) /** * Constructor that sets the resizer to be used. @@ -779,19 +735,16 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin /** * Java API for setting routerDispatcher */ - def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + def withDispatcher(dispatcherId: String): SmallestMailboxRouter = copy(routerDispatcher = dispatcherId) /** * Java API for setting the supervisor strategy to be used for the “head” * Router actor. */ - def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) + def withSupervisorStrategy(strategy: SupervisorStrategy): SmallestMailboxRouter = copy(supervisorStrategy = strategy) } trait SmallestMailboxLike { this: RouterConfig ⇒ - - import java.security.SecureRandom - def nrOfInstances: Int def routees: Iterable[String] @@ -954,9 +907,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int) = { - this(nrOfInstances = nr) - } + def this(nr: Int) = this(nrOfInstances = nr) /** * Constructor that sets the routees to be used. @@ -964,9 +915,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N * @param routeePaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(routeePaths: java.lang.Iterable[String]) = { - this(routees = iterableAsScalaIterable(routeePaths)) - } + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) /** * Constructor that sets the resizer to be used. @@ -977,13 +926,13 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N /** * Java API for setting routerDispatcher */ - def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId) + def withDispatcher(dispatcherId: String): BroadcastRouter = copy(routerDispatcher = dispatcherId) /** * Java API for setting the supervisor strategy to be used for the “head” * Router actor. */ - def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) + def withSupervisorStrategy(strategy: SupervisorStrategy): BroadcastRouter = copy(supervisorStrategy = strategy) } trait BroadcastLike { this: RouterConfig ⇒ @@ -1069,9 +1018,7 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int, w: Duration) = { - this(nrOfInstances = nr, within = w) - } + def this(nr: Int, w: Duration) = this(nrOfInstances = nr, within = w) /** * Constructor that sets the routees to be used. @@ -1079,9 +1026,8 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * @param routeePaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(routeePaths: java.lang.Iterable[String], w: Duration) = { + def this(routeePaths: java.lang.Iterable[String], w: Duration) = this(routees = iterableAsScalaIterable(routeePaths), within = w) - } /** * Constructor that sets the resizer to be used. @@ -1150,7 +1096,7 @@ trait Resizer { * This method is invoked only in the context of the Router actor in order to safely * create/stop children. */ - def resize(props: Props, routeeProvider: RouteeProvider) + def resize(props: Props, routeeProvider: RouteeProvider): Unit } case object DefaultResizer { @@ -1166,6 +1112,7 @@ case object DefaultResizer { messagesPerResize = resizerConfig.getInt("messages-per-resize")) } +//FIXME DOCUMENT ME case class DefaultResizer( /** * The fewest number of routees the router should ever have. @@ -1240,7 +1187,7 @@ case class DefaultResizer( def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0) - def resize(props: Props, routeeProvider: RouteeProvider) { + def resize(props: Props, routeeProvider: RouteeProvider): Unit = { val currentRoutees = routeeProvider.routees val requestedCapacity = capacity(currentRoutees) @@ -1258,7 +1205,7 @@ case class DefaultResizer( * Give concurrent messages a chance to be placed in mailbox before * sending PoisonPill. */ - protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]) { + protected def delayedStop(scheduler: Scheduler, abandon: IndexedSeq[ActorRef]): Unit = { if (abandon.nonEmpty) { if (stopDelay <= Duration.Zero) { abandon foreach (_ ! PoisonPill) @@ -1327,9 +1274,7 @@ case class DefaultResizer( * @param capacity current number of routees * @return proposed change in the capacity */ - def filter(pressure: Int, capacity: Int): Int = { - rampup(pressure, capacity) + backoff(pressure, capacity) - } + def filter(pressure: Int, capacity: Int): Int = rampup(pressure, capacity) + backoff(pressure, capacity) /** * Computes a proposed positive (or zero) capacity delta using diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 03d03dc785..7355e4f7fb 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -14,8 +14,6 @@ import akka.util.NonFatal import scala.collection.mutable.ArrayBuffer import java.io.NotSerializableException -case class NoSerializerFoundException(m: String) extends AkkaException(m) - object Serialization { /** @@ -120,9 +118,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { possibilities(0)._2 } serializerMap.putIfAbsent(clazz, ser) match { - case null ⇒ - log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName) - ser + case null ⇒ log.debug("Using serializer[{}] for message [{}]", ser.getClass.getName, clazz.getName); ser case some ⇒ some } case ser ⇒ ser @@ -140,10 +136,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) * By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer */ - private val serializers: Map[String, Serializer] = { - for ((k: String, v: String) ← settings.Serializers) - yield k -> serializerOf(v).fold(throw _, identity) - } + private val serializers: Map[String, Serializer] = + for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity) /** * bindings is a Seq of tuple representing the mapping from Class to Serializer. diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 5696201f62..f6300ca998 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -6,7 +6,6 @@ package akka.serialization import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } import akka.util.ClassLoaderObjectInputStream -import akka.actor.DynamicAccess import akka.actor.ExtendedActorSystem import scala.util.DynamicVariable diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index 7eb90b8ef0..c7c8308de0 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -8,6 +8,12 @@ import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{ TimeUnit, BlockingQueue } import java.util.{ AbstractQueue, Queue, Collection, Iterator } +/** + * BoundedBlockingQueue wraps any Queue and turns the result into a BlockingQueue with a limited capacity + * @param maxCapacity - the maximum capacity of this Queue, needs to be > 0 + * @param backing - the backing Queue + * @tparam E - The type of the contents of this Queue + */ class BoundedBlockingQueue[E <: AnyRef]( val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { @@ -22,7 +28,7 @@ class BoundedBlockingQueue[E <: AnyRef]( require(maxCapacity > 0) } - protected val lock = new ReentrantLock(false) + protected val lock = new ReentrantLock(false) // TODO might want to switch to ReentrantReadWriteLock private val notEmpty = lock.newCondition() private val notFull = lock.newCondition() diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 6d869826a8..ac074d5b28 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -11,6 +11,7 @@ import scala.collection.mutable.{ Builder, WrappedArray } import scala.collection.immutable.{ IndexedSeq, VectorBuilder } import scala.collection.generic.CanBuildFrom +//FIXME MORE DOCS object ByteString { /** @@ -53,15 +54,16 @@ object ByteString { val empty: ByteString = CompactByteString(Array.empty[Byte]) - def newBuilder = new ByteStringBuilder + def newBuilder: ByteStringBuilder = new ByteStringBuilder - implicit def canBuildFrom = new CanBuildFrom[TraversableOnce[Byte], Byte, ByteString] { - def apply(from: TraversableOnce[Byte]) = newBuilder - def apply() = newBuilder - } + implicit val canBuildFrom: CanBuildFrom[TraversableOnce[Byte], Byte, ByteString] = + new CanBuildFrom[TraversableOnce[Byte], Byte, ByteString] { + def apply(ignore: TraversableOnce[Byte]): ByteStringBuilder = newBuilder + def apply(): ByteStringBuilder = newBuilder + } private[akka] object ByteString1C { - def apply(bytes: Array[Byte]) = new ByteString1C(bytes) + def apply(bytes: Array[Byte]): ByteString1C = new ByteString1C(bytes) } /** @@ -71,7 +73,7 @@ object ByteString { final class ByteString1C private (private val bytes: Array[Byte]) extends CompactByteString { def apply(idx: Int): Byte = bytes(idx) - override def length = bytes.length + override def length: Int = bytes.length def toArray: Array[Byte] = bytes.clone @@ -81,13 +83,11 @@ object ByteString { def compact: ByteString1C = this - def asByteBuffer: ByteBuffer = - toByteString1.asByteBuffer + def asByteBuffer: ByteBuffer = toByteString1.asByteBuffer def decodeString(charset: String): String = new String(bytes, charset) - def ++(that: ByteString): ByteString = - if (!that.isEmpty) toByteString1 ++ that else this + def ++(that: ByteString): ByteString = if (!that.isEmpty) toByteString1 ++ that else this override def slice(from: Int, until: Int): ByteString = if ((from != 0) || (until != length)) toByteString1.slice(from, until) @@ -96,12 +96,11 @@ object ByteString { override def copyToArray[A >: Byte](xs: Array[A], start: Int, len: Int): Unit = toByteString1.copyToArray(xs, start, len) - def copyToBuffer(buffer: ByteBuffer): Int = - toByteString1.copyToBuffer(buffer) + def copyToBuffer(buffer: ByteBuffer): Int = toByteString1.copyToBuffer(buffer) } private[akka] object ByteString1 { - def apply(bytes: Array[Byte]) = new ByteString1(bytes) + def apply(bytes: Array[Byte]): ByteString1 = new ByteString1(bytes) } /** @@ -113,7 +112,7 @@ object ByteString { def apply(idx: Int): Byte = bytes(checkRangeConvert(idx)) - private def checkRangeConvert(index: Int) = { + private def checkRangeConvert(index: Int): Int = { if (0 <= index && length > index) index + startIndex else @@ -128,8 +127,7 @@ object ByteString { override def clone: CompactByteString = ByteString1C(toArray) - def compact: CompactByteString = - if (length == bytes.length) ByteString1C(bytes) else clone + def compact: CompactByteString = if (length == bytes.length) ByteString1C(bytes) else clone def asByteBuffer: ByteBuffer = { val buffer = ByteBuffer.wrap(bytes, startIndex, length).asReadOnlyBuffer @@ -161,7 +159,6 @@ object ByteString { if (copyLength > 0) buffer.put(bytes, startIndex, copyLength) copyLength } - } private[akka] object ByteStrings { @@ -198,10 +195,11 @@ object ByteString { } // 0: both empty, 1: 2nd empty, 2: 1st empty, 3: neither empty + // Using length to check emptiness is prohibited by law def compare(b1: ByteString, b2: ByteString): Int = - if (b1.length == 0) - if (b2.length == 0) 0 else 2 - else if (b2.length == 0) 1 else 3 + if (b1.isEmpty) + if (b2.isEmpty) 0 else 2 + else if (b2.isEmpty) 1 else 3 } @@ -439,7 +437,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { private var _tempLength = 0 private var _tempCapacity = 0 - private def clearTemp() { + private def clearTemp(): Unit = { if (_tempLength > 0) { val arr = new Array[Byte](_tempLength) Array.copy(_temp, 0, arr, 0, _tempLength) @@ -448,14 +446,14 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { } } - private def resizeTemp(size: Int) { + private def resizeTemp(size: Int): Unit = { val newtemp = new Array[Byte](size) if (_tempLength > 0) Array.copy(_temp, 0, newtemp, 0, _tempLength) _temp = newtemp _tempCapacity = _temp.length } - private def ensureTempSize(size: Int) { + private def ensureTempSize(size: Int): Unit = { if (_tempCapacity < size || _tempCapacity == 0) { var newSize = if (_tempCapacity == 0) 16 else _tempCapacity * 2 while (newSize < size) newSize *= 2 @@ -498,7 +496,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { this } - def clear() { + def clear(): Unit = { _builder.clear _length = 0 _tempLength = 0 diff --git a/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala b/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala index 3ad55d69eb..ab2514861e 100644 --- a/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala +++ b/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala @@ -6,6 +6,13 @@ package akka.util import java.io.{ InputStream, ObjectInputStream, ObjectStreamClass } +/** + * ClassLoaderObjectInputStream tries to utilize the provided ClassLoader to load Classes and falls + * back to ObjectInputStreams resolver. + * + * @param classLoader - the ClassLoader which is to be used primarily + * @param is - the InputStream that is wrapped + */ class ClassLoaderObjectInputStream(classLoader: ClassLoader, is: InputStream) extends ObjectInputStream(is) { override protected def resolveClass(objectStreamClass: ObjectStreamClass): Class[_] = try Class.forName(objectStreamClass.getName, false, classLoader) catch { diff --git a/akka-actor/src/main/scala/akka/util/Convert.scala b/akka-actor/src/main/scala/akka/util/Convert.scala index a805b17fb2..3fead7aef7 100644 --- a/akka-actor/src/main/scala/akka/util/Convert.scala +++ b/akka-actor/src/main/scala/akka/util/Convert.scala @@ -3,7 +3,7 @@ */ package akka.util - +//FIXME DOCS! object Convert { def intToBytes(value: Int): Array[Byte] = { diff --git a/akka-actor/src/main/scala/akka/util/Crypt.scala b/akka-actor/src/main/scala/akka/util/Crypt.scala index 7dd678e748..280cd90768 100644 --- a/akka-actor/src/main/scala/akka/util/Crypt.scala +++ b/akka-actor/src/main/scala/akka/util/Crypt.scala @@ -5,7 +5,7 @@ package akka.util import java.security.{ MessageDigest, SecureRandom } - +//FIXME DOCS object Crypt { val hex = "0123456789ABCDEF" val lineSeparator = System.getProperty("line.separator") @@ -32,7 +32,7 @@ object Crypt { } def hexify(bytes: Array[Byte]): String = { - val builder = new StringBuilder + val builder = new StringBuilder(bytes.length * 2) bytes.foreach { byte ⇒ builder.append(hex.charAt((byte & 0xF0) >> 4)).append(hex.charAt(byte & 0xF)) } builder.toString } diff --git a/akka-actor/src/main/scala/akka/util/Duration.scala b/akka-actor/src/main/scala/akka/util/Duration.scala index a213fe1869..b37cf24c3b 100644 --- a/akka-actor/src/main/scala/akka/util/Duration.scala +++ b/akka-actor/src/main/scala/akka/util/Duration.scala @@ -110,6 +110,7 @@ object Duration { } val Zero: FiniteDuration = new FiniteDuration(0, NANOSECONDS) + val Undefined: Duration = new Duration with Infinite { override def toString = "Duration.Undefined" override def equals(other: Any) = other.asInstanceOf[AnyRef] eq this @@ -166,8 +167,8 @@ object Duration { * including itself. */ val Inf: Duration = new Duration with Infinite { - override def toString = "Duration.Inf" - def compare(other: Duration) = if (other eq this) 0 else 1 + override def toString: String = "Duration.Inf" + def compare(other: Duration): Int = if (other eq this) 0 else 1 def unary_- : Duration = MinusInf } @@ -177,7 +178,7 @@ object Duration { */ val MinusInf: Duration = new Duration with Infinite { override def toString = "Duration.MinusInf" - def compare(other: Duration) = if (other eq this) 0 else -1 + def compare(other: Duration): Int = if (other eq this) 0 else -1 def unary_- : Duration = Inf } @@ -188,7 +189,7 @@ object Duration { def parse(s: String): Duration = unapply(s).get implicit object DurationIsOrdered extends Ordering[Duration] { - def compare(a: Duration, b: Duration) = a compare b + def compare(a: Duration, b: Duration): Int = a compare b } } @@ -263,17 +264,17 @@ abstract class Duration extends Serializable with Ordered[Duration] { def fromNow: Deadline = Deadline.now + this // Java API - def lt(other: Duration) = this < other - def lteq(other: Duration) = this <= other - def gt(other: Duration) = this > other - def gteq(other: Duration) = this >= other - def plus(other: Duration) = this + other - def minus(other: Duration) = this - other - def mul(factor: Double) = this * factor - def div(factor: Double) = this / factor - def div(other: Duration) = this / other - def neg() = -this - def isFinite() = finite_? + def lt(other: Duration): Boolean = this < other + def lteq(other: Duration): Boolean = this <= other + def gt(other: Duration): Boolean = this > other + def gteq(other: Duration): Boolean = this >= other + def plus(other: Duration): Duration = this + other + def minus(other: Duration): Duration = this - other + def mul(factor: Double): Duration = this * factor + def div(factor: Double): Duration = this / factor + def div(other: Duration): Double = this / other + def neg(): Duration = -this + def isFinite(): Boolean = finite_? } object FiniteDuration { @@ -349,31 +350,19 @@ class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration { else c } - def +(other: Duration) = { - if (!other.finite_?) { - other - } else { - fromNanos(add(toNanos, other.toNanos)) - } - } + def +(other: Duration): Duration = if (!other.finite_?) other else fromNanos(add(toNanos, other.toNanos)) - def -(other: Duration) = { - if (!other.finite_?) { - other - } else { - fromNanos(add(toNanos, -other.toNanos)) - } - } + def -(other: Duration): Duration = if (!other.finite_?) other else fromNanos(add(toNanos, -other.toNanos)) - def *(factor: Double) = fromNanos(long2double(toNanos) * factor) + def *(factor: Double): FiniteDuration = fromNanos(long2double(toNanos) * factor) - def /(factor: Double) = fromNanos(long2double(toNanos) / factor) + def /(factor: Double): FiniteDuration = fromNanos(long2double(toNanos) / factor) - def /(other: Duration) = if (other.finite_?) long2double(toNanos) / other.toNanos else 0 + def /(other: Duration): Double = if (other.finite_?) long2double(toNanos) / other.toNanos else 0 - def unary_- = Duration(-length, unit) + def unary_- : FiniteDuration = Duration(-length, unit) - def finite_? = true + def finite_? : Boolean = true override def equals(other: Any) = (other.asInstanceOf[AnyRef] eq this) || other.isInstanceOf[FiniteDuration] && @@ -385,178 +374,74 @@ class FiniteDuration(val length: Long, val unit: TimeUnit) extends Duration { } } -class DurationInt(n: Int) { +private[akka] trait DurationOps { import duration.Classifier + protected def from(timeUnit: TimeUnit): FiniteDuration + def nanoseconds: FiniteDuration = from(NANOSECONDS) + def nanos: FiniteDuration = from(NANOSECONDS) + def nanosecond: FiniteDuration = from(NANOSECONDS) + def nano: FiniteDuration = from(NANOSECONDS) - def nanoseconds = Duration(n, NANOSECONDS) - def nanos = Duration(n, NANOSECONDS) - def nanosecond = Duration(n, NANOSECONDS) - def nano = Duration(n, NANOSECONDS) + def microseconds: FiniteDuration = from(MICROSECONDS) + def micros: FiniteDuration = from(MICROSECONDS) + def microsecond: FiniteDuration = from(MICROSECONDS) + def micro: FiniteDuration = from(MICROSECONDS) - def microseconds = Duration(n, MICROSECONDS) - def micros = Duration(n, MICROSECONDS) - def microsecond = Duration(n, MICROSECONDS) - def micro = Duration(n, MICROSECONDS) + def milliseconds: FiniteDuration = from(MILLISECONDS) + def millis: FiniteDuration = from(MILLISECONDS) + def millisecond: FiniteDuration = from(MILLISECONDS) + def milli: FiniteDuration = from(MILLISECONDS) - def milliseconds = Duration(n, MILLISECONDS) - def millis = Duration(n, MILLISECONDS) - def millisecond = Duration(n, MILLISECONDS) - def milli = Duration(n, MILLISECONDS) + def seconds: FiniteDuration = from(SECONDS) + def second: FiniteDuration = from(SECONDS) - def seconds = Duration(n, SECONDS) - def second = Duration(n, SECONDS) + def minutes: FiniteDuration = from(MINUTES) + def minute: FiniteDuration = from(MINUTES) - def minutes = Duration(n, MINUTES) - def minute = Duration(n, MINUTES) + def hours: FiniteDuration = from(HOURS) + def hour: FiniteDuration = from(HOURS) - def hours = Duration(n, HOURS) - def hour = Duration(n, HOURS) + def days: FiniteDuration = from(DAYS) + def day: FiniteDuration = from(DAYS) - def days = Duration(n, DAYS) - def day = Duration(n, DAYS) + def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS)) + def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS)) + def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS)) + def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(NANOSECONDS)) - def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) + def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS)) + def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS)) + def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS)) + def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MICROSECONDS)) - def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) + def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS)) + def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS)) + def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS)) + def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MILLISECONDS)) - def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) + def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(SECONDS)) + def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(SECONDS)) - def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS)) - def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS)) + def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MINUTES)) + def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(MINUTES)) - def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES)) - def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES)) + def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(HOURS)) + def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(HOURS)) - def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS)) - def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS)) - - def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS)) - def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS)) + def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(DAYS)) + def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(from(DAYS)) } -class DurationLong(n: Long) { - import duration.Classifier - - def nanoseconds = Duration(n, NANOSECONDS) - def nanos = Duration(n, NANOSECONDS) - def nanosecond = Duration(n, NANOSECONDS) - def nano = Duration(n, NANOSECONDS) - - def microseconds = Duration(n, MICROSECONDS) - def micros = Duration(n, MICROSECONDS) - def microsecond = Duration(n, MICROSECONDS) - def micro = Duration(n, MICROSECONDS) - - def milliseconds = Duration(n, MILLISECONDS) - def millis = Duration(n, MILLISECONDS) - def millisecond = Duration(n, MILLISECONDS) - def milli = Duration(n, MILLISECONDS) - - def seconds = Duration(n, SECONDS) - def second = Duration(n, SECONDS) - - def minutes = Duration(n, MINUTES) - def minute = Duration(n, MINUTES) - - def hours = Duration(n, HOURS) - def hour = Duration(n, HOURS) - - def days = Duration(n, DAYS) - def day = Duration(n, DAYS) - - def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, NANOSECONDS)) - - def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MICROSECONDS)) - - def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MILLISECONDS)) - - def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS)) - def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, SECONDS)) - - def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES)) - def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, MINUTES)) - - def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS)) - def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, HOURS)) - - def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS)) - def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(n, DAYS)) +class DurationInt(n: Int) extends DurationOps { + override protected def from(timeUnit: TimeUnit): FiniteDuration = Duration(n, timeUnit) } -class DurationDouble(d: Double) { - import duration.Classifier +class DurationLong(n: Long) extends DurationOps { + override protected def from(timeUnit: TimeUnit): FiniteDuration = Duration(n, timeUnit) +} - def nanoseconds = Duration(d, NANOSECONDS) - def nanos = Duration(d, NANOSECONDS) - def nanosecond = Duration(d, NANOSECONDS) - def nano = Duration(d, NANOSECONDS) - - def microseconds = Duration(d, MICROSECONDS) - def micros = Duration(d, MICROSECONDS) - def microsecond = Duration(d, MICROSECONDS) - def micro = Duration(d, MICROSECONDS) - - def milliseconds = Duration(d, MILLISECONDS) - def millis = Duration(d, MILLISECONDS) - def millisecond = Duration(d, MILLISECONDS) - def milli = Duration(d, MILLISECONDS) - - def seconds = Duration(d, SECONDS) - def second = Duration(d, SECONDS) - - def minutes = Duration(d, MINUTES) - def minute = Duration(d, MINUTES) - - def hours = Duration(d, HOURS) - def hour = Duration(d, HOURS) - - def days = Duration(d, DAYS) - def day = Duration(d, DAYS) - - def nanoseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS)) - def nanos[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS)) - def nanosecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS)) - def nano[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, NANOSECONDS)) - - def microseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS)) - def micros[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS)) - def microsecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS)) - def micro[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MICROSECONDS)) - - def milliseconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS)) - def millis[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS)) - def millisecond[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS)) - def milli[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MILLISECONDS)) - - def seconds[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, SECONDS)) - def second[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, SECONDS)) - - def minutes[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MINUTES)) - def minute[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, MINUTES)) - - def hours[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, HOURS)) - def hour[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, HOURS)) - - def days[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, DAYS)) - def day[C, CC <: Classifier[C]](c: C)(implicit ev: CC): CC#R = ev.convert(Duration(d, DAYS)) +class DurationDouble(d: Double) extends DurationOps { + override protected def from(timeUnit: TimeUnit): FiniteDuration = Duration(d, timeUnit) } //TODO add @SerialVersionUID(1L) when SI-4804 is fixed @@ -565,24 +450,27 @@ case class Timeout(duration: Duration) { def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) } +/** + * A Timeout is a wrapper on top of Duration to be more specific about what the duration means. + */ object Timeout { /** * A timeout with zero duration, will cause most requests to always timeout. */ - val zero = new Timeout(Duration.Zero) + val zero: Timeout = new Timeout(Duration.Zero) /** * A Timeout with infinite duration. Will never timeout. Use extreme caution with this * as it may cause memory leaks, blocked threads, or may not even be supported by * the receiver, which would result in an exception. */ - val never = new Timeout(Duration.Inf) + val never: Timeout = new Timeout(Duration.Inf) - def apply(timeout: Long) = new Timeout(timeout) - def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) + def apply(timeout: Long): Timeout = new Timeout(timeout) + def apply(length: Long, unit: TimeUnit): Timeout = new Timeout(length, unit) - implicit def durationToTimeout(duration: Duration) = new Timeout(duration) - implicit def intToTimeout(timeout: Int) = new Timeout(timeout) - implicit def longToTimeout(timeout: Long) = new Timeout(timeout) + implicit def durationToTimeout(duration: Duration): Timeout = new Timeout(duration) + implicit def intToTimeout(timeout: Int): Timeout = new Timeout(timeout) + implicit def longToTimeout(timeout: Long): Timeout = new Timeout(timeout) } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 25cb279f2e..a3618359ac 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -45,18 +45,13 @@ object Helpers { else base64(next, sb) } - def ignore[E: Manifest](body: ⇒ Unit) { - try { - body - } catch { - case e if manifest[E].erasure.isAssignableFrom(e.getClass) ⇒ () - } - } + //FIXME docs + def ignore[E: Manifest](body: ⇒ Unit): Unit = + try body catch { case e if manifest[E].erasure.isAssignableFrom(e.getClass) ⇒ () } - def withPrintStackTraceOnError(body: ⇒ Unit) { - try { - body - } catch { + //FIXME docs + def withPrintStackTraceOnError(body: ⇒ Unit): Unit = { + try body catch { case e: Throwable ⇒ val sw = new java.io.StringWriter() var root = e diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index 1153c9e045..3289ed8f13 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -91,7 +91,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) { /** * Applies the supplied function to all keys and their values */ - def foreach(fun: (K, V) ⇒ Unit) { + def foreach(fun: (K, V) ⇒ Unit): Unit = { import scala.collection.JavaConversions._ container.entrySet foreach { e ⇒ e.getValue.foreach(fun(e.getKey, _)) } } @@ -112,7 +112,7 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) { /** * Returns the key set. */ - def keys = scala.collection.JavaConversions.collectionAsScalaIterable(container.keySet) + def keys: Iterable[K] = scala.collection.JavaConversions.collectionAsScalaIterable(container.keySet) /** * Disassociates the value of type V from the key of type K diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 14c787d3f6..da93170019 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -24,9 +24,7 @@ class Switch(startAsOn: Boolean = false) { protected def transcend(from: Boolean, action: ⇒ Unit): Boolean = synchronized { if (switch.compareAndSet(from, !from)) { - try { - action - } catch { + try action catch { case e ⇒ switch.compareAndSet(!from, from) // revert status throw e @@ -62,18 +60,12 @@ class Switch(startAsOn: Boolean = false) { /** * Executes the provided action and returns its value if the switch is IMMEDIATELY on (i.e. no lock involved) */ - def ifOnYield[T](action: ⇒ T): Option[T] = { - if (switch.get) Some(action) - else None - } + def ifOnYield[T](action: ⇒ T): Option[T] = if (switch.get) Some(action) else None /** * Executes the provided action and returns its value if the switch is IMMEDIATELY off (i.e. no lock involved) */ - def ifOffYield[T](action: ⇒ T): Option[T] = { - if (!switch.get) Some(action) - else None - } + def ifOffYield[T](action: ⇒ T): Option[T] = if (!switch.get) Some(action) else None /** * Executes the provided action and returns if the action was executed or not, if the switch is IMMEDIATELY on (i.e. no lock involved) @@ -138,15 +130,15 @@ class Switch(startAsOn: Boolean = false) { /** * Executes the given code while holding this switch’s lock, i.e. protected from concurrent modification of the switch status. */ - def locked[T](code: ⇒ T) = synchronized { code } + def locked[T](code: ⇒ T): T = synchronized { code } /** * Returns whether the switch is IMMEDIATELY on (no locking) */ - def isOn = switch.get + def isOn: Boolean = switch.get /** * Returns whether the switch is IMMEDDIATELY off (no locking) */ - def isOff = !isOn + def isOff: Boolean = !isOn } diff --git a/akka-actor/src/main/scala/akka/util/Reflect.scala b/akka-actor/src/main/scala/akka/util/Reflect.scala index 25c56a983f..3a46edeab1 100644 --- a/akka-actor/src/main/scala/akka/util/Reflect.scala +++ b/akka-actor/src/main/scala/akka/util/Reflect.scala @@ -6,8 +6,10 @@ package akka.util /** * Collection of internal reflection utilities which may or may not be * available (most services specific to HotSpot, but fails gracefully). + * + * INTERNAL API */ -object Reflect { +private[akka] object Reflect { /** * This optionally holds a function which looks N levels above itself diff --git a/akka-actor/src/main/scala/akka/util/Unsafe.java b/akka-actor/src/main/scala/akka/util/Unsafe.java index 608cb3d46e..ace3c1baac 100644 --- a/akka-actor/src/main/scala/akka/util/Unsafe.java +++ b/akka-actor/src/main/scala/akka/util/Unsafe.java @@ -7,6 +7,9 @@ package akka.util; import java.lang.reflect.Field; +/** + * INTERNAL API + */ public final class Unsafe { public final static sun.misc.Unsafe instance; static { diff --git a/akka-actor/src/main/scala/akka/util/cps/package.scala b/akka-actor/src/main/scala/akka/util/cps/package.scala index 198c2beacd..a1b4bc39eb 100644 --- a/akka-actor/src/main/scala/akka/util/cps/package.scala +++ b/akka-actor/src/main/scala/akka/util/cps/package.scala @@ -7,6 +7,7 @@ package akka.util import scala.util.continuations._ import akka.dispatch.MessageDispatcher +//FIXME Needs docs package object cps { def matchC[A, B, C, D](in: A)(pf: PartialFunction[A, B @cpsParam[C, D]]): B @cpsParam[C, D] = pf(in) diff --git a/akka-actor/src/main/scala/akka/util/duration/package.scala b/akka-actor/src/main/scala/akka/util/duration/package.scala index 7f14a0be48..6a7d28a6e6 100644 --- a/akka-actor/src/main/scala/akka/util/duration/package.scala +++ b/akka-actor/src/main/scala/akka/util/duration/package.scala @@ -5,7 +5,7 @@ package akka.util import java.util.concurrent.TimeUnit - +//FIXME Needs docs package object duration { trait Classifier[C] { type R @@ -15,38 +15,32 @@ package object duration { object span implicit object spanConvert extends Classifier[span.type] { type R = FiniteDuration - def convert(d: FiniteDuration) = d + def convert(d: FiniteDuration): FiniteDuration = d } object fromNow implicit object fromNowConvert extends Classifier[fromNow.type] { type R = Deadline - def convert(d: FiniteDuration) = Deadline.now + d + def convert(d: FiniteDuration): Deadline = Deadline.now + d } - implicit def intToDurationInt(n: Int) = new DurationInt(n) - implicit def longToDurationLong(n: Long) = new DurationLong(n) - implicit def doubleToDurationDouble(d: Double) = new DurationDouble(d) + implicit def intToDurationInt(n: Int): DurationInt = new DurationInt(n) + implicit def longToDurationLong(n: Long): DurationLong = new DurationLong(n) + implicit def doubleToDurationDouble(d: Double): DurationDouble = new DurationDouble(d) - implicit def pairIntToDuration(p: (Int, TimeUnit)) = Duration(p._1, p._2) - implicit def pairLongToDuration(p: (Long, TimeUnit)) = Duration(p._1, p._2) - implicit def durationToPair(d: Duration) = (d.length, d.unit) + implicit def pairIntToDuration(p: (Int, TimeUnit)): FiniteDuration = Duration(p._1, p._2) + implicit def pairLongToDuration(p: (Long, TimeUnit)): FiniteDuration = Duration(p._1, p._2) + implicit def durationToPair(d: Duration): (Long, TimeUnit) = (d.length, d.unit) /* * avoid reflection based invocation by using non-duck type */ - class IntMult(i: Int) { - def *(d: Duration) = d * i - } - implicit def intMult(i: Int) = new IntMult(i) + class IntMult(i: Int) { def *(d: Duration): Duration = d * i } + implicit def intMult(i: Int): IntMult = new IntMult(i) - class LongMult(l: Long) { - def *(d: Duration) = d * l - } - implicit def longMult(l: Long) = new LongMult(l) + class LongMult(l: Long) { def *(d: Duration): Duration = d * l } + implicit def longMult(l: Long): LongMult = new LongMult(l) - class DoubleMult(f: Double) { - def *(d: Duration) = d * f - } - implicit def doubleMult(f: Double) = new DoubleMult(f) + class DoubleMult(f: Double) { def *(d: Duration): Duration = d * f } + implicit def doubleMult(f: Double): DoubleMult = new DoubleMult(f) } From 5cbcb612b2469d5b140798380d8016be5a2642f2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 20 May 2012 15:56:52 +0200 Subject: [PATCH 09/12] Moving the HWT stuff from org.jboss.netty.akka.util to akka.util.internal --- .../util/internal/ConcurrentIdentityHashMap.java | 2 +- .../util/internal}/HashedWheelTimer.java | 12 +++++------- .../akka/util/internal/ReusableIterator.java | 2 +- .../akka/util/internal/SystemPropertyUtil.java | 2 +- .../akka/util => akka/util/internal}/Timeout.java | 2 +- .../akka/util => akka/util/internal}/Timer.java | 4 ++-- .../akka/util => akka/util/internal}/TimerTask.java | 6 +++--- .../src/main/scala/akka/actor/ActorSystem.scala | 5 ++--- akka-actor/src/main/scala/akka/actor/Scheduler.scala | 3 ++- 9 files changed, 18 insertions(+), 20 deletions(-) rename akka-actor/src/main/java/{org/jboss/netty => }/akka/util/internal/ConcurrentIdentityHashMap.java (99%) rename akka-actor/src/main/java/{org/jboss/netty/akka/util => akka/util/internal}/HashedWheelTimer.java (97%) rename akka-actor/src/main/java/{org/jboss/netty => }/akka/util/internal/ReusableIterator.java (95%) rename akka-actor/src/main/java/{org/jboss/netty => }/akka/util/internal/SystemPropertyUtil.java (98%) rename akka-actor/src/main/java/{org/jboss/netty/akka/util => akka/util/internal}/Timeout.java (97%) rename akka-actor/src/main/java/{org/jboss/netty/akka/util => akka/util/internal}/Timer.java (92%) rename akka-actor/src/main/java/{org/jboss/netty/akka/util => akka/util/internal}/TimerTask.java (82%) diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java b/akka-actor/src/main/java/akka/util/internal/ConcurrentIdentityHashMap.java similarity index 99% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java rename to akka-actor/src/main/java/akka/util/internal/ConcurrentIdentityHashMap.java index ff8a568d02..eb83c98f35 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java +++ b/akka-actor/src/main/java/akka/util/internal/ConcurrentIdentityHashMap.java @@ -18,7 +18,7 @@ * Expert Group and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ -package org.jboss.netty.akka.util.internal; +package akka.util.internal; import java.util.AbstractCollection; import java.util.AbstractMap; diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java similarity index 97% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java rename to akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java index 9eba51e53f..25841861c5 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java +++ b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java @@ -13,12 +13,10 @@ * License for the specific language governing permissions and limitations * under the License. */ -package org.jboss.netty.akka.util; +package akka.util.internal; import akka.event.LoggingAdapter; import akka.util.Duration; -import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; -import org.jboss.netty.akka.util.internal.ReusableIterator; import java.util.*; import java.util.concurrent.ThreadFactory; @@ -34,7 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; *

Tick Duration

* * As described with 'approximated', this timer does not execute the scheduled - * {@link TimerTask} on time. {@link org.jboss.netty.akka.util.HashedWheelTimer}, on every tick, will + * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will * check if there are any {@link TimerTask}s behind the schedule and execute * them. *

@@ -46,7 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * *

Ticks per Wheel (Wheel Size)

* - * {@link org.jboss.netty.akka.util.HashedWheelTimer} maintains a data structure called 'wheel'. + * {@link HashedWheelTimer} maintains a data structure called 'wheel'. * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash * function is 'dead line of the task'. The default number of ticks per wheel * (i.e. the size of the wheel) is 512. You could specify a larger value @@ -54,7 +52,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * *

Do not create many instances.

* - * {@link org.jboss.netty.akka.util.HashedWheelTimer} creates a new thread whenever it is instantiated and + * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and * started. Therefore, you should make sure to create only one instance and * share it across your application. One of the common mistakes, that makes * your application unresponsive, is to create a new instance in @@ -63,7 +61,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * *

Implementation Details

* - * {@link org.jboss.netty.akka.util.HashedWheelTimer} is based on + * {@link HashedWheelTimer} is based on * George Varghese and * Tony Lauck's paper, * 'Hashed diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java b/akka-actor/src/main/java/akka/util/internal/ReusableIterator.java similarity index 95% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java rename to akka-actor/src/main/java/akka/util/internal/ReusableIterator.java index 210edbe65d..8c8e5e50e5 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java +++ b/akka-actor/src/main/java/akka/util/internal/ReusableIterator.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package org.jboss.netty.akka.util.internal; +package akka.util.internal; import java.util.Iterator; diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java b/akka-actor/src/main/java/akka/util/internal/SystemPropertyUtil.java similarity index 98% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java rename to akka-actor/src/main/java/akka/util/internal/SystemPropertyUtil.java index bf3e2ac571..affef54bfc 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java +++ b/akka-actor/src/main/java/akka/util/internal/SystemPropertyUtil.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package org.jboss.netty.akka.util.internal; +package akka.util.internal; import java.util.regex.Pattern; diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java b/akka-actor/src/main/java/akka/util/internal/Timeout.java similarity index 97% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java rename to akka-actor/src/main/java/akka/util/internal/Timeout.java index dbda2110d3..a03534bb8d 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java +++ b/akka-actor/src/main/java/akka/util/internal/Timeout.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package org.jboss.netty.akka.util; +package akka.util.internal; /** * A handle associated with a {@link TimerTask} that is returned by a diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java b/akka-actor/src/main/java/akka/util/internal/Timer.java similarity index 92% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java rename to akka-actor/src/main/java/akka/util/internal/Timer.java index b5bd8c6a7c..9cb02794de 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java +++ b/akka-actor/src/main/java/akka/util/internal/Timer.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package org.jboss.netty.akka.util; +package akka.util.internal; import akka.util.Duration; import java.util.Set; @@ -45,7 +45,7 @@ public interface Timer { Timeout newTimeout(TimerTask task, Duration delay); /** - * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all + * Releases all resources acquired by this {@link Timer} and cancels all * tasks which were scheduled but not executed yet. * * @return the handles associated with the tasks which were canceled by diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java b/akka-actor/src/main/java/akka/util/internal/TimerTask.java similarity index 82% rename from akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java rename to akka-actor/src/main/java/akka/util/internal/TimerTask.java index 3d0190d8f5..673dde67c7 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java +++ b/akka-actor/src/main/java/akka/util/internal/TimerTask.java @@ -13,11 +13,11 @@ * License for the specific language governing permissions and limitations * under the License. */ -package org.jboss.netty.akka.util; +package akka.util.internal; /** * A task which is executed after the delay specified with - * {@link Timer#newTimeout(org.jboss.netty.akka.util.TimerTask, long, java.util.concurrent.TimeUnit)} + * {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)} * . * * @author The Netty Project @@ -28,7 +28,7 @@ public interface TimerTask { /** * Executed after the delay specified with - * {@link Timer#newTimeout(org.jboss.netty.akka.util.TimerTask, long, java.util.concurrent.TimeUnit)} + * {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)} * . * * @param timeout diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index a1d30ddbc6..ab2996f0a7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -7,16 +7,15 @@ package akka.actor import akka.event._ import akka.dispatch._ import akka.pattern.ask -import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.TimeUnit.MILLISECONDS import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec -import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import java.io.Closeable import akka.dispatch.Await.{ Awaitable, CanAwait } import akka.util._ +import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import collection.immutable.Stack import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } +import java.util.concurrent.TimeUnit.MILLISECONDS object ActorSystem { diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 91e54a592d..8ad3d8ee98 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -5,12 +5,13 @@ package akka.actor import akka.util.Duration -import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTimeout, Timer } +import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTimeout, Timer } import akka.event.LoggingAdapter import akka.dispatch.MessageDispatcher import java.io.Closeable import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec +import akka.util.internal._ //#scheduler /** From 1a3329baa2c9376288d2b534c6935ae870df26a4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 20 May 2012 16:00:24 +0200 Subject: [PATCH 10/12] #2091 - Adding a small intro to the Microkernel docs to state what the purpose of it is. --- akka-docs/java/microkernel.rst | 4 ++++ akka-docs/scala/microkernel.rst | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/akka-docs/java/microkernel.rst b/akka-docs/java/microkernel.rst index 551c118e94..12afbc93c9 100644 --- a/akka-docs/java/microkernel.rst +++ b/akka-docs/java/microkernel.rst @@ -4,6 +4,10 @@ Microkernel (Java) ================== +The purpose of the Akka Microkernel is to offer a bundling mechanism so that you can distribute +an Akka application as a single payload, without the need to run in a Java Application Server or manually +having to create a launcher script. + The Akka Microkernel is included in the Akka download found at `downloads`_. .. _downloads: http://akka.io/downloads diff --git a/akka-docs/scala/microkernel.rst b/akka-docs/scala/microkernel.rst index 8fb1aec2c2..df0e623eee 100644 --- a/akka-docs/scala/microkernel.rst +++ b/akka-docs/scala/microkernel.rst @@ -4,6 +4,10 @@ Microkernel (Scala) =================== +The purpose of the Akka Microkernel is to offer a bundling mechanism so that you can distribute +an Akka application as a single payload, without the need to run in a Java Application Server or manually +having to create a launcher script. + The Akka Microkernel is included in the Akka download found at `downloads`_. .. _downloads: http://akka.io/downloads From 4a2227fc95314610577fd27eb75c669da1e98ad6 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 20 May 2012 19:03:20 +0200 Subject: [PATCH 11/12] Removing the AtomicReference from Dispatcher and restructured the code a bit --- .../akka/dispatch/BalancingDispatcher.scala | 2 +- .../main/scala/akka/dispatch/Dispatcher.scala | 33 ++++++++++++------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index ee492409ec..dea29643c7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -87,7 +87,7 @@ class BalancingDispatcher( @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit = if (messageQueue.hasMessages && i.hasNext - && (executorService.get().executor match { + && (executorService match { case lm: LoadMetrics ⇒ lm.atFullThrottle == false case other ⇒ true }) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 3a73bf0718..8dd7ecf8a2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -33,11 +33,15 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected val executorServiceFactory: ExecutorServiceFactory = - executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) + private class LazyExecutorServiceDelegate(factory: ExecutorServiceFactory) extends ExecutorServiceDelegate { + lazy val executor: ExecutorService = factory.createExecutorService + def copy(): LazyExecutorServiceDelegate = new LazyExecutorServiceDelegate(factory) + } - protected val executorService = new AtomicReference[ExecutorServiceDelegate]( - new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) + @volatile private var executorServiceDelegate: LazyExecutorServiceDelegate = + new LazyExecutorServiceDelegate(executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory)) + + protected final def executorService: ExecutorService = executorServiceDelegate /** * INTERNAL USE ONLY @@ -62,11 +66,11 @@ class Dispatcher( */ protected[akka] def executeTask(invocation: TaskInvocation) { try { - executorService.get() execute invocation + executorService execute invocation } catch { case e: RejectedExecutionException ⇒ try { - executorService.get() execute invocation + executorService execute invocation } catch { case e2: RejectedExecutionException ⇒ prerequisites.eventStream.publish(Error(e, getClass.getName, getClass, "executeTask was rejected twice!")) @@ -83,10 +87,15 @@ class Dispatcher( /** * INTERNAL USE ONLY */ - protected[akka] def shutdown: Unit = - Option(executorService.getAndSet(new ExecutorServiceDelegate { - lazy val executor = executorServiceFactory.createExecutorService - })) foreach { _.shutdown() } + protected[akka] def shutdown: Unit = { + val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy + val es = synchronized { // FIXME getAndSet using ARFU or Unsafe + val service = executorServiceDelegate + executorServiceDelegate = newDelegate // just a quick getAndSet + service + } + es.shutdown() + } /** * Returns if it was registered @@ -97,12 +106,12 @@ class Dispatcher( if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { - executorService.get() execute mbox + executorService execute mbox true } catch { case e: RejectedExecutionException ⇒ try { - executorService.get() execute mbox + executorService execute mbox true } catch { //Retry once case e: RejectedExecutionException ⇒ From e357b9825b11617679a1ba44e1e5fdbb44e45f4c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 20 May 2012 19:06:31 +0200 Subject: [PATCH 12/12] Adding return types in Dispatcher --- akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 8dd7ecf8a2..c8ae187c66 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -46,7 +46,7 @@ class Dispatcher( /** * INTERNAL USE ONLY */ - protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { + protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) @@ -55,7 +55,7 @@ class Dispatcher( /** * INTERNAL USE ONLY */ - protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) = { + protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = { val mbox = receiver.mailbox mbox.systemEnqueue(receiver.self, invocation) registerForExecution(mbox, false, true) @@ -124,7 +124,7 @@ class Dispatcher( } else false } - override val toString = Logging.simpleName(this) + "[" + id + "]" + override val toString: String = Logging.simpleName(this) + "[" + id + "]" } object PriorityGenerator {