Merge remote-tracking branch 'origin/master' into wip-1750-remove-ReflectiveAccess-∂π
This commit is contained in:
commit
d2f28a06cd
83 changed files with 1579 additions and 561 deletions
|
|
@ -381,6 +381,7 @@ public class FutureDocTestBase {
|
|||
@Test public void useOnSuccessOnFailureAndOnComplete() {
|
||||
{
|
||||
Future<String> future = Futures.successful("foo", system.dispatcher());
|
||||
|
||||
//#onSuccess
|
||||
future.onSuccess(new OnSuccess<String>() {
|
||||
public void onSuccess(String result) {
|
||||
|
|
|
|||
|
|
@ -54,61 +54,7 @@ public class SerializationDocTestBase {
|
|||
}
|
||||
}
|
||||
//#my-own-serializer
|
||||
@Test public void haveExamples() {
|
||||
/*
|
||||
//#serialize-messages-config
|
||||
akka {
|
||||
actor {
|
||||
serialize-messages = on
|
||||
}
|
||||
}
|
||||
//#serialize-messages-config
|
||||
|
||||
//#serialize-creators-config
|
||||
akka {
|
||||
actor {
|
||||
serialize-creators = on
|
||||
}
|
||||
}
|
||||
//#serialize-creators-config
|
||||
|
||||
|
||||
//#serialize-serializers-config
|
||||
akka {
|
||||
actor {
|
||||
serializers {
|
||||
default = "akka.serialization.JavaSerializer"
|
||||
|
||||
myown = "akka.docs.serialization.MyOwnSerializer"
|
||||
}
|
||||
}
|
||||
}
|
||||
//#serialize-serializers-config
|
||||
|
||||
//#serialization-bindings-config
|
||||
akka {
|
||||
actor {
|
||||
serializers {
|
||||
default = "akka.serialization.JavaSerializer"
|
||||
java = "akka.serialization.JavaSerializer"
|
||||
proto = "akka.serialization.ProtobufSerializer"
|
||||
myown = "akka.docs.serialization.MyOwnSerializer"
|
||||
}
|
||||
|
||||
serialization-bindings {
|
||||
java = ["java.lang.String",
|
||||
"app.my.Customer"]
|
||||
proto = ["com.google.protobuf.Message"]
|
||||
myown = ["my.own.BusinessObject",
|
||||
"something.equally.Awesome",
|
||||
"akka.docs.serialization.MyOwnSerializable"
|
||||
"java.lang.Boolean"]
|
||||
}
|
||||
}
|
||||
}
|
||||
//#serialization-bindings-config
|
||||
*/
|
||||
}
|
||||
|
||||
@Test public void demonstrateTheProgrammaticAPI() {
|
||||
//#programmatic
|
||||
|
|
|
|||
8
akka-docs/java/code/akka/docs/zeromq/ZeromqDocTest.scala
Normal file
8
akka-docs/java/code/akka/docs/zeromq/ZeromqDocTest.scala
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.docs.zeromq
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
class ZeromqDocTest extends ZeromqDocTestBase with JUnitSuite
|
||||
286
akka-docs/java/code/akka/docs/zeromq/ZeromqDocTestBase.java
Normal file
286
akka-docs/java/code/akka/docs/zeromq/ZeromqDocTestBase.java
Normal file
|
|
@ -0,0 +1,286 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.docs.zeromq;
|
||||
|
||||
//#pub-socket
|
||||
import akka.zeromq.Bind;
|
||||
import akka.zeromq.ZeroMQExtension;
|
||||
|
||||
//#pub-socket
|
||||
//#sub-socket
|
||||
import akka.zeromq.Connect;
|
||||
import akka.zeromq.Listener;
|
||||
import akka.zeromq.Subscribe;
|
||||
|
||||
//#sub-socket
|
||||
//#unsub-topic-socket
|
||||
import akka.zeromq.Unsubscribe;
|
||||
|
||||
//#unsub-topic-socket
|
||||
//#pub-topic
|
||||
import akka.zeromq.Frame;
|
||||
import akka.zeromq.ZMQMessage;
|
||||
|
||||
//#pub-topic
|
||||
|
||||
import akka.zeromq.HighWatermark;
|
||||
import akka.zeromq.SocketOption;
|
||||
import akka.zeromq.ZeroMQVersion;
|
||||
|
||||
//#health
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.Props;
|
||||
import akka.event.Logging;
|
||||
import akka.event.LoggingAdapter;
|
||||
import akka.util.Duration;
|
||||
import akka.serialization.SerializationExtension;
|
||||
import akka.serialization.Serialization;
|
||||
import java.io.Serializable;
|
||||
import java.lang.management.ManagementFactory;
|
||||
//#health
|
||||
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.lang.management.MemoryUsage;
|
||||
import java.lang.management.OperatingSystemMXBean;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Date;
|
||||
import java.text.SimpleDateFormat;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.testkit.AkkaSpec;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assume;
|
||||
|
||||
import akka.zeromq.SocketType;
|
||||
|
||||
public class ZeromqDocTestBase {
|
||||
|
||||
ActorSystem system;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
system = ActorSystem.create("ZeromqDocTest",
|
||||
ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf()));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateCreateSocket() {
|
||||
Assume.assumeTrue(checkZeroMQInstallation());
|
||||
|
||||
//#pub-socket
|
||||
ActorRef pubSocket = ZeroMQExtension.get(system).newPubSocket(new Bind("tcp://127.0.0.1:1233"));
|
||||
//#pub-socket
|
||||
|
||||
//#sub-socket
|
||||
ActorRef listener = system.actorOf(new Props(ListenerActor.class));
|
||||
ActorRef subSocket = ZeroMQExtension.get(system).newSubSocket(new Connect("tcp://127.0.0.1:1233"),
|
||||
new Listener(listener), Subscribe.all());
|
||||
//#sub-socket
|
||||
|
||||
//#sub-topic-socket
|
||||
ActorRef subTopicSocket = ZeroMQExtension.get(system).newSubSocket(new Connect("tcp://127.0.0.1:1233"),
|
||||
new Listener(listener), new Subscribe("foo.bar"));
|
||||
//#sub-topic-socket
|
||||
|
||||
//#unsub-topic-socket
|
||||
subTopicSocket.tell(new Unsubscribe("foo.bar"));
|
||||
//#unsub-topic-socket
|
||||
|
||||
byte[] payload = new byte[0];
|
||||
//#pub-topic
|
||||
pubSocket.tell(new ZMQMessage(new Frame("foo.bar"), new Frame(payload)));
|
||||
//#pub-topic
|
||||
|
||||
//#high-watermark
|
||||
ActorRef highWatermarkSocket = ZeroMQExtension.get(system).newRouterSocket(
|
||||
new SocketOption[] { new Listener(listener), new Bind("tcp://127.0.0.1:1233"), new HighWatermark(50000) });
|
||||
//#high-watermark
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstratePubSub() throws Exception {
|
||||
Assume.assumeTrue(checkZeroMQInstallation());
|
||||
|
||||
//#health2
|
||||
|
||||
system.actorOf(new Props(HealthProbe.class), "health");
|
||||
//#health2
|
||||
|
||||
//#logger2
|
||||
|
||||
system.actorOf(new Props(Logger.class), "logger");
|
||||
//#logger2
|
||||
|
||||
//#alerter2
|
||||
|
||||
system.actorOf(new Props(HeapAlerter.class), "alerter");
|
||||
//#alerter2
|
||||
|
||||
// Let it run for a while to see some output.
|
||||
// Don't do like this in real tests, this is only doc demonstration.
|
||||
Thread.sleep(3000L);
|
||||
}
|
||||
|
||||
private boolean checkZeroMQInstallation() {
|
||||
try {
|
||||
ZeroMQVersion v = ZeroMQExtension.get(system).version();
|
||||
return (v.major() == 2 && v.minor() == 1);
|
||||
} catch (LinkageError e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
//#listener-actor
|
||||
public static class ListenerActor extends UntypedActor {
|
||||
public void onReceive(Object message) throws Exception {
|
||||
//...
|
||||
}
|
||||
}
|
||||
|
||||
//#listener-actor
|
||||
|
||||
//#health
|
||||
|
||||
public static final Object TICK = "TICK";
|
||||
|
||||
public static class Heap implements Serializable {
|
||||
public final long timestamp;
|
||||
public final long used;
|
||||
public final long max;
|
||||
|
||||
public Heap(long timestamp, long used, long max) {
|
||||
this.timestamp = timestamp;
|
||||
this.used = used;
|
||||
this.max = max;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Load implements Serializable {
|
||||
public final long timestamp;
|
||||
public final double loadAverage;
|
||||
|
||||
public Load(long timestamp, double loadAverage) {
|
||||
this.timestamp = timestamp;
|
||||
this.loadAverage = loadAverage;
|
||||
}
|
||||
}
|
||||
|
||||
public static class HealthProbe extends UntypedActor {
|
||||
|
||||
ActorRef pubSocket = ZeroMQExtension.get(getContext().system()).newPubSocket(new Bind("tcp://127.0.0.1:1237"));
|
||||
MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
|
||||
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
|
||||
Serialization ser = SerializationExtension.get(getContext().system());
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
getContext().system().scheduler()
|
||||
.schedule(Duration.parse("1 second"), Duration.parse("1 second"), getSelf(), TICK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postRestart(Throwable reason) {
|
||||
// don't call preStart, only schedule once
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message.equals(TICK)) {
|
||||
MemoryUsage currentHeap = memory.getHeapMemoryUsage();
|
||||
long timestamp = System.currentTimeMillis();
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
byte[] heapPayload = ser.serializerFor(Heap.class).toBinary(
|
||||
new Heap(timestamp, currentHeap.getUsed(), currentHeap.getMax()));
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket.tell(new ZMQMessage(new Frame("health.heap"), new Frame(heapPayload)));
|
||||
|
||||
// use akka SerializationExtension to convert to bytes
|
||||
byte[] loadPayload = ser.serializerFor(Load.class).toBinary(new Load(timestamp, os.getSystemLoadAverage()));
|
||||
// the first frame is the topic, second is the message
|
||||
pubSocket.tell(new ZMQMessage(new Frame("health.load"), new Frame(loadPayload)));
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#health
|
||||
|
||||
//#logger
|
||||
public static class Logger extends UntypedActor {
|
||||
|
||||
ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(new Connect("tcp://127.0.0.1:1237"),
|
||||
new Listener(getSelf()), new Subscribe("health"));
|
||||
Serialization ser = SerializationExtension.get(getContext().system());
|
||||
SimpleDateFormat timestampFormat = new SimpleDateFormat("HH:mm:ss.SSS");
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof ZMQMessage) {
|
||||
ZMQMessage m = (ZMQMessage) message;
|
||||
// the first frame is the topic, second is the message
|
||||
if (m.firstFrameAsString().equals("health.heap")) {
|
||||
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
|
||||
log.info("Used heap {} bytes, at {}", heap.used, timestampFormat.format(new Date(heap.timestamp)));
|
||||
} else if (m.firstFrameAsString().equals("health.load")) {
|
||||
Load load = (Load) ser.serializerFor(Load.class).fromBinary(m.payload(1));
|
||||
log.info("Load average {}, at {}", load.loadAverage, timestampFormat.format(new Date(load.timestamp)));
|
||||
}
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#logger
|
||||
|
||||
//#alerter
|
||||
public static class HeapAlerter extends UntypedActor {
|
||||
|
||||
ActorRef subSocket = ZeroMQExtension.get(getContext().system()).newSubSocket(new Connect("tcp://127.0.0.1:1237"),
|
||||
new Listener(getSelf()), new Subscribe("health.heap"));
|
||||
Serialization ser = SerializationExtension.get(getContext().system());
|
||||
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof ZMQMessage) {
|
||||
ZMQMessage m = (ZMQMessage) message;
|
||||
// the first frame is the topic, second is the message
|
||||
if (m.firstFrameAsString().equals("health.heap")) {
|
||||
Heap heap = (Heap) ser.serializerFor(Heap.class).fromBinary(m.payload(1));
|
||||
if (((double) heap.used / heap.max) > 0.9) {
|
||||
count += 1;
|
||||
} else {
|
||||
count = 0;
|
||||
}
|
||||
if (count > 10) {
|
||||
log.warning("Need more memory, using {} %", (100.0 * heap.used / heap.max));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#alerter
|
||||
|
||||
}
|
||||
|
|
@ -55,6 +55,24 @@ Default values are taken from ``default-dispatcher``, i.e. all options doesn't n
|
|||
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override
|
||||
the values for the ``default-dispatcher`` in your configuration.
|
||||
|
||||
.. note::
|
||||
|
||||
It should be noted that the ``dispatcher-id`` used in :class:`Props` is in
|
||||
fact an absolute path into the configuration object, i.e. you can declare a
|
||||
dispatcher configuration nested within other configuration objects and refer
|
||||
to it like so: ``"my.config.object.myAwesomeDispatcher"``
|
||||
|
||||
There are two different executor services:
|
||||
|
||||
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
|
||||
``default-dispatcher``.
|
||||
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
|
||||
|
||||
Note that the pool size is configured differently for the two executor services. The configuration above
|
||||
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
|
||||
|
||||
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
|
||||
|
||||
Let's now walk through the different dispatchers in more detail.
|
||||
|
||||
Thread-based
|
||||
|
|
@ -67,9 +85,11 @@ has worse performance and scalability than the event-based dispatcher but works
|
|||
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
|
||||
this dispatcher is that Actors do not block threads for each other.
|
||||
|
||||
The ``PinnedDispatcher`` can't be configured, but is created and associated with an actor like this:
|
||||
The ``PinnedDispatcher`` is configured like this:
|
||||
|
||||
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher
|
||||
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
|
||||
|
||||
Note that it must be used with ``executor = "thread-pool-executor"``.
|
||||
|
||||
Event-based
|
||||
^^^^^^^^^^^
|
||||
|
|
|
|||
|
|
@ -23,3 +23,4 @@ Java API
|
|||
transactors
|
||||
fsm
|
||||
extending-akka
|
||||
zeromq
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ This config option is very good if you want to know what config settings are loa
|
|||
akka {
|
||||
# Log the complete configuration at INFO level when the actor system is started.
|
||||
# This is useful when you are uncertain of what configuration is used.
|
||||
logConfigOnStart = on
|
||||
log-config-on-start = on
|
||||
}
|
||||
|
||||
If you want very detailed logging of all automatically received messages that are processed
|
||||
|
|
@ -218,13 +218,13 @@ Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``.
|
|||
With Logback the thread name is available with ``%X{sourceThread}`` specifier within the pattern layout configuration::
|
||||
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<layout>
|
||||
<encoder>
|
||||
<pattern>%date{ISO8601} %-5level %logger{36} %X{sourceThread} - %msg%n</pattern>
|
||||
</layout>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
.. note::
|
||||
|
||||
|
||||
It will probably be a good idea to use the ``sourceThread`` MDC value also in
|
||||
non-Akka parts of the application in order to have this property consistently
|
||||
available in the logs.
|
||||
|
|
@ -235,9 +235,9 @@ is available for associating log messages e.g. with members of a router. This
|
|||
information is available in the MDC with attribute name ``akkaSource``::
|
||||
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<layout>
|
||||
<encoder>
|
||||
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
|
||||
</layout>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
For more details on what this attribute contains—also for non-actors—please see
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ to your ``application.conf`` file::
|
|||
}
|
||||
remote {
|
||||
transport = "akka.remote.netty.NettyRemoteTransport"
|
||||
server {
|
||||
netty {
|
||||
hostname = "127.0.0.1"
|
||||
port = 2552
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,47 +25,39 @@ For Akka to know which ``Serializer`` to use for what, you need edit your :ref:`
|
|||
in the "akka.actor.serializers"-section you bind names to implementations of the ``akka.serialization.Serializer``
|
||||
you wish to use, like this:
|
||||
|
||||
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-serializers-config
|
||||
|
||||
.. note::
|
||||
|
||||
The name ``default`` is special in the sense that the ``Serializer``
|
||||
mapped to it will be used as default.
|
||||
.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-serializers-config
|
||||
|
||||
After you've bound names to different implementations of ``Serializer`` you need to wire which classes
|
||||
should be serialized using which ``Serializer``, this is done in the "akka.actor.serialization-bindings"-section:
|
||||
|
||||
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialization-bindings-config
|
||||
.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialization-bindings-config
|
||||
|
||||
.. note::
|
||||
You only need to specify the name of an interface or abstract base class of the
|
||||
messages. In case of ambiguity, i.e. the message implements several of the
|
||||
configured classes, the most specific configured class will be used, i.e. the
|
||||
one of which all other candidates are superclasses. If this condition cannot be
|
||||
met, because e.g. ``java.io.Serializable`` and ``MyOwnSerializable`` both apply
|
||||
and neither is a subtype of the other, a warning will be issued.
|
||||
|
||||
You only need to specify the name of an interface or abstract base class if the messages implements
|
||||
that. E.g. ``com.google.protobuf.Message`` for protobuf serialization.
|
||||
Akka provides serializers for :class:`java.io.Serializable` and `protobuf
|
||||
<http://code.google.com/p/protobuf/>`_
|
||||
:class:`com.google.protobuf.GeneratedMessage` by default (the latter only if
|
||||
depending on the akka-remote module), so normally you don't need to add
|
||||
configuration for that; since :class:`com.google.protobuf.GeneratedMessage`
|
||||
implements :class:`java.io.Serializable`, protobuf messages will always by
|
||||
serialized using the protobuf protocol unless specifically overridden. In order
|
||||
to disable a default serializer, map its marker type to “none”::
|
||||
|
||||
Protobuf
|
||||
--------
|
||||
|
||||
Akka provides a ``Serializer`` for `protobuf <http://code.google.com/p/protobuf/>`_ messages.
|
||||
To use that you need to add the following to the configuration::
|
||||
|
||||
akka {
|
||||
actor {
|
||||
serializers {
|
||||
proto = "akka.serialization.ProtobufSerializer"
|
||||
}
|
||||
|
||||
serialization-bindings {
|
||||
proto = ["com.google.protobuf.Message"]
|
||||
}
|
||||
}
|
||||
}
|
||||
akka.actor.serialization-bindings {
|
||||
"java.io.Serializable" = none
|
||||
}
|
||||
|
||||
Verification
|
||||
------------
|
||||
|
||||
If you want to verify that your messages are serializable you can enable the following config option:
|
||||
|
||||
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-messages-config
|
||||
.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-messages-config
|
||||
|
||||
.. warning::
|
||||
|
||||
|
|
@ -74,7 +66,7 @@ If you want to verify that your messages are serializable you can enable the fol
|
|||
|
||||
If you want to verify that your ``Props`` are serializable you can enable the following config option:
|
||||
|
||||
.. includecode:: code/akka/docs/serialization/SerializationDocTestBase.java#serialize-creators-config
|
||||
.. includecode:: ../scala/code/akka/docs/serialization/SerializationDocSpec.scala#serialize-creators-config
|
||||
|
||||
.. warning::
|
||||
|
||||
|
|
@ -122,3 +114,4 @@ reading in the representation of an :class:`ActorRef` for turning the string
|
|||
representation into a real reference. :class:`DynamicVariable` is a
|
||||
thread-local variable, so be sure to have it set while deserializing anything
|
||||
which might contain actor references.
|
||||
|
||||
|
|
|
|||
98
akka-docs/java/zeromq.rst
Normal file
98
akka-docs/java/zeromq.rst
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
|
||||
.. _zeromq-java:
|
||||
|
||||
###############
|
||||
ZeroMQ (Java)
|
||||
###############
|
||||
|
||||
.. sidebar:: Contents
|
||||
|
||||
.. contents:: :local:
|
||||
|
||||
Akka provides a ZeroMQ module which abstracts a ZeroMQ connection and therefore allows interaction between Akka actors to take place over ZeroMQ connections. The messages can be of a proprietary format or they can be defined using Protobuf. The socket actor is fault-tolerant by default and when you use the newSocket method to create new sockets it will properly reinitialize the socket.
|
||||
|
||||
ZeroMQ is very opinionated when it comes to multi-threading so configuration option `akka.zeromq.socket-dispatcher` always needs to be configured to a PinnedDispatcher, because the actual ZeroMQ socket can only be accessed by the thread that created it.
|
||||
|
||||
The ZeroMQ module for Akka is written against an API introduced in JZMQ, which uses JNI to interact with the native ZeroMQ library. Instead of using JZMQ, the module uses ZeroMQ binding for Scala that uses the native ZeroMQ library through JNA. In other words, the only native library that this module requires is the native ZeroMQ library.
|
||||
The benefit of the scala library is that you don't need to compile and manage native dependencies at the cost of some runtime performance. The scala-bindings are compatible with the JNI bindings so they are a drop-in replacement, in case you really need to get that extra bit of performance out.
|
||||
|
||||
Connection
|
||||
==========
|
||||
|
||||
ZeroMQ supports multiple connectivity patterns, each aimed to meet a different set of requirements. Currently, this module supports publisher-subscriber connections and connections based on dealers and routers. For connecting or accepting connections, a socket must be created.
|
||||
Sockets are always created using the ``akka.zeromq.ZeroMQExtension``, for example:
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#pub-socket
|
||||
|
||||
Above examples will create a ZeroMQ Publisher socket that is Bound to the port 1233 on localhost.
|
||||
|
||||
Similarly you can create a subscription socket, with a listener, that subscribes to all messages from the publisher using:
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#sub-socket
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#listener-actor
|
||||
|
||||
The following sub-sections describe the supported connection patterns and how they can be used in an Akka environment. However, for a comprehensive discussion of connection patterns, please refer to `ZeroMQ -- The Guide <http://zguide.zeromq.org/page:all>`_.
|
||||
|
||||
Publisher-subscriber connection
|
||||
-------------------------------
|
||||
|
||||
In a publisher-subscriber (pub-sub) connection, the publisher accepts one or more subscribers. Each subscriber shall
|
||||
subscribe to one or more topics, whereas the publisher publishes messages to a set of topics. Also, a subscriber can
|
||||
subscribe to all available topics. In an Akka environment, pub-sub connections shall be used when an actor sends messages
|
||||
to one or more actors that do not interact with the actor that sent the message.
|
||||
|
||||
When you're using zeromq pub/sub you should be aware that it needs multicast - check your cloud - to work properly and that the filtering of events for topics happens client side, so all events are always broadcasted to every subscriber.
|
||||
|
||||
An actor is subscribed to a topic as follows:
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#sub-topic-socket
|
||||
|
||||
It is a prefix match so it is subscribed to all topics starting with ``foo.bar``. Note that if the given string is empty or
|
||||
``Subscribe.all()`` is used, the actor is subscribed to all topics.
|
||||
|
||||
To unsubscribe from a topic you do the following:
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#unsub-topic-socket
|
||||
|
||||
To publish messages to a topic you must use two Frames with the topic in the first frame.
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#pub-topic
|
||||
|
||||
Pub-Sub in Action
|
||||
^^^^^^^^^^^^^^^^^
|
||||
|
||||
The following example illustrates one publisher with two subscribers.
|
||||
|
||||
The publisher monitors current heap usage and system load and periodically publishes ``Heap`` events on the ``"health.heap"`` topic
|
||||
and ``Load`` events on the ``"health.load"`` topic.
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#health
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#health2
|
||||
|
||||
Let's add one subscriber that logs the information. It subscribes to all topics starting with ``"health"``, i.e. both ``Heap`` and
|
||||
``Load`` events.
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#logger
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#logger2
|
||||
|
||||
Another subscriber keep track of used heap and warns if too much heap is used. It only subscribes to ``Heap`` events.
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#alerter
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#alerter2
|
||||
|
||||
Router-Dealer connection
|
||||
------------------------
|
||||
|
||||
While Pub/Sub is nice the real advantage of zeromq is that it is a "lego-box" for reliable messaging. And because there are so many integrations the multi-language support is fantastic.
|
||||
When you're using ZeroMQ to integrate many systems you'll probably need to build your own ZeroMQ devices. This is where the router and dealer socket types come in handy.
|
||||
With those socket types you can build your own reliable pub sub broker that uses TCP/IP and does publisher side filtering of events.
|
||||
|
||||
To create a Router socket that has a high watermark configured, you would do:
|
||||
|
||||
.. includecode:: code/akka/docs/zeromq/ZeromqDocTestBase.java#high-watermark
|
||||
|
||||
The akka-zeromq module accepts most if not all the available configuration options for a zeromq socket.
|
||||
Loading…
Add table
Add a link
Reference in a new issue