diff --git a/akka-docs/rst/general/configuration.rst b/akka-docs/rst/general/configuration.rst index ef25988740..3009ecceb7 100644 --- a/akka-docs/rst/general/configuration.rst +++ b/akka-docs/rst/general/configuration.rst @@ -484,12 +484,3 @@ akka-testkit .. literalinclude:: ../../../akka-testkit/src/main/resources/reference.conf :language: none -.. _config-akka-zeromq: - -akka-zeromq -~~~~~~~~~~~ - -.. literalinclude:: ../../../akka-zeromq/src/main/resources/reference.conf - :language: none - - diff --git a/akka-docs/rst/intro/getting-started.rst b/akka-docs/rst/intro/getting-started.rst index 01b034f5a0..82ae7aa409 100644 --- a/akka-docs/rst/intro/getting-started.rst +++ b/akka-docs/rst/intro/getting-started.rst @@ -48,8 +48,6 @@ Akka is very modular and consists of several JARs containing different features. - ``akka-testkit`` – Toolkit for testing Actor systems -- ``akka-zeromq`` – ZeroMQ integration - In addition to these stable modules there are several which are on their way into the stable core but are still marked “experimental” at this point. This does not mean that they do not function as intended, it primarily means that diff --git a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTest.java b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTest.java deleted file mode 100644 index bc25a12cd8..0000000000 --- a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package docs.zeromq; - -//#import-pub-socket -import akka.zeromq.Bind; -import akka.zeromq.ZeroMQExtension; -//#import-pub-socket -//#import-sub-socket -import akka.zeromq.Connect; -import akka.zeromq.Listener; -import akka.zeromq.Subscribe; -//#import-sub-socket -//#import-unsub-topic-socket -import akka.zeromq.Unsubscribe; -//#import-unsub-topic-socket -//#import-pub-topic -import akka.util.ByteString; -import akka.zeromq.ZMQMessage; -//#import-pub-topic - -import akka.zeromq.HighWatermark; -import akka.zeromq.SocketOption; -import akka.zeromq.ZeroMQVersion; - -//#import-health -import akka.actor.ActorRef; -import akka.actor.UntypedActor; -import akka.actor.Props; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import org.junit.*; -import scala.concurrent.duration.Duration; -import akka.serialization.SerializationExtension; -import akka.serialization.Serialization; -import java.io.Serializable; -import java.lang.management.ManagementFactory; -//#import-health - -import com.typesafe.config.ConfigFactory; - -import java.lang.management.MemoryMXBean; -import java.lang.management.MemoryUsage; -import java.lang.management.OperatingSystemMXBean; -import java.util.Date; -import java.text.SimpleDateFormat; - -import akka.actor.ActorSystem; -import akka.testkit.AkkaSpec; -import akka.testkit.AkkaJUnitActorSystemResource; - -public class ZeromqDocTest { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("ZeromqDocTest", - ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf())); - - private final ActorSystem system = actorSystemResource.getSystem(); - - @SuppressWarnings("unused") - @Test - public void demonstrateCreateSocket() { - Assume.assumeTrue(checkZeroMQInstallation()); - - //#pub-socket - ActorRef pubSocket = ZeroMQExtension.get(system).newPubSocket( - new Bind("tcp://127.0.0.1:1233")); - //#pub-socket - - //#sub-socket - ActorRef listener = system.actorOf(Props.create(ListenerActor.class)); - ActorRef subSocket = ZeroMQExtension.get(system).newSubSocket( - new Connect("tcp://127.0.0.1:1233"), - new Listener(listener), Subscribe.all()); - //#sub-socket - - //#sub-topic-socket - ActorRef subTopicSocket = ZeroMQExtension.get(system).newSubSocket( - new Connect("tcp://127.0.0.1:1233"), - new Listener(listener), new Subscribe("foo.bar")); - //#sub-topic-socket - - //#unsub-topic-socket - subTopicSocket.tell(new Unsubscribe("foo.bar"), ActorRef.noSender()); - //#unsub-topic-socket - - byte[] payload = new byte[0]; - //#pub-topic - pubSocket.tell(ZMQMessage.withFrames(ByteString.fromString("foo.bar"), - ByteString.fromArray(payload)), ActorRef.noSender()); - //#pub-topic - - system.stop(subSocket); - system.stop(subTopicSocket); - - //#high-watermark - ActorRef highWatermarkSocket = ZeroMQExtension.get(system).newRouterSocket( - new SocketOption[] { new Listener(listener), - new Bind("tcp://127.0.0.1:1233"), new HighWatermark(50000) }); - //#high-watermark - } - - @Test - public void demonstratePubSub() throws Exception { - Assume.assumeTrue(checkZeroMQInstallation()); - - //#health2 - - system.actorOf(Props.create(HealthProbe.class), "health"); - //#health2 - - //#logger2 - - system.actorOf(Props.create(Logger.class), "logger"); - //#logger2 - - //#alerter2 - - system.actorOf(Props.create(HeapAlerter.class), "alerter"); - //#alerter2 - - // Let it run for a while to see some output. - // Don't do like this in real tests, this is only doc demonstration. - Thread.sleep(3000L); - } - - private boolean checkZeroMQInstallation() { - try { - ZeroMQVersion v = ZeroMQExtension.get(system).version(); - return (v.major() >= 3 || (v.major() >= 2 && v.minor() >= 1)); - } catch (LinkageError e) { - return false; - } - } - - static - //#listener-actor - public class ListenerActor extends UntypedActor { - public void onReceive(Object message) throws Exception { - //... - } - } - //#listener-actor - - static - //#health - public final Object TICK = "TICK"; - - //#health - static - //#health - public class Heap implements Serializable { - private static final long serialVersionUID = 1L; - public final long timestamp; - public final long used; - public final long max; - - public Heap(long timestamp, long used, long max) { - this.timestamp = timestamp; - this.used = used; - this.max = max; - } - } - - //#health - static - //#health - public class Load implements Serializable { - private static final long serialVersionUID = 1L; - public final long timestamp; - public final double loadAverage; - - public Load(long timestamp, double loadAverage) { - this.timestamp = timestamp; - this.loadAverage = loadAverage; - } - } - - //#health - static - //#health - public class HealthProbe extends UntypedActor { - - ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket( - new Bind("tcp://127.0.0.1:1237")); - MemoryMXBean memory = ManagementFactory.getMemoryMXBean(); - OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); - Serialization ser = SerializationExtension.get(getContext().system()); - - @Override - public void preStart() { - getContext().system().scheduler() - .schedule(Duration.create(1, "second"), Duration.create(1, "second"), - getSelf(), TICK, getContext().dispatcher(), ActorRef.noSender()); - } - - @Override - public void postRestart(Throwable reason) { - // don't call preStart, only schedule once - } - - @Override - public void onReceive(Object message) { - if (message.equals(TICK)) { - MemoryUsage currentHeap = memory.getHeapMemoryUsage(); - long timestamp = System.currentTimeMillis(); - - // use akka SerializationExtension to convert to bytes - ByteString heapTopic = ByteString.fromString("health.heap", "UTF-8"); - ByteString heapPayload = ByteString.fromArray( - ser.serialize( - new Heap(timestamp, - currentHeap.getUsed(), - currentHeap.getMax()) - ).get()); - // the first frame is the topic, second is the message - pubSocket.tell(ZMQMessage.withFrames(heapTopic, heapPayload), getSelf()); - - // use akka SerializationExtension to convert to bytes - ByteString loadTopic = ByteString.fromString("health.load", "UTF-8"); - ByteString loadPayload = ByteString.fromArray( - ser.serialize(new Load(timestamp, os.getSystemLoadAverage())).get() - ); - // the first frame is the topic, second is the message - pubSocket.tell(ZMQMessage.withFrames(loadTopic, loadPayload), getSelf()); - } else { - unhandled(message); - } - } - - } - //#health - - static - //#logger - public class Logger extends UntypedActor { - - ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket( - new Connect("tcp://127.0.0.1:1237"), - new Listener(getSelf()), new Subscribe("health")); - Serialization ser = SerializationExtension.get(getContext().system()); - SimpleDateFormat timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS"); - LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - @Override - public void onReceive(Object message) { - if (message instanceof ZMQMessage) { - ZMQMessage m = (ZMQMessage) message; - String topic = m.frame(0).utf8String(); - // the first frame is the topic, second is the message - if ("health.heap".equals(topic)) { - Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get(); - log.info("Used heap {} bytes, at {}", heap.used, - timestampFormat.format(new Date(heap.timestamp))); - } else if ("health.load".equals(topic)) { - Load load = ser.deserialize(m.frame(1).toArray(), Load.class).get(); - log.info("Load average {}, at {}", load.loadAverage, - timestampFormat.format(new Date(load.timestamp))); - } - } else { - unhandled(message); - } - } - - } - - //#logger - - static - //#alerter - public class HeapAlerter extends UntypedActor { - - ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket( - new Connect("tcp://127.0.0.1:1237"), - new Listener(getSelf()), new Subscribe("health.heap")); - Serialization ser = SerializationExtension.get(getContext().system()); - LoggingAdapter log = Logging.getLogger(getContext().system(), this); - int count = 0; - - @Override - public void onReceive(Object message) { - if (message instanceof ZMQMessage) { - ZMQMessage m = (ZMQMessage) message; - String topic = m.frame(0).utf8String(); - // the first frame is the topic, second is the message - if ("health.heap".equals(topic)) { - Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get(); - if (((double) heap.used / heap.max) > 0.9) { - count += 1; - } else { - count = 0; - } - if (count > 10) { - log.warning("Need more memory, using {} %", - (100.0 * heap.used / heap.max)); - } - } - } else { - unhandled(message); - } - } - - } - //#alerter - -} diff --git a/akka-docs/rst/java/index-network.rst b/akka-docs/rst/java/index-network.rst index 0587e2f9d1..6f26d9d457 100644 --- a/akka-docs/rst/java/index-network.rst +++ b/akka-docs/rst/java/index-network.rst @@ -11,5 +11,4 @@ Networking io io-tcp io-udp - zeromq camel diff --git a/akka-docs/rst/java/zeromq.rst b/akka-docs/rst/java/zeromq.rst deleted file mode 100644 index 201a3d73a7..0000000000 --- a/akka-docs/rst/java/zeromq.rst +++ /dev/null @@ -1,146 +0,0 @@ - -.. _zeromq-java: - -############### - ZeroMQ -############### - - -Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket. - -ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it. - -The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library. -The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out. - -.. note:: - - The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported. - -Connection -========== - -ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created. -Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example: - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-pub-socket - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#pub-socket - -Above examples will create a ZeroMQ Publisher socket that is Bound to the port 21231 on localhost. - -Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using: - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-sub-socket - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#sub-socket - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#listener-actor - -The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide `_. - -Publisher-Subscriber Connection -------------------------------- - -In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall -subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can -subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages -to one or more actors that do not interact with the actor that sent the message. - -When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber. - -An actor is subscribed to a topic as follows: - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#sub-topic-socket - -It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or -``Subscribe.all()`` is used, the actor is subscribed to all topics. - -To unsubscribe from a topic you do the following: - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-unsub-topic-socket - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#unsub-topic-socket - -To publish messages to a topic you must use two Frames with the topic in the first frame. - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-pub-topic - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#pub-topic - -Pub-Sub in Action -^^^^^^^^^^^^^^^^^ - -The following example illustrates one publisher with two subscribers. - -The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic -and ``Load`` events on the ``"health.load"`` topic. - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-health - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#health - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#health2 - -Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and -``Load`` events. - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#logger - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#logger2 - -Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events. - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#alerter - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#alerter2 - -Router-Dealer Connection ------------------------- - -While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic. -When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy. -With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events. - -To create a Router socket that has a high watermark configured, you would do: - -.. includecode:: code/docs/zeromq/ZeromqDocTest.java#high-watermark - -The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket. - -Push-Pull Connection --------------------- - -Akka ZeroMQ module supports ``Push-Pull`` connections. - -You can create a ``Push`` connection through the:: - - ActorRef newPushSocket(SocketOption[] socketParameters); - -You can create a ``Pull`` connection through the:: - - ActorRef newPullSocket(SocketOption[] socketParameters); - -More documentation and examples will follow soon. - -Rep-Req Connection ------------------- - -Akka ZeroMQ module supports ``Rep-Req`` connections. - -You can create a ``Rep`` connection through the:: - - ActorRef newRepSocket(SocketOption[] socketParameters); - -You can create a ``Req`` connection through the:: - - ActorRef newReqSocket(SocketOption[] socketParameters); - -More documentation and examples will follow soon. - -Configuration -============= - -There are several configuration properties for the zeromq module, please refer -to the :ref:`reference configuration `. - diff --git a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala deleted file mode 100644 index 047b7fc124..0000000000 --- a/akka-docs/rst/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package docs.zeromq - -import language.postfixOps -import scala.concurrent.duration._ -import akka.actor.{ Actor, Props } -import akka.util.ByteString -import akka.testkit._ -import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind } -import java.text.SimpleDateFormat -import java.util.Date -import akka.actor.ActorRef - -object ZeromqDocSpec { - - //#health - import akka.zeromq._ - import akka.actor.Actor - import akka.actor.Props - import akka.actor.ActorLogging - import akka.serialization.SerializationExtension - import java.lang.management.ManagementFactory - - case object Tick - final case class Heap(timestamp: Long, used: Long, max: Long) - final case class Load(timestamp: Long, loadAverage: Double) - - class HealthProbe extends Actor { - - val pubSocket = ZeroMQExtension(context.system).newSocket(SocketType.Pub, - Bind("tcp://127.0.0.1:1235")) - val memory = ManagementFactory.getMemoryMXBean - val os = ManagementFactory.getOperatingSystemMXBean - val ser = SerializationExtension(context.system) - import context.dispatcher - - override def preStart() { - context.system.scheduler.schedule(1 second, 1 second, self, Tick) - } - - override def postRestart(reason: Throwable) { - // don't call preStart, only schedule once - } - - def receive: Receive = { - case Tick => - val currentHeap = memory.getHeapMemoryUsage - val timestamp = System.currentTimeMillis - - // use akka SerializationExtension to convert to bytes - val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, - currentHeap.getMax)).get - // the first frame is the topic, second is the message - pubSocket ! ZMQMessage(ByteString("health.heap"), ByteString(heapPayload)) - - // use akka SerializationExtension to convert to bytes - val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get - // the first frame is the topic, second is the message - pubSocket ! ZMQMessage(ByteString("health.load"), ByteString(loadPayload)) - } - } - //#health - - //#logger - class Logger extends Actor with ActorLogging { - - ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), - Connect("tcp://127.0.0.1:1235"), Subscribe("health")) - val ser = SerializationExtension(context.system) - val timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS") - - def receive = { - // the first frame is the topic, second is the message - case m: ZMQMessage if m.frames(0).utf8String == "health.heap" => - val Heap(timestamp, used, max) = ser.deserialize(m.frames(1).toArray, - classOf[Heap]).get - log.info("Used heap {} bytes, at {}", used, - timestampFormat.format(new Date(timestamp))) - - case m: ZMQMessage if m.frames(0).utf8String == "health.load" => - val Load(timestamp, loadAverage) = ser.deserialize(m.frames(1).toArray, - classOf[Load]).get - log.info("Load average {}, at {}", loadAverage, - timestampFormat.format(new Date(timestamp))) - } - } - //#logger - - //#alerter - class HeapAlerter extends Actor with ActorLogging { - - ZeroMQExtension(context.system).newSocket(SocketType.Sub, - Listener(self), Connect("tcp://127.0.0.1:1235"), Subscribe("health.heap")) - val ser = SerializationExtension(context.system) - var count = 0 - - def receive = { - // the first frame is the topic, second is the message - case m: ZMQMessage if m.frames(0).utf8String == "health.heap" => - val Heap(timestamp, used, max) = - ser.deserialize(m.frames(1).toArray, classOf[Heap]).get - if ((used.toDouble / max) > 0.9) count += 1 - else count = 0 - if (count > 10) log.warning("Need more memory, using {} %", - (100.0 * used / max)) - } - } - //#alerter - -} - -class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") { - import ZeromqDocSpec._ - - "demonstrate how to create socket" in { - checkZeroMQInstallation() - - //#pub-socket - import akka.zeromq.ZeroMQExtension - val pubSocket = ZeroMQExtension(system).newSocket(SocketType.Pub, - Bind("tcp://127.0.0.1:21231")) - //#pub-socket - - import akka.zeromq._ - val sub: { def subSocket: ActorRef; def listener: ActorRef } = new AnyRef { - //#sub-socket - import akka.zeromq._ - - class Listener extends Actor { - def receive: Receive = { - case Connecting => //... - case m: ZMQMessage => //... - case _ => //... - } - } - - val listener = system.actorOf(Props(classOf[Listener], this)) - val subSocket = ZeroMQExtension(system).newSocket(SocketType.Sub, - Listener(listener), Connect("tcp://127.0.0.1:21231"), SubscribeAll) - //#sub-socket - } - val listener = sub.listener - - //#sub-topic-socket - val subTopicSocket = ZeroMQExtension(system).newSocket(SocketType.Sub, - Listener(listener), Connect("tcp://127.0.0.1:21231"), Subscribe("foo.bar")) - //#sub-topic-socket - - //#unsub-topic-socket - subTopicSocket ! Unsubscribe("foo.bar") - //#unsub-topic-socket - - val payload = Array.empty[Byte] - //#pub-topic - pubSocket ! ZMQMessage(ByteString("foo.bar"), ByteString(payload)) - //#pub-topic - - system.stop(sub.subSocket) - system.stop(subTopicSocket) - - //#high-watermark - val highWatermarkSocket = ZeroMQExtension(system).newSocket( - SocketType.Router, - Listener(listener), - Bind("tcp://127.0.0.1:21233"), - HighWatermark(50000)) - //#high-watermark - } - - "demonstrate pub-sub" in { - checkZeroMQInstallation() - - //#health - - system.actorOf(Props[HealthProbe], name = "health") - //#health - - //#logger - - system.actorOf(Props[Logger], name = "logger") - //#logger - - //#alerter - - system.actorOf(Props[HeapAlerter], name = "alerter") - //#alerter - - // Let it run for a while to see some output. - // Don't do like this in real tests, this is only doc demonstration. - Thread.sleep(3.seconds.toMillis) - - } - - def checkZeroMQInstallation() = try { - ZeroMQExtension(system).version match { - case ZeroMQVersion(2, x, _) if x >= 1 => Unit - case ZeroMQVersion(y, _, _) if y >= 3 => Unit - case version => pending - } - } catch { - case e: LinkageError => pending - } -} diff --git a/akka-docs/rst/scala/index-network.rst b/akka-docs/rst/scala/index-network.rst index 0587e2f9d1..6f26d9d457 100644 --- a/akka-docs/rst/scala/index-network.rst +++ b/akka-docs/rst/scala/index-network.rst @@ -11,5 +11,4 @@ Networking io io-tcp io-udp - zeromq camel diff --git a/akka-docs/rst/scala/zeromq.rst b/akka-docs/rst/scala/zeromq.rst deleted file mode 100644 index be2a8ef010..0000000000 --- a/akka-docs/rst/scala/zeromq.rst +++ /dev/null @@ -1,129 +0,0 @@ - -.. _zeromq-scala: - -################ - ZeroMQ -################ - - -Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket. - -ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it. - -The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library. -The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out. - -.. note:: - - The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported. - -Connection -========== - -ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created. -Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example: - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#pub-socket - - -Above examples will create a ZeroMQ Publisher socket that is Bound to the port 21231 on localhost. - -Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using: - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#sub-socket - -The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide `_. - -Publisher-Subscriber Connection -------------------------------- - -In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall -subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can -subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages -to one or more actors that do not interact with the actor that sent the message. - -When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber. - -An actor is subscribed to a topic as follows: - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#sub-topic-socket - -It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or -``SubscribeAll`` is used, the actor is subscribed to all topics. - -To unsubscribe from a topic you do the following: - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#unsub-topic-socket - -To publish messages to a topic you must use two Frames with the topic in the first frame. - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#pub-topic - -Pub-Sub in Action -^^^^^^^^^^^^^^^^^ - -The following example illustrates one publisher with two subscribers. - -The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic -and ``Load`` events on the ``"health.load"`` topic. - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#health - -Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and -``Load`` events. - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#logger - -Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events. - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#alerter - -Router-Dealer Connection ------------------------- - -While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic. -When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy. -With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events. - -To create a Router socket that has a high watermark configured, you would do: - -.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#high-watermark - -The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket. - -Push-Pull Connection --------------------- - -Akka ZeroMQ module supports ``Push-Pull`` connections. - -You can create a ``Push`` connection through the:: - - def newPushSocket(socketParameters: Array[SocketOption]): ActorRef - -You can create a ``Pull`` connection through the:: - - def newPullSocket(socketParameters: Array[SocketOption]): ActorRef - -More documentation and examples will follow soon. - -Rep-Req Connection ------------------- - -Akka ZeroMQ module supports ``Rep-Req`` connections. - -You can create a ``Rep`` connection through the:: - - def newRepSocket(socketParameters: Array[SocketOption]): ActorRef - -You can create a ``Req`` connection through the:: - - def newReqSocket(socketParameters: Array[SocketOption]): ActorRef - -More documentation and examples will follow soon. - -Configuration -============= - -There are several configuration properties for the zeromq module, please refer -to the :ref:`reference configuration `. - diff --git a/akka-zeromq/build.sbt b/akka-zeromq/build.sbt deleted file mode 100644 index 4a8b32e32a..0000000000 --- a/akka-zeromq/build.sbt +++ /dev/null @@ -1,16 +0,0 @@ -import akka.{ AkkaBuild, Dependencies, Formatting, OSGi, Unidoc } -import com.typesafe.tools.mima.plugin.MimaKeys - -AkkaBuild.defaultSettings - -Formatting.formatSettings - -Unidoc.scaladocSettings - -Unidoc.javadocSettings - -OSGi.zeroMQ - -Dependencies.zeroMQ - -MimaKeys.previousArtifact := akkaPreviousArtifact("akka-zeromq").value diff --git a/akka-zeromq/src/main/resources/reference.conf b/akka-zeromq/src/main/resources/reference.conf deleted file mode 100644 index 5834b839da..0000000000 --- a/akka-zeromq/src/main/resources/reference.conf +++ /dev/null @@ -1,27 +0,0 @@ -##################################### -# Akka ZeroMQ Reference Config File # -##################################### - -# This is the reference config file that contains all the default settings. -# Make your edits/overrides in your application.conf. - -akka { - - zeromq { - - # The default timeout for a poll on the actual zeromq socket. - poll-timeout = 100ms - - # Timeout for creating a new socket - new-socket-timeout = 5s - - socket-dispatcher { - # A zeromq socket needs to be pinned to the thread that created it. - # Changing this value results in weird errors and race conditions within - # zeromq - executor = thread-pool-executor - type = "PinnedDispatcher" - thread-pool-executor.allow-core-timeout = off - } - } -} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala deleted file mode 100644 index d50dae34fa..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -import org.zeromq.ZMQ.{ Socket, Poller } -import org.zeromq.{ ZMQ ⇒ JZMQ } -import akka.actor._ -import scala.collection.immutable -import scala.annotation.tailrec -import scala.concurrent.{ Promise, Future } -import scala.concurrent.duration.Duration -import scala.collection.mutable.ListBuffer -import scala.util.control.NonFatal -import akka.event.Logging -import java.util.concurrent.TimeUnit -import akka.util.ByteString - -private[zeromq] object ConcurrentSocketActor { - private sealed trait PollMsg - private case object Poll extends PollMsg - private case object PollCareful extends PollMsg - - private case object Flush - - private class NoSocketHandleException() extends Exception("Couldn't create a zeromq socket.") - - private val DefaultContext = Context() -} -private[zeromq] class ConcurrentSocketActor(params: immutable.Seq[SocketOption]) extends Actor { - - import ConcurrentSocketActor._ - private val zmqContext = params collectFirst { case c: Context ⇒ c } getOrElse DefaultContext - - private var deserializer = params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer - private val socketType = { - import SocketType.{ ZMQSocketType ⇒ ST } - params.collectFirst { case t: ST ⇒ t }.getOrElse(throw new IllegalArgumentException("A socket type is required")) - } - - private val socket: Socket = zmqContext.socket(socketType) - private val poller: Poller = zmqContext.poller - - private val pendingSends = new ListBuffer[immutable.Seq[ByteString]] - - def receive = { - case m: PollMsg ⇒ doPoll(m) - case ZMQMessage(frames) ⇒ handleRequest(Send(frames)) - case r: Request ⇒ handleRequest(r) - case Flush ⇒ flush() - case Terminated(_) ⇒ context stop self - } - - private def handleRequest(msg: Request): Unit = msg match { - case Send(frames) ⇒ - if (frames.nonEmpty) { - val flushNow = pendingSends.isEmpty - pendingSends.append(frames) - if (flushNow) flush() - } - case opt: SocketOption ⇒ handleSocketOption(opt) - case q: SocketOptionQuery ⇒ handleSocketOptionQuery(q) - } - - private def handleConnectOption(msg: SocketConnectOption): Unit = msg match { - case Connect(endpoint) ⇒ { socket.connect(endpoint); notifyListener(Connecting) } - case Bind(endpoint) ⇒ socket.bind(endpoint) - } - - private def handlePubSubOption(msg: PubSubOption): Unit = msg match { - case Subscribe(topic) ⇒ socket.subscribe(topic.toArray) - case Unsubscribe(topic) ⇒ socket.unsubscribe(topic.toArray) - } - - private def handleSocketOption(msg: SocketOption): Unit = msg match { - case x: SocketMeta ⇒ throw new IllegalStateException("SocketMeta " + x + " only allowed for setting up a socket") - case c: SocketConnectOption ⇒ handleConnectOption(c) - case ps: PubSubOption ⇒ handlePubSubOption(ps) - case Linger(value) ⇒ socket.setLinger(value) - case ReconnectIVL(value) ⇒ socket.setReconnectIVL(value) - case Backlog(value) ⇒ socket.setBacklog(value) - case ReconnectIVLMax(value) ⇒ socket.setReconnectIVLMax(value) - case MaxMsgSize(value) ⇒ socket.setMaxMsgSize(value) - case SendHighWatermark(value) ⇒ socket.setSndHWM(value) - case ReceiveHighWatermark(value) ⇒ socket.setRcvHWM(value) - case HighWatermark(value) ⇒ socket.setHWM(value) - case Swap(value) ⇒ socket.setSwap(value) - case Affinity(value) ⇒ socket.setAffinity(value) - case Identity(value) ⇒ socket.setIdentity(value) - case Rate(value) ⇒ socket.setRate(value) - case RecoveryInterval(value) ⇒ socket.setRecoveryInterval(value) - case MulticastLoop(value) ⇒ socket.setMulticastLoop(value) - case MulticastHops(value) ⇒ socket.setMulticastHops(value) - case SendBufferSize(value) ⇒ socket.setSendBufferSize(value) - case ReceiveBufferSize(value) ⇒ socket.setReceiveBufferSize(value) - case d: Deserializer ⇒ deserializer = d - } - - private def handleSocketOptionQuery(msg: SocketOptionQuery): Unit = - sender() ! (msg match { - case Linger ⇒ socket.getLinger - case ReconnectIVL ⇒ socket.getReconnectIVL - case Backlog ⇒ socket.getBacklog - case ReconnectIVLMax ⇒ socket.getReconnectIVLMax - case MaxMsgSize ⇒ socket.getMaxMsgSize - case SendHighWatermark ⇒ socket.getSndHWM - case ReceiveHighWatermark ⇒ socket.getRcvHWM - case Swap ⇒ socket.getSwap - case Affinity ⇒ socket.getAffinity - case Identity ⇒ socket.getIdentity - case Rate ⇒ socket.getRate - case RecoveryInterval ⇒ socket.getRecoveryInterval - case MulticastLoop ⇒ socket.hasMulticastLoop - case MulticastHops ⇒ socket.getMulticastHops - case SendBufferSize ⇒ socket.getSendBufferSize - case ReceiveBufferSize ⇒ socket.getReceiveBufferSize - case FileDescriptor ⇒ socket.getFD - }) - - override def preStart { - watchListener() - setupSocket() - poller.register(socket, Poller.POLLIN) - setupConnection() - - import SocketType._ - socketType match { - case Pub | Push ⇒ // don’t poll - case Sub | Pull | Pair | Dealer | Router ⇒ self ! Poll - case Req | Rep ⇒ self ! PollCareful - } - } - - private def setupConnection(): Unit = { - params filter (_.isInstanceOf[SocketConnectOption]) foreach { self ! _ } - params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ } - } - - private def setupSocket() = params foreach { - case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ // ignore, handled differently - case m ⇒ self ! m - } - - override def preRestart(reason: Throwable, message: Option[Any]): Unit = context.children foreach context.stop //Do not call postStop - - override def postRestart(reason: Throwable): Unit = () // Do nothing - - override def postStop: Unit = try { - if (socket != null) { - poller.unregister(socket) - socket.close - } - } finally notifyListener(Closed) - - @tailrec private def flushMessage(i: immutable.Seq[ByteString]): Boolean = - if (i.isEmpty) - true - else { - val head = i.head - val tail = i.tail - if (socket.send(head.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail) - else { - pendingSends.prepend(i) // Reenqueue the rest of the message so the next flush takes care of it - self ! Flush - false - } - } - - @tailrec private def flush(): Unit = - if (pendingSends.nonEmpty && flushMessage(pendingSends.remove(0))) flush() // Flush while things are going well - - // this is a “PollMsg=>Unit” which either polls or schedules Poll, depending on the sign of the timeout - private val doPollTimeout = { - val ext = ZeroMQExtension(context.system) - val fromConfig = params collectFirst { case PollTimeoutDuration(duration) ⇒ duration } - val duration = (fromConfig getOrElse ext.DefaultPollTimeout) - if (duration > Duration.Zero) { - // for positive timeout values, do poll (i.e. block this thread) - val pollLength = duration.toUnit(ext.pollTimeUnit).toLong - (msg: PollMsg) ⇒ - poller.poll(pollLength) - self ! msg - } else { - val d = -duration - - { (msg: PollMsg) ⇒ - // for negative timeout values, schedule Poll token -duration into the future - import context.dispatcher - context.system.scheduler.scheduleOnce(d, self, msg) - () - } - } - } - - @tailrec private def doPoll(mode: PollMsg, togo: Int = 10): Unit = - if (togo <= 0) self ! mode - else receiveMessage(mode) match { - case Seq() ⇒ doPollTimeout(mode) - case frames ⇒ notifyListener(deserializer(frames)); doPoll(mode, togo - 1) - } - - @tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[ByteString] = Vector.empty): immutable.Seq[ByteString] = - if (mode == PollCareful && (poller.poll(0) <= 0)) { - if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!") - } else { - socket.recv(if (mode == Poll) JZMQ.NOBLOCK else 0) match { - case null ⇒ /*EAGAIN*/ - if (currentFrames.isEmpty) currentFrames else receiveMessage(mode, currentFrames) - case bytes ⇒ - val frames = currentFrames :+ ByteString(bytes) - if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames - } - } - - private val listenerOpt = params collectFirst { case Listener(l) ⇒ l } - private def watchListener(): Unit = listenerOpt foreach context.watch - private def notifyListener(message: Any): Unit = listenerOpt foreach { _ ! message } -} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/Response.scala b/akka-zeromq/src/main/scala/akka/zeromq/Response.scala deleted file mode 100644 index 7a21f30713..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/Response.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -/** - * Base trait for the events raised by a ZeroMQ socket actor - */ -sealed trait Response - -/** - * When the ZeroMQ socket connects it sends this message to a listener - */ -case object Connecting extends Response { - /** - * Java API: get the singleton instance - */ - def getInstance = this -} -/** - * When the ZeroMQ socket disconnects it sends this message to a listener - */ -case object Closed extends Response { - /** - * Java API: get the singleton instance - */ - def getInstance = this -} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala deleted file mode 100644 index 6e4cdcaa66..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala +++ /dev/null @@ -1,537 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -import com.google.protobuf.Message -import akka.actor.ActorRef -import scala.concurrent.duration._ -import scala.collection.immutable -import org.zeromq.{ ZMQ ⇒ JZMQ } -import org.zeromq.ZMQ.{ Poller, Socket } -import akka.japi.Util.immutableSeq -import akka.util.ByteString -import akka.util.Collections.EmptyImmutableSeq -import annotation.varargs - -/** - * Marker trait representing request messages for zeromq - */ -sealed trait Request - -/** - * Marker trait representing the base for all socket options - */ -sealed trait SocketOption extends Request - -/** - * Marker trait representing the base for all meta operations for a socket - * such as the context, listener, socket type and poller dispatcher - */ -sealed trait SocketMeta extends SocketOption - -/** - * A base trait for connection options for a ZeroMQ socket - */ -sealed trait SocketConnectOption extends SocketOption { - def endpoint: String -} - -/** - * A base trait for pubsub options for the ZeroMQ socket - */ -sealed trait PubSubOption extends SocketOption { - def payload: ByteString -} - -/** - * A marker trait to group option queries together - */ -sealed trait SocketOptionQuery extends Request - -/** - * This socket should be a client socket and connect to the specified endpoint - * - * @param endpoint URI (ex. tcp://127.0.0.1:5432) - */ -final case class Connect(endpoint: String) extends SocketConnectOption - -/** - * Companion object for a ZeroMQ I/O thread pool - */ -object Context { - def apply(numIoThreads: Int = 1): Context = new Context(numIoThreads) -} - -/** - * Represents an I/O thread pool for ZeroMQ sockets. - * By default the ZeroMQ module uses an I/O thread pool with 1 thread. - * For most applications that should be sufficient - * - * @param numIoThreads - */ -class Context(numIoThreads: Int) extends SocketMeta { - private val context = JZMQ.context(numIoThreads) - - def socket(socketType: SocketType.ZMQSocketType): Socket = context.socket(socketType.id) - - def poller: Poller = context.poller - - def term(): Unit = context.term() -} - -/** - * A base trait for message deserializers - */ -trait Deserializer extends SocketOption { - def apply(frames: immutable.Seq[ByteString]): Any -} - -/** - * The different socket types you can create with zeromq - */ -object SocketType { - - abstract class ZMQSocketType(val id: Int) extends SocketMeta - - /** - * A Publisher socket - */ - object Pub extends ZMQSocketType(JZMQ.PUB) - - /** - * A subscriber socket - */ - object Sub extends ZMQSocketType(JZMQ.SUB) - - /** - * A dealer socket - */ - object Dealer extends ZMQSocketType(JZMQ.DEALER) - - /** - * A router socket - */ - object Router extends ZMQSocketType(JZMQ.ROUTER) - - /** - * A request socket - */ - object Req extends ZMQSocketType(JZMQ.REQ) - - /** - * A reply socket - */ - object Rep extends ZMQSocketType(JZMQ.REP) - - /** - * A push socket - */ - object Push extends ZMQSocketType(JZMQ.PUSH) - - /** - * A pull socket - */ - object Pull extends ZMQSocketType(JZMQ.PULL) - - /** - * A Pair socket - */ - object Pair extends ZMQSocketType(JZMQ.PAIR) -} - -/** - * An option containing the listener for the socket - * @param listener - */ -final case class Listener(listener: ActorRef) extends SocketMeta - -/** - * An option containing the configuration key for the poller loop dispatcher - * @param name - */ -final case class PollDispatcher(name: String) extends SocketMeta - -/** - * An option containing the duration a poll cycle should wait for a message before it loops - * @param duration - */ -final case class PollTimeoutDuration(duration: FiniteDuration = 100 millis) extends SocketMeta - -/** - * Start listening with this server socket on the specified address - * - * @param endpoint - */ -final case class Bind(endpoint: String) extends SocketConnectOption - -/** - * The [[akka.zeromq.Subscribe]] option establishes a new message filter on a [[akka.zeromq.SocketType.Pub]] socket. - * Newly created [[akka.zeromq.SocketType.Sub]] sockets filter out all incoming messages, - * therefore you should send this option to establish an initial message filter. - * - * An empty payload of length zero will subscribe to all incoming messages. - * A non-empty payload will subscribe to all messages beginning with the specified prefix. - * Multiple filters may be attached to a single [[akka.zeromq.SocketType.Sub]] socket, - * in which case a message will be accepted if it matches at least one filter. - * - * @param payload the topic to subscribe to - */ -final case class Subscribe(payload: ByteString) extends PubSubOption { - def this(topic: String) = this(ByteString(topic)) -} -object Subscribe { - val all = Subscribe(ByteString.empty) - def apply(topic: String): Subscribe = topic match { - case null | "" ⇒ all - case t ⇒ new Subscribe(t) - } -} - -/** - * The [[akka.zeromq.Unsubscribe]] option shall remove an existing message filter - * on a [[akka.zeromq.SocketType.Sub]] socket. The filter specified must match an existing filter - * previously established with the [[akka.zeromq.Subscribe]] option. If the socket has several instances of the - * same filter attached the [[akka.zeromq.Unsubscribe]] option shall remove only one instance, leaving the rest - * in place and functional. - * - * @param payload - */ -final case class Unsubscribe(payload: ByteString) extends PubSubOption { - def this(topic: String) = this(ByteString(topic)) -} -object Unsubscribe { - def apply(topic: String): Unsubscribe = new Unsubscribe(topic) -} - -/** - * Send a message over the zeromq socket - * @param frames - */ -final case class Send(frames: immutable.Seq[ByteString]) extends Request - -/** - * A message received over the zeromq socket - * @param frames - */ -final case class ZMQMessage(frames: immutable.Seq[ByteString]) { - def frame(frameIndex: Int): ByteString = frames(frameIndex) -} -object ZMQMessage { - val empty = new ZMQMessage(EmptyImmutableSeq) - - /** - * Scala API - * @param frames the frames of the returned ZMQMessage - * @return a ZMQMessage with the given frames - */ - def apply(frames: ByteString*): ZMQMessage = - if ((frames eq null) || frames.length == 0) empty else new ZMQMessage(frames.to[immutable.Seq]) - - /** - * Java API: create a message from the given frames - * - * @param frames the frames of the returned ZMQMessage - * @return a ZMQMessage with the given frames - */ - @varargs def withFrames(frames: ByteString*): ZMQMessage = apply(frames: _*) - - def apply[T](frames: T*)(implicit converter: T ⇒ ByteString): ZMQMessage = apply(frames map converter: _*) -} - -/** - * Configure this socket to have a linger of the specified value - * - * The linger period determines how long pending messages which have yet to be sent to a peer shall linger - * in memory after a socket is closed, and further affects the termination of the socket's context. - * - * The following outlines the different behaviours: - *
    - *
  • The default value of -1 specifies an infinite linger period. - * Pending messages shall not be discarded after the socket is closed; - * attempting to terminate the socket's context shall block until all pending messages - * have been sent to a peer.
  • - *
  • The value of 0 specifies no linger period. Pending messages shall be discarded immediately when the socket is closed.
  • - *
  • Positive values specify an upper bound for the linger period in milliseconds. - * Pending messages shall not be discarded after the socket is closed; - * attempting to terminate the socket's context shall block until either all pending messages have been sent to a peer, - * or the linger period expires, after which any pending messages shall be discarded.
  • - *
- * - * @param value The value in milliseconds for the linger option - */ -final case class Linger(value: Long) extends SocketOption - -/** - * Gets the linger option @see [[akka.zeromq.Linger]] - */ -object Linger extends SocketOptionQuery { - val no: Linger = Linger(0) -} - -/** - * Sets the recovery interval for multicast transports using the specified socket. - * The recovery interval determines the maximum time in seconds that a receiver can be absent from a multicast group - * before unrecoverable data loss will occur. - * - * Exercise care when setting large recovery intervals as the data needed for recovery will be held in memory. - * For example, a 1 minute recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer. - * - * @param value The interval in seconds - */ -final case class ReconnectIVL(value: Long) extends SocketOption - -/** - * Gets the recover interval @see [[akka.zeromq.ReconnectIVL]] - */ -object ReconnectIVL extends SocketOptionQuery - -/** - * The [[akka.zeromq.ReconnectIVLMax]] option shall set the maximum reconnection interval for the specified socket. - * This is the maximum period ØMQ shall wait between attempts to reconnect. On each reconnect attempt, - * the previous interval shall be doubled untill [[akka.zeromq.ReconnectIVLMax]] is reached. - * This allows for exponential backoff strategy. Default value means no exponential backoff is performed - * and reconnect interval calculations are only based on [[akka.zeromq.ReconnectIVL]]. - * - * @see [[akka.zeromq.ReconnectIVL]] - * - * This is a ZeroMQ 3.0 option - * - * @param value - */ -final case class ReconnectIVLMax(value: Long) extends SocketOption -/** - * Gets the max reconnect IVL - * @see [[akka.zeromq.ReconnectIVLMax]] - */ -object ReconnectIVLMax extends SocketOptionQuery - -/** - * The [[akka.zeromq.Backlog]] option shall set the maximum length of the queue of outstanding peer connections - * for the specified socket; this only applies to connection-oriented transports. For details refer to your - * operating system documentation for the listen function. - * - * @param value - */ -final case class Backlog(value: Long) extends SocketOption -/** - * Gets the backlog - * @see [[akka.zeromq.Backlog]] - */ -object Backlog extends SocketOptionQuery - -/** - * Limits the size of the inbound message. - * If a peer sends a message larger than [[akka.zeromq.MaxMsgSize]] it is disconnected. - * Value of -1 means no limit. - * - * This is a ZeroMQ 3.0 option - * - * @param value - */ -final case class MaxMsgSize(value: Long) extends SocketOption -object MaxMsgSize extends SocketOptionQuery - -/** - * The [[akka.zeromq.SendHighWatermark]] option shall set the high water mark for outbound messages on the specified socket. - * The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory - * for any single peer that the specified socket is communicating with. - * - * If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, - * ØMQ shall take appropriate action such as blocking or dropping sent messages. - * - * This is a ZeroMQ 3.0 option - * - * @param value - */ -final case class SendHighWatermark(value: Long) extends SocketOption - -/** - * Gets the SendHWM - * @see [[akka.zeromq.SendHighWatermark]] - */ -object SendHighWatermark extends SocketOptionQuery - -/** - * The [[akka.zeromq.ReceiveHighWatermark]] option shall set the high water mark for inbound messages on the specified socket. - * The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue - * in memory for any single peer that the specified socket is communicating with. - * - * If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, - * ØMQ shall take appropriate action such as blocking or dropping sent messages. - * - * This is a ZeroMQ 3.0 option - * - * @param value - */ -final case class ReceiveHighWatermark(value: Long) extends SocketOption - -/** - * Gets the ReceiveHighWatermark - * @see [[akka.zeromq.ReceiveHighWatermark]] - */ -object ReceiveHighWatermark extends SocketOptionQuery - -/** - * The [[akka.zeromq.HighWatermark]] option shall set the high water mark for the specified socket. - * The high water mark is a hard limit on the maximum number of outstanding messages ØMQ shall queue in memory for - * any single peer that the specified socket is communicating with. - * - * If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, - * ØMQ shall take appropriate action such as blocking or dropping sent messages. - * The default [[akka.zeromq.HighWatermark]] value of zero means "no limit". - * - * @param value - */ -final case class HighWatermark(value: Long) extends SocketOption - -/** - * The [[akka.zeromq.Swap]] option shall set the disk offload (swap) size for the specified socket. - * A socket which has [[akka.zeromq.Swap]] set to a non-zero value may exceed its high water mark; - * in this case outstanding messages shall be offloaded to storage on disk rather than held in memory. - * - * The value of [[akka.zeromq.Swap]] defines the maximum size of the swap space in bytes. - * - * @param value - */ -final case class Swap(value: Long) extends SocketOption - -/** - * Gets the [[akka.zeromq.Swap]] - * - * @see [[akka.zeromq.Swap]] - */ -object Swap extends SocketOptionQuery - -/** - * The [[akka.zeromq.Affinity]] option shall set the I/O thread affinity for newly created connections on the specified socket. - * - * Affinity determines which threads from the ØMQ I/O thread pool associated with the socket's context shall handle - * newly created connections. A value of zero specifies no affinity, meaning that work shall be distributed fairly - * among all ØMQ I/O threads in the thread pool. For non-zero values, the lowest bit corresponds to thread 1, - * second lowest bit to thread 2 and so on. For example, a value of 3 specifies that subsequent connections - * on socket shall be handled exclusively by I/O threads 1 and 2. - * - * @param value - */ -final case class Affinity(value: Long) extends SocketOption - -/** - * Gets the [[akka.zeromq.Affinity]] value - */ -object Affinity extends SocketOptionQuery - -/** - * Sets the identity of the specified socket. Socket identity determines if existing ØMQ infrastructure - * (message queues, forwarding devices) shall be identified with a specific application and persist across multiple - * runs of the application. - * - * If the socket has no identity, each run of an application is completely separate from other runs. - * However, with identity set the socket shall re-use any existing ØMQ infrastructure configured by the previous run(s). - * Thus the application may receive messages that were sent in the meantime, message queue limits shall be shared - * with previous run(s) and so on. - * - * Identity should be at least one byte and at most 255 bytes long. - * Identities starting with binary zero are reserved for use by ØMQ infrastructure. - * - * @param value The identity string for this socket - */ -final case class Identity(value: Array[Byte]) extends SocketOption - -/** - * Gets the [[akka.zeromq.Identity]] value - */ -object Identity extends SocketOptionQuery - -/** - * Sets the maximum send or receive data rate for multicast transports such as pgm using the specified socket. - * - * @param value The kilobits per second - */ -final case class Rate(value: Long) extends SocketOption - -/** - * Gets the send or receive rate for the socket - */ -object Rate extends SocketOptionQuery - -/** - * Sets the recovery interval for multicast transports using the specified socket. - * The recovery interval determines the maximum time in seconds that a receiver can be absent from a multicast group - * before unrecoverable data loss will occur. - * - * Exercise care when setting large recovery intervals as the data needed for recovery will be held in memory. - * For example, a 1 minute recovery interval at a data rate of 1Gbps requires a 7GB in-memory buffer. - * - * @param value The interval in seconds - */ -final case class RecoveryInterval(value: Long) extends SocketOption - -/** - * Gets the [[akka.zeromq.RecoveryInterval]] - */ -object RecoveryInterval extends SocketOptionQuery - -/** - * Controls whether data sent via multicast transports using the specified socket can also be received by the sending - * host via loop-back. A value of zero disables the loop-back functionality, while the default value of 1 enables the - * loop-back functionality. Leaving multicast loop-back enabled when it is not required can have a negative impact - * on performance. Where possible, disable McastLoop in production environments. - * - * @param value Flag indicating whether or not loopback multicast is enabled - */ -final case class MulticastLoop(value: Boolean) extends SocketOption - -/** - * Gets the [[akka.zeromq.MulticastLoop]] - */ -object MulticastLoop extends SocketOptionQuery - -/** - * Sets the time-to-live field in every multicast packet sent from this socket. - * The default is 1 which means that the multicast packets don't leave the local network. - * - * This is za ZeroMQ 3.0 option - * - * @param value - */ -final case class MulticastHops(value: Long) extends SocketOption - -/** - * Gets the [[akka.zeromq.MulticastHops]] - */ -object MulticastHops extends SocketOptionQuery - -/** - * The [[akka.zeromq.SendBufferSize]] option shall set the underlying kernel transmit buffer size for the socket to - * the specified size in bytes. A value of zero means leave the OS default unchanged. - * For details please refer to your operating system documentation for the SO_SNDBUF socket option. - * - * This is a ZeroMQ 2.x only option - * - * @param value - */ -final case class SendBufferSize(value: Long) extends SocketOption - -/** - * Gets the [[akka.zeromq.SendBufferSize]] - */ -object SendBufferSize extends SocketOptionQuery - -/** - * The [[akka.zeromq.ReceiveBufferSize]] option shall set the underlying kernel receive buffer size for the socket to - * the specified size in bytes. A value of zero means leave the OS default unchanged. - * For details refer to your operating system documentation for the SO_RCVBUF socket option. - * @param value - */ -final case class ReceiveBufferSize(value: Long) extends SocketOption - -/** - * Gets the [[akka.zeromq.ReceiveBufferSize]] - */ -object ReceiveBufferSize extends SocketOptionQuery - -/** - * Gets the file descriptor associated with the ZeroMQ socket - */ -object FileDescriptor extends SocketOptionQuery diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala deleted file mode 100644 index 4dc9d25fbe..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZMQMessageDeserializer.scala +++ /dev/null @@ -1,14 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -import scala.collection.immutable -import akka.util.ByteString - -/** - * Deserializes ZeroMQ messages into an immutable sequence of frames - */ -class ZMQMessageDeserializer extends Deserializer { - def apply(frames: immutable.Seq[ByteString]): ZMQMessage = ZMQMessage(frames) -} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQ.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQ.scala deleted file mode 100644 index 8b1a993719..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQ.scala +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq; - -/** - * Java API for akka.zeromq - */ -object ZeroMQ { - - /** - * The message that is sent when an ZeroMQ socket connects. - *

- *

-   * if (message == connecting()) {
-   *   // Socket connected
-   * }
-   * 
- * - * @return the single instance of Connecting - */ - def connecting() = Connecting - - /** - * The message that is sent when an ZeroMQ socket disconnects. - *

- *

-   * if (message == closed()) {
-   *   // Socket disconnected
-   * }
-   * 
- * - * @return the single instance of Closed - */ - def closed() = Closed - - /** - * The message to ask a ZeroMQ socket for its affinity configuration. - *

- *

-   * socket.ask(affinity())
-   * 
- * - * @return the single instance of Affinity - */ - def affinity() = Affinity - - /** - * The message to ask a ZeroMQ socket for its backlog configuration. - *

- *

-   * socket.ask(backlog())
-   * 
- * - * @return the single instance of Backlog - */ - def backlog() = Backlog - - /** - * The message to ask a ZeroMQ socket for its file descriptor configuration. - *

- *

-   * socket.ask(fileDescriptor())
-   * 
- * - * @return the single instance of FileDescriptor - */ - def fileDescriptor() = FileDescriptor - - /** - * The message to ask a ZeroMQ socket for its identity configuration. - *

- *

-   * socket.ask(identity())
-   * 
- * - * @return the single instance of Identity - */ - def identity() = Identity - - /** - * The message to ask a ZeroMQ socket for its linger configuration. - *

- *

-   * socket.ask(linger())
-   * 
- * - * @return the single instance of Linger - */ - def linger() = Linger - - /** - * The message to ask a ZeroMQ socket for its max message size configuration. - *

- *

-   * socket.ask(maxMessageSize())
-   * 
- * - * @return the single instance of MaxMsgSize - */ - def maxMessageSize() = MaxMsgSize - - /** - * The message to ask a ZeroMQ socket for its multicast hops configuration. - *

- *

-   * socket.ask(multicastHops())
-   * 
- * - * @return the single instance of MulticastHops - */ - def multicastHops() = MulticastHops - - /** - * The message to ask a ZeroMQ socket for its multicast loop configuration. - *

- *

-   * socket.ask(multicastLoop())
-   * 
- * - * @return the single instance of MulticastLoop - */ - def multicastLoop() = MulticastLoop - - /** - * The message to ask a ZeroMQ socket for its rate configuration. - *

- *

-   * socket.ask(rate())
-   * 
- * - * @return the single instance of Rate - */ - def rate() = Rate - - /** - * The message to ask a ZeroMQ socket for its receive bufferSize configuration. - *

- *

-   * socket.ask(receiveBufferSize())
-   * 
- * - * @return the single instance of ReceiveBufferSize - */ - def receiveBufferSize() = ReceiveBufferSize - - /** - * The message to ask a ZeroMQ socket for its receive high watermark configuration. - *

- *

-   * socket.ask(receiveHighWatermark())
-   * 
- * - * @return the single instance of ReceiveHighWatermark - */ - def receiveHighWatermark() = ReceiveHighWatermark - - /** - * The message to ask a ZeroMQ socket for its reconnect interval configuration. - *

- *

-   * socket.ask(reconnectIVL())
-   * 
- * - * @return the single instance of ReconnectIVL - */ - def reconnectIVL() = ReconnectIVL - - /** - * The message to ask a ZeroMQ socket for its max reconnect interval configuration. - *

- *

-   * socket.ask(reconnectIVLMax())
-   * 
- * - * @return the single instance of ReconnectIVLMax - */ - def reconnectIVLMax() = ReconnectIVLMax - - /** - * The message to ask a ZeroMQ socket for its recovery interval configuration. - *

- *

-   * socket.ask(recoveryInterval())
-   * 
- * - * @return the single instance of RecoveryInterval - */ - def recoveryInterval() = RecoveryInterval - - /** - * The message to ask a ZeroMQ socket for its send buffer size configuration. - *

- *

-   * socket.ask(sendBufferSize())
-   * 
- * - * @return the single instance of SendBufferSize - */ - def sendBufferSize() = SendBufferSize - - /** - * The message to ask a ZeroMQ socket for its send high watermark configuration. - *

- *

-   * socket.ask(sendHighWatermark())
-   * 
- * - * @return the single instance of SendHighWatermark - */ - def sendHighWatermark() = SendHighWatermark - - /** - * The message to ask a ZeroMQ socket for its swap configuration. - *

- *

-   * socket.ask(swap())
-   * 
- * - * @return the single instance of Swap - */ - def swap() = Swap -} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala deleted file mode 100644 index e30d803131..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ /dev/null @@ -1,267 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -import org.zeromq.{ ZMQ ⇒ JZMQ } -import org.zeromq.ZMQ.Poller -import akka.actor._ -import akka.pattern.ask -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import java.util.concurrent.TimeUnit -import akka.util.Timeout -import akka.util.Helpers.ConfigOps -import org.zeromq.ZMQException -import scala.concurrent.duration.FiniteDuration -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } - -/** - * A Model to represent a version of the zeromq library - * @param major - * @param minor - * @param patch - */ -final case class ZeroMQVersion(major: Int, minor: Int, patch: Int) { - override def toString: String = "%d.%d.%d".format(major, minor, patch) -} - -/** - * The [[akka.actor.ExtensionId]] and [[akka.actor.ExtensionIdProvider]] for the ZeroMQ module - */ -object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider { - override def get(system: ActorSystem): ZeroMQExtension = super.get(system) - def lookup(): this.type = this - override def createExtension(system: ExtendedActorSystem): ZeroMQExtension = new ZeroMQExtension(system) - - private val minVersionString = "2.1.0" - private val minVersion = JZMQ.makeVersion(2, 1, 0) -} - -/** - * The extension for the ZeroMQ module - * - * @param system The ActorSystem this extension belongs to. - */ -class ZeroMQExtension(system: ActorSystem) extends Extension { - - val DefaultPollTimeout: FiniteDuration = system.settings.config.getMillisDuration("akka.zeromq.poll-timeout") - val NewSocketTimeout: Timeout = Timeout(system.settings.config.getMillisDuration("akka.zeromq.new-socket-timeout")) - - val pollTimeUnit = if (version.major >= 3) TimeUnit.MILLISECONDS else TimeUnit.MICROSECONDS - - /** - * The version of the ZeroMQ library - * @return a [[akka.zeromq.ZeroMQVersion]] - */ - def version: ZeroMQVersion = ZeroMQVersion(JZMQ.getMajorVersion, JZMQ.getMinorVersion, JZMQ.getPatchVersion) - - /** - * Factory method to create the [[akka.actor.Props]] to build the ZeroMQ socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newSocketProps(socketParameters: SocketOption*): Props = { - verifyZeroMQVersion() - require(socketParameters exists { - case s: SocketType.ZMQSocketType ⇒ true - case _ ⇒ false - }, "A socket type is required") - val params = socketParameters.to[immutable.Seq] - Props(classOf[ConcurrentSocketActor], params).withDispatcher("akka.zeromq.socket-dispatcher") - } - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Publisher socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newPubSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Pub +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Subscriber socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newSubSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Sub +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Dealer socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newDealerSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Dealer +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Router socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newRouterSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Router +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Push socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newPushSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Push +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Pull socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newPullSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Pull +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Req socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newReqSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Rep +: socketParameters): _*) - - /** - * Java API: Factory method to create the [[akka.actor.Props]] to build a ZeroMQ Rep socket actor. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.Props]] - */ - def newRepSocketProps(socketParameters: SocketOption*): Props = newSocketProps((SocketType.Req +: socketParameters): _*) - - /** - * Factory method to create the actor representing the ZeroMQ socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke - * @return the [[akka.actor.ActorRef]] - */ - def newSocket(socketParameters: SocketOption*): ActorRef = { - implicit val timeout = NewSocketTimeout - Await.result((zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef], timeout.duration) - } - - /** - * Java API factory method to create the actor representing the ZeroMQ Publisher socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newPubSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Pub +: socketParameters): _*) - - /** - * Convenience for creating a publisher socket. - */ - def newPubSocket(bind: Bind): ActorRef = newSocket(SocketType.Pub, bind) - - /** - * Java API factory method to create the actor representing the ZeroMQ Subscriber socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newSubSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Sub +: socketParameters): _*) - - /** - * Convenience for creating a subscriber socket. - */ - def newSubSocket(connect: Connect, listener: Listener, subscribe: Subscribe): ActorRef = newSocket(SocketType.Sub, connect, listener, subscribe) - - /** - * Java API factory method to create the actor representing the ZeroMQ Dealer socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newDealerSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Dealer +: socketParameters): _*) - - /** - * Java API factory method to create the actor representing the ZeroMQ Router socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newRouterSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Router +: socketParameters): _*) - - /** - * Java API factory method to create the actor representing the ZeroMQ Push socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newPushSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Push +: socketParameters): _*) - - /** - * Java API factory method to create the actor representing the ZeroMQ Pull socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newPullSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Pull +: socketParameters): _*) - - /** - * Java API factory method to create the actor representing the ZeroMQ Req socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket - * @return the [[akka.actor.ActorRef]] - */ - def newReqSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Req +: socketParameters): _*) - - /** - * Java API factory method to create the actor representing the ZeroMQ Rep socket. - * You can pass in as many configuration options as you want and the order of the configuration options doesn't matter - * They are matched on type and the first one found wins. - * - * @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socke - * @return the [[akka.actor.ActorRef]] - */ - def newRepSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Rep +: socketParameters): _*) - - private val zeromqGuardian: ActorRef = { - verifyZeroMQVersion() - - system.actorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { - import SupervisorStrategy._ - override def supervisorStrategy = OneForOneStrategy() { - case ex: ZMQException if nonfatal(ex) ⇒ Resume - case _ ⇒ Stop - } - - private def nonfatal(ex: ZMQException) = ex.getErrorCode match { - case org.zeromq.ZeroMQ.EFSM | 45 /* ENOTSUP */ ⇒ true - case _ ⇒ false - } - - def receive = { case p: Props ⇒ sender() ! context.actorOf(p) } - }), "zeromq") - } - - private def verifyZeroMQVersion(): Unit = { - require( - JZMQ.getFullVersion > ZeroMQExtension.minVersion, - "Unsupported ZeroMQ version: %s, akka needs at least: %s".format(JZMQ.getVersionString, ZeroMQExtension.minVersionString)) - } -} diff --git a/akka-zeromq/src/main/scala/akka/zeromq/package.scala b/akka-zeromq/src/main/scala/akka/zeromq/package.scala deleted file mode 100644 index 95bc5c8c40..0000000000 --- a/akka-zeromq/src/main/scala/akka/zeromq/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka - -import language.implicitConversions - -import actor.ActorSystem - -/** - * A package object with an implicit conversion for the actor system as a convenience - */ -package object zeromq { - /** - * Convenience accessor to subscribe to all events - */ - val SubscribeAll: Subscribe = Subscribe.all - - /** - * Set the linger to 0, doesn't block and discards messages that haven't been sent yet. - */ - val NoLinger: Linger = Linger.no -} diff --git a/akka-zeromq/src/test/java/akka/zeromq/ZeroMQFromJavaTests.java b/akka-zeromq/src/test/java/akka/zeromq/ZeroMQFromJavaTests.java deleted file mode 100644 index 00f8151522..0000000000 --- a/akka-zeromq/src/test/java/akka/zeromq/ZeroMQFromJavaTests.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq; - -import static org.junit.Assert.*; -import org.junit.Test; - -public class ZeroMQFromJavaTests { - @Test - public void checkObjectHelperMethods() { - assertTrue(ZeroMQ.connecting() == Connecting$.MODULE$); - assertTrue(ZeroMQ.closed() == Closed$.MODULE$); - assertTrue(ZeroMQ.affinity() == Affinity$.MODULE$); - assertTrue(ZeroMQ.backlog() == Backlog$.MODULE$); - assertTrue(ZeroMQ.fileDescriptor() == FileDescriptor$.MODULE$); - assertTrue(ZeroMQ.identity() == Identity$.MODULE$); - assertTrue(ZeroMQ.linger() == Linger$.MODULE$); - assertTrue(ZeroMQ.maxMessageSize() == MaxMsgSize$.MODULE$); - assertTrue(ZeroMQ.multicastHops() == MulticastHops$.MODULE$); - assertTrue(ZeroMQ.multicastLoop() == MulticastLoop$.MODULE$); - assertTrue(ZeroMQ.rate() == Rate$.MODULE$); - assertTrue(ZeroMQ.receiveBufferSize() == ReceiveBufferSize$.MODULE$); - assertTrue(ZeroMQ.receiveHighWatermark() == ReceiveHighWatermark$.MODULE$); - assertTrue(ZeroMQ.reconnectIVL() == ReconnectIVL$.MODULE$); - assertTrue(ZeroMQ.reconnectIVLMax() == ReconnectIVLMax$.MODULE$); - assertTrue(ZeroMQ.recoveryInterval() == RecoveryInterval$.MODULE$); - assertTrue(ZeroMQ.sendBufferSize() == SendBufferSize$.MODULE$); - assertTrue(ZeroMQ.sendHighWatermark() == SendHighWatermark$.MODULE$); - assertTrue(ZeroMQ.swap() == Swap$.MODULE$); - } -} diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala deleted file mode 100644 index 07a7b20c9b..0000000000 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -import language.postfixOps - -import org.scalatest.Matchers -import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec } -import scala.concurrent.duration._ -import akka.actor.{ Cancellable, Actor, Props, ActorRef } -import akka.util.{ ByteString, Timeout } - -class ConcurrentSocketActorSpec extends AkkaSpec { - - implicit val timeout: Timeout = Timeout(15 seconds) - - def checkZeroMQInstallation() = - try { - zmq.version match { - case ZeroMQVersion(x, y, _) if x >= 3 || (x >= 2 && y >= 1) ⇒ Unit - case version ⇒ invalidZeroMQVersion(version) - } - } catch { - case e: LinkageError ⇒ zeroMQNotInstalled() - } - - def invalidZeroMQVersion(version: ZeroMQVersion) { - info("WARNING: The tests are not run because invalid ZeroMQ version: %s. Version >= 2.1.x required.".format(version)) - pending - } - - def zeroMQNotInstalled(): Unit = { - info("WARNING: The tests are not run because ZeroMQ is not installed. Version >= 2.1.x required.") - pending - } - - lazy val endpoints: Vector[String] = { - val sockets = Vector.fill(3)(new java.net.ServerSocket(0)) - val endpoints = sockets.map(s ⇒ s"tcp://127.0.0.1:${s.getLocalPort}") - sockets.foreach(_.close()) - endpoints - } - - // this must stay a def for checkZeroMQInstallation() to work correctly - def zmq = ZeroMQExtension(system) - - "ConcurrentSocketActor" should { - "support pub-sub connections" in { - checkZeroMQInstallation() - val subscriberProbe = TestProbe() - val context = Context() - val endpoint = endpoints(0) - val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint)) - val subscriber = zmq.newSocket(SocketType.Sub, context, Listener(subscriberProbe.ref), Connect(endpoint), SubscribeAll) - import system.dispatcher - val msgGenerator = system.scheduler.schedule(100 millis, 10 millis, new Runnable { - var number = 0 - def run() { - publisher ! ZMQMessage(ByteString(number.toString), ByteString.empty) - number += 1 - } - }) - - try { - subscriberProbe.expectMsg(Connecting) - val msgNumbers = subscriberProbe.receiveWhile(3 seconds) { - case msg: ZMQMessage if msg.frames.size == 2 ⇒ - msg.frames(1).length should be(0) - msg - }.map(m ⇒ m.frames(0).utf8String.toInt) - msgNumbers.length should be > 0 - msgNumbers should be(for (i ← msgNumbers.head to msgNumbers.last) yield i) - } finally { - msgGenerator.cancel() - watch(subscriber) - system stop subscriber - subscriberProbe.receiveWhile(3 seconds) { - case msg ⇒ msg - }.last should be(Closed) - expectTerminated(subscriber, 5.seconds) - watch(publisher) - system stop publisher - expectTerminated(publisher, 5.seconds) - context.term() - } - } - - "support req-rep connections" in { - checkZeroMQInstallation() - val requesterProbe = TestProbe() - val replierProbe = TestProbe() - val context = Context() - val endpoint = endpoints(1) - val requester = zmq.newSocket(SocketType.Req, context, Listener(requesterProbe.ref), Bind(endpoint)) - val replier = zmq.newSocket(SocketType.Rep, context, Listener(replierProbe.ref), Connect(endpoint)) - - try { - replierProbe.expectMsg(Connecting) - val request = ZMQMessage(ByteString("Request")) - val reply = ZMQMessage(ByteString("Reply")) - - requester ! request - replierProbe.expectMsg(request) - replier ! reply - requesterProbe.expectMsg(reply) - } finally { - watch(replier) - system stop replier - replierProbe.expectMsg(Closed) - expectTerminated(replier, 5.seconds) - watch(requester) - system stop requester - expectTerminated(requester, 5.seconds) - context.term() - } - } - - "should support push-pull connections" in { - checkZeroMQInstallation() - val pullerProbe = TestProbe() - val context = Context() - val endpoint = endpoints(2) - val pusher = zmq.newSocket(SocketType.Push, context, Bind(endpoint)) - val puller = zmq.newSocket(SocketType.Pull, context, Listener(pullerProbe.ref), Connect(endpoint)) - - try { - pullerProbe.expectMsg(Connecting) - val message = ZMQMessage(ByteString("Pushed message")) - - pusher ! message - pullerProbe.expectMsg(message) - } finally { - - watch(puller) - system stop puller - pullerProbe.expectMsg(Closed) - expectTerminated(puller, 5.seconds) - watch(pusher) - system stop pusher - expectTerminated(pusher, 5.seconds) - context.term() - } - } - - } - - class MessageGeneratorActor(actorRef: ActorRef) extends Actor { - var messageNumber: Int = 0 - var genMessages: Cancellable = null - - override def preStart() = { - import system.dispatcher - genMessages = system.scheduler.schedule(100 millis, 10 millis, self, "genMessage") - } - - override def postStop() = { - if (genMessages != null && !genMessages.isCancelled) { - genMessages.cancel - genMessages = null - } - } - - def receive = { - case _ ⇒ - val payload = "%s".format(messageNumber) - messageNumber += 1 - actorRef ! ZMQMessage(ByteString(payload)) - } - } -} diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ZeroMQFromJavaSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ZeroMQFromJavaSpec.scala deleted file mode 100644 index 9bf672edc4..0000000000 --- a/akka-zeromq/src/test/scala/akka/zeromq/ZeroMQFromJavaSpec.scala +++ /dev/null @@ -1,8 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.zeromq - -import org.scalatest.junit.JUnitWrapperSuite - -class ZeroMQFromJavaSpec extends JUnitWrapperSuite("akka.zeromq.ZeroMQFromJavaTests", Thread.currentThread.getContextClassLoader) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 1395c7b97b..e55793545e 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -63,13 +63,13 @@ object AkkaBuild extends Build { validatePullRequest <<= (unidoc in Compile, SphinxSupport.generate in Sphinx in docs) map { (_, _) => } ), aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, - persistence, persistenceTck, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) + persistence, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit) ) lazy val akkaScalaNightly = Project( id = "akka-scala-nightly", base = file("akka-scala-nightly"), - // remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings) + // remove dependencies that we have to build ourselves (Scala STM) // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, persistence, persistenceTck, kernel, osgi, contrib, multiNodeTestkit) @@ -146,12 +146,6 @@ object AkkaBuild extends Build { dependencies = Seq(persistence % "compile;test->test", testkit % "compile;test->test") ) - lazy val zeroMQ = Project( - id = "akka-zeromq", - base = file("akka-zeromq"), - dependencies = Seq(actor, testkit % "test;test->test") - ) - lazy val kernel = Project( id = "akka-kernel", base = file("akka-kernel"), @@ -174,7 +168,7 @@ object AkkaBuild extends Build { id = "akka-docs", base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", - remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, + remote % "compile;test->test", cluster, slf4j, agent, camel, osgi, persistence % "compile;test->test", persistenceTck) ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f7b248e219..c527b5b417 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,7 @@ object Dependencies { val scalaStm = "org.scala-stm" %% "scala-stm" % scalaStmVersion // Modified BSD (Scala) val slf4jApi = "org.slf4j" % "slf4j-api" % "1.7.5" // MIT - val zeroMQClient = "org.spark-project.zeromq" %% "zeromq-scala-binding" % "0.0.7-spark" // ApacheV2 + // mirrored in OSGi sample val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" exclude("jfree", "jcommon") exclude("jfree", "jfreechart") // ApacheV2 val osgiCore = "org.osgi" % "org.osgi.core" % "4.3.1" // ApacheV2 val osgiCompendium= "org.osgi" % "org.osgi.compendium" % "4.3.1" // ApacheV2 @@ -91,8 +91,6 @@ object Dependencies { val docs = Seq(Test.scalatest, Test.junit, Test.junitIntf) - val zeroMQ = deps(protobuf, zeroMQClient, Test.scalatest, Test.junit) - val contrib = Seq(Test.junitIntf, Test.commonsIo) } diff --git a/project/OSGi.scala b/project/OSGi.scala index 8b27646a3e..3583b74ab4 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -38,8 +38,6 @@ object OSGi { val testkit = exports(Seq("akka.testkit.*")) - val zeroMQ = exports(Seq("akka.zeromq.*"), imports = Seq(protobufImport()) ) - val osgiOptionalImports = Seq( // needed because testkit is normally not used in the application bundle, // but it should still be included as transitive dependency and used by BundleDelegatingClassLoader