Improve docs and api for zeromq. See #1713

* Wrote a comprehensive example for pub-sub
* Clarified how publish to topic is done
* Several minor, but important, api adjustments for the java api, and some also profit for scala
* Added documentation for Java and updated documentation for Scala
This commit is contained in:
Patrik Nordwall 2012-02-09 21:21:31 +01:00
parent 51a218b87f
commit 4a5f5eef21
11 changed files with 703 additions and 112 deletions

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.zeromq
import org.scalatest.junit.JUnitSuite
class ZeromqDocTest extends ZeromqDocTestBase with JUnitSuite

View file

@ -0,0 +1,284 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.zeromq;
//#pub-socket
import akka.zeromq.Bind;
import akka.zeromq.ZeroMQExtension;
//#pub-socket
//#sub-socket
import akka.zeromq.Connect;
import akka.zeromq.Listener;
import akka.zeromq.Subscribe;
//#sub-socket
//#unsub-topic-socket
import akka.zeromq.Unsubscribe;
//#unsub-topic-socket
//#pub-topic
import akka.zeromq.Frame;
import akka.zeromq.ZMQMessage;
//#pub-topic
import akka.zeromq.HighWatermark;
import akka.zeromq.SocketOption;
import akka.zeromq.ZeroMQVersion;
//#health
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.util.Duration;
import akka.serialization.SerializationExtension;
import akka.serialization.Serialization;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
//#health
import com.typesafe.config.ConfigFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.text.SimpleDateFormat;
import akka.actor.ActorSystem;
import akka.testkit.AkkaSpec;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assume;
import akka.zeromq.SocketType;
public class ZeromqDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("ZeromqDocTest",
ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf()));
}
@After
public void tearDown() {
system.shutdown();
}
@Test
public void demonstrateCreateSocket() {
Assume.assumeTrue(checkZeroMQInstallation());
//#pub-socket
ActorRef pubSocket = ZeroMQExtension.get(system).newPubSocket(new Bind("tcp://127.0.0.1:1233"));
//#pub-socket
//#sub-socket
ActorRef listener = system.actorOf(new Props(ListenerActor.class));
ActorRef subSocket = ZeroMQExtension.get(system).newSubSocket(new Connect("tcp://127.0.0.1:1233"),
new Listener(listener), Subscribe.all());
//#sub-socket
//#sub-topic-socket
ActorRef subTopicSocket = ZeroMQExtension.get(system).newSubSocket(new Connect("tcp://127.0.0.1:1233"),
new Listener(listener), new Subscribe("foo.bar"));
//#sub-topic-socket
//#unsub-topic-socket
subTopicSocket.tell(new Unsubscribe("foo.bar"));
//#unsub-topic-socket
byte[] payload = new byte[0];
//#pub-topic
pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload)));
//#pub-topic
//#high-watermark
ActorRef highWatermarkSocket = ZeroMQExtension.get(system).newRouterSocket(
new SocketOption[] { new Listener(listener), new Bind("tcp://127.0.0.1:1233"), new HighWatermark(50000) });
//#high-watermark
}
@Test
public void demonstratePubSub() throws Exception {
Assume.assumeTrue(checkZeroMQInstallation());
//#health2
system.actorOf(new Props(HealthProbe.class), "health");
//#health2
//#logger2
system.actorOf(new Props(Logger.class), "logger");
//#logger2
//#alerter2
system.actorOf(new Props(HeapAlerter.class), "alerter");
//#alerter2
Thread.sleep(3000L);
}
private boolean checkZeroMQInstallation() {
try {
ZeroMQVersion v = ZeroMQExtension.get(system).version();
return (v.major() == 2 && v.minor() == 1);
} catch (LinkageError e) {
return false;
}
}
//#listener-actor
public static class ListenerActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
//...
}
}
//#listener-actor
//#health
public static final Object TICK = "TICK";
public static class Heap implements Serializable {
public final long timestamp;
public final long used;
public final long max;
public Heap(long timestamp, long used, long max) {
this.timestamp = timestamp;
this.used = used;
this.max = max;
}
}
public static class Load implements Serializable {
public final long timestamp;
public final double loadAverage;
public Load(long timestamp, double loadAverage) {
this.timestamp = timestamp;
this.loadAverage = loadAverage;
}
}
public static class HealthProbe extends UntypedActor {
ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(new Bind("tcp://127.0.0.1:1237"));
MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
Serialization ser = SerializationExtension.get(getContext().system());
@Override
public void preStart() {
getContext().system().scheduler()
.schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK);
}
@Override
public void postRestart(Throwable reason) {
// don't call preStart
}
@Override
public void onReceive(Object message) {
if (message.equals(TICK)) {
MemoryUsage currentHeap = memory.getHeapMemoryUsage();
long timestamp = System.currentTimeMillis();
// use akka SerializationExtension to convert to bytes
byte[] heapPayload = ser.serializerFor(Heap.class).toBinary(
new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax()));
// the first frame is the topic, second is the message
pubSocket.tell(new ZMQMessage(new Frame("health.heap"), new Frame(heapPayload)));
// use akka SerializationExtension to convert to bytes
byte[] loadPayload = ser.serializerFor(Load.class).toBinary(new Load(timestamp, os.getSystemLoadAverage()));
// the first frame is the topic, second is the message
pubSocket.tell(new ZMQMessage(new Frame("health.load"), new Frame(loadPayload)));
} else {
unhandled(message);
}
}
}
//#health
//#logger
public static class Logger extends UntypedActor {
ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(new Connect("tcp://127.0.0.1:1237"),
new Listener(getSelf()), new Subscribe("health"));
Serialization ser = SerializationExtension.get(getContext().system());
SimpleDateFormat timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS");
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object message) {
if (message instanceof ZMQMessage) {
ZMQMessage m = (ZMQMessage) message;
// the first frame is the topic, second is the message
if (m.firstFrameAsString().equals("health.heap")) {
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
log.info("Used heap {} bytes, at {}", heap.used, timestampFormat.format(new Date(heap.timestamp)));
} else if (m.firstFrameAsString().equals("health.load")) {
Load load = (Load) ser.serializerFor(Load.class).fromBinary(m.payload(1));
log.info("Load average {}, at {}", load.loadAverage, timestampFormat.format(new Date(load.timestamp)));
}
} else {
unhandled(message);
}
}
}
//#logger
//#alerter
public static class HeapAlerter extends UntypedActor {
ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(new Connect("tcp://127.0.0.1:1237"),
new Listener(getSelf()), new Subscribe("health.heap"));
Serialization ser = SerializationExtension.get(getContext().system());
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
int count = 0;
@Override
public void onReceive(Object message) {
if (message instanceof ZMQMessage) {
ZMQMessage m = (ZMQMessage) message;
// the first frame is the topic, second is the message
if (m.firstFrameAsString().equals("health.heap")) {
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
if (((double) heap.used / heap.max) > 0.9) {
count += 1;
} else {
count = 0;
}
if (count > 10) {
log.warning("Need more memory, using {} %", (100.0 * heap.used / heap.max));
}
}
} else {
unhandled(message);
}
}
}
//#alerter
}

View file

@ -23,3 +23,4 @@ Java API
transactors
fsm
extending-akka
zeromq

98
akka-docs/java/zeromq.rst Normal file
View file

@ -0,0 +1,98 @@
.. _zeromq-java:
###############
ZeroMQ (Java)
###############
.. sidebar:: Contents
.. contents:: :local:
Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket.
ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it.
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
Connection
==========
ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created.
Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example:
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#pub-socket
Above examples will create a ZeroMQ Publisher socket that is Bound to the port 1233 on localhost.
Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using:
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#sub-socket
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#listener-actor
The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide <http://zguide.zeromq.org/page:all>`_.
Publisher-subscriber connection
-------------------------------
In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall
subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can
subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages
to one or more actors that do not interact with the actor that sent the message.
When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber.
An actor is subscribed to a topic as follows:
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#sub-topic-socket
It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or
``Subscribe.all()`` is used, the actor is subscribed to all topics.
To unsubscribe from a topic you do the following:
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#unsub-topic-socket
To publish messages to a topic you must use two Frames with the topic in the first frame.
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#pub-topic
Pub-Sub in Action
^^^^^^^^^^^^^^^^^
The following example illustrates one publisher with two subscribers.
The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic
and ``Load`` events on the ``"health.load"`` topic.
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#health
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#health2
Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and
``Load`` events.
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#logger
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#logger2
Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events.
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#alerter
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#alerter2
Router-Dealer connection
------------------------
While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic.
When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy.
With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events.
To create a Router socket that has a high watermark configured, you would do:
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#high-watermark
The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.

View file

@ -0,0 +1,187 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.zeromq
import akka.actor.Actor
import akka.actor.Props
import akka.util.duration._
import akka.testkit._
import akka.zeromq.ZeroMQVersion
import akka.zeromq.ZeroMQExtension
import java.text.SimpleDateFormat
import java.util.Date
import akka.zeromq.SocketType
import akka.zeromq.Bind
object ZeromqDocSpec {
//#health
import akka.zeromq._
import akka.actor.Actor
import akka.actor.Props
import akka.actor.ActorLogging
import akka.serialization.SerializationExtension
import java.lang.management.ManagementFactory
case object Tick
case class Heap(timestamp: Long, used: Long, max: Long)
case class Load(timestamp: Long, loadAverage: Double)
class HealthProbe extends Actor {
val pubSocket = context.system.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1235"))
val memory = ManagementFactory.getMemoryMXBean
val os = ManagementFactory.getOperatingSystemMXBean
val ser = SerializationExtension(context.system)
context.system.scheduler.schedule(1 second, 1 second, self, Tick)
def receive: Receive = {
case Tick
val currentHeap = memory.getHeapMemoryUsage
val timestamp = System.currentTimeMillis
// use akka SerializationExtension to convert to bytes
val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity)
// the first frame is the topic, second is the message
pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload)))
// use akka SerializationExtension to convert to bytes
val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity)
// the first frame is the topic, second is the message
pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload)))
}
}
//#health
//#logger
class Logger extends Actor with ActorLogging {
context.system.newSocket(SocketType.Sub, Listener(self), Connect("tcp://127.0.0.1:1235"), Subscribe("health"))
val ser = SerializationExtension(context.system)
val timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS")
def receive = {
// the first frame is the topic, second is the message
case m: ZMQMessage if m.firstFrameAsString == "health.heap"
ser.deserialize(m.payload(1), classOf[Heap], None) match {
case Right(Heap(timestamp, used, max))
log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp)))
case Left(e) throw e
}
case m: ZMQMessage if m.firstFrameAsString == "health.load"
ser.deserialize(m.payload(1), classOf[Load], None) match {
case Right(Load(timestamp, loadAverage))
log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp)))
case Left(e) throw e
}
}
}
//#logger
//#alerter
class HeapAlerter extends Actor with ActorLogging {
context.system.newSocket(SocketType.Sub, Listener(self), Connect("tcp://127.0.0.1:1235"), Subscribe("health.heap"))
val ser = SerializationExtension(context.system)
var count = 0
def receive = {
// the first frame is the topic, second is the message
case m: ZMQMessage if m.firstFrameAsString == "health.heap"
ser.deserialize(m.payload(1), classOf[Heap], None) match {
case Right(Heap(timestamp, used, max))
if ((used.toDouble / max) > 0.9) count += 1
else count = 0
if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max))
case Left(e) throw e
}
}
}
//#alerter
}
class ZeromqDocSpec extends AkkaSpec("akka.loglevel=INFO") {
import ZeromqDocSpec._
"demonstrate how to create socket" in {
checkZeroMQInstallation()
//#pub-socket
import akka.zeromq.ZeroMQExtension
val pubSocket = ZeroMQExtension(system).newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))
//#pub-socket
//#pub-socket2
import akka.zeromq._
val pubSocket2 = system.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))
//#pub-socket2
//#sub-socket
import akka.zeromq._
val listener = system.actorOf(Props(new Actor {
def receive: Receive = {
case Connecting //...
case m: ZMQMessage //...
case _ //...
}
}))
val subSocket = system.newSocket(SocketType.Sub, Listener(listener), Connect("tcp://127.0.0.1:1234"), SubscribeAll)
//#sub-socket
//#sub-topic-socket
val subTopicSocket = system.newSocket(SocketType.Sub, Listener(listener), Connect("tcp://127.0.0.1:1234"), Subscribe("foo.bar"))
//#sub-topic-socket
//#unsub-topic-socket
subTopicSocket ! Unsubscribe("foo.bar")
//#unsub-topic-socket
val payload = Array.empty[Byte]
//#pub-topic
pubSocket ! ZMQMessage(Seq(Frame("foo.bar"), Frame(payload)))
//#pub-topic
//#high-watermark
val highWatermarkSocket = system.newSocket(
SocketType.Router,
Listener(listener),
Bind("tcp://127.0.0.1:1234"),
HighWatermark(50000))
//#high-watermark
}
"demonstrate pub-sub" in {
checkZeroMQInstallation()
//#health
system.actorOf(Props[HealthProbe], name = "health")
//#health
//#logger
system.actorOf(Props[Logger], name = "logger")
//#logger
//#alerter
system.actorOf(Props[HeapAlerter], name = "alerter")
//#alerter
Thread.sleep(3000)
}
def checkZeroMQInstallation() = try {
ZeroMQExtension(system).version match {
case ZeroMQVersion(2, 1, _) Unit
case version pending
}
} catch {
case e: LinkageError pending
}
}

View file

@ -1,8 +1,10 @@
.. _zeromq-module:
.. _zeromq-scala:
################
ZeroMQ (Scala)
################
ZeroMQ
======
.. sidebar:: Contents
@ -12,83 +14,76 @@ Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore
ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it.
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
Connection
----------
==========
ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created. Sockets are always created using ``akka.zeromq.ZeroMQ.newSocket``, for example:
ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created.
Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example:
.. code-block:: scala
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#pub-socket
import akka.zeromq._
val socket = system.zeromq.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))
or by importing the ``akka.zeromq._`` package to make newSocket method available on system, via an implicit conversion.
will create a ZeroMQ Publisher socket that is Bound to the port 1234 on localhost.
Importing the akka.zeromq._ package ensures that the implicit zeromq method is available.
Similarly you can create a subscription socket, that subscribes to all messages from the publisher using:
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#pub-socket2
.. code-block:: scala
val socket = system.zeromq.newSocket(SocketType.Sub, Connect("tcp://127.0.0.1:1234"), SubscribeAll)
Above examples will create a ZeroMQ Publisher socket that is Bound to the port 1234 on localhost.
Also, a socket may be created with a listener that handles received messages as well as notifications:
Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using:
.. code-block:: scala
val listener = system.actorOf(Props(new Actor {
def receive: Receive = {
case Connecting => ...
case _ => ...
}
}))
val socket = system.zeromq.newSocket(SocketType.Router, Listener(listener), Connect("tcp://localhost:1234"))
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#sub-socket
The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide <http://zguide.zeromq.org/page:all>`_.
Publisher-subscriber connection
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can subscribe to all available topics.
In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall
subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can
subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages
to one or more actors that do not interact with the actor that sent the message.
When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber.
An actor is subscribed to a topic as follows:
.. code-block:: scala
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#sub-topic-socket
val socket = system.zeromq.newSocket(SocketType.Sub, Listener(listener), Connect("tcp://localhost:1234"), Subscribe("the-topic"))
It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or
``SubscribeAll`` is used, the actor is subscribed to all topics.
Note that if the given string is empty (see below), the actor is subscribed to all topics. To unsubscribe from a topic you do the following:
To unsubscribe from a topic you do the following:
.. code-block:: scala
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#unsub-topic-socket
socket ! Unsubscribe("SomeTopic1")
To publish messages to a topic you must use two Frames with the topic in the first frame.
In an Akka environment, pub-sub connections shall be used when an actor sends messages to one or more actors that do not interact with the actor that sent the message. The following piece of code creates a publisher actor, binds the socket, and sends a message to be published:
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#pub-topic
.. code-block:: scala
Pub-Sub in Action
^^^^^^^^^^^^^^^^^
import akka.zeromq._
val socket = system.zeromq.newSocket(SocketType.Pub, Bind("tcp://127.0.0.1:1234"))
socket ! Send("hello".getBytes)
The following example illustrates one publisher with two subscribers.
In the following code, the subscriber is configured to receive messages for all topics:
The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic
and ``Load`` events on the ``"health.load"`` topic.
.. code-block:: scala
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#health
import akka.zeromq._
val listener = system.actorOf(Props(new Actor {
def receive: Receive = {
case Connecting => ...
case _ => ...
}
}))
val socket = system.zeromq.newSocket(SocketType.Sub, Listener(listener), Connect("tcp://127.0.0.1:1234"), SubscribeAll)
Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and
``Load`` events.
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#logger
Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events.
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#alerter
Router-Dealer connection
^^^^^^^^^^^^^^^^^^^^^^^^
------------------------
While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic.
When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy.
@ -96,19 +91,6 @@ With those socket types you can build your own reliable pub sub broker that uses
To create a Router socket that has a high watermark configured, you would do:
.. code-block:: scala
import akka.zeromq._
val listener = system.actorOf(Props(new Actor {
def receive: Receive = {
case Connecting => ...
case _ => ...
}
}))
val socket = system.zeromq.newSocket(
SocketType.Router,
Listener(listener),
Bind("tcp://127.0.0.1:1234"),
HWM(50000))
.. includecode:: code/akka/docs/zeromq/ZeromqDocSpec.scala#high-watermark
The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.
The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.

View file

@ -174,9 +174,12 @@ private[zeromq] case object Close extends Request
*
* @param payload the topic to subscribe to
*/
case class Subscribe(payload: Seq[Byte]) extends PubSubOption
case class Subscribe(payload: Seq[Byte]) extends PubSubOption {
def this(topic: String) = this(topic.getBytes("UTF-8"))
}
object Subscribe {
def apply(topic: String): Subscribe = new Subscribe(topic.getBytes)
def apply(topic: String): Subscribe = new Subscribe(topic)
val all = Subscribe(Seq.empty)
}
/**
@ -188,9 +191,11 @@ object Subscribe {
*
* @param payload
*/
case class Unsubscribe(payload: Seq[Byte]) extends PubSubOption
case class Unsubscribe(payload: Seq[Byte]) extends PubSubOption {
def this(topic: String) = this(topic.getBytes("UTF-8"))
}
object Unsubscribe {
def apply(topic: String): Unsubscribe = Unsubscribe(topic.getBytes)
def apply(topic: String): Unsubscribe = new Unsubscribe(topic)
}
/**
@ -204,7 +209,21 @@ case class Send(frames: Seq[Frame]) extends Request
* @param frames
*/
case class ZMQMessage(frames: Seq[Frame]) {
def firstFrameAsString = new String(frames.head.payload.toArray)
def this(frame: Frame) = this(Seq(frame))
def this(frame1: Frame, frame2: Frame) = this(Seq(frame1, frame2))
def this(frameArray: Array[Frame]) = this(frameArray.toSeq)
/**
* Convert the bytes in the first frame to a String, using specified charset.
*/
def firstFrameAsString(charsetName: String): String = new String(frames.head.payload.toArray, charsetName)
/**
* Convert the bytes in the first frame to a String, using "UTF-8" charset.
*/
def firstFrameAsString: String = firstFrameAsString("UTF-8")
def payload(frameIndex: Int): Array[Byte] = frames(frameIndex).payload.toArray
}
object ZMQMessage {
def apply(bytes: Array[Byte]): ZMQMessage = ZMQMessage(Seq(Frame(bytes)))

View file

@ -3,11 +3,18 @@
*/
package akka.zeromq
object Frame {
def apply(text: String): Frame = new Frame(text)
}
/**
* A single message frame of a zeromq message
* @param payload
*/
case class Frame(payload: Seq[Byte])
case class Frame(payload: Seq[Byte]) {
def this(bytes: Array[Byte]) = this(bytes.toSeq)
def this(text: String) = this(text.getBytes("UTF-8"))
}
/**
* Deserializes ZeroMQ messages into an immutable sequence of frames

View file

@ -22,6 +22,7 @@ case class ZeroMQVersion(major: Int, minor: Int, patch: Int) {
* The [[akka.actor.ExtensionId]] and [[akka.actor.ExtensionIdProvider]] for the ZeroMQ module
*/
object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider {
override def get(system: ActorSystem): ZeroMQExtension = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new ZeroMQExtension(system)
@ -141,92 +142,94 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
}
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Publisher socket.
* Java API factory method to create the actor representing the ZeroMQ Publisher socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newPubSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Pub +: socketParameters): _*)
def newPubSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Pub +: socketParameters): _*)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Subscriber socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @return the [[akka.actor.ActorRef]]
* Convenience for creating a publisher socket.
*/
def newSubSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Sub +: socketParameters): _*)
def newPubSocket(bind: Bind): ActorRef = newSocket(SocketType.Pub, bind)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Dealer socket.
* Java API factory method to create the actor representing the ZeroMQ Subscriber socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newDealerSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Dealer +: socketParameters): _*)
def newSubSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Sub +: socketParameters): _*)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Router socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @return the [[akka.actor.ActorRef]]
* Convenience for creating a subscriber socket.
*/
def newRouterSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Router +: socketParameters): _*)
def newSubSocket(connect: Connect, listener: Listener, subscribe: Subscribe): ActorRef = newSocket(SocketType.Sub, connect, listener, subscribe)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Push socket.
* Java API factory method to create the actor representing the ZeroMQ Dealer socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newPushSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Push +: socketParameters): _*)
def newDealerSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Dealer +: socketParameters): _*)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Pull socket.
* Java API factory method to create the actor representing the ZeroMQ Router socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newPullSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Pull +: socketParameters): _*)
def newRouterSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Router +: socketParameters): _*)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Req socket.
* Java API factory method to create the actor representing the ZeroMQ Push socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newReqSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Req +: socketParameters): _*)
def newPushSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Push +: socketParameters): _*)
/**
* Java API helper
* Factory method to create the actor representing the ZeroMQ Rep socket.
* Java API factory method to create the actor representing the ZeroMQ Pull socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters a varargs list of [[akka.zeromq.SocketOption]] to configure the socke
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newRepSocket(socketParameters: SocketOption*): ActorRef = newSocket((SocketType.Rep +: socketParameters): _*)
def newPullSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Pull +: socketParameters): _*)
/**
* Java API factory method to create the actor representing the ZeroMQ Req socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socket
* @return the [[akka.actor.ActorRef]]
*/
def newReqSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Req +: socketParameters): _*)
/**
* Java API factory method to create the actor representing the ZeroMQ Rep socket.
* You can pass in as many configuration options as you want and the order of the configuration options doesn't matter
* They are matched on type and the first one found wins.
*
* @param socketParameters array of [[akka.zeromq.SocketOption]] to configure the socke
* @return the [[akka.actor.ActorRef]]
*/
def newRepSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Rep +: socketParameters): _*)
private val zeromqGuardian: ActorRef = {
verifyZeroMQVersion

View file

@ -24,7 +24,8 @@ class ConcurrentSocketActorSpec
"ConcurrentSocketActor" should {
"support pub-sub connections" in {
checkZeroMQInstallation
val (publisherProbe, subscriberProbe) = (TestProbe(), TestProbe())
val publisherProbe = TestProbe()
val subscriberProbe = TestProbe()
val context = Context()
val publisher = newPublisher(context, publisherProbe.ref)
val subscriber = newSubscriber(context, subscriberProbe.ref)
@ -68,7 +69,7 @@ class ConcurrentSocketActorSpec
zmq.newSocket(SocketType.Pub, context, Listener(listener), Bind(endpoint))
}
def newSubscriber(context: Context, listener: ActorRef) = {
zmq.newSocket(SocketType.Sub, context, Listener(listener), Connect(endpoint), Subscribe(Seq.empty))
zmq.newSocket(SocketType.Sub, context, Listener(listener), Connect(endpoint), SubscribeAll)
}
def newMessageGenerator(actorRef: ActorRef) = {
system.actorOf(Props(new MessageGeneratorActor(actorRef)))
@ -110,7 +111,7 @@ class ConcurrentSocketActorSpec
protected def receive = {
case _
val payload = "%s".format(messageNumber)
messageNumber = messageNumber + 1
messageNumber += 1
actorRef ! ZMQMessage(payload.getBytes)
}
}

View file

@ -320,7 +320,8 @@ object AkkaBuild extends Build {
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox),
dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor,
fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox, zeroMQ),
settings = defaultSettings ++ Seq(
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
libraryDependencies ++= Dependencies.docs,