Fixed issue with sender reference in Active Objects

This commit is contained in:
Jonas Bonér 2010-05-28 10:58:32 +02:00
parent 87276a4938
commit 61a7cdf9d5
16 changed files with 127 additions and 91 deletions

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface immutable {}

View file

@ -1,11 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface oneway {}

View file

@ -21,11 +21,9 @@ import java.lang.reflect.{InvocationTargetException, Method}
object Annotations {
import se.scalablesolutions.akka.actor.annotation._
val oneway = classOf[oneway]
val transactionrequired = classOf[transactionrequired]
val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
val immutable = classOf[immutable]
val inittransactionalstate = classOf[inittransactionalstate]
}
@ -470,6 +468,8 @@ private[akka] sealed case class AspectInit(
def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout)
}
// FIXME: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
/**
* AspectWerkz Aspect that is turning POJOs into Active Object.
* Is deployed on a 'per-instance' basis.
@ -477,7 +477,6 @@ private[akka] sealed case class AspectInit(
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@Aspect("perInstance")
// TODO: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false
private var target: Class[_] = _
@ -488,10 +487,8 @@ private[akka] sealed class ActiveObjectAspect {
@Around("execution(* *.*(..))")
def invoke(joinPoint: JoinPoint): AnyRef = {
instance = joinPoint.getThis
ActiveObjectContext.sender.value = instance
if (!isInitialized) {
val init = AspectInitRegistry.initFor(instance)
val init = AspectInitRegistry.initFor(joinPoint.getThis)
target = init.target
actorRef = init.actorRef
remoteAddress = init.remoteAddress
@ -509,11 +506,13 @@ private[akka] sealed class ActiveObjectAspect {
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
if (isOneWay(rtti)) {
actorRef ! Invocation(joinPoint, true, true)
val isOneWay = isVoid(rtti)
val sender = ActiveObjectContext.sender.value
if (isOneWay) {
actorRef ! Invocation(joinPoint, true, true, sender)
null.asInstanceOf[AnyRef]
} else {
val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
val result = actorRef !! (Invocation(joinPoint, false, isOneWay, sender), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
@ -521,7 +520,7 @@ private[akka] sealed class ActiveObjectAspect {
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val oneWay_? = isOneWay(rtti) || isVoid(rtti)
val isOneWay = isVoid(rtti)
val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId)
@ -530,14 +529,14 @@ private[akka] sealed class ActiveObjectAspect {
.setUuid(actorRef.uuid)
.setTimeout(timeout)
.setIsActor(false)
.setIsOneWay(oneWay_?)
.setIsOneWay(isOneWay)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = actorRef.registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val remoteMessage = requestBuilder.build
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None)
if (oneWay_?) null // for void methods
if (isOneWay) null // for void methods
else {
if (future.isDefined) {
future.get.await
@ -554,10 +553,8 @@ private[akka] sealed class ActiveObjectAspect {
throw cause
} else future.result
private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway)
private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false
val escapedArgs = for (arg <- args) yield {
@ -576,10 +573,15 @@ private[akka] sealed class ActiveObjectAspect {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable private[akka] case class Invocation(joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean) {
@serializable private[akka] case class Invocation(
joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean, sender: AnyRef) {
override def toString: String = synchronized {
"Invocation [joinPoint: " + joinPoint.toString + ", isOneWay: " + isOneWay + ", isVoid: " + isVoid + "]"
"Invocation [joinPoint: " + joinPoint.toString +
", isOneWay: " + isOneWay +
", isVoid: " + isVoid +
", sender: " + sender +
"]"
}
override def hashCode: Int = synchronized {
@ -587,6 +589,7 @@ private[akka] sealed class ActiveObjectAspect {
result = HashCode.hash(result, joinPoint)
result = HashCode.hash(result, isOneWay)
result = HashCode.hash(result, isVoid)
result = HashCode.hash(result, sender)
result
}
@ -595,7 +598,8 @@ private[akka] sealed class ActiveObjectAspect {
that.isInstanceOf[Invocation] &&
that.asInstanceOf[Invocation].joinPoint == joinPoint &&
that.asInstanceOf[Invocation].isOneWay == isOneWay &&
that.asInstanceOf[Invocation].isVoid == isVoid
that.asInstanceOf[Invocation].isVoid == isVoid &&
that.asInstanceOf[Invocation].sender == sender
}
}
@ -621,7 +625,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) self.makeTransactionRequired
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
context = ctx
@ -667,11 +672,9 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
}
def receive = {
case Invocation(joinPoint, isOneWay, _) =>
context.foreach { ctx =>
val sender = ActiveObjectContext.sender.value
if (sender ne null) ctx._sender = sender
}
case Invocation(joinPoint, isOneWay, _, sender) =>
context.foreach(ctx => if (sender ne null) ctx._sender = sender)
ActiveObjectContext.sender.value = joinPoint.getThis // set next sender
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else self.reply(joinPoint.proceed)
@ -719,8 +722,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
!arg.isInstanceOf[java.lang.Float] &&
!arg.isInstanceOf[java.lang.Double] &&
!arg.isInstanceOf[java.lang.Boolean] &&
!arg.isInstanceOf[java.lang.Character] &&
!arg.getClass.isAnnotationPresent(Annotations.immutable)) {
!arg.isInstanceOf[java.lang.Character]) {
hasMutableArgument = true
}
if (arg.getClass.getName.contains(ActiveObject.AW_PROXY_PREFIX)) unserializable = true

View file

@ -237,7 +237,6 @@ object Actor extends Logging {
}
}
/**
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>

View file

@ -353,14 +353,10 @@ trait ActorRef extends TransactionManagement {
*/
def reply(message: Any) = if(!reply_?(message)) throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." +
"\n\tAlternatively, you can use setReplyToAddress to make sure the actor can be contacted over the network.")
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an Active Object from an instance NOT an Active Object.")
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently
* being processed.
@ -1206,8 +1202,7 @@ sealed class LocalActorRef private[akka](
!message.getClass.isArray &&
!message.isInstanceOf[List[_]] &&
!message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
!message.isInstanceOf[scala.collection.immutable.Set[_]] &&
!message.getClass.isAnnotationPresent(Annotations.immutable)) {
!message.isInstanceOf[scala.collection.immutable.Set[_]]) {
Serializer.Java.deepClone(message)
} else message
} else message

View file

@ -59,6 +59,13 @@ object Dispatchers {
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int = THROUGHPUT) = new ExecutorBasedEventDrivenDispatcher(name, throughput)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name, THROUGHPUT)
/**
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
* <p/>

View file

@ -56,6 +56,8 @@ import se.scalablesolutions.akka.actor.ActorRef
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
@volatile private var active: Boolean = false
val name: String = "event-driven:executor:dispatcher:" + _name

View file

@ -299,11 +299,11 @@ class RemoteClientHandler(val name: String,
} else {
if (reply.hasSupervisorUuid()) {
val supervisorUuid = reply.getSupervisorUuid
if (!supervisors.containsKey(supervisorUuid))
throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined)
throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
if (!supervisedActor.supervisor.isDefined) throw new IllegalStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply))
}
future.completeWithException(null, parseException(reply))
@ -330,7 +330,8 @@ class RemoteClientHandler(val name: String,
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@ -338,12 +339,14 @@ class RemoteClientHandler(val name: String,
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
}

View file

@ -8,16 +8,16 @@ import org.junit.{Before, After, Test}
import java.util.concurrent.{ CountDownLatch, TimeUnit }
@RunWith(classOf[JUnitRunner])
class ActorUtilTest extends junit.framework.TestCase with Suite with MustMatchers {
class ActorObjectUtilFunctionsTest extends junit.framework.TestCase with Suite with MustMatchers {
import Actor._
@Test def testSpawn = {
val latch = new CountDownLatch(1)
spawn {
latch.countDown
latch.countDown
}
val done = latch.await(5,TimeUnit.SECONDS)
val done = latch.await(10,TimeUnit.SECONDS)
done must be (true)
}
}

View file

@ -1,9 +1,6 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.oneway;
public interface Bar {
@oneway
void bar(String msg);
Ext getExt();
}

View file

@ -1,7 +1,6 @@
package se.scalablesolutions.akka.api;
import com.google.inject.Inject;
import se.scalablesolutions.akka.actor.annotation.oneway;
public class Foo extends se.scalablesolutions.akka.serialization.Serializable.JavaJSON {
@Inject

View file

@ -47,15 +47,32 @@ class AkkaLoader extends Logging {
private def printBanner = {
log.info(
"""
==============================
__ __
_____ | | _| | _______
\__ \ | |/ / |/ /\__ \
/ __ \| <| < / __ \_
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
==================================================
t
t t t
t t tt t
tt t t tt t
t ttttttt t ttt t
t tt ttt t ttt t
t t ttt t ttt t t
tt t ttt ttt ttt t
t t ttt ttt t tt t
t ttt ttt t t
tt ttt ttt t
ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
ttt tt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt tt ttt ttt
tttt ttttttttt tttttttt tttt
ttttttttt ttt ttt ttt ttt ttttttttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt ttt ttt ttt ttt ttt ttt ttt
ttt tt ttt ttt ttt ttt ttt ttt
tttttttt ttt ttt ttt ttt tttttttt
==================================================
""")
log.info(" Running version %s", Config.VERSION)
log.info("==============================")
log.info(" Running version %s", Config.VERSION)
log.info("==================================================")
}
}

View file

@ -159,7 +159,7 @@ private[akka] object MongoStorageBackend extends
val cnt =
if (finish.isDefined) {
val f = finish.get.asInstanceOf[Int]
if (f >= s) Math.min(count, (f - s)) else count
if (f >= s) math.min(count, (f - s)) else count
}
else count

View file

@ -49,7 +49,7 @@ class AccountActor extends Transactor {
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
if (amount > m)
failer !! "Failure"
self.reply(m - amount)
else self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
@ -101,14 +101,24 @@ import org.scalatest.junit.JUnitSuite
class RedisPersistentActorSpec extends JUnitSuite {
@Test
def testSuccessfulDebit = {
val bactor = actorOf(new AccountActor)
val bactor = actorOf[AccountActor]
bactor.start
// val failer = actorOf[PersistentFailerActor]
val failer = actorOf(new PersistentFailerActor)
val a: Option[BigInt] = bactor !! Credit("a-123", 5000)
val failer = actorOf[PersistentFailerActor]
failer.start
val acc = "a-123"
val a: Option[BigInt] = bactor !! Credit(acc, 5000)
println("a = " + a)
println(bactor !! Balance(acc))
/**
bactor !! Debit("a-123", 3000, failer)
val c: Int = (bactor !! LogSize).get
println(c)
bactor !! Debit("a-123", 3000, failer)
assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
val d: Int = (bactor !! LogSize).get
println(d)
bactor !! Credit("a-123", 7000)
assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
@ -116,8 +126,8 @@ class RedisPersistentActorSpec extends JUnitSuite {
bactor !! Debit("a-123", 8000, failer)
assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
/**
val c: Int = (bactor !! LogSize).get
println(c)
assertTrue(7 == c)
import scala.collection.mutable.ArrayBuffer
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
@ -125,9 +135,10 @@ class RedisPersistentActorSpec extends JUnitSuite {
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
**/
**/
}
/**
@Test
def testUnsuccessfulDebit = {
val bactor = actorOf(new AccountActor)
@ -170,4 +181,5 @@ class RedisPersistentActorSpec extends JUnitSuite {
// val c: Int = (bactor !! LogSize).get
// assertTrue(3 == c)
}
**/
}

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.actor.ActiveObject;
import se.scalablesolutions.akka.actor.ActiveObjectContext;
public class Receiver {
private ActiveObjectContext context = null;
public SimpleService receive() {
System.out.println("------ RECEIVE");
return (SimpleService) context.getSender();
}
}

View file

@ -8,6 +8,8 @@ import javax.ws.rs.Path;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import se.scalablesolutions.akka.actor.ActiveObject;
import se.scalablesolutions.akka.actor.ActiveObjectContext;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
@ -28,7 +30,8 @@ public class SimpleService {
private boolean hasStartedTicking = false;
private TransactionalMap<String, Integer> storage;
private Receiver receiver = ActiveObject.newInstance(Receiver.class);
@GET
@Produces({"application/json"})
public String count() {
@ -38,6 +41,8 @@ public class SimpleService {
hasStartedTicking = true;
return "Tick: 0\n";
} else {
// Grabs the sender address and returns it
//SimpleService sender = receiver.receive();
int counter = (Integer)storage.get(KEY).get() + 1;
storage.put(KEY, counter);
return "Tick: " + counter + "\n";