Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2012-02-03 16:31:56 +01:00
commit 2ea6e97a41
16 changed files with 334 additions and 22 deletions

View file

@ -13,6 +13,7 @@ import akka.pattern.{ ask, AskTimeoutException }
class ActorTimeoutSpec extends AkkaSpec { class ActorTimeoutSpec extends AkkaSpec {
val testTimeout = 200.millis.dilated val testTimeout = 200.millis.dilated
val leeway = 500.millis.dilated
"An Actor-based Future" must { "An Actor-based Future" must {
@ -20,13 +21,13 @@ class ActorTimeoutSpec extends AkkaSpec {
implicit val timeout = Timeout(testTimeout) implicit val timeout = Timeout(testTimeout)
val echo = system.actorOf(Props.empty) val echo = system.actorOf(Props.empty)
val f = (echo ? "hallo") val f = (echo ? "hallo")
intercept[AskTimeoutException] { Await.result(f, testTimeout * 2) } intercept[AskTimeoutException] { Await.result(f, testTimeout + leeway) }
} }
"use explicitly supplied timeout" in { "use explicitly supplied timeout" in {
val echo = system.actorOf(Props.empty) val echo = system.actorOf(Props.empty)
val f = echo.?("hallo")(testTimeout) val f = echo.?("hallo")(testTimeout)
intercept[AskTimeoutException] { Await.result(f, testTimeout * 2) } intercept[AskTimeoutException] { Await.result(f, testTimeout + leeway) }
} }
} }
} }

View file

@ -33,7 +33,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick)) collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS)) assert(countDownLatch.await(2, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3) val countDownLatch2 = new CountDownLatch(3)
@ -43,14 +43,21 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
assert(countDownLatch2.await(2, TimeUnit.SECONDS)) assert(countDownLatch2.await(2, TimeUnit.SECONDS))
} }
"should stop continuous scheduling if the receiving actor has been terminated" in { "should stop continuous scheduling if the receiving actor has been terminated" taggedAs TimingTest in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case x testActor ! x
}
}))
// run immediately and then every 100 milliseconds // run immediately and then every 100 milliseconds
collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, testActor, "msg")) collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, actor, "msg"))
expectNoMsg(1 second)
// stop the actor and, hence, the continuous messaging from happening // stop the actor and, hence, the continuous messaging from happening
testActor ! PoisonPill actor ! PoisonPill
expectNoMsg(500 milliseconds) expectMsg("msg")
} }
"schedule once" in { "schedule once" in {
@ -69,7 +76,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
countDownLatch.getCount must be(3) countDownLatch.getCount must be(3)
// after 1 second the wait should fail // after 1 second the wait should fail
assert(countDownLatch.await(1, TimeUnit.SECONDS) == false) assert(countDownLatch.await(2, TimeUnit.SECONDS) == false)
// should still be 1 left // should still be 1 left
countDownLatch.getCount must be(1) countDownLatch.getCount must be(1)
} }
@ -93,7 +100,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
} }
"be cancellable during initial delay" in { "be cancellable during initial delay" taggedAs TimingTest in {
val ticks = new AtomicInteger val ticks = new AtomicInteger
val initialDelay = 200.milliseconds.dilated val initialDelay = 200.milliseconds.dilated
@ -108,7 +115,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
ticks.get must be(0) ticks.get must be(0)
} }
"be cancellable after initial delay" in { "be cancellable after initial delay" taggedAs TimingTest in {
val ticks = new AtomicInteger val ticks = new AtomicInteger
val initialDelay = 20.milliseconds.dilated val initialDelay = 20.milliseconds.dilated
@ -179,7 +186,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
Await.ready(ticks, 3 seconds) Await.ready(ticks, 3 seconds)
} }
"schedule with different initial delay and frequency" in { "schedule with different initial delay and frequency" taggedAs TimingTest in {
val ticks = new TestLatch(3) val ticks = new TestLatch(3)
case object Msg case object Msg

View file

@ -0,0 +1,197 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package akka.jsr166y;
import java.util.Random;
/**
* A random number generator isolated to the current thread. Like the
* global {@link java.util.Random} generator used by the {@link
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
* with an internally generated seed that may not otherwise be
* modified. When applicable, use of {@code ThreadLocalRandom} rather
* than shared {@code Random} objects in concurrent programs will
* typically encounter much less overhead and contention. Use of
* {@code ThreadLocalRandom} is particularly appropriate when multiple
* tasks (for example, each a {@link ForkJoinTask}) use random numbers
* in parallel in thread pools.
*
* <p>Usages of this class should typically be of the form:
* {@code ThreadLocalRandom.current().nextX(...)} (where
* {@code X} is {@code Int}, {@code Long}, etc).
* When all usages are of this form, it is never possible to
* accidently share a {@code ThreadLocalRandom} across multiple threads.
*
* <p>This class also provides additional commonly used bounded random
* generation methods.
*
* @since 1.7
* @author Doug Lea
*/
public class ThreadLocalRandom extends Random {
// same constants as Random, but must be redeclared because private
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;
/**
* The random seed. We can't use super.seed.
*/
private long rnd;
/**
* Initialization flag to permit calls to setSeed to succeed only
* while executing the Random constructor. We can't allow others
* since it would cause setting seed in one part of a program to
* unintentionally impact other usages by the thread.
*/
boolean initialized;
// Padding to help avoid memory contention among seed updates in
// different TLRs in the common case that they are located near
// each other.
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
/**
* The actual ThreadLocal
*/
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
}
};
/**
* Constructor called only by localRandom.initialValue.
*/
ThreadLocalRandom() {
super();
initialized = true;
}
/**
* Returns the current thread's {@code ThreadLocalRandom}.
*
* @return the current thread's {@code ThreadLocalRandom}
*/
public static ThreadLocalRandom current() {
return localRandom.get();
}
/**
* Throws {@code UnsupportedOperationException}. Setting seeds in
* this generator is not supported.
*
* @throws UnsupportedOperationException always
*/
public void setSeed(long seed) {
if (initialized)
throw new UnsupportedOperationException();
rnd = (seed ^ multiplier) & mask;
}
protected int next(int bits) {
rnd = (rnd * multiplier + addend) & mask;
return (int) (rnd >>> (48-bits));
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @throws IllegalArgumentException if least greater than or equal
* to bound
* @return the next value
*/
public int nextInt(int least, int bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextInt(bound - least) + least;
}
/**
* Returns a pseudorandom, uniformly distributed value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public long nextLong(long n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
// Divide n by two until small enough for nextInt. On each
// iteration (at most 31 of them but usually much less),
// randomly choose both whether to include high bit in result
// (offset) and whether to continue with the lower vs upper
// half (which makes a difference only if odd).
long offset = 0;
while (n >= Integer.MAX_VALUE) {
int bits = next(2);
long half = n >>> 1;
long nextn = ((bits & 2) == 0) ? half : n - half;
if ((bits & 1) == 0)
offset += n - nextn;
n = nextn;
}
return offset + nextInt((int) n);
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public long nextLong(long least, long bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextLong(bound - least) + least;
}
/**
* Returns a pseudorandom, uniformly distributed {@code double} value
* between 0 (inclusive) and the specified value (exclusive).
*
* @param n the bound on the random number to be returned. Must be
* positive.
* @return the next value
* @throws IllegalArgumentException if n is not positive
*/
public double nextDouble(double n) {
if (n <= 0)
throw new IllegalArgumentException("n must be positive");
return nextDouble() * n;
}
/**
* Returns a pseudorandom, uniformly distributed value between the
* given least value (inclusive) and bound (exclusive).
*
* @param least the least value returned
* @param bound the upper bound (exclusive)
* @return the next value
* @throws IllegalArgumentException if least greater than or equal
* to bound
*/
public double nextDouble(double least, double bound) {
if (least >= bound)
throw new IllegalArgumentException();
return nextDouble() * (bound - least) + least;
}
private static final long serialVersionUID = -5851777807851030925L;
}

View file

@ -18,7 +18,7 @@ package com.typesafe.config;
* interface is likely to grow new methods over time, so third-party * interface is likely to grow new methods over time, so third-party
* implementations will break. * implementations will break.
*/ */
public interface ConfigValue extends ConfigMergeable, java.io.Serializable { public interface ConfigValue extends ConfigMergeable {
/** /**
* The origin of the value (file, line number, etc.), for debugging and * The origin of the value (file, line number, etc.), for debugging and
* error messages. * error messages.

View file

@ -3,6 +3,8 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.Serializable;
import com.typesafe.config.ConfigException; import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigMergeable; import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigOrigin; import com.typesafe.config.ConfigOrigin;
@ -16,7 +18,7 @@ import com.typesafe.config.ConfigValue;
* improperly-factored and non-modular code. Please don't add parent(). * improperly-factored and non-modular code. Please don't add parent().
* *
*/ */
abstract class AbstractConfigValue implements ConfigValue, MergeableValue { abstract class AbstractConfigValue implements ConfigValue, MergeableValue, Serializable {
final private SimpleConfigOrigin origin; final private SimpleConfigOrigin origin;

View file

@ -3,6 +3,7 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.ObjectStreamException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -228,4 +229,14 @@ final class ConfigDelayedMerge extends AbstractConfigValue implements
sb.append("# ) end of unresolved merge\n"); sb.append("# ) end of unresolved merge\n");
} }
} }
// This ridiculous hack is because some JDK versions apparently can't
// serialize an array, which is used to implement ArrayList and EmptyList.
// maybe
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6446627
private Object writeReplace() throws ObjectStreamException {
// switch to LinkedList
return new ConfigDelayedMerge(origin(),
new java.util.LinkedList<AbstractConfigValue>(stack), ignoresFallbacks);
}
} }

View file

@ -3,6 +3,7 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.ObjectStreamException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -192,4 +193,14 @@ class ConfigDelayedMergeObject extends AbstractConfigObject implements
protected AbstractConfigValue peek(String key) { protected AbstractConfigValue peek(String key) {
throw notResolved(); throw notResolved();
} }
// This ridiculous hack is because some JDK versions apparently can't
// serialize an array, which is used to implement ArrayList and EmptyList.
// maybe
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6446627
private Object writeReplace() throws ObjectStreamException {
// switch to LinkedList
return new ConfigDelayedMergeObject(origin(),
new java.util.LinkedList<AbstractConfigValue>(stack), ignoresFallbacks);
}
} }

View file

@ -3,6 +3,7 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.ObjectStreamException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -284,4 +285,14 @@ final class ConfigSubstitution extends AbstractConfigValue implements
} }
} }
} }
// This ridiculous hack is because some JDK versions apparently can't
// serialize an array, which is used to implement ArrayList and EmptyList.
// maybe
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6446627
private Object writeReplace() throws ObjectStreamException {
// switch to LinkedList
return new ConfigSubstitution(origin(), new java.util.LinkedList<Object>(pieces),
prefixLength, ignoresFallbacks);
}
} }

View file

@ -3,12 +3,13 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.Serializable;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import com.typesafe.config.ConfigException; import com.typesafe.config.ConfigException;
final class Path { final class Path implements Serializable {
final private String first; final private String first;
final private Path remainder; final private Path remainder;

View file

@ -3,6 +3,7 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.Serializable;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -28,7 +29,7 @@ import com.typesafe.config.ConfigValueType;
* with a one-level java.util.Map from paths to non-null values. Null values are * with a one-level java.util.Map from paths to non-null values. Null values are
* not "in" the map. * not "in" the map.
*/ */
final class SimpleConfig implements Config, MergeableValue, java.io.Serializable { final class SimpleConfig implements Config, MergeableValue, Serializable {
final private AbstractConfigObject object; final private AbstractConfigObject object;

View file

@ -3,6 +3,7 @@
*/ */
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.ObjectStreamException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -21,7 +22,8 @@ final class SimpleConfigList extends AbstractConfigValue implements ConfigList {
final private boolean resolved; final private boolean resolved;
SimpleConfigList(ConfigOrigin origin, List<AbstractConfigValue> value) { SimpleConfigList(ConfigOrigin origin, List<AbstractConfigValue> value) {
this(origin, value, ResolveStatus.fromValues(value)); this(origin, value, ResolveStatus
.fromValues(value));
} }
SimpleConfigList(ConfigOrigin origin, List<AbstractConfigValue> value, SimpleConfigList(ConfigOrigin origin, List<AbstractConfigValue> value,
@ -366,4 +368,14 @@ final class SimpleConfigList extends AbstractConfigValue implements ConfigList {
protected SimpleConfigList newCopy(boolean ignoresFallbacks, ConfigOrigin newOrigin) { protected SimpleConfigList newCopy(boolean ignoresFallbacks, ConfigOrigin newOrigin) {
return new SimpleConfigList(newOrigin, value); return new SimpleConfigList(newOrigin, value);
} }
// This ridiculous hack is because some JDK versions apparently can't
// serialize an array, which is used to implement ArrayList and EmptyList.
// maybe
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6446627
private Object writeReplace() throws ObjectStreamException {
// switch to LinkedList
return new SimpleConfigList(origin(), new java.util.LinkedList<AbstractConfigValue>(value),
resolveStatus());
}
} }

View file

@ -4,6 +4,7 @@
package com.typesafe.config.impl; package com.typesafe.config.impl;
import java.io.File; import java.io.File;
import java.io.Serializable;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
@ -17,7 +18,7 @@ import com.typesafe.config.ConfigOrigin;
// it would be cleaner to have a class hierarchy for various origin types, // it would be cleaner to have a class hierarchy for various origin types,
// but was hoping this would be enough simpler to be a little messy. eh. // but was hoping this would be enough simpler to be a little messy. eh.
final class SimpleConfigOrigin implements ConfigOrigin, java.io.Serializable { final class SimpleConfigOrigin implements ConfigOrigin, Serializable {
final private String description; final private String description;
final private int lineNumber; final private int lineNumber;
final private int endLineNumber; final private int endLineNumber;

View file

@ -1,6 +1,8 @@
package com.typesafe.config.impl; package com.typesafe.config.impl;
final class SubstitutionExpression { import java.io.Serializable;
final class SubstitutionExpression implements Serializable {
final private Path path; final private Path path;
final private boolean optional; final private boolean optional;

View file

@ -22,8 +22,16 @@ import akka.event.Logging.LogEventException
import akka.event.Logging.Debug import akka.event.Logging.Debug
object Await { object Await {
/**
* Internal Akka use only
*/
sealed trait CanAwait sealed trait CanAwait
/**
* Classes that implement Awaitable can be used with Await,
* this is used to do blocking operations (blocking in the "pause this thread" sense)
*/
trait Awaitable[+T] { trait Awaitable[+T] {
/** /**
* Should throw [[java.util.concurrent.TimeoutException]] if times out * Should throw [[java.util.concurrent.TimeoutException]] if times out
@ -40,7 +48,22 @@ object Await {
private[this] implicit final val permit = new CanAwait {} private[this] implicit final val permit = new CanAwait {}
/**
* Blocks the current Thread to wait for the given awaitable to be ready.
* WARNING: Blocking operation, use with caution.
*
* @throws [[java.util.concurrent.TimeoutException]] if times out
* @returns The returned value as returned by Awaitable.ready
*/
def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost)
/**
* Blocks the current Thread to wait for the given awaitable to have a result.
* WARNING: Blocking operation, use with caution.
*
* @throws [[java.util.concurrent.TimeoutException]] if times out
* @returns The returned value as returned by Awaitable.result
*/
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
} }

View file

@ -184,8 +184,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
You need to enable the Slf4jEventHandler in the 'event-handlers' element in You need to enable the Slf4jEventHandler in the 'event-handlers' element in
the :ref:`configuration`. Here you can also define the log level of the event bus. the :ref:`configuration`. Here you can also define the log level of the event bus.
More fine grained log levels can be defined in the configuration of the SLF4J backend More fine grained log levels can be defined in the configuration of the SLF4J backend
(e.g. logback.xml). The String representation of the source object that is used when (e.g. logback.xml).
creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
.. code-block:: ruby .. code-block:: ruby
@ -194,6 +193,23 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
loglevel = "DEBUG" loglevel = "DEBUG"
} }
The SLF4J logger selected for each log event is chosen based on the
:class:`Class` of the log source specified when creating the
:class:`LoggingAdapter`, unless that was given directly as a string in which
case that string is used (i.e. ``LoggerFactory.getLogger(Class c)`` is used in
the first case and ``LoggerFactory.getLogger(String s)`` in the second).
.. note::
Beware that the the actor systems name is appended to a :class:`String` log
source if the LoggingAdapter was created giving an :class:`ActorSystem` to
the factory. If this is not intended, give a :class:`LoggingBus` instead as
shown below:
.. code-block:: scala
final LoggingAdapter log = Logging.getLogger(system.eventStream(), "my.nice.string");
Logging Thread and Akka Source in MDC Logging Thread and Akka Source in MDC
------------------------------------- -------------------------------------

View file

@ -217,8 +217,7 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
You need to enable the Slf4jEventHandler in the 'event-handlers' element in You need to enable the Slf4jEventHandler in the 'event-handlers' element in
the :ref:`configuration`. Here you can also define the log level of the event bus. the :ref:`configuration`. Here you can also define the log level of the event bus.
More fine grained log levels can be defined in the configuration of the SLF4J backend More fine grained log levels can be defined in the configuration of the SLF4J backend
(e.g. logback.xml). The String representation of the source object that is used when (e.g. logback.xml).
creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
.. code-block:: ruby .. code-block:: ruby
@ -227,6 +226,23 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
loglevel = "DEBUG" loglevel = "DEBUG"
} }
The SLF4J logger selected for each log event is chosen based on the
:class:`Class[_]` of the log source specified when creating the
:class:`LoggingAdapter`, unless that was given directly as a string in which
case that string is used (i.e. ``LoggerFactory.getLogger(c: Class[_])`` is used in
the first case and ``LoggerFactory.getLogger(s: String)`` in the second).
.. note::
Beware that the the actor systems name is appended to a :class:`String` log
source if the LoggingAdapter was created giving an :class:`ActorSystem` to
the factory. If this is not intended, give a :class:`LoggingBus` instead as
shown below:
.. code-block:: scala
val log = Logging(system.eventStream, "my.nice.string")
Logging Thread and Akka Source in MDC Logging Thread and Akka Source in MDC
------------------------------------- -------------------------------------