diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a4f3c2baeb..307dfa62dc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -656,14 +656,18 @@ class LocalActorRef private[akka] ( cancelReceiveTimeout dispatcher.detach(this) _status = ActorRefInternals.SHUTDOWN - actor.postStop - Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid) - Actor.remote.unregister(this) + try { + actor.postStop + } finally { + currentMessage = null + Actor.registry.unregister(this) + if (isRemotingEnabled) { + if (isClientManaged_?) + Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid) + Actor.remote.unregister(this) + } + setActorSelfFields(actorInstance.get,null) } - setActorSelfFields(actorInstance.get,null) } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } @@ -819,8 +823,9 @@ class LocalActorRef private[akka] ( try { cancelReceiveTimeout // FIXME: leave this here? actor(messageHandle.message) + currentMessage = null // reset current message after successful invocation } catch { - case e: InterruptedException => {} // received message while actor is shutting down, ignore + case e: InterruptedException => { currentMessage = null } // received message while actor is shutting down, ignore case e => handleExceptionInDispatch(e, messageHandle.message) } finally { checkReceiveTimeout // Reschedule receive timeout @@ -830,8 +835,6 @@ class LocalActorRef private[akka] ( Actor.log.slf4j.error("Could not invoke actor [{}]", this) Actor.log.slf4j.error("Problem", e) throw e - } finally { - currentMessage = null //TODO: Don't reset this, we might want to resend the message } } } @@ -941,6 +944,8 @@ class LocalActorRef private[akka] ( } catch { case e => Actor.log.slf4j.debug("Unexpected exception during restart",e) false //An error or exception here should trigger a retry + } finally { + currentMessage = null } Actor.log.slf4j.debug("Restart: {} for [{}].", success, id) diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala new file mode 100644 index 0000000000..54c8179152 --- /dev/null +++ b/akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.supervisor + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import akka.actor._ +import akka.config.Supervision._ + +import org.scalatest.{BeforeAndAfterAll, WordSpec} +import org.scalatest.matchers.MustMatchers + +class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { + import Ticket669Spec._ + + override def afterAll = Actor.registry.shutdownAll + + "A supervised actor with lifecycle PERMANENT" should { + "be able to reply on failure during preRestart" in { + val latch = new CountDownLatch(1) + val sender = Actor.actorOf(new Sender(latch)).start + + val supervised = Actor.actorOf[Supervised] + val supervisor = Supervisor(SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 5, 10000), + Supervise(supervised, Permanent) :: Nil) + ) + + supervised.!("test")(Some(sender)) + latch.await(5, TimeUnit.SECONDS) must be (true) + } + + "be able to reply on failure during postStop" in { + val latch = new CountDownLatch(1) + val sender = Actor.actorOf(new Sender(latch)).start + + val supervised = Actor.actorOf[Supervised] + val supervisor = Supervisor(SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 5, 10000), + Supervise(supervised, Temporary) :: Nil) + ) + + supervised.!("test")(Some(sender)) + latch.await(5, TimeUnit.SECONDS) must be (true) + } + } +} + +object Ticket669Spec { + class Sender(latch: CountDownLatch) extends Actor { + def receive = { + case "failure1" => latch.countDown + case "failure2" => latch.countDown + case _ => { } + } + } + + class Supervised extends Actor { + def receive = { + case msg => throw new Exception("test") + } + + override def preRestart(reason: scala.Throwable) { + self.reply_?("failure1") + } + + override def postStop { + self.reply_?("failure2") + } + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index 8c232255d0..a10528961f 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -2,19 +2,19 @@ package sample.fsm.buncher import scala.reflect.ClassManifest import akka.util.Duration -import akka.actor.{FSM, Actor} +import akka.actor.{FSM, Actor, ActorRef} /* -* generic typed object buncher. -* -* To instantiate it, use the factory method like so: -* Buncher(100, 500)(x : List[AnyRef] => x foreach println) -* which will yield a fully functional and started ActorRef. -* The type of messages allowed is strongly typed to match the -* supplied processing method; other messages are discarded (and -* possibly logged). -*/ -object Buncher { + * generic typed object buncher. + * + * To instantiate it, use the factory method like so: + * Buncher(100, 500)(x : List[AnyRef] => x foreach println) + * which will yield a fully functional ActorRef. + * The type of messages allowed is strongly typed to match the + * supplied processing method; other messages are discarded (and + * possibly logged). + */ +object GenericBuncher { trait State case object Idle extends State case object Active extends State @@ -22,54 +22,87 @@ object Buncher { case object Flush // send out current queue immediately case object Stop // poison pill - case class Data[A](start : Long, xs : List[A]) - - def apply[A : Manifest](singleTimeout : Duration, - multiTimeout : Duration)(f : List[A] => Unit) = - Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout).deliver(f)) + class MsgExtractor[A : Manifest] { + def unapply(m : AnyRef) : Option[A] = { + if (ClassManifest.fromClass(m.getClass) <:< manifest[A]) { + Some(m.asInstanceOf[A]) + } else { + None + } + } + } } -class Buncher[A : Manifest] private (val singleTimeout : Duration, val multiTimeout : Duration) - extends Actor with FSM[Buncher.State, Buncher.Data[A]] { - import Buncher._ +abstract class GenericBuncher[A : Manifest, B] (val singleTimeout : Duration, val multiTimeout : Duration) + extends Actor with FSM[GenericBuncher.State, B] { + import GenericBuncher._ import FSM._ - private val manifestA = manifest[A] - - private var send : List[A] => Unit = _ - private def deliver(f : List[A] => Unit) = { send = f; this } - - private def now = System.currentTimeMillis - private def check(m : AnyRef) = ClassManifest.fromClass(m.getClass) <:< manifestA - - startWith(Idle, Data(0, Nil)) + protected def empty : B + protected def merge(acc : B, elem : A) : B + protected def send(acc : B) : Unit + + protected def flush(acc : B) = { + send(acc) + cancelTimer("multi") + goto(Idle) using empty + } + + val Msg = new MsgExtractor[A] + + startWith(Idle, empty) when(Idle) { - case Event(m : AnyRef, _) if check(m) => - goto(Active) using Data(now, m.asInstanceOf[A] :: Nil) + case Event(Msg(m), acc) => + setTimer("multi", StateTimeout, multiTimeout, false) + goto(Active) using merge(acc, m) case Event(Flush, _) => stay case Event(Stop, _) => stop } when(Active, stateTimeout = Some(singleTimeout)) { - case Event(m : AnyRef, Data(start, xs)) if check(m) => - val l = m.asInstanceOf[A] :: xs - if (now - start > multiTimeout.toMillis) { - send(l.reverse) - goto(Idle) using Data(0, Nil) - } else { - stay using Data(start, l) - } - case Event(StateTimeout, Data(_, xs)) => - send(xs.reverse) - goto(Idle) using Data(0, Nil) - case Event(Flush, Data(_, xs)) => - send(xs.reverse) - goto(Idle) using Data(0, Nil) - case Event(Stop, Data(_, xs)) => - send(xs.reverse) + case Event(Msg(m), acc) => + stay using merge(acc, m) + case Event(StateTimeout, acc) => + flush(acc) + case Event(Flush, acc) => + flush(acc) + case Event(Stop, acc) => + send(acc) + cancelTimer("multi") stop } initialize } + +object Buncher { + case class Target(target : ActorRef) // for setting the target for default send action + + val Stop = GenericBuncher.Stop // make special message objects visible for Buncher clients + val Flush = GenericBuncher.Flush + + def apply[A : Manifest](singleTimeout : Duration, + multiTimeout : Duration) = + Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout)) +} + +class Buncher[A : Manifest](singleTimeout : Duration, multiTimeout : Duration) + extends GenericBuncher[A, List[A]](singleTimeout, multiTimeout) { + + import Buncher._ + + private var target : Option[ActorRef] = None + protected def send(acc : List[A]) : Unit = if (target.isDefined) target.get ! acc.reverse + + protected def empty : List[A] = Nil + + protected def merge(l : List[A], elem : A) = elem :: l + + whenUnhandled { + case Event(Target(t), _) => + target = Some(t) + stay + } + +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5b92c3a0fe..adfff18ab4 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -106,7 +106,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback", sbt.DefaultMavenRepository) lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo) lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo) - lazy val scalazModuleConfig = ModuleConfiguration("org.scalaz", ScalaToolsSnapshotRepo) // ------------------------------------------------------------------------------------------------------------------- // Versions @@ -186,8 +185,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" //MIT - lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9" % "compile" //ApacheV2 - lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9" % "test" //ApacheV2 + lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 + lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" //LGPL 2.1