From 10517e518333c162b906b6bcef6f462a72ba7b2a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 Nov 2011 13:35:10 +0100 Subject: [PATCH 1/4] Skip stack trace when log error without exception. See #1402 --- .../src/main/scala/akka/event/Logging.scala | 20 ++++++--- .../main/scala/akka/event/slf4j/SLF4J.scala | 41 +++++++++++++++---- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 5b3ae4b801..af3fc25797 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -268,6 +268,7 @@ object Logging { val AllLogLevels = Seq(ErrorLevel: AnyRef, WarningLevel, InfoLevel, DebugLevel).asInstanceOf[Seq[LogLevel]] val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern @@ -311,7 +312,12 @@ object Logging { def level = ErrorLevel } object Error { - def apply(logSource: String, message: Any) = new Error(new EventHandlerException, logSource, message) + def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message) + + /** Null Object used for errors without cause Throwable */ + object NoCause extends RuntimeException { + setStackTrace(Array.empty[StackTraceElement]) + } } case class Warning(logSource: String, message: Any = "") extends LogEvent { @@ -363,13 +369,15 @@ object Logging { } } - def error(event: Error) = - println(errorFormat.format( + def error(event: Error) = { + val f = if (event.cause == Error.NoCause) errorFormatWithoutCause else errorFormat + println(f.format( timestamp, event.thread.getName, event.logSource, event.message, stackTraceFor(event.cause))) + } def warning(event: Warning) = println(warningFormat.format( @@ -429,14 +437,14 @@ object Logging { } def stackTraceFor(e: Throwable) = { - if (e ne null) { + if ((e eq null) || e == Error.NoCause) { + "" + } else { import java.io.{ StringWriter, PrintWriter } val sw = new StringWriter val pw = new PrintWriter(sw) e.printStackTrace(pw) sw.toString - } else { - "[NO STACK TRACE]" } } diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index dc175e4e82..73d37a838b 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -5,7 +5,7 @@ package akka.event.slf4j import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory } - +import org.slf4j.MDC import akka.event.Logging._ import akka.actor._ @@ -27,31 +27,54 @@ object Logger { /** * SLF4J Event Handler. * + * The thread in which the logging was performed is captured in + * Mapped Diagnostic Context (MDC) with attribute name "sourceThread". + * * @author Jonas Bonér */ class Slf4jEventHandler extends Actor with SLF4JLogging { + val mdcThreadAttributeName = "sourceThread" + def receive = { + case event @ Error(cause, logSource, message) ⇒ - Logger(logSource).error("[{}] [{}] [{}]", - Array[AnyRef](event.thread.getName, message.asInstanceOf[AnyRef], stackTraceFor(cause))) + withMdc(mdcThreadAttributeName, event.thread.getName) { + cause match { + case Error.NoCause ⇒ Logger(logSource).error(message.toString) + case _ ⇒ Logger(logSource).error(message.toString, cause) + } + } case event @ Warning(logSource, message) ⇒ - Logger(logSource).warn("[{}] [{}]", - event.thread.getName, message.asInstanceOf[AnyRef]) + withMdc(mdcThreadAttributeName, event.thread.getName) { + Logger(logSource).warn("{}", message.asInstanceOf[AnyRef]) + } case event @ Info(logSource, message) ⇒ - Logger(logSource).info("[{}] [{}]", - event.thread.getName, message.asInstanceOf[AnyRef]) + withMdc(mdcThreadAttributeName, event.thread.getName) { + Logger(logSource).info("{}", message.asInstanceOf[AnyRef]) + } case event @ Debug(logSource, message) ⇒ - Logger(logSource).debug("[{}] [{}]", - event.thread.getName, message.asInstanceOf[AnyRef]) + withMdc(mdcThreadAttributeName, event.thread.getName) { + Logger(logSource).debug("{}", message.asInstanceOf[AnyRef]) + } case InitializeLogger(_) ⇒ log.info("Slf4jEventHandler started") sender ! LoggerInitialized } + @inline + final def withMdc(name: String, value: String)(logStatement: ⇒ Unit) { + MDC.put(name, value) + try { + logStatement + } finally { + MDC.remove(name) + } + } + } From 539e12a7a403c2b29fe5d43e842426da8f2ce1e7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 28 Nov 2011 16:14:22 +0100 Subject: [PATCH 2/4] Making TypedActors an Akka Extension and adding LifeCycle overrides to TypedActors, see #1371 and #1397 --- .../test/java/akka/actor/JavaExtension.java | 2 +- .../scala/akka/actor/TypedActorSpec.scala | 88 ++- .../scala/akka/routing/ActorPoolSpec.scala | 8 +- .../src/main/scala/akka/actor/ActorCell.scala | 4 +- .../main/scala/akka/actor/ActorSystem.scala | 18 +- .../src/main/scala/akka/actor/Extension.scala | 15 + .../main/scala/akka/actor/TypedActor.scala | 544 +++++++++++------- 7 files changed, 405 insertions(+), 274 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index 2600188cac..fefec7640a 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -14,7 +14,7 @@ import static org.junit.Assert.*; public class JavaExtension { static class Provider implements ExtensionIdProvider { - public ExtensionId lookup() { return defaultInstance; } + public ExtensionId lookup() { return defaultInstance; } } public final static TestExtensionId defaultInstance = new TestExtensionId(); diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index ac796d407d..00fb05561d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -5,7 +5,6 @@ package akka.actor */ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import akka.japi.{ Option ⇒ JOption } import akka.util.Duration import akka.util.duration._ import akka.dispatch.{ Dispatchers, Future, KeptPromise } @@ -14,6 +13,9 @@ import java.util.concurrent.atomic.AtomicReference import annotation.tailrec import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } import akka.serialization.SerializationExtension +import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart } +import java.util.concurrent.{ TimeUnit, CountDownLatch } +import akka.japi.{ Creator, Option ⇒ JOption } object TypedActorSpec { @@ -135,6 +137,23 @@ object TypedActorSpec { class StackedImpl extends Stacked { override def stacked: String = "FOOBAR" //Uppercase } + + trait LifeCycles { + def crash(): Unit + } + + class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles { + + override def crash(): Unit = throw new IllegalStateException("Crash!") + + override def preStart(): Unit = latch.countDown() + + override def postStop(): Unit = for (i ← 1 to 3) latch.countDown() + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown() + + override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown() + } } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -148,18 +167,18 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte newFooBar(Props().withTimeout(Timeout(d))) def newFooBar(props: Props): Foo = - system.typedActorOf(classOf[Foo], classOf[Bar], props) + TypedActor(system).typedActorOf(classOf[Foo], classOf[Bar], props) def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked = - system.typedActorOf(classOf[Stacked], classOf[StackedImpl], props) + TypedActor(system).typedActorOf(classOf[Stacked], classOf[StackedImpl], props) - def mustStop(typedActor: AnyRef) = system.typedActor.stop(typedActor) must be(true) + def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true) "TypedActors" must { "be able to instantiate" in { val t = newFooBar - system.typedActor.isTypedActor(t) must be(true) + TypedActor(system).isTypedActor(t) must be(true) mustStop(t) } @@ -169,7 +188,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte } "not stop non-started ones" in { - system.typedActor.stop(null) must be(false) + TypedActor(system).stop(null) must be(false) } "throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in { @@ -188,7 +207,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to call toString" in { val t = newFooBar - t.toString must be(system.typedActor.getActorRefFor(t).toString) + t.toString must be(TypedActor(system).getActorRefFor(t).toString) mustStop(t) } @@ -201,7 +220,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to call hashCode" in { val t = newFooBar - t.hashCode must be(system.typedActor.getActorRefFor(t).hashCode) + t.hashCode must be(TypedActor(system).getActorRefFor(t).hashCode) mustStop(t) } @@ -264,7 +283,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to handle exceptions when calling methods" in { filterEvents(EventFilter[IllegalStateException]("expected")) { val boss = actorOf(Props(context ⇒ { - case p: Props ⇒ context.sender ! context.typedActorOf(classOf[Foo], classOf[Bar], p) + case p: Props ⇒ context.sender ! TypedActor(context).typedActorOf(classOf[Foo], classOf[Bar], p) }).withFaultHandler(OneForOneStrategy { case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume })) @@ -296,7 +315,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte } "be able to support implementation only typed actors" in { - val t = system.typedActorOf[Foo, Bar](Props()) + val t = TypedActor(system).typedActorOf[Foo, Bar](Props()) val f = t.futurePigdog(200) val f2 = t.futurePigdog(0) f2.isCompleted must be(false) @@ -306,7 +325,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte } "be able to support implementation only typed actors with complex interfaces" in { - val t = system.typedActorOf[Stackable1 with Stackable2, StackedImpl]() + val t = TypedActor(system).typedActorOf[Stackable1 with Stackable2, StackedImpl]() t.stackable1 must be("foo") t.stackable2 must be("bar") mustStop(t) @@ -333,17 +352,16 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to serialize and deserialize invocations" in { import java.io._ - val serialization = SerializationExtension(system) - val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]()) - val baos = new ByteArrayOutputStream(8192 * 4) - val out = new ObjectOutputStream(baos) - - out.writeObject(m) - out.close() - - val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { + val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]()) + val baos = new ByteArrayOutputStream(8192 * 4) + val out = new ObjectOutputStream(baos) + + out.writeObject(m) + out.close() + + val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) + val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] mNew.method must be(m.method) @@ -353,17 +371,16 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to serialize and deserialize invocations' parameters" in { import java.io._ val someFoo: Foo = new Bar - val serialization = SerializationExtension(system) - val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef])) - val baos = new ByteArrayOutputStream(8192 * 4) - val out = new ObjectOutputStream(baos) - - out.writeObject(m) - out.close() - - val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { + val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef])) + val baos = new ByteArrayOutputStream(8192 * 4) + val out = new ObjectOutputStream(baos) + + out.writeObject(m) + out.close() + + val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) + val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] mNew.method must be(m.method) @@ -375,5 +392,14 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte mNew.parameters(2).asInstanceOf[Int] must be === 1 } } + + "be able to override lifecycle callbacks" in { + val latch = new CountDownLatch(16) + val ta = TypedActor(system) + val t: LifeCycles = ta.typedActorOf(classOf[LifeCycles], new Creator[LifeCyclesImpl] { def create = new LifeCyclesImpl(latch) }, Props()) + t.crash() + ta.poisonPill(t) + latch.await(10, TimeUnit.SECONDS) must be === true + } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index e4ccf34768..d9503e31b7 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -29,7 +29,9 @@ class TypedActorPoolSpec extends AkkaSpec { import ActorPoolSpec._ "Actor Pool (2)" must { "support typed actors" in { - val pool = system.createProxy[Foo](new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + val ta = TypedActor(system) + val pool = ta.createProxy[Foo](new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + val typedActor = TypedActor(context) def lowerBound = 1 def upperBound = 5 def pressureThreshold = 1 @@ -38,7 +40,7 @@ class TypedActorPoolSpec extends AkkaSpec { def rampupRate = 0.1 def backoffRate = 0.50 def backoffThreshold = 0.50 - def instance(p: Props) = system.typedActor.getActorRefFor(context.typedActorOf[Foo, FooImpl](props = p.withTimeout(10 seconds))) + def instance(p: Props) = typedActor.getActorRefFor(typedActor.typedActorOf[Foo, FooImpl](props = p.withTimeout(10 seconds))) def receive = _route }, Props().withTimeout(10 seconds).withFaultHandler(faultHandler)) @@ -47,7 +49,7 @@ class TypedActorPoolSpec extends AkkaSpec { for ((i, r) ← results) r.get must equal(i * i) - system.typedActor.stop(pool) + ta.stop(pool) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 330824290f..afd462ff1e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -16,7 +16,7 @@ import akka.util.{ Duration, Helpers } * Exposes contextual information for the actor and the current message. * TODO: everything here for current compatibility - could be limited more */ -trait ActorContext extends ActorRefFactory with TypedActorFactory { +trait ActorContext extends ActorRefFactory { def self: ActorRef @@ -81,8 +81,6 @@ private[akka] class ActorCell( protected final def guardian = self - protected def typedActor = system.typedActor - final def provider = system.provider override def receiveTimeout: Option[Long] = if (receiveTimeoutData._1 > 0) Some(receiveTimeoutData._1) else None diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 83d7aa9d3c..4370149064 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -8,12 +8,7 @@ import akka.actor._ import akka.event._ import akka.dispatch._ import akka.util.duration._ -import java.net.InetAddress -import com.eaio.uuid.UUID -import akka.serialization.Serialization -import akka.remote.RemoteAddress import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.NANOSECONDS import java.io.File @@ -25,10 +20,8 @@ import java.lang.reflect.InvocationTargetException import akka.util.{ Helpers, Duration, ReflectiveAccess } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.CountDownLatch -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import scala.annotation.tailrec -import akka.serialization.SerializationExtension import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap object ActorSystem { @@ -151,7 +144,7 @@ object ActorSystem { * configuration, e.g. dispatchers, deployments, remote capabilities and * addresses. It is also the entry point for creating or looking up actors. */ -abstract class ActorSystem extends ActorRefFactory with TypedActorFactory { +abstract class ActorSystem extends ActorRefFactory { import ActorSystem._ /** @@ -209,9 +202,6 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory { // FIXME: do not publish this def deadLetterMailbox: Mailbox - // FIXME: TypedActor should be an extension - def typedActor: TypedActor - /** * Light-weight scheduler for running asynchronous tasks after some deadline * in the future. Not terribly precise but cheap. @@ -337,15 +327,9 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A private final val nextName = new AtomicLong override protected def randomName(): String = Helpers.base64(nextName.incrementAndGet()) - @volatile - private var _typedActor: TypedActor = _ - def typedActor = _typedActor - def /(actorName: String): ActorPath = guardian.path / actorName private lazy val _start: this.type = { - // TODO can we do something better than loading SerializationExtension from here? - _typedActor = new TypedActor(settings, SerializationExtension(this)) provider.init(this) deadLetters.init(dispatcher, provider.rootPath) // this starts the reaper actor and the user-configured logging subscribers, which are also actors diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index 3850ef4462..bfd4ab6a52 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -29,7 +29,22 @@ trait Extension * otherwise you'll get the same extension loaded multiple times. */ trait ExtensionId[T <: Extension] { + + /** + * Returns an instance of the extension identified by this ExtensionId instance. + */ def apply(system: ActorSystem): T = system.registerExtension(this) + + /** + * Returns an instance of the extension identified by this ExtensionId instance. + * Java API + */ + def get(system: ActorSystem): T = apply(system) + + /** + * Is used by Akka to instantiate the Extension identified by this ExtensionId, + * internal use only. + */ def createExtension(system: ActorSystemImpl): T } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index b6103048ff..3ae639e95f 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -12,12 +12,189 @@ import akka.serialization.{ Serializer, Serialization } import akka.dispatch._ import akka.serialization.SerializationExtension -object TypedActor { +trait TypedActorFactory { + + protected def actorFactory: ActorRefFactory + + protected def typedActor: TypedActorExtension + + /** + * Stops the underlying ActorRef for the supplied TypedActor proxy, + * if any, returns whether it could find the find the ActorRef or not + */ + def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match { + case null ⇒ false + case ref ⇒ ref.stop; true + } + + /** + * Sends a PoisonPill the underlying ActorRef for the supplied TypedActor proxy, + * if any, returns whether it could find the find the ActorRef or not + */ + def poisonPill(proxy: AnyRef): Boolean = getActorRefFor(proxy) match { + case null ⇒ false + case ref ⇒ ref ! PoisonPill; true + } + + /** + * Returns wether the supplied AnyRef is a TypedActor proxy or not + */ + def isTypedActor(proxyOrNot: AnyRef): Boolean + + /** + * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found + */ + def getActorRefFor(proxy: AnyRef): ActorRef + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the suppli ed interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, interface.getClassLoader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), interface.getClassLoader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, interface.getClassLoader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), interface.getClassLoader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) + */ + def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(actorFactory, impl, impl.newInstance, props, None, loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) + */ + def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(actorFactory, impl, impl.newInstance, props, Some(name), loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) + */ + def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = { + val clazz = m.erasure.asInstanceOf[Class[T]] + typedActor.createProxyAndTypedActor(actorFactory, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader) + } + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R = + typedActor.createProxy[R](actorFactory, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader) + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = + typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, None, loader) + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Some(name), loader) + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = + typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, None, loader) + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = + typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Some(name), loader) + +} + +object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl): TypedActorExtension = new TypedActorExtension(system) + + /** + * Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension + * will be children to the specified context, this allows for creating hierarchies of TypedActors. + * Do _not_ let this instance escape the TypedActor since that will not be thread-safe. + */ + def apply(context: ActorContext): TypedActorFactory = ContextualTypedActorFactory(apply(context.system), context) + + /** + * Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension + * will be children to the specified context, this allows for creating hierarchies of TypedActors. + * Do _not_ let this instance escape the TypedActor since that will not be thread-safe. + * + * Java API + */ + def get(context: ActorContext): TypedActorFactory = apply(context) + /** * This class represents a Method call, and has a reference to the Method to be called and the parameters to supply * It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized */ - case class MethodCall(ser: Serialization, method: Method, parameters: Array[AnyRef]) { + case class MethodCall(method: Method, parameters: Array[AnyRef]) { def isOneWay = method.getReturnType == java.lang.Void.TYPE def returnsFuture_? = classOf[Future[_]].isAssignableFrom(method.getReturnType) @@ -41,7 +218,7 @@ object TypedActor { case null ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null) case ps if ps.length == 0 ⇒ SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]()) case ps ⇒ - val serializers: Array[Serializer] = ps map ser.findSerializerFor + val serializers: Array[Serializer] = ps map SerializationExtension(Serialization.currentSystem.value).findSerializerFor val serializedParameters: Array[Array[Byte]] = Array.ofDim[Array[Byte]](serializers.length) for (i ← 0 until serializers.length) serializedParameters(i) = serializers(i) toBinary parameters(i) //Mutable for the sake of sanity @@ -63,21 +240,21 @@ object TypedActor { "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") val serialization = SerializationExtension(system) - MethodCall(serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { + MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { case null ⇒ null case a if a.length == 0 ⇒ Array[AnyRef]() case a ⇒ val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity - for (i ← 0 until a.length) { + for (i ← 0 until a.length) deserializedParameters(i) = serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i)) - } + deserializedParameters }) } } private val selfReference = new ThreadLocal[AnyRef] - private val appReference = new ThreadLocal[ActorSystem] + private val currentSystem = new ThreadLocal[ActorSystem] /** * Returns the reference to the proxy when called inside a method call in a TypedActor @@ -105,7 +282,7 @@ object TypedActor { /** * Returns the akka system (for a TypedActor) when inside a method call in a TypedActor. */ - def system = appReference.get match { + def system = currentSystem.get match { case null ⇒ throw new IllegalStateException("Calling TypedActor.system outside of a TypedActor implementation method!") case some ⇒ some } @@ -119,220 +296,37 @@ object TypedActor { * Returns the default timeout (for a TypedActor) when inside a method call in a TypedActor. */ implicit def timeout = system.settings.ActorTimeout -} - -trait TypedActorFactory { this: ActorRefFactory ⇒ - - protected def typedActor: TypedActor /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class + * Implementation of TypedActor as an Actor */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) - */ - def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, None, loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) - */ - def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Some(name), loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) - */ - def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = { - val clazz = m.erasure.asInstanceOf[Class[T]] - typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader) - } - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R = - typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = - typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, None, loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Some(name), loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = - typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, None, loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Some(name), loader) - -} - -//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala -/** - * A TypedActor in Akka is an implementation of the Active Objects Pattern, i.e. an object with asynchronous method dispatch - * - * It consists of 2 parts: - * The Interface - * The Implementation - * - * Given a combination of Interface and Implementation, a JDK Dynamic Proxy object with the Interface will be returned - * - * The semantics is as follows, - * any methods in the Interface that returns Unit/void will use fire-and-forget semantics (same as Actor !) - * any methods in the Interface that returns Option/JOption will use ask + block-with-timeout-return-none-if-timeout semantics - * any methods in the Interface that returns anything else will use ask + block-with-timeout-throw-if-timeout semantics - * - * TypedActors needs, just like Actors, to be Stopped when they are no longer needed, use TypedActor.stop(proxy) - */ -class TypedActor(val settings: ActorSystem.Settings, var ser: Serialization) { - - import TypedActor.MethodCall - /** - * Stops the underlying ActorRef for the supplied TypedActor proxy, if any, returns whether it could stop it or not - */ - def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match { - case null ⇒ false - case ref ⇒ ref.stop; true - } - - /** - * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found - */ - def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match { - case null ⇒ null - case handler ⇒ handler.actor - } - - /** - * Returns wether the supplied AnyRef is a TypedActor proxy or not - */ - def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null - - /* Internal API */ - - private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = - if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match { - case null ⇒ null - case other ⇒ Proxy.getInvocationHandler(other) match { - case null ⇒ null - case handler: TypedActorInvocationHandler ⇒ handler - case _ ⇒ null - } - } - else null - - private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: Option[String], loader: ClassLoader): R = { - val proxyVar = new AtomVar[R] - configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader) - } - - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: Option[String], loader: ClassLoader): R = - createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader) - - private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = { - //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling - val actorVar = new AtomVar[ActorRef](null) - val timeout = props.timeout match { - case Props.`defaultTimeout` ⇒ settings.ActorTimeout - case x ⇒ x - } - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar, timeout)).asInstanceOf[T] - proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props) - actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet - proxyVar.get - } - - private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces - private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor { val me = createInstance + + override def preStart(): Unit = me match { + case l: PreStart ⇒ l.preStart() + case _ ⇒ super.preStart() + } + + override def postStop(): Unit = me match { + case l: PostStop ⇒ l.postStop() + case _ ⇒ super.postStop() + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = me match { + case l: PreRestart ⇒ l.preRestart(reason, message) + case _ ⇒ super.preRestart(reason, message) + } + + override def postRestart(reason: Throwable): Unit = me match { + case l: PostRestart ⇒ l.postRestart(reason) + case _ ⇒ super.postRestart(reason) + } + def receive = { case m: MethodCall ⇒ TypedActor.selfReference set proxyVar.get - TypedActor.appReference set system + TypedActor.currentSystem set system try { if (m.isOneWay) m(me) else { @@ -349,25 +343,73 @@ class TypedActor(val settings: ActorSystem.Settings, var ser: Serialization) { sender ! m(me) } } catch { - case e: Exception ⇒ sender ! Status.Failure(e) + case t: Throwable ⇒ sender ! Status.Failure(t); throw t } } } finally { TypedActor.selfReference set null - TypedActor.appReference set null + TypedActor.currentSystem set null } } } - private[akka] class TypedActorInvocationHandler(actorVar: AtomVar[ActorRef], timeout: Timeout) extends InvocationHandler { + /** + * Mix this into your TypedActor to be able to hook into its lifecycle + */ + trait PreStart { + /** + * User overridable callback. + *

+ * Is called when an Actor is started by invoking 'actor'. + */ + def preStart(): Unit = () + } + + /** + * Mix this into your TypedActor to be able to hook into its lifecycle + */ + trait PostStop { + /** + * User overridable callback. + *

+ * Is called when 'actor.stop()' is invoked. + */ + def postStop(): Unit = () + } + + /** + * Mix this into your TypedActor to be able to hook into its lifecycle + */ + trait PreRestart { + /** + * User overridable callback. + *

+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. + * By default it calls postStop() + */ + def preRestart(reason: Throwable, message: Option[Any]): Unit = () + } + + trait PostRestart { + /** + * User overridable callback. + *

+ * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash. + * By default it calls preStart() + */ + def postRestart(reason: Throwable): Unit = () + } + + private[akka] class TypedActorInvocationHandler(extension: TypedActorExtension, actorVar: AtomVar[ActorRef], timeout: Timeout) extends InvocationHandler { def actor = actorVar.get def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString - case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean + case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ - MethodCall(ser, method, args) match { + MethodCall(method, args) match { case m if m.isOneWay ⇒ actor ! m; null //Null return value case m if m.returnsFuture_? ⇒ actor.?(m, timeout) case m if m.returnsJOption_? || m.returnsOption_? ⇒ @@ -382,3 +424,67 @@ class TypedActor(val settings: ActorSystem.Settings, var ser: Serialization) { } } } + +case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory { + override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy) + override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot) +} + +class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory with Extension { + import TypedActor._ //Import the goodies from the companion object + protected def actorFactory: ActorRefFactory = system + protected def typedActor = this + + val serialization = SerializationExtension(system) + val settings = system.settings + + /** + * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found + */ + def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match { + case null ⇒ null + case handler ⇒ handler.actor + } + + /** + * Returns wether the supplied AnyRef is a TypedActor proxy or not + */ + def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null + + // Private API + + private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: Option[String], loader: ClassLoader): R = { + val proxyVar = new AtomVar[R] + configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader) + } + + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: Option[String], loader: ClassLoader): R = + createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader) + + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = { + //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling + val actorVar = new AtomVar[ActorRef](null) + val timeout = props.timeout match { + case Props.`defaultTimeout` ⇒ settings.ActorTimeout + case x ⇒ x + } + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(this, actorVar, timeout)).asInstanceOf[T] + proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props) + actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet + proxyVar.get + } + + private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces + + private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = + if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match { + case null ⇒ null + case other ⇒ Proxy.getInvocationHandler(other) match { + case null ⇒ null + case handler: TypedActorInvocationHandler ⇒ handler + case _ ⇒ null + } + } + else null +} \ No newline at end of file From 01e43c3edbde026bf1a3b6ede2ea15fb79b5b437 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 Nov 2011 17:08:10 +0100 Subject: [PATCH 3/4] Use scala.util.control.NoStackTrace --- akka-actor/src/main/scala/akka/event/Logging.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index af3fc25797..bae930cb17 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -14,6 +14,7 @@ import akka.actor.Timeout import akka.dispatch.FutureTimeoutException import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorRefProvider +import scala.util.control.NoStackTrace object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream @@ -315,9 +316,7 @@ object Logging { def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message) /** Null Object used for errors without cause Throwable */ - object NoCause extends RuntimeException { - setStackTrace(Array.empty[StackTraceElement]) - } + object NoCause extends NoStackTrace } case class Warning(logSource: String, message: Any = "") extends LogEvent { From 16850388e0aa73a5b35d84282c35e5220ebd2b20 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 28 Nov 2011 17:24:41 +0100 Subject: [PATCH 4/4] Fixing #1379 - making the DLAR the default sender in tell --- akka-actor/src/main/scala/akka/actor/ActorRef.scala | 2 +- .../src/main/scala/akka/remote/netty/NettyRemoteSupport.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index afa7299669..e62a04938a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -381,7 +381,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { override def isTerminated(): Boolean = true - override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { + override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match { case d: DeadLetter ⇒ eventStream.publish(d) case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index cf922feafe..d12f5ea7e4 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -282,7 +282,7 @@ class ActiveRemoteClientHandler( val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { - def runOnceNow(thunk: ⇒ Unit) = timer.newTimeout(new TimerTask() { + def runOnceNow(thunk: ⇒ Unit): Unit = timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() } }, 0, TimeUnit.MILLISECONDS)