Merge branch 'master' into derekjw-future-dispatch

This commit is contained in:
Derek Williams 2011-03-01 16:16:33 -07:00
commit 2b37b60a87
4 changed files with 168 additions and 59 deletions

View file

@ -656,14 +656,18 @@ class LocalActorRef private[akka] (
cancelReceiveTimeout
dispatcher.detach(this)
_status = ActorRefInternals.SHUTDOWN
actor.postStop
Actor.registry.unregister(this)
if (isRemotingEnabled) {
if (isClientManaged_?)
Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid)
Actor.remote.unregister(this)
try {
actor.postStop
} finally {
currentMessage = null
Actor.registry.unregister(this)
if (isRemotingEnabled) {
if (isClientManaged_?)
Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid)
Actor.remote.unregister(this)
}
setActorSelfFields(actorInstance.get,null)
}
setActorSelfFields(actorInstance.get,null)
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
@ -819,8 +823,9 @@ class LocalActorRef private[akka] (
try {
cancelReceiveTimeout // FIXME: leave this here?
actor(messageHandle.message)
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException => {} // received message while actor is shutting down, ignore
case e: InterruptedException => { currentMessage = null } // received message while actor is shutting down, ignore
case e => handleExceptionInDispatch(e, messageHandle.message)
} finally {
checkReceiveTimeout // Reschedule receive timeout
@ -830,8 +835,6 @@ class LocalActorRef private[akka] (
Actor.log.slf4j.error("Could not invoke actor [{}]", this)
Actor.log.slf4j.error("Problem", e)
throw e
} finally {
currentMessage = null //TODO: Don't reset this, we might want to resend the message
}
}
}
@ -941,6 +944,8 @@ class LocalActorRef private[akka] (
} catch {
case e => Actor.log.slf4j.debug("Unexpected exception during restart",e)
false //An error or exception here should trigger a retry
} finally {
currentMessage = null
}
Actor.log.slf4j.debug("Restart: {} for [{}].", success, id)

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor.supervisor
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.actor._
import akka.config.Supervision._
import org.scalatest.{BeforeAndAfterAll, WordSpec}
import org.scalatest.matchers.MustMatchers
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
import Ticket669Spec._
override def afterAll = Actor.registry.shutdownAll
"A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Permanent) :: Nil)
)
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be (true)
}
"be able to reply on failure during postStop" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start
val supervised = Actor.actorOf[Supervised]
val supervisor = Supervisor(SupervisorConfig(
AllForOneStrategy(List(classOf[Exception]), 5, 10000),
Supervise(supervised, Temporary) :: Nil)
)
supervised.!("test")(Some(sender))
latch.await(5, TimeUnit.SECONDS) must be (true)
}
}
}
object Ticket669Spec {
class Sender(latch: CountDownLatch) extends Actor {
def receive = {
case "failure1" => latch.countDown
case "failure2" => latch.countDown
case _ => { }
}
}
class Supervised extends Actor {
def receive = {
case msg => throw new Exception("test")
}
override def preRestart(reason: scala.Throwable) {
self.reply_?("failure1")
}
override def postStop {
self.reply_?("failure2")
}
}
}

View file

@ -2,19 +2,19 @@ package sample.fsm.buncher
import scala.reflect.ClassManifest
import akka.util.Duration
import akka.actor.{FSM, Actor}
import akka.actor.{FSM, Actor, ActorRef}
/*
* generic typed object buncher.
*
* To instantiate it, use the factory method like so:
* Buncher(100, 500)(x : List[AnyRef] => x foreach println)
* which will yield a fully functional and started ActorRef.
* The type of messages allowed is strongly typed to match the
* supplied processing method; other messages are discarded (and
* possibly logged).
*/
object Buncher {
* generic typed object buncher.
*
* To instantiate it, use the factory method like so:
* Buncher(100, 500)(x : List[AnyRef] => x foreach println)
* which will yield a fully functional ActorRef.
* The type of messages allowed is strongly typed to match the
* supplied processing method; other messages are discarded (and
* possibly logged).
*/
object GenericBuncher {
trait State
case object Idle extends State
case object Active extends State
@ -22,54 +22,87 @@ object Buncher {
case object Flush // send out current queue immediately
case object Stop // poison pill
case class Data[A](start : Long, xs : List[A])
def apply[A : Manifest](singleTimeout : Duration,
multiTimeout : Duration)(f : List[A] => Unit) =
Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout).deliver(f))
class MsgExtractor[A : Manifest] {
def unapply(m : AnyRef) : Option[A] = {
if (ClassManifest.fromClass(m.getClass) <:< manifest[A]) {
Some(m.asInstanceOf[A])
} else {
None
}
}
}
}
class Buncher[A : Manifest] private (val singleTimeout : Duration, val multiTimeout : Duration)
extends Actor with FSM[Buncher.State, Buncher.Data[A]] {
import Buncher._
abstract class GenericBuncher[A : Manifest, B] (val singleTimeout : Duration, val multiTimeout : Duration)
extends Actor with FSM[GenericBuncher.State, B] {
import GenericBuncher._
import FSM._
private val manifestA = manifest[A]
private var send : List[A] => Unit = _
private def deliver(f : List[A] => Unit) = { send = f; this }
private def now = System.currentTimeMillis
private def check(m : AnyRef) = ClassManifest.fromClass(m.getClass) <:< manifestA
startWith(Idle, Data(0, Nil))
protected def empty : B
protected def merge(acc : B, elem : A) : B
protected def send(acc : B) : Unit
protected def flush(acc : B) = {
send(acc)
cancelTimer("multi")
goto(Idle) using empty
}
val Msg = new MsgExtractor[A]
startWith(Idle, empty)
when(Idle) {
case Event(m : AnyRef, _) if check(m) =>
goto(Active) using Data(now, m.asInstanceOf[A] :: Nil)
case Event(Msg(m), acc) =>
setTimer("multi", StateTimeout, multiTimeout, false)
goto(Active) using merge(acc, m)
case Event(Flush, _) => stay
case Event(Stop, _) => stop
}
when(Active, stateTimeout = Some(singleTimeout)) {
case Event(m : AnyRef, Data(start, xs)) if check(m) =>
val l = m.asInstanceOf[A] :: xs
if (now - start > multiTimeout.toMillis) {
send(l.reverse)
goto(Idle) using Data(0, Nil)
} else {
stay using Data(start, l)
}
case Event(StateTimeout, Data(_, xs)) =>
send(xs.reverse)
goto(Idle) using Data(0, Nil)
case Event(Flush, Data(_, xs)) =>
send(xs.reverse)
goto(Idle) using Data(0, Nil)
case Event(Stop, Data(_, xs)) =>
send(xs.reverse)
case Event(Msg(m), acc) =>
stay using merge(acc, m)
case Event(StateTimeout, acc) =>
flush(acc)
case Event(Flush, acc) =>
flush(acc)
case Event(Stop, acc) =>
send(acc)
cancelTimer("multi")
stop
}
initialize
}
object Buncher {
case class Target(target : ActorRef) // for setting the target for default send action
val Stop = GenericBuncher.Stop // make special message objects visible for Buncher clients
val Flush = GenericBuncher.Flush
def apply[A : Manifest](singleTimeout : Duration,
multiTimeout : Duration) =
Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout))
}
class Buncher[A : Manifest](singleTimeout : Duration, multiTimeout : Duration)
extends GenericBuncher[A, List[A]](singleTimeout, multiTimeout) {
import Buncher._
private var target : Option[ActorRef] = None
protected def send(acc : List[A]) : Unit = if (target.isDefined) target.get ! acc.reverse
protected def empty : List[A] = Nil
protected def merge(l : List[A], elem : A) = elem :: l
whenUnhandled {
case Event(Target(t), _) =>
target = Some(t)
stay
}
}

View file

@ -106,7 +106,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback", sbt.DefaultMavenRepository)
lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo)
lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo)
lazy val scalazModuleConfig = ModuleConfiguration("org.scalaz", ScalaToolsSnapshotRepo)
// -------------------------------------------------------------------------------------------------------------------
// Versions
@ -186,8 +185,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" //MIT
lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9" % "compile" //ApacheV2
lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9" % "test" //ApacheV2
lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2
lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2
lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" //LGPL 2.1