Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2011-11-29 11:31:01 +01:00
commit bcaadb9a1d
11 changed files with 427 additions and 284 deletions

View file

@ -14,7 +14,7 @@ import static org.junit.Assert.*;
public class JavaExtension {
static class Provider implements ExtensionIdProvider {
public ExtensionId lookup() { return defaultInstance; }
public ExtensionId<TestExtension> lookup() { return defaultInstance; }
}
public final static TestExtensionId defaultInstance = new TestExtensionId();

View file

@ -5,7 +5,6 @@ package akka.actor
*/
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.japi.{ Option JOption }
import akka.util.Duration
import akka.util.duration._
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
@ -14,6 +13,9 @@ import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
import akka.serialization.SerializationExtension
import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.japi.{ Creator, Option JOption }
object TypedActorSpec {
@ -135,6 +137,23 @@ object TypedActorSpec {
class StackedImpl extends Stacked {
override def stacked: String = "FOOBAR" //Uppercase
}
trait LifeCycles {
def crash(): Unit
}
class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles {
override def crash(): Unit = throw new IllegalStateException("Crash!")
override def preStart(): Unit = latch.countDown()
override def postStop(): Unit = for (i 1 to 3) latch.countDown()
override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i 1 to 5) latch.countDown()
override def postRestart(reason: Throwable): Unit = for (i 1 to 7) latch.countDown()
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -148,18 +167,18 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
newFooBar(Props().withTimeout(Timeout(d)))
def newFooBar(props: Props): Foo =
system.typedActorOf(classOf[Foo], classOf[Bar], props)
TypedActor(system).typedActorOf(classOf[Foo], classOf[Bar], props)
def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked =
system.typedActorOf(classOf[Stacked], classOf[StackedImpl], props)
TypedActor(system).typedActorOf(classOf[Stacked], classOf[StackedImpl], props)
def mustStop(typedActor: AnyRef) = system.typedActor.stop(typedActor) must be(true)
def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true)
"TypedActors" must {
"be able to instantiate" in {
val t = newFooBar
system.typedActor.isTypedActor(t) must be(true)
TypedActor(system).isTypedActor(t) must be(true)
mustStop(t)
}
@ -169,7 +188,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
}
"not stop non-started ones" in {
system.typedActor.stop(null) must be(false)
TypedActor(system).stop(null) must be(false)
}
"throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in {
@ -188,7 +207,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to call toString" in {
val t = newFooBar
t.toString must be(system.typedActor.getActorRefFor(t).toString)
t.toString must be(TypedActor(system).getActorRefFor(t).toString)
mustStop(t)
}
@ -201,7 +220,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to call hashCode" in {
val t = newFooBar
t.hashCode must be(system.typedActor.getActorRefFor(t).hashCode)
t.hashCode must be(TypedActor(system).getActorRefFor(t).hashCode)
mustStop(t)
}
@ -264,7 +283,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to handle exceptions when calling methods" in {
filterEvents(EventFilter[IllegalStateException]("expected")) {
val boss = actorOf(Props(context {
case p: Props context.sender ! context.typedActorOf(classOf[Foo], classOf[Bar], p)
case p: Props context.sender ! TypedActor(context).typedActorOf(classOf[Foo], classOf[Bar], p)
}).withFaultHandler(OneForOneStrategy {
case e: IllegalStateException if e.getMessage == "expected" FaultHandlingStrategy.Resume
}))
@ -296,7 +315,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
}
"be able to support implementation only typed actors" in {
val t = system.typedActorOf[Foo, Bar](Props())
val t = TypedActor(system).typedActorOf[Foo, Bar](Props())
val f = t.futurePigdog(200)
val f2 = t.futurePigdog(0)
f2.isCompleted must be(false)
@ -306,7 +325,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
}
"be able to support implementation only typed actors with complex interfaces" in {
val t = system.typedActorOf[Stackable1 with Stackable2, StackedImpl]()
val t = TypedActor(system).typedActorOf[Stackable1 with Stackable2, StackedImpl]()
t.stackable1 must be("foo")
t.stackable2 must be("bar")
mustStop(t)
@ -333,17 +352,16 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to serialize and deserialize invocations" in {
import java.io._
val serialization = SerializationExtension(system)
val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
out.writeObject(m)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
out.writeObject(m)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
mNew.method must be(m.method)
@ -353,17 +371,16 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to serialize and deserialize invocations' parameters" in {
import java.io._
val someFoo: Foo = new Bar
val serialization = SerializationExtension(system)
val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
out.writeObject(m)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val m = TypedActor.MethodCall(classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
out.writeObject(m)
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
mNew.method must be(m.method)
@ -375,5 +392,14 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
mNew.parameters(2).asInstanceOf[Int] must be === 1
}
}
"be able to override lifecycle callbacks" in {
val latch = new CountDownLatch(16)
val ta = TypedActor(system)
val t: LifeCycles = ta.typedActorOf(classOf[LifeCycles], new Creator[LifeCyclesImpl] { def create = new LifeCyclesImpl(latch) }, Props())
t.crash()
ta.poisonPill(t)
latch.await(10, TimeUnit.SECONDS) must be === true
}
}
}

View file

@ -29,7 +29,9 @@ class TypedActorPoolSpec extends AkkaSpec {
import ActorPoolSpec._
"Actor Pool (2)" must {
"support typed actors" in {
val pool = system.createProxy[Foo](new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
val ta = TypedActor(system)
val pool = ta.createProxy[Foo](new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
val typedActor = TypedActor(context)
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
@ -38,7 +40,7 @@ class TypedActorPoolSpec extends AkkaSpec {
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance(p: Props) = system.typedActor.getActorRefFor(context.typedActorOf[Foo, FooImpl](props = p.withTimeout(10 seconds)))
def instance(p: Props) = typedActor.getActorRefFor(typedActor.typedActorOf[Foo, FooImpl](props = p.withTimeout(10 seconds)))
def receive = _route
}, Props().withTimeout(10 seconds).withFaultHandler(faultHandler))
@ -47,7 +49,7 @@ class TypedActorPoolSpec extends AkkaSpec {
for ((i, r) results)
r.get must equal(i * i)
system.typedActor.stop(pool)
ta.stop(pool)
}
}
}

View file

@ -16,7 +16,7 @@ import akka.util.{ Duration, Helpers }
* Exposes contextual information for the actor and the current message.
* TODO: everything here for current compatibility - could be limited more
*/
trait ActorContext extends ActorRefFactory with TypedActorFactory {
trait ActorContext extends ActorRefFactory {
def self: ActorRef
@ -81,8 +81,6 @@ private[akka] class ActorCell(
protected final def guardian = self
protected def typedActor = system.typedActor
final def provider = system.provider
override def receiveTimeout: Option[Long] = if (receiveTimeoutData._1 > 0) Some(receiveTimeoutData._1) else None

View file

@ -381,7 +381,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
override def isTerminated(): Boolean = true
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
case d: DeadLetter eventStream.publish(d)
case _ eventStream.publish(DeadLetter(message, sender, this))
}

View file

@ -8,12 +8,7 @@ import akka.actor._
import akka.event._
import akka.dispatch._
import akka.util.duration._
import java.net.InetAddress
import com.eaio.uuid.UUID
import akka.serialization.Serialization
import akka.remote.RemoteAddress
import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.io.File
@ -25,10 +20,8 @@ import java.lang.reflect.InvocationTargetException
import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import scala.annotation.tailrec
import akka.serialization.SerializationExtension
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
object ActorSystem {
@ -156,7 +149,7 @@ object ActorSystem {
* configuration, e.g. dispatchers, deployments, remote capabilities and
* addresses. It is also the entry point for creating or looking up actors.
*/
abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
abstract class ActorSystem extends ActorRefFactory {
import ActorSystem._
/**
@ -219,9 +212,6 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
// FIXME: do not publish this
def deadLetterMailbox: Mailbox
// FIXME: TypedActor should be an extension
def typedActor: TypedActor
/**
* Light-weight scheduler for running asynchronous tasks after some deadline
* in the future. Not terribly precise but cheap.
@ -349,15 +339,9 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
private final val nextName = new AtomicLong
override protected def randomName(): String = Helpers.base64(nextName.incrementAndGet())
@volatile
private var _typedActor: TypedActor = _
def typedActor = _typedActor
def /(actorName: String): ActorPath = guardian.path / actorName
private lazy val _start: this.type = {
// TODO can we do something better than loading SerializationExtension from here?
_typedActor = new TypedActor(settings, SerializationExtension(this))
provider.init(this)
deadLetters.init(dispatcher, provider.rootPath)
// this starts the reaper actor and the user-configured logging subscribers, which are also actors

View file

@ -29,7 +29,22 @@ trait Extension
* otherwise you'll get the same extension loaded multiple times.
*/
trait ExtensionId[T <: Extension] {
/**
* Returns an instance of the extension identified by this ExtensionId instance.
*/
def apply(system: ActorSystem): T = system.registerExtension(this)
/**
* Returns an instance of the extension identified by this ExtensionId instance.
* Java API
*/
def get(system: ActorSystem): T = apply(system)
/**
* Is used by Akka to instantiate the Extension identified by this ExtensionId,
* internal use only.
*/
def createExtension(system: ActorSystemImpl): T
}

View file

@ -12,12 +12,189 @@ import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._
import akka.serialization.SerializationExtension
object TypedActor {
trait TypedActorFactory {
protected def actorFactory: ActorRefFactory
protected def typedActor: TypedActorExtension
/**
* Stops the underlying ActorRef for the supplied TypedActor proxy,
* if any, returns whether it could find the find the ActorRef or not
*/
def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null false
case ref ref.stop; true
}
/**
* Sends a PoisonPill the underlying ActorRef for the supplied TypedActor proxy,
* if any, returns whether it could find the find the ActorRef or not
*/
def poisonPill(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null false
case ref ref ! PoisonPill; true
}
/**
* Returns wether the supplied AnyRef is a TypedActor proxy or not
*/
def isTypedActor(proxyOrNot: AnyRef): Boolean
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
def getActorRefFor(proxy: AnyRef): ActorRef
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the suppli ed interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, impl, impl.newInstance, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(actorFactory, impl, impl.newInstance, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
typedActor.createProxyAndTypedActor(actorFactory, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader)
}
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](constructor: Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R =
typedActor.createProxy[R](actorFactory, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R =
typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) constructor.create, props, None, loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) constructor.create, props, Some(name), loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, loader: ClassLoader): R =
typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) constructor, props, None, loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) constructor, props, Some(name), loader)
}
object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvider {
def lookup() = this
def createExtension(system: ActorSystemImpl): TypedActorExtension = new TypedActorExtension(system)
/**
* Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
* will be children to the specified context, this allows for creating hierarchies of TypedActors.
* Do _not_ let this instance escape the TypedActor since that will not be thread-safe.
*/
def apply(context: ActorContext): TypedActorFactory = ContextualTypedActorFactory(apply(context.system), context)
/**
* Returns a contextual TypedActorFactory of this extension, this means that any TypedActors created by this TypedActorExtension
* will be children to the specified context, this allows for creating hierarchies of TypedActors.
* Do _not_ let this instance escape the TypedActor since that will not be thread-safe.
*
* Java API
*/
def get(context: ActorContext): TypedActorFactory = apply(context)
/**
* This class represents a Method call, and has a reference to the Method to be called and the parameters to supply
* It's sent to the ActorRef backing the TypedActor and can be serialized and deserialized
*/
case class MethodCall(ser: Serialization, method: Method, parameters: Array[AnyRef]) {
case class MethodCall(method: Method, parameters: Array[AnyRef]) {
def isOneWay = method.getReturnType == java.lang.Void.TYPE
def returnsFuture_? = classOf[Future[_]].isAssignableFrom(method.getReturnType)
@ -41,7 +218,7 @@ object TypedActor {
case null SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null)
case ps if ps.length == 0 SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]())
case ps
val serializers: Array[Serializer] = ps map ser.findSerializerFor
val serializers: Array[Serializer] = ps map SerializationExtension(Serialization.currentSystem.value).findSerializerFor
val serializedParameters: Array[Array[Byte]] = Array.ofDim[Array[Byte]](serializers.length)
for (i 0 until serializers.length)
serializedParameters(i) = serializers(i) toBinary parameters(i) //Mutable for the sake of sanity
@ -63,21 +240,21 @@ object TypedActor {
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
val serialization = SerializationExtension(system)
MethodCall(serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null null
case a if a.length == 0 Array[AnyRef]()
case a
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
for (i 0 until a.length) {
for (i 0 until a.length)
deserializedParameters(i) = serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
}
deserializedParameters
})
}
}
private val selfReference = new ThreadLocal[AnyRef]
private val appReference = new ThreadLocal[ActorSystem]
private val currentSystem = new ThreadLocal[ActorSystem]
/**
* Returns the reference to the proxy when called inside a method call in a TypedActor
@ -105,7 +282,7 @@ object TypedActor {
/**
* Returns the akka system (for a TypedActor) when inside a method call in a TypedActor.
*/
def system = appReference.get match {
def system = currentSystem.get match {
case null throw new IllegalStateException("Calling TypedActor.system outside of a TypedActor implementation method!")
case some some
}
@ -119,220 +296,37 @@ object TypedActor {
* Returns the default timeout (for a TypedActor) when inside a method call in a TypedActor.
*/
implicit def timeout = system.settings.ActorTimeout
}
trait TypedActorFactory { this: ActorRefFactory
protected def typedActor: TypedActor
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
* Implementation of TypedActor as an Actor
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R =
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(this, interface, impl.create, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, None, loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, Some(name), loader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*/
def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, Option(name), if (loader eq null) clazz.getClassLoader else loader)
}
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](constructor: Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R =
typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R =
typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) constructor.create, props, None, loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) constructor.create, props, Some(name), loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, loader: ClassLoader): R =
typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) constructor, props, None, loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) constructor, props, Some(name), loader)
}
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
* A TypedActor in Akka is an implementation of the Active Objects Pattern, i.e. an object with asynchronous method dispatch
*
* It consists of 2 parts:
* The Interface
* The Implementation
*
* Given a combination of Interface and Implementation, a JDK Dynamic Proxy object with the Interface will be returned
*
* The semantics is as follows,
* any methods in the Interface that returns Unit/void will use fire-and-forget semantics (same as Actor !)
* any methods in the Interface that returns Option/JOption will use ask + block-with-timeout-return-none-if-timeout semantics
* any methods in the Interface that returns anything else will use ask + block-with-timeout-throw-if-timeout semantics
*
* TypedActors needs, just like Actors, to be Stopped when they are no longer needed, use TypedActor.stop(proxy)
*/
class TypedActor(val settings: ActorSystem.Settings, var ser: Serialization) {
import TypedActor.MethodCall
/**
* Stops the underlying ActorRef for the supplied TypedActor proxy, if any, returns whether it could stop it or not
*/
def stop(proxy: AnyRef): Boolean = getActorRefFor(proxy) match {
case null false
case ref ref.stop; true
}
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match {
case null null
case handler handler.actor
}
/**
* Returns wether the supplied AnyRef is a TypedActor proxy or not
*/
def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null
/* Internal API */
private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {
case null null
case other Proxy.getInvocationHandler(other) match {
case null null
case handler: TypedActorInvocationHandler handler
case _ null
}
}
else null
private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, props: Props, name: Option[String], loader: ClassLoader): R = {
val proxyVar = new AtomVar[R]
configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader)
}
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: T, props: Props, name: Option[String], loader: ClassLoader): R =
createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), props, name, loader)
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val timeout = props.timeout match {
case Props.`defaultTimeout` settings.ActorTimeout
case x x
}
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar, timeout)).asInstanceOf[T]
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props)
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: T) extends Actor {
val me = createInstance
override def preStart(): Unit = me match {
case l: PreStart l.preStart()
case _ super.preStart()
}
override def postStop(): Unit = me match {
case l: PostStop l.postStop()
case _ super.postStop()
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = me match {
case l: PreRestart l.preRestart(reason, message)
case _ super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = me match {
case l: PostRestart l.postRestart(reason)
case _ super.postRestart(reason)
}
def receive = {
case m: MethodCall
TypedActor.selfReference set proxyVar.get
TypedActor.appReference set system
TypedActor.currentSystem set system
try {
if (m.isOneWay) m(me)
else {
@ -349,25 +343,73 @@ class TypedActor(val settings: ActorSystem.Settings, var ser: Serialization) {
sender ! m(me)
}
} catch {
case e: Exception sender ! Status.Failure(e)
case t: Throwable sender ! Status.Failure(t); throw t
}
}
} finally {
TypedActor.selfReference set null
TypedActor.appReference set null
TypedActor.currentSystem set null
}
}
}
private[akka] class TypedActorInvocationHandler(actorVar: AtomVar[ActorRef], timeout: Timeout) extends InvocationHandler {
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
trait PreStart {
/**
* User overridable callback.
* <p/>
* Is called when an Actor is started by invoking 'actor'.
*/
def preStart(): Unit = ()
}
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
trait PostStop {
/**
* User overridable callback.
* <p/>
* Is called when 'actor.stop()' is invoked.
*/
def postStop(): Unit = ()
}
/**
* Mix this into your TypedActor to be able to hook into its lifecycle
*/
trait PreRestart {
/**
* User overridable callback.
* <p/>
* Is called on a crashed Actor right BEFORE it is restarted to allow clean
* up of resources before Actor is terminated.
* By default it calls postStop()
*/
def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
}
trait PostRestart {
/**
* User overridable callback.
* <p/>
* Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
* By default it calls preStart()
*/
def postRestart(reason: Throwable): Unit = ()
}
private[akka] class TypedActorInvocationHandler(extension: TypedActorExtension, actorVar: AtomVar[ActorRef], timeout: Timeout) extends InvocationHandler {
def actor = actorVar.get
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" actor.toString
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "hashCode" actor.hashCode.asInstanceOf[AnyRef]
case _
MethodCall(ser, method, args) match {
MethodCall(method, args) match {
case m if m.isOneWay actor ! m; null //Null return value
case m if m.returnsFuture_? actor.?(m, timeout)
case m if m.returnsJOption_? || m.returnsOption_?
@ -382,3 +424,67 @@ class TypedActor(val settings: ActorSystem.Settings, var ser: Serialization) {
}
}
}
case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory {
override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy)
override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot)
}
class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory with Extension {
import TypedActor._ //Import the goodies from the companion object
protected def actorFactory: ActorRefFactory = system
protected def typedActor = this
val serialization = SerializationExtension(system)
val settings = system.settings
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
def getActorRefFor(proxy: AnyRef): ActorRef = invocationHandlerFor(proxy) match {
case null null
case handler handler.actor
}
/**
* Returns wether the supplied AnyRef is a TypedActor proxy or not
*/
def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null
// Private API
private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, props: Props, name: Option[String], loader: ClassLoader): R = {
val proxyVar = new AtomVar[R]
configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader)
}
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: T, props: Props, name: Option[String], loader: ClassLoader): R =
createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), props, name, loader)
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val timeout = props.timeout match {
case Props.`defaultTimeout` settings.ActorTimeout
case x x
}
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(this, actorVar, timeout)).asInstanceOf[T]
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props)
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces
private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {
case null null
case other Proxy.getInvocationHandler(other) match {
case null null
case handler: TypedActorInvocationHandler handler
case _ null
}
}
else null
}

View file

@ -14,6 +14,7 @@ import akka.actor.Timeout
import akka.dispatch.FutureTimeoutException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
@ -268,6 +269,7 @@ object Logging {
val AllLogLevels = Seq(ErrorLevel: AnyRef, WarningLevel, InfoLevel, DebugLevel).asInstanceOf[Seq[LogLevel]]
val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
val errorFormatWithoutCause = "[ERROR] [%s] [%s] [%s] %s".intern
val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern
val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern
val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern
@ -311,7 +313,10 @@ object Logging {
def level = ErrorLevel
}
object Error {
def apply(logSource: String, message: Any) = new Error(new EventHandlerException, logSource, message)
def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message)
/** Null Object used for errors without cause Throwable */
object NoCause extends NoStackTrace
}
case class Warning(logSource: String, message: Any = "") extends LogEvent {
@ -363,13 +368,15 @@ object Logging {
}
}
def error(event: Error) =
println(errorFormat.format(
def error(event: Error) = {
val f = if (event.cause == Error.NoCause) errorFormatWithoutCause else errorFormat
println(f.format(
timestamp,
event.thread.getName,
event.logSource,
event.message,
stackTraceFor(event.cause)))
}
def warning(event: Warning) =
println(warningFormat.format(
@ -429,14 +436,14 @@ object Logging {
}
def stackTraceFor(e: Throwable) = {
if (e ne null) {
if ((e eq null) || e == Error.NoCause) {
""
} else {
import java.io.{ StringWriter, PrintWriter }
val sw = new StringWriter
val pw = new PrintWriter(sw)
e.printStackTrace(pw)
sw.toString
} else {
"[NO STACK TRACE]"
}
}

View file

@ -282,7 +282,7 @@ class ActiveRemoteClientHandler(
val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler {
def runOnceNow(thunk: Unit) = timer.newTimeout(new TimerTask() {
def runOnceNow(thunk: Unit): Unit = timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() }
}, 0, TimeUnit.MILLISECONDS)

View file

@ -37,9 +37,13 @@ class Slf4jEventHandler extends Actor with SLF4JLogging {
val mdcThreadAttributeName = "sourceThread"
def receive = {
case event @ Error(cause, logSource, message)
withMdc(mdcThreadAttributeName, event.thread.getName) {
Logger(logSource).error(message.toString, cause)
cause match {
case Error.NoCause Logger(logSource).error(message.toString)
case _ Logger(logSource).error(message.toString, cause)
}
}
case event @ Warning(logSource, message)
@ -62,7 +66,8 @@ class Slf4jEventHandler extends Actor with SLF4JLogging {
sender ! LoggerInitialized
}
def withMdc(name: String, value: String)(logStatement: Unit) {
@inline
final def withMdc(name: String, value: String)(logStatement: Unit) {
MDC.put(name, value)
try {
logStatement