-zer #15864 remove zeromq module
This commit is contained in:
parent
3a5b79dd50
commit
239619e68f
23 changed files with 4 additions and 2379 deletions
|
|
@ -484,12 +484,3 @@ akka-testkit
|
|||
.. literalinclude:: ../../../akka-testkit/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
.. _config-akka-zeromq:
|
||||
|
||||
akka-zeromq
|
||||
~~~~~~~~~~~
|
||||
|
||||
.. literalinclude:: ../../../akka-zeromq/src/main/resources/reference.conf
|
||||
:language: none
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -48,8 +48,6 @@ Akka is very modular and consists of several JARs containing different features.
|
|||
|
||||
- ``akka-testkit`` – Toolkit for testing Actor systems
|
||||
|
||||
- ``akka-zeromq`` – ZeroMQ integration
|
||||
|
||||
In addition to these stable modules there are several which are on their way
|
||||
into the stable core but are still marked “experimental” at this point. This
|
||||
does not mean that they do not function as intended, it primarily means that
|
||||
|
|
|
|||
|
|
@ -1,308 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.zeromq;
|
||||
|
||||
//#import-pub-socket
|
||||
import akka.zeromq.Bind;
|
||||
import akka.zeromq.ZeroMQExtension;
|
||||
//#import-pub-socket
|
||||
//#import-sub-socket
|
||||
import akka.zeromq.Connect;
|
||||
import akka.zeromq.Listener;
|
||||
import akka.zeromq.Subscribe;
|
||||
//#import-sub-socket
|
||||
//#import-unsub-topic-socket
|
||||
import akka.zeromq.Unsubscribe;
|
||||
//#import-unsub-topic-socket
|
||||
//#import-pub-topic
|
||||
import akka.util.ByteString;
|
||||
import akka.zeromq.ZMQMessage;
|
||||
//#import-pub-topic
|
||||
|
||||
import akka.zeromq.HighWatermark;
|
||||
import akka.zeromq.SocketOption;
|
||||
import akka.zeromq.ZeroMQVersion;
|
||||
|
||||
//#import-health
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import org.junit.*;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.serialization.SerializationExtension;
|
||||
import akka.serialization.Serialization;
|
||||
import java.io.Serializable;
|
||||
import java.lang.management.ManagementFactory;
|
||||
//#import-health
|
||||
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.lang.management.MemoryUsage;
|
||||
import java.lang.management.OperatingSystemMXBean;
|
||||
import java.util.Date;
|
||||
import java.text.SimpleDateFormat;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
|
||||
public class ZeromqDocTest {
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource =
|
||||
new AkkaJUnitActorSystemResource("ZeromqDocTest",
|
||||
ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf()));
|
||||
|
||||
private final ActorSystem system = actorSystemResource.getSystem();
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@Test
|
||||
public void demonstrateCreateSocket() {
|
||||
Assume.assumeTrue(checkZeroMQInstallation());
|
||||
|
||||
//#pub-socket
|
||||
ActorRef pubSocket = ZeroMQExtension.get(system).newPubSocket(
|
||||
new Bind("tcp://127.0.0.1:1233"));
|
||||
//#pub-socket
|
||||
|
||||
//#sub-socket
|
||||
ActorRef listener = system.actorOf(Props.create(ListenerActor.class));
|
||||
ActorRef subSocket = ZeroMQExtension.get(system).newSubSocket(
|
||||
new Connect("tcp://127.0.0.1:1233"),
|
||||
new Listener(listener), Subscribe.all());
|
||||
//#sub-socket
|
||||
|
||||
//#sub-topic-socket
|
||||
ActorRef subTopicSocket = ZeroMQExtension.get(system).newSubSocket(
|
||||
new Connect("tcp://127.0.0.1:1233"),
|
||||
new Listener(listener), new Subscribe("foo.bar"));
|
||||
//#sub-topic-socket
|
||||
|
||||
//#unsub-topic-socket
|
||||
subTopicSocket.tell(new Unsubscribe("foo.bar"), ActorRef.noSender());
|
||||
//#unsub-topic-socket
|
||||
|
||||
byte[] payload = new byte[0];
|
||||
//#pub-topic
|
||||
pubSocket.tell(ZMQMessage.withFrames(ByteString.fromString("foo.bar"),
|
||||
ByteString.fromArray(payload)), ActorRef.noSender());
|
||||
//#pub-topic
|
||||
|
||||
system.stop(subSocket);
|
||||
system.stop(subTopicSocket);
|
||||
|
||||
//#high-watermark
|
||||
ActorRef highWatermarkSocket = ZeroMQExtension.get(system).newRouterSocket(
|
||||
new SocketOption[] { new Listener(listener),
|
||||
new Bind("tcp://127.0.0.1:1233"), new HighWatermark(50000) });
|
||||
//#high-watermark
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstratePubSub() throws Exception {
|
||||
Assume.assumeTrue(checkZeroMQInstallation());
|
||||
|
||||
//#health2
|
||||
|
||||
system.actorOf(Props.create(HealthProbe.class), "health");
|
||||
//#health2
|
||||
|
||||
//#logger2
|
||||
|
||||
system.actorOf(Props.create(Logger.class), "logger");
|
||||
//#logger2
|
||||
|
||||
//#alerter2
|
||||
|
||||
system.actorOf(Props.create(HeapAlerter.class), "alerter");
|
||||
//#alerter2
|
||||
|
||||
// Let it run for a while to see some output.
|
||||
// Don't do like this in real tests, this is only doc demonstration.
|
||||
Thread.sleep(3000L);
|
||||
}
|
||||
|
||||
private boolean checkZeroMQInstallation() {
|
||||
try {
|
||||
ZeroMQVersion v = ZeroMQExtension.get(system).version();
|
||||
return (v.major() >= 3 || (v.major() >= 2 && v.minor() >= 1));
|
||||
} catch (LinkageError e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static
|
||||
//#listener-actor
|
||||
public class ListenerActor extends UntypedActor {
|
||||
public void onReceive(Object message) throws Exception {
|
||||
//...
|
||||
}
|
||||
}
|
||||
//#listener-actor
|
||||
|
||||
static
|
||||
//#health
|
||||
public final Object TICK = "TICK";
|
||||
|
||||
//#health
|
||||
static
|
||||
//#health
|
||||
public class Heap implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
public final long timestamp;
|
||||
public final long used;
|
||||
public final long max;
|
||||
|
||||
public Heap(long timestamp, long used, long max) {
|
||||
this.timestamp = timestamp;
|
||||
this.used = used;
|
||||
this.max = max;
|
||||
}
|
||||
}
|
||||
|
||||
//#health
|
||||
static
|
||||
//#health
|
||||
public class Load implements Serializable {
|
||||
private static final long serialVersionUID = 1L;
|
||||
public final long timestamp;
|
||||
public final double loadAverage;
|
||||
|
||||
public Load(long timestamp, double loadAverage) {
|
||||
this.timestamp = timestamp;
|
||||
this.loadAverage = loadAverage;
|
||||
}
|
||||
}
|
||||
|
||||
//#health
|
||||
static
|
||||
//#health
|
||||
public class HealthProbe extends UntypedActor {
|
||||
|
||||
ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(
|
||||
new Bind("tcp://127.0.0.1:1237"));
|
||||
MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
|
||||
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
|
||||
Serialization ser = SerializationExtension.get(getContext().system());
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
getContext().system().scheduler()
|
||||
.schedule(Duration.create(1, "second"), Duration.create(1, "second"),
|
||||
getSelf(), TICK, getContext().dispatcher(), ActorRef.noSender());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRestart(Throwable reason) {
|
||||
// don't call preStart, only schedule once
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message.equals(TICK)) {
|
||||
MemoryUsage currentHeap = memory.getHeapMemoryUsage();
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
ByteString heapTopic = ByteString.fromString("health.heap", "UTF-8");
|
||||
ByteString heapPayload = ByteString.fromArray(
|
||||
ser.serialize(
|
||||
new Heap(timestamp,
|
||||
currentHeap.getUsed(),
|
||||
currentHeap.getMax())
|
||||
).get());
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket.tell(ZMQMessage.withFrames(heapTopic, heapPayload), getSelf());
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
ByteString loadTopic = ByteString.fromString("health.load", "UTF-8");
|
||||
ByteString loadPayload = ByteString.fromArray(
|
||||
ser.serialize(new Load(timestamp, os.getSystemLoadAverage())).get()
|
||||
);
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket.tell(ZMQMessage.withFrames(loadTopic, loadPayload), getSelf());
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#health
|
||||
|
||||
static
|
||||
//#logger
|
||||
public class Logger extends UntypedActor {
|
||||
|
||||
ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(
|
||||
new Connect("tcp://127.0.0.1:1237"),
|
||||
new Listener(getSelf()), new Subscribe("health"));
|
||||
Serialization ser = SerializationExtension.get(getContext().system());
|
||||
SimpleDateFormat timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS");
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof ZMQMessage) {
|
||||
ZMQMessage m = (ZMQMessage) message;
|
||||
String topic = m.frame(0).utf8String();
|
||||
// the first frame is the topic, second is the message
|
||||
if ("health.heap".equals(topic)) {
|
||||
Heap heap = ser.deserialize(m.frame(1).toArray(), Heap.class).get();
|
||||
log.info("Used heap {} bytes, at {}", heap.used,
|
||||
timestampFormat.format(new Date(heap.timestamp)));
|
||||
} else if ("health.load".equals(topic)) {
|
||||
Load load = ser.deserialize(m.frame(1).toArray(), Load.class).get();
|
||||
log.info("Load average {}, at {}", load.loadAverage,
|
||||
timestampFormat.format(new Date(load.timestamp)));
|
||||
}
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#logger
|
||||
|
||||
static
|
||||
//#alerter
|
||||
public class HeapAlerter extends UntypedActor {
|
||||
|
||||
ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(
|
||||
new Connect("tcp://127.0.0.1:1237"),
|
||||
new Listener(getSelf()), new Subscribe("health.heap"));
|
||||
Serialization ser = SerializationExtension.get(getContext().system());
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof ZMQMessage) {
|
||||
ZMQMessage m = (ZMQMessage) message;
|
||||
String topic = m.frame(0).utf8String();
|
||||
// the first frame is the topic, second is the message
|
||||
if ("health.heap".equals(topic)) {
|
||||
Heap heap = ser.<Heap>deserialize(m.frame(1).toArray(), Heap.class).get();
|
||||
if (((double) heap.used / heap.max) > 0.9) {
|
||||
count += 1;
|
||||
} else {
|
||||
count = 0;
|
||||
}
|
||||
if (count > 10) {
|
||||
log.warning("Need more memory, using {} %",
|
||||
(100.0 * heap.used / heap.max));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#alerter
|
||||
|
||||
}
|
||||
|
|
@ -11,5 +11,4 @@ Networking
|
|||
io
|
||||
io-tcp
|
||||
io-udp
|
||||
zeromq
|
||||
camel
|
||||
|
|
|
|||
|
|
@ -1,146 +0,0 @@
|
|||
|
||||
.. _zeromq-java:
|
||||
|
||||
###############
|
||||
ZeroMQ
|
||||
###############
|
||||
|
||||
|
||||
Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket.
|
||||
|
||||
ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it.
|
||||
|
||||
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
|
||||
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
|
||||
|
||||
.. note::
|
||||
|
||||
The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported.
|
||||
|
||||
Connection
|
||||
==========
|
||||
|
||||
ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created.
|
||||
Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-pub-socket
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#pub-socket
|
||||
|
||||
Above examples will create a ZeroMQ Publisher socket that is Bound to the port 21231 on localhost.
|
||||
|
||||
Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-sub-socket
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#sub-socket
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#listener-actor
|
||||
|
||||
The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide <http://zguide.zeromq.org/page:all>`_.
|
||||
|
||||
Publisher-Subscriber Connection
|
||||
-------------------------------
|
||||
|
||||
In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall
|
||||
subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can
|
||||
subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages
|
||||
to one or more actors that do not interact with the actor that sent the message.
|
||||
|
||||
When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber.
|
||||
|
||||
An actor is subscribed to a topic as follows:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#sub-topic-socket
|
||||
|
||||
It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or
|
||||
``Subscribe.all()`` is used, the actor is subscribed to all topics.
|
||||
|
||||
To unsubscribe from a topic you do the following:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-unsub-topic-socket
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#unsub-topic-socket
|
||||
|
||||
To publish messages to a topic you must use two Frames with the topic in the first frame.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-pub-topic
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#pub-topic
|
||||
|
||||
Pub-Sub in Action
|
||||
^^^^^^^^^^^^^^^^^
|
||||
|
||||
The following example illustrates one publisher with two subscribers.
|
||||
|
||||
The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic
|
||||
and ``Load`` events on the ``"health.load"`` topic.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#import-health
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#health
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#health2
|
||||
|
||||
Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and
|
||||
``Load`` events.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#logger
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#logger2
|
||||
|
||||
Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#alerter
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#alerter2
|
||||
|
||||
Router-Dealer Connection
|
||||
------------------------
|
||||
|
||||
While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic.
|
||||
When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy.
|
||||
With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events.
|
||||
|
||||
To create a Router socket that has a high watermark configured, you would do:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocTest.java#high-watermark
|
||||
|
||||
The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.
|
||||
|
||||
Push-Pull Connection
|
||||
--------------------
|
||||
|
||||
Akka ZeroMQ module supports ``Push-Pull`` connections.
|
||||
|
||||
You can create a ``Push`` connection through the::
|
||||
|
||||
ActorRef newPushSocket(SocketOption[] socketParameters);
|
||||
|
||||
You can create a ``Pull`` connection through the::
|
||||
|
||||
ActorRef newPullSocket(SocketOption[] socketParameters);
|
||||
|
||||
More documentation and examples will follow soon.
|
||||
|
||||
Rep-Req Connection
|
||||
------------------
|
||||
|
||||
Akka ZeroMQ module supports ``Rep-Req`` connections.
|
||||
|
||||
You can create a ``Rep`` connection through the::
|
||||
|
||||
ActorRef newRepSocket(SocketOption[] socketParameters);
|
||||
|
||||
You can create a ``Req`` connection through the::
|
||||
|
||||
ActorRef newReqSocket(SocketOption[] socketParameters);
|
||||
|
||||
More documentation and examples will follow soon.
|
||||
|
||||
Configuration
|
||||
=============
|
||||
|
||||
There are several configuration properties for the zeromq module, please refer
|
||||
to the :ref:`reference configuration <config-akka-zeromq>`.
|
||||
|
||||
|
|
@ -1,205 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package docs.zeromq
|
||||
|
||||
import language.postfixOps
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.util.ByteString
|
||||
import akka.testkit._
|
||||
import akka.zeromq.{ ZeroMQVersion, ZeroMQExtension, SocketType, Bind }
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ZeromqDocSpec {
|
||||
|
||||
//#health
|
||||
import akka.zeromq._
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Props
|
||||
import akka.actor.ActorLogging
|
||||
import akka.serialization.SerializationExtension
|
||||
import java.lang.management.ManagementFactory
|
||||
|
||||
case object Tick
|
||||
final case class Heap(timestamp: Long, used: Long, max: Long)
|
||||
final case class Load(timestamp: Long, loadAverage: Double)
|
||||
|
||||
class HealthProbe extends Actor {
|
||||
|
||||
val pubSocket = ZeroMQExtension(context.system).newSocket(SocketType.Pub,
|
||||
Bind("tcp://127.0.0.1:1235"))
|
||||
val memory = ManagementFactory.getMemoryMXBean
|
||||
val os = ManagementFactory.getOperatingSystemMXBean
|
||||
val ser = SerializationExtension(context.system)
|
||||
import context.dispatcher
|
||||
|
||||
override def preStart() {
|
||||
context.system.scheduler.schedule(1 second, 1 second, self, Tick)
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) {
|
||||
// don't call preStart, only schedule once
|
||||
}
|
||||
|
||||
def receive: Receive = {
|
||||
case Tick =>
|
||||
val currentHeap = memory.getHeapMemoryUsage
|
||||
val timestamp = System.currentTimeMillis
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed,
|
||||
currentHeap.getMax)).get
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket ! ZMQMessage(ByteString("health.heap"), ByteString(heapPayload))
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket ! ZMQMessage(ByteString("health.load"), ByteString(loadPayload))
|
||||
}
|
||||
}
|
||||
//#health
|
||||
|
||||
//#logger
|
||||
class Logger extends Actor with ActorLogging {
|
||||
|
||||
ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
|
||||
Connect("tcp://127.0.0.1:1235"), Subscribe("health"))
|
||||
val ser = SerializationExtension(context.system)
|
||||
val timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS")
|
||||
|
||||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.frames(0).utf8String == "health.heap" =>
|
||||
val Heap(timestamp, used, max) = ser.deserialize(m.frames(1).toArray,
|
||||
classOf[Heap]).get
|
||||
log.info("Used heap {} bytes, at {}", used,
|
||||
timestampFormat.format(new Date(timestamp)))
|
||||
|
||||
case m: ZMQMessage if m.frames(0).utf8String == "health.load" =>
|
||||
val Load(timestamp, loadAverage) = ser.deserialize(m.frames(1).toArray,
|
||||
classOf[Load]).get
|
||||
log.info("Load average {}, at {}", loadAverage,
|
||||
timestampFormat.format(new Date(timestamp)))
|
||||
}
|
||||
}
|
||||
//#logger
|
||||
|
||||
//#alerter
|
||||
class HeapAlerter extends Actor with ActorLogging {
|
||||
|
||||
ZeroMQExtension(context.system).newSocket(SocketType.Sub,
|
||||
Listener(self), Connect("tcp://127.0.0.1:1235"), Subscribe("health.heap"))
|
||||
val ser = SerializationExtension(context.system)
|
||||
var count = 0
|
||||
|
||||
def receive = {
|
||||
// the first frame is the topic, second is the message
|
||||
case m: ZMQMessage if m.frames(0).utf8String == "health.heap" =>
|
||||
val Heap(timestamp, used, max) =
|
||||
ser.deserialize(m.frames(1).toArray, classOf[Heap]).get
|
||||
if ((used.toDouble / max) > 0.9) count += 1
|
||||
else count = 0
|
||||
if (count > 10) log.warning("Need more memory, using {} %",
|
||||
(100.0 * used / max))
|
||||
}
|
||||
}
|
||||
//#alerter
|
||||
|
||||
}
|
||||
|
||||
class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
|
||||
import ZeromqDocSpec._
|
||||
|
||||
"demonstrate how to create socket" in {
|
||||
checkZeroMQInstallation()
|
||||
|
||||
//#pub-socket
|
||||
import akka.zeromq.ZeroMQExtension
|
||||
val pubSocket = ZeroMQExtension(system).newSocket(SocketType.Pub,
|
||||
Bind("tcp://127.0.0.1:21231"))
|
||||
//#pub-socket
|
||||
|
||||
import akka.zeromq._
|
||||
val sub: { def subSocket: ActorRef; def listener: ActorRef } = new AnyRef {
|
||||
//#sub-socket
|
||||
import akka.zeromq._
|
||||
|
||||
class Listener extends Actor {
|
||||
def receive: Receive = {
|
||||
case Connecting => //...
|
||||
case m: ZMQMessage => //...
|
||||
case _ => //...
|
||||
}
|
||||
}
|
||||
|
||||
val listener = system.actorOf(Props(classOf[Listener], this))
|
||||
val subSocket = ZeroMQExtension(system).newSocket(SocketType.Sub,
|
||||
Listener(listener), Connect("tcp://127.0.0.1:21231"), SubscribeAll)
|
||||
//#sub-socket
|
||||
}
|
||||
val listener = sub.listener
|
||||
|
||||
//#sub-topic-socket
|
||||
val subTopicSocket = ZeroMQExtension(system).newSocket(SocketType.Sub,
|
||||
Listener(listener), Connect("tcp://127.0.0.1:21231"), Subscribe("foo.bar"))
|
||||
//#sub-topic-socket
|
||||
|
||||
//#unsub-topic-socket
|
||||
subTopicSocket ! Unsubscribe("foo.bar")
|
||||
//#unsub-topic-socket
|
||||
|
||||
val payload = Array.empty[Byte]
|
||||
//#pub-topic
|
||||
pubSocket ! ZMQMessage(ByteString("foo.bar"), ByteString(payload))
|
||||
//#pub-topic
|
||||
|
||||
system.stop(sub.subSocket)
|
||||
system.stop(subTopicSocket)
|
||||
|
||||
//#high-watermark
|
||||
val highWatermarkSocket = ZeroMQExtension(system).newSocket(
|
||||
SocketType.Router,
|
||||
Listener(listener),
|
||||
Bind("tcp://127.0.0.1:21233"),
|
||||
HighWatermark(50000))
|
||||
//#high-watermark
|
||||
}
|
||||
|
||||
"demonstrate pub-sub" in {
|
||||
checkZeroMQInstallation()
|
||||
|
||||
//#health
|
||||
|
||||
system.actorOf(Props[HealthProbe], name = "health")
|
||||
//#health
|
||||
|
||||
//#logger
|
||||
|
||||
system.actorOf(Props[Logger], name = "logger")
|
||||
//#logger
|
||||
|
||||
//#alerter
|
||||
|
||||
system.actorOf(Props[HeapAlerter], name = "alerter")
|
||||
//#alerter
|
||||
|
||||
// Let it run for a while to see some output.
|
||||
// Don't do like this in real tests, this is only doc demonstration.
|
||||
Thread.sleep(3.seconds.toMillis)
|
||||
|
||||
}
|
||||
|
||||
def checkZeroMQInstallation() = try {
|
||||
ZeroMQExtension(system).version match {
|
||||
case ZeroMQVersion(2, x, _) if x >= 1 => Unit
|
||||
case ZeroMQVersion(y, _, _) if y >= 3 => Unit
|
||||
case version => pending
|
||||
}
|
||||
} catch {
|
||||
case e: LinkageError => pending
|
||||
}
|
||||
}
|
||||
|
|
@ -11,5 +11,4 @@ Networking
|
|||
io
|
||||
io-tcp
|
||||
io-udp
|
||||
zeromq
|
||||
camel
|
||||
|
|
|
|||
|
|
@ -1,129 +0,0 @@
|
|||
|
||||
.. _zeromq-scala:
|
||||
|
||||
################
|
||||
ZeroMQ
|
||||
################
|
||||
|
||||
|
||||
Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket.
|
||||
|
||||
ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it.
|
||||
|
||||
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
|
||||
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
|
||||
|
||||
.. note::
|
||||
|
||||
The currently used version of ``zeromq-scala-bindings`` is only compatible with zeromq 2; zeromq 3 is not supported.
|
||||
|
||||
Connection
|
||||
==========
|
||||
|
||||
ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created.
|
||||
Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#pub-socket
|
||||
|
||||
|
||||
Above examples will create a ZeroMQ Publisher socket that is Bound to the port 21231 on localhost.
|
||||
|
||||
Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#sub-socket
|
||||
|
||||
The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide <http://zguide.zeromq.org/page:all>`_.
|
||||
|
||||
Publisher-Subscriber Connection
|
||||
-------------------------------
|
||||
|
||||
In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall
|
||||
subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can
|
||||
subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages
|
||||
to one or more actors that do not interact with the actor that sent the message.
|
||||
|
||||
When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber.
|
||||
|
||||
An actor is subscribed to a topic as follows:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#sub-topic-socket
|
||||
|
||||
It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or
|
||||
``SubscribeAll`` is used, the actor is subscribed to all topics.
|
||||
|
||||
To unsubscribe from a topic you do the following:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#unsub-topic-socket
|
||||
|
||||
To publish messages to a topic you must use two Frames with the topic in the first frame.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#pub-topic
|
||||
|
||||
Pub-Sub in Action
|
||||
^^^^^^^^^^^^^^^^^
|
||||
|
||||
The following example illustrates one publisher with two subscribers.
|
||||
|
||||
The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic
|
||||
and ``Load`` events on the ``"health.load"`` topic.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#health
|
||||
|
||||
Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and
|
||||
``Load`` events.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#logger
|
||||
|
||||
Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events.
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#alerter
|
||||
|
||||
Router-Dealer Connection
|
||||
------------------------
|
||||
|
||||
While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic.
|
||||
When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy.
|
||||
With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events.
|
||||
|
||||
To create a Router socket that has a high watermark configured, you would do:
|
||||
|
||||
.. includecode:: code/docs/zeromq/ZeromqDocSpec.scala#high-watermark
|
||||
|
||||
The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.
|
||||
|
||||
Push-Pull Connection
|
||||
--------------------
|
||||
|
||||
Akka ZeroMQ module supports ``Push-Pull`` connections.
|
||||
|
||||
You can create a ``Push`` connection through the::
|
||||
|
||||
def newPushSocket(socketParameters: Array[SocketOption]): ActorRef
|
||||
|
||||
You can create a ``Pull`` connection through the::
|
||||
|
||||
def newPullSocket(socketParameters: Array[SocketOption]): ActorRef
|
||||
|
||||
More documentation and examples will follow soon.
|
||||
|
||||
Rep-Req Connection
|
||||
------------------
|
||||
|
||||
Akka ZeroMQ module supports ``Rep-Req`` connections.
|
||||
|
||||
You can create a ``Rep`` connection through the::
|
||||
|
||||
def newRepSocket(socketParameters: Array[SocketOption]): ActorRef
|
||||
|
||||
You can create a ``Req`` connection through the::
|
||||
|
||||
def newReqSocket(socketParameters: Array[SocketOption]): ActorRef
|
||||
|
||||
More documentation and examples will follow soon.
|
||||
|
||||
Configuration
|
||||
=============
|
||||
|
||||
There are several configuration properties for the zeromq module, please refer
|
||||
to the :ref:`reference configuration <config-akka-zeromq>`.
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue