Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
910b61dd91
21 changed files with 296 additions and 289 deletions
|
|
@ -19,41 +19,43 @@ import se.scalablesolutions.akka.util.Logging
|
|||
*/
|
||||
object AMQP {
|
||||
case class ConnectionParameters(
|
||||
host: String = ConnectionFactory.DEFAULT_HOST,
|
||||
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
|
||||
username: String = ConnectionFactory.DEFAULT_USER,
|
||||
password: String = ConnectionFactory.DEFAULT_PASS,
|
||||
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
|
||||
initReconnectDelay: Long = 5000,
|
||||
connectionCallback: Option[ActorRef] = None)
|
||||
host: String = ConnectionFactory.DEFAULT_HOST,
|
||||
port: Int = ConnectionFactory.DEFAULT_AMQP_PORT,
|
||||
username: String = ConnectionFactory.DEFAULT_USER,
|
||||
password: String = ConnectionFactory.DEFAULT_PASS,
|
||||
virtualHost: String = ConnectionFactory.DEFAULT_VHOST,
|
||||
initReconnectDelay: Long = 5000,
|
||||
connectionCallback: Option[ActorRef] = None)
|
||||
|
||||
case class ChannelParameters(
|
||||
shutdownListener: Option[ShutdownListener] = None,
|
||||
channelCallback: Option[ActorRef] = None)
|
||||
shutdownListener: Option[ShutdownListener] = None,
|
||||
channelCallback: Option[ActorRef] = None)
|
||||
|
||||
case class ExchangeParameters(
|
||||
exchangeName: String,
|
||||
exchangeType: ExchangeType,
|
||||
exchangeDurable: Boolean = false,
|
||||
exchangeAutoDelete: Boolean = true,
|
||||
exchangePassive: Boolean = false,
|
||||
configurationArguments: Map[String, AnyRef] = Map())
|
||||
exchangeName: String,
|
||||
exchangeType: ExchangeType,
|
||||
exchangeDurable: Boolean = false,
|
||||
exchangeAutoDelete: Boolean = true,
|
||||
exchangePassive: Boolean = false,
|
||||
configurationArguments: Map[String, AnyRef] = Map())
|
||||
|
||||
case class ProducerParameters(exchangeParameters: ExchangeParameters,
|
||||
producerId: Option[String] = None,
|
||||
returnListener: Option[ReturnListener] = None,
|
||||
channelParameters: Option[ChannelParameters] = None)
|
||||
case class ProducerParameters(
|
||||
exchangeParameters: ExchangeParameters,
|
||||
producerId: Option[String] = None,
|
||||
returnListener: Option[ReturnListener] = None,
|
||||
channelParameters: Option[ChannelParameters] = None)
|
||||
|
||||
case class ConsumerParameters(exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
deliveryHandler: ActorRef,
|
||||
queueName: Option[String] = None,
|
||||
queueDurable: Boolean = false,
|
||||
queueAutoDelete: Boolean = true,
|
||||
queuePassive: Boolean = false,
|
||||
queueExclusive: Boolean = false,
|
||||
selfAcknowledging: Boolean = true,
|
||||
channelParameters: Option[ChannelParameters] = None) {
|
||||
case class ConsumerParameters(
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
deliveryHandler: ActorRef,
|
||||
queueName: Option[String] = None,
|
||||
queueDurable: Boolean = false,
|
||||
queueAutoDelete: Boolean = true,
|
||||
queuePassive: Boolean = false,
|
||||
queueExclusive: Boolean = false,
|
||||
selfAcknowledging: Boolean = true,
|
||||
channelParameters: Option[ChannelParameters] = None) {
|
||||
if (queueDurable && queueName.isEmpty) {
|
||||
throw new IllegalArgumentException("A queue name is required when requesting a durable queue.")
|
||||
}
|
||||
|
|
@ -74,30 +76,33 @@ object AMQP {
|
|||
|
||||
def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = {
|
||||
val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters))
|
||||
consumer.startLink(consumerParameters.deliveryHandler)
|
||||
val handler = consumerParameters.deliveryHandler
|
||||
if (handler.supervisor.isEmpty) consumer.startLink(handler)
|
||||
connection.startLink(consumer)
|
||||
consumer ! Start
|
||||
consumer
|
||||
}
|
||||
|
||||
def newRpcClient[O,I](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcClientSerializer[O,I],
|
||||
channelParameters: Option[ChannelParameters] = None): ActorRef = {
|
||||
def newRpcClient[O,I](
|
||||
connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcClientSerializer[O,I],
|
||||
channelParameters: Option[ChannelParameters] = None): ActorRef = {
|
||||
val rpcActor: ActorRef = actorOf(new RpcClientActor[O,I](exchangeParameters, routingKey, serializer, channelParameters))
|
||||
connection.startLink(rpcActor)
|
||||
rpcActor ! Start
|
||||
rpcActor
|
||||
}
|
||||
|
||||
def newRpcServer[I,O](connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcServerSerializer[I,O],
|
||||
requestHandler: I => O,
|
||||
queueName: Option[String] = None,
|
||||
channelParameters: Option[ChannelParameters] = None) = {
|
||||
def newRpcServer[I,O](
|
||||
connection: ActorRef,
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcServerSerializer[I,O],
|
||||
requestHandler: I => O,
|
||||
queueName: Option[String] = None,
|
||||
channelParameters: Option[ChannelParameters] = None) = {
|
||||
val producer = newProducer(connection, new ProducerParameters(new ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters))
|
||||
val rpcServer = actorOf(new RpcServerActor[I,O](producer, serializer, requestHandler))
|
||||
val consumer = newConsumer(connection, new ConsumerParameters(exchangeParameters, routingKey, rpcServer
|
||||
|
|
|
|||
|
|
@ -11,19 +11,19 @@ import com.rabbitmq.client.ShutdownSignalException
|
|||
sealed trait AMQPMessage
|
||||
sealed trait InternalAMQPMessage extends AMQPMessage
|
||||
|
||||
case class Message(payload: Array[Byte],
|
||||
routingKey: String,
|
||||
mandatory: Boolean = false,
|
||||
immediate: Boolean = false,
|
||||
properties: Option[BasicProperties] = None) extends AMQPMessage
|
||||
|
||||
case class Delivery(payload: Array[Byte],
|
||||
routingKey: String,
|
||||
deliveryTag: Long,
|
||||
properties: BasicProperties,
|
||||
sender: Option[ActorRef]) extends AMQPMessage
|
||||
|
||||
case class Message(
|
||||
payload: Array[Byte],
|
||||
routingKey: String,
|
||||
mandatory: Boolean = false,
|
||||
immediate: Boolean = false,
|
||||
properties: Option[BasicProperties] = None) extends AMQPMessage
|
||||
|
||||
case class Delivery(
|
||||
payload: Array[Byte],
|
||||
routingKey: String,
|
||||
deliveryTag: Long,
|
||||
properties: BasicProperties,
|
||||
sender: Option[ActorRef]) extends AMQPMessage
|
||||
|
||||
// connection messages
|
||||
case object Connect extends AMQPMessage
|
||||
|
|
@ -51,10 +51,10 @@ private[akka] case class ConnectionShutdown(cause: ShutdownSignalException) exte
|
|||
private[akka] case class ChannelShutdown(cause: ShutdownSignalException) extends InternalAMQPMessage
|
||||
|
||||
private[akka] class MessageNotDeliveredException(
|
||||
val message: String,
|
||||
val replyCode: Int,
|
||||
val replyText: String,
|
||||
val exchange: String,
|
||||
val routingKey: String,
|
||||
val properties: BasicProperties,
|
||||
val body: Array[Byte]) extends RuntimeException(message)
|
||||
val message: String,
|
||||
val replyCode: Int,
|
||||
val replyText: String,
|
||||
val exchange: String,
|
||||
val routingKey: String,
|
||||
val properties: BasicProperties,
|
||||
val body: Array[Byte]) extends RuntimeException(message)
|
||||
|
|
|
|||
|
|
@ -13,7 +13,8 @@ import com.rabbitmq.client.AMQP.BasicProperties
|
|||
import java.lang.Throwable
|
||||
|
||||
private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
|
||||
extends FaultTolerantChannelActor(consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
|
||||
extends FaultTolerantChannelActor(
|
||||
consumerParameters.exchangeParameters, consumerParameters.channelParameters) {
|
||||
|
||||
import consumerParameters._
|
||||
import exchangeParameters._
|
||||
|
|
@ -34,10 +35,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
|
|||
queueName match {
|
||||
case Some(name) =>
|
||||
log.debug("Declaring new queue [%s] for %s", name, toString)
|
||||
if (queuePassive) {
|
||||
ch.queueDeclarePassive(name)
|
||||
} else {
|
||||
ch.queueDeclare(name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments))
|
||||
if (queuePassive) ch.queueDeclarePassive(name)
|
||||
else {
|
||||
ch.queueDeclare(
|
||||
name, queueDurable, queueExclusive, queueAutoDelete,
|
||||
JavaConversions.asMap(configurationArguments))
|
||||
}
|
||||
case None =>
|
||||
log.debug("Declaring new generated queue for %s", toString)
|
||||
|
|
@ -85,7 +87,6 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
|
|||
throw new IllegalArgumentException(errorMessage)
|
||||
}
|
||||
|
||||
|
||||
override def preRestart(reason: Throwable) = {
|
||||
listenerTag = None
|
||||
super.preRestart(reason)
|
||||
|
|
@ -97,11 +98,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters)
|
|||
super.shutdown
|
||||
}
|
||||
|
||||
override def toString(): String =
|
||||
override def toString =
|
||||
"AMQP.Consumer[id= "+ self.id +
|
||||
", exchange=" + exchangeName +
|
||||
", exchangeType=" + exchangeType +
|
||||
", durable=" + exchangeDurable +
|
||||
", autoDelete=" + exchangeAutoDelete + "]"
|
||||
", exchange=" + exchangeName +
|
||||
", exchangeType=" + exchangeType +
|
||||
", durable=" + exchangeDurable +
|
||||
", autoDelete=" + exchangeAutoDelete + "]"
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -60,7 +60,6 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
|
|||
}
|
||||
|
||||
private def connect = if (connection.isEmpty || !connection.get.isOpen) {
|
||||
|
||||
try {
|
||||
connection = Some(connectionFactory.newConnection)
|
||||
connection.foreach {
|
||||
|
|
@ -118,5 +117,4 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
|
|||
notifyCallback(Reconnecting)
|
||||
connect
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,8 @@ import com.rabbitmq.client._
|
|||
import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters
|
||||
|
||||
private[amqp] class ProducerActor(producerParameters: ProducerParameters)
|
||||
extends FaultTolerantChannelActor(producerParameters.exchangeParameters, producerParameters.channelParameters) {
|
||||
extends FaultTolerantChannelActor(
|
||||
producerParameters.exchangeParameters, producerParameters.channelParameters) {
|
||||
|
||||
import producerParameters._
|
||||
import exchangeParameters._
|
||||
|
|
@ -32,29 +33,29 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters)
|
|||
case Some(listener) => ch.setReturnListener(listener)
|
||||
case None => ch.setReturnListener(new ReturnListener() {
|
||||
def handleBasicReturn(
|
||||
replyCode: Int,
|
||||
replyText: String,
|
||||
exchange: String,
|
||||
routingKey: String,
|
||||
properties: com.rabbitmq.client.AMQP.BasicProperties,
|
||||
body: Array[Byte]) = {
|
||||
replyCode: Int,
|
||||
replyText: String,
|
||||
exchange: String,
|
||||
routingKey: String,
|
||||
properties: com.rabbitmq.client.AMQP.BasicProperties,
|
||||
body: Array[Byte]) = {
|
||||
throw new MessageNotDeliveredException(
|
||||
"Could not deliver message [" + body +
|
||||
"] with reply code [" + replyCode +
|
||||
"] with reply text [" + replyText +
|
||||
"] and routing key [" + routingKey +
|
||||
"] to exchange [" + exchange + "]",
|
||||
"] with reply code [" + replyCode +
|
||||
"] with reply text [" + replyText +
|
||||
"] and routing key [" + routingKey +
|
||||
"] to exchange [" + exchange + "]",
|
||||
replyCode, replyText, exchange, routingKey, properties, body)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
override def toString(): String =
|
||||
override def toString =
|
||||
"AMQP.Poducer[id= "+ self.id +
|
||||
", exchange=" + exchangeName +
|
||||
", exchangeType=" + exchangeType +
|
||||
", durable=" + exchangeDurable +
|
||||
", autoDelete=" + exchangeAutoDelete + "]"
|
||||
", exchange=" + exchangeName +
|
||||
", exchangeType=" + exchangeType +
|
||||
", durable=" + exchangeDurable +
|
||||
", autoDelete=" + exchangeAutoDelete + "]"
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,10 +10,12 @@ import se.scalablesolutions.akka.amqp.AMQP.{ChannelParameters, ExchangeParameter
|
|||
import com.rabbitmq.client.{Channel, RpcClient}
|
||||
import se.scalablesolutions.akka.amqp.AMQP.{RpcClientSerializer, ChannelParameters, ExchangeParameters}
|
||||
|
||||
class RpcClientActor[I,O](exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcClientSerializer[I,O],
|
||||
channelParameters: Option[ChannelParameters] = None) extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
|
||||
class RpcClientActor[I,O](
|
||||
exchangeParameters: ExchangeParameters,
|
||||
routingKey: String,
|
||||
serializer: RpcClientSerializer[I,O],
|
||||
channelParameters: Option[ChannelParameters] = None)
|
||||
extends FaultTolerantChannelActor(exchangeParameters, channelParameters) {
|
||||
|
||||
import exchangeParameters._
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,10 @@ import se.scalablesolutions.akka.actor.{ActorRef, Actor}
|
|||
import com.rabbitmq.client.AMQP.BasicProperties
|
||||
import se.scalablesolutions.akka.amqp.AMQP.RpcServerSerializer
|
||||
|
||||
class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,O], requestHandler: I => O) extends Actor {
|
||||
class RpcServerActor[I,O](
|
||||
producer: ActorRef,
|
||||
serializer: RpcServerSerializer[I,O],
|
||||
requestHandler: I => O) extends Actor {
|
||||
|
||||
log.info("%s started", this)
|
||||
|
||||
|
|
@ -29,6 +32,5 @@ class RpcServerActor[I,O](producer: ActorRef, serializer: RpcServerSerializer[I,
|
|||
case Acknowledged(tag) => log.debug("%s acknowledged delivery with tag %d", this, tag)
|
||||
}
|
||||
|
||||
override def toString(): String =
|
||||
"AMQP.RpcServer[]"
|
||||
override def toString = "AMQP.RpcServer[]"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ object Actor extends Logging {
|
|||
type Receive = PartialFunction[Any, Unit]
|
||||
|
||||
private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
|
||||
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor with type T.
|
||||
* <pre>
|
||||
|
|
|
|||
|
|
@ -203,7 +203,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
|
|||
|
||||
protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg }
|
||||
protected[akka] def currentMessage = guard.withGuard { _currentMessage }
|
||||
|
||||
|
||||
/** comparison only takes uuid into account
|
||||
*/
|
||||
def compareTo(other: ActorRef) = this.uuid.compareTo(other.uuid)
|
||||
|
|
@ -1260,8 +1260,8 @@ class LocalActorRef private[akka](
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] case class RemoteActorRef private[akka] (
|
||||
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
|
||||
uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader])
|
||||
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
|
||||
extends ActorRef {
|
||||
_uuid = uuuid
|
||||
timeout = _timeout
|
||||
|
|
|
|||
|
|
@ -49,13 +49,13 @@ object ActorRegistry extends ListenerManagement {
|
|||
|
||||
/**
|
||||
* Invokes the function on all known actors until it returns Some
|
||||
* Returns None if the function never returns Some
|
||||
* Returns None if the function never returns Some
|
||||
*/
|
||||
def find[T](f: (ActorRef) => Option[T]) : Option[T] = {
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val result = f(elements.nextElement)
|
||||
|
||||
|
||||
if(result.isDefined)
|
||||
return result
|
||||
}
|
||||
|
|
@ -134,7 +134,7 @@ object ActorRegistry extends ListenerManagement {
|
|||
newSet add actor
|
||||
|
||||
val oldSet = actorsById.putIfAbsent(id,newSet)
|
||||
|
||||
|
||||
//Parry for two simultaneous putIfAbsent(id,newSet)
|
||||
if(oldSet ne null)
|
||||
oldSet add actor
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ trait BootableActorLoaderService extends Bootable with Logging {
|
|||
} else Thread.currentThread.getContextClassLoader)
|
||||
}
|
||||
|
||||
abstract override def onLoad = {
|
||||
abstract override def onLoad = {
|
||||
applicationLoader.foreach(_ => log.info("Creating /deploy class-loader"))
|
||||
|
||||
super.onLoad
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import java.net.InetSocketAddress
|
|||
import scala.reflect.BeanProperty
|
||||
|
||||
/**
|
||||
* Subclass this abstract class to create a MDB-style untyped actor.
|
||||
* Subclass this abstract class to create a MDB-style untyped actor.
|
||||
* <p/>
|
||||
* This class is meant to be used from Java.
|
||||
* <p/>
|
||||
|
|
@ -29,12 +29,12 @@ import scala.reflect.BeanProperty
|
|||
* // Reply to original sender of message using the 'replyUnsafe' method
|
||||
* getContext().replyUnsafe(msg + ":" + getContext().getUuid());
|
||||
*
|
||||
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
|
||||
* } else if (msg.equals("UseSender") && getContext().getSender().isDefined()) {
|
||||
* // Reply to original sender of message using the sender reference
|
||||
* // also passing along my own refererence (the context)
|
||||
* getContext().getSender().get().sendOneWay(msg, context);
|
||||
* getContext().getSender().get().sendOneWay(msg, context);
|
||||
*
|
||||
* } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
|
||||
* } else if (msg.equals("UseSenderFuture") && getContext().getSenderFuture().isDefined()) {
|
||||
* // Reply to original sender of message using the sender future reference
|
||||
* getContext().getSenderFuture().get().completeWithResult(msg);
|
||||
*
|
||||
|
|
@ -51,7 +51,7 @@ import scala.reflect.BeanProperty
|
|||
* } else throw new IllegalArgumentException("Unknown message: " + message);
|
||||
* } else throw new IllegalArgumentException("Unknown message: " + message);
|
||||
* }
|
||||
*
|
||||
*
|
||||
* public static void main(String[] args) {
|
||||
* UntypedActorRef actor = UntypedActor.actorOf(SampleUntypedActor.class);
|
||||
* actor.start();
|
||||
|
|
@ -96,7 +96,7 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
|
|||
/**
|
||||
* Factory object for creating and managing 'UntypedActor's. Meant to be used from Java.
|
||||
* <p/>
|
||||
* Example on how to create an actor:
|
||||
* Example on how to create an actor:
|
||||
* <pre>
|
||||
* ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
|
||||
* actor.start();
|
||||
|
|
@ -111,11 +111,11 @@ abstract class RemoteUntypedActor(address: InetSocketAddress) extends UntypedAct
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object UntypedActor {
|
||||
|
||||
|
||||
/**
|
||||
* Creates an ActorRef out of the Actor. Allows you to pass in the class for the Actor.
|
||||
* <p/>
|
||||
* Example in Java:
|
||||
* Example in Java:
|
||||
* <pre>
|
||||
* ActorRef actor = UntypedActor.actorOf(MyUntypedActor.class);
|
||||
* actor.start();
|
||||
|
|
@ -134,13 +134,13 @@ object UntypedActor {
|
|||
}
|
||||
|
||||
/**
|
||||
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
|
||||
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
|
||||
* UntypedActor instance directly, but only through its 'UntypedActorRef' wrapper reference.
|
||||
* <p/>
|
||||
* Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only
|
||||
* Creates an ActorRef out of the Actor. Allows you to pass in the instance for the Actor. Only
|
||||
* use this method when you need to pass in constructor arguments into the 'UntypedActor'.
|
||||
* <p/>
|
||||
* Example in Java:
|
||||
* Example in Java:
|
||||
* <pre>
|
||||
* ActorRef actor = UntypedActor.actorOf(new MyUntypedActor("service:name", 5));
|
||||
* actor.start();
|
||||
|
|
@ -156,7 +156,7 @@ object UntypedActor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Use this class if you need to wrap an 'ActorRef' in the more Java-friendly 'UntypedActorRef'.
|
||||
* Use this class if you need to wrap an 'ActorRef' in the more Java-friendly 'UntypedActorRef'.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -170,7 +170,7 @@ object UntypedActorRef {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class UntypedActorRef(val actorRef: ActorRef) {
|
||||
|
||||
|
||||
/**
|
||||
* Returns the uuid for the actor.
|
||||
*/
|
||||
|
|
@ -186,14 +186,14 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
*/
|
||||
def setId(id: String) = actorRef.id = id
|
||||
def getId(): String = actorRef.id
|
||||
|
||||
|
||||
/**
|
||||
* Defines the default timeout for '!!' and '!!!' invocations,
|
||||
* e.g. the timeout for the future returned by the call to '!!' and '!!!'.
|
||||
*/
|
||||
def setTimeout(timeout: Long) = actorRef.timeout = timeout
|
||||
def getTimeout(): Long = actorRef.timeout
|
||||
|
||||
|
||||
/**
|
||||
* Defines the default timeout for an initial receive invocation.
|
||||
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
|
||||
|
|
@ -269,7 +269,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* Is defined if the message was sent with sent with 'sendRequestReply' or 'sendRequestReplyFuture', else None.
|
||||
*/
|
||||
def getSenderFuture(): Option[CompletableFuture[Any]] = actorRef.senderFuture
|
||||
|
||||
|
||||
/**
|
||||
* Starts up the actor and its message queue.
|
||||
*/
|
||||
|
|
@ -285,7 +285,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* Shuts down the actor its dispatcher and message queue.
|
||||
*/
|
||||
def stop(): Unit = actorRef.stop()
|
||||
|
||||
|
||||
/**
|
||||
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
|
||||
* <p/>
|
||||
|
|
@ -309,12 +309,12 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
def sendOneWay(message: AnyRef, sender: UntypedActorRef) =
|
||||
if (sender eq null) actorRef.!(message)(None)
|
||||
else actorRef.!(message)(Some(sender.actorRef))
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
|
|
@ -324,19 +324,19 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def sendRequestReply(message: AnyRef): AnyRef =
|
||||
def sendRequestReply(message: AnyRef): AnyRef =
|
||||
actorRef.!!(message)(None).getOrElse(throw new ActorTimeoutException(
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + actorRef.timeout +
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + actorRef.timeout +
|
||||
"]\n\ttimed out."))
|
||||
.asInstanceOf[AnyRef]
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* Sends a message asynchronously and waits on a future for a reply message under the hood. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
|
|
@ -350,18 +350,18 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
val result = if (sender eq null) actorRef.!!(message)(None)
|
||||
else actorRef.!!(message)(Some(sender.actorRef))
|
||||
result.getOrElse(throw new ActorTimeoutException(
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\tfrom [" + sender.actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + actorRef.timeout +
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\tfrom [" + sender.actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + actorRef.timeout +
|
||||
"]\n\ttimed out."))
|
||||
.asInstanceOf[AnyRef]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message under the hood.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
|
|
@ -371,18 +371,18 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* If you are sending messages using <code>sendRequestReply</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def sendRequestReply(message: AnyRef, timeout: Long): AnyRef =
|
||||
def sendRequestReply(message: AnyRef, timeout: Long): AnyRef =
|
||||
actorRef.!!(message, timeout)(None).getOrElse(throw new ActorTimeoutException(
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + timeout +
|
||||
"]\n\ttimed out."))
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + timeout +
|
||||
"]\n\ttimed out."))
|
||||
.asInstanceOf[AnyRef]
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously and waits on a future for a reply message under the hood.
|
||||
* <p/>
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* It waits on the reply either until it receives it or until the timeout expires
|
||||
* (which will throw an ActorTimeoutException). E.g. send-and-receive-eventually semantics.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
|
|
@ -396,17 +396,17 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
val result = if (sender eq null) actorRef.!!(message, timeout)(None)
|
||||
else actorRef.!!(message)(Some(sender.actorRef))
|
||||
result.getOrElse(throw new ActorTimeoutException(
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\tfrom [" + sender.actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + timeout +
|
||||
"]\n\ttimed out."))
|
||||
"Message [" + message +
|
||||
"]\n\tsent to [" + actorRef.actorClassName +
|
||||
"]\n\tfrom [" + sender.actorRef.actorClassName +
|
||||
"]\n\twith timeout [" + timeout +
|
||||
"]\n\ttimed out."))
|
||||
.asInstanceOf[AnyRef]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
|
||||
|
|
@ -416,10 +416,10 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def sendRequestReplyFuture(message: AnyRef): Future[_] = actorRef.!!!(message)(None)
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message. The timeout is taken from
|
||||
* the default timeout in the Actor.
|
||||
* <p/>
|
||||
* <b>NOTE:</b>
|
||||
* Use this method with care. In most cases it is better to use 'sendOneWay' together with the 'getContext().getSender()' to
|
||||
|
|
@ -428,10 +428,10 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] =
|
||||
def sendRequestReplyFuture(message: AnyRef, sender: UntypedActorRef): Future[_] =
|
||||
if (sender eq null) actorRef.!!!(message)(None)
|
||||
else actorRef.!!!(message)(Some(sender.actorRef))
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* <p/>
|
||||
|
|
@ -443,7 +443,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def sendRequestReplyFuture(message: AnyRef, timeout: Long): Future[_] = actorRef.!!!(message, timeout)(None)
|
||||
|
||||
|
||||
/**
|
||||
* Sends a message asynchronously returns a future holding the eventual reply message.
|
||||
* <p/>
|
||||
|
|
@ -454,17 +454,17 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
|
||||
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
|
||||
*/
|
||||
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] =
|
||||
def sendRequestReplyFuture(message: AnyRef, timeout: Long, sender: UntypedActorRef): Future[_] =
|
||||
if (sender eq null) actorRef.!!!(message, timeout)(None)
|
||||
else actorRef.!!!(message)(Some(sender.actorRef))
|
||||
|
||||
|
||||
/**
|
||||
* Forwards the message and passes the original sender actor as the sender.
|
||||
* <p/>
|
||||
* Works with 'sendOneWay', 'sendRequestReply' and 'sendRequestReplyFuture'.
|
||||
*/
|
||||
def forward(message: AnyRef, sender: UntypedActorRef): Unit =
|
||||
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
|
||||
def forward(message: AnyRef, sender: UntypedActorRef): Unit =
|
||||
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
|
||||
else actorRef.forward(message)(Some(sender.actorRef))
|
||||
|
||||
/**
|
||||
|
|
@ -474,7 +474,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* Throws an IllegalStateException if unable to determine what to reply to.
|
||||
*/
|
||||
def replyUnsafe(message: AnyRef): Unit = actorRef.reply(message)
|
||||
|
||||
|
||||
/**
|
||||
* Use <code>getContext().replySafe(..)</code> to reply with a message to the original sender of the message currently
|
||||
* being processed.
|
||||
|
|
@ -492,7 +492,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* Returns the class name for the Actor instance that is managed by the ActorRef.
|
||||
*/
|
||||
def getActorClassName(): String = actorRef.actorClassName
|
||||
|
||||
|
||||
/**
|
||||
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
|
||||
*/
|
||||
|
|
@ -538,7 +538,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
* Set the home address and port for this actor.
|
||||
*/
|
||||
def setHomeAddress(address: InetSocketAddress): Unit = actorRef.homeAddress = address
|
||||
|
||||
|
||||
/**
|
||||
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
|
||||
* receive a notification if the linked actor has crashed.
|
||||
|
|
@ -562,7 +562,7 @@ class UntypedActorRef(val actorRef: ActorRef) {
|
|||
/**
|
||||
* Atomically start, link and make an actor remote.
|
||||
*/
|
||||
def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit =
|
||||
def startLinkRemote(actor: UntypedActorRef, hostname: String, port: Int): Unit =
|
||||
actorRef.startLinkRemote(actor.actorRef, hostname, port)
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -42,6 +42,8 @@ import se.scalablesolutions.akka.config.Config.config
|
|||
object Dispatchers {
|
||||
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
|
||||
|
||||
object globalHawtDispatcher extends HawtDispatcher
|
||||
|
||||
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global") {
|
||||
override def register(actor: ActorRef) = {
|
||||
if (isShutdown) init
|
||||
|
|
@ -50,8 +52,18 @@ object Dispatchers {
|
|||
}
|
||||
|
||||
object globalReactorBasedSingleThreadEventDrivenDispatcher extends ReactorBasedSingleThreadEventDrivenDispatcher("global")
|
||||
|
||||
object globalReactorBasedThreadPoolEventDrivenDispatcher extends ReactorBasedThreadPoolEventDrivenDispatcher("global")
|
||||
|
||||
/**
|
||||
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
|
||||
* <p/>
|
||||
* Can be beneficial to use the <code>HawtDispatcher.pin(self)</code> to "pin" an actor to a specific thread.
|
||||
* <p/>
|
||||
* See the ScalaDoc for the {@link se.scalablesolutions.akka.dispatch.HawtDispatcher} for details.
|
||||
*/
|
||||
def newHawtDispatcher(aggregate: Boolean) = new HawtDispatcher(aggregate)
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||
* <p/>
|
||||
|
|
|
|||
|
|
@ -234,7 +234,7 @@ trait ThreadPoolBuilder {
|
|||
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging {
|
||||
|
||||
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
||||
def uncaughtException(thread: Thread, cause: Throwable) =
|
||||
def uncaughtException(thread: Thread, cause: Throwable) =
|
||||
log.error(cause, "UNCAUGHT in thread [%s]", thread.getName)
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ trait BootableRemoteActorService extends Bootable with Logging {
|
|||
|
||||
def startRemoteService = remoteServerThread.start
|
||||
|
||||
abstract override def onLoad = {
|
||||
abstract override def onLoad = {
|
||||
if (config.getBool("akka.remote.server.service", true)) {
|
||||
if (config.getBool("akka.remote.cluster.service", true)) Cluster.start(self.applicationLoader)
|
||||
log.info("Initializing Remote Actors Service...")
|
||||
|
|
|
|||
|
|
@ -48,44 +48,48 @@ case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty v
|
|||
case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
|
||||
|
||||
class RemoteClientException private[akka](message: String) extends RuntimeException(message)
|
||||
|
||||
/**
|
||||
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object RemoteClient extends Logging {
|
||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
|
||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
|
||||
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
|
||||
|
||||
private val remoteClients = new HashMap[String, RemoteClient]
|
||||
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
|
||||
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
|
||||
|
||||
// FIXME: simplify overloaded methods when we have Scala 2.8
|
||||
|
||||
def actorFor(className: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(className, className, 5000L, hostname, port, None)
|
||||
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
|
||||
|
||||
def actorFor(className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(className, className, 5000L, hostname, port, Some(loader))
|
||||
def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader))
|
||||
|
||||
def actorFor(uuid: String, className: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(uuid, className, 5000L, hostname, port, None)
|
||||
def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(serviceId, className, 5000L, hostname, port, None)
|
||||
|
||||
def actorFor(uuid: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(uuid, className, 5000L, hostname, port, Some(loader))
|
||||
def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(serviceId, className, 5000L, hostname, port, Some(loader))
|
||||
|
||||
def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
actorFor(className, className, timeout, hostname, port, None)
|
||||
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None)
|
||||
|
||||
def actorFor(className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(className, className, timeout, hostname, port, Some(loader))
|
||||
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader))
|
||||
|
||||
def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
RemoteActorRef(uuid, className, hostname, port, timeout, None)
|
||||
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
|
||||
|
||||
private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
RemoteActorRef(uuid, className, hostname, port, timeout, Some(loader))
|
||||
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))
|
||||
|
||||
private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
|
||||
RemoteActorRef(uuid, className, hostname, port, timeout, loader)
|
||||
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
|
||||
RemoteActorRef(serviceId, className, hostname, port, timeout, loader)
|
||||
|
||||
def clientFor(hostname: String, port: Int): RemoteClient =
|
||||
clientFor(new InetSocketAddress(hostname, port), None)
|
||||
|
|
@ -158,9 +162,11 @@ object RemoteClient extends Logging {
|
|||
}
|
||||
|
||||
/**
|
||||
* RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteClient private[akka] (val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
|
||||
class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) extends Logging {
|
||||
val name = "RemoteClient@" + hostname + "::" + port
|
||||
|
||||
@volatile private[remote] var isRunning = false
|
||||
|
|
@ -226,7 +232,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
|
|||
}
|
||||
}
|
||||
} else {
|
||||
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port))
|
||||
throw exception
|
||||
}
|
||||
|
|
@ -245,21 +251,22 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteClientPipelineFactory(name: String,
|
||||
futures: ConcurrentMap[Long, CompletableFuture[_]],
|
||||
supervisors: ConcurrentMap[String, ActorRef],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: SocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
client: RemoteClient) extends ChannelPipelineFactory {
|
||||
class RemoteClientPipelineFactory(
|
||||
name: String,
|
||||
futures: ConcurrentMap[Long, CompletableFuture[_]],
|
||||
supervisors: ConcurrentMap[String, ActorRef],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: SocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
client: RemoteClient) extends ChannelPipelineFactory {
|
||||
def getPipeline: ChannelPipeline = {
|
||||
|
||||
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
|
||||
|
||||
|
||||
val engine = RemoteServerSslContext.client.createSSLEngine()
|
||||
engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible?
|
||||
engine.setUseClientMode(true)
|
||||
|
||||
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
|
|
@ -272,9 +279,7 @@ class RemoteClientPipelineFactory(name: String,
|
|||
}
|
||||
|
||||
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
|
||||
|
||||
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
|
||||
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -283,13 +288,14 @@ class RemoteClientPipelineFactory(name: String,
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
class RemoteClientHandler(val name: String,
|
||||
val futures: ConcurrentMap[Long, CompletableFuture[_]],
|
||||
val supervisors: ConcurrentMap[String, ActorRef],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: SocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: RemoteClient)
|
||||
class RemoteClientHandler(
|
||||
val name: String,
|
||||
val futures: ConcurrentMap[Long, CompletableFuture[_]],
|
||||
val supervisors: ConcurrentMap[String, ActorRef],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: SocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: RemoteClient)
|
||||
extends SimpleChannelUpstreamHandler with Logging {
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
|
|
@ -318,13 +324,13 @@ class RemoteClientHandler(val name: String,
|
|||
val supervisedActor = supervisors.get(supervisorUuid)
|
||||
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
|
||||
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply))
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
|
||||
}
|
||||
future.completeWithException(null, parseException(reply))
|
||||
future.completeWithException(null, parseException(reply, client.loader))
|
||||
}
|
||||
futures.remove(reply.getId)
|
||||
} else {
|
||||
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result)
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port))
|
||||
throw exception
|
||||
}
|
||||
|
|
@ -358,24 +364,20 @@ class RemoteClientHandler(val name: String,
|
|||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
if(RemoteServer.SECURE){
|
||||
val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
sslHandler.handshake().addListener( new ChannelFutureListener {
|
||||
def operationComplete(future : ChannelFuture) : Unit = {
|
||||
if(future.isSuccess)
|
||||
connect
|
||||
//else
|
||||
//FIXME: What is the correct action here?
|
||||
}
|
||||
})
|
||||
} else {
|
||||
connect
|
||||
}
|
||||
if (RemoteServer.SECURE) {
|
||||
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
sslHandler.handshake.addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) connect
|
||||
else throw new RemoteClientException("Could not establish SSL handshake")
|
||||
}
|
||||
})
|
||||
} else connect
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.listeners.toArray.foreach(l =>
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
|
||||
client.listeners.toArray.foreach(listener =>
|
||||
listener.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port))
|
||||
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
|
|
@ -385,9 +387,11 @@ class RemoteClientHandler(val name: String,
|
|||
event.getChannel.close
|
||||
}
|
||||
|
||||
private def parseException(reply: RemoteReplyProtocol): Throwable = {
|
||||
private def parseException(reply: RemoteReplyProtocol, loader: Option[ClassLoader]): Throwable = {
|
||||
val exception = reply.getException
|
||||
val exceptionClass = Class.forName(exception.getClassname)
|
||||
val classname = exception.getClassname
|
||||
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
|
||||
else Class.forName(classname)
|
||||
exceptionClass
|
||||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
|
|
|
|||
|
|
@ -76,34 +76,31 @@ object RemoteServer {
|
|||
}
|
||||
|
||||
val SECURE = {
|
||||
if(config.getBool("akka.remote.ssl.service",false)){
|
||||
|
||||
if (config.getBool("akka.remote.ssl.service",false)) {
|
||||
val properties = List(
|
||||
("key-store-type" ,"keyStoreType"),
|
||||
("key-store" ,"keyStore"),
|
||||
("key-store-pass" ,"keyStorePassword"),
|
||||
("trust-store-type","trustStoreType"),
|
||||
("trust-store" ,"trustStore"),
|
||||
("trust-store-pass","trustStorePassword")
|
||||
).map(x => ("akka.remote.ssl." + x._1,"javax.net.ssl."+x._2))
|
||||
|
||||
//If property is not set, and we have a value from our akka.conf, use that value
|
||||
for{ p <- properties if System.getProperty(p._2) eq null
|
||||
c <- config.getString(p._1)
|
||||
} System.setProperty(p._2,c)
|
||||
|
||||
if(config.getBool("akka.remote.ssl.debug",false))
|
||||
System.setProperty("javax.net.debug","ssl")
|
||||
("key-store-type" , "keyStoreType"),
|
||||
("key-store" , "keyStore"),
|
||||
("key-store-pass" , "keyStorePassword"),
|
||||
("trust-store-type", "trustStoreType"),
|
||||
("trust-store" , "trustStore"),
|
||||
("trust-store-pass", "trustStorePassword")
|
||||
).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2))
|
||||
|
||||
// If property is not set, and we have a value from our akka.conf, use that value
|
||||
for {
|
||||
p <- properties if System.getProperty(p._2) eq null
|
||||
c <- config.getString(p._1)
|
||||
} System.setProperty(p._2, c)
|
||||
|
||||
if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl")
|
||||
true
|
||||
}
|
||||
else
|
||||
false
|
||||
} else false
|
||||
}
|
||||
|
||||
object Address {
|
||||
def apply(hostname: String, port: Int) = new Address(hostname, port)
|
||||
}
|
||||
|
||||
class Address(val hostname: String, val port: Int) {
|
||||
override def hashCode: Int = {
|
||||
var result = HashCode.SEED
|
||||
|
|
@ -120,7 +117,7 @@ object RemoteServer {
|
|||
}
|
||||
|
||||
private class RemoteActorSet {
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
}
|
||||
|
||||
|
|
@ -307,17 +304,14 @@ class RemoteServer extends Logging {
|
|||
object RemoteServerSslContext {
|
||||
import javax.net.ssl.SSLContext
|
||||
|
||||
val (client,server) = {
|
||||
val (client, server) = {
|
||||
val protocol = "TLS"
|
||||
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
|
||||
//val store = KeyStore.getInstance("JKS")
|
||||
|
||||
val s = SSLContext.getInstance(protocol)
|
||||
s.init(null,null,null)
|
||||
|
||||
val c = SSLContext.getInstance(protocol)
|
||||
c.init(null,null,null)
|
||||
|
||||
(c,s)
|
||||
}
|
||||
}
|
||||
|
|
@ -340,20 +334,18 @@ class RemoteServerPipelineFactory(
|
|||
engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible?
|
||||
engine.setUseClientMode(false)
|
||||
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder))
|
||||
case _ => (join(),join())
|
||||
val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
}
|
||||
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors)
|
||||
|
||||
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
|
||||
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -379,24 +371,21 @@ class RemoteServerHandler(
|
|||
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
|
||||
openChannels.add(ctx.getChannel)
|
||||
}
|
||||
|
||||
override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
|
||||
if(RemoteServer.SECURE) {
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
|
||||
if (RemoteServer.SECURE) {
|
||||
val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
|
||||
|
||||
// Begin handshake.
|
||||
sslHandler.handshake().addListener( new ChannelFutureListener {
|
||||
def operationComplete(future : ChannelFuture) : Unit = {
|
||||
if(future.isSuccess)
|
||||
openChannels.add(future.getChannel)
|
||||
else
|
||||
future.getChannel.close
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) openChannels.add(future.getChannel)
|
||||
else future.getChannel.close
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
|
|
@ -499,8 +488,8 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val name = actorInfo.getTarget
|
||||
val uuid = actorInfo.getUuid
|
||||
val name = actorInfo.getTarget
|
||||
val timeout = actorInfo.getTimeout
|
||||
|
||||
val actorRefOrNull = actors get uuid
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class SchedulerSpec extends JUnitSuite {
|
|||
// should still be 1 left
|
||||
assert(countDownLatch.getCount == 1)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* ticket #372
|
||||
*/
|
||||
|
|
@ -59,19 +59,19 @@ class SchedulerSpec extends JUnitSuite {
|
|||
assert(ticks.await(10,TimeUnit.SECONDS))
|
||||
assert(ActorRegistry.actors.length === numActors)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* ticket #372
|
||||
*/
|
||||
@Test def schedulerShouldBeCancellable = withCleanEndState {
|
||||
object Ping
|
||||
val ticks = new CountDownLatch(1)
|
||||
|
||||
|
||||
val actor = actorOf(new Actor {
|
||||
def receive = { case Ping => ticks.countDown }
|
||||
}).start
|
||||
|
||||
(1 to 10).foreach { i =>
|
||||
(1 to 10).foreach { i =>
|
||||
val future = Scheduler.scheduleOnce(actor,Ping,1,TimeUnit.SECONDS)
|
||||
future.cancel(true)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,13 +66,13 @@ akka {
|
|||
|
||||
#You can either use java command-line options or use the settings below
|
||||
|
||||
#key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12
|
||||
#key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12
|
||||
#key-store = "yourcertificate.p12" #Same as -Djavax.net.ssl.keyStore=yourcertificate.p12
|
||||
#key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS
|
||||
#key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS
|
||||
|
||||
#trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks
|
||||
#trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore
|
||||
#trust-store-pass = "$PASS" #-Djavax.net.ssl.trustStorePassword=$PASS
|
||||
#trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks
|
||||
#trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore
|
||||
#trust-store-pass = "$PASS" #Same as -Djavax.net.ssl.trustStorePassword=$PASS
|
||||
|
||||
#This can be useful for debugging
|
||||
debug = off #if on, very verbose debug, same as -Djavax.net.debug=ssl
|
||||
|
|
|
|||
|
|
@ -3,10 +3,3 @@ include "akka-reference.conf"
|
|||
|
||||
# In this file you can override any option defined in the 'akka-reference.conf' file.
|
||||
# Copy in all or parts of the 'akka-reference.conf' file and modify as you please.
|
||||
# <akka>
|
||||
# <storage>
|
||||
# <redis>
|
||||
# cluster = ["localhost:6379", "localhost:6380", "localhost:6381"]
|
||||
# </redis>
|
||||
# </storage>
|
||||
# </akka>
|
||||
|
|
|
|||
|
|
@ -428,7 +428,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// akka-persistence-common subproject
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
|
||||
class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
|
||||
val commons_pool = Dependencies.commons_pool
|
||||
val thrift = Dependencies.thrift
|
||||
|
|
@ -747,4 +747,4 @@ trait DeployProject { self: BasicScalaProject =>
|
|||
|
||||
trait OSGiProject extends BNDPlugin { self: DefaultProject =>
|
||||
override def bndExportPackage = Seq("se.scalablesolutions.akka.*;version=%s".format(projectVersion.value))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue