From fa2268a3e6d2c7146abe152c8c83ff8aee15ea32 Mon Sep 17 00:00:00 2001 From: momania Date: Fri, 8 Oct 2010 18:10:57 +0200 Subject: [PATCH] after merge cleanup --- .../akka/amqp/ExampleSessionJava.java | 4 +-- .../amqp/FaultTolerantConnectionActor.scala | 3 --- .../scalablesolutions/akka/amqp/rpc/RPC.scala | 26 +++++++++---------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java index 19b12d4c64..398feb17ce 100644 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java @@ -10,8 +10,8 @@ import se.scalablesolutions.akka.actor.UntypedActorFactory; import se.scalablesolutions.akka.amqp.rpc.RPC; import se.scalablesolutions.akka.remote.protocol.RemoteProtocol; -import se.scalablesolutions.akka.util.Function; -import se.scalablesolutions.akka.util.Procedure; +import se.scalablesolutions.akka.japi.Function; +import se.scalablesolutions.akka.japi.Procedure; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index e23acb8bc8..16ec8db389 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -19,9 +19,6 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio self.lifeCycle = Permanent self.faultHandler = OneForOneStrategy(List(classOf[Throwable])) - self.trapExit = List(classOf[Throwable]) - self.faultHandler = Some(OneForOneStrategy(None, None)) // never die - val reconnectionTimer = new Timer("%s-timer".format(self.id)) val connectionFactory: ConnectionFactory = new ConnectionFactory() diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 87800c7742..ed0f8be7e1 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -5,8 +5,8 @@ import com.google.protobuf.Message import se.scalablesolutions.akka.actor.{Actor, ActorRef} import Actor._ import se.scalablesolutions.akka.amqp._ -import se.scalablesolutions.akka.util.Procedure import reflect.Manifest +import se.scalablesolutions.akka.japi object RPC { @@ -44,7 +44,7 @@ object RPC { exchangeName: String, routingKey: String, serializer: RpcServerSerializer[I, O], - requestHandler: se.scalablesolutions.akka.util.Function[I,O]): RpcServerHandle = { + requestHandler: japi.Function[I,O]): RpcServerHandle = { newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _) } @@ -53,7 +53,7 @@ object RPC { exchangeName: String, routingKey: String, serializer: RpcServerSerializer[I, O], - requestHandler: se.scalablesolutions.akka.util.Function[I,O], + requestHandler: Function[I,O], queueName: String): RpcServerHandle = { newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName)) } @@ -63,7 +63,7 @@ object RPC { exchangeName: String, routingKey: String, serializer: RpcServerSerializer[I, O], - requestHandler: se.scalablesolutions.akka.util.Function[I,O], + requestHandler: japi.Function[I,O], channelParameters: ChannelParameters): RpcServerHandle = { newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, None, Some(channelParameters)) } @@ -73,7 +73,7 @@ object RPC { exchangeName: String, routingKey: String, serializer: RpcServerSerializer[I, O], - requestHandler: se.scalablesolutions.akka.util.Function[I,O], + requestHandler: japi.Function[I,O], queueName: String, channelParameters: ChannelParameters): RpcServerHandle = { newRpcServer(connection, exchangeName, routingKey, serializer, requestHandler.apply _, Some(queueName), Some(channelParameters)) @@ -122,12 +122,12 @@ object RPC { } // Needed for Java API usage - def callAsync(request: O, responseHandler: Procedure[I]): Unit = { + def callAsync(request: O, responseHandler: japi.Procedure[I]): Unit = { callAsync(request, 5000, responseHandler) } // Needed for Java API usage - def callAsync(request: O, timeout: Long, responseHandler: Procedure[I]): Unit = { + def callAsync(request: O, timeout: Long, responseHandler: japi.Procedure[I]): Unit = { callAsync(request, timeout){ case Some(response) => responseHandler.apply(response) } @@ -147,7 +147,7 @@ object RPC { def newProtobufRpcServer[I <: Message, O <: Message]( connection: ActorRef, exchangeName: String, - requestHandler: se.scalablesolutions.akka.util.Function[I,O], + requestHandler: japi.Function[I,O], resultClazz: Class[I]): RpcServerHandle = { implicit val manifest = Manifest.classType[I](resultClazz) @@ -158,7 +158,7 @@ object RPC { def newProtobufRpcServer[I <: Message, O <: Message]( connection: ActorRef, exchangeName: String, - requestHandler: se.scalablesolutions.akka.util.Function[I,O], + requestHandler: japi.Function[I,O], routingKey: String, resultClazz: Class[I]): RpcServerHandle = { @@ -170,7 +170,7 @@ object RPC { def newProtobufRpcServer[I <: Message, O <: Message]( connection: ActorRef, exchangeName: String, - requestHandler: se.scalablesolutions.akka.util.Function[I,O], + requestHandler: japi.Function[I,O], routingKey: String, queueName: String, resultClazz: Class[I]): RpcServerHandle = { @@ -240,14 +240,14 @@ object RPC { // Needed for Java API usage def newStringRpcServer(connection: ActorRef, exchangeName: String, - requestHandler: se.scalablesolutions.akka.util.Function[String,String]): RpcServerHandle = { + requestHandler: japi.Function[String,String]): RpcServerHandle = { newStringRpcServer(connection, exchangeName, requestHandler.apply _) } // Needed for Java API usage def newStringRpcServer(connection: ActorRef, exchangeName: String, - requestHandler: se.scalablesolutions.akka.util.Function[String,String], + requestHandler: japi.Function[String,String], routingKey: String): RpcServerHandle = { newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey)) } @@ -255,7 +255,7 @@ object RPC { // Needed for Java API usage def newStringRpcServer(connection: ActorRef, exchangeName: String, - requestHandler: se.scalablesolutions.akka.util.Function[String,String], + requestHandler: japi.Function[String,String], routingKey: String, queueName: String): RpcServerHandle = { newStringRpcServer(connection, exchangeName, requestHandler.apply _, Some(routingKey), Some(queueName))