diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index de4512c094..f40261cddb 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -135,7 +135,7 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } - protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { + def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) { if (mailbox.dispatcherLock.tryLock()) { try { executor execute mailbox diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index 006ae6e843..549765d9fe 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -147,6 +147,8 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = def start = if (active.compareAndSet(false, true)) retainNonDaemon + def execute(task: Runnable) {} + def shutdown = if (active.compareAndSet(true, false)) releaseNonDaemon def isShutdown = !active.get diff --git a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala index fe84e04ff2..360c8a143f 100644 --- a/akka-actor/src/main/scala/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MailboxHandling.scala @@ -105,10 +105,10 @@ trait MailboxFactory { */ protected def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = mailboxType match { // FIXME make generic (work for TypedActor as well) - case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef.uuid, ActorType.ScalaActor, None).asInstanceOf[MessageQueue] - case RedisBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("RedisBasedDurableMailbox is not yet supported") - case BeanstalkBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("BeanstalkBasedDurableMailbox is not yet supported") - case ZooKeeperBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("ZooKeeperBasedDurableMailbox is not yet supported") + case FileBasedDurableMailbox(serializer) => EnterpriseModule.createFileBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case ZooKeeperBasedDurableMailbox(serializer) => EnterpriseModule.createZooKeeperBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case BeanstalkBasedDurableMailbox(serializer) => EnterpriseModule.createBeanstalkBasedMailbox(actorRef).asInstanceOf[MessageQueue] + case RedisBasedDurableMailbox(serializer) => EnterpriseModule.createRedisBasedMailbox(actorRef).asInstanceOf[MessageQueue] case AMQPBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("AMQPBasedDurableMailbox is not yet supported") case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 06516e52d4..d2ff59854a 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -65,6 +65,8 @@ trait MessageDispatcher extends MailboxFactory with Logging { def dispatch(invocation: MessageInvocation): Unit + def execute(task: Runnable): Unit + def start: Unit def shutdown: Unit diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 5ad1b89aca..7559785dcf 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -30,6 +30,8 @@ trait ThreadPoolBuilder extends Logging { protected var executor: ExecutorService = _ + def execute(task: Runnable) = executor execute task + def isShutdown = executor.isShutdown def buildThreadPool(): Unit = synchronized { diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 6a719d3834..d0824941ce 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -201,7 +201,7 @@ object ReflectiveAccess extends Logging { object EnterpriseModule { - type FileBasedMailbox = { + type Mailbox = { def enqueue(message: MessageInvocation) def dequeue: MessageInvocation } @@ -222,16 +222,23 @@ object ReflectiveAccess extends Logging { def ensureEnterpriseEnabled = if (!isEnterpriseEnabled) throw new ModuleNotAvailableException( "Feature is only available in Akka Enterprise") - def createFileBasedMailbox( - uuid: Uuid, actorType: ActorType, typedActorInfo: Option[Tuple2[String, String]]): FileBasedMailbox = { + def createFileBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.cluster.FileBasedMailbox", actorRef) + + def createZooKeeperBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.cluster.ZooKeeperBasedMailbox", actorRef) + + def createBeanstalkBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.cluster.BeanstalkBasedMailbox", actorRef) + + def createRedisBasedMailbox(actorRef: ActorRef): Mailbox = createMailbox("se.scalablesolutions.akka.cluster.RedisBasedMailbox", actorRef) + + private def createMailbox(mailboxClassname: String, actorRef: ActorRef): Mailbox = { ensureEnterpriseEnabled createInstance( - "se.scalablesolutions.akka.cluster.FileBasedMailbox", - Array(classOf[Uuid], classOf[ActorType], classOf[Option[Tuple2[String, String]]]), - Array(uuid, actorType, typedActorInfo).asInstanceOf[Array[AnyRef]], + mailboxClassname, + Array(classOf[ActorRef]), + Array(actorRef).asInstanceOf[Array[AnyRef]], loader) - .getOrElse(throw new IllegalActorStateException("Could not create file-based mailbox")) - .asInstanceOf[FileBasedMailbox] + .getOrElse(throw new IllegalActorStateException("Could not create durable mailbox [" + mailboxClassname + "] for actor [" + actorRef + "]")) + .asInstanceOf[Mailbox] } } @@ -249,9 +256,11 @@ object ReflectiveAccess extends Logging { Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { case e: java.lang.reflect.InvocationTargetException => + e.printStackTrace log.error(e.getCause, "Could not instantiate class [%s]", clazz.getName) None case e: Exception => + e.printStackTrace log.error(e.getCause, "Could not instantiate class [%s]", clazz.getName) None } @@ -269,9 +278,11 @@ object ReflectiveAccess extends Logging { Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { case e: java.lang.reflect.InvocationTargetException => + e.printStackTrace log.error(e.getCause, "Could not instantiate class [%s] due to [%s]", fqn, e.toString) None case e: Exception => + e.printStackTrace log.error(e.getCause, "Could not instantiate class [%s] due to [%s]", fqn, e.toString) None } @@ -284,9 +295,11 @@ object ReflectiveAccess extends Logging { Option(instance.get(null).asInstanceOf[T]) } catch { case e: java.lang.reflect.InvocationTargetException => + e.printStackTrace log.error(e.getCause, "Could not instantiate class [%s]", fqn) None case e: Exception => + e.printStackTrace log.error(e.getCause, "Could not instantiate class [%s]", fqn) None } diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 1ea9d8f986..40c5756e04 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -23,7 +23,7 @@ message RemoteActorRefProtocol { } /** - * Defines a remote ActorRef that "remembers" and uses its original typed Actor instance + * Defines a remote Typed ActorRef that "remembers" and uses its original typed Actor instance * on the original node. */ message RemoteTypedActorRefProtocol { diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index c07417c0e2..2482e15b05 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -192,7 +192,7 @@ object ActorSerialization { } val ar = new LocalActorRef( - uuidFrom(protocol.getUuid.getHigh,protocol.getUuid.getLow), + uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), protocol.getId, protocol.getOriginalAddress.getHostname, protocol.getOriginalAddress.getPort, @@ -231,7 +231,7 @@ object RemoteActorSerialization { * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol) + Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s", protocol) RemoteActorRef( protocol.getClassOrServiceName, protocol.getActorClassname, @@ -299,7 +299,11 @@ object RemoteActorSerialization { .setIsOneWay(isOneWay) val id = registerSupervisorAsRemoteActor - if (id.isDefined) requestBuilder.setSupervisorUuid(UuidProtocol.newBuilder.setHigh(id.get.getTime).setLow(id.get.getClockSeqAndNode).build) + if (id.isDefined) requestBuilder.setSupervisorUuid( + UuidProtocol.newBuilder + .setHigh(id.get.getTime) + .setLow(id.get.getClockSeqAndNode) + .build) senderOption.foreach { sender => RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender)