after merge cleanup
This commit is contained in:
parent
53bde24e88
commit
fa2268a3e6
3 changed files with 15 additions and 18 deletions
|
|
@ -10,8 +10,8 @@ import se.scalablesolutions.akka.actor.UntypedActorFactory;
|
||||||
import se.scalablesolutions.akka.amqp.rpc.RPC;
|
import se.scalablesolutions.akka.amqp.rpc.RPC;
|
||||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol;
|
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol;
|
||||||
|
|
||||||
import se.scalablesolutions.akka.util.Function;
|
import se.scalablesolutions.akka.japi.Function;
|
||||||
import se.scalablesolutions.akka.util.Procedure;
|
import se.scalablesolutions.akka.japi.Procedure;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,6 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio
|
||||||
self.lifeCycle = Permanent
|
self.lifeCycle = Permanent
|
||||||
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]))
|
self.faultHandler = OneForOneStrategy(List(classOf[Throwable]))
|
||||||
|
|
||||||
self.trapExit = List(classOf[Throwable])
|
|
||||||
self.faultHandler = Some(OneForOneStrategy(None, None)) // never die
|
|
||||||
|
|
||||||
val reconnectionTimer = new Timer("%s-timer".format(self.id))
|
val reconnectionTimer = new Timer("%s-timer".format(self.id))
|
||||||
|
|
||||||
val connectionFactory: ConnectionFactory = new ConnectionFactory()
|
val connectionFactory: ConnectionFactory = new ConnectionFactory()
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,8 @@ import com.google.protobuf.Message
|
||||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||||
import Actor._
|
import Actor._
|
||||||
import se.scalablesolutions.akka.amqp._
|
import se.scalablesolutions.akka.amqp._
|
||||||
import se.scalablesolutions.akka.util.Procedure
|
|
||||||
import reflect.Manifest
|
import reflect.Manifest
|
||||||
|
import se.scalablesolutions.akka.japi
|
||||||
|
|
||||||
object RPC {
|
object RPC {
|
||||||
|
|
||||||
|
|
@ -44,7 +44,7 @@ object RPC {
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
serializer: RpcServerSerializer[I, O],
|
serializer: RpcServerSerializer[I, O],
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O]): RpcServerHandle = {
|
requestHandler: japi.Function[I,O]): RpcServerHandle = {
|
||||||
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _)
|
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -53,7 +53,7 @@ object RPC {
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
serializer: RpcServerSerializer[I, O],
|
serializer: RpcServerSerializer[I, O],
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
|
requestHandler: Function[I,O],
|
||||||
queueName: String): RpcServerHandle = {
|
queueName: String): RpcServerHandle = {
|
||||||
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName))
|
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName))
|
||||||
}
|
}
|
||||||
|
|
@ -63,7 +63,7 @@ object RPC {
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
serializer: RpcServerSerializer[I, O],
|
serializer: RpcServerSerializer[I, O],
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
|
requestHandler: japi.Function[I,O],
|
||||||
channelParameters: ChannelParameters): RpcServerHandle = {
|
channelParameters: ChannelParameters): RpcServerHandle = {
|
||||||
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters))
|
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters))
|
||||||
}
|
}
|
||||||
|
|
@ -73,7 +73,7 @@ object RPC {
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
serializer: RpcServerSerializer[I, O],
|
serializer: RpcServerSerializer[I, O],
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
|
requestHandler: japi.Function[I,O],
|
||||||
queueName: String,
|
queueName: String,
|
||||||
channelParameters: ChannelParameters): RpcServerHandle = {
|
channelParameters: ChannelParameters): RpcServerHandle = {
|
||||||
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters))
|
newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters))
|
||||||
|
|
@ -122,12 +122,12 @@ object RPC {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Needed for Java API usage
|
// Needed for Java API usage
|
||||||
def callAsync(request: O, responseHandler: Procedure[I]): Unit = {
|
def callAsync(request: O, responseHandler: japi.Procedure[I]): Unit = {
|
||||||
callAsync(request, 5000, responseHandler)
|
callAsync(request, 5000, responseHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Needed for Java API usage
|
// Needed for Java API usage
|
||||||
def callAsync(request: O, timeout: Long, responseHandler: Procedure[I]): Unit = {
|
def callAsync(request: O, timeout: Long, responseHandler: japi.Procedure[I]): Unit = {
|
||||||
callAsync(request, timeout){
|
callAsync(request, timeout){
|
||||||
case Some(response) => responseHandler.apply(response)
|
case Some(response) => responseHandler.apply(response)
|
||||||
}
|
}
|
||||||
|
|
@ -147,7 +147,7 @@ object RPC {
|
||||||
def newProtobufRpcServer[I <: Message, O <: Message](
|
def newProtobufRpcServer[I <: Message, O <: Message](
|
||||||
connection: ActorRef,
|
connection: ActorRef,
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
|
requestHandler: japi.Function[I,O],
|
||||||
resultClazz: Class[I]): RpcServerHandle = {
|
resultClazz: Class[I]): RpcServerHandle = {
|
||||||
|
|
||||||
implicit val manifest = Manifest.classType[I](resultClazz)
|
implicit val manifest = Manifest.classType[I](resultClazz)
|
||||||
|
|
@ -158,7 +158,7 @@ object RPC {
|
||||||
def newProtobufRpcServer[I <: Message, O <: Message](
|
def newProtobufRpcServer[I <: Message, O <: Message](
|
||||||
connection: ActorRef,
|
connection: ActorRef,
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
|
requestHandler: japi.Function[I,O],
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
resultClazz: Class[I]): RpcServerHandle = {
|
resultClazz: Class[I]): RpcServerHandle = {
|
||||||
|
|
||||||
|
|
@ -170,7 +170,7 @@ object RPC {
|
||||||
def newProtobufRpcServer[I <: Message, O <: Message](
|
def newProtobufRpcServer[I <: Message, O <: Message](
|
||||||
connection: ActorRef,
|
connection: ActorRef,
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[I,O],
|
requestHandler: japi.Function[I,O],
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
queueName: String,
|
queueName: String,
|
||||||
resultClazz: Class[I]): RpcServerHandle = {
|
resultClazz: Class[I]): RpcServerHandle = {
|
||||||
|
|
@ -240,14 +240,14 @@ object RPC {
|
||||||
// Needed for Java API usage
|
// Needed for Java API usage
|
||||||
def newStringRpcServer(connection: ActorRef,
|
def newStringRpcServer(connection: ActorRef,
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[String,String]): RpcServerHandle = {
|
requestHandler: japi.Function[String,String]): RpcServerHandle = {
|
||||||
newStringRpcServer(connection, exchangeName, requestHandler.apply _)
|
newStringRpcServer(connection, exchangeName, requestHandler.apply _)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Needed for Java API usage
|
// Needed for Java API usage
|
||||||
def newStringRpcServer(connection: ActorRef,
|
def newStringRpcServer(connection: ActorRef,
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[String,String],
|
requestHandler: japi.Function[String,String],
|
||||||
routingKey: String): RpcServerHandle = {
|
routingKey: String): RpcServerHandle = {
|
||||||
newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey))
|
newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey))
|
||||||
}
|
}
|
||||||
|
|
@ -255,7 +255,7 @@ object RPC {
|
||||||
// Needed for Java API usage
|
// Needed for Java API usage
|
||||||
def newStringRpcServer(connection: ActorRef,
|
def newStringRpcServer(connection: ActorRef,
|
||||||
exchangeName: String,
|
exchangeName: String,
|
||||||
requestHandler: se.scalablesolutions.akka.util.Function[String,String],
|
requestHandler: japi.Function[String,String],
|
||||||
routingKey: String,
|
routingKey: String,
|
||||||
queueName: String): RpcServerHandle = {
|
queueName: String): RpcServerHandle = {
|
||||||
newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName))
|
newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue