From 42cf34d664accd48b9d51c40d9e061bcda4019cc Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 1 Mar 2011 08:01:53 +0100 Subject: [PATCH 1/5] Support self.reply in preRestart and postStop after exception in receive. Closes #669 --- .../src/main/scala/akka/actor/ActorRef.scala | 6 +- .../akka/actor/supervisor/Ticket669Spec.scala | 72 +++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a4f3c2baeb..98318ea017 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -657,6 +657,7 @@ class LocalActorRef private[akka] ( dispatcher.detach(this) _status = ActorRefInternals.SHUTDOWN actor.postStop + currentMessage = null Actor.registry.unregister(this) if (isRemotingEnabled) { if (isClientManaged_?) @@ -819,6 +820,7 @@ class LocalActorRef private[akka] ( try { cancelReceiveTimeout // FIXME: leave this here? actor(messageHandle.message) + currentMessage = null // reset current message after successful invocation } catch { case e: InterruptedException => {} // received message while actor is shutting down, ignore case e => handleExceptionInDispatch(e, messageHandle.message) @@ -830,8 +832,6 @@ class LocalActorRef private[akka] ( Actor.log.slf4j.error("Could not invoke actor [{}]", this) Actor.log.slf4j.error("Problem", e) throw e - } finally { - currentMessage = null //TODO: Don't reset this, we might want to resend the message } } } @@ -941,6 +941,8 @@ class LocalActorRef private[akka] ( } catch { case e => Actor.log.slf4j.debug("Unexpected exception during restart",e) false //An error or exception here should trigger a retry + } finally { + currentMessage = null } Actor.log.slf4j.debug("Restart: {} for [{}].", success, id) diff --git a/akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala new file mode 100644 index 0000000000..54c8179152 --- /dev/null +++ b/akka-actor/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor.supervisor + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import akka.actor._ +import akka.config.Supervision._ + +import org.scalatest.{BeforeAndAfterAll, WordSpec} +import org.scalatest.matchers.MustMatchers + +class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { + import Ticket669Spec._ + + override def afterAll = Actor.registry.shutdownAll + + "A supervised actor with lifecycle PERMANENT" should { + "be able to reply on failure during preRestart" in { + val latch = new CountDownLatch(1) + val sender = Actor.actorOf(new Sender(latch)).start + + val supervised = Actor.actorOf[Supervised] + val supervisor = Supervisor(SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 5, 10000), + Supervise(supervised, Permanent) :: Nil) + ) + + supervised.!("test")(Some(sender)) + latch.await(5, TimeUnit.SECONDS) must be (true) + } + + "be able to reply on failure during postStop" in { + val latch = new CountDownLatch(1) + val sender = Actor.actorOf(new Sender(latch)).start + + val supervised = Actor.actorOf[Supervised] + val supervisor = Supervisor(SupervisorConfig( + AllForOneStrategy(List(classOf[Exception]), 5, 10000), + Supervise(supervised, Temporary) :: Nil) + ) + + supervised.!("test")(Some(sender)) + latch.await(5, TimeUnit.SECONDS) must be (true) + } + } +} + +object Ticket669Spec { + class Sender(latch: CountDownLatch) extends Actor { + def receive = { + case "failure1" => latch.countDown + case "failure2" => latch.countDown + case _ => { } + } + } + + class Supervised extends Actor { + def receive = { + case msg => throw new Exception("test") + } + + override def preRestart(reason: scala.Throwable) { + self.reply_?("failure1") + } + + override def postStop { + self.reply_?("failure2") + } + } +} \ No newline at end of file From 7be3ddb88154d98c1dc81397b78dd45cfc36efa0 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 1 Mar 2011 11:57:00 +0100 Subject: [PATCH 2/5] Ensure proper cleanup even if postStop throws an exception. --- .../src/main/scala/akka/actor/ActorRef.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 98318ea017..6716a7c322 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -656,15 +656,18 @@ class LocalActorRef private[akka] ( cancelReceiveTimeout dispatcher.detach(this) _status = ActorRefInternals.SHUTDOWN - actor.postStop - currentMessage = null - Actor.registry.unregister(this) - if (isRemotingEnabled) { - if (isClientManaged_?) - Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid) - Actor.remote.unregister(this) + try { + actor.postStop + } finally { + currentMessage = null + Actor.registry.unregister(this) + if (isRemotingEnabled) { + if (isClientManaged_?) + Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid) + Actor.remote.unregister(this) + } + setActorSelfFields(actorInstance.get,null) } - setActorSelfFields(actorInstance.get,null) } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } From 8fe909a4e0108d8b39324ec31d7c35a3c8d9c33e Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 1 Mar 2011 12:07:52 +0100 Subject: [PATCH 3/5] Reset currentMessage if InterruptedException is thrown --- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6716a7c322..307dfa62dc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -825,7 +825,7 @@ class LocalActorRef private[akka] ( actor(messageHandle.message) currentMessage = null // reset current message after successful invocation } catch { - case e: InterruptedException => {} // received message while actor is shutting down, ignore + case e: InterruptedException => { currentMessage = null } // received message while actor is shutting down, ignore case e => handleExceptionInDispatch(e, messageHandle.message) } finally { checkReceiveTimeout // Reschedule receive timeout From 65aa143a3f1cb7154de81f961df51b7b032912ad Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Tue, 1 Mar 2011 18:29:37 +0530 Subject: [PATCH 4/5] now using sjson without scalaz dependency --- project/build/AkkaProject.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 5b92c3a0fe..adfff18ab4 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -106,7 +106,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback", sbt.DefaultMavenRepository) lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo) lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo) - lazy val scalazModuleConfig = ModuleConfiguration("org.scalaz", ScalaToolsSnapshotRepo) // ------------------------------------------------------------------------------------------------------------------- // Versions @@ -186,8 +185,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" //MIT - lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9" % "compile" //ApacheV2 - lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9" % "test" //ApacheV2 + lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 + lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" //LGPL 2.1 From ec84822675e86ef25c56c930a598eb692129f7d0 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 1 Mar 2011 20:46:52 +0100 Subject: [PATCH 5/5] update Buncher to make it more generic --- .../src/main/scala/Buncher.scala | 125 +++++++++++------- 1 file changed, 79 insertions(+), 46 deletions(-) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index 8c232255d0..a10528961f 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -2,19 +2,19 @@ package sample.fsm.buncher import scala.reflect.ClassManifest import akka.util.Duration -import akka.actor.{FSM, Actor} +import akka.actor.{FSM, Actor, ActorRef} /* -* generic typed object buncher. -* -* To instantiate it, use the factory method like so: -* Buncher(100, 500)(x : List[AnyRef] => x foreach println) -* which will yield a fully functional and started ActorRef. -* The type of messages allowed is strongly typed to match the -* supplied processing method; other messages are discarded (and -* possibly logged). -*/ -object Buncher { + * generic typed object buncher. + * + * To instantiate it, use the factory method like so: + * Buncher(100, 500)(x : List[AnyRef] => x foreach println) + * which will yield a fully functional ActorRef. + * The type of messages allowed is strongly typed to match the + * supplied processing method; other messages are discarded (and + * possibly logged). + */ +object GenericBuncher { trait State case object Idle extends State case object Active extends State @@ -22,54 +22,87 @@ object Buncher { case object Flush // send out current queue immediately case object Stop // poison pill - case class Data[A](start : Long, xs : List[A]) - - def apply[A : Manifest](singleTimeout : Duration, - multiTimeout : Duration)(f : List[A] => Unit) = - Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout).deliver(f)) + class MsgExtractor[A : Manifest] { + def unapply(m : AnyRef) : Option[A] = { + if (ClassManifest.fromClass(m.getClass) <:< manifest[A]) { + Some(m.asInstanceOf[A]) + } else { + None + } + } + } } -class Buncher[A : Manifest] private (val singleTimeout : Duration, val multiTimeout : Duration) - extends Actor with FSM[Buncher.State, Buncher.Data[A]] { - import Buncher._ +abstract class GenericBuncher[A : Manifest, B] (val singleTimeout : Duration, val multiTimeout : Duration) + extends Actor with FSM[GenericBuncher.State, B] { + import GenericBuncher._ import FSM._ - private val manifestA = manifest[A] - - private var send : List[A] => Unit = _ - private def deliver(f : List[A] => Unit) = { send = f; this } - - private def now = System.currentTimeMillis - private def check(m : AnyRef) = ClassManifest.fromClass(m.getClass) <:< manifestA - - startWith(Idle, Data(0, Nil)) + protected def empty : B + protected def merge(acc : B, elem : A) : B + protected def send(acc : B) : Unit + + protected def flush(acc : B) = { + send(acc) + cancelTimer("multi") + goto(Idle) using empty + } + + val Msg = new MsgExtractor[A] + + startWith(Idle, empty) when(Idle) { - case Event(m : AnyRef, _) if check(m) => - goto(Active) using Data(now, m.asInstanceOf[A] :: Nil) + case Event(Msg(m), acc) => + setTimer("multi", StateTimeout, multiTimeout, false) + goto(Active) using merge(acc, m) case Event(Flush, _) => stay case Event(Stop, _) => stop } when(Active, stateTimeout = Some(singleTimeout)) { - case Event(m : AnyRef, Data(start, xs)) if check(m) => - val l = m.asInstanceOf[A] :: xs - if (now - start > multiTimeout.toMillis) { - send(l.reverse) - goto(Idle) using Data(0, Nil) - } else { - stay using Data(start, l) - } - case Event(StateTimeout, Data(_, xs)) => - send(xs.reverse) - goto(Idle) using Data(0, Nil) - case Event(Flush, Data(_, xs)) => - send(xs.reverse) - goto(Idle) using Data(0, Nil) - case Event(Stop, Data(_, xs)) => - send(xs.reverse) + case Event(Msg(m), acc) => + stay using merge(acc, m) + case Event(StateTimeout, acc) => + flush(acc) + case Event(Flush, acc) => + flush(acc) + case Event(Stop, acc) => + send(acc) + cancelTimer("multi") stop } initialize } + +object Buncher { + case class Target(target : ActorRef) // for setting the target for default send action + + val Stop = GenericBuncher.Stop // make special message objects visible for Buncher clients + val Flush = GenericBuncher.Flush + + def apply[A : Manifest](singleTimeout : Duration, + multiTimeout : Duration) = + Actor.actorOf(new Buncher[A](singleTimeout, multiTimeout)) +} + +class Buncher[A : Manifest](singleTimeout : Duration, multiTimeout : Duration) + extends GenericBuncher[A, List[A]](singleTimeout, multiTimeout) { + + import Buncher._ + + private var target : Option[ActorRef] = None + protected def send(acc : List[A]) : Unit = if (target.isDefined) target.get ! acc.reverse + + protected def empty : List[A] = Nil + + protected def merge(l : List[A], elem : A) = elem :: l + + whenUnhandled { + case Event(Target(t), _) => + target = Some(t) + stay + } + +}