Merge branch 'master' into wip-channels-∂π

This commit is contained in:
Roland 2013-01-31 20:26:01 +01:00
commit 1b331dc547
575 changed files with 8084 additions and 6947 deletions

125
.gitignore vendored
View file

@ -1,72 +1,73 @@
*.vim
*~
*# *#
src_managed *.iml
activemq-data *.ipr
project/akka-build.properties *.iws
project/plugins/project *.pyc
project/boot/* *.tm.epoch
*/project/build/target *.vim
*/project/boot */project/boot
*/project/build/target
*/project/project.target.config-classes */project/project.target.config-classes
lib_managed *~
etags .#*
tags .*.swp
.DS_Store
.cache
.cache
.classpath
.codefellow
.ensime*
.eprj
.history
.idea
.manager
.multi-jvm
.project
.scala_dependencies
.scalastyle
.settings
.tags .tags
.tags_sorted_by_file .tags_sorted_by_file
TAGS .target
akka.tmproj .worksheet
reports
target
deploy/*.jar
.history
data
out
logs
.#*
.codefellow
storage
.ensime*
_dump
.manager
manifest.mf
semantic.cache
tm*.log
tm*.lck
tm.out
*.tm.epoch
.DS_Store
*.iws
*.ipr
*.iml
run-codefellow
.project
.settings
.classpath
.cache
.idea
.scala_dependencies
.cache
multiverse.log
.eprj
.*.swp
akka-docs/_build/
akka-docs/rst_preprocessed/
akka-contrib/rst_preprocessed/
*.pyc
akka-docs/exts/
_akka_cluster/
Makefile Makefile
TAGS
_akka_cluster/
_dump
_mb
activemq-data
akka-contrib/rst_preprocessed/
akka-docs/_build/
akka-docs/exts/
akka-docs/rst_preprocessed/
akka-osgi/src/main/resources/*.conf
akka.sublime-project akka.sublime-project
akka.sublime-workspace akka.sublime-workspace
.target akka.tmproj
.multi-jvm
_mb
schoir.props
worker*.log
mongoDB/
redis/
beanstalk/ beanstalk/
.scalastyle
bin/ bin/
.worksheet data
deploy/*.jar
etags
lib_managed
logs
manifest.mf
mongoDB/
multiverse.log
out
project/akka-build.properties
project/boot/*
project/plugins/project
redis/
reports
run-codefellow
schoir.props
semantic.cache
src_managed
storage
tags
target
tm*.lck
tm*.log
tm.out
worker*.log

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor; package akka.actor;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor; package akka.actor;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor; package akka.actor;
@ -14,4 +14,4 @@ class MyNonPublicActorClass extends UntypedActor {
@Override public void onReceive(Object msg) { @Override public void onReceive(Object msg) {
getSender().tell(msg, getSelf()); getSender().tell(msg, getSelf());
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.routing; package akka.routing;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.util; package akka.util;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka package akka

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -119,6 +119,16 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
system.stop(supervisor) system.stop(supervisor)
} }
"log failues in postStop" in {
val a = system.actorOf(Props(new Actor {
def receive = Actor.emptyBehavior
override def postStop { throw new Exception("hurrah") }
}))
EventFilter[Exception]("hurrah", occurrences = 1) intercept {
a ! PoisonPill
}
}
"clear the behavior stack upon restart" in { "clear the behavior stack upon restart" in {
case class Become(recv: ActorContext Receive) case class Become(recv: ActorContext Receive)
val a = system.actorOf(Props(new Actor { val a = system.actorOf(Props(new Actor {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -312,4 +312,4 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -145,7 +145,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
def contextStackMustBeEmpty = ActorCell.contextStack.get.headOption must be === None def contextStackMustBeEmpty = ActorCell.contextStack.get.headOption must be === None
filterException[akka.actor.ActorInitializationException] { EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new Actor { actorOf(Props(new Actor {
@ -155,49 +155,63 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result)))) actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result))))))) actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(promiseIntercept(new FailingInheritingOuterActor(actorOf(Props(new InnerActor))))(result)))) actorOf(Props(promiseIntercept(new FailingInheritingOuterActor(actorOf(Props(new InnerActor))))(result))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 2) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 2) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 2) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result))))))) actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new OuterActor(actorOf(Props(new InnerActor { actorOf(Props(new OuterActor(actorOf(Props(new InnerActor {
@ -206,21 +220,27 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 2) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
} }
contextStackMustBeEmpty contextStackMustBeEmpty
}
EventFilter[ActorInitializationException](occurrences = 1) intercept {
intercept[akka.actor.ActorInitializationException] { intercept[akka.actor.ActorInitializationException] {
wrap(result wrap(result
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ new InnerActor; new InnerActor })(result))))))) actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ new InnerActor; new InnerActor })(result)))))))

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -270,6 +270,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false)) val t = probe.expectMsg(Terminated(a)(existenceConfirmed = true, addressTerminated = false))
t.existenceConfirmed must be(true) t.existenceConfirmed must be(true)
t.addressTerminated must be(false) t.addressTerminated must be(false)
system.shutdown()
} }
"shut down when /user escalates" in { "shut down when /user escalates" in {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -87,7 +87,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
"make previous and next state data available in onTransition" in { "make previous and next state data available in onTransition" in {
val fsm = system.actorOf(Props(new OtherFSM(testActor))) val fsm = system.actorOf(Props(new OtherFSM(testActor)))
within(300 millis) { within(1 second) {
fsm ! "tick" fsm ! "tick"
expectMsg((0, 1)) expectMsg((0, 1))
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,37 +1,42 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor package akka.actor
import language.postfixOps import language.postfixOps
import java.io.Closeable
import org.scalatest.BeforeAndAfterEach import java.util.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit._
import scala.concurrent.Await
import akka.pattern.ask
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ future, Await, ExecutionContext }
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.Try
import scala.util.control.NonFatal
import org.scalatest.BeforeAndAfterEach
import com.typesafe.config.{ Config, ConfigFactory }
import akka.pattern.ask
import akka.testkit._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) object SchedulerSpec {
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout with ImplicitSender { val testConf = ConfigFactory.parseString("""
private val cancellables = new ConcurrentLinkedQueue[Cancellable]() akka.scheduler.implementation = akka.actor.DefaultScheduler
akka.scheduler.ticks-per-wheel = 32
""").withFallback(AkkaSpec.testConf)
val testConfRevolver = ConfigFactory.parseString("""
akka.scheduler.implementation = akka.actor.LightArrayRevolverScheduler
""").withFallback(testConf)
}
trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with ImplicitSender { this: AkkaSpec
import system.dispatcher import system.dispatcher
def collectCancellable(c: Cancellable): Cancellable = { def collectCancellable(c: Cancellable): Cancellable
cancellables.add(c)
c
}
override def afterEach {
while (cancellables.peek() ne null) {
for (c Option(cancellables.poll())) {
c.cancel()
c.isCancelled must be === true
}
}
}
"A Scheduler" must { "A Scheduler" must {
"schedule more than once" in { "schedule more than once" taggedAs TimingTest in {
case object Tick case object Tick
case object Tock case object Tock
@ -76,7 +81,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
expectNoMsg(500 millis) expectNoMsg(500 millis)
} }
"schedule once" in { "schedule once" taggedAs TimingTest in {
case object Tick case object Tick
val countDownLatch = new CountDownLatch(3) val countDownLatch = new CountDownLatch(3)
val tickActor = system.actorOf(Props(new Actor { val tickActor = system.actorOf(Props(new Actor {
@ -100,7 +105,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
/** /**
* ticket #372 * ticket #372
*/ */
"be cancellable" in { "be cancellable" taggedAs TimingTest in {
for (_ 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel() for (_ 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel()
expectNoMsg(2 seconds) expectNoMsg(2 seconds)
@ -124,12 +129,12 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
"be cancellable after initial delay" taggedAs TimingTest in { "be cancellable after initial delay" taggedAs TimingTest in {
val ticks = new AtomicInteger val ticks = new AtomicInteger
val initialDelay = 20.milliseconds.dilated val initialDelay = 90.milliseconds.dilated
val delay = 200.milliseconds.dilated val delay = 500.milliseconds.dilated
val timeout = collectCancellable(system.scheduler.schedule(initialDelay, delay) { val timeout = collectCancellable(system.scheduler.schedule(initialDelay, delay) {
ticks.incrementAndGet() ticks.incrementAndGet()
}) })
Thread.sleep((initialDelay + 100.milliseconds.dilated).toMillis) Thread.sleep((initialDelay + 200.milliseconds.dilated).toMillis)
timeout.cancel() timeout.cancel()
Thread.sleep((delay + 100.milliseconds.dilated).toMillis) Thread.sleep((delay + 100.milliseconds.dilated).toMillis)
@ -139,7 +144,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
/** /**
* ticket #307 * ticket #307
*/ */
"pick up schedule after actor restart" in { "pick up schedule after actor restart" taggedAs TimingTest in {
object Ping object Ping
object Crash object Crash
@ -169,7 +174,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
Await.ready(pingLatch, 5 seconds) Await.ready(pingLatch, 5 seconds)
} }
"never fire prematurely" in { "never fire prematurely" taggedAs TimingTest in {
val ticks = new TestLatch(300) val ticks = new TestLatch(300)
case class Msg(ts: Long) case class Msg(ts: Long)
@ -205,7 +210,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
collectCancellable(system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg)) collectCancellable(system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg))
Await.ready(ticks, 3 seconds) Await.ready(ticks, 3 seconds)
(System.nanoTime() - startTime).nanos.toMillis must be(1800L plusOrMinus 199) // LARS is a bit more aggressive in scheduling recurring tasks at the right
// frequency and may execute them a little earlier; the actual expected timing
// is 1599ms on a fast machine or 1699ms on a loaded one (plus some room for jenkins)
(System.nanoTime() - startTime).nanos.toMillis must be(1750L plusOrMinus 250)
} }
"adjust for scheduler inaccuracy" taggedAs TimingTest in { "adjust for scheduler inaccuracy" taggedAs TimingTest in {
@ -230,5 +238,276 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
// Rate // Rate
n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(4.4 plusOrMinus 0.3) n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(4.4 plusOrMinus 0.3)
} }
"handle timeouts equal to multiple of wheel period" taggedAs TimingTest in {
val timeout = 3200 milliseconds
val barrier = TestLatch()
import system.dispatcher
val job = system.scheduler.scheduleOnce(timeout)(barrier.countDown())
try {
Await.ready(barrier, 5000 milliseconds)
} finally {
job.cancel()
}
}
"survive being stressed without cancellation" taggedAs TimingTest in {
val r = ThreadLocalRandom.current()
val N = 100000
for (_ 1 to N) {
val next = r.nextInt(3000)
val now = System.nanoTime
system.scheduler.scheduleOnce(next.millis) {
val stop = System.nanoTime
testActor ! (stop - now - next * 1000000L)
}
}
val latencies = within(5.seconds) {
for (i 1 to N) yield try expectMsgType[Long] catch {
case NonFatal(e) throw new Exception(s"failed expecting the $i-th latency", e)
}
}
val histogram = latencies groupBy (_ / 100000000L)
for (k histogram.keys.toSeq.sorted) {
system.log.info(f"${k * 100}%3d: ${histogram(k).size}")
}
}
} }
} }
class DefaultSchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with SchedulerSpec {
private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
"A HashedWheelTimer" must {
"not mess up long timeouts" taggedAs LongRunningTest in {
val longish = Long.MaxValue.nanos
val barrier = TestLatch()
import system.dispatcher
val job = system.scheduler.scheduleOnce(longish)(barrier.countDown())
intercept[TimeoutException] {
// this used to fire after 46 seconds due to wrap-around
Await.ready(barrier, 90 seconds)
}
job.cancel()
}
}
def collectCancellable(c: Cancellable): Cancellable = {
cancellables.add(c)
c
}
override def afterEach {
while (cancellables.peek() ne null) {
for (c Option(cancellables.poll())) {
c.cancel()
c.isCancelled must be === true
}
}
}
}
class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRevolver) with SchedulerSpec {
def collectCancellable(c: Cancellable): Cancellable = c
"A LightArrayRevolverScheduler" must {
"survive being stressed with cancellation" taggedAs TimingTest in {
import system.dispatcher
val r = ThreadLocalRandom.current
val N = 1000000
val tasks = for (_ 1 to N) yield {
val next = r.nextInt(3000)
val now = System.nanoTime
system.scheduler.scheduleOnce(next.millis) {
val stop = System.nanoTime
testActor ! (stop - now - next * 1000000L)
}
}
// get somewhat into the middle of things
Thread.sleep(500)
val cancellations = for (t tasks) yield {
t.cancel()
if (t.isCancelled) 1 else 0
}
val cancelled = cancellations.sum
println(cancelled)
val latencies = within(5.seconds) {
for (i 1 to (N - cancelled)) yield try expectMsgType[Long] catch {
case NonFatal(e) throw new Exception(s"failed expecting the $i-th latency", e)
}
}
val histogram = latencies groupBy (_ / 100000000L)
for (k histogram.keys.toSeq.sorted) {
system.log.info(f"${k * 100}%3d: ${histogram(k).size}")
}
expectNoMsg(1.second)
}
"survive vicious enqueueing" in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
import driver._
import system.dispatcher
val counter = new AtomicInteger
val terminated = future {
var rounds = 0
while (Try(sched.scheduleOnce(Duration.Zero)(())(localEC)).isSuccess) {
Thread.sleep(1)
driver.wakeUp(step)
rounds += 1
}
rounds
}
def delay = if (ThreadLocalRandom.current.nextBoolean) step * 2 else step
val N = 1000000
(1 to N) foreach (_ sched.scheduleOnce(delay)(counter.incrementAndGet()))
sched.close()
Await.result(terminated, 3.seconds.dilated) must be > 10
awaitCond(counter.get == N)
}
}
"execute multiple jobs at once when expiring multiple buckets" in {
withScheduler() { (sched, driver)
implicit def ec = localEC
import driver._
val start = step / 2
(0 to 3) foreach (i sched.scheduleOnce(start + step * i, testActor, "hello"))
expectNoMsg(step)
wakeUp(step)
expectWait(step)
wakeUp(step * 4 + step / 2)
expectWait(step / 2)
(0 to 3) foreach (_ expectMsg(Duration.Zero, "hello"))
}
}
"correctly wrap around wheel rounds" in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
implicit def ec = localEC
import driver._
val start = step / 2
(0 to 3) foreach (i sched.scheduleOnce(start + step * i, probe.ref, "hello"))
probe.expectNoMsg(step)
wakeUp(step)
expectWait(step)
// the following are no for-comp to see which iteration fails
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
expectWait(step)
}
}
"correctly execute jobs when clock wraps around" in {
withScheduler(Long.MaxValue - 200000000L) { (sched, driver)
implicit def ec = localEC
import driver._
val start = step / 2
(0 to 3) foreach (i sched.scheduleOnce(start + step * i, testActor, "hello"))
expectNoMsg(step)
wakeUp(step)
expectWait(step)
// the following are no for-comp to see which iteration fails
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectWait(step)
}
}
"reliably reject jobs when shutting down" in {
withScheduler() { (sched, driver)
import system.dispatcher
val counter = new AtomicInteger
future { Thread.sleep(5); sched.close() }
val headroom = 200
var overrun = headroom
val cap = 1000000
val (success, failure) = Iterator
.continually(Try(sched.scheduleOnce(100.millis)(counter.incrementAndGet())))
.take(cap)
.takeWhile(_.isSuccess || { overrun -= 1; overrun >= 0 })
.partition(_.isSuccess)
val s = success.size
s must be < cap
awaitCond(s == counter.get, message = s"$s was not ${counter.get}")
failure.size must be === headroom
}
}
}
trait Driver {
def wakeUp(d: FiniteDuration): Unit
def expectWait(): FiniteDuration
def expectWait(d: FiniteDuration) { expectWait() must be(d) }
def probe: TestProbe
def step: FiniteDuration
}
val localEC = new ExecutionContext {
def execute(runnable: Runnable) { runnable.run() }
def reportFailure(t: Throwable) { t.printStackTrace() }
}
def withScheduler(start: Long = 0L, config: Config = ConfigFactory.empty)(thunk: (Scheduler with Closeable, Driver) Unit): Unit = {
import akka.actor.{ LightArrayRevolverScheduler LARS }
val lbq = new LinkedBlockingQueue[Long]
val prb = TestProbe()
val tf = system.asInstanceOf[ActorSystemImpl].threadFactory
val sched =
new { @volatile var time = start } with LARS(config.withFallback(system.settings.config), log, tf) {
override protected def clock(): Long = {
// println(s"clock=$time")
time
}
override protected def waitNanos(ns: Long): Unit = {
// println(s"waiting $ns")
prb.ref ! ns
try time += lbq.take()
catch {
case _: InterruptedException Thread.currentThread.interrupt()
}
}
}
val driver = new Driver {
def wakeUp(d: FiniteDuration) { lbq.offer(d.toNanos) }
def expectWait(): FiniteDuration = probe.expectMsgType[Long].nanos
def probe = prb
def step = sched.TickDuration
}
driver.expectWait()
try thunk(sched, driver)
catch {
case NonFatal(ex)
try sched.close()
catch { case _: Exception }
throw ex
}
sched.close()
}
}

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -211,7 +211,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
import TypedActorSpec._ import TypedActorSpec._
def newFooBar: Foo = newFooBar(Duration(2, "s")) def newFooBar: Foo = newFooBar(timeout.duration)
def newFooBar(d: FiniteDuration): Foo = def newFooBar(d: FiniteDuration): Foo =
TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d))) TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)))
@ -221,7 +221,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
def newStacked(): Stacked = def newStacked(): Stacked =
TypedActor(system).typedActorOf( TypedActor(system).typedActorOf(
TypedProps[StackedImpl](classOf[Stacked], classOf[StackedImpl]).withTimeout(Timeout(2000))) TypedProps[StackedImpl](classOf[Stacked], classOf[StackedImpl]).withTimeout(timeout))
def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true) def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true)
@ -296,7 +296,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
t.nullJOption() must be === JOption.none t.nullJOption() must be === JOption.none
t.nullOption() must be === None t.nullOption() must be === None
t.nullReturn() must be === null t.nullReturn() must be === null
Await.result(t.nullFuture(), remaining) must be === null Await.result(t.nullFuture(), timeout.duration) must be === null
} }
"be able to call Future-returning methods non-blockingly" in { "be able to call Future-returning methods non-blockingly" in {
@ -307,11 +307,11 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
mustStop(t) mustStop(t)
} }
"be able to call multiple Future-returning methods non-blockingly" in { "be able to call multiple Future-returning methods non-blockingly" in within(timeout.duration) {
val t = newFooBar val t = newFooBar
val futures = for (i 1 to 20) yield (i, t.futurePigdog(20, i)) val futures = for (i 1 to 20) yield (i, t.futurePigdog(20, i))
for ((i, f) futures) { for ((i, f) futures) {
Await.result(f, timeout.duration) must be("Pigdog" + i) Await.result(f, remaining) must be("Pigdog" + i)
} }
mustStop(t) mustStop(t)
} }
@ -330,11 +330,11 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
mustStop(t) mustStop(t)
} }
"be able to compose futures without blocking" in { "be able to compose futures without blocking" in within(timeout.duration) {
val t, t2 = newFooBar(2 seconds) val t, t2 = newFooBar(remaining)
val f = t.futureComposePigdogFrom(t2) val f = t.futureComposePigdogFrom(t2)
f.isCompleted must be(false) f.isCompleted must be(false)
Await.result(f, timeout.duration) must equal("PIGDOG") Await.result(f, remaining) must equal("PIGDOG")
mustStop(t) mustStop(t)
mustStop(t2) mustStop(t2)
} }
@ -391,13 +391,13 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
mustStop(t) mustStop(t)
} }
"be able to support implementation only typed actors" in { "be able to support implementation only typed actors" in within(timeout.duration) {
val t: Foo = TypedActor(system).typedActorOf(TypedProps[Bar]()) val t: Foo = TypedActor(system).typedActorOf(TypedProps[Bar]())
val f = t.futurePigdog(200) val f = t.futurePigdog(200)
val f2 = t.futurePigdog(0) val f2 = t.futurePigdog(0)
f2.isCompleted must be(false) f2.isCompleted must be(false)
f.isCompleted must be(false) f.isCompleted must be(false)
Await.result(f, timeout.duration) must equal(Await.result(f2, timeout.duration)) Await.result(f, remaining) must equal(Await.result(f2, remaining))
mustStop(t) mustStop(t)
} }
@ -408,13 +408,13 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
mustStop(t) mustStop(t)
} }
"be able to use balancing dispatcher" in { "be able to use balancing dispatcher" in within(timeout.duration) {
val thais = for (i 1 to 60) yield newFooBar("pooled-dispatcher", 6 seconds) val thais = for (i 1 to 60) yield newFooBar("pooled-dispatcher", 6 seconds)
val iterator = new CyclicIterator(thais) val iterator = new CyclicIterator(thais)
val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i)) val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i))
for ((i, r) results) Await.result(r, timeout.duration) must be("Pigdog" + i) for ((i, r) results) Await.result(r, remaining) must be("Pigdog" + i)
for (t thais) mustStop(t) for (t thais) mustStop(t)
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor.dispatch package akka.actor.dispatch
@ -80,21 +80,21 @@ object ActorModelSpec {
} }
def receive = { def receive = {
case AwaitLatch(latch) ack; latch.await(); busy.switchOff() case AwaitLatch(latch) { ack; latch.await(); busy.switchOff() }
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff() case Meet(sign, wait) { ack; sign.countDown(); wait.await(); busy.switchOff() }
case Wait(time) ack; Thread.sleep(time); busy.switchOff() case Wait(time) { ack; Thread.sleep(time); busy.switchOff() }
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff() case WaitAck(time, l) { ack; Thread.sleep(time); l.countDown(); busy.switchOff() }
case Reply(msg) ack; sender ! msg; busy.switchOff() case Reply(msg) { ack; sender ! msg; busy.switchOff() }
case TryReply(msg) ack; sender.tell(msg, null); busy.switchOff() case TryReply(msg) { ack; sender.tell(msg, null); busy.switchOff() }
case Forward(to, msg) ack; to.forward(msg); busy.switchOff() case Forward(to, msg) { ack; to.forward(msg); busy.switchOff() }
case CountDown(latch) ack; latch.countDown(); busy.switchOff() case CountDown(latch) { ack; latch.countDown(); busy.switchOff() }
case Increment(count) ack; count.incrementAndGet(); busy.switchOff() case Increment(count) { ack; count.incrementAndGet(); busy.switchOff() }
case CountDownNStop(l) ack; l.countDown(); context.stop(self); busy.switchOff() case CountDownNStop(l) { ack; l.countDown(); context.stop(self); busy.switchOff() }
case Restart ack; busy.switchOff(); throw new Exception("Restart requested") case Restart { ack; busy.switchOff(); throw new Exception("Restart requested") }
case Interrupt ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") case Interrupt { ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") }
case InterruptNicely(msg) ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() case InterruptNicely(msg) { ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() }
case ThrowException(e: Throwable) ack; busy.switchOff(); throw e case ThrowException(e: Throwable) { ack; busy.switchOff(); throw e }
case DoubleStop ack; context.stop(self); context.stop(self); busy.switchOff case DoubleStop { ack; context.stop(self); context.stop(self); busy.switchOff }
} }
} }
@ -229,16 +229,17 @@ object ActorModelSpec {
} }
} }
@tailrec def await(until: Long)(condition: Boolean): Unit = if (System.currentTimeMillis() <= until) { @tailrec def await(until: Long)(condition: Boolean): Unit =
var done = false if (System.currentTimeMillis() <= until) {
try { var done = false
done = condition try {
if (!done) Thread.sleep(25) done = condition
} catch { if (!done) Thread.sleep(25)
case e: InterruptedException } catch {
} case e: InterruptedException
if (!done) await(until)(condition) }
} else throw new AssertionError("await failed") if (!done) await(until)(condition)
} else throw new AssertionError("await failed")
} }
abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout { abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout {
@ -414,17 +415,28 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
val a = newTestActor(dispatcher.id) val a = newTestActor(dispatcher.id)
val f1 = a ? Reply("foo") val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar") val f2 = a ? Reply("bar")
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)).future } val f3 = a ? Interrupt
Thread.interrupted() // CallingThreadDispatcher may necessitate this
val f4 = a ? Reply("foo2") val f4 = a ? Reply("foo2")
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(new ActorInterruptedException(ie)).future } val f5 = a ? Interrupt
Thread.interrupted() // CallingThreadDispatcher may necessitate this
val f6 = a ? Reply("bar2") val f6 = a ? Reply("bar2")
val c = system.scheduler.scheduleOnce(2.seconds) {
import collection.JavaConverters._
Thread.getAllStackTraces().asScala foreach {
case (thread, stack)
println(s"$thread:")
stack foreach (s println(s"\t$s"))
}
}
assert(Await.result(f1, remaining) === "foo") assert(Await.result(f1, remaining) === "foo")
assert(Await.result(f2, remaining) === "bar") assert(Await.result(f2, remaining) === "bar")
assert(Await.result(f4, remaining) === "foo2") assert(Await.result(f4, remaining) === "foo2")
assert(intercept[ActorInterruptedException](Await.result(f3, remaining)).getCause.getMessage === "Ping!") assert(intercept[ActorInterruptedException](Await.result(f3, remaining)).getCause.getMessage === "Ping!")
assert(Await.result(f6, remaining) === "bar2") assert(Await.result(f6, remaining) === "bar2")
assert(intercept[ActorInterruptedException](Await.result(f5, remaining)).getCause.getMessage === "Ping!") assert(intercept[ActorInterruptedException](Await.result(f5, remaining)).getCause.getMessage === "Ping!")
c.cancel()
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor.dispatch package akka.actor.dispatch

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.config package akka.config
@ -32,10 +32,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
settings.SerializeAllMessages must equal(false) settings.SerializeAllMessages must equal(false)
getInt("akka.scheduler.ticks-per-wheel") must equal(512) getInt("akka.scheduler.ticks-per-wheel") must equal(512)
settings.SchedulerTicksPerWheel must equal(512)
getMilliseconds("akka.scheduler.tick-duration") must equal(100) getMilliseconds("akka.scheduler.tick-duration") must equal(100)
settings.SchedulerTickDuration must equal(100 millis) getString("akka.scheduler.implementation") must equal("akka.actor.LightArrayRevolverScheduler")
getBoolean("akka.daemonic") must be(false) getBoolean("akka.daemonic") must be(false)
settings.Daemonicity must be(false) settings.Daemonicity must be(false)

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.dataflow package akka.dataflow

View file

@ -4,6 +4,7 @@ import java.util.concurrent.{ ExecutorService, Executor, Executors }
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._ import scala.concurrent._
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
import akka.util.SerializedSuspendableExecutionContext
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -81,4 +82,82 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
Await.ready(latch, timeout.duration) Await.ready(latch, timeout.duration)
} }
} }
"A SerializedSuspendableExecutionContext" must {
"be suspendable and resumable" in {
val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global)
val counter = new AtomicInteger(0)
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
perform(_ + 1)
perform(x { sec.suspend(); x * 2 })
awaitCond(counter.get == 2)
perform(_ + 4)
perform(_ * 2)
sec.size must be === 2
Thread.sleep(500)
sec.size must be === 2
counter.get must be === 2
sec.resume()
awaitCond(counter.get == 12)
perform(_ * 2)
awaitCond(counter.get == 24)
sec.isEmpty must be === true
}
"execute 'throughput' number of tasks per sweep" in {
val submissions = new AtomicInteger(0)
val counter = new AtomicInteger(0)
val underlying = new ExecutionContext {
override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) }
override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) }
}
val throughput = 25
val sec = SerializedSuspendableExecutionContext(throughput)(underlying)
sec.suspend()
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
val total = 1000
1 to total foreach { _ perform(_ + 1) }
sec.size() must be === total
sec.resume()
awaitCond(counter.get == total)
submissions.get must be === (total / throughput)
sec.isEmpty must be === true
}
"execute tasks in serial" in {
val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global)
val total = 10000
val counter = new AtomicInteger(0)
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
1 to total foreach { i perform(c if (c == (i - 1)) c + 1 else c) }
awaitCond(counter.get == total)
sec.isEmpty must be === true
}
"should relinquish thread when suspended" in {
val submissions = new AtomicInteger(0)
val counter = new AtomicInteger(0)
val underlying = new ExecutionContext {
override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) }
override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) }
}
val throughput = 25
val sec = SerializedSuspendableExecutionContext(throughput)(underlying)
sec.suspend()
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
perform(_ + 1)
1 to 10 foreach { _ perform(identity) }
perform(x { sec.suspend(); x * 2 })
perform(_ + 8)
sec.size must be === 13
sec.resume()
awaitCond(counter.get == 2)
sec.resume()
awaitCond(counter.get == 10)
sec.isEmpty must be === true
submissions.get must be === 2
}
}
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.dispatch package akka.dispatch

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.event package akka.event

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.event package akka.event

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.event package akka.event

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.pattern package akka.pattern
@ -70,4 +70,4 @@ class AskSpec extends AkkaSpec {
} }
} }

View file

@ -1,11 +1,12 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.pattern package akka.pattern
import akka.testkit._ import akka.testkit._
import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Promise, Future, Await } import scala.concurrent.{ Future, Await }
import scala.annotation.tailrec import scala.annotation.tailrec
class CircuitBreakerMTSpec extends AkkaSpec { class CircuitBreakerMTSpec extends AkkaSpec {
@ -13,42 +14,49 @@ class CircuitBreakerMTSpec extends AkkaSpec {
"A circuit breaker being called by many threads" must { "A circuit breaker being called by many threads" must {
val callTimeout = 1.second.dilated val callTimeout = 1.second.dilated
val resetTimeout = 2.seconds.dilated val resetTimeout = 2.seconds.dilated
val breaker = new CircuitBreaker(system.scheduler, 5, callTimeout, resetTimeout) val maxFailures = 5
val breaker = new CircuitBreaker(system.scheduler, maxFailures, callTimeout, resetTimeout)
val numberOfTestCalls = 100
def openBreaker(): Unit = { def openBreaker(): Unit = {
@tailrec def call(attemptsLeft: Int): Unit = { // returns true if the breaker is open
attemptsLeft must be > (0) def failingCall(): Boolean =
if (Await.result(breaker.withCircuitBreaker(Future(throw new RuntimeException("FAIL"))) recover { Await.result(breaker.withCircuitBreaker(Future(throw new RuntimeException("FAIL"))) recover {
case _: CircuitBreakerOpenException false case _: CircuitBreakerOpenException true
case _ true case _ false
}, remaining)) call(attemptsLeft - 1) }, remaining)
// fire some failing calls
1 to (maxFailures + 1) foreach { _ failingCall() }
// and then continue with failing calls until the breaker is open
awaitCond(failingCall())
}
def testCallsWithBreaker(): immutable.IndexedSeq[Future[String]] = {
val aFewActive = new TestLatch(5)
for (_ 1 to numberOfTestCalls) yield breaker.withCircuitBreaker(Future {
aFewActive.countDown()
Await.ready(aFewActive, 5.seconds.dilated)
"succeed"
}) recoverWith {
case _: CircuitBreakerOpenException
aFewActive.countDown()
Future.successful("CBO")
} }
call(10)
} }
"allow many calls while in closed state with no errors" in { "allow many calls while in closed state with no errors" in {
val futures = testCallsWithBreaker()
val futures = for (i 1 to 100) yield breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" })
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("succeed") result.toSet must be === Set("succeed")
} }
"transition to open state upon reaching failure limit and fail-fast" in { "transition to open state upon reaching failure limit and fail-fast" in {
openBreaker() openBreaker()
val futures = testCallsWithBreaker()
val futures = for (i 1 to 100) yield breaker.withCircuitBreaker(Future {
Thread.sleep(10); "success"
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("CBO") result.toSet must be === Set("CBO")
} }
@ -58,17 +66,12 @@ class CircuitBreakerMTSpec extends AkkaSpec {
openBreaker() openBreaker()
// breaker should become half-open after a while
Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated)
val futures = for (i 1 to 100) yield breaker.withCircuitBreaker(Future { val futures = testCallsWithBreaker()
Thread.sleep(10); "succeed"
}) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("succeed", "CBO") result.toSet must be === Set("succeed", "CBO")
} }
@ -76,20 +79,20 @@ class CircuitBreakerMTSpec extends AkkaSpec {
val halfOpenLatch = new TestLatch(1) val halfOpenLatch = new TestLatch(1)
breaker.onHalfOpen(halfOpenLatch.countDown()) breaker.onHalfOpen(halfOpenLatch.countDown())
openBreaker() openBreaker()
Await.ready(halfOpenLatch, 5.seconds.dilated)
Await.ready(breaker.withCircuitBreaker(Future("succeed")), resetTimeout)
val futures = (1 to 100) map { // breaker should become half-open after a while
i Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated)
breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" }) recoverWith {
case _: CircuitBreakerOpenException Promise.successful("CBO").future
}
}
// one successful call should close the latch
val closedLatch = new TestLatch(1)
breaker.onClose(closedLatch.countDown())
breaker.withCircuitBreaker(Future("succeed"))
Await.ready(closedLatch, 5.seconds.dilated)
val futures = testCallsWithBreaker()
val result = Await.result(Future.sequence(futures), 5.second.dilated) val result = Await.result(Future.sequence(futures), 5.second.dilated)
result.size must be(numberOfTestCalls)
result.size must be(100)
result.toSet must be === Set("succeed") result.toSet must be === Set("succeed")
} }
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.pattern package akka.pattern
@ -10,7 +10,7 @@ import scala.concurrent.duration._
import akka.testkit._ import akka.testkit._
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import akka.actor.{ ActorSystem, Scheduler } import akka.actor.{ ActorSystem, Scheduler }
import concurrent.{ ExecutionContext, Future, Await } import scala.concurrent.{ ExecutionContext, Future, Await }
object CircuitBreakerSpec { object CircuitBreakerSpec {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.pattern package akka.pattern

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.routing package akka.routing

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.routing package akka.routing

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.routing package akka.routing
@ -49,4 +49,4 @@ class CustomRouteSpec extends AkkaSpec {
} }
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.routing package akka.routing

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.routing package akka.routing

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.serialization package akka.serialization
@ -8,17 +8,20 @@ import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter } import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor._ import akka.actor._
import akka.dispatch._
import java.io._ import java.io._
import scala.concurrent.Await import scala.concurrent.Await
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.reflect.BeanInfo import scala.reflect.BeanInfo
import com.google.protobuf.Message import com.google.protobuf.Message
import com.typesafe.config._
import akka.pattern.ask import akka.pattern.ask
import org.apache.commons.codec.binary.Hex.{ encodeHex, decodeHex }
object SerializeSpec { object SerializationTests {
val config = """ val serializeConf = """
akka { akka {
actor { actor {
serializers { serializers {
@ -26,13 +29,13 @@ object SerializeSpec {
} }
serialization-bindings { serialization-bindings {
"akka.serialization.SerializeSpec$Person" = java "akka.serialization.SerializationTests$Person" = java
"akka.serialization.SerializeSpec$Address" = java "akka.serialization.SerializationTests$Address" = java
"akka.serialization.TestSerializble" = test "akka.serialization.TestSerializable" = test
"akka.serialization.SerializeSpec$PlainMessage" = test "akka.serialization.SerializationTests$PlainMessage" = test
"akka.serialization.SerializeSpec$A" = java "akka.serialization.SerializationTests$A" = java
"akka.serialization.SerializeSpec$B" = test "akka.serialization.SerializationTests$B" = test
"akka.serialization.SerializeSpec$D" = test "akka.serialization.SerializationTests$D" = test
} }
} }
} }
@ -45,11 +48,11 @@ object SerializeSpec {
case class Record(id: Int, person: Person) case class Record(id: Int, person: Person)
class SimpleMessage(s: String) extends TestSerializble class SimpleMessage(s: String) extends TestSerializable
class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s) class ExtendedSimpleMessage(s: String, i: Int) extends SimpleMessage(s)
trait AnotherInterface extends TestSerializble trait AnotherInterface extends TestSerializable
class AnotherMessage extends AnotherInterface class AnotherMessage extends AnotherInterface
@ -67,11 +70,67 @@ object SerializeSpec {
class D extends A class D extends A
class E extends D class E extends D
val verifySerializabilityConf = """
akka {
actor {
serialize-messages = on
serialize-creators = on
}
}
"""
class FooActor extends Actor {
def receive = {
case s: String sender ! s
}
}
class FooUntypedActor extends UntypedActor {
def onReceive(message: Any) {}
}
class NonSerializableActor(system: ActorSystem) extends Actor {
def receive = {
case s: String sender ! s
}
}
def mostlyReferenceSystem: ActorSystem = {
val referenceConf = ConfigFactory.defaultReference()
val mostlyReferenceConf = AkkaSpec.testConf.withFallback(referenceConf)
ActorSystem("SerializationSystem", mostlyReferenceConf)
}
val systemMessageMultiSerializerConf = """
akka {
actor {
serializers {
test = "akka.serialization.TestSerializer"
}
serialization-bindings {
"akka.dispatch.SystemMessage" = test
}
}
}
"""
val systemMessageClasses = List[Class[_]](
classOf[Create],
classOf[Recreate],
classOf[Suspend],
classOf[Resume],
classOf[Terminate],
classOf[Supervise],
classOf[ChildTerminated],
classOf[Watch],
classOf[Unwatch],
NoMessage.getClass)
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializeSpec extends AkkaSpec(SerializeSpec.config) { class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) {
import SerializeSpec._ import SerializationTests._
val ser = SerializationExtension(system) val ser = SerializationExtension(system)
import ser._ import ser._
@ -156,7 +215,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
"give warning for message with several bindings" in { "give warning for message with several bindings" in {
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept { EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
ser.serializerFor(classOf[Both]).getClass must be(classOf[TestSerializer]) ser.serializerFor(classOf[Both]).getClass must (be(classOf[TestSerializer]) or be(classOf[JavaSerializer]))
} }
} }
@ -164,7 +223,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer]) ser.serializerFor(classOf[A]).getClass must be(classOf[JavaSerializer])
ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer]) ser.serializerFor(classOf[B]).getClass must be(classOf[TestSerializer])
EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept { EventFilter.warning(start = "Multiple serializers found", occurrences = 1) intercept {
ser.serializerFor(classOf[C]).getClass must be(classOf[JavaSerializer]) ser.serializerFor(classOf[C]).getClass must (be(classOf[TestSerializer]) or be(classOf[JavaSerializer]))
} }
} }
@ -194,36 +253,9 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) {
} }
} }
object VerifySerializabilitySpec {
val conf = """
akka {
actor {
serialize-messages = on
serialize-creators = on
}
}
"""
class FooActor extends Actor {
def receive = {
case s: String sender ! s
}
}
class FooUntypedActor extends UntypedActor {
def onReceive(message: Any) {}
}
class NonSerializableActor(system: ActorSystem) extends Actor {
def receive = {
case s: String sender ! s
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) { class VerifySerializabilitySpec extends AkkaSpec(SerializationTests.verifySerializabilityConf) {
import VerifySerializabilitySpec._ import SerializationTests._
implicit val timeout = Timeout(5 seconds) implicit val timeout = Timeout(5 seconds)
"verify config" in { "verify config" in {
@ -260,7 +292,98 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf)
} }
} }
trait TestSerializble @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReferenceSerializationSpec extends AkkaSpec(SerializationTests.mostlyReferenceSystem) {
import SerializationTests._
val ser = SerializationExtension(system)
def serializerMustBe(toSerialize: Class[_], expectedSerializer: Class[_]) =
ser.serializerFor(toSerialize).getClass must be(expectedSerializer)
"Serialization settings from reference.conf" must {
"declare Serializable classes to be use JavaSerializer" in {
serializerMustBe(classOf[Serializable], classOf[JavaSerializer])
serializerMustBe(classOf[String], classOf[JavaSerializer])
for (smc systemMessageClasses) {
serializerMustBe(smc, classOf[JavaSerializer])
}
}
"declare Array[Byte] to use ByteArraySerializer" in {
serializerMustBe(classOf[Array[Byte]], classOf[ByteArraySerializer])
}
"not support serialization for other classes" in {
intercept[NotSerializableException] { ser.serializerFor(classOf[Object]) }
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyReferenceSystem) {
import SerializationTests._
val ser = SerializationExtension(system)
"Cross-version serialization compatibility" must {
def verify(obj: Any, asExpected: String): Unit =
String.valueOf(encodeHex(ser.serialize(obj, obj.getClass).get)) must be(asExpected)
"be preserved for the Create SystemMessage" in {
verify(Create(1234), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465bcdf9f7f2675038d0200014900037569647870000004d27671007e0003")
}
"be preserved for the Recreate SystemMessage" in {
verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720016616b6b612e64697370617463682e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
}
"be preserved for the Suspend SystemMessage" in {
verify(Suspend(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e53757370656e6464e531d5d134b59902000078707671007e0003")
}
"be preserved for the Resume SystemMessage" in {
verify(Resume(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e526573756d65dc5e646d445fcb010200014c000f63617573656442794661696c7572657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
}
"be preserved for the Terminate SystemMessage" in {
verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5465726d696e61746509d66ca68318700f02000078707671007e0003")
}
"be preserved for the Supervise SystemMessage" in {
verify(Supervise(FakeActorRef("child"), true, 2468), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5375706572766973652d0b363f56ab5feb0200035a00056173796e634900037569644c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787001000009a47372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
}
"be preserved for the ChildTerminated SystemMessage" in {
verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
}
"be preserved for the Watch SystemMessage" in {
verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720013616b6b612e64697370617463682e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
}
"be preserved for the Unwatch SystemMessage" in {
verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
}
"be preserved for the NoMessage SystemMessage" in {
verify(NoMessage, "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720018616b6b612e64697370617463682e4e6f4d65737361676524b401a3610ccb70dd02000078707671007e0003")
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class OverriddenSystemMessageSerializationSpec extends AkkaSpec(SerializationTests.systemMessageMultiSerializerConf) {
import SerializationTests._
val ser = SerializationExtension(system)
"Overridden SystemMessage serialization" must {
"resolve to a single serializer" in {
EventFilter.warning(start = "Multiple serializers found", occurrences = 0) intercept {
for (smc systemMessageClasses) {
ser.serializerFor(smc).getClass must be(classOf[TestSerializer])
}
}
}
}
}
trait TestSerializable
class TestSerializer extends Serializer { class TestSerializer extends Serializer {
def includeManifest: Boolean = false def includeManifest: Boolean = false
@ -273,3 +396,26 @@ class TestSerializer extends Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
} }
@SerialVersionUID(1)
case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable {
override def fillInStackTrace = null
}
@SerialVersionUID(1)
case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope {
override def path = RootActorPath(Address("proto", "SomeSystem"), name)
override def forward(message: Any)(implicit context: ActorContext) = ???
override def isTerminated = ???
override def start() = ???
override def resume(causedByFailure: Throwable) = ???
override def suspend() = ???
override def restart(cause: Throwable) = ???
override def stop() = ???
override def sendSystemMessage(message: SystemMessage) = ???
override def provider = ???
override def getParent = ???
override def getChild(name: Iterator[String]) = ???
override def isLocal = ???
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = ???
}

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.testkit package akka.testkit

View file

@ -269,9 +269,19 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers {
(b.compact eq b) (b.compact eq b)
} }
} }
"asByteBuffers" in {
check { (a: ByteString) if (a.isCompact) a.asByteBuffers.size == 1 && a.asByteBuffers.head == a.asByteBuffer else a.asByteBuffers.size > 0 }
check { (a: ByteString) a.asByteBuffers.foldLeft(ByteString.empty) { (bs, bb) bs ++ ByteString(bb) } == a }
check { (a: ByteString) a.asByteBuffers.forall(_.isReadOnly) }
check { (a: ByteString)
import scala.collection.JavaConverters.iterableAsScalaIterableConverter;
a.asByteBuffers.zip(a.getByteBuffers().asScala).forall(x x._1 == x._2)
}
}
} }
"behave like a Vector" when { "behave like a Vector" when {
"concatenating" in { check { (a: ByteString, b: ByteString) likeVectors(a, b) { (a, b) (a ++ b) } } } "concatenating" in { check { (a: ByteString, b: ByteString) likeVectors(a, b) { _ ++ _ } } }
"calling apply" in { "calling apply" in {
check { slice: ByteStringSlice check { slice: ByteStringSlice

View file

@ -1,39 +1,16 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.util package akka.util
import language.postfixOps import language.postfixOps
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await
import java.util.concurrent.TimeUnit._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.TestLatch
import java.util.concurrent.TimeoutException
import akka.testkit.LongRunningTest
class DurationSpec extends AkkaSpec { class DurationSpec extends AkkaSpec {
"A HashedWheelTimer" must {
"not mess up long timeouts" taggedAs LongRunningTest in {
val longish = Long.MaxValue.nanos
val barrier = TestLatch()
import system.dispatcher
val job = system.scheduler.scheduleOnce(longish)(barrier.countDown())
intercept[TimeoutException] {
// this used to fire after 46 seconds due to wrap-around
Await.ready(barrier, 90 seconds)
}
job.cancel()
}
}
"Duration" must { "Duration" must {
"form a one-dimensional vector field" in { "form a one-dimensional vector field" in {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.util package akka.util
@ -129,4 +129,4 @@ class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout {
tasks.foreach(Await.result(_, timeout.duration)) tasks.foreach(Await.result(_, timeout.duration))
} }
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.util package akka.util

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.util package akka.util

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor; package akka.actor;

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
//#scheduler
/**
* An Akka scheduler service. This one needs one special behavior: if
* Closeable, it MUST execute all outstanding tasks upon .close() in order
* to properly shutdown all dispatchers.
*
* Furthermore, this timer service MUST throw IllegalStateException if it
* cannot schedule a task. Once scheduled, the task MUST be executed. If
* executed upon close(), the task may execute before its timeout.
*
* Scheduler implementation are loaded reflectively at ActorSystem start-up
* with the following constructor arguments:
* 1) the systems com.typesafe.config.Config (from system.settings.config)
* 2) a akka.event.LoggingAdapter
* 3) a java.util.concurrent.ThreadFactory
*/
public abstract class AbstractScheduler extends AbstractSchedulerBase {
/**
* Schedules a function to be run repeatedly with an initial delay and
* a frequency. E.g. if you would like the function to be run after 2
* seconds and thereafter every 100ms you would set delay = Duration(2,
* TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
*/
@Override
public abstract Cancellable schedule(FiniteDuration initialDelay,
FiniteDuration interval, Runnable runnable, ExecutionContext executor);
/**
* Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed.
*/
@Override
public abstract Cancellable scheduleOnce(FiniteDuration delay, Runnable runnable,
ExecutionContext executor);
/**
* The maximum supported task frequency of this scheduler, i.e. the inverse
* of the minimum time interval between executions of a recurring task, in Hz.
*/
@Override
public abstract double maxFrequency();
}
//#scheduler

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor.dungeon; package akka.actor.dungeon;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.dispatch; package akka.dispatch;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.dispatch; package akka.dispatch;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.pattern; package akka.pattern;

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.pattern; package akka.pattern;
@ -18,4 +18,4 @@ final class AbstractPromiseActorRef {
throw new ExceptionInInitializerError(t); throw new ExceptionInInitializerError(t);
} }
} }
} }

View file

@ -20,10 +20,13 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import akka.dispatch.SystemMessage;
import akka.util.Helpers; import akka.util.Helpers;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
@ -89,7 +92,6 @@ public class HashedWheelTimer implements Timer {
boolean shutdown = false; boolean shutdown = false;
final long tickDuration; final long tickDuration;
final Set<HashedWheelTimeout>[] wheel; final Set<HashedWheelTimeout>[] wheel;
final ReusableIterator<HashedWheelTimeout>[] iterators;
final int mask; final int mask;
final ReadWriteLock lock = new ReentrantReadWriteLock(); final ReadWriteLock lock = new ReentrantReadWriteLock();
volatile int wheelCursor; volatile int wheelCursor;
@ -127,7 +129,6 @@ public class HashedWheelTimer implements Timer {
// Normalize ticksPerWheel to power of two and initialize the wheel. // Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel); wheel = createWheel(ticksPerWheel);
iterators = createIterators(wheel);
mask = wheel.length - 1; mask = wheel.length - 1;
// Convert to standardized tickDuration // Convert to standardized tickDuration
@ -152,20 +153,11 @@ public class HashedWheelTimer implements Timer {
final Set<HashedWheelTimeout>[] wheel = new Set[normalizeTicksPerWheel(ticksPerWheel)]; final Set<HashedWheelTimeout>[] wheel = new Set[normalizeTicksPerWheel(ticksPerWheel)];
for (int i = 0; i < wheel.length; i ++) { for (int i = 0; i < wheel.length; i ++) {
wheel[i] = Collections.newSetFromMap(new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4)); wheel[i] = Collections.newSetFromMap(new ConcurrentHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
} }
return wheel; return wheel;
} }
@SuppressWarnings("unchecked")
private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
for (int i = 0; i < wheel.length; i ++) {
iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
}
return iterators;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) { private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1; int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) { while (normalizedTicksPerWheel < ticksPerWheel) {
@ -268,6 +260,8 @@ public class HashedWheelTimer implements Timer {
// one tick early; that shouldnt matter since were talking 270 years here // one tick early; that shouldnt matter since were talking 270 years here
if (relativeIndex < 0) relativeIndex = delay / tickDuration; if (relativeIndex < 0) relativeIndex = delay / tickDuration;
if (relativeIndex == 0) relativeIndex = 1; if (relativeIndex == 0) relativeIndex = 1;
// if an integral number of wheel rotations, schedule one tick earlier
if ((relativeIndex & mask) == 0) relativeIndex--;
final long remainingRounds = relativeIndex / wheel.length; final long remainingRounds = relativeIndex / wheel.length;
// Add the timeout to the wheel. // Add the timeout to the wheel.
@ -321,16 +315,16 @@ public class HashedWheelTimer implements Timer {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
final int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; final int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
return fetchExpiredTimeouts(iterators[newWheelCursor], deadline); return fetchExpiredTimeouts(wheel[newWheelCursor], deadline);
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
} }
private ArrayList<HashedWheelTimeout> fetchExpiredTimeouts(final ReusableIterator<HashedWheelTimeout> i, final long deadline) { private ArrayList<HashedWheelTimeout> fetchExpiredTimeouts(final Iterable<HashedWheelTimeout> it, final long deadline) {
final ArrayList<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>(); final ArrayList<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();
List<HashedWheelTimeout> slipped = null; List<HashedWheelTimeout> slipped = null;
i.rewind(); Iterator<HashedWheelTimeout> i = it.iterator();
while (i.hasNext()) { while (i.hasNext()) {
HashedWheelTimeout timeout = i.next(); HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0) { if (timeout.remainingRounds <= 0) {
@ -455,10 +449,11 @@ public class HashedWheelTimer implements Timer {
return Unsafe.instance.compareAndSwapInt(this, _stateOffset, old, future); return Unsafe.instance.compareAndSwapInt(this, _stateOffset, old, future);
} }
public void cancel() { public boolean cancel() {
if (updateState(ST_INIT, ST_CANCELLED)) { if (updateState(ST_INIT, ST_CANCELLED)) {
parent.wheel[stopIndex].remove(this); parent.wheel[stopIndex].remove(this);
} return true;
} else return false;
} }
public boolean isCancelled() { public boolean isCancelled() {
@ -481,6 +476,14 @@ public class HashedWheelTimer implements Timer {
} }
} }
@Override public final int hashCode() {
return System.identityHashCode(this);
}
@Override public final boolean equals(final Object that) {
return this == that;
}
@Override @Override
public String toString() { public String toString() {
final long currentTime = System.nanoTime(); final long currentTime = System.nanoTime();

View file

@ -1,27 +0,0 @@
/*
* Copyright 2009 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package akka.util.internal;
import java.util.Iterator;
/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
*/
public interface ReusableIterator<E> extends Iterator<E> {
void rewind();
}

View file

@ -51,6 +51,10 @@ public interface Timeout {
* Cancels the {@link TimerTask} associated with this handle. It the * Cancels the {@link TimerTask} associated with this handle. It the
* task has been executed or cancelled already, it will return with no * task has been executed or cancelled already, it will return with no
* side effect. * side effect.
*
* @return whether the caller was the one who actually cancelled this
* timeout (there can be at most one; never returns true if the Timeout
* expired)
*/ */
void cancel(); boolean cancel();
} }

View file

@ -364,7 +364,31 @@ akka {
# ticks per wheel. # ticks per wheel.
# For more information see: http://www.jboss.org/netty/ # For more information see: http://www.jboss.org/netty/
tick-duration = 100ms tick-duration = 100ms
# The timer uses a circular wheel of buckets to store the timer tasks.
# This should be set such that the majority of scheduled timeouts (for high
# scheduling frequency) will be shorter than one rotation of the wheel
# (ticks-per-wheel * ticks-duration)
# THIS MUST BE A POWER OF TWO!
ticks-per-wheel = 512 ticks-per-wheel = 512
# This setting selects the timer implementation which shall be loaded at
# system start-up. Built-in choices are:
# - akka.actor.DefaultScheduler (HWT)
# - akka.actor.LightArrayRevolverScheduler
# (to be benchmarked and evaluated)
# The class given here must implement the akka.actor.Scheduler interface
# and offer a constructor which takes three arguments:
# 1) com.typesafe.config.Config
# 2) akka.event.LoggingAdapter
# 3) java.util.concurrent.ThreadFactory
implementation = akka.actor.LightArrayRevolverScheduler
# When shutting down the scheduler, there will typically be a thread which
# needs to be stopped, and this timeout determines how long to wait for
# that to happen. In case of timeout the shutdown of the actor system will
# proceed without running possibly still enqueued tasks.
shutdown-timeout = 5s
} }
io { io {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka package akka
@ -14,10 +14,6 @@ package akka
@SerialVersionUID(1L) @SerialVersionUID(1L)
class AkkaException(message: String, cause: Throwable) extends RuntimeException(message, cause) with Serializable { class AkkaException(message: String, cause: Throwable) extends RuntimeException(message, cause) with Serializable {
def this(msg: String) = this(msg, null) def this(msg: String) = this(msg, null)
lazy val uuid: String = java.util.UUID.randomUUID().toString
override def toString(): String = uuid + super.toString()
} }
/** /**

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -15,6 +15,7 @@ import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Superv
import akka.event.Logging.{ LogEvent, Debug, Error } import akka.event.Logging.{ LogEvent, Debug, Error }
import akka.japi.Procedure import akka.japi.Procedure
import akka.dispatch.NullMessage import akka.dispatch.NullMessage
import scala.concurrent.ExecutionContext
/** /**
* The actor context - the view of the actor cell from the actor. * The actor context - the view of the actor cell from the actor.
@ -119,7 +120,7 @@ trait ActorContext extends ActorRefFactory {
* Returns the dispatcher (MessageDispatcher) that is used for this Actor. * Returns the dispatcher (MessageDispatcher) that is used for this Actor.
* Importing this member will place a implicit MessageDispatcher in scope. * Importing this member will place a implicit MessageDispatcher in scope.
*/ */
implicit def dispatcher: MessageDispatcher implicit def dispatcher: ExecutionContext
/** /**
* The system that the actor belongs to. * The system that the actor belongs to.
@ -214,19 +215,19 @@ private[akka] trait Cell {
*/ */
def start(): this.type def start(): this.type
/** /**
* Recursively suspend this actor and all its children. Must not throw exceptions. * Recursively suspend this actor and all its children. Is only allowed to throw Fatal Throwables.
*/ */
def suspend(): Unit def suspend(): Unit
/** /**
* Recursively resume this actor and all its children. Must not throw exceptions. * Recursively resume this actor and all its children. Is only allowed to throw Fatal Throwables.
*/ */
def resume(causedByFailure: Throwable): Unit def resume(causedByFailure: Throwable): Unit
/** /**
* Restart this actor (will recursively restart or stop all children). Must not throw exceptions. * Restart this actor (will recursively restart or stop all children). Is only allowed to throw Fatal Throwables.
*/ */
def restart(cause: Throwable): Unit def restart(cause: Throwable): Unit
/** /**
* Recursively terminate this actor and all its children. Must not throw exceptions. * Recursively terminate this actor and all its children. Is only allowed to throw Fatal Throwables.
*/ */
def stop(): Unit def stop(): Unit
/** /**
@ -246,16 +247,26 @@ private[akka] trait Cell {
* Get the stats for the named child, if that exists. * Get the stats for the named child, if that exists.
*/ */
def getChildByName(name: String): Option[ChildStats] def getChildByName(name: String): Option[ChildStats]
/** /**
* Enqueue a message to be sent to the actor; may or may not actually * Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is. * schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions. * Is only allowed to throw Fatal Throwables.
*/ */
def tell(message: Any, sender: ActorRef): Unit def sendMessage(msg: Envelope): Unit
/** /**
* Enqueue a message to be sent to the actor; may or may not actually * Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is. * schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions. * Is only allowed to throw Fatal Throwables.
*/
final def sendMessage(message: Any, sender: ActorRef): Unit =
sendMessage(Envelope(message, sender, system))
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
* Is only allowed to throw Fatal Throwables.
*/ */
def sendSystemMessage(msg: SystemMessage): Unit def sendSystemMessage(msg: SystemMessage): Unit
/** /**
@ -286,8 +297,8 @@ private[akka] object ActorCell {
} }
final val emptyCancellable: Cancellable = new Cancellable { final val emptyCancellable: Cancellable = new Cancellable {
def isCancelled = false def isCancelled: Boolean = false
def cancel() {} def cancel(): Boolean = false
} }
final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyBehaviorStack: List[Actor.Receive] = Nil
@ -392,35 +403,22 @@ private[akka] class ActorCell(
checkReceiveTimeout // Reschedule receive timeout checkReceiveTimeout // Reschedule receive timeout
} }
def autoReceiveMessage(msg: Envelope): Unit = if (msg.message != NullMessage) { def autoReceiveMessage(msg: Envelope): Unit =
if (system.settings.DebugAutoReceive) if (msg.message != NullMessage) {
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match { msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid) case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated case t: Terminated watchedActorTerminated(t)
if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor) case AddressTerminated(address) addressTerminated(address)
watchedActorTerminated(t) case Kill throw new ActorKilledException("Kill")
case AddressTerminated(address) addressTerminated(address) case PoisonPill self.stop()
case Kill throw new ActorKilledException("Kill") case SelectParent(m) parent.tell(m, msg.sender)
case PoisonPill self.stop() case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectParent(m) parent.tell(m, msg.sender) case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ } }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
} }
}
/**
* When a parent is watching a child and it terminates due to AddressTerminated,
* it should be removed to support immediate creation of child with same name.
*
* For remote deployed actors ChildTerminated should be sent to the supervisor
* to clean up child references of remote deployed actors when remote node
* goes down, i.e. triggered by AddressTerminated, but that is the responsibility
* of the ActorRefProvider to handle that scenario.
*/
private def removeChildWhenToAddressTerminated(child: ActorRef): Unit =
childrenRefs.getByRef(child) foreach { crs removeChildAndGetStateChange(crs.child) }
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
@ -497,16 +495,17 @@ private[akka] class ActorCell(
} }
} }
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) { private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit =
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() if (!isTerminating) {
initChild(child) match { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
case Some(crs) initChild(child) match {
crs.uid = uid case Some(crs)
handleSupervise(child, async) crs.uid = uid
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) handleSupervise(child, async)
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
}
} }
}
// future extension point // future extension point
protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match { protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
* *
* {{{ * {{{
* import ActorDSL._ * import ActorDSL._
* import concurrent.util.duration._ * import scala.concurrent.util.duration._
* *
* implicit val system: ActorSystem = ... * implicit val system: ActorSystem = ...
* *

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
import scala.annotation.tailrec import scala.annotation.tailrec

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -350,7 +350,7 @@ private[akka] class LocalActorRef private[akka] (
override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message) override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.tell(message, sender) override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)
override def restart(cause: Throwable): Unit = actorCell.restart(cause) override def restart(cause: Throwable): Unit = actorCell.restart(cause)
@ -446,7 +446,10 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
override def isTerminated(): Boolean = true override def isTerminated(): Boolean = true
override def sendSystemMessage(message: SystemMessage): Unit = specialHandle(message) override def sendSystemMessage(message: SystemMessage): Unit = {
if (Mailbox.debug) println(s"ELAR $path having enqueued $message")
specialHandle(message)
}
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match { override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match {
case d: DeadLetter case d: DeadLetter
@ -478,7 +481,8 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match { override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
case d: DeadLetter if (!specialHandle(d.message)) eventStream.publish(d) case d: DeadLetter if (!specialHandle(d.message)) eventStream.publish(d)
case _ if (!specialHandle(message)) eventStream.publish(DeadLetter(message, sender, this)) case _ if (!specialHandle(message))
eventStream.publish(DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
} }
override protected def specialHandle(msg: Any): Boolean = msg match { override protected def specialHandle(msg: Any): Boolean = msg match {

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -11,8 +11,8 @@ import akka.util.{ Switch, Helpers }
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.util.Collections.EmptyImmutableSeq import akka.util.Collections.EmptyImmutableSeq
import scala.util.{ Success, Failure } import scala.util.{ Success, Failure }
import scala.concurrent.{ Future, Promise }
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContext, Future, Promise }
/** /**
* Interface for all ActorRef providers to implement. * Interface for all ActorRef providers to implement.
@ -51,8 +51,8 @@ trait ActorRefProvider {
*/ */
def settings: ActorSystem.Settings def settings: ActorSystem.Settings
//FIXME WHY IS THIS HERE? //FIXME Only here because of AskSupport, should be dealt with
def dispatcher: MessageDispatcher def dispatcher: ExecutionContext
/** /**
* Initialization of an ActorRefProvider happens in two steps: first * Initialization of an ActorRefProvider happens in two steps: first
@ -169,7 +169,7 @@ trait ActorRefFactory {
/** /**
* Returns the default MessageDispatcher associated with this ActorRefFactory * Returns the default MessageDispatcher associated with this ActorRefFactory
*/ */
implicit def dispatcher: MessageDispatcher implicit def dispatcher: ExecutionContext
/** /**
* Father of all children created by this interface. * Father of all children created by this interface.
@ -326,13 +326,15 @@ private[akka] object SystemGuardian {
* *
* Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported. * Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported.
*/ */
class LocalActorRefProvider( class LocalActorRefProvider private[akka] (
_systemName: String, _systemName: String,
override val settings: ActorSystem.Settings, override val settings: ActorSystem.Settings,
val eventStream: EventStream, val eventStream: EventStream,
override val scheduler: Scheduler, override val scheduler: Scheduler,
val dynamicAccess: DynamicAccess, val dynamicAccess: DynamicAccess,
override val deployer: Deployer) extends ActorRefProvider { override val deployer: Deployer,
_deadLetters: Option[ActorPath InternalActorRef])
extends ActorRefProvider {
// this is the constructor needed for reflectively instantiating the provider // this is the constructor needed for reflectively instantiating the provider
def this(_systemName: String, def this(_systemName: String,
@ -345,13 +347,15 @@ class LocalActorRefProvider(
eventStream, eventStream,
scheduler, scheduler,
dynamicAccess, dynamicAccess,
new Deployer(settings, dynamicAccess)) new Deployer(settings, dynamicAccess),
None)
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
private[akka] val log: LoggingAdapter = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")") private[akka] val log: LoggingAdapter = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")")
override val deadLetters: InternalActorRef = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream) override val deadLetters: InternalActorRef =
_deadLetters.getOrElse((p: ActorPath) new DeadLetterActorRef(this, p, eventStream)).apply(rootPath / "deadLetters")
/* /*
* generate name for temporary actor refs * generate name for temporary actor refs
@ -382,7 +386,7 @@ class LocalActorRefProvider(
override def isTerminated: Boolean = stopped.isOn override def isTerminated: Boolean = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match { override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match {
case Failed(ex, _) if sender ne null causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() case Failed(ex, _) if sender ne null { causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() }
case NullMessage // do nothing case NullMessage // do nothing
case _ log.error(this + " received unexpected message [" + message + "]") case _ log.error(this + " received unexpected message [" + message + "]")
}) })
@ -449,10 +453,11 @@ class LocalActorRefProvider(
stopWhenAllTerminationHooksDone() stopWhenAllTerminationHooksDone()
} }
def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { def stopWhenAllTerminationHooksDone(): Unit =
eventStream.stopDefaultLoggers() if (terminationHooks.isEmpty) {
context.stop(self) eventStream.stopDefaultLoggers()
} context.stop(self)
}
// guardian MUST NOT lose its children during restart // guardian MUST NOT lose its children during restart
override def preRestart(cause: Throwable, msg: Option[Any]) {} override def preRestart(cause: Throwable, msg: Option[Any]) {}
@ -468,7 +473,7 @@ class LocalActorRefProvider(
@volatile @volatile
private var system: ActorSystemImpl = _ private var system: ActorSystemImpl = _
def dispatcher: MessageDispatcher = system.dispatcher def dispatcher: ExecutionContext = system.dispatcher
lazy val terminationPromise: Promise[Unit] = Promise[Unit]() lazy val terminationPromise: Promise[Unit] = Promise[Unit]()
@ -549,6 +554,7 @@ class LocalActorRefProvider(
def init(_system: ActorSystemImpl) { def init(_system: ActorSystemImpl) {
system = _system system = _system
rootGuardian.start()
// chain death watchers so that killing guardian stops the application // chain death watchers so that killing guardian stops the application
systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian)) systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian)) rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -70,4 +70,4 @@ trait ScalaActorSelection {
this: ActorSelection this: ActorSelection
def !(msg: Any)(implicit sender: ActorRef = Actor.noSender) = tell(msg, sender) def !(msg: Any)(implicit sender: ActorRef = Actor.noSender) = tell(msg, sender)
} }

View file

@ -1,25 +1,24 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
import java.io.Closeable
import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import com.typesafe.config.{ Config, ConfigFactory }
import akka.event._ import akka.event._
import akka.dispatch._ import akka.dispatch._
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.dungeon.ChildrenContainer
import akka.util._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.{ FiniteDuration, Duration } import scala.concurrent.duration.{ FiniteDuration, Duration }
import scala.concurrent.{ Await, Awaitable, CanAwait, Future } import scala.concurrent.{ Await, Awaitable, CanAwait, Future, ExecutionContext }
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
import scala.util.control.NonFatal import scala.util.control.{ NonFatal, ControlThrowable }
import akka.util._
import java.io.Closeable
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.dungeon.ChildrenContainer
object ActorSystem { object ActorSystem {
@ -161,8 +160,7 @@ object ActorSystem {
case x Some(x) case x Some(x)
} }
final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) final val SchedulerClass: String = getString("akka.scheduler.implementation")
final val SchedulerTicksPerWheel: Int = getInt("akka.scheduler.ticks-per-wheel")
final val Daemonicity: Boolean = getBoolean("akka.daemonic") final val Daemonicity: Boolean = getBoolean("akka.daemonic")
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
@ -320,7 +318,7 @@ abstract class ActorSystem extends ActorRefFactory {
* explicitly. * explicitly.
* Importing this member will place the default MessageDispatcher in scope. * Importing this member will place the default MessageDispatcher in scope.
*/ */
implicit def dispatcher: MessageDispatcher implicit def dispatcher: ExecutionContext
/** /**
* Register a block of code (callback) to run after ActorSystem.shutdown has been issued and * Register a block of code (callback) to run after ActorSystem.shutdown has been issued and
@ -465,7 +463,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
new Thread.UncaughtExceptionHandler() { new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable): Unit = { def uncaughtException(thread: Thread, cause: Throwable): Unit = {
cause match { cause match {
case NonFatal(_) | _: InterruptedException log.error(cause, "Uncaught error from thread [{}]", thread.getName) case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable log.error(cause, "Uncaught error from thread [{}]", thread.getName)
case _ case _
if (settings.JvmExitOnFatalError) { if (settings.JvmExitOnFatalError) {
try { try {
@ -566,7 +564,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher
def terminationFuture: Future[Unit] = provider.terminationFuture def terminationFuture: Future[Unit] = provider.terminationFuture
def lookupRoot: InternalActorRef = provider.rootGuardian def lookupRoot: InternalActorRef = provider.rootGuardian
@ -601,6 +599,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def shutdown(): Unit = guardian.stop() def shutdown(): Unit = guardian.stop()
//#create-scheduler
/** /**
* Create the scheduler service. This one needs one special behavior: if * Create the scheduler service. This one needs one special behavior: if
* Closeable, it MUST execute all outstanding tasks upon .close() in order * Closeable, it MUST execute all outstanding tasks upon .close() in order
@ -611,12 +610,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
* executed upon close(), the task may execute before its timeout. * executed upon close(), the task may execute before its timeout.
*/ */
protected def createScheduler(): Scheduler = protected def createScheduler(): Scheduler =
new DefaultScheduler( dynamicAccess.createInstanceFor[Scheduler](settings.SchedulerClass, immutable.Seq(
new HashedWheelTimer(log, classOf[Config] -> settings.config,
threadFactory.withName(threadFactory.name + "-scheduler"), classOf[LoggingAdapter] -> log,
settings.SchedulerTickDuration, classOf[ThreadFactory] -> threadFactory.withName(threadFactory.name + "-scheduler"))).get
settings.SchedulerTicksPerWheel), //#create-scheduler
log)
/* /*
* This is called after the last actor has signaled its termination, i.e. * This is called after the last actor has signaled its termination, i.e.
@ -628,15 +626,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
case _ case _
} }
private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef] private val extensions = new ConcurrentHashMap[ExtensionId[_], AnyRef]
/** /**
* Returns any extension registered to the specified Extension or returns null if not registered * Returns any extension registered to the specified Extension or returns null if not registered
*/ */
@tailrec @tailrec
private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
case c: CountDownLatch c.await(); findExtension(ext) //Registration in process, await completion and retry case c: CountDownLatch
case other other.asInstanceOf[T] //could be a T or null, in which case we return the null as T c.await(); findExtension(ext) //Registration in process, await completion and retry
case other
other.asInstanceOf[T] //could be a T or null, in which case we return the null as T
} }
@tailrec @tailrec

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
import java.net.URI import java.net.URI
@ -19,10 +19,27 @@ import scala.collection.immutable
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) { final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
// Please note that local/non-local distinction must be preserved:
// host.isDefined == hasGlobalScope
// host.isEmpty == hasLocalScope
// hasLocalScope == !hasGlobalScope
def this(protocol: String, system: String) = this(protocol, system, None, None) def this(protocol: String, system: String) = this(protocol, system, None, None)
def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port)) def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
/**
* Returns true if this Address is only defined locally. It is not safe to send locally scoped addresses to remote
* hosts. See also [[akka.actor.Address#hasGlobalScope]].
*/
def hasLocalScope: Boolean = host.isEmpty
/**
* Returns true if this Address is usable globally. Unlike locally defined addresses ([[akka.actor.Address#hasLocalScope]])
* addresses of global scope are safe to sent to other hosts, as they globally and uniquely identify an addressable
* entity.
*/
def hasGlobalScope: Boolean = host.isDefined
/** /**
* Returns the canonical String representation of this Address formatted as: * Returns the canonical String representation of this Address formatted as:
* *
@ -130,4 +147,4 @@ object ActorPathExtractor extends PathUtils {
} catch { } catch {
case _: URISyntaxException None case _: URISyntaxException None
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -101,4 +101,4 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces
} recover { case i: InvocationTargetException if i.getTargetException ne null throw i.getTargetException } } recover { case i: InvocationTargetException if i.getTargetException ne null throw i.getTargetException }
} }
} }
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -52,6 +52,9 @@ trait ExtensionId[T <: Extension] {
* internal use only. * internal use only.
*/ */
def createExtension(system: ExtendedActorSystem): T def createExtension(system: ExtendedActorSystem): T
override final def hashCode: Int = System.identityHashCode(this)
override final def equals(other: Any): Boolean = this eq other.asInstanceOf[AnyRef]
} }
/** /**

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -97,10 +97,11 @@ object FSM {
if (repeat) scheduler.schedule(timeout, timeout, actor, this) if (repeat) scheduler.schedule(timeout, timeout, actor, this)
else scheduler.scheduleOnce(timeout, actor, this)) else scheduler.scheduleOnce(timeout, actor, this))
def cancel(): Unit = if (ref.isDefined) { def cancel(): Unit =
ref.get.cancel() if (ref.isDefined) {
ref = None ref.get.cancel()
} ref = None
}
} }
/** /**

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -151,7 +151,7 @@ private[akka] class RepointableActorRef(
} }
} else this } else this
def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.tell(message, sender) def !(message: Any)(implicit sender: ActorRef = Actor.noSender) = underlying.sendMessage(message, sender)
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
@ -181,7 +181,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
while (!queue.isEmpty) { while (!queue.isEmpty) {
queue.poll() match { queue.poll() match {
case s: SystemMessage cell.sendSystemMessage(s) case s: SystemMessage cell.sendSystemMessage(s)
case e: Envelope cell.tell(e.message, e.sender) case e: Envelope cell.sendMessage(e)
} }
} }
} finally { } finally {
@ -203,21 +203,20 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer
def getChildByName(name: String): Option[ChildRestartStats] = None def getChildByName(name: String): Option[ChildRestartStats] = None
def tell(message: Any, sender: ActorRef): Unit = { def sendMessage(msg: Envelope): Unit = {
val useSender = if (sender eq Actor.noSender) system.deadLetters else sender
if (lock.tryLock(timeout.length, timeout.unit)) { if (lock.tryLock(timeout.length, timeout.unit)) {
try { try {
val cell = self.underlying val cell = self.underlying
if (cellIsReady(cell)) { if (cellIsReady(cell)) {
cell.tell(message, useSender) cell.sendMessage(msg)
} else if (!queue.offer(Envelope(message, useSender, system))) { } else if (!queue.offer(msg)) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + message.getClass + " due to enqueue failure")) system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + msg.message.getClass + " due to enqueue failure"))
system.deadLetters ! DeadLetter(message, useSender, self) system.deadLetters ! DeadLetter(msg.message, msg.sender, self)
} } else if (Mailbox.debug) println(s"$self temp queueing ${msg.message} from ${msg.sender}")
} finally lock.unlock() } finally lock.unlock()
} else { } else {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + message.getClass + " due to lock timeout")) system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + msg.message.getClass + " due to lock timeout"))
system.deadLetters ! DeadLetter(message, useSender, self) system.deadLetters ! DeadLetter(msg.message, msg.sender, self)
} }
} }
@ -244,7 +243,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
if (!wasEnqueued) { if (!wasEnqueued) {
system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure")) system.eventStream.publish(Warning(self.path.toString, getClass, "dropping system message " + msg + " due to enqueue failure"))
system.deadLetters ! DeadLetter(msg, self, self) system.deadLetters ! DeadLetter(msg, self, self)
} } else if (Mailbox.debug) println(s"$self temp queueing system $msg")
} }
} finally lock.unlock() } finally lock.unlock()
} else { } else {

View file

@ -1,19 +1,31 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
import scala.concurrent.duration.Duration
import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout HWTimeout, Timer }
import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher
import java.io.Closeable import java.io.Closeable
import java.util.concurrent.atomic.{ AtomicReference, AtomicLong } import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray }
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.util.internal._ import scala.collection.immutable
import concurrent.ExecutionContext import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._
import scala.util.control.{ NoStackTrace, NonFatal }
import com.typesafe.config.Config
import akka.event.LoggingAdapter
import akka.util.Helpers
import akka.util.Unsafe.{ instance unsafe }
import akka.util.internal.{ HashedWheelTimer, Timeout HWTimeout, Timer HWTimer, TimerTask HWTimerTask }
/**
* This exception is thrown by Scheduler.schedule* when scheduling is not
* possible, e.g. after shutting down the Scheduler.
*/
private case class SchedulerException(msg: String) extends akka.AkkaException(msg) with NoStackTrace
// The Scheduler trait is included in the documentation. KEEP THE LINES SHORT!!! // The Scheduler trait is included in the documentation. KEEP THE LINES SHORT!!!
//#scheduler //#scheduler
@ -25,6 +37,12 @@ import scala.concurrent.duration.FiniteDuration
* Furthermore, this timer service MUST throw IllegalStateException if it * Furthermore, this timer service MUST throw IllegalStateException if it
* cannot schedule a task. Once scheduled, the task MUST be executed. If * cannot schedule a task. Once scheduled, the task MUST be executed. If
* executed upon close(), the task may execute before its timeout. * executed upon close(), the task may execute before its timeout.
*
* Scheduler implementation are loaded reflectively at ActorSystem start-up
* with the following constructor arguments:
* 1) the systems com.typesafe.config.Config (from system.settings.config)
* 2) a akka.event.LoggingAdapter
* 3) a java.util.concurrent.ThreadFactory
*/ */
trait Scheduler { trait Scheduler {
/** /**
@ -35,11 +53,19 @@ trait Scheduler {
* *
* Java & Scala API * Java & Scala API
*/ */
def schedule( final def schedule(
initialDelay: FiniteDuration, initialDelay: FiniteDuration,
interval: FiniteDuration, interval: FiniteDuration,
receiver: ActorRef, receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable message: Any)(implicit executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
schedule(initialDelay, interval, new Runnable {
def run = {
receiver ! message
if (receiver.isTerminated)
throw new SchedulerException("timer active for terminated actor")
}
})
/** /**
* Schedules a function to be run repeatedly with an initial delay and a * Schedules a function to be run repeatedly with an initial delay and a
@ -49,10 +75,11 @@ trait Scheduler {
* *
* Scala API * Scala API
*/ */
def schedule( final def schedule(
initialDelay: FiniteDuration, initialDelay: FiniteDuration,
interval: FiniteDuration)(f: Unit)( interval: FiniteDuration)(f: Unit)(
implicit executor: ExecutionContext): Cancellable implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, interval, new Runnable { override def run = f })
/** /**
* Schedules a function to be run repeatedly with an initial delay and * Schedules a function to be run repeatedly with an initial delay and
@ -67,6 +94,31 @@ trait Scheduler {
interval: FiniteDuration, interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a message to be sent once with a delay, i.e. a time period that has
* to pass before the message is sent.
*
* Java & Scala API
*/
final def scheduleOnce(
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
scheduleOnce(delay, new Runnable {
override def run = receiver ! message
})
/**
* Schedules a function to be run once with a delay, i.e. a time period that has
* to pass before the function is run.
*
* Scala API
*/
final def scheduleOnce(delay: FiniteDuration)(f: Unit)(
implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = f })
/** /**
* Schedules a Runnable to be run once with a delay, i.e. a time period that * Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed. * has to pass before the runnable is executed.
@ -78,28 +130,17 @@ trait Scheduler {
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/** /**
* Schedules a message to be sent once with a delay, i.e. a time period that has * The maximum supported task frequency of this scheduler, i.e. the inverse
* to pass before the message is sent. * of the minimum time interval between executions of a recurring task, in Hz.
*
* Java & Scala API
*/ */
def scheduleOnce( def maxFrequency: Double
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a function to be run once with a delay, i.e. a time period that has
* to pass before the function is run.
*
* Scala API
*/
def scheduleOnce(
delay: FiniteDuration)(f: Unit)(
implicit executor: ExecutionContext): Cancellable
} }
//#scheduler //#scheduler
// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler
//#cancellable //#cancellable
/** /**
* Signifies something that can be cancelled * Signifies something that can be cancelled
@ -108,14 +149,16 @@ trait Scheduler {
*/ */
trait Cancellable { trait Cancellable {
/** /**
* Cancels this Cancellable * Cancels this Cancellable and returns true if that was successful.
* If this cancellable was (concurrently) cancelled already, then this method
* will return false although isCancelled will return true.
* *
* Java & Scala API * Java & Scala API
*/ */
def cancel(): Unit def cancel(): Boolean
/** /**
* Returns whether this Cancellable has been cancelled * Returns true if and only if this Cancellable has been successfully cancelled
* *
* Java & Scala API * Java & Scala API
*/ */
@ -123,6 +166,317 @@ trait Cancellable {
} }
//#cancellable //#cancellable
/**
* This scheduler implementation is based on a revolving wheel of buckets,
* like Nettys HashedWheelTimer, which it advances at a fixed tick rate and
* dispatches tasks it finds in the current bucket to their respective
* ExecutionContexts. The tasks are held in TaskHolders, which upon
* cancellation null out their reference to the actual task, leaving only this
* shell to be cleaned up when the wheel reaches that bucket next time. This
* enables the use of a simple linked list to chain the TaskHolders off the
* wheel.
*
* Also noteworthy is that this scheduler does not obtain a current time stamp
* when scheduling single-shot tasks, instead it always rounds up the task
* delay to a full multiple of the TickDuration. This means that tasks are
* scheduled possibly one tick later than they could be (if checking that
* now() + delay <= nextTick were done).
*/
class LightArrayRevolverScheduler(config: Config,
log: LoggingAdapter,
threadFactory: ThreadFactory)
extends {
val WheelShift = {
val ticks = config.getInt("akka.scheduler.ticks-per-wheel")
val shift = 31 - Integer.numberOfLeadingZeros(ticks)
if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2")
shift
}
val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS)
} with AtomicReferenceArray[LightArrayRevolverScheduler.TaskHolder](1 << WheelShift) with Scheduler with Closeable {
import LightArrayRevolverScheduler._
private val oneNs = Duration.fromNanos(1l)
private def roundUp(d: FiniteDuration): FiniteDuration =
try {
((d + TickDuration - oneNs) / TickDuration).toLong * TickDuration
} catch {
case _: IllegalArgumentException d // rouding up Long.MaxValue.nanos overflows
}
/**
* Clock implementation is replaceable (for testing); the implementation must
* return a monotonically increasing series of Long nanoseconds.
*/
protected def clock(): Long = System.nanoTime
/**
* Overridable for tests
*/
protected def waitNanos(nanos: Long): Unit = {
// see http://www.javamex.com/tutorials/threads/sleep_issues.shtml
val sleepMs = if (Helpers.isWindows) (nanos + 4999999) / 10000000 * 10 else (nanos + 999999) / 1000000
try Thread.sleep(sleepMs) catch {
case _: InterruptedException Thread.currentThread.interrupt() // we got woken up
}
}
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
try new AtomicReference[Cancellable] with Cancellable { self
set(schedule(
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get != null)
swap(schedule(this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException // ignore failure to enqueue or terminated target actor
}
}
}, roundUp(initialDelay)))
@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null if (c != null) c.cancel()
case old if (!compareAndSet(old, c)) swap(c)
}
}
@tailrec final def cancel(): Boolean = {
get match {
case null false
case c
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || cancel()
}
}
override def isCancelled: Boolean = get == null
} catch {
case SchedulerException(msg) throw new IllegalStateException(msg)
}
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
try schedule(runnable, roundUp(delay))
catch {
case SchedulerException(msg) throw new IllegalStateException(msg)
}
private def execDirectly(t: TimerTask): Unit = {
try t.run() catch {
case e: InterruptedException throw e
case _: SchedulerException // ignore terminated actors
case NonFatal(e) log.error(e, "exception while executing timer task")
}
}
override def close(): Unit = Await.result(stop(), ShutdownTimeout) foreach execDirectly
override val maxFrequency: Double = 1.second / TickDuration
/*
* BELOW IS THE ACTUAL TIMER IMPLEMENTATION
*/
private val start = clock()
private val tickNanos = TickDuration.toNanos
private val wheelMask = length() - 1
@volatile private var currentBucket = 0
private def schedule(r: Runnable, delay: FiniteDuration)(implicit ec: ExecutionContext): TimerTask =
if (delay <= Duration.Zero) {
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
ec.execute(r)
NotCancellable
} else {
val ticks = (delay.toNanos / tickNanos).toInt
val rounds = (ticks >> WheelShift).toInt
/*
* works as follows:
* - ticks are calculated to be never too early
* - base off of currentBucket, even after that was moved in the meantime
* - timer thread will swap in Pause, increment currentBucket, swap in null
* - hence spin on Pause, else normal CAS
* - stopping will set all buckets to Pause (in clearAll), so we need only check there
*/
@tailrec
def rec(t: TaskHolder): TimerTask = {
val bucket = (currentBucket + ticks) & wheelMask
get(bucket) match {
case Pause
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
rec(t)
case tail
t.next = tail
if (compareAndSet(bucket, tail, t)) t
else rec(t)
}
}
rec(new TaskHolder(r, null, rounds))
}
private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
def stop(): Future[immutable.Seq[TimerTask]] =
if (stopped.compareAndSet(null, Promise())) {
timerThread.interrupt()
stopped.get.future
} else Future.successful(Nil)
private def clearAll(): immutable.Seq[TimerTask] = {
def collect(curr: TaskHolder, acc: Vector[TimerTask]): Vector[TimerTask] = {
curr match {
case null acc
case x collect(x.next, acc :+ x)
}
}
(0 until length()) flatMap (i collect(getAndSet(i, Pause), Vector.empty))
}
@volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {
var tick = 0
override final def run =
try nextTick()
catch {
case t: Throwable
log.error(t, "exception on LARS timer thread")
stopped.get match {
case null
val thread = threadFactory.newThread(this)
log.info("starting new LARS thread")
try thread.start()
catch {
case e: Throwable log.error(e, "LARS cannot start new thread, ships going down!")
}
timerThread = thread
case x x success clearAll()
}
throw t
}
@tailrec final def nextTick(): Unit = {
val sleepTime = start + tick * tickNanos - clock()
if (sleepTime > 0) {
waitNanos(sleepTime)
} else {
// first get the list of tasks out and turn the wheel
val bucket = currentBucket
val tasks = getAndSet(bucket, Pause)
val next = (bucket + 1) & wheelMask
currentBucket = next
set(bucket, if (tasks eq null) Empty else null)
// then process the tasks and keep the non-ripe ones in a list
var last: TaskHolder = null // the last element of the putBack list
@tailrec def rec1(task: TaskHolder, nonRipe: TaskHolder): TaskHolder = {
if ((task eq null) || (task eq Empty)) nonRipe
else if (task.isCancelled) rec1(task.next, nonRipe)
else if (task.rounds > 0) {
task.rounds -= 1
val next = task.next
task.next = nonRipe
if (last == null) last = task
rec1(next, task)
} else {
task.executeTask()
rec1(task.next, nonRipe)
}
}
val putBack = rec1(tasks, null)
// finally put back the non-ripe ones, who had their rounds decremented
@tailrec def rec2() {
val tail = get(bucket)
last.next = tail
if (!compareAndSet(bucket, tail, putBack)) rec2()
}
if (last != null) rec2()
// and off to the next tick
tick += 1
}
stopped.get match {
case null nextTick()
case x x success clearAll()
}
}
})
timerThread.start()
}
object LightArrayRevolverScheduler {
private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task"))
/**
* INTERNAL API
*/
protected[actor] trait TimerTask extends Runnable with Cancellable
/**
* INTERNAL API
*/
protected[actor] class TaskHolder(@volatile var task: Runnable,
@volatile var next: TaskHolder,
@volatile var rounds: Int)(
implicit executionContext: ExecutionContext) extends TimerTask {
@tailrec
private final def extractTask(cancel: Boolean): Runnable = {
task match {
case null | CancelledTask null // null means expired
case x
if (unsafe.compareAndSwapObject(this, taskOffset, x, if (cancel) CancelledTask else null)) x
else extractTask(cancel)
}
}
private[akka] final def executeTask(): Boolean = extractTask(cancel = false) match {
case null | CancelledTask false
case other
try {
executionContext execute other
true
} catch {
case _: InterruptedException { Thread.currentThread.interrupt(); false }
case NonFatal(e) { executionContext.reportFailure(e); false }
}
}
/**
* utility method to directly run the task, e.g. as clean-up action
*/
def run(): Unit = extractTask(cancel = false) match {
case null
case r r.run()
}
override def cancel(): Boolean = extractTask(cancel = true) != null
override def isCancelled: Boolean = task eq CancelledTask
}
private val CancelledTask = new Runnable { def run = () }
private val NotCancellable = new TimerTask {
def cancel(): Boolean = false
def isCancelled: Boolean = false
def run(): Unit = ()
}
// marker object during wheel movement
private val Pause = new TaskHolder(null, null, 0)(null)
// we need two empty tokens so wheel passing can be detected in schedule()
private val Empty = new TaskHolder(null, null, 0)(null)
}
/** /**
* Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher. * Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher.
* Note that dispatcher is by-name parameter, because dispatcher might not be initialized * Note that dispatcher is by-name parameter, because dispatcher might not be initialized
@ -132,35 +486,19 @@ trait Cancellable {
* if it does not enqueue a task. Once a task is queued, it MUST be executed or * if it does not enqueue a task. Once a task is queued, it MUST be executed or
* returned from stop(). * returned from stop().
*/ */
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable { class DefaultScheduler(config: Config,
override def schedule(initialDelay: FiniteDuration, log: LoggingAdapter,
delay: FiniteDuration, threadFactory: ThreadFactory) extends Scheduler with Closeable {
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = {
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(
new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
executor execute new Runnable {
override def run = {
receiver ! message
// Check if the receiver is still alive and kicking before reschedule the task
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver)
else {
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
}
}
}
}
},
initialDelay))
}
override def schedule(initialDelay: FiniteDuration, val TicksPerWheel = {
delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable = val ticks = config.getInt("akka.scheduler.ticks-per-wheel")
schedule(initialDelay, delay, new Runnable { override def run = f }) val shift = 31 - Integer.numberOfLeadingZeros(ticks)
if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2")
ticks
}
val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
private val hashedWheelTimer = new HashedWheelTimer(log, threadFactory, TickDuration, TicksPerWheel)
override def schedule(initialDelay: FiniteDuration, override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration, delay: FiniteDuration,
@ -168,14 +506,19 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val continuousCancellable = new ContinuousCancellable val continuousCancellable = new ContinuousCancellable
continuousCancellable.init( continuousCancellable.init(
hashedWheelTimer.newTimeout( hashedWheelTimer.newTimeout(
new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling { new AtomicLong(System.nanoTime + initialDelay.toNanos) with HWTimerTask with ContinuousScheduling {
override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable { override def run(timeout: HWTimeout): Unit =
override def run = { executor.execute(new Runnable {
runnable.run() override def run = {
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos) try {
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable) runnable.run()
} val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
}) scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
} catch {
case _: SchedulerException // actor target terminated
}
}
})
}, },
initialDelay)) initialDelay))
} }
@ -183,16 +526,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
new DefaultCancellable( new DefaultCancellable(
hashedWheelTimer.newTimeout( hashedWheelTimer.newTimeout(
new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) }, new HWTimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) },
delay)) delay))
override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = private trait ContinuousScheduling { this: HWTimerTask
scheduleOnce(delay, new Runnable { override def run = receiver ! message })
override def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = f })
private trait ContinuousScheduling { this: TimerTask
def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) { def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) {
try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException } // stop recurring if timer is stopped try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException } // stop recurring if timer is stopped
} }
@ -209,23 +546,25 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val i = hashedWheelTimer.stop().iterator() val i = hashedWheelTimer.stop().iterator()
while (i.hasNext) execDirectly(i.next()) while (i.hasNext) execDirectly(i.next())
} }
override def maxFrequency: Double = 1.second / TickDuration
} }
private[akka] object ContinuousCancellable { private[akka] object ContinuousCancellable {
val initial: HWTimeout = new HWTimeout { val initial: HWTimeout = new HWTimeout {
override def getTimer: Timer = null override def getTimer: HWTimer = null
override def getTask: TimerTask = null override def getTask: HWTimerTask = null
override def isExpired: Boolean = false override def isExpired: Boolean = false
override def isCancelled: Boolean = false override def isCancelled: Boolean = false
override def cancel: Unit = () override def cancel: Boolean = true
} }
val cancelled: HWTimeout = new HWTimeout { val cancelled: HWTimeout = new HWTimeout {
override def getTimer: Timer = null override def getTimer: HWTimer = null
override def getTask: TimerTask = null override def getTask: HWTimerTask = null
override def isExpired: Boolean = false override def isExpired: Boolean = false
override def isCancelled: Boolean = true override def isCancelled: Boolean = true
override def cancel: Unit = () override def cancel: Boolean = false
} }
} }
/** /**
@ -245,10 +584,10 @@ private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout](Con
} }
def isCancelled(): Boolean = get().isCancelled() def isCancelled(): Boolean = get().isCancelled()
def cancel(): Unit = getAndSet(ContinuousCancellable.cancelled).cancel() def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel()
} }
private[akka] class DefaultCancellable(timeout: HWTimeout) extends AtomicReference[HWTimeout](timeout) with Cancellable { private[akka] class DefaultCancellable(timeout: HWTimeout) extends AtomicReference[HWTimeout](timeout) with Cancellable {
override def cancel(): Unit = getAndSet(ContinuousCancellable.cancelled).cancel() override def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel()
override def isCancelled: Boolean = get().isCancelled override def isCancelled: Boolean = get().isCancelled
} }

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor
@ -674,7 +674,7 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac
* INTERNAL USE ONLY * INTERNAL USE ONLY
*/ */
private[akka] def invocationHandlerFor(@deprecatedName('typedActor_?) typedActor: AnyRef): TypedActorInvocationHandler = private[akka] def invocationHandlerFor(@deprecatedName('typedActor_?) typedActor: AnyRef): TypedActorInvocationHandler =
if ((typedActor ne null) && Proxy.isProxyClass(typedActor.getClass)) typedActor match { if ((typedActor ne null) && classOf[Proxy].isAssignableFrom(typedActor.getClass) && Proxy.isProxyClass(typedActor.getClass)) typedActor match {
case null null case null null
case other Proxy.getInvocationHandler(other) match { case other Proxy.getInvocationHandler(other) match {
case null null case null null

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor package akka.actor

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor.dsl package akka.actor.dsl

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.actor.dsl package akka.actor.dsl

Some files were not shown because too many files have changed in this diff Show more