Merge remote-tracking branch 'origin/master' into wip-1685-remote-cleaup-∂π
This commit is contained in:
commit
52d6e5625d
432 changed files with 8354 additions and 2582 deletions
|
|
@ -8,7 +8,6 @@ import java.io.File
|
|||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import com.typesafe.config.ConfigParseOptions
|
||||
import com.typesafe.config.ConfigResolveOptions
|
||||
|
||||
@deprecated("use ActorSystem instead", "2.0")
|
||||
object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import akka.dispatch.Future
|
|||
import akka.dispatch.OldFuture
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.net.InetSocketAddress
|
||||
import akka.migration.AskableActorRef
|
||||
|
||||
/**
|
||||
* Migration replacement for `object akka.actor.Actor`.
|
||||
|
|
@ -54,7 +54,6 @@ object OldActor {
|
|||
|
||||
@deprecated("OldActor.remote should not be used", "2.0")
|
||||
lazy val remote: OldRemoteSupport = new OldRemoteSupport
|
||||
|
||||
}
|
||||
|
||||
@deprecated("use Actor", "2.0")
|
||||
|
|
@ -66,6 +65,8 @@ abstract class OldActor extends Actor {
|
|||
|
||||
implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef)
|
||||
|
||||
implicit def askableActorRef(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)
|
||||
|
||||
@deprecated("Use context.become instead", "2.0")
|
||||
def become(behavior: Receive, discardOld: Boolean = true) = context.become(behavior, discardOld)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
package akka.dispatch
|
||||
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.util.duration._
|
||||
import akka.AkkaException
|
||||
import akka.util.BoxedType
|
||||
import akka.util.Duration
|
||||
|
|
|
|||
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.migration
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.dispatch.Future
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* Implementation detail of the “ask” pattern enrichment of ActorRef
|
||||
*/
|
||||
private[akka] final class AskableActorRef(val actorRef: ActorRef) {
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* flow {
|
||||
* val f = worker.ask(request)(timeout)
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
||||
*/
|
||||
def ask(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and returns a [[akka.dispatch.Future]]
|
||||
* holding the eventual reply message; this means that the target actor
|
||||
* needs to send the result to the `sender` reference provided. The Future
|
||||
* will be completed with an [[akka.actor.AskTimeoutException]] after the
|
||||
* given timeout has expired; this is independent from any timeout applied
|
||||
* while awaiting a result for this future (i.e. in
|
||||
* `Await.result(..., timeout)`).
|
||||
*
|
||||
* <b>Warning:</b>
|
||||
* When using future callbacks, inside actors you need to carefully avoid closing over
|
||||
* the containing actor’s object, i.e. do not call methods or access mutable state
|
||||
* on the enclosing actor from within the callback. This would break the actor
|
||||
* encapsulation and may introduce synchronization bugs and race conditions because
|
||||
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
|
||||
* there is not yet a way to detect these illegal accesses at compile time.
|
||||
*
|
||||
* <b>Recommended usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* flow {
|
||||
* val f = worker ? request
|
||||
* EnrichedRequest(request, f())
|
||||
* } pipeTo nextActor
|
||||
* }}}
|
||||
*
|
||||
* [see the [[akka.dispatch.Future]] companion object for a description of `flow`]
|
||||
*/
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
|
||||
|
||||
/**
|
||||
* This method is just there to catch 2.0-unsupported usage and print deprecation warnings for it.
|
||||
*/
|
||||
@deprecated("use ?(msg)(timeout), this method has dangerous ambiguity", "2.0-migration")
|
||||
def ?(message: Any, timeout: Timeout)(i: Int = 0): Future[Any] = this.?(message)(timeout)
|
||||
}
|
||||
|
|
@ -21,7 +21,7 @@ package object migration {
|
|||
implicit def actorRef2OldActorRef(actorRef: ActorRef) = new OldActorRef(actorRef)
|
||||
|
||||
class OldActorRef(actorRef: ActorRef) {
|
||||
@deprecated("Actors are automatically started when creatd, i.e. remove old call to start()", "2.0")
|
||||
@deprecated("Actors are automatically started when created, i.e. remove old call to start()", "2.0")
|
||||
def start(): ActorRef = actorRef
|
||||
|
||||
@deprecated("Stop with ActorSystem or ActorContext instead", "2.0")
|
||||
|
|
@ -31,4 +31,7 @@ package object migration {
|
|||
def stop(): Unit = GlobalActorSystem.stop(actorRef)
|
||||
}
|
||||
|
||||
implicit def ask(actorRef: ActorRef) = new akka.migration.AskableActorRef(actorRef)
|
||||
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout = null): Future[Any] = akka.pattern.ask(actorRef, message)(timeout)
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor;
|
||||
|
||||
|
|
@ -16,32 +16,32 @@ import static org.junit.Assert.*;
|
|||
|
||||
public class JavaExtension {
|
||||
|
||||
static class Provider implements ExtensionIdProvider {
|
||||
static class TestExtensionId extends AbstractExtensionId<TestExtension> implements ExtensionIdProvider {
|
||||
public final static TestExtensionId TestExtensionProvider = new TestExtensionId();
|
||||
|
||||
public ExtensionId<TestExtension> lookup() {
|
||||
return defaultInstance;
|
||||
return TestExtensionId.TestExtensionProvider;
|
||||
}
|
||||
}
|
||||
|
||||
public final static TestExtensionId defaultInstance = new TestExtensionId();
|
||||
|
||||
static class TestExtensionId extends AbstractExtensionId<TestExtension> {
|
||||
public TestExtension createExtension(ActorSystemImpl i) {
|
||||
public TestExtension createExtension(ExtendedActorSystem i) {
|
||||
return new TestExtension(i);
|
||||
}
|
||||
}
|
||||
|
||||
static class TestExtension implements Extension {
|
||||
public final ActorSystemImpl system;
|
||||
public final ExtendedActorSystem system;
|
||||
|
||||
public TestExtension(ActorSystemImpl i) {
|
||||
public TestExtension(ExtendedActorSystem i) {
|
||||
system = i;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class OtherExtension implements Extension {
|
||||
static final ExtensionKey<OtherExtension> key = new ExtensionKey<OtherExtension>(OtherExtension.class) {};
|
||||
static final ExtensionKey<OtherExtension> key = new ExtensionKey<OtherExtension>(OtherExtension.class) {
|
||||
};
|
||||
|
||||
public final ActorSystemImpl system;
|
||||
|
||||
public OtherExtension(ActorSystemImpl i) {
|
||||
system = i;
|
||||
}
|
||||
|
|
@ -51,8 +51,8 @@ public class JavaExtension {
|
|||
|
||||
@BeforeClass
|
||||
public static void beforeAll() {
|
||||
Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]").withFallback(
|
||||
AkkaSpec.testConf());
|
||||
Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtensionId\" ]")
|
||||
.withFallback(AkkaSpec.testConf());
|
||||
system = ActorSystem.create("JavaExtension", c);
|
||||
}
|
||||
|
||||
|
|
@ -64,10 +64,10 @@ public class JavaExtension {
|
|||
|
||||
@Test
|
||||
public void mustBeAccessible() {
|
||||
assertSame(system.extension(defaultInstance).system, system);
|
||||
assertSame(defaultInstance.apply(system).system, system);
|
||||
assertSame(system.extension(TestExtensionId.TestExtensionProvider).system, system);
|
||||
assertSame(TestExtensionId.TestExtensionProvider.apply(system).system, system);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void mustBeAdHoc() {
|
||||
assertSame(OtherExtension.key.apply(system).system, system);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -8,6 +8,7 @@ import akka.testkit._
|
|||
import org.scalatest.BeforeAndAfterEach
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
object ActorFireForgetRequestReplySpec {
|
||||
|
||||
|
|
@ -80,7 +81,8 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
|
|||
|
||||
"should shutdown crashed temporary actor" in {
|
||||
filterEvents(EventFilter[Exception]("Expected exception")) {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
|
||||
actor.isTerminated must be(false)
|
||||
actor ! "Die"
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.testkit._
|
|||
import akka.util.duration._
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
object ActorLifeCycleSpec {
|
||||
|
||||
|
|
@ -35,7 +36,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
|
||||
filterException[ActorKilledException] {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
|
||||
|
|
@ -69,7 +71,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
"default for preRestart and postRestart is to call postStop and preStart respectively" in {
|
||||
filterException[ActorKilledException] {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
|
||||
|
|
@ -99,7 +102,8 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
|
|||
|
||||
"not invoke preRestart and postRestart when never restarted using OneForOneStrategy" in {
|
||||
val id = newUuid().toString
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))))
|
||||
val gen = new AtomicInteger(0)
|
||||
val props = Props(new LifeCycleTestActor(testActor, id, gen))
|
||||
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
object ActorLookupSpec {
|
||||
|
||||
|
|
@ -39,11 +40,13 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
|
|||
val c2 = system.actorOf(p, "c2")
|
||||
val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
val user = system.asInstanceOf[ActorSystemImpl].guardian
|
||||
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
|
||||
val root = system.asInstanceOf[ActorSystemImpl].lookupRoot
|
||||
val sysImpl = system.asInstanceOf[ActorSystemImpl]
|
||||
|
||||
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match {
|
||||
val user = sysImpl.guardian
|
||||
val syst = sysImpl.systemGuardian
|
||||
val root = sysImpl.lookupRoot
|
||||
|
||||
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match {
|
||||
case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -15,6 +15,7 @@ import akka.util.ReflectiveAccess
|
|||
import akka.serialization.Serialization
|
||||
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||
import akka.dispatch.{ Await, DefaultPromise, Promise, Future }
|
||||
import akka.pattern.ask
|
||||
|
||||
object ActorRefSpec {
|
||||
|
||||
|
|
@ -287,7 +288,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
val baos = new ByteArrayOutputStream(8192 * 32)
|
||||
val out = new ObjectOutputStream(baos)
|
||||
|
||||
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.address
|
||||
val sysImpl = system.asInstanceOf[ActorSystemImpl]
|
||||
val addr = sysImpl.provider.rootPath.address
|
||||
val serialized = SerializedActorRef(addr + "/non-existing")
|
||||
|
||||
out.writeObject(serialized)
|
||||
|
|
@ -295,9 +297,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
out.flush
|
||||
out.close
|
||||
|
||||
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
Serialization.currentSystem.withValue(sysImpl) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing")
|
||||
in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -358,8 +360,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
}))
|
||||
|
||||
val ffive = (ref ? (5, timeout)).mapTo[String]
|
||||
val fnull = (ref ? (null, timeout)).mapTo[String]
|
||||
val ffive = (ref.ask(5)(timeout)).mapTo[String]
|
||||
val fnull = (ref.ask(null)(timeout)).mapTo[String]
|
||||
ref ! PoisonPill
|
||||
|
||||
Await.result(ffive, timeout.duration) must be("five")
|
||||
|
|
@ -374,6 +376,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||
|
||||
val ref = context.actorOf(
|
||||
Props(new Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
|
|
@ -382,7 +387,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
}))
|
||||
|
||||
protected def receive = { case "sendKill" ⇒ ref ! Kill }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
}))
|
||||
|
||||
boss ! "sendKill"
|
||||
Await.ready(latch, 5 seconds)
|
||||
|
|
|
|||
|
|
@ -1,22 +1,27 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
import akka.testkit._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.dispatch.Await
|
||||
import akka.util.duration._
|
||||
import scala.collection.JavaConverters
|
||||
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
|
||||
|
||||
class JavaExtensionSpec extends JavaExtension with JUnitSuite
|
||||
|
||||
object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
|
||||
def lookup = this
|
||||
def createExtension(s: ActorSystemImpl) = new TestExtension(s)
|
||||
def createExtension(s: ExtendedActorSystem) = new TestExtension(s)
|
||||
}
|
||||
|
||||
// Dont't place inside ActorSystemSpec object, since it will not be garbage collected and reference to system remains
|
||||
class TestExtension(val system: ActorSystemImpl) extends Extension
|
||||
class TestExtension(val system: ExtendedActorSystem) extends Extension
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExtension$"]""") {
|
||||
|
||||
"An ActorSystem" must {
|
||||
|
|
@ -27,6 +32,58 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
|
|||
system.hasExtension(TestExtension) must be(true)
|
||||
}
|
||||
|
||||
"run termination callbacks in order" in {
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
val system2 = ActorSystem("TerminationCallbacks", AkkaSpec.testConf)
|
||||
val result = new ConcurrentLinkedQueue[Int]
|
||||
val count = 10
|
||||
val latch = TestLatch(count)
|
||||
|
||||
for (i ← 1 to count) {
|
||||
system2.registerOnTermination {
|
||||
(i % 3).millis.dilated.sleep()
|
||||
result add i
|
||||
latch.countDown()
|
||||
}
|
||||
}
|
||||
|
||||
system2.shutdown()
|
||||
Await.ready(latch, 5 seconds)
|
||||
|
||||
val expected = (for (i ← 1 to count) yield i).reverse
|
||||
result.asScala.toSeq must be(expected)
|
||||
|
||||
}
|
||||
|
||||
"awaitTermination after termination callbacks" in {
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
|
||||
@volatile
|
||||
var callbackWasRun = false
|
||||
|
||||
system2.registerOnTermination {
|
||||
50.millis.dilated.sleep()
|
||||
callbackWasRun = true
|
||||
}
|
||||
|
||||
system2.scheduler.scheduleOnce(200.millis.dilated) { system2.shutdown() }
|
||||
|
||||
system2.awaitTermination(5 seconds)
|
||||
callbackWasRun must be(true)
|
||||
}
|
||||
|
||||
"throw RejectedExecutionException when shutdown" in {
|
||||
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
|
||||
system2.shutdown()
|
||||
system2.awaitTermination(5 seconds)
|
||||
|
||||
intercept[RejectedExecutionException] {
|
||||
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
|
||||
}.getMessage must be("Must be called prior to system shutdown.")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
@ -10,6 +10,7 @@ import akka.testkit.DefaultTimeout
|
|||
import java.util.concurrent.TimeoutException
|
||||
import akka.dispatch.Await
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.{ ask, AskTimeoutException }
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
|
||||
|
|
@ -44,7 +45,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
|
|||
"use explicitly supplied timeout" in {
|
||||
within(testTimeout - 100.millis, testTimeout + 300.millis) {
|
||||
val echo = system.actorOf(Props.empty)
|
||||
val f = echo.?("hallo", testTimeout)
|
||||
val f = echo.?("hallo")(testTimeout)
|
||||
try {
|
||||
intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) }
|
||||
} finally { system.stop(echo) }
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import akka.testkit._
|
|||
import akka.util.duration._
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec
|
||||
|
|
@ -23,7 +24,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
import DeathWatchSpec._
|
||||
|
||||
lazy val supervisor = system.actorOf(Props[Supervisor], "watchers")
|
||||
lazy val supervisor = system.actorOf(Props(new Supervisor(SupervisorStrategy.defaultStrategy)), "watchers")
|
||||
|
||||
def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds)
|
||||
|
||||
|
|
@ -94,7 +95,8 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
"notify with a Terminated message once when an Actor is stopped but not when restarted" in {
|
||||
filterException[ActorKilledException] {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception])))))
|
||||
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
||||
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
|
|
@ -115,13 +117,13 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
"fail a monitor which does not handle Terminated()" in {
|
||||
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
|
||||
case class FF(fail: Failed)
|
||||
val supervisor = system.actorOf(Props[Supervisor]
|
||||
.withFaultHandler(new OneForOneStrategy(FaultHandlingStrategy.makeDecider(List(classOf[Exception])), Some(0)) {
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
testActor.tell(FF(Failed(cause)), child)
|
||||
super.handleFailure(context, child, cause, stats, children)
|
||||
}
|
||||
}))
|
||||
val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) {
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
|
||||
testActor.tell(FF(Failed(cause)), child)
|
||||
super.handleFailure(context, child, cause, stats, children)
|
||||
}
|
||||
}
|
||||
val supervisor = system.actorOf(Props(new Supervisor(strategy)))
|
||||
|
||||
val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration)
|
||||
val brother = Await.result((supervisor ? Props(new Actor {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,12 +1,12 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
import FSM._
|
||||
import akka.util.Duration
|
||||
|
||||
object FSMTransitionSpec {
|
||||
|
||||
|
|
@ -72,8 +72,9 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
|||
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
||||
val sup = system.actorOf(Props(new Actor {
|
||||
context.watch(fsm)
|
||||
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = Duration.Inf)(List(classOf[Throwable]))
|
||||
def receive = { case _ ⇒ }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
|
||||
}))
|
||||
|
||||
within(300 millis) {
|
||||
fsm ! SubscribeTransitionCallBack(forward)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -9,6 +9,7 @@ import akka.util.duration._
|
|||
import Actor._
|
||||
import akka.util.Duration
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
object ForwardActorSpec {
|
||||
val ExpectedMessage = "FOO"
|
||||
|
|
@ -46,7 +47,7 @@ class ForwardActorSpec extends AkkaSpec {
|
|||
|
||||
"forward actor reference when invoking forward on ask" in {
|
||||
val chain = createForwardingChain(system)
|
||||
chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage }
|
||||
chain.ask(ExpectedMessage)(5 seconds) onSuccess { case ExpectedMessage ⇒ testActor ! ExpectedMessage }
|
||||
expectMsg(5 seconds, ExpectedMessage)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -4,256 +4,327 @@
|
|||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import akka.util.ByteString
|
||||
import akka.util.cps._
|
||||
import akka.util.{ ByteString, Duration, Timer }
|
||||
import akka.util.duration._
|
||||
import scala.util.continuations._
|
||||
import akka.testkit._
|
||||
import akka.dispatch.{ Await, Future }
|
||||
import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher }
|
||||
import java.net.{ SocketAddress }
|
||||
import akka.pattern.ask
|
||||
|
||||
object IOActorSpec {
|
||||
import IO._
|
||||
|
||||
class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef, started: TestLatch) extends Actor {
|
||||
class SimpleEchoServer(addressPromise: Promise[SocketAddress]) extends Actor {
|
||||
|
||||
import context.dispatcher
|
||||
implicit val timeout = context.system.settings.ActorTimeout
|
||||
val server = IOManager(context.system) listen ("localhost", 0)
|
||||
|
||||
override def preStart = {
|
||||
listen(ioManager, host, port)
|
||||
started.open()
|
||||
}
|
||||
|
||||
def createWorker = context.actorOf(Props(new Actor with IO {
|
||||
def receiveIO = {
|
||||
case NewClient(server) ⇒
|
||||
val socket = server.accept()
|
||||
loopC {
|
||||
val bytes = socket.read()
|
||||
socket write bytes
|
||||
}
|
||||
}
|
||||
}))
|
||||
val state = IO.IterateeRef.Map.sync[IO.Handle]()
|
||||
|
||||
def receive = {
|
||||
case msg: NewClient ⇒
|
||||
createWorker forward msg
|
||||
|
||||
case IO.Listening(`server`, address) ⇒
|
||||
addressPromise success address
|
||||
|
||||
case IO.NewClient(`server`) ⇒
|
||||
val socket = server.accept()
|
||||
state(socket) flatMap (_ ⇒ IO repeat (IO.takeAny map socket.write))
|
||||
|
||||
case IO.Read(socket, bytes) ⇒
|
||||
state(socket)(IO Chunk bytes)
|
||||
|
||||
case IO.Closed(socket, cause) ⇒
|
||||
state -= socket
|
||||
|
||||
}
|
||||
|
||||
override def postStop {
|
||||
server.close()
|
||||
state.keySet foreach (_.close())
|
||||
}
|
||||
}
|
||||
|
||||
class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
|
||||
class SimpleEchoClient(address: SocketAddress) extends Actor {
|
||||
|
||||
lazy val socket: SocketHandle = connect(ioManager, host, port)(reader)
|
||||
lazy val reader: ActorRef = context.actorOf(Props({
|
||||
new Actor with IO {
|
||||
def receiveIO = {
|
||||
case length: Int ⇒
|
||||
val bytes = socket.read(length)
|
||||
sender ! bytes
|
||||
}
|
||||
}
|
||||
}))
|
||||
val socket = IOManager(context.system) connect (address)
|
||||
|
||||
val state = IO.IterateeRef.sync()
|
||||
|
||||
def receive = {
|
||||
|
||||
def receiveIO = {
|
||||
case bytes: ByteString ⇒
|
||||
val source = sender
|
||||
socket write bytes
|
||||
reader forward bytes.length
|
||||
state flatMap { _ ⇒
|
||||
IO take bytes.length map (source ! _) recover {
|
||||
case e ⇒ source ! Status.Failure(e)
|
||||
}
|
||||
}
|
||||
|
||||
case IO.Read(`socket`, bytes) ⇒
|
||||
state(IO Chunk bytes)
|
||||
|
||||
case IO.Closed(`socket`, cause) ⇒
|
||||
state(IO EOF cause)
|
||||
throw (cause getOrElse new RuntimeException("Socket closed"))
|
||||
|
||||
}
|
||||
|
||||
override def postStop {
|
||||
socket.close()
|
||||
state(IO EOF None)
|
||||
}
|
||||
}
|
||||
|
||||
sealed trait KVCommand {
|
||||
def bytes: ByteString
|
||||
}
|
||||
|
||||
case class KVSet(key: String, value: String) extends KVCommand {
|
||||
val bytes = ByteString("SET " + key + " " + value.length + "\r\n" + value + "\r\n")
|
||||
}
|
||||
|
||||
case class KVGet(key: String) extends KVCommand {
|
||||
val bytes = ByteString("GET " + key + "\r\n")
|
||||
}
|
||||
|
||||
case object KVGetAll extends KVCommand {
|
||||
val bytes = ByteString("GETALL\r\n")
|
||||
}
|
||||
|
||||
// Basic Redis-style protocol
|
||||
class KVStore(host: String, port: Int, ioManager: ActorRef, started: TestLatch) extends Actor {
|
||||
class KVStore(addressPromise: Promise[SocketAddress]) extends Actor {
|
||||
|
||||
import context.dispatcher
|
||||
implicit val timeout = context.system.settings.ActorTimeout
|
||||
import context.system
|
||||
|
||||
var kvs: Map[String, ByteString] = Map.empty
|
||||
val state = IO.IterateeRef.Map.sync[IO.Handle]()
|
||||
|
||||
override def preStart = {
|
||||
listen(ioManager, host, port)
|
||||
started.open()
|
||||
}
|
||||
var kvs: Map[String, String] = Map.empty
|
||||
|
||||
def createWorker = context.actorOf(Props(new Actor with IO {
|
||||
def receiveIO = {
|
||||
case NewClient(server) ⇒
|
||||
val socket = server.accept()
|
||||
loopC {
|
||||
val cmd = socket.read(ByteString("\r\n")).utf8String
|
||||
val result = matchC(cmd.split(' ')) {
|
||||
case Array("SET", key, length) ⇒
|
||||
val value = socket read length.toInt
|
||||
server.owner ? (('set, key, value)) map ((x: Any) ⇒ ByteString("+OK\r\n"))
|
||||
case Array("GET", key) ⇒
|
||||
server.owner ? (('get, key)) map {
|
||||
case Some(b: ByteString) ⇒ ByteString("$" + b.length + "\r\n") ++ b
|
||||
case None ⇒ ByteString("$-1\r\n")
|
||||
}
|
||||
case Array("GETALL") ⇒
|
||||
server.owner ? 'getall map {
|
||||
case m: Map[_, _] ⇒
|
||||
(ByteString("*" + (m.size * 2) + "\r\n") /: m) {
|
||||
case (result, (k: String, v: ByteString)) ⇒
|
||||
val kBytes = ByteString(k)
|
||||
result ++ ByteString("$" + kBytes.length + "\r\n") ++ kBytes ++ ByteString("$" + v.length + "\r\n") ++ v
|
||||
}
|
||||
}
|
||||
}
|
||||
result recover {
|
||||
case e ⇒ ByteString("-" + e.getClass.toString + "\r\n")
|
||||
} foreach { bytes ⇒
|
||||
socket write bytes
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
val server = IOManager(context.system) listen ("localhost", 0)
|
||||
|
||||
val EOL = ByteString("\r\n")
|
||||
|
||||
def receive = {
|
||||
case msg: NewClient ⇒ createWorker forward msg
|
||||
case ('set, key: String, value: ByteString) ⇒
|
||||
kvs += (key -> value)
|
||||
sender.tell((), self)
|
||||
case ('get, key: String) ⇒ sender.tell(kvs.get(key), self)
|
||||
case 'getall ⇒ sender.tell(kvs, self)
|
||||
|
||||
case IO.Listening(`server`, address) ⇒
|
||||
addressPromise success address
|
||||
|
||||
case IO.NewClient(`server`) ⇒
|
||||
val socket = server.accept()
|
||||
state(socket) flatMap { _ ⇒
|
||||
IO repeat {
|
||||
IO takeUntil EOL map (_.utf8String split ' ') flatMap {
|
||||
|
||||
case Array("SET", key, length) ⇒
|
||||
for {
|
||||
value ← IO take length.toInt
|
||||
_ ← IO takeUntil EOL
|
||||
} yield {
|
||||
kvs += (key -> value.utf8String)
|
||||
ByteString("+OK\r\n")
|
||||
}
|
||||
|
||||
case Array("GET", key) ⇒
|
||||
IO Iteratee {
|
||||
kvs get key map { value ⇒
|
||||
ByteString("$" + value.length + "\r\n" + value + "\r\n")
|
||||
} getOrElse ByteString("$-1\r\n")
|
||||
}
|
||||
|
||||
case Array("GETALL") ⇒
|
||||
IO Iteratee {
|
||||
(ByteString("*" + (kvs.size * 2) + "\r\n") /: kvs) {
|
||||
case (result, (k, v)) ⇒
|
||||
val kBytes = ByteString(k)
|
||||
val vBytes = ByteString(v)
|
||||
result ++
|
||||
ByteString("$" + kBytes.length) ++ EOL ++
|
||||
kBytes ++ EOL ++
|
||||
ByteString("$" + vBytes.length) ++ EOL ++
|
||||
vBytes ++ EOL
|
||||
}
|
||||
}
|
||||
|
||||
} map (socket write)
|
||||
}
|
||||
}
|
||||
|
||||
case IO.Read(socket, bytes) ⇒
|
||||
state(socket)(IO Chunk bytes)
|
||||
|
||||
case IO.Closed(socket, cause) ⇒
|
||||
state -= socket
|
||||
|
||||
}
|
||||
|
||||
override def postStop {
|
||||
server.close()
|
||||
state.keySet foreach (_.close())
|
||||
}
|
||||
}
|
||||
|
||||
class KVClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
|
||||
class KVClient(address: SocketAddress) extends Actor {
|
||||
|
||||
import context.dispatcher
|
||||
implicit val timeout = context.system.settings.ActorTimeout
|
||||
val socket = IOManager(context.system) connect (address)
|
||||
|
||||
var socket: SocketHandle = _
|
||||
val state = IO.IterateeRef.sync()
|
||||
|
||||
override def preStart {
|
||||
socket = connect(ioManager, host, port)
|
||||
}
|
||||
val EOL = ByteString("\r\n")
|
||||
|
||||
def reply(msg: Any) = sender.tell(msg, self)
|
||||
|
||||
def receiveIO = {
|
||||
case ('set, key: String, value: ByteString) ⇒
|
||||
socket write (ByteString("SET " + key + " " + value.length + "\r\n") ++ value)
|
||||
reply(readResult)
|
||||
|
||||
case ('get, key: String) ⇒
|
||||
socket write ByteString("GET " + key + "\r\n")
|
||||
reply(readResult)
|
||||
|
||||
case 'getall ⇒
|
||||
socket write ByteString("GETALL\r\n")
|
||||
reply(readResult)
|
||||
}
|
||||
|
||||
def readResult = {
|
||||
val resultType = socket.read(1).utf8String
|
||||
resultType match {
|
||||
case "+" ⇒ socket.read(ByteString("\r\n")).utf8String
|
||||
case "-" ⇒ sys error socket.read(ByteString("\r\n")).utf8String
|
||||
case "$" ⇒
|
||||
val length = socket.read(ByteString("\r\n")).utf8String
|
||||
socket.read(length.toInt)
|
||||
case "*" ⇒
|
||||
val count = socket.read(ByteString("\r\n")).utf8String
|
||||
var result: Map[String, ByteString] = Map.empty
|
||||
repeatC(count.toInt / 2) {
|
||||
val k = readBytes
|
||||
val v = readBytes
|
||||
result += (k.utf8String -> v)
|
||||
def receive = {
|
||||
case cmd: KVCommand ⇒
|
||||
val source = sender
|
||||
socket write cmd.bytes
|
||||
state flatMap { _ ⇒
|
||||
readResult map (source !) recover {
|
||||
case e ⇒ source ! Status.Failure(e)
|
||||
}
|
||||
result
|
||||
case _ ⇒ sys error "Unexpected response"
|
||||
}
|
||||
|
||||
case IO.Read(`socket`, bytes) ⇒
|
||||
state(IO Chunk bytes)
|
||||
|
||||
case IO.Closed(`socket`, cause) ⇒
|
||||
state(IO EOF cause)
|
||||
throw (cause getOrElse new RuntimeException("Socket closed"))
|
||||
|
||||
}
|
||||
|
||||
override def postStop {
|
||||
socket.close()
|
||||
state(IO EOF None)
|
||||
}
|
||||
|
||||
def readResult: IO.Iteratee[Any] = {
|
||||
IO take 1 map (_.utf8String) flatMap {
|
||||
case "+" ⇒ IO takeUntil EOL map (msg ⇒ msg.utf8String)
|
||||
case "-" ⇒ IO takeUntil EOL flatMap (err ⇒ IO throwErr new RuntimeException(err.utf8String))
|
||||
case "$" ⇒
|
||||
IO takeUntil EOL map (_.utf8String.toInt) flatMap {
|
||||
case -1 ⇒ IO Done None
|
||||
case length ⇒
|
||||
for {
|
||||
value ← IO take length
|
||||
_ ← IO takeUntil EOL
|
||||
} yield Some(value.utf8String)
|
||||
}
|
||||
case "*" ⇒
|
||||
IO takeUntil EOL map (_.utf8String.toInt) flatMap {
|
||||
case -1 ⇒ IO Done None
|
||||
case length ⇒
|
||||
IO.takeList(length)(readResult) flatMap { list ⇒
|
||||
((Right(Map()): Either[String, Map[String, String]]) /: list.grouped(2)) {
|
||||
case (Right(m), List(Some(k: String), Some(v: String))) ⇒ Right(m + (k -> v))
|
||||
case (Right(_), _) ⇒ Left("Unexpected Response")
|
||||
case (left, _) ⇒ left
|
||||
} fold (msg ⇒ IO throwErr new RuntimeException(msg), IO Done _)
|
||||
}
|
||||
}
|
||||
case _ ⇒ IO throwErr new RuntimeException("Unexpected Response")
|
||||
}
|
||||
}
|
||||
|
||||
def readBytes = {
|
||||
val resultType = socket.read(1).utf8String
|
||||
if (resultType != "$") sys error "Unexpected response"
|
||||
val length = socket.read(ByteString("\r\n")).utf8String
|
||||
socket.read(length.toInt)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
|
||||
class IOActorSpec extends AkkaSpec with DefaultTimeout {
|
||||
import IOActorSpec._
|
||||
|
||||
/**
|
||||
* Retries the future until a result is returned or until one of the limits are hit. If no
|
||||
* limits are provided the future will be retried indefinitely until a result is returned.
|
||||
*
|
||||
* @param count number of retries
|
||||
* @param timeout duration to retry within
|
||||
* @param delay duration to wait before retrying
|
||||
* @param filter determines which exceptions should be retried
|
||||
* @return a future containing the result or the last exception before a limit was hit.
|
||||
*/
|
||||
def retry[T](count: Option[Int] = None, timeout: Option[Duration] = None, delay: Option[Duration] = Some(100 millis), filter: Option[Throwable ⇒ Boolean] = None)(future: ⇒ Future[T])(implicit executor: ExecutionContext): Future[T] = {
|
||||
|
||||
val promise = Promise[T]()(executor)
|
||||
|
||||
val timer = timeout match {
|
||||
case Some(duration) ⇒ Some(Timer(duration))
|
||||
case None ⇒ None
|
||||
}
|
||||
|
||||
def check(n: Int, e: Throwable): Boolean =
|
||||
(count.isEmpty || (n < count.get)) && (timer.isEmpty || timer.get.isTicking) && (filter.isEmpty || filter.get(e))
|
||||
|
||||
def run(n: Int) {
|
||||
future onComplete {
|
||||
case Left(e) if check(n, e) ⇒
|
||||
if (delay.isDefined) {
|
||||
executor match {
|
||||
case m: MessageDispatcher ⇒ m.prerequisites.scheduler.scheduleOnce(delay.get)(run(n + 1))
|
||||
case _ ⇒ // Thread.sleep, ignore, or other?
|
||||
}
|
||||
} else run(n + 1)
|
||||
case v ⇒ promise complete v
|
||||
}
|
||||
}
|
||||
|
||||
run(0)
|
||||
|
||||
promise
|
||||
}
|
||||
|
||||
"an IO Actor" must {
|
||||
"run echo server" in {
|
||||
val started = TestLatch(1)
|
||||
val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer
|
||||
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064, ioManager, started)))
|
||||
Await.ready(started, timeout.duration)
|
||||
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064, ioManager)))
|
||||
val f1 = client ? ByteString("Hello World!1")
|
||||
val f2 = client ? ByteString("Hello World!2")
|
||||
val f3 = client ? ByteString("Hello World!3")
|
||||
Await.result(f1, timeout.duration) must equal(ByteString("Hello World!1"))
|
||||
Await.result(f2, timeout.duration) must equal(ByteString("Hello World!2"))
|
||||
Await.result(f3, timeout.duration) must equal(ByteString("Hello World!3"))
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
filterException[java.net.ConnectException] {
|
||||
val addressPromise = Promise[SocketAddress]()
|
||||
val server = system.actorOf(Props(new SimpleEchoServer(addressPromise)))
|
||||
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
|
||||
val client = system.actorOf(Props(new SimpleEchoClient(address)))
|
||||
val f1 = retry() { client ? ByteString("Hello World!1") }
|
||||
val f2 = retry() { client ? ByteString("Hello World!2") }
|
||||
val f3 = retry() { client ? ByteString("Hello World!3") }
|
||||
Await.result(f1, TestLatch.DefaultTimeout) must equal(ByteString("Hello World!1"))
|
||||
Await.result(f2, TestLatch.DefaultTimeout) must equal(ByteString("Hello World!2"))
|
||||
Await.result(f3, TestLatch.DefaultTimeout) must equal(ByteString("Hello World!3"))
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
}
|
||||
}
|
||||
|
||||
"run echo server under high load" in {
|
||||
val started = TestLatch(1)
|
||||
val ioManager = system.actorOf(Props(new IOManager()))
|
||||
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065, ioManager, started)))
|
||||
Await.ready(started, timeout.duration)
|
||||
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065, ioManager)))
|
||||
val list = List.range(0, 1000)
|
||||
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
|
||||
assert(Await.result(f, timeout.duration).size === 1000)
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
}
|
||||
|
||||
"run echo server under high load with small buffer" in {
|
||||
val started = TestLatch(1)
|
||||
val ioManager = system.actorOf(Props(new IOManager(2)))
|
||||
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8066, ioManager, started)))
|
||||
Await.ready(started, timeout.duration)
|
||||
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8066, ioManager)))
|
||||
val list = List.range(0, 1000)
|
||||
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
|
||||
assert(Await.result(f, timeout.duration).size === 1000)
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
filterException[java.net.ConnectException] {
|
||||
val addressPromise = Promise[SocketAddress]()
|
||||
val server = system.actorOf(Props(new SimpleEchoServer(addressPromise)))
|
||||
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
|
||||
val client = system.actorOf(Props(new SimpleEchoClient(address)))
|
||||
val list = List.range(0, 100)
|
||||
val f = Future.traverse(list)(i ⇒ retry() { client ? ByteString(i.toString) })
|
||||
assert(Await.result(f, TestLatch.DefaultTimeout).size === 100)
|
||||
system.stop(client)
|
||||
system.stop(server)
|
||||
}
|
||||
}
|
||||
|
||||
"run key-value store" in {
|
||||
val started = TestLatch(1)
|
||||
val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer
|
||||
val server = system.actorOf(Props(new KVStore("localhost", 8067, ioManager, started)))
|
||||
Await.ready(started, timeout.duration)
|
||||
val client1 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager)))
|
||||
val client2 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager)))
|
||||
val f1 = client1 ? (('set, "hello", ByteString("World")))
|
||||
val f2 = client1 ? (('set, "test", ByteString("No one will read me")))
|
||||
val f3 = client1 ? (('get, "hello"))
|
||||
Await.ready(f2, timeout.duration)
|
||||
val f4 = client2 ? (('set, "test", ByteString("I'm a test!")))
|
||||
Await.ready(f4, timeout.duration)
|
||||
val f5 = client1 ? (('get, "test"))
|
||||
val f6 = client2 ? 'getall
|
||||
Await.result(f1, timeout.duration) must equal("OK")
|
||||
Await.result(f2, timeout.duration) must equal("OK")
|
||||
Await.result(f3, timeout.duration) must equal(ByteString("World"))
|
||||
Await.result(f4, timeout.duration) must equal("OK")
|
||||
Await.result(f5, timeout.duration) must equal(ByteString("I'm a test!"))
|
||||
Await.result(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
|
||||
system.stop(client1)
|
||||
system.stop(client2)
|
||||
system.stop(server)
|
||||
system.stop(ioManager)
|
||||
filterException[java.net.ConnectException] {
|
||||
val addressPromise = Promise[SocketAddress]()
|
||||
val server = system.actorOf(Props(new KVStore(addressPromise)))
|
||||
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
|
||||
val client1 = system.actorOf(Props(new KVClient(address)))
|
||||
val client2 = system.actorOf(Props(new KVClient(address)))
|
||||
val f1 = retry() { client1 ? KVSet("hello", "World") }
|
||||
val f2 = retry() { client1 ? KVSet("test", "No one will read me") }
|
||||
val f3 = f1 flatMap { _ ⇒ retry() { client1 ? KVGet("hello") } }
|
||||
val f4 = f2 flatMap { _ ⇒ retry() { client2 ? KVSet("test", "I'm a test!") } }
|
||||
val f5 = f4 flatMap { _ ⇒ retry() { client1 ? KVGet("test") } }
|
||||
val f6 = Future.sequence(List(f3, f5)) flatMap { _ ⇒ retry() { client2 ? KVGetAll } }
|
||||
Await.result(f1, TestLatch.DefaultTimeout) must equal("OK")
|
||||
Await.result(f2, TestLatch.DefaultTimeout) must equal("OK")
|
||||
Await.result(f3, TestLatch.DefaultTimeout) must equal(Some("World"))
|
||||
Await.result(f4, TestLatch.DefaultTimeout) must equal("OK")
|
||||
Await.result(f5, TestLatch.DefaultTimeout) must equal(Some("I'm a test!"))
|
||||
Await.result(f6, TestLatch.DefaultTimeout) must equal(Map("hello" -> "World", "test" -> "I'm a test!"))
|
||||
system.stop(client1)
|
||||
system.stop(client2)
|
||||
system.stop(server)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -14,6 +14,8 @@ import akka.testkit.AkkaSpec
|
|||
import akka.testkit.DefaultTimeout
|
||||
import akka.testkit.TestLatch
|
||||
import akka.util.duration._
|
||||
import akka.util.Duration
|
||||
import akka.pattern.ask
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
||||
|
|
@ -28,7 +30,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
"A RestartStrategy" must {
|
||||
|
||||
"ensure that slave stays dead after max restarts within time range" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
|
||||
val boss = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -74,7 +77,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave is immortal without max restarts and time range" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))
|
||||
|
||||
val countDownLatch = new TestLatch(100)
|
||||
|
||||
|
|
@ -96,7 +99,8 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave restarts after number of crashes not within time range" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 500)))
|
||||
val boss = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -153,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"ensure that slave is not restarted after max retries" in {
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), Some(2), None)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))
|
||||
|
||||
val restartLatch = new TestLatch
|
||||
val secondRestartLatch = new TestLatch
|
||||
|
|
@ -208,11 +212,12 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
|
|||
val countDownLatch = new TestLatch(2)
|
||||
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(withinTimeRange = 1 second)(List(classOf[Throwable]))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.watch(context.actorOf(p))
|
||||
case t: Terminated ⇒ maxNoOfRestartsLatch.open()
|
||||
}
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))))
|
||||
}))
|
||||
|
||||
val slaveProps = Props(new Actor {
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import akka.util.duration._
|
|||
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
|
||||
import akka.testkit._
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -133,7 +134,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
|
|||
val restartLatch = new TestLatch
|
||||
val pingLatch = new TestLatch(6)
|
||||
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(3, 1 second)(List(classOf[Exception])))))
|
||||
val props = Props(new Actor {
|
||||
def receive = {
|
||||
case Ping ⇒ pingLatch.countDown()
|
||||
|
|
@ -164,8 +165,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
|
|||
def receive = {
|
||||
case Msg(ts) ⇒
|
||||
val now = System.nanoTime
|
||||
// Make sure that no message has been dispatched before the scheduled time (10ms = 10000000ns) has occurred
|
||||
if (now - ts < 10000000) throw new RuntimeException("Interval is too small: " + (now - ts))
|
||||
// Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
|
||||
if (now - ts < 10.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts))
|
||||
ticks.countDown()
|
||||
}
|
||||
}))
|
||||
|
|
|
|||
|
|
@ -1,9 +1,14 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
class Supervisor extends Actor {
|
||||
/**
|
||||
* For testing Supervisor behavior, normally you don't supply the strategy
|
||||
* from the outside like this.
|
||||
*/
|
||||
class Supervisor(override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case x: Props ⇒ sender ! context.actorOf(x)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,18 +1,25 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import akka.testkit._
|
||||
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
|
||||
object SupervisorHierarchySpec {
|
||||
class FireWorkerException(msg: String) extends Exception(msg)
|
||||
|
||||
class CountDownActor(countDown: CountDownLatch) extends Actor {
|
||||
/**
|
||||
* For testing Supervisor behavior, normally you don't supply the strategy
|
||||
* from the outside like this.
|
||||
*/
|
||||
class CountDownActor(countDown: CountDownLatch, override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
protected def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
|
|
@ -33,12 +40,12 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
"restart manager and workers in AllForOne" in {
|
||||
val countDown = new CountDownLatch(4)
|
||||
|
||||
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None)))
|
||||
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Exception])))))
|
||||
|
||||
val managerProps = Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None))
|
||||
val managerProps = Props(new CountDownActor(countDown, AllForOneStrategy()(List())))
|
||||
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
val workerProps = Props(new CountDownActor(countDown))
|
||||
val workerProps = Props(new CountDownActor(countDown, SupervisorStrategy.defaultStrategy))
|
||||
val workerOne, workerTwo, workerThree = Await.result((manager ? workerProps).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
filterException[ActorKilledException] {
|
||||
|
|
@ -55,13 +62,16 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
|
|||
val countDownMessages = new CountDownLatch(1)
|
||||
val countDownMax = new CountDownLatch(1)
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages))))
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
|
||||
|
||||
val crasher = context.watch(context.actorOf(Props(new CountDownActor(countDownMessages, SupervisorStrategy.defaultStrategy))))
|
||||
|
||||
protected def receive = {
|
||||
case "killCrasher" ⇒ crasher ! Kill
|
||||
case Terminated(_) ⇒ countDownMax.countDown()
|
||||
}
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 1, 5000)))
|
||||
}))
|
||||
|
||||
filterException[ActorKilledException] {
|
||||
boss ! "killCrasher"
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
@ -8,6 +8,8 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await }
|
|||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.pattern.ask
|
||||
import akka.util.duration._
|
||||
|
||||
object SupervisorMiscSpec {
|
||||
val config = """
|
||||
|
|
@ -28,7 +30,8 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
|
|||
filterEvents(EventFilter[Exception]("Kill")) {
|
||||
val countDownLatch = new CountDownLatch(4)
|
||||
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 5000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)(List(classOf[Exception])))))
|
||||
|
||||
val workerProps = Props(new Actor {
|
||||
override def postRestart(cause: Throwable) { countDownLatch.countDown() }
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
|
@ -11,9 +11,10 @@ import akka.testkit.TestEvent._
|
|||
import akka.testkit._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
object SupervisorSpec {
|
||||
val Timeout = 5 seconds
|
||||
val Timeout = 5.seconds
|
||||
|
||||
case object DieReply
|
||||
|
||||
|
|
@ -53,6 +54,8 @@ object SupervisorSpec {
|
|||
|
||||
var s: ActorRef = _
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception]))
|
||||
|
||||
def receive = {
|
||||
case Die ⇒ temp forward Die
|
||||
case Terminated(`temp`) ⇒ sendTo ! "terminated"
|
||||
|
|
@ -66,7 +69,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
|
||||
import SupervisorSpec._
|
||||
|
||||
val TimeoutMillis = Timeout.dilated.toMillis.toInt
|
||||
val DilatedTimeout = Timeout.dilated
|
||||
|
||||
// =====================================================
|
||||
// Creating actors and supervisors
|
||||
|
|
@ -75,45 +78,51 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
def temporaryActorAllForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||
val temporaryActor = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(temporaryActor, supervisor)
|
||||
}
|
||||
|
||||
def singleActorAllForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def singleActorOneForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsAllForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def multipleActorsOneForOne = {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong1, pingpong2, pingpong3 = child(supervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, supervisor)
|
||||
}
|
||||
|
||||
def nestedSupervisorsAllForOne = {
|
||||
val topSupervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, TimeoutMillis)))
|
||||
val topSupervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(List(classOf[Exception])))))
|
||||
val pingpong1 = child(topSupervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
val middleSupervisor = child(topSupervisor, Props[Supervisor].withFaultHandler(AllForOneStrategy(Nil, 3, TimeoutMillis)))
|
||||
val middleSupervisor = child(topSupervisor, Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 3, withinTimeRange = DilatedTimeout)(Nil))))
|
||||
val pingpong2, pingpong3 = child(middleSupervisor, Props(new PingPongActor(testActor)))
|
||||
|
||||
(pingpong1, pingpong2, pingpong3, topSupervisor)
|
||||
|
|
@ -128,20 +137,20 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
}
|
||||
|
||||
def ping(pingPongActor: ActorRef) = {
|
||||
Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
||||
Await.result(pingPongActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
|
||||
expectMsg(Timeout, PingMessage)
|
||||
}
|
||||
|
||||
def kill(pingPongActor: ActorRef) = {
|
||||
val result = (pingPongActor ? (DieReply, TimeoutMillis))
|
||||
val result = (pingPongActor.?(DieReply)(DilatedTimeout))
|
||||
expectMsg(Timeout, ExceptionMessage)
|
||||
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) }
|
||||
intercept[RuntimeException] { Await.result(result, DilatedTimeout) }
|
||||
}
|
||||
|
||||
"A supervisor" must {
|
||||
|
||||
"not restart child more times than permitted" in {
|
||||
val master = system.actorOf(Props(new Master(testActor)).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
|
||||
val master = system.actorOf(Props(new Master(testActor)))
|
||||
|
||||
master ! Die
|
||||
expectMsg(3 seconds, "terminated")
|
||||
|
|
@ -151,7 +160,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
"not restart temporary actor" in {
|
||||
val (temporaryActor, _) = temporaryActorAllForOne
|
||||
|
||||
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) }
|
||||
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply)(DilatedTimeout), DilatedTimeout) }
|
||||
|
||||
expectNoMsg(1 second)
|
||||
}
|
||||
|
|
@ -277,7 +286,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
|
||||
"must attempt restart when exception during restart" in {
|
||||
val inits = new AtomicInteger(0)
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil))))
|
||||
|
||||
val dyingProps = Props(new Actor {
|
||||
inits.incrementAndGet
|
||||
|
|
@ -297,11 +307,11 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
|
||||
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
|
||||
intercept[RuntimeException] {
|
||||
Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis)
|
||||
Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
|
||||
Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
|
||||
|
||||
inits.get must be(3)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
@ -12,6 +12,7 @@ import akka.testkit.AkkaSpec
|
|||
import akka.testkit.ImplicitSender
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.dispatch.{ Await, Dispatchers }
|
||||
import akka.pattern.ask
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {
|
||||
|
|
@ -22,11 +23,12 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
|
|||
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
||||
within(5 seconds) {
|
||||
val p = Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(List(classOf[Exception]))
|
||||
def receive = {
|
||||
case p: Props ⇒ sender ! context.actorOf(p)
|
||||
}
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.path }
|
||||
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000))
|
||||
})
|
||||
val headActor = system.actorOf(p)
|
||||
val middleActor = Await.result((headActor ? p).mapTo[ActorRef], timeout.duration)
|
||||
val lastActor = Await.result((middleActor ? p).mapTo[ActorRef], timeout.duration)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor
|
||||
|
||||
|
|
@ -11,6 +11,8 @@ import akka.testkit.AkkaSpec
|
|||
import akka.testkit.ImplicitSender
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import akka.util.duration._
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
|
||||
|
|
@ -24,7 +26,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
"A supervised actor with lifecycle PERMANENT" should {
|
||||
"be able to reply on failure during preRestart" in {
|
||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(5, 10 seconds)(List(classOf[Exception])))))
|
||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
supervised.!("test")(testActor)
|
||||
|
|
@ -35,7 +38,8 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
|
|||
|
||||
"be able to reply on failure during postStop" in {
|
||||
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
|
||||
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))
|
||||
val supervisor = system.actorOf(Props(new Supervisor(
|
||||
AllForOneStrategy(maxNrOfRetries = 0)(List(classOf[Exception])))))
|
||||
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
|
||||
|
||||
supervised.!("test")(testActor)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
package akka.actor
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
|
||||
|
|
@ -18,6 +18,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
|||
import akka.japi.{ Creator, Option ⇒ JOption }
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
|
||||
import akka.pattern.ask
|
||||
|
||||
object TypedActorSpec {
|
||||
|
||||
|
|
@ -298,10 +299,13 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
|
||||
"be able to handle exceptions when calling methods" in {
|
||||
filterEvents(EventFilter[IllegalStateException]("expected")) {
|
||||
val boss = system.actorOf(Props(context ⇒ {
|
||||
case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p)
|
||||
}).withFaultHandler(OneForOneStrategy {
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume
|
||||
val boss = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy() {
|
||||
case e: IllegalStateException if e.getMessage == "expected" ⇒ SupervisorStrategy.Resume
|
||||
}
|
||||
def receive = {
|
||||
case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p)
|
||||
}
|
||||
}))
|
||||
val t = Await.result((boss ? TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(2 seconds)).mapTo[Foo], timeout.duration)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.dispatch
|
||||
|
||||
|
|
@ -20,6 +20,7 @@ import akka.util.duration._
|
|||
import akka.event.Logging.Error
|
||||
import com.typesafe.config.Config
|
||||
import akka.util.Duration
|
||||
import akka.pattern.ask
|
||||
|
||||
object ActorModelSpec {
|
||||
|
||||
|
|
@ -450,7 +451,6 @@ object DispatcherModelSpec {
|
|||
private val instance: MessageDispatcher = {
|
||||
configureThreadPool(config,
|
||||
threadPoolConfig ⇒ new Dispatcher(prerequisites,
|
||||
config.getString("name"),
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
|
|
@ -525,7 +525,6 @@ object BalancingDispatcherModelSpec {
|
|||
private val instance: MessageDispatcher = {
|
||||
configureThreadPool(config,
|
||||
threadPoolConfig ⇒ new BalancingDispatcher(prerequisites,
|
||||
config.getString("name"),
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import akka.util.Duration
|
|||
import akka.util.duration._
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher }
|
||||
import akka.pattern.ask
|
||||
|
||||
object DispatcherActorSpec {
|
||||
val config = """
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.actor.dispatch
|
||||
|
||||
|
|
@ -58,11 +58,6 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) {
|
|||
dispatcher.throughput must be(17)
|
||||
}
|
||||
|
||||
"use specific name" in {
|
||||
val dispatcher = lookup("myapp.mydispatcher")
|
||||
dispatcher.name must be("mydispatcher")
|
||||
}
|
||||
|
||||
"use specific id" in {
|
||||
val dispatcher = lookup("myapp.mydispatcher")
|
||||
dispatcher.id must be("myapp.mydispatcher")
|
||||
|
|
@ -95,7 +90,6 @@ class DispatchersSpec extends AkkaSpec(DispatchersSpec.config) {
|
|||
val d1 = lookup("myapp.mydispatcher")
|
||||
val d2 = lookup("myapp.mydispatcher")
|
||||
d1 must be === d2
|
||||
d1.name must be("mydispatcher")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import akka.actor.{ Props, Actor }
|
|||
import akka.testkit.AkkaSpec
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers }
|
||||
import akka.pattern.ask
|
||||
|
||||
object PinnedActorSpec {
|
||||
val config = """
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.config
|
||||
|
|
@ -23,8 +23,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
|||
getString("akka.version") must equal("2.0-SNAPSHOT")
|
||||
settings.ConfigVersion must equal("2.0-SNAPSHOT")
|
||||
|
||||
getBoolean("akka.daemonic") must equal(false)
|
||||
|
||||
getString("akka.actor.default-dispatcher.type") must equal("Dispatcher")
|
||||
getString("akka.actor.default-dispatcher.name") must equal("default-dispatcher")
|
||||
getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000)
|
||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0)
|
||||
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(3.0)
|
||||
|
|
@ -37,9 +38,15 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
|||
getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout") must equal(1 * 1000)
|
||||
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
|
||||
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)
|
||||
|
||||
getBoolean("akka.actor.serialize-messages") must equal(false)
|
||||
settings.SerializeAllMessages must equal(false)
|
||||
|
||||
getInt("akka.scheduler.ticksPerWheel") must equal(512)
|
||||
settings.SchedulerTicksPerWheel must equal(512)
|
||||
|
||||
getMilliseconds("akka.scheduler.tickDuration") must equal(100)
|
||||
settings.SchedulerTickDuration must equal(100 millis)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.dataflow
|
||||
|
||||
|
|
@ -9,6 +9,7 @@ import akka.actor.future2actor
|
|||
import akka.util.duration._
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.pattern.ask
|
||||
|
||||
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
|
||||
|
||||
|
|
|
|||
|
|
@ -11,11 +11,12 @@ import akka.testkit.{ EventFilter, filterEvents, filterException }
|
|||
import akka.util.duration._
|
||||
import akka.testkit.AkkaSpec
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import java.lang.ArithmeticException
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.testkit.TestLatch
|
||||
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
|
||||
import scala.runtime.NonLocalReturnControl
|
||||
import akka.pattern.ask
|
||||
import java.lang.{ IllegalStateException, ArithmeticException }
|
||||
|
||||
object FutureSpec {
|
||||
class TestActor extends Actor {
|
||||
|
|
@ -323,17 +324,35 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
}))
|
||||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] }
|
||||
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
|
||||
}
|
||||
|
||||
"zip" in {
|
||||
val timeout = 10000 millis
|
||||
val f = new IllegalStateException("test")
|
||||
intercept[IllegalStateException] {
|
||||
Await.result(Promise.failed[String](f) zip Promise.successful("foo"), timeout)
|
||||
} must be(f)
|
||||
|
||||
intercept[IllegalStateException] {
|
||||
Await.result(Promise.successful("foo") zip Promise.failed[String](f), timeout)
|
||||
} must be(f)
|
||||
|
||||
intercept[IllegalStateException] {
|
||||
Await.result(Promise.failed[String](f) zip Promise.failed[String](f), timeout)
|
||||
} must be(f)
|
||||
|
||||
Await.result(Promise.successful("foo") zip Promise.successful("foo"), timeout) must be(("foo", "foo"))
|
||||
}
|
||||
|
||||
"fold by composing" in {
|
||||
val actors = (1 to 10).toList map { _ ⇒
|
||||
system.actorOf(Props(new Actor {
|
||||
def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); sender.tell(add) }
|
||||
}))
|
||||
}
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] }
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(10000).mapTo[Int] }
|
||||
Await.result(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)), timeout.duration) must be(45)
|
||||
}
|
||||
|
||||
|
|
@ -350,7 +369,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
}))
|
||||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] }
|
||||
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
|
||||
}
|
||||
}
|
||||
|
|
@ -382,7 +401,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
}))
|
||||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200))(timeout).mapTo[Int] }
|
||||
assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45)
|
||||
}
|
||||
|
||||
|
|
@ -399,7 +418,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
}))
|
||||
}
|
||||
val timeout = 10000
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
|
||||
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100))(timeout).mapTo[Int] }
|
||||
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
|
||||
}
|
||||
}
|
||||
|
|
@ -440,7 +459,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
"shouldHandleThrowables" in {
|
||||
class ThrowableTest(m: String) extends Throwable(m)
|
||||
|
||||
filterException[ThrowableTest] {
|
||||
EventFilter[ThrowableTest](occurrences = 4) intercept {
|
||||
val f1 = Future[Any] { throw new ThrowableTest("test") }
|
||||
intercept[ThrowableTest] { Await.result(f1, timeout.duration) }
|
||||
|
||||
|
|
@ -859,6 +878,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
Await.result(p, timeout.duration) must be(result)
|
||||
}
|
||||
}
|
||||
"zip properly" in {
|
||||
f { (future, result) ⇒
|
||||
Await.result(future zip Promise.successful("foo"), timeout.duration) must be((result, "foo"))
|
||||
(evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")), timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes")
|
||||
}
|
||||
}
|
||||
"not recover from exception" in { f((future, result) ⇒ Await.result(future.recover({ case _ ⇒ "pigdog" }), timeout.duration) must be(result)) }
|
||||
"perform action on result" in {
|
||||
f { (future, result) ⇒
|
||||
|
|
@ -892,6 +917,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
"retain exception with map" in { f((future, message) ⇒ (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) }
|
||||
"retain exception with flatMap" in { f((future, message) ⇒ (evaluating { Await.result(future flatMap (_ ⇒ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) }
|
||||
"not perform action with foreach" is pending
|
||||
|
||||
"zip properly" in {
|
||||
f { (future, message) ⇒ (evaluating { Await.result(future zip Promise.successful("foo"), timeout.duration) } must produce[E]).getMessage must be(message) }
|
||||
}
|
||||
"recover from exception" in { f((future, message) ⇒ Await.result(future.recover({ case e if e.getMessage == message ⇒ "pigdog" }), timeout.duration) must be("pigdog")) }
|
||||
"not perform action on result" is pending
|
||||
"project a failure" in { f((future, message) ⇒ Await.result(future.failed, timeout.duration).getMessage must be(message)) }
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ package akka.dispatch
|
|||
|
||||
import akka.actor.{ Props, LocalActorRef, Actor }
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.Duration
|
||||
import akka.pattern.ask
|
||||
import akka.util.duration._
|
||||
import akka.testkit.DefaultTimeout
|
||||
import com.typesafe.config.Config
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.event
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.event
|
||||
|
||||
|
|
@ -15,6 +15,8 @@ import akka.actor._
|
|||
|
||||
object LoggingReceiveSpec {
|
||||
class TestLogActor extends Actor {
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5 seconds)(List(classOf[Throwable]))
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
}
|
||||
|
|
@ -149,7 +151,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
within(3 seconds) {
|
||||
val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian
|
||||
val lname = lifecycleGuardian.path.toString
|
||||
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor])
|
||||
val sname = supervisor.path.toString
|
||||
val sclass = classOf[TestLogActor]
|
||||
|
||||
|
|
|
|||
36
akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala
Normal file
36
akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.duration._
|
||||
|
||||
class AskSpec extends AkkaSpec {
|
||||
|
||||
"The “ask” pattern" must {
|
||||
|
||||
"return broken promises on DeadLetters" in {
|
||||
val dead = system.actorFor("/system/deadLetters")
|
||||
val f = dead.ask(42)(1 second)
|
||||
f.isCompleted must be(true)
|
||||
f.value.get match {
|
||||
case Left(_: AskTimeoutException) ⇒
|
||||
case v ⇒ fail(v + " was not Left(AskTimeoutException)")
|
||||
}
|
||||
}
|
||||
|
||||
"return broken promises on EmptyLocalActorRefs" in {
|
||||
val empty = system.actorFor("unknown")
|
||||
implicit val timeout = system.settings.ActorTimeout
|
||||
val f = empty ? 3.14
|
||||
f.isCompleted must be(true)
|
||||
f.value.get match {
|
||||
case Left(_: AskTimeoutException) ⇒
|
||||
case v ⇒ fail(v + " was not Left(AskTimeoutException)")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,11 +1,8 @@
|
|||
package akka.performance.trading.domain
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.actor.Extension
|
||||
import akka.actor.ExtensionId
|
||||
import akka.actor.ExtensionIdProvider
|
||||
import akka.actor.ActorSystemImpl
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorSystem }
|
||||
|
||||
abstract trait TradeObserver {
|
||||
def trade(bid: Bid, ask: Ask)
|
||||
|
|
@ -38,5 +35,5 @@ object TotalTradeCounterExtension
|
|||
extends ExtensionId[TotalTradeCounter]
|
||||
with ExtensionIdProvider {
|
||||
override def lookup = TotalTradeCounterExtension
|
||||
override def createExtension(system: ActorSystemImpl) = new TotalTradeCounter
|
||||
override def createExtension(system: ExtendedActorSystem) = new TotalTradeCounter
|
||||
}
|
||||
|
|
@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
|
||||
object ConfiguredLocalRoutingSpec {
|
||||
val config = """
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.routing
|
||||
|
||||
|
|
@ -10,6 +10,7 @@ import akka.dispatch.Await
|
|||
import akka.util.duration._
|
||||
import akka.actor.ActorRef
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.pattern.ask
|
||||
|
||||
object ResizerSpec {
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.routing
|
||||
|
||||
|
|
@ -12,6 +12,7 @@ import akka.dispatch.Await
|
|||
import akka.util.Duration
|
||||
import akka.config.ConfigurationException
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.pattern.ask
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import com.typesafe.config.Config
|
||||
|
||||
|
|
@ -31,10 +32,7 @@ object RoutingSpec {
|
|||
"""
|
||||
|
||||
class TestActor extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
println("Hello")
|
||||
}
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
|
||||
class Echo extends Actor {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
|
@ -13,6 +13,7 @@ import akka.util.Timeout
|
|||
import akka.util.duration._
|
||||
import scala.reflect.BeanInfo
|
||||
import com.google.protobuf.Message
|
||||
import akka.pattern.ask
|
||||
|
||||
class ProtobufSerializer extends Serializer {
|
||||
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.testkit
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
|
|
@ -53,11 +53,12 @@ class DurationSpec extends WordSpec with MustMatchers {
|
|||
"support fromNow" in {
|
||||
val dead = 2.seconds.fromNow
|
||||
val dead2 = 2 seconds fromNow
|
||||
dead.timeLeft must be > 1.second
|
||||
dead2.timeLeft must be > 1.second
|
||||
// view bounds vs. very local type inference vs. operator precedence: sigh
|
||||
dead.timeLeft must be > (1 second: Duration)
|
||||
dead2.timeLeft must be > (1 second: Duration)
|
||||
1.second.sleep
|
||||
dead.timeLeft must be < 1.second
|
||||
dead2.timeLeft must be < 1.second
|
||||
dead.timeLeft must be < (1 second: Duration)
|
||||
dead2.timeLeft must be < (1 second: Duration)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.util
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.actor;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch;
|
||||
|
|
|
|||
|
|
@ -1,12 +1,11 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
abstract class AbstractMessageDispatcher {
|
||||
private volatile int _shutdownSchedule; // not initialized because this is faster: 0 == UNSCHEDULED
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
@ -103,6 +103,13 @@ abstract class AbstractConfigValue implements ConfigValue, MergeableValue {
|
|||
throw badMergeException();
|
||||
}
|
||||
|
||||
protected AbstractConfigValue mergedWithNonObject(AbstractConfigValue fallback) {
|
||||
// falling back to a non-object doesn't merge anything, and also
|
||||
// prohibits merging any objects that we fall back to later.
|
||||
// so we have to switch to ignoresFallbacks mode.
|
||||
return newCopy(true /* ignoresFallbacks */, origin);
|
||||
}
|
||||
|
||||
public AbstractConfigValue withOrigin(ConfigOrigin origin) {
|
||||
if (this.origin == origin)
|
||||
return this;
|
||||
|
|
@ -130,10 +137,7 @@ abstract class AbstractConfigValue implements ConfigValue, MergeableValue {
|
|||
return mergedWithObject((AbstractConfigObject) other);
|
||||
}
|
||||
} else {
|
||||
// falling back to a non-object doesn't merge anything, and also
|
||||
// prohibits merging any objects that we fall back to later.
|
||||
// so we have to switch to ignoresFallbacks mode.
|
||||
return newCopy(true /* ignoresFallbacks */, origin);
|
||||
return mergedWithNonObject((AbstractConfigValue) other);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
@ -84,13 +84,10 @@ final class ConfigSubstitution extends AbstractConfigValue implements
|
|||
((AbstractConfigValue) fallback).ignoresFallbacks());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractConfigValue mergedWithObject(AbstractConfigObject fallback) {
|
||||
protected AbstractConfigValue mergedLater(AbstractConfigValue fallback) {
|
||||
if (ignoresFallbacks)
|
||||
throw new ConfigException.BugOrBroken("should not be reached");
|
||||
|
||||
// if we turn out to be an object, and the fallback also does,
|
||||
// then a merge may be required; delay until we resolve.
|
||||
List<AbstractConfigValue> newStack = new ArrayList<AbstractConfigValue>();
|
||||
newStack.add(this);
|
||||
newStack.add(fallback);
|
||||
|
|
@ -98,6 +95,23 @@ final class ConfigSubstitution extends AbstractConfigValue implements
|
|||
fallback.ignoresFallbacks());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractConfigValue mergedWithObject(AbstractConfigObject fallback) {
|
||||
// if we turn out to be an object, and the fallback also does,
|
||||
// then a merge may be required; delay until we resolve.
|
||||
return mergedLater(fallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractConfigValue mergedWithNonObject(AbstractConfigValue fallback) {
|
||||
// if the optional substitution ends up getting deleted (because it is
|
||||
// not present), we'll have to use the fallback. So delay the merge.
|
||||
if (pieces.size() == 1 && ((SubstitutionExpression) pieces.get(0)).optional())
|
||||
return mergedLater(fallback);
|
||||
else
|
||||
return super.mergedWithNonObject(fallback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<ConfigSubstitution> unmergedValues() {
|
||||
return Collections.singleton(this);
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2011 Typesafe Inc. <http://typesafe.com>
|
||||
* Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com>
|
||||
*/
|
||||
package com.typesafe.config.impl;
|
||||
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue