diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ChannelCallbacks.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ChannelCallbacks.java deleted file mode 100644 index 36f54fd825..0000000000 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ChannelCallbacks.java +++ /dev/null @@ -1,10 +0,0 @@ -package se.scalablesolutions.akka.amqp; - -public final class ChannelCallbacks { - - public static final AMQPMessage STARTED = Started$.MODULE$; - public static final AMQPMessage RESTARTING = Restarting$.MODULE$; - public static final AMQPMessage STOPPED = Stopped$.MODULE$; - - private ChannelCallbacks() {} -} diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ConnectionCallbacks.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ConnectionCallbacks.java deleted file mode 100644 index 1314330928..0000000000 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ConnectionCallbacks.java +++ /dev/null @@ -1,10 +0,0 @@ -package se.scalablesolutions.akka.amqp; - -public final class ConnectionCallbacks { - - public static final AMQPMessage CONNECTED = Connected$.MODULE$; - public static final AMQPMessage RECONNECTING = Reconnecting$.MODULE$; - public static final AMQPMessage DISCONNECTED = Disconnected$.MODULE$; - - private ConnectionCallbacks() {} -} diff --git a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java index a878bb6b05..6dd8c3dac3 100644 --- a/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java +++ b/akka-amqp/src/main/java/se/scalablesolutions/akka/amqp/ExampleSessionJava.java @@ -11,9 +11,6 @@ import se.scalablesolutions.akka.util.Procedure; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static se.scalablesolutions.akka.amqp.ChannelCallbacks.*; -import static se.scalablesolutions.akka.amqp.ConnectionCallbacks.*; - public class ExampleSessionJava { public static void main(String... args) { @@ -166,11 +163,11 @@ class ChannelCallbackActor extends UntypedActor { } public void onReceive(Object message) throws Exception { - if (STARTED.getClass().isAssignableFrom(message.getClass())) { + if (Started.class.isAssignableFrom(message.getClass())) { System.out.println("### >> Channel callback: Started"); channelCountdown.countDown(); - } else if (RESTARTING.getClass().isAssignableFrom(message.getClass())) { - } else if (STOPPED.getClass().isAssignableFrom(message.getClass())) { + } else if (Restarting.class.isAssignableFrom(message.getClass())) { + } else if (Stopped.class.isAssignableFrom(message.getClass())) { System.out.println("### >> Channel callback: Stopped"); } else throw new IllegalArgumentException("Unknown message: " + message); } @@ -179,10 +176,10 @@ class ChannelCallbackActor extends UntypedActor { class ConnectionCallbackActor extends UntypedActor { public void onReceive(Object message) throws Exception { - if (CONNECTED.getClass().isAssignableFrom(message.getClass())) { + if (Connected.class.isAssignableFrom(message.getClass())) { System.out.println("### >> Connection callback: Connected!"); - } else if (RECONNECTING.getClass().isAssignableFrom(message.getClass())) { - } else if (DISCONNECTED.getClass().isAssignableFrom(message.getClass())) { + } else if (Reconnecting.class.isAssignableFrom(message.getClass())) { + } else if (Disconnected.class.isAssignableFrom(message.getClass())) { System.out.println("### >> Connection callback: Disconnected!"); } else throw new IllegalArgumentException("Unknown message: " + message); } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala index a99b1afc67..26fab46799 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQPMessage.scala @@ -5,8 +5,6 @@ package se.scalablesolutions.akka.amqp import se.scalablesolutions.akka.actor.ActorRef -import se.scalablesolutions.akka.AkkaException - import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.ShutdownSignalException @@ -46,18 +44,18 @@ case class Delivery( // connection messages case object Connect extends AMQPMessage -case object Connected extends AMQPMessage -case object Reconnecting extends AMQPMessage -case object Disconnected extends AMQPMessage +case class Connected() extends AMQPMessage // Needed for Java API usage +case class Reconnecting() extends AMQPMessage // Needed for Java API usage +case class Disconnected() extends AMQPMessage // Needed for Java API usage case object ChannelRequest extends InternalAMQPMessage // channel messages case object Start extends AMQPMessage -case object Started extends AMQPMessage -case object Restarting extends AMQPMessage -case object Stopped extends AMQPMessage +case class Started() extends AMQPMessage // Needed for Java API usage +case class Restarting() extends AMQPMessage // Needed for Java API usage +case class Stopped() extends AMQPMessage // Needed for Java API usage // delivery messages case class Acknowledge(deliveryTag: Long) extends AMQPMessage diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index ed97b359eb..b0fc0dd59f 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -12,7 +12,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.lang.String import se.scalablesolutions.akka.amqp.AMQP._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol -import se.scalablesolutions.akka.util.Procedure +import se.scalablesolutions.akka.amqp.ExchangeType._ object ExampleSession { diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala index 6617c62a44..d2333f8b7c 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala @@ -46,14 +46,14 @@ abstract private[amqp] class FaultTolerantChannelActor( } else { // channel error log.error(cause, "%s self restarting because of channel shutdown", toString) - notifyCallback(Restarting) + notifyCallback(Restarting()) self ! Start } } case Failure(cause) => log.error(cause, "%s self restarting because of channel failure", toString) closeChannel - notifyCallback(Restarting) + notifyCallback(Restarting()) self ! Start } @@ -81,7 +81,7 @@ abstract private[amqp] class FaultTolerantChannelActor( setupChannel(ch) channel = Some(ch) - notifyCallback(Started) + notifyCallback(Started()) log.info("Channel setup for %s", toString) } @@ -89,7 +89,7 @@ abstract private[amqp] class FaultTolerantChannelActor( channel.foreach { ch => if (ch.isOpen) ch.close - notifyCallback(Stopped) + notifyCallback(Stopped()) log.info("%s channel closed", toString) } channel = None @@ -100,7 +100,7 @@ abstract private[amqp] class FaultTolerantChannelActor( } override def preRestart(reason: Throwable) = { - notifyCallback(Restarting) + notifyCallback(Restarting()) closeChannel } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 72a897dccf..07c6f14954 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -72,7 +72,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio log.info("Successfully (re)connected to AMQP Server %s:%s [%s]", host, port, self.id) log.debug("Sending new channel to %d already linked actors", self.linkedActorsAsList.size) self.linkedActorsAsList.foreach(_ ! conn.createChannel) - notifyCallback(Connected) + notifyCallback(Connected()) } } catch { case e: Exception => @@ -81,7 +81,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio , connectionParameters.initReconnectDelay, self.id) reconnectionTimer.schedule(new TimerTask() { override def run = { - notifyCallback(Reconnecting) + notifyCallback(Reconnecting()) self ! Connect } }, connectionParameters.initReconnectDelay) @@ -92,7 +92,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio try { connection.foreach(_.close) log.debug("Disconnected AMQP connection at %s:%s [%s]", host, port, self.id) - notifyCallback(Disconnected) + notifyCallback(Disconnected()) } catch { case e: IOException => log.error("Could not close AMQP connection %s:%s [%s]", host, port, self.id) case _ => () @@ -114,7 +114,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio override def preRestart(reason: Throwable) = disconnect override def postRestart(reason: Throwable) = { - notifyCallback(Reconnecting) + notifyCallback(Reconnecting()) connect } }