Remove words such as simply and obviously from docs (#25095)
* One does not "simply"
* It's not obvious
* It's not really _that_ easily done
* Basically is basically a useless word
* Of course - if you already know how things work you wouldn't be reading the docs
* Clearly is maybe not so clear for everyone
* Just was just a bit harder as there are some uses that are just
This commit is contained in: parent da5cc33b92, commit 09025092ae
66 changed files with 184 additions and 185 deletions
@@ -568,7 +568,7 @@ sending their ActorRefs to other Actors within messages.

@@@

- The supplied path is parsed as a `java.net.URI`, which basically means
+ The supplied path is parsed as a `java.net.URI`, which means
that it is split on `/` into path elements. If the path starts with `/`, it
is absolute and the look-up starts at the root guardian (which is the parent of
`"/user"`); otherwise it starts at the current actor. If a path element equals
@@ -1333,12 +1333,12 @@ and partial functions can be chained together using the `PartialFunction#orElse`
however you should keep in mind that "first match" wins - which may be important when combining functions that both can handle the same type of message.

For example, imagine you have a set of actors which are either `Producers` or `Consumers`, yet sometimes it makes sense to
- have an actor share both behaviors. This can be easily achieved without having to duplicate code by extracting the behaviors to
+ have an actor share both behaviors. This can be achieved without having to duplicate code by extracting the behaviors to
traits and implementing the actor's `receive` as combination of these partial functions.

@@snip [ActorDocSpec.scala]($code$/scala/docs/actor/ActorDocSpec.scala) { #receive-orElse }

- Instead of inheritance the same pattern can be applied via composition - one would simply compose the receive method using partial functions from delegates.
+ Instead of inheritance the same pattern can be applied via composition - compose the receive method using partial functions from delegates.

@@@
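The referenced `#receive-orElse` snippet is not reproduced in this diff; as a rough sketch of the idea (trait and message names below are illustrative, not the ones from `ActorDocSpec.scala`), the behaviors can live in traits and be chained with `orElse`:

```scala
import akka.actor.Actor

case object Produce
case object Consume

trait ProducerBehavior { this: Actor =>
  val producerBehavior: Receive = {
    case Produce => sender() ! "produced a thing"
  }
}

trait ConsumerBehavior { this: Actor =>
  val consumerBehavior: Receive = {
    case Consume => sender() ! "consumed a thing"
  }
}

// "first match" wins, so the order of the orElse chain matters
class ProducerConsumer extends Actor with ProducerBehavior with ConsumerBehavior {
  def receive = producerBehavior.orElse(consumerBehavior)
}
```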
@@ -358,7 +358,7 @@ able to suspend any work started for request processing (thereby freeing threads
to do other work) and resume processing when the response is ready. This is
currently the case for a [subset of components](http://camel.apache.org/asynchronous-routing-engine.html)
such as the Jetty component.
- All other Camel components can still be used, of course, but they will cause
+ All other Camel components can still be used, but they will cause
allocation of a thread for the duration of an in-out message exchange. There's
also [Examples](#camel-examples) that implements both, an asynchronous
consumer and an asynchronous producer, with the jetty component.
@@ -1,7 +1,7 @@
# Cluster Client

An actor system that is not part of the cluster can communicate with actors
- somewhere in the cluster via this @unidoc[ClusterClient]. The client can of course be part of
+ somewhere in the cluster via the @unidoc[ClusterClient], the client can run in an `ActorSystem` that is part of
another cluster. It only needs to know the location of one (or more) nodes to use as initial
contact points. It will establish a connection to a @unidoc[akka.cluster.client.ClusterReceptionist] somewhere in
the cluster. It will monitor the connection to the receptionist and establish a new
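As a hedged sketch of how such a client might be started (host names, ports and the `serviceA` path are placeholders, and a receptionist must be running on the contact-point nodes):

```scala
import akka.actor.{ ActorPath, ActorSystem }
import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

val system = ActorSystem("ClientSys")

// Initial contact points of the cluster (placeholder addresses)
val initialContacts = Set(
  ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
  ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))

val client = system.actorOf(
  ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)),
  "client")

// Send to an actor that the cluster side registered with its receptionist
client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
```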
@@ -274,7 +274,7 @@ Because of these issues, auto-downing should **never** be used in a production e

There are two ways to remove a member from the cluster.

- You can just stop the actor system (or the JVM process). It will be detected
+ You can stop the actor system (or the JVM process). It will be detected
as unreachable and removed after the automatic or manual downing as described
above.
@@ -496,7 +496,7 @@ For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.

This can be implemented by subscribing to member events, but there are several corner
- cases to consider. Therefore, this specific use case is made easily accessible by the
+ cases to consider. Therefore, this specific use case is covered by the
@ref:[Cluster Singleton](cluster-singleton.md).

## Cluster Sharding
@@ -882,7 +882,7 @@ Then the abstract `MultiNodeSpec`, which takes the `MultiNodeConfig` as construc

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #abstract-test }

- Most of this can of course be extracted to a separate trait to avoid repeating this in all your tests.
+ Most of this can be extracted to a separate trait to avoid repeating this in all your tests.

Typically you begin your test by starting up the cluster and let the members join, and create some actors.
That can be done like this:
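A minimal sketch of such an extracted trait (assuming ScalaTest's `WordSpecLike`; your mixin order and matcher choices may differ):

```scala
import akka.remote.testkit.MultiNodeSpecCallbacks
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }

// Hooks the MultiNodeSpec lifecycle into ScalaTest so it is not repeated in every test
trait STMultiNodeSpec extends MultiNodeSpecCallbacks
  with WordSpecLike with Matchers with BeforeAndAfterAll {

  override def beforeAll(): Unit = multiNodeSpecBeforeAll()
  override def afterAll(): Unit = multiNodeSpecAfterAll()
}
```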
@@ -57,7 +57,7 @@ Historically, Akka has been following the Java or Scala style of versioning wher
the second one would mean **major**, and third be the **minor**, thus: `epoch.major.minor` (versioning scheme followed until and during `2.3.x`).

**Currently**, since Akka `2.4.0`, the new versioning applies which is closer to semantic versioning many have come to expect,
- in which the version number is deciphered as `major.minor.patch`. This also means that Akka `2.5.x` is binary compatible with the `2.4` series releases (with the exception of "may change" APIs of course).
+ in which the version number is deciphered as `major.minor.patch`. This also means that Akka `2.5.x` is binary compatible with the `2.4` series releases (with the exception of "may change" APIs).

In addition to that, Akka `2.4.x` has been made binary compatible with the `2.3.x` series,
so there is no reason to remain on Akka 2.3.x, since upgrading is completely compatible
@@ -100,7 +100,7 @@ failure detection services. The idea is that it is keeping a history of failure
statistics, calculated from heartbeats received from other nodes, and is
trying to do educated guesses by taking multiple factors, and how they
accumulate over time, into account in order to come up with a better guess if a
- specific node is up or down. Rather than just answering "yes" or "no" to the
+ specific node is up or down. Rather than only answering "yes" or "no" to the
question "is the node down?" it returns a `phi` value representing the
likelihood that the node is down.
@@ -139,9 +139,9 @@ and the actor system must be restarted before it can join the cluster again.

After gossip convergence a `leader` for the cluster can be determined. There is no
`leader` election process, the `leader` can always be recognised deterministically
- by any node whenever there is gossip convergence. The leader is just a role, any node
+ by any node whenever there is gossip convergence. The leader is only a role, any node
can be the leader and it can change between convergence rounds.
- The `leader` is simply the first node in sorted order that is able to take the leadership role,
+ The `leader` is the first node in sorted order that is able to take the leadership role,
where the preferred member states for a `leader` are `up` and `leaving`
(see the [Membership Lifecycle](#membership-lifecycle) section below for more information about member states).
@@ -188,7 +188,7 @@ Java

When facing this, you
- may be tempted to just wrap the blocking call inside a `Future` and work
+ may be tempted to wrap the blocking call inside a `Future` and work
with that instead, but this strategy is too simple: you are quite likely to
find bottlenecks or run out of memory or threads when the application runs
under increased load.
@@ -336,7 +336,7 @@ The thread pool behavior is shown in the below diagram.



- Messages sent to `SeparateDispatcherFutureActor` and `PrintActor` are easily handled by the default dispatcher - the
+ Messages sent to `SeparateDispatcherFutureActor` and `PrintActor` are handled by the default dispatcher - the
green lines, which represent the actual execution.

When blocking operations are run on the `my-blocking-dispatcher`,
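A minimal sketch of the pattern behind the diagram, assuming a dedicated dispatcher named `my-blocking-dispatcher` has been configured (see the note further below):

```scala
import scala.concurrent.Future
import akka.actor.Actor

class SeparateDispatcherFutureActor extends Actor {
  // run blocking work on the dedicated dispatcher, not on the default one
  implicit val blockingDispatcher = context.system.dispatchers.lookup("my-blocking-dispatcher")

  def receive = {
    case i: Int =>
      Future {
        Thread.sleep(5000) // stand-in for a blocking call
        println(s"Blocking future finished $i")
      }
  }
}
```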
@@ -382,8 +382,8 @@ on which DBMS is deployed on what hardware.

@@@ note

- Configuring thread pools is a task best delegated to Akka, simply configure
- in the `application.conf` and instantiate through an
+ Configuring thread pools is a task best delegated to Akka, configure
+ it in `application.conf` and instantiate through an
@ref:[`ActorSystem`](#dispatcher-lookup)

@@@
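A sketch of such a configuration and lookup; the settings would normally live in `application.conf` and are inlined here via `ConfigFactory.parseString` only to keep the example self-contained (the pool size is an arbitrary example value):

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString("""
  my-blocking-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      fixed-pool-size = 16
    }
    throughput = 1
  }
""")

val system = ActorSystem("app", config.withFallback(ConfigFactory.load()))

// look the dispatcher up through the ActorSystem, as the note suggests
val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
```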
@@ -420,7 +420,7 @@ Java

If you only need to add elements to a set and not remove elements the `GSet` (grow-only set) is
the data type to use. The elements can be any type of values that can be serialized.
- Merge is simply the union of the two sets.
+ Merge is the union of the two sets.

Scala
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #gset }
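The `#gset` snippet is not shown in this diff; a small sketch of the merge-is-union property:

```scala
import akka.cluster.ddata.GSet

val s1 = GSet.empty[String] + "a" + "b"
val s2 = GSet.empty[String] + "b" + "c"

// merging two GSets yields the union of their elements
val merged = s1.merge(s2)
println(merged.elements) // Set(a, b, c)
```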
@@ -558,7 +558,7 @@ changing and writing the value with `WriteMajority` (or more).

### Custom Data Type

- You can rather easily implement your own data types. The only requirement is that it implements
+ You can implement your own data types. The only requirement is that it implements
the @scala[`merge`]@java[`mergeData`] function of the @scala[`ReplicatedData`]@java[`AbstractReplicatedData`] trait.

A nice property of stateful CRDTs is that they typically compose nicely, i.e. you can combine several
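A sketch of a custom type, loosely following the two-phase-set idea (an element is in the set if it was added and not removed); only `merge` is required by the trait:

```scala
import akka.cluster.ddata.{ GSet, ReplicatedData }

final case class TwoPhaseSet(
    adds: GSet[String] = GSet.empty,
    removals: GSet[String] = GSet.empty) extends ReplicatedData {

  type T = TwoPhaseSet

  def add(element: String): TwoPhaseSet = copy(adds = adds.add(element))
  def remove(element: String): TwoPhaseSet = copy(removals = removals.add(element))
  def elements: Set[String] = adds.elements.diff(removals.elements)

  // the only requirement: a commutative, associative, idempotent merge
  override def merge(that: TwoPhaseSet): TwoPhaseSet =
    copy(adds = this.adds.merge(that.adds), removals = this.removals.merge(that.removals))
}
```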
@@ -700,7 +700,7 @@ actor system to make the name unique. If using a dynamically assigned
port (0) it will be different each time and the previously stored data
will not be loaded.

- Making the data durable has of course a performance cost. By default, each update is flushed
+ Making the data durable has a performance cost. By default, each update is flushed
to disk before the `UpdateSuccess` reply is sent. For better performance, but with the risk of losing
the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
a time period before it is written to LMDB and flushed to disk. Enabling write behind is especially
@@ -268,6 +268,6 @@ Java

### Other Uses

- The event stream is always there and ready to be used, just publish your own
+ The event stream is always there and ready to be used, you can publish your own
events (it accepts @scala[`AnyRef`]@java[`Object`]) and subscribe listeners to the corresponding JVM
classes.
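A small sketch of publishing an application-defined event and subscribing a listener by class (the `Jackpot` event is made up for the example):

```scala
import akka.actor.{ Actor, ActorSystem, Props }

final case class Jackpot(amount: Int)

class Listener extends Actor {
  def receive = {
    case Jackpot(amount) => println(s"someone won $amount")
  }
}

val system = ActorSystem("events")
val listener = system.actorOf(Props(new Listener), "listener")

system.eventStream.subscribe(listener, classOf[Jackpot])
system.eventStream.publish(Jackpot(100))
```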
@@ -16,7 +16,7 @@ ensure the thread safety of his/her extension.

## Building an Extension

- So let's create a sample extension that just lets us count the number of times something has happened.
+ So let's create a sample extension that lets us count the number of times something has happened.

First, we define what our `Extension` should do:
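The counting extension, roughly sketched (the real sample lives in the docs' snippets; names here follow that example but details may differ):

```scala
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }

class CountExtensionImpl extends Extension {
  // shared between actors, so it must be thread-safe
  private val counter = new AtomicLong(0)
  def increment(): Long = counter.incrementAndGet()
}

object CountExtension extends ExtensionId[CountExtensionImpl] with ExtensionIdProvider {
  override def lookup = CountExtension
  override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl
}

// usage: CountExtension(system).increment()
```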
@@ -88,7 +88,7 @@ You can combine your own strategy with the default strategy:

### Stopping Supervisor Strategy

- Closer to the Erlang way is the strategy to just stop children when they fail
+ Closer to the Erlang way is the strategy to stop children when they fail
and then take corrective action in the supervisor when DeathWatch signals the
loss of the child. This strategy is also provided pre-packaged as
`SupervisorStrategy.stoppingStrategy` with an accompanying
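A hedged sketch of using the pre-packaged strategy together with DeathWatch (the `Worker` child and the corrective action are placeholders):

```scala
import akka.actor.{ Actor, Props, SupervisorStrategy, Terminated }

class Worker extends Actor {
  def receive = Actor.emptyBehavior
}

class Manager extends Actor {
  // children are stopped on failure instead of being restarted
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

  val worker = context.actorOf(Props(new Worker), "worker")
  context.watch(worker)

  def receive = {
    case Terminated(`worker`) =>
      // corrective action once DeathWatch signals the loss of the child
      context.stop(self)
  }
}
```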
@@ -114,7 +114,7 @@ by overriding the `logFailure` method.

Toplevel actors means those which are created using `system.actorOf()`, and
they are children of the @ref:[User Guardian](general/supervision.md#user-guardian). There are no
- special rules applied in this case, the guardian simply applies the configured
+ special rules applied in this case, the guardian applies the configured
strategy.

## Test Application
@@ -187,7 +187,7 @@ If you need to do conditional propagation, you can use `filter`:

### For Comprehensions

- Since `Future` has a `map`, `filter` and `flatMap` method it can be easily used in a 'for comprehension':
+ Since `Future` has a `map`, `filter` and `flatMap` method it can be used in a 'for comprehension':

@@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #for-comprehension }
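The `#for-comprehension` snippet itself is not included in the diff; a small sketch of the idea:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val f1 = Future { 4 }
val f2 = Future { 16 }

// map, flatMap and filter (the `if` guard) are what the for comprehension desugars to
val result: Future[Int] =
  for {
    a <- f1
    b <- f2
    if a < b
  } yield a + b

result.foreach(println) // eventually prints 20
```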
@@ -43,7 +43,7 @@ communicated to the right person, a better solution can be found than if
trying to keep everything “under the carpet”.

Now, the difficulty in designing such a system is how to decide who should
- supervise what. There is of course no single best solution, but there are a few
+ supervise what. There is no single best solution, but there are a few
guidelines which might be helpful:

* If one actor manages the work another actor is doing, e.g. by passing on
@@ -64,7 +64,7 @@ influence on the supervisor strategy, and it should be noted that a
functional dependency alone is not a criterion for deciding where to place a
certain child actor in the hierarchy.

- There are of course always exceptions to these rules, but no matter whether you
+ There are always exceptions to these rules, but no matter whether you
follow the rules or break them, you should always have a reason.

## Configuration Container
@@ -36,7 +36,7 @@ pending requests, etc. These data are what make an actor valuable, and they
must be protected from corruption by other actors. The good news is that Akka
actors conceptually each have their own light-weight thread, which is
completely shielded from the rest of the system. This means that instead of
- having to synchronize access using locks you can just write your actor code
+ having to synchronize access using locks you can write your actor code
without worrying about concurrency at all.

Behind the scenes Akka will run sets of actors on sets of real threads, where
@@ -181,7 +181,7 @@ example send a message to a specific sibling:
context.actorSelection("../brother") ! msg
```

- Absolute paths may of course also be looked up on *context* in the usual way, i.e.
+ Absolute paths may also be looked up on *context* in the usual way, i.e.

```scala
context.actorSelection("/user/serviceA") ! msg
@@ -220,7 +220,7 @@ release.
@@@ note

What the above sections described in some detail can be summarized and
- memorized easily as follows:
+ memorized as follows:

* `actorOf` only ever creates a new actor, and it creates it as a direct
child of the context on which this method is invoked (which may be any
@@ -251,7 +251,7 @@ When an actor is terminated, its reference will point to the dead letter mailbox
DeathWatch will publish its final transition and in general it is not expected
to come back to life again (since the actor life cycle does not allow this).
While it is possible to create an actor at a later time with an identical
- path—simply due to it being impossible to enforce the opposite without keeping
+ path—due to it being impossible to enforce the opposite without keeping
the set of all actors ever created available—this is not good practice:
messages sent with `actorSelection` to an actor which “died” suddenly start to work
again, but without any guarantee of ordering between this transition and any
@@ -189,7 +189,7 @@ If the system or config property `akka.log-config-on-start` is set to `on`, then
complete configuration is logged at INFO level when the actor system is started. This is
useful when you are uncertain of what configuration is used.

- If in doubt, you can also easily and nicely inspect configuration objects
+ If in doubt, you can inspect your configuration objects
before or after using them to construct an actor system:

@@@vars
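A short sketch of such an inspection using the Typesafe Config API (the `akka.actor` path is only an example):

```scala
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.load()

// render a sub-tree of the configuration before handing it to ActorSystem(...)
println(config.getConfig("akka.actor").root().render())
```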
@@ -238,8 +238,7 @@ bundles is not always trivial, the current approach of Akka is that each
loader (if available, otherwise just its own loader as in
`this.getClass.getClassLoader`) and uses that for all reflective accesses.
This implies that putting Akka on the boot class path will yield
- `NullPointerException` from strange places: this is simply not
- supported.
+ `NullPointerException` from strange places: this is not supported.

## Application specific settings
@@ -10,17 +10,17 @@ chapter.
In order to give some context to the discussion below, consider an application
which spans multiple network hosts. The basic mechanism for communication is
the same whether sending to an actor on the local JVM or to a remote actor, but
- of course there will be observable differences in the latency of delivery
+ there will be observable differences in the latency of delivery
(possibly also depending on the bandwidth of the network link and the message
- size) and the reliability. In case of a remote message send there are obviously
+ size) and the reliability. In case of a remote message send there are
more steps involved which means that more can go wrong. Another aspect is that
- local sending will just pass a reference to the message inside the same JVM,
+ local sending will pass a reference to the message inside the same JVM,
without any restrictions on the underlying object which is sent, whereas a
remote transport will place a limit on the message size.

Writing your actors such that every interaction could possibly be remote is the
safe, pessimistic bet. It means to only rely on those properties which are
- always guaranteed and which are discussed in detail below. This has of course
+ always guaranteed and which are discussed in detail below. This has
some overhead in the actor’s implementation. If you are willing to sacrifice full
location transparency—for example in case of a group of closely collaborating
actors—you can place them always on the same JVM and enjoy stricter guarantees
@@ -209,7 +209,7 @@ In addition, local sends can fail in Akka-specific ways:
* if the receiving actor fails while processing the message or is already
terminated

- While the first is clearly a matter of configuration the second deserves some
+ While the first is a matter of configuration the second deserves some
thought: the sender of a message does not get feedback if there was an
exception while processing, that notification goes to the supervisor instead.
This is in general not distinguishable from a lost message for an outside
@@ -293,7 +293,7 @@ replication and scaling of consumers of this event stream (i.e. other
components may consume the event stream as a means to replicate the component’s
state on a different continent or to react to changes). If the component’s
state is lost—due to a machine failure or by being pushed out of a cache—it can
- easily be reconstructed by replaying the event stream (usually employing
+ be reconstructed by replaying the event stream (usually employing
snapshots to speed up the process). @ref:[Event sourcing](../persistence.md#event-sourcing) is supported by
Akka Persistence.
@@ -351,7 +351,7 @@ Every time an actor does not terminate by its own decision, there is a chance
that some messages which it sends to itself are lost. There is one which
happens quite easily in complex shutdown scenarios that is usually benign:
seeing a `akka.dispatch.Terminate` message dropped means that two
- termination requests were given, but of course only one can succeed. In the
+ termination requests were given, but only one can succeed. In the
same vein, you might see `akka.actor.Terminated` messages from children
while stopping a hierarchy of actors turning up in dead letters if the parent
is still watching the child when the parent terminates.
@@ -4,7 +4,7 @@ It took quite a while until we were reasonably happy with the look and feel of t

@@@ note

- As detailed in the introduction keep in mind that the Akka Streams API is completely decoupled from the Reactive Streams interfaces which are just an implementation detail for how to pass stream data between individual processing stages.
+ As detailed in the introduction keep in mind that the Akka Streams API is completely decoupled from the Reactive Streams interfaces which are an implementation detail for how to pass stream data between individual processing stages.

@@@
@@ -76,7 +76,7 @@ Akka Streams must enable a library to express any stream processing utility in t

@@@ note

- A source that emits a stream of streams is still just a normal Source, the kind of elements that are produced does not play a role in the static stream topology that is being expressed.
+ A source that emits a stream of streams is still a normal Source, the kind of elements that are produced does not play a role in the static stream topology that is being expressed.

@@@
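For illustration, a value of type `Source[Source[Int, NotUsed], NotUsed]` is an ordinary `Source` whose elements happen to be sources (a sketch, not taken from the docs' snippets):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

// a normal Source whose elements are themselves Sources
val nested: Source[Source[Int, NotUsed], NotUsed] =
  Source(List(Source(1 to 3), Source(4 to 6)))
```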
@@ -90,7 +90,7 @@ Unfortunately the method name for signaling *failure* to a Subscriber is called

@@@

- There is only limited support for treating `onError` in Akka Streams compared to the operators that are available for the transformation of data elements, which is intentional in the spirit of the previous paragraph. Since `onError` signals that the stream is collapsing, its ordering semantics are not the same as for stream completion: transformation stages of any kind will just collapse with the stream, possibly still holding elements in implicit or explicit buffers. This means that data elements emitted before a failure can still be lost if the `onError` overtakes them.
+ There is only limited support for treating `onError` in Akka Streams compared to the operators that are available for the transformation of data elements, which is intentional in the spirit of the previous paragraph. Since `onError` signals that the stream is collapsing, its ordering semantics are not the same as for stream completion: transformation stages of any kind will collapse with the stream, possibly still holding elements in implicit or explicit buffers. This means that data elements emitted before a failure can still be lost if the `onError` overtakes them.

The ability for failures to propagate faster than data elements is essential for tearing down streams that are back-pressured—especially since back-pressure can be the failure mode (e.g. by tripping upstream buffers which then abort because they cannot do anything else; or if a dead-lock occurred).
@@ -177,7 +177,7 @@ message will be delivered irrespective of the order in which the monitoring
request and target’s termination occur, i.e. you still get the message even if
at the time of registration the target is already dead.

- Monitoring is particularly useful if a supervisor cannot simply restart its
+ Monitoring is particularly useful if a supervisor cannot restart its
children and has to terminate them, e.g. in case of errors during actor
initialization. In that case it should monitor those children and re-create
them or schedule itself to retry this at a later time.
@@ -269,7 +269,7 @@ but processed afterwards.

Normally stopping a child (i.e. not in response to a failure) will not
automatically terminate the other children in an all-for-one strategy; this can
- easily be done by watching their lifecycle: if the `Terminated` message
+ be done by watching their lifecycle: if the `Terminated` message
is not handled by the supervisor, it will throw a `DeathPactException`
which (depending on its supervisor) will restart it, and the default
`preRestart` action will terminate all children. Of course this can be
@@ -278,5 +278,5 @@ handled explicitly as well.
Please note that creating one-off actors from an all-for-one supervisor entails
that failures escalated by the temporary actor will affect all the permanent
ones. If this is not desired, install an intermediate supervisor; this can very
- easily be done by declaring a router of size 1 for the worker, see
+ be done by declaring a router of size 1 for the worker, see
@ref:[Routing](../routing.md).
@@ -2,7 +2,7 @@

In this chapter we attempt to establish a common terminology to define a solid ground for communicating about concurrent,
distributed systems which Akka targets. Please note that, for many of these terms, there is no single agreed definition.
- We simply seek to give working definitions that will be used in the scope of the Akka documentation.
+ We seek to give working definitions that will be used in the scope of the Akka documentation.

## Concurrency vs. Parallelism
@@ -177,7 +177,7 @@ Some of the challenges that HTTP tackles:

### Example of module use

- Akka modules integrate together seamlessly. For example, think of a large set of stateful business objects, such as documents or shopping carts, that website users access. If you model these as sharded entities, using Sharding and Persistence, they will be balanced across a cluster that you can scale out on-demand. They will be available during spikes that come from advertising campaigns or before holidays will be handled, even if some systems crash. You can also easily take the real-time stream of domain events with Persistence Query and use Streams to pipe them into a streaming Fast Data engine. Then, take the output of that engine as a Stream, manipulate it using Akka Streams
+ Akka modules integrate together seamlessly. For example, think of a large set of stateful business objects, such as documents or shopping carts, that website users access. If you model these as sharded entities, using Sharding and Persistence, they will be balanced across a cluster that you can scale out on-demand. They will be available during spikes that come from advertising campaigns or before holidays will be handled, even if some systems crash. You can also take the real-time stream of domain events with Persistence Query and use Streams to pipe them into a streaming Fast Data engine. Then, take the output of that engine as a Stream, manipulate it using Akka Streams
operators and expose it as web socket connections served by a load balanced set of HTTP servers hosted by your cluster
to power your real-time business analytics tool.
@@ -9,7 +9,7 @@ You should have already followed the instructions in the @scala[[Akka Quickstart

## IoT example use case

- In this tutorial, we'll use Akka to build out part of an Internet of Things (IoT) system that reports data from sensor devices installed in customers' homes. The example focuses on temperature readings. The target use case simply allows customers to log in and view the last reported temperature from different areas of their homes. You can imagine that such sensors could also collect relative humidity or other interesting data and an application would likely support reading and changing device configuration, maybe even alerting home owners when sensor state falls outside of a particular range.
+ In this tutorial, we'll use Akka to build out part of an Internet of Things (IoT) system that reports data from sensor devices installed in customers' homes. The example focuses on temperature readings. The target use case allows customers to log in and view the last reported temperature from different areas of their homes. You can imagine that such sensors could also collect relative humidity or other interesting data and an application would likely support reading and changing device configuration, maybe even alerting home owners when sensor state falls outside of a particular range.

In a real system, the application would be exposed to customers through a mobile app or browser. This guide concentrates only on the core logic for storing temperatures that would be called over a network protocol, such as HTTP. It also includes writing tests to help you get comfortable and proficient with testing actors.
@@ -22,7 +22,7 @@ In the Hello World example, we have already seen how `system.actorOf()`, creates
_user defined_ hierarchy. You typically have only one (or very few) top level actors in your `ActorSystem`.
We create child, or non-top-level, actors by invoking `context.actorOf()` from an existing actor. The `context.actorOf()` method has a signature identical to `system.actorOf()`, its top-level counterpart.

- The easiest way to see the actor hierarchy in action is to simply print `ActorRef` instances. In this small experiment, we create an actor, print its reference, create a child of this actor, and print the child's reference. We start with the Hello World project, if you have not downloaded it, download the Quickstart project from the @scala[[Lightbend Tech Hub](http://developer.lightbend.com/start/?group=akka&project=akka-quickstart-scala)]@java[[Lightbend Tech Hub](http://developer.lightbend.com/start/?group=akka&project=akka-quickstart-java)].
+ The easiest way to see the actor hierarchy in action is to print `ActorRef` instances. In this small experiment, we create an actor, print its reference, create a child of this actor, and print the child's reference. We start with the Hello World project, if you have not downloaded it, download the Quickstart project from the @scala[[Lightbend Tech Hub](http://developer.lightbend.com/start/?group=akka&project=akka-quickstart-scala)]@java[[Lightbend Tech Hub](http://developer.lightbend.com/start/?group=akka&project=akka-quickstart-java)].

In your Hello World project, navigate to the `com.lightbend.akka.sample` package and create a new @scala[Scala file called `ActorHierarchyExperiments.scala`]@java[Java file called `ActorHierarchyExperiments.java`] here. Copy and paste the code from the snippet below to this new source file. Save your file and run `sbt "runMain com.lightbend.akka.sample.ActorHierarchyExperiments"` to observe the output.
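A condensed sketch of that experiment (actor and message names roughly follow the Quickstart sample, but this is not the full snippet):

```scala
import akka.actor.{ Actor, ActorSystem, Props }

class PrintMyActorRefActor extends Actor {
  def receive = {
    case "printit" =>
      val secondRef = context.actorOf(Props.empty, "second-actor")
      println(s"Second: $secondRef")
  }
}

val system = ActorSystem("testSystem")
val firstRef = system.actorOf(Props(new PrintMyActorRefActor), "first-actor")
println(s"First: $firstRef")

// triggers creation of the child, whose path is nested under the parent's
firstRef ! "printit"
```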
@@ -11,7 +11,7 @@ The tasks of a device actor will be simple:
* Collect temperature measurements
* When asked, report the last measured temperature

- However, a device might start without immediately having a temperature measurement. Hence, we need to account for the case where a temperature is not present. This also allows us to test the query part of the actor without the write part present, as the device actor can simply report an empty result.
+ However, a device might start without immediately having a temperature measurement. Hence, we need to account for the case where a temperature is not present. This also allows us to test the query part of the actor without the write part present, as the device actor can report an empty result.

The protocol for obtaining the current temperature from the device actor is simple. The actor:
@@ -33,10 +33,10 @@ These two messages seem to cover the required functionality. However, the approa

* There will be observable differences in the latency of delivery between local and remote messages, because factors like network link bandwidth and the message size also come into play.
* Reliability is a concern because a remote message send involves more steps, which means that more can go wrong.
- * A local send will just pass a reference to the message inside the same JVM, without any restrictions on the underlying object which is sent, whereas a remote transport will place a limit on the message size.
+ * A local send will pass a reference to the message inside the same JVM, without any restrictions on the underlying object which is sent, whereas a remote transport will place a limit on the message size.

In addition, while sending inside the same JVM is significantly more reliable, if an
- actor fails due to a programmer error while processing the message, the effect is basically the same as if a remote network request fails due to the remote host crashing while processing the message. Even though in both cases, the service recovers after a while (the actor is restarted by its supervisor, the host is restarted by an operator or by a monitoring system) individual requests are lost during the crash. **Therefore, writing your actors such that every
+ actor fails due to a programmer error while processing the message, the effect is the same as if a remote network request fails due to the remote host crashing while processing the message. Even though in both cases, the service recovers after a while (the actor is restarted by its supervisor, the host is restarted by an operator or by a monitoring system) individual requests are lost during the crash. **Therefore, writing your actors such that every
message could possibly be lost is the safe, pessimistic bet.**

But to further understand the need for flexibility in the protocol, it will help to consider Akka message ordering and message delivery guarantees. Akka provides the following behavior for message sends:
@@ -129,7 +129,7 @@ Note in the code that:

* The @scala[companion object]@java[static method] defines how to construct a `Device` actor. The `props` parameters include an ID for the device and the group to which it belongs, which we will use later.
* The @scala[companion object]@java[class] includes the definitions of the messages we reasoned about previously.
- * In the `Device` class, the value of `lastTemperatureReading` is initially set to @scala[`None`]@java[`Optional.empty()`], and the actor will simply report it back if queried.
+ * In the `Device` class, the value of `lastTemperatureReading` is initially set to @scala[`None`]@java[`Optional.empty()`], and the actor will report it back if queried.

## Testing the actor
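A rough sketch of the `Device` actor being described (the message types mirror the tutorial's protocol, but the full sample in the repository contains more than this):

```scala
import akka.actor.{ Actor, ActorLogging, Props }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._

  // initially None; reported back as-is when queried
  var lastTemperatureReading: Option[Double] = None

  override def receive: Receive = {
    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}
```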
@@ -151,7 +151,7 @@ Java

### Keeping track of the device actors in the group

- So far, we have implemented logic for registering device actors in the group. Devices come and go, however, so we will need a way to remove device actors from the @scala[`Map[String, ActorRef]`] @java[`Map<String, ActorRef>`]. We will assume that when a device is removed, its corresponding device actor is simply stopped. Supervision, as we discussed earlier, only handles error scenarios — not graceful stopping. So we need to notify the parent when one of the device actors is stopped.
+ So far, we have implemented logic for registering device actors in the group. Devices come and go, however, so we will need a way to remove device actors from the @scala[`Map[String, ActorRef]`] @java[`Map<String, ActorRef>`]. We will assume that when a device is removed, its corresponding device actor is stopped. Supervision, as we discussed earlier, only handles error scenarios — not graceful stopping. So we need to notify the parent when one of the device actors is stopped.

Akka provides a _Death Watch_ feature that allows an actor to _watch_ another actor and be notified if the other actor is stopped. Unlike supervision, watching is not limited to parent-child relationships, any actor can watch any other actor as long as it knows the `ActorRef`. After a watched actor stops, the watcher receives a `Terminated(actorRef)` message which also contains the reference to the watched actor. The watcher can either handle this message explicitly or will fail with a `DeathPactException`. This latter is useful if the actor can no longer perform its own duties after the watched actor has been stopped. In our case, the group should still function after one device have been stopped, so we need to handle the `Terminated(actorRef)` message.
@@ -170,7 +170,7 @@ Scala
Java
: @@snip [DeviceGroup.java]($code$/java/jdocs/tutorial_4/DeviceGroup.java) { #device-group-remove }

- So far we have no means to get which devices the group device actor keeps track of and, therefore, we cannot test our new functionality yet. To make it testable, we add a new query capability (message @scala[`RequestDeviceList(requestId: Long)`] @java[`RequestDeviceList`]) that simply lists the currently active
+ So far we have no means to get which devices the group device actor keeps track of and, therefore, we cannot test our new functionality yet. To make it testable, we add a new query capability (message @scala[`RequestDeviceList(requestId: Long)`] @java[`RequestDeviceList`]) that lists the currently active
device IDs:

Scala
@@ -181,12 +181,12 @@ Java

We are almost ready to test the removal of devices. But, we still need the following capabilities:

- * To stop a device actor from our test case. From the outside, any actor can be stopped by simply sending a special
+ * To stop a device actor from our test case. From the outside, any actor can be stopped by sending a special
the built-in message, `PoisonPill`, which instructs the actor to stop.
* To be notified once the device actor is stopped. We can use the _Death Watch_ facility for this purpose, too. The @scala[`TestProbe`] @java[`TestKit`] has two messages that we can easily use, `watch()` to watch a specific actor, and `expectTerminated`
to assert that the watched actor has been terminated.

- We add two more test cases now. In the first, we just test that we get back the list of proper IDs once we have added a few devices. The second test case makes sure that the device ID is properly removed after the device actor has been stopped:
+ We add two more test cases now. In the first, we test that we get back the list of proper IDs once we have added a few devices. The second test case makes sure that the device ID is properly removed after the device actor has been stopped:

Scala
: @@snip [DeviceGroupSpec.scala]($code$/scala/tutorial_4/DeviceGroupSpec.scala) { #device-group-list-terminate-test }
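For reference, the stop-and-assert part of such a test might look like the following self-contained sketch (using a placeholder actor rather than the tutorial's `Device`):

```scala
import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.testkit.TestProbe

implicit val system = ActorSystem("test")

val probe = TestProbe()
val deviceActor = system.actorOf(Props.empty, "device")
probe.watch(deviceActor)

// stop the actor from the outside and assert that it terminated
deviceActor ! PoisonPill
probe.expectTerminated(deviceActor)
```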
@@ -4,7 +4,7 @@ The conversational patterns that we have seen so far are simple in the sense tha

* Device actors return a reading, which requires no state change
* Record a temperature, which updates a single field
- * Device Group actors maintain group membership by simply adding or removing entries from a map
+ * Device Group actors maintain group membership by adding or removing entries from a map

In this part, we will use a more complex example. Since homeowners will be interested in the temperatures throughout their home, our goal is to be able to query all of the device actors in a group. Let us start by investigating how such a query API should behave.
@@ -17,8 +17,8 @@ The very first issue we face is that the membership of a group is dynamic. Each
These issues can be addressed in many different ways, but the important point is to settle on the desired behavior. The following works well for our use case:

* When a query arrives, the group actor takes a _snapshot_ of the existing device actors and will only ask those actors for the temperature.
- * Actors that start up _after_ the query arrives are simply ignored.
- * If an actor in the snapshot stops during the query without answering, we will simply report the fact that it stopped to the sender of the query message.
+ * Actors that start up _after_ the query arrives are ignored.
+ * If an actor in the snapshot stops during the query without answering, we will report the fact that it stopped to the sender of the query message.

Apart from device actors coming and going dynamically, some actors might take a long time to answer. For example, they could be stuck in an accidental infinite loop, or fail due to a bug and drop our request. We don't want the query to continue indefinitely, so we will consider it complete in either of the following cases:
@@ -43,7 +43,7 @@ Java

## Implementing the query

- One approach for implementing the query involves adding code to the group device actor. However, in practice this can be very cumbersome and error prone. Remember that when we start a query, we need to take a snapshot of the devices present and start a timer so that we can enforce the deadline. In the meantime, _another query_ can arrive. For the second query, of course, we need to keep track of the exact same information but in isolation from the previous query. This would require us to maintain separate mappings between queries and device actors.
+ One approach for implementing the query involves adding code to the group device actor. However, in practice this can be very cumbersome and error prone. Remember that when we start a query, we need to take a snapshot of the devices present and start a timer so that we can enforce the deadline. In the meantime, _another query_ can arrive. For the second query we need to keep track of the exact same information but in isolation from the previous query. This would require us to maintain separate mappings between queries and device actors.

Instead, we will implement a simpler, and superior approach. We will create an actor that represents a _single query_ and that performs the tasks needed to complete the query on behalf of the group actor. So far we have created actors that belonged to classical domain objects, but now, we will create an
actor that represents a process or a task rather than an entity. We benefit by keeping our group device actor simple and being able to better test query capability in isolation.
@@ -63,7 +63,7 @@ not used yet, the built-in scheduler facility. Using the scheduler is simple:

* We get the scheduler from the `ActorSystem`, which, in turn,
is accessible from the actor's context: @scala[`context.system.scheduler`]@java[`getContext().getSystem().scheduler()`]. This needs an @scala[implicit] `ExecutionContext` which
- is basically the thread-pool that will execute the timer task itself. In our case, we use the same dispatcher
+ is the thread-pool that will execute the timer task itself. In our case, we use the same dispatcher
as the actor by @scala[importing `import context.dispatcher`] @java[passing in `getContext().dispatcher()`].
* The
@scala[`scheduler.scheduleOnce(time, actorRef, message)`] @java[`scheduler.scheduleOnce(time, actorRef, message, executor, sender)`] method will schedule the message `message` into the future by the
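A sketch of scheduling such a timeout message (the `CollectionTimeout` message and the timeout parameter mirror the tutorial, but this is not the full query actor):

```scala
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor

case object CollectionTimeout

class DeviceGroupQuery(timeout: FiniteDuration) extends Actor {
  import context.dispatcher // the ExecutionContext that runs the timer task

  val queryTimeoutTimer =
    context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)

  override def postStop(): Unit = queryTimeoutTimer.cancel()

  def receive = {
    case CollectionTimeout => context.stop(self) // give up once the deadline passes
  }
}
```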
@@ -87,8 +87,8 @@ Java
The query actor, apart from the pending timer, has one stateful aspect, tracking the set of actors that: have replied, have stopped, or have not replied. One way to track this state is
to create a mutable field in the actor @scala[(a `var`)]. A different approach takes advantage of the ability to change how
an actor responds to messages. A
- `Receive` is just a function (or an object, if you like) that can be returned from another function. By default, the `receive` block defines the behavior of the actor, but it is possible to change it multiple times during the life of the actor. We simply call `context.become(newBehavior)`
- where `newBehavior` is anything with type `Receive` @scala[(which is just a shorthand for `PartialFunction[Any, Unit]`)]. We will leverage this
+ `Receive` is just a function (or an object, if you like) that can be returned from another function. By default, the `receive` block defines the behavior of the actor, but it is possible to change it multiple times during the life of the actor. We call `context.become(newBehavior)`
+ where `newBehavior` is anything with type `Receive` @scala[(which is a shorthand for `PartialFunction[Any, Unit]`)]. We will leverage this
feature to track the state of our actor.

For our use case:
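A reduced sketch of that style of state keeping (types are simplified compared to the tutorial's actual `DeviceGroupQuery`):

```scala
import akka.actor.{ Actor, ActorRef }

final case class Temperature(value: Double)

class QueryLikeActor(expected: Set[ActorRef]) extends Actor {

  // receive only returns the initial behavior; later states are installed via context.become
  override def receive: Receive = waitingForReplies(Map.empty, expected)

  def waitingForReplies(
      repliesSoFar: Map[ActorRef, Temperature],
      stillWaiting: Set[ActorRef]): Receive = {
    case t: Temperature =>
      val newReplies = repliesSoFar + (sender() -> t)
      val newStillWaiting = stillWaiting - sender()
      if (newStillWaiting.isEmpty) context.stop(self)
      else context.become(waitingForReplies(newReplies, newStillWaiting))
  }
}
```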
@@ -104,7 +104,7 @@ For our use case:
that has been stopped in the meantime.
* We can reach the deadline and receive a `CollectionTimeout`.

- In the first two cases, we need to keep track of the replies, which we now simply delegate to a method `receivedResponse`, which we will discuss later. In the case of timeout, we need to simply take all the actors that have not yet replied yet (the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. Then we reply to the submitter of the query with the collected results and stop the query actor.
+ In the first two cases, we need to keep track of the replies, which we now delegate to a method `receivedResponse`, which we will discuss later. In the case of timeout, we need to simply take all the actors that have not yet replied yet (the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. Then we reply to the submitter of the query with the collected results and stop the query actor.

To accomplish this, add the following to your `DeviceGroupQuery` source file:
@@ -120,12 +120,12 @@ It is not yet clear how we will "mutate" the `repliesSoFar` and `stillWaiting` d
then it returns a brand new `Receive` that will use those new parameters.

We have seen how we
- can install the initial `Receive` by simply returning it from `receive`. In order to install a new one, to record a
+ can install the initial `Receive` by returning it from `receive`. In order to install a new one, to record a
new reply, for example, we need some mechanism. This mechanism is the method `context.become(newReceive)` which will
_change_ the actor's message handling function to the provided `newReceive` function. You can imagine that before
starting, your actor automatically calls `context.become(receive)`, i.e. installing the `Receive` function that
is returned from `receive`. This is another important observation: **it is not `receive` that handles the messages,
- it just returns a `Receive` function that will actually handle the messages**.
+ it returns a `Receive` function that will actually handle the messages**.

We now have to figure out what to do in `receivedResponse`. First, we need to record the new result in the map `repliesSoFar` and remove the actor from `stillWaiting`. The next step is to check if there are any remaining actors we are waiting for. If there is none, we send the result of the query to the original requester and stop the query actor. Otherwise, we need to update the `repliesSoFar` and `stillWaiting` structures and wait for more
messages.
@@ -136,7 +136,7 @@ response from a device actor, but then it stops during the lifetime of the query
to overwrite the already received reply. In other words, we don't want to receive `Terminated` after we recorded the
response. This is simple to achieve by calling `context.unwatch(ref)`. This method also ensures that we don't
receive `Terminated` events that are already in the mailbox of the actor. It is also safe to call this multiple times,
- only the first call will have any effect, the rest is simply ignored.
+ only the first call will have any effect, the rest is ignored.

With all this knowledge, we can create the `receivedResponse` method:
@@ -147,7 +147,7 @@ Java
: @@snip [DeviceGroupQuery.java]($code$/java/jdocs/tutorial_5/DeviceGroupQuery.java) { #query-collect-reply }

It is quite natural to ask at this point, what have we gained by using the `context.become()` trick instead of
- just making the `repliesSoFar` and `stillWaiting` structures mutable fields of the actor (i.e. `var`s)? In this
+ making the `repliesSoFar` and `stillWaiting` structures mutable fields of the actor (i.e. `var`s)? In this
simple example, not that much. The value of this style of state keeping becomes more evident when you suddenly have
_more kinds_ of states. Since each state
might have temporary data that is relevant itself, keeping these as fields would pollute the global state
@@ -169,7 +169,7 @@ Java
Now let's verify the correctness of the query actor implementation. There are various scenarios we need to test individually to make
sure everything works as expected. To be able to do this, we need to simulate the device actors somehow to exercise
various normal or failure scenarios. Thankfully we took the list of collaborators (actually a `Map`) as a parameter
- to the query actor, so we can easily pass in @scala[`TestProbe`] @java[`TestKit`] references. In our first test, we try out the case when
+ to the query actor, so we can pass in @scala[`TestProbe`] @java[`TestKit`] references. In our first test, we try out the case when
there are two devices and both report a temperature:

Scala
@@ -230,7 +230,7 @@ Java
It is probably worth restating what we said at the beginning of the chapter. By keeping the temporary state that is only relevant to the query itself in a separate actor we keep the group actor implementation very simple. It delegates
everything to child actors and therefore does not have to keep state that is not relevant to its core business. Also, multiple queries can now run parallel to each other, in fact, as many as needed. In our case querying an individual device actor is a fast operation, but if this were not the case, for example, because the remote sensors need to be contacted over the network, this design would significantly improve throughput.

- We close this chapter by testing that everything works together. This test is just a variant of the previous ones, now exercising the group query feature:
+ We close this chapter by testing that everything works together. This test is a variant of the previous ones, now exercising the group query feature:

Scala
: @@snip [DeviceGroupSpec.scala]($code$/scala/tutorial_5/DeviceGroupSpec.scala) { #group-query-integration-test }
@@ -260,7 +260,7 @@ Java
The most interesting part is probably the last: an `Ack` removes the oldest
data chunk from the buffer, and if that was the last chunk then we either close
the connection (if the peer closed its half already) or return to the idle
- behavior; otherwise we just send the next buffered chunk and stay waiting for
+ behavior; otherwise we send the next buffered chunk and stay waiting for
the next `Ack`.

Back-pressure can be propagated also across the reading side back to the writer
@@ -29,7 +29,7 @@ demonstrated above. The UDP extension is queried using the
@scala[`SimpleSender`]@java[`UdpMessage.simpleSender`] message, which is answered by a `SimpleSenderReady`
notification. The sender of this message is the newly created sender actor
which from this point onward can be used to send datagrams to arbitrary
- destinations; in this example it will just send any UTF-8 encoded
+ destinations; in this example it will send any UTF-8 encoded
`String` it receives to a predefined remote address.

@@@ note
@@ -71,7 +71,7 @@ nacked messages it may need to keep a buffer of pending messages.

@@@ warning

- An acknowledged write does not mean acknowledged delivery or storage; receiving an ack for a write simply signals that
+ An acknowledged write does not mean acknowledged delivery or storage; receiving an ack for a write signals that
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control
not error handling. In other words, data may still be lost, even if every write is acknowledged.
@@ -243,7 +243,7 @@ Also see the @ref:[logging options for TestKit](testing.md#actor-logging).

The rules for translating the source object to the source string and class
which are inserted into the `LogEvent` during runtime are implemented
- using implicit parameters and thus fully customizable: simply create your own
+ using implicit parameters and thus fully customizable: create your own
instance of `LogSource[T]` and have it in scope when creating the
logger.
@@ -472,7 +472,7 @@ If you want to more accurately output the timestamp, use the MDC attribute `akka
### MDC values defined by the application

One useful feature available in Slf4j is [MDC](http://logback.qos.ch/manual/mdc.html),
- Akka has a way to let the application specify custom values, you just need to get a
+ Akka has a way to let the application specify custom values, for this you need to use a
specialized `LoggingAdapter`, the `DiagnosticLoggingAdapter`. In order to
get it you can use the factory, providing an @scala[Actor] @java[AbstractActor] as logSource:
@@ -488,7 +488,7 @@ Java
final DiagnosticLoggingAdapter log = Logging.getLogger(this);
```

- Once you have the logger, you just need to add the custom values before you log something.
+ Once you have the logger, you need to add the custom values before you log something.
This way, the values will be put in the SLF4J MDC right before appending the log and removed after.

@@@ note
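For the Scala side, a small sketch of using the diagnostic adapter (the MDC keys are arbitrary example values):

```scala
import akka.actor.Actor
import akka.event.{ DiagnosticLoggingAdapter, Logging }

class MdcActor extends Actor {
  val log: DiagnosticLoggingAdapter = Logging(this)

  def receive = {
    case request: String =>
      // set the custom values before logging; they are placed in the SLF4J MDC and removed afterwards
      log.mdc(Map("requestId" -> 1234, "visitorId" -> 5678))
      log.info("Processing request: {}", request)
      log.clearMDC()
  }
}
```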
@@ -269,7 +269,7 @@ Scala
Java
: @@snip [MyUnboundedMailbox.java]($code$/java/jdocs/dispatcher/MyUnboundedMailbox.java) { #mailbox-implementation-example }

- And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
+ And then you specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher
configuration, or the mailbox configuration.

@@@ note
@@ -37,7 +37,7 @@ of convenience functions for making the test nodes interact with each other. Mor
operations is available in the `akka.remote.testkit.MultiNodeSpec` API documentation.

The setup of the `MultiNodeSpec` is configured through java system properties that you set on all JVMs that's going to run a
- node under test. These can easily be set on the JVM command line with `-Dproperty=value`.
+ node under test. These can be set on the JVM command line with `-Dproperty=value`.

These are the available properties:
:
@@ -65,9 +65,9 @@ will be the server. All failure injection and throttling must be done from this
## The SbtMultiJvm Plugin

The @ref:[SbtMultiJvm Plugin](multi-jvm-testing.md) has been updated to be able to run multi node tests, by
- automatically generating the relevant `multinode.*` properties. This means that you can easily run multi node tests
- on a single machine without any special configuration by just running them as normal multi-jvm tests. These tests can
- then be run distributed over multiple machines without any changes simply by using the multi-node additions to the
+ automatically generating the relevant `multinode.*` properties. This means that you can run multi node tests
+ on a single machine without any special configuration by running them as normal multi-jvm tests. These tests can
+ then be run distributed over multiple machines without any changes by using the multi-node additions to the
plugin.

### Multi Node Specific Additions
@ -221,8 +221,8 @@ Java
|
|||
If the target database does not provide a reactive streams `Subscriber` that can perform writes,
|
||||
you may have to implement the write logic using plain functions or Actors instead.
|
||||
|
||||
In case your write logic is state-less and you just need to convert the events from one data type to another
|
||||
before writing into the alternative datastore, then the projection is as simple as:
|
||||
In case your write logic is state-less and you need to convert the events from one data type to another
|
||||
before writing into the alternative datastore, then the projection will look like this:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceQueryDocSpec.scala]($code$/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-simple-classes }
|
||||
|
|
@ -284,7 +284,7 @@ A read journal plugin must implement `akka.persistence.query.ReadJournalProvider
|
|||
creates instances of `akka.persistence.query.scaladsl.ReadJournal` and
|
||||
`akka.persistence.query.javaadsl.ReadJournal`. The plugin must implement both the `scaladsl`
|
||||
and the `javadsl` @scala[traits]@java[interfaces] because the `akka.stream.scaladsl.Source` and
|
||||
`akka.stream.javadsl.Source` are different types and even though those types can easily be converted
|
||||
`akka.stream.javadsl.Source` are different types and even though those types can be converted
|
||||
to each other it is most convenient for the end user to get access to the Java or Scala `Source` directly.
|
||||
As illustrated below one of the implementations can delegate to the other.
|
||||
|
||||
|
|
|
|||
|
|
@ -123,7 +123,7 @@ origin of the event (`persistenceId`, `sequenceNr` and more).
|
|||
|
||||
More advanced techniques (e.g. [Remove event class and ignore events](#remove-event-class)) will dive into using the manifests for increasing the
|
||||
flexibility of the persisted vs. exposed types even more. However for now we will focus on the simpler evolution techniques,
|
||||
concerning simply configuring the payload serializers.
|
||||
concerning only configuring the payload serializers.
|
||||
|
||||
By default the `payload` will be serialized using Java Serialization. This is fine for testing and initial phases
|
||||
of your development (while you're still figuring out things and the data will not need to stay persisted forever).
|
||||
|
|
@ -197,7 +197,7 @@ needs to have an associated code which indicates if it is a window or aisle seat
|
|||
**Solution:**
|
||||
Adding fields is the most common change you'll need to apply to your messages so make sure the serialization format
|
||||
you picked for your payloads can handle it appropriately, i.e. such changes should be *binary compatible*.
|
||||
This is easily achieved using the right serializer toolkit – we recommend something like [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or
|
||||
This is achieved using the right serializer toolkit – we recommend something like [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or
|
||||
[Apache Thrift](https://thrift.apache.org/) however other tools may fit your needs just as well – picking a serializer backend is something
|
||||
you should research before picking one to run with. In the following examples we will be using protobuf, mostly because
|
||||
we are familiar with it, it does its job well and Akka is using it internally as well.
|
||||
|
|
@ -213,7 +213,7 @@ Java
|
|||
: @@snip [PersistenceSchemaEvolutionDocTest.java]($code$/java/jdocs/persistence/PersistenceSchemaEvolutionDocTest.java) { #protobuf-read-optional-model }
|
||||
|
||||
Next we prepare a protocol definition using the protobuf Interface Description Language, which we'll use to generate
|
||||
the serializer code to be used on the Akka Serialization layer (notice that the schema aproach allows us to easily rename
|
||||
the serializer code to be used on the Akka Serialization layer (notice that the schema approach allows us to rename
|
||||
fields, as long as the numeric identifiers of the fields do not change):
|
||||
|
||||
@@snip [FlightAppModels.proto]($code$/../main/protobuf/FlightAppModels.proto) { #protobuf-read-optional-proto }
|
||||
|
|
@ -268,7 +268,7 @@ sometimes renaming fields etc.), while some other operations are strictly not po
|
|||
@@@
|
||||
|
||||
**Solution 2 - by manually handling the event versions:**
|
||||
Another solution, in case your serialization format does not support renames as easily as the above mentioned formats,
|
||||
Another solution, in case your serialization format does not support renames like the above mentioned formats,
|
||||
is versioning your schema. For example, you could have made your events carry an additional field called `_version`
|
||||
which was set to `1` (because it was the initial schema), and once you change the schema you bump this number to `2`,
|
||||
and write an adapter which can perform the rename.
|
||||
|
|
@ -294,7 +294,7 @@ or put together in a simple helper @scala[trait]@java[class].
|
|||
@@@ note
|
||||
|
||||
The technique of versioning events and then promoting them to the latest version using JSON transformations
|
||||
can of course be applied to more than just field renames – it also applies to adding fields and all kinds of
|
||||
can be applied to more than just field renames – it also applies to adding fields and all kinds of
|
||||
changes in the message format.
|
||||
|
||||
@@@
|
||||
|
|
@ -311,12 +311,12 @@ and should be deleted. You still have to be able to replay from a journal which
|
|||
|
||||
The problem of removing an event type from the domain model is not as much its removal, as the implications
|
||||
for the recovery mechanisms that this entails. For example, a naive way of filtering out certain kinds of events from
|
||||
being delivered to a recovering `PersistentActor` is pretty simple, as one can simply filter them out in an @ref:[EventAdapter](persistence.md#event-adapters):
|
||||
being delivered to a recovering `PersistentActor` is pretty simple, as one can filter them out in an @ref:[EventAdapter](persistence.md#event-adapters):
|
||||
|
||||

|
||||
|
||||
The `EventAdapter` can drop old events (**O**) by emitting an empty `EventSeq`.
|
||||
Other events can simply be passed through (**E**).
|
||||
Other events can be passed through (**E**).
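A rough sketch of such a filtering adapter (the `CustomerBlinked` event type stands in for whatever event you want to drop):

```scala
import akka.persistence.journal.{ EventAdapter, EventSeq }

final case class CustomerBlinked(customerId: Long) // the event type we no longer care about

class RemovedEventsAwareAdapter extends EventAdapter {
  override def manifest(event: Any): String = ""  // not needed for this example
  override def toJournal(event: Any): Any = event // the write side stays as-is

  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case _: CustomerBlinked => EventSeq.empty         // drop the obsolete event (O)
    case other              => EventSeq.single(other) // pass everything else through (E)
  }
}
```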
|
||||
|
||||
This however does not address the underlying cost of having to deserialize all the events during recovery,
|
||||
even those which will be filtered out by the adapter. In the next section we will improve the above explained mechanism
|
||||
|
|
@ -345,8 +345,8 @@ that the type is no longer needed, and skip the deserialization all-together:
|
|||

|
||||
|
||||
The serializer is aware of the old event types that need to be skipped (**O**), and can skip deserializing them altogether
|
||||
by simply returning a "tombstone" (**T**), which the EventAdapter converts into an empty EventSeq.
|
||||
Other events (**E**) can simply be passed through.
|
||||
by returning a "tombstone" (**T**), which the EventAdapter converts into an empty EventSeq.
|
||||
Other events (**E**) can just be passed through.
|
||||
|
||||
The serializer detects that the string manifest points to a removed event type and skips attempting to deserialize it:
|
||||
|
||||
|
|
@ -474,7 +474,7 @@ Let us consider a situation where an event represents "user details changed". Af
|
|||
event is too coarse, and needs to be split into "user name changed" and "user address changed", because somehow
|
||||
users keep changing their usernames a lot and we'd like to keep this as a separate event.
|
||||
|
||||
The write side change is very simple, we simply persist `UserNameChanged` or `UserAddressChanged` depending
|
||||
The write side change is very simple, we persist `UserNameChanged` or `UserAddressChanged` depending
|
||||
on what the user actually intended to change (instead of the composite `UserDetailsChanged` that we had in version 1
|
||||
of our model).
|
||||
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ If validation succeeds, events are generated from the command, representing the
|
|||
are then persisted and, after successful persistence, used to change the actor's state. When the persistent actor
|
||||
needs to be recovered, only the persisted events are replayed of which we know that they can be successfully applied.
|
||||
In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. Event sourced
|
||||
actors may of course also process commands that do not change application state such as query commands for example.
|
||||
actors may also process commands that do not change application state, such as query commands.
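To make the distinction concrete, a minimal sketch (the types and names are illustrative, not taken from the Akka sources) of a persistent actor that validates commands and applies only events:

```scala
import akka.persistence.PersistentActor

final case class AddItem(name: String)   // command – may be rejected during validation
final case class ItemAdded(name: String) // event – a fact, always applicable on replay

class CartActor extends PersistentActor {
  override def persistenceId = "cart-1"

  private var items = List.empty[String]

  override def receiveCommand = {
    case AddItem(name) if name.nonEmpty =>  // validate the command first
      persist(ItemAdded(name))(updateState) // then persist the event and apply it
    case "print" =>
      println(items)                        // a query command, no state change
  }

  override def receiveRecover = {
    case evt: ItemAdded => updateState(evt) // replayed events cannot fail
  }

  private def updateState(evt: ItemAdded): Unit =
    items = evt.name :: items
}
```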
|
||||
|
||||
Another excellent article about "thinking in Events" is [Events As First-Class Citizens](https://hackernoon.com/events-as-first-class-citizens-8633e8479493) by Randy Shoup. It is a short and recommended read if you're starting
|
||||
to develop Events-based applications.
|
||||
|
|
@ -309,7 +309,7 @@ Java
|
|||
|
||||
@@@ note
|
||||
|
||||
In order to implement the pattern known as "*command sourcing*" simply call @scala[`persistAsync(cmd)(...)`]@java[`persistAsync`] right away on all incoming
|
||||
In order to implement the pattern known as "*command sourcing*" call @scala[`persistAsync(cmd)(...)`]@java[`persistAsync`] right away on all incoming
|
||||
messages and handle them in the callback.
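A minimal sketch of that pattern (the actor and handler names are illustrative):

```scala
import akka.persistence.PersistentActor

class CommandSourcedActor extends PersistentActor {
  override def persistenceId = "command-sourced-1"

  override def receiveCommand = {
    case cmd => persistAsync(cmd) { persisted => handle(persisted) } // persist first, act in the callback
  }

  override def receiveRecover = {
    case replayed => handle(replayed) // the same handler is used during replay
  }

  private def handle(msg: Any): Unit = () // apply the message's effect here
}
```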
|
||||
|
||||
@@@
|
||||
|
|
@ -454,7 +454,7 @@ if you for example know that serialization format has changed in an incompatible
|
|||
|
||||
### Atomic writes
|
||||
|
||||
Each event is of course stored atomically, but it is also possible to store several events atomically by
|
||||
Each event is stored atomically, but it is also possible to store several events atomically by
|
||||
using the `persistAll` or `persistAllAsync` method. That means that all events passed to that method
|
||||
are stored or none of them are stored if there is an error.
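A small illustrative fragment (the event type and the `updateState` helper are made up) of such a call inside a persistent actor's command handler:

```scala
// all three events are written in one atomic batch; the handler is then
// invoked once per event, or not at all if the batch could not be stored
persistAll(List(ItemAdded("a"), ItemAdded("b"), ItemAdded("c"))) { evt =>
  updateState(evt)
}
```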
|
||||
|
||||
|
|
@ -1151,7 +1151,7 @@ has, and also performs some longer operations on the Journal while printing its
|
|||
to provide a proper benchmarking environment it can be used to get a rough feel about your journal's performance in the most
|
||||
typical scenarios.
|
||||
|
||||
In order to include the `SnapshotStore` TCK tests in your test suite simply extend the `SnapshotStoreSpec`:
|
||||
In order to include the `SnapshotStore` TCK tests in your test suite extend the `SnapshotStoreSpec`:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistencePluginDocSpec.scala]($code$/scala/docs/persistence/PersistencePluginDocSpec.scala) { #snapshot-store-tck-scala }
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ which are unnecessary concepts for newcomers to learn. The new `createReceive` r
|
|||
additional imports.
|
||||
|
||||
Note that the `Receive` can still be implemented in other ways than using the `ReceiveBuilder`
|
||||
since it in the end is just a wrapper around a Scala `PartialFunction`. For example, one could
|
||||
since it in the end is a wrapper around a Scala `PartialFunction`. For example, one could
|
||||
implement an adapter to [Javaslang Pattern Matching DSL](http://www.javaslang.io/javaslang-docs/#_pattern_matching).
|
||||
|
||||
The mechanical source code change for migration to the new `AbstractActor` is to implement the
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ In order to communicate with an actor, it is necessary to have its `ActorRef`. I
|
|||
the creator of the actor (the caller of `actorOf()`) is who gets the `ActorRef` for an actor that it can
|
||||
then send to other actors. In other words:
|
||||
|
||||
* An Actor can get a remote Actor's reference simply by receiving a message from it (as it's available as @scala[`sender()`]@java[`getSender()`] then),
|
||||
* An Actor can get a remote Actor's reference by receiving a message from it (as it's available as @scala[`sender()`]@java[`getSender()`] then),
|
||||
or inside of a remote message (e.g. *PleaseReply(message: String, remoteActorRef: ActorRef)*)
|
||||
|
||||
Alternatively, an actor can look up another located at a known path using
|
||||
|
|
@ -424,7 +424,7 @@ You have a few choices how to set up certificates and hostname verification:
|
|||
* The single set of keys and the single certificate is distributed to all nodes. The certificate can
|
||||
be self-signed as it is distributed both as a certificate for authentication and as the trusted certificate.
|
||||
* If the keys/certificate are lost, someone else can connect to your cluster.
|
||||
* Adding nodes to the cluster is simple as the key material can just be deployed / distributed to the new node.
|
||||
* Adding nodes to the cluster is simple as the key material can be deployed / distributed to the new node.
|
||||
* Have a single set of keys and a single certificate for all nodes that contains all of the host names and *enable*
|
||||
hostname checking.
|
||||
* This means that only the hosts mentioned in the certificate can connect to the cluster.
|
||||
|
|
|
|||
|
|
@ -364,7 +364,7 @@ together with a tutorial for a more hands-on experience. The source code of this
|
|||
### Remote Events
|
||||
|
||||
It is possible to listen to events that occur in Akka Remote, and to subscribe/unsubscribe to these events
|
||||
you simply register as listener to the below described types in on the `ActorSystem.eventStream`.
|
||||
you register as a listener to the below described types on the `ActorSystem.eventStream`.
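For example (the listener actor is illustrative), subscribing to all remoting lifecycle events could look like this:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.remote.RemotingLifecycleEvent

class RemotingListener extends Actor {
  def receive = {
    case event: RemotingLifecycleEvent => println(event) // or log/act on it
  }
}

val system = ActorSystem("example")
val listener = system.actorOf(Props[RemotingListener], "remoting-listener")
system.eventStream.subscribe(listener, classOf[RemotingLifecycleEvent])
```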
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
|
|||
|
|
@ -161,7 +161,7 @@ is to make the default behave such that adding `.withRouter` to a child’s defi
|
|||
change the supervision strategy applied to the child. This might be an inefficiency that you can avoid
|
||||
by specifying the strategy when defining the router.
|
||||
|
||||
Setting the strategy is easily done:
|
||||
Setting the strategy is done like this:
|
||||
|
||||
Scala
|
||||
: @@snip [RoutingSpec.scala]($akka$/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala) { #supervision }
|
||||
|
|
@ -892,7 +892,7 @@ routing logic directly in their `ActorRef` rather than in the router actor. Mess
|
|||
a router's `ActorRef` can be immediately routed to the routee, bypassing the single-threaded
|
||||
router actor entirely.
|
||||
|
||||
The cost to this is, of course, that the internals of routing code are more complicated than if
|
||||
The cost to this is that the internals of routing code are more complicated than if
|
||||
routers were implemented with normal actors. Fortunately all of this complexity is invisible to
|
||||
consumers of the routing API. However, it is something to be aware of when implementing your own
|
||||
routers.
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# Serialization
|
||||
|
||||
The messages that Akka actors send to each other are JVM objects (e.g. instances of Scala case classes). Message passing between actors that live on the same JVM is straightforward. It is simply done via reference passing. However, messages that have to escape the JVM to reach an actor running on a different host have to undergo some form of serialization (i.e. the objects have to be converted to and from byte arrays).
|
||||
The messages that Akka actors send to each other are JVM objects (e.g. instances of Scala case classes). Message passing between actors that live on the same JVM is straightforward. It is done via reference passing. However, messages that have to escape the JVM to reach an actor running on a different host have to undergo some form of serialization (i.e. the objects have to be converted to and from byte arrays).
|
||||
|
||||
Akka itself uses Protocol Buffers to serialize internal messages (i.e. cluster gossip messages). However, the serialization mechanism in Akka allows you to write custom serializers and to define which serializer to use for what.
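As a quick illustration of the mechanism (not tied to any particular serializer), the configured serializer for an object can be looked up and used programmatically:

```scala
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

val system = ActorSystem("example")
val serialization = SerializationExtension(system)

val original = "hello"                                      // any message object
val serializer = serialization.findSerializerFor(original) // whichever serializer is bound to its type
val bytes = serializer.toBinary(original)
val back = serializer.fromBinary(bytes)                     // turn the bytes back into an object
println(back == original)
```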
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ Allows coupling termination (cancellation, completion, erroring) of Sinks and So
|
|||
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow between them.
|
||||
Similar to `Flow.fromSinkAndSource` however couples the termination of these two stages.
|
||||
|
||||
E.g. if the emitted `Flow` gets a cancellation, the `Source` of course is cancelled,
|
||||
E.g. if the emitted `Flow` gets a cancellation, the `Source` is cancelled,
|
||||
however the Sink will also be completed. The table below illustrates the effects in detail:
|
||||
|
||||
| Returned Flow | Sink (in) | Source (out) |
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ we illustrate the most common used stages viewed as "boxes".
|
|||
The *linear* stages are `Source`, `Sink`
|
||||
and `Flow`, as these can be used to compose strict chains of processing stages.
|
||||
Fan-in and Fan-out stages usually have multiple input or multiple output ports, therefore they allow building
|
||||
more complex graph layouts, not just chains. `BidiFlow` stages are usually useful in IO related tasks, where
|
||||
more complex graph layouts, not only chains. `BidiFlow` stages are usually useful in IO related tasks, where
|
||||
there are input and output channels to be handled. Due to the specific shape of `BidiFlow` it is easy to
|
||||
stack them on top of each other to build a layered protocol for example. The `TLS` support in Akka is for example
|
||||
implemented as a `BidiFlow`.
|
||||
|
|
@ -76,7 +76,7 @@ Java
|
|||
It is clear however that there is no nesting present in our first attempt; since the library cannot figure out
|
||||
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
|
||||
DSL provided by the `Flow`, `Source`, `Sink` classes then nesting can be achieved by calling one of the
|
||||
methods `withAttributes()` or `named()` (where the latter is just a shorthand for adding a name attribute).
|
||||
methods `withAttributes()` or `named()` (where the latter is a shorthand for adding a name attribute).
|
||||
|
||||
The following code demonstrates how to achieve the desired nesting:
|
||||
|
||||
|
|
@ -170,7 +170,7 @@ Java
|
|||
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-use }
|
||||
|
||||
It is not possible to use it as a `Flow` yet, though (i.e. we cannot call `.filter()` on it), but `Flow`
|
||||
has a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are similar methods on `Source`,
|
||||
has a `fromGraph()` method that adds the DSL to a `FlowShape`. There are similar methods on `Source`,
|
||||
`Sink` and `BidiShape`, so it is easy to get back to the simpler DSL if a graph has the right shape.
|
||||
For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods.
|
||||
To demonstrate this, we will create the following graph:
|
||||
|
|
@ -192,7 +192,7 @@ throw an exception if this is violated.
|
|||
|
||||
@@@
|
||||
|
||||
We are still in debt of demonstrating that `RunnableGraph` is a component just like any other, which can
|
||||
We still need to demonstrate that `RunnableGraph` is a component like any other, which can
|
||||
be embedded in graphs. In the following snippet we embed one closed graph in another:
|
||||
|
||||
Scala
|
||||
|
|
@ -273,7 +273,7 @@ Java
|
|||
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-3 }
|
||||
|
||||
As the last example, we wire together `nestedSource` and `nestedSink` and we use a custom combiner function to
|
||||
create a yet another materialized type of the resulting `RunnableGraph`. This combiner function just ignores
|
||||
create yet another materialized type of the resulting `RunnableGraph`. This combiner function ignores
|
||||
the @scala[`Future[String]`] @java[`CompletionStage<String>`] part, and wraps the other two values in a custom case class `MyClass`
|
||||
(indicated by color *purple* on the diagram):
|
||||
|
||||
|
|
@ -288,7 +288,7 @@ Java
|
|||
|
||||
@@@ note
|
||||
|
||||
The nested structure in the above example is not necessary for combining the materialized values, it just
|
||||
The nested structure in the above example is not necessary for combining the materialized values, it
|
||||
demonstrates how the two features work together. See @ref:[Combining materialized values](stream-flows-and-basics.md#flow-combine-mat) for further examples
|
||||
of combining materialized values without nesting and hierarchy involved.
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ general, more targeted recipes are available as separate sections (@ref:[Buffers
|
|||
|
||||
**Situation:** During development it is sometimes helpful to see what happens in a particular section of a stream.
|
||||
|
||||
The simplest solution is to simply use a `map` operation and use `println` to print the elements received to the console.
|
||||
The simplest solution is to use a `map` operation and use `println` to print the elements received to the console.
|
||||
While this recipe is rather simplistic, it is often suitable for a quick debug session.
|
||||
|
||||
Scala
|
||||
|
|
@ -63,7 +63,7 @@ all the nested elements inside the sequences separately.
|
|||
|
||||
The `mapConcat` operation can be used to implement a one-to-many transformation of elements using a mapper function
|
||||
in the form of @scala[`In => immutable.Seq[Out]`] @java[`In -> List<Out>`]. In this case we want to map a @scala[`Seq`] @java[`List`] of elements to the elements in the
|
||||
collection itself, so we can just call @scala[`mapConcat(identity)`] @java[`mapConcat(l -> l)`].
|
||||
collection itself, so we can call @scala[`mapConcat(identity)`] @java[`mapConcat(l -> l)`].
|
||||
|
||||
Scala
|
||||
: @@snip [RecipeFlattenSeq.scala]($code$/scala/docs/stream/cookbook/RecipeFlattenSeq.scala) { #flattening-seqs }
|
||||
|
|
@ -103,7 +103,7 @@ of the stream.
|
|||
|
||||
This recipe uses a `GraphStage` to host a mutable `MessageDigest` class (part of the Java Cryptography
|
||||
API) and update it with the bytes arriving from the stream. When the stream starts, the `onPull` handler of the
|
||||
stage is called, which just bubbles up the `pull` event to its upstream. As a response to this pull, a ByteString
|
||||
stage is called, which bubbles up the `pull` event to its upstream. As a response to this pull, a ByteString
|
||||
chunk will arrive (`onPush`) which we use to update the digest, then it will pull for the next chunk.
|
||||
|
||||
Eventually the stream of `ByteString` s depletes and we get a notification about this event via `onUpstreamFinish`.
|
||||
|
|
@ -246,8 +246,8 @@ In this collection we show recipes that use stream graph elements to achieve var
|
|||
In other words, even if the stream would be able to flow (not being backpressured) we want to hold back elements until a
|
||||
trigger signal arrives.
|
||||
|
||||
This recipe solves the problem by simply zipping the stream of `Message` elements with the stream of `Trigger`
|
||||
signals. Since `Zip` produces pairs, we simply map the output stream selecting the first element of the pair.
|
||||
This recipe solves the problem by zipping the stream of `Message` elements with the stream of `Trigger`
|
||||
signals. Since `Zip` produces pairs, we map the output stream selecting the first element of the pair.
|
||||
|
||||
Scala
|
||||
: @@snip [RecipeManualTrigger.scala]($code$/scala/docs/stream/cookbook/RecipeManualTrigger.scala) { #manually-triggered-stream }
|
||||
|
|
@ -303,7 +303,7 @@ This can be solved by using a versatile rate-transforming operation, `conflate`.
|
|||
a special `reduce` operation that collapses multiple upstream elements into one aggregate element if needed to keep
|
||||
the speed of the upstream unaffected by the downstream.
|
||||
|
||||
When the upstream is faster, the reducing process of the `conflate` starts. Our reducer function simply takes
|
||||
When the upstream is faster, the reducing process of the `conflate` starts. Our reducer function takes
|
||||
the freshest element. This is a simple dropping operation.
|
||||
|
||||
Scala
|
||||
|
|
@ -345,7 +345,7 @@ We will use `conflateWithSeed` to solve the problem. The seed version of conflat
|
|||
the downstream. In our case the seed function is a constant function that returns 0 since there were no missed ticks
|
||||
at that point.
|
||||
* A fold function that is invoked when multiple upstream messages need to be collapsed to an aggregate value due
|
||||
to the insufficient processing rate of the downstream. Our folding function simply increments the currently stored
|
||||
to the insufficient processing rate of the downstream. Our folding function increments the currently stored
|
||||
count of the missed ticks so far.
|
||||
|
||||
As a result, we have a flow of `Int` where the number represents the missed ticks. A number 0 means that we were
|
||||
|
|
@ -365,7 +365,7 @@ the last value for the downstream if necessary.
|
|||
|
||||
We have two options to implement this feature. In both cases we will use `GraphStage` to build our custom
|
||||
element. In the first version we will use a provided initial value `initial` that will be used
|
||||
to feed the downstream if no upstream element is ready yet. In the `onPush()` handler we just overwrite the
|
||||
to feed the downstream if no upstream element is ready yet. In the `onPush()` handler we overwrite the
|
||||
`currentValue` variable and immediately relieve the upstream by calling `pull()`. The downstream `onPull` handler
|
||||
is very similar, we immediately relieve the downstream by emitting `currentValue`.
|
||||
|
||||
|
|
@ -464,7 +464,7 @@ Java
|
|||
consumed.
|
||||
|
||||
This recipe uses a `GraphStage` to implement the desired feature. In the only handler we override,
|
||||
`onPush()` we just update a counter and see if it gets larger than `maximumBytes`. If a violation happens
|
||||
`onPush()`, we update a counter and see if it gets larger than `maximumBytes`. If a violation happens
|
||||
we signal failure, otherwise we forward the chunk we have received.
|
||||
|
||||
Scala
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ or output ports. It is a counterpart of the `GraphDSL.create()` method which cre
|
|||
stages by composing others. Where `GraphStage` differs is that it creates a stage that is itself not divisible into
|
||||
smaller ones, and allows state to be maintained inside it in a safe way.
|
||||
|
||||
As a first motivating example, we will build a new `Source` that will simply emit numbers from 1 until it is
|
||||
As a first motivating example, we will build a new `Source` that will emit numbers from 1 until it is
|
||||
cancelled. To start, we need to define the "interface" of our stage, which is called *shape* in Akka Streams terminology
|
||||
(this is explained in more detail in the section @ref:[Modularity, Composition and Hierarchy](stream-composition.md)). This is what it looks like:
|
||||
|
||||
|
|
@ -50,7 +50,7 @@ To receive the necessary events one needs to register a subclass of @scala[`OutH
|
|||
(`Outlet`). This handler will receive events related to the lifecycle of the port. In our case we need to
|
||||
override `onPull()` which indicates that we are free to emit a single element. There is another callback,
|
||||
`onDownstreamFinish()` which is called if the downstream cancelled. Since the default behavior of that callback is
|
||||
to stop the stage, we don't need to override it. In the `onPull` callback we will simply emit the next number. This
|
||||
to stop the stage, we don't need to override it. In the `onPull` callback we will emit the next number. This
|
||||
is what it looks like in the end:
|
||||
|
||||
Scala
|
||||
|
|
@ -204,7 +204,7 @@ filter. The conceptual wiring of `Filter` looks like this:
|
|||
As we see above, if the given predicate matches the current element we are propagating it downwards, otherwise
|
||||
we return the “ball” to our upstream so that we get the new element. This is achieved by modifying the map
|
||||
example by adding a conditional in the `onPush` handler and deciding between a `pull(in)` or `push(out)` call
|
||||
(and of course not having a mapping `f` function).
|
||||
(and not having a mapping `f` function).
|
||||
|
||||
Scala
|
||||
: @@snip [GraphStageDocSpec.scala]($code$/scala/docs/stream/GraphStageDocSpec.scala) { #many-to-one }
|
||||
|
|
@ -282,7 +282,7 @@ more advanced stages which may need to be debugged at some point.
|
|||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
The helper trait `akka.stream.stage.StageLogging` is provided to enable you to easily obtain a `LoggingAdapter`
|
||||
The helper trait `akka.stream.stage.StageLogging` is provided to enable you to obtain a `LoggingAdapter`
|
||||
inside of a `GraphStage` as long as the `Materializer` you're using is able to provide you with a logger.
|
||||
In that sense, it serves a very similar purpose as `ActorLogging` does for Actors.
|
||||
|
||||
|
|
@ -291,14 +291,14 @@ In that sense, it serves a very similar purpose as `ActorLogging` does for Actor
|
|||
@@@ div { .group-java }
|
||||
|
||||
You can extend the `akka.stream.stage.GraphStageWithLogging` or `akka.stream.stage.TimerGraphStageWithLogging` classes
|
||||
instead of the usual `GraphStage` to enable you to easily obtain a `LoggingAdapter` inside your stage as long as
|
||||
instead of the usual `GraphStage` to enable you to obtain a `LoggingAdapter` inside your stage as long as
|
||||
the `Materializer` you're using is able to provide you with a logger.
|
||||
|
||||
@@@
|
||||
|
||||
@@@ note
|
||||
|
||||
Please note that you can always simply use a logging library directly inside a Stage.
|
||||
Please note that you can always use a logging library directly inside a Stage.
|
||||
Make sure to use an asynchronous appender however, to not accidentally block the stage when writing to files etc.
|
||||
See @ref:[Using the SLF4J API directly](../logging.md#slf4j-directly) for more details on setting up async appenders in SLF4J.
|
||||
|
||||
|
|
@ -499,7 +499,7 @@ an implicit class that enriches them generically, this class would require expli
|
|||
parameters due to [SI-2712](https://issues.scala-lang.org/browse/SI-2712). For a partial workaround that unifies
|
||||
extensions to `Source` and `Flow` see [this sketch by R. Kuhn](https://gist.github.com/rkuhn/2870fcee4937dda2cad5).
|
||||
|
||||
A lot simpler is the task of just adding an extension method to `Source` as shown below:
|
||||
A lot simpler is the task of adding an extension method to `Source` as shown below:
|
||||
|
||||
@@snip [GraphStageDocSpec.scala]($code$/scala/docs/stream/GraphStageDocSpec.scala) { #extending-source }
|
||||
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ Java
|
|||
The resulting `Source` can be materialized any number of times, each materialization effectively attaching
|
||||
a new subscriber. If there are no subscribers attached to this hub then it will not drop any elements but instead
|
||||
backpressure the upstream producer until subscribers arrive. This behavior can be tweaked by using the combinators
|
||||
`.buffer` for example with a drop strategy, or just attaching a subscriber that drops all messages. If there
|
||||
`.buffer` for example with a drop strategy, or attaching a subscriber that drops all messages. If there
|
||||
are no other subscribers, this will ensure that the producer is kept drained (dropping all elements) and once a new
|
||||
subscriber arrives it will adaptively slow down, ensuring no more messages are dropped.
|
||||
|
||||
|
|
@ -139,7 +139,7 @@ Java
|
|||
|
||||
We now use a few tricks to add more features. First of all, we attach a `Sink.ignore`
|
||||
at the broadcast side of the channel to keep it drained when there are no subscribers. If this behavior is not the
|
||||
desired one this line can be simply dropped.
|
||||
desired one this line can be dropped.
|
||||
|
||||
Scala
|
||||
: @@snip [HubsDocSpec.scala]($code$/scala/docs/stream/HubsDocSpec.scala) { #pub-sub-2 }
|
||||
|
|
@ -149,7 +149,7 @@ Java
|
|||
|
||||
We now wrap the `Sink` and `Source` in a `Flow` using `Flow.fromSinkAndSource`. This bundles
|
||||
up the two sides of the channel into one and forces users of it to always define a publisher and subscriber side
|
||||
(even if the subscriber side is just dropping). It also allows us to very simply attach a `KillSwitch` as
|
||||
(even if the subscriber side is dropping). It also allows us to attach a `KillSwitch` as
|
||||
a `BidiStage` which in turn makes it possible to close both the original `Sink` and `Source` at the
|
||||
same time.
|
||||
Finally, we add `backpressureTimeout` on the consumer side to ensure that subscribers that block the channel for more
|
||||
|
|
@ -195,7 +195,7 @@ i.e. `int` greater than or equal to 0 and less than number of consumers.
|
|||
The resulting `Source` can be materialized any number of times, each materialization effectively attaching
|
||||
a new consumer. If there are no consumers attached to this hub then it will not drop any elements but instead
|
||||
backpressure the upstream producer until consumers arrive. This behavior can be tweaked by using the combinators
|
||||
`.buffer` for example with a drop strategy, or just attaching a consumer that drops all messages. If there
|
||||
`.buffer` for example with a drop strategy, or attaching a consumer that drops all messages. If there
|
||||
are no other consumers, this will ensure that the producer is kept drained (dropping all elements) and once a new
|
||||
consumer arrives and messages are routed to the new consumer it will adaptively slow down, ensuring no more messages
|
||||
are dropped.
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ it will be represented by the `RunnableGraph` type, indicating that it is ready
|
|||
It is important to remember that even after constructing the `RunnableGraph` by connecting all the source, sink and
|
||||
different processing stages, no data will flow through it until it is materialized. Materialization is the process of
|
||||
allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve
|
||||
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
|
||||
starting up Actors). Thanks to Flows being a description of the processing pipeline they are *immutable,
|
||||
thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
|
||||
one actor prepare the work, and then have it be materialized at some completely different place in the code.
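A small sketch of that reuse (assuming a plain `ActorMaterializer` as used throughout these docs):

```scala
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, RunnableGraph, Sink, Source }

// the blueprint can be built in one place...
val blueprint: RunnableGraph[Future[Int]] =
  Source(1 to 10).toMat(Sink.fold(0)(_ + _))(Keep.right)

// ...and materialized somewhere else entirely, as many times as needed
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()

val sum1: Future[Int] = blueprint.run()
val sum2: Future[Int] = blueprint.run()
```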
|
||||
|
||||
|
|
@ -214,7 +214,7 @@ To illustrate this further let us consider both problem situations and how the b
|
|||
|
||||
### Slow Publisher, fast Subscriber
|
||||
|
||||
This is the happy case of course – we do not need to slow down the Publisher in this case. However signalling rates are
|
||||
This is the happy case – we do not need to slow down the Publisher in this case. However signalling rates are
|
||||
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
|
||||
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
|
||||
during such situations, however we do not want to pay a high penalty for this safety net being enabled.
|
||||
|
|
|
|||
|
|
@ -151,17 +151,17 @@ A partial graph also verifies that all ports are either connected or part of the
|
|||
<a id="constructing-sources-sinks-flows-from-partial-graphs"></a>
|
||||
## Constructing Sources, Sinks and Flows from Partial Graphs
|
||||
|
||||
Instead of treating a @scala[partial graph]@java[`Graph`] as simply a collection of flows and junctions which may not yet all be
|
||||
Instead of treating a @scala[partial graph]@java[`Graph`] as a collection of flows and junctions which may not yet all be
|
||||
connected it is sometimes useful to expose such a complex graph as a simpler structure,
|
||||
such as a `Source`, `Sink` or `Flow`.
|
||||
|
||||
In fact, these concepts can be easily expressed as special cases of a partially connected graph:
|
||||
In fact, these concepts can be expressed as special cases of a partially connected graph:
|
||||
|
||||
* `Source` is a partial graph with *exactly one* output, that is it returns a `SourceShape`.
|
||||
* `Sink` is a partial graph with *exactly one* input, that is it returns a `SinkShape`.
|
||||
* `Flow` is a partial graph with *exactly one* input and *exactly one* output, that is it returns a `FlowShape`.
|
||||
|
||||
Being able to hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one
|
||||
Being able to hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to create one
|
||||
complex element and from there on treat it as simple compound stage for linear computations.
|
||||
|
||||
In order to create a Source from a graph the method `Source.fromGraph` is used; to use it we must have a
|
||||
|
|
@ -245,7 +245,7 @@ of the same type,
|
|||
* `FanInShape1`, `FanInShape2`, ..., `FanOutShape1`, `FanOutShape2`, ... for junctions
|
||||
with multiple input (or output) ports of different types.
|
||||
|
||||
Since our shape has two input ports and one output port, we can just use the `FanInShape` DSL to define
|
||||
Since our shape has two input ports and one output port, we can use the `FanInShape` DSL to define
|
||||
our custom shape:
|
||||
|
||||
Scala
|
||||
|
|
@ -312,7 +312,7 @@ Java
|
|||
: @@snip [BidiFlowDocTest.java]($code$/java/jdocs/stream/BidiFlowDocTest.java) { #codec-impl }
|
||||
|
||||
|
||||
In this way you could easily integrate any other serialization library that
|
||||
In this way you can integrate any other serialization library that
|
||||
turns an object into a sequence of bytes.
|
||||
|
||||
The other stage that we talked about is a little more involved since reversing
|
||||
|
|
@ -339,7 +339,7 @@ Java
|
|||
This example demonstrates how `BidiFlow` subgraphs can be hooked
|
||||
together and also turned around with the @scala[`.reversed`]@java[`.reversed()`] method. The test
|
||||
simulates both parties of a network communication protocol without actually
|
||||
having to open a network connection—the flows can just be connected directly.
|
||||
having to open a network connection—the flows can be connected directly.
|
||||
|
||||
<a id="graph-matvalue"></a>
|
||||
## Accessing the materialized value inside the Graph
|
||||
|
|
|
|||
|
|
@ -198,7 +198,7 @@ email addresses looked up.
|
|||
The final piece of this pipeline is to generate the demand that pulls the tweet
|
||||
authors information through the emailing pipeline: we attach a `Sink.ignore`
|
||||
which makes it all run. If our email process would return some interesting data
|
||||
for further transformation then we would of course not ignore it but send that
|
||||
for further transformation then we would not ignore it but send that
|
||||
result stream onwards for further processing or storage.
|
||||
|
||||
Note that `mapAsync` preserves the order of the stream elements. In this example the order
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ implementations like Akka Streams offer a nice user API).
|
|||
|
||||
The Akka Streams API is completely decoupled from the Reactive Streams
|
||||
interfaces. While Akka Streams focus on the formulation of transformations on
|
||||
data streams the scope of Reactive Streams is just to define a common mechanism
|
||||
data streams the scope of Reactive Streams is to define a common mechanism
|
||||
of how to move data across an asynchronous boundary without losses, buffering
|
||||
or resource exhaustion.
|
||||
|
||||
|
|
|
|||
|
|
@ -20,12 +20,12 @@ Java
|
|||
|
||||

|
||||
|
||||
Next, we simply handle *each* incoming connection using a `Flow` which will be used as the processing stage
|
||||
Next, we handle *each* incoming connection using a `Flow` which will be used as the processing stage
|
||||
to handle and emit `ByteString` s from and to the TCP Socket. Since one `ByteString` does not necessarily
|
||||
correspond to exactly one line of text (the client might be sending the line in chunks) we use the @scala[`Framing.delimiter`]@java[`delimiter`]
|
||||
helper Flow @java[from `akka.stream.javadsl.Framing`] to chunk the inputs up into actual lines of text. The last boolean
|
||||
argument indicates that we require an explicit line ending even for the last message before the connection is closed.
|
||||
In this example we simply add exclamation marks to each incoming text message and push it through the flow:
|
||||
In this example we add exclamation marks to each incoming text message and push it through the flow:
|
||||
|
||||
Scala
|
||||
: @@snip [StreamTcpDocSpec.scala]($code$/scala/docs/stream/io/StreamTcpDocSpec.scala) { #echo-server-simple-handle }
|
||||
|
|
@ -64,8 +64,8 @@ Java
|
|||
: @@snip [StreamTcpDocTest.java]($code$/java/jdocs/stream/io/StreamTcpDocTest.java) { #repl-client }
|
||||
|
||||
The `repl` flow we use to handle the server interaction first prints the server's response, then awaits input from
|
||||
the command line (this blocking call is used here just for the sake of simplicity) and converts it to a
|
||||
`ByteString` which is then sent over the wire to the server. Then we simply connect the TCP pipeline to this
|
||||
the command line (this blocking call is used here for the sake of simplicity) and converts it to a
|
||||
`ByteString` which is then sent over the wire to the server. Then we connect the TCP pipeline to this
|
||||
processing stage–at this point it will be materialized and start processing data once the server responds with
|
||||
an *initial message*.
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ in which *either side is waiting for the other one to start the conversation*. O
|
|||
to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we
|
||||
are connecting to would start the conversation, which effectively means both sides are back-pressured and can not get
|
||||
the conversation started. There are multiple ways of dealing with this which are explained in depth in @ref:[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles),
|
||||
however in client-server scenarios it is often the simplest to make either side simply send an initial message.
|
||||
however in client-server scenarios it is often the simplest to make either side send an initial message.
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -112,7 +112,7 @@ which completes the stream once it encounters such command].
|
|||
|
||||
### Using framing in your protocol
|
||||
|
||||
Streaming transport protocols like TCP just pass streams of bytes, and does not know what is a logical chunk of bytes from the
|
||||
Streaming transport protocols like TCP only pass streams of bytes, and do not know what constitutes a logical chunk of bytes from the
|
||||
application's point of view. Often when implementing network protocols you will want to introduce your own framing.
|
||||
This can be done in two ways:
|
||||
An end-of-frame marker, e.g. a line ending `\n`, can be used for framing via `Framing.delimiter`.
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ Java
|
|||
: @@snip [FlowParallelismDocTest.java]($code$/java/jdocs/stream/FlowParallelismDocTest.java) { #pipelining }
|
||||
|
||||
The two `map` stages in sequence (encapsulated in the "frying pan" flows) will be executed in a pipelined way,
|
||||
basically doing the same as Roland with his frying pans:
|
||||
the same way that Roland was using his frying pans:
|
||||
|
||||
1. A `ScoopOfBatter` enters `fryingPan1`
|
||||
2. `fryingPan1` emits a HalfCookedPancake once `fryingPan2` becomes available
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ Java
|
|||
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #run-source }
|
||||
|
||||
This line will complement the source with a consumer function—in this example
|
||||
we simply print out the numbers to the console—and pass this little stream
|
||||
we print out the numbers to the console—and pass this little stream
|
||||
setup to an Actor that runs it. This activation is signaled by having “run” be
|
||||
part of the method name; there are other methods that run Akka Streams, and
|
||||
they all follow this pattern.
|
||||
|
|
@ -92,11 +92,11 @@ There are other ways to create a materializer, e.g. from an
|
|||
`ActorContext` when using streams from within Actors. The
|
||||
`Materializer` is a factory for stream execution engines; it is the
|
||||
thing that makes streams run—you don’t need to worry about any of the details
|
||||
just now apart from that you need one for calling any of the `run` methods on
|
||||
right now apart from that you need one for calling any of the `run` methods on
|
||||
a `Source`. @scala[The materializer is picked up implicitly if it is omitted
|
||||
from the `run` method call arguments, which we will do in the following.]
|
||||
|
||||
The nice thing about Akka Streams is that the `Source` is just a
|
||||
The nice thing about Akka Streams is that the `Source` is a
|
||||
description of what you want to run, and like an architect’s blueprint it can
|
||||
be reused, incorporated into a larger design. We may choose to transform the
|
||||
source of integers and write it to a file instead:
|
||||
|
|
@ -112,7 +112,7 @@ stream: starting with the number 1 (@scala[`BigInt(1)`]@java[`BigInteger.ONE`])
|
|||
the incoming numbers, one after the other; the scan operation emits the initial
|
||||
value and then every calculation result. This yields the series of factorial
|
||||
numbers which we stash away as a `Source` for later reuse—it is
|
||||
important to keep in mind that nothing is actually computed yet, this is just a
|
||||
important to keep in mind that nothing is actually computed yet, this is a
|
||||
description of what we want to have computed once we run the stream. Then we
|
||||
convert the resulting series of numbers into a stream of `ByteString`
|
||||
objects describing lines in a text file. This stream is then run by attaching a
|
||||
|
|
@ -200,7 +200,7 @@ combinator to signal to all its upstream sources of data that it can only
|
|||
accept elements at a certain rate—when the incoming rate is higher than one per
|
||||
second the throttle operator will assert *back-pressure* upstream.
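For instance, a source can be limited to one element per second like this (a standalone sketch, not tied to the example above):

```scala
import scala.concurrent.duration._
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source

// emits at most one element per second and back-pressures a faster upstream
val throttled = Source(1 to 100).throttle(1, 1.second, 1, ThrottleMode.shaping)
```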
|
||||
|
||||
This is basically all there is to Akka Streams in a nutshell—glossing over the
|
||||
This is all there is to Akka Streams in a nutshell—glossing over the
|
||||
fact that there are dozens of sources and sinks and many more stream
|
||||
transformation operators to choose from, see also @ref:[operator index](operators/index.md).
|
||||
|
||||
|
|
@ -281,7 +281,7 @@ Finally in order to @ref:[materialize](stream-flows-and-basics.md#stream-materia
|
|||
the Flow to a @scala[`Sink`]@java[`Sink<T, M>`] that will get the Flow running. The simplest way to do this is to call
|
||||
`runWith(sink)` on a @scala[`Source`]@java[`Source<Out, M>`]. For convenience a number of common Sinks are predefined and collected as @java[static] methods on
|
||||
the @scala[`Sink` companion object]@java[`Sink class`].
|
||||
For now let's simply print each author:
|
||||
For now let's print each author:
|
||||
|
||||
Scala
|
||||
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreachsink-println }
|
||||
|
|
@ -340,7 +340,7 @@ For example we'd like to write all author handles into one file, and all hashtag
|
|||
This means we have to split the source stream into two streams which will handle the writing to these different files.
|
||||
|
||||
Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
|
||||
One of these that we'll be using in this example is called `Broadcast`, and it simply emits elements from its
|
||||
One of these that we'll be using in this example is called `Broadcast`, and it emits elements from its
|
||||
input port to all of its output ports.
|
||||
|
||||
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (Graphs)
|
||||
|
|
@ -435,7 +435,7 @@ In our case this type is @scala[`Future[Int]`]@java[`CompletionStage<Integer>`]
|
|||
In case of the stream failing, this future would complete with a Failure.
|
||||
|
||||
A `RunnableGraph` may be reused
|
||||
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
|
||||
and materialized multiple times, because it is only the "blueprint" of the stream. This means that if we materialize a stream,
|
||||
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
|
||||
will be different, as illustrated by this example:
|
||||
|
||||
|
|
|
|||
|
|
@ -165,7 +165,7 @@ Java
|
|||
If our imaginary external job provider is a client using our API, we might
|
||||
want to enforce that the client cannot have more than 1000 queued jobs
|
||||
otherwise we consider it flooding and terminate the connection. This is
|
||||
easily achievable by the error strategy which simply fails the stream
|
||||
achievable by the error strategy which fails the stream
|
||||
once the buffer gets full.
|
||||
|
||||
Scala
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ Actors would usually be used to establish the stream, by means of some initial m
|
|||
"I want to offer you many log elements (the stream ref)", or alternatively in the opposite way "If you need
|
||||
to send me much data, here is the stream ref you can use to do so".
|
||||
|
||||
Since the two sides ("local" and "remote") of each reference may be confusing to simply refer to as
|
||||
Since the two sides ("local" and "remote") of each reference may be confusing to refer to as
|
||||
"remote" and "local" -- since either side can be seen as "local" or "remote" depending how we look at it --
|
||||
we propose to use the terminology "origin" and "target", which is defined by where the stream ref was created.
|
||||
For `SourceRef`s, the "origin" is the side which has the data that it is going to stream out. For `SinkRef`s
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ elements using:
|
|||
* sources and sinks specifically crafted for writing tests from the `akka-stream-testkit` module.
|
||||
|
||||
It is important to keep your data processing pipeline as separate sources,
|
||||
flows and sinks. This makes them easily testable by wiring them up to other
|
||||
flows and sinks. This makes them testable by wiring them up to other
|
||||
sources or sinks, or some test harnesses that `akka-testkit` or
|
||||
`akka-stream-testkit` provide.
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ To use Akka Testkit, add the module to your project:
|
|||
## Asynchronous Testing: `TestKit`
|
||||
|
||||
Testkit allows you to test your actors in a controlled but realistic
|
||||
environment. The definition of the environment depends of course very much on
|
||||
environment. The definition of the environment depends very much on
|
||||
the problem at hand and the level at which you intend to test, ranging from
|
||||
simple checks to full system tests.
|
||||
|
||||
|
|
@ -332,7 +332,7 @@ Java
|
|||
### Resolving Conflicts with Implicit ActorRef
|
||||
|
||||
If you want the sender of messages inside your TestKit-based tests to be the `testActor`
|
||||
simply mix in `ImplicitSender` into your test.
|
||||
mix in `ImplicitSender` into your test.
|
||||
|
||||
@@snip [PlainWordSpec.scala]($code$/scala/docs/testkit/PlainWordSpec.scala) { #implicit-sender }
|
||||
|
||||
|
|
@ -598,8 +598,8 @@ simplest example for this situation is an actor which sends a message to
|
|||
itself. In this case, processing cannot continue immediately as that would
|
||||
violate the actor model, so the invocation is queued and will be processed when
|
||||
the active invocation on that actor finishes its processing; thus, it will be
|
||||
processed on the calling thread, but simply after the actor finishes its
|
||||
previous work. In the other case, the invocation is simply processed
|
||||
processed on the calling thread, but after the actor finishes its
|
||||
previous work. In the other case, the invocation is processed
|
||||
immediately on the current thread. Futures scheduled via this dispatcher are
|
||||
also executed immediately.
|
||||
|
||||
|
|
@ -945,7 +945,7 @@ dispatcher to `CallingThreadDispatcher.global` and it sets the
|
|||
|
||||
If you want to test the actor behavior, including hotswapping, but without
|
||||
involving a dispatcher and without having the `TestActorRef` swallow
|
||||
any thrown exceptions, then there is another mode available for you: just use
|
||||
any thrown exceptions, then there is another mode available for you: use
|
||||
the `receive` method on `TestActorRef`, which will be forwarded to the
|
||||
underlying actor:
|
||||
|
||||
|
|
@ -957,7 +957,7 @@ Java
|
|||
|
||||
### Use Cases
|
||||
|
||||
You may of course mix and match both modi operandi of `TestActorRef` as
|
||||
You may mix and match both modi operandi of `TestActorRef` as
|
||||
suits your test needs:
|
||||
|
||||
* one common use case is setting up the actor into a specific internal state
|
||||
|
|
|
|||
|
|
@ -276,7 +276,7 @@ e.g. when interfacing with untyped actors.
|
|||
## Proxying
|
||||
|
||||
You can use the `typedActorOf` that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor.
|
||||
This is usable if you want to communicate remotely with TypedActors on other machines, just pass the `ActorRef` to `typedActorOf`.
|
||||
This is usable if you want to communicate remotely with TypedActors on other machines: pass the `ActorRef` to `typedActorOf`.
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -320,8 +320,8 @@ Scala
|
|||
Java
|
||||
: @@snip [TypedActorDocTest.java]($code$/java/jdocs/actor/TypedActorDocTest.java) { #typed-router-types }
|
||||
|
||||
In order to round robin among a few instances of such actors, you can simply create a plain untyped router,
|
||||
and then facade it with a `TypedActor` like shown in the example below. This works because typed actors of course
|
||||
In order to round robin among a few instances of such actors, you can create a plain untyped router,
|
||||
and then facade it with a `TypedActor` as shown in the example below. This works because typed actors
|
||||
communicate using the same mechanisms as normal actors, and methods calls on them get transformed into message sends of `MethodCall` messages.
|
||||
|
||||
Scala
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ Scala
|
|||
Java
|
||||
: @@snip [IntroSpec.scala]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java) { #imports }
|
||||
|
||||
With these in place we can define our first Actor, and of course it will say
|
||||
With these in place we can define our first Actor, and it will say
|
||||
hello!
|
||||
|
||||
Scala
|
||||
|
|
@ -172,7 +172,7 @@ The state is managed by changing behavior rather than using any variables.
|
|||
When a new `GetSession` command comes in we add that client to the
|
||||
list that is in the returned behavior. Then we also need to create the session’s
|
||||
`ActorRef` that will be used to post messages. In this case we want to
|
||||
create a very simple Actor that just repackages the `PostMessage`
|
||||
create a very simple Actor that repackages the `PostMessage`
|
||||
command into a `PublishSessionMessage` command which also includes the
|
||||
screen name.
|
||||
|
||||
|
|

@@ -190,7 +190,7 @@ has `private` visibility and can't be created outside the `ChatRoom` @scala[obje
If we did not care about securing the correspondence between a session and a
screen name then we could change the protocol such that `PostMessage` is
removed and all clients just get an @scala[`ActorRef[PublishSessionMessage]`]@java[`ActorRef<PublishSessionMessage>`] to
send to. In this case no session actor would be needed and we could just use
send to. In this case no session actor would be needed and we could use
@scala[`ctx.self`]@java[`ctx.getSelf()`]. The type-checks work out in that case because
@scala[`ActorRef[-T]`]@java[`ActorRef<T>`] is contravariant in its type parameter, meaning that we
can use a @scala[`ActorRef[RoomCommand]`]@java[`ActorRef<RoomCommand>`] wherever an

@@ -257,7 +257,7 @@ called `ctx.watch` for it. This allows us to shut down the Actor system: when
the main Actor terminates there is nothing more to do.

Therefore after creating the Actor system with the `main` Actor’s
`Behavior` we just await its termination.
`Behavior` the only thing we do is await its termination.
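
Sketched, with a stand-in for the example's `main` behavior:

```scala
import akka.NotUsed
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// stand-in for the example's `main` behavior: set things up, then stop the guardian
val main: Behavior[NotUsed] = Behaviors.setup { _ =>
  // ... spawn children and send the first messages here ...
  Behaviors.stopped
}

val system = ActorSystem(main, "hello")
// once the guardian has stopped there is nothing more to do, so only await termination
Await.result(system.whenTerminated, Duration.Inf)
```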

## Relation to Akka (untyped) Actors

@@ -103,6 +103,6 @@ Scala
Java
: @@snip [TypedWatchingUntypedTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/coexistence/TypedWatchingUntypedTest.java) { #typed }

There is one caveat regarding supervision of untyped child from typed parent. If the child throws an exception we would expect it to be restarted, but supervision in Akka Typed defaults to stopping the child in case it fails. The restarting facilities in Akka Typed will not work with untyped children. However, the workaround is simply to add another untyped actor that takes care of the supervision, i.e. restarts in case of failure if that is the desired behavior.
There is one caveat regarding supervision of untyped child from typed parent. If the child throws an exception we would expect it to be restarted, but supervision in Akka Typed defaults to stopping the child in case it fails. The restarting facilities in Akka Typed will not work with untyped children. However, the workaround is to add another untyped actor that takes care of the supervision, i.e. restarts in case of failure if that is the desired behavior.
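
One possible shape for that intermediary, as an illustrative sketch rather than the snippet referenced above:

```scala
import akka.actor.{ Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy }

// an untyped actor that owns the fragile untyped child: it restarts the child
// on failure and forwards everything else to it
class RestartingSupervisor(childProps: Props) extends Actor {
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: Exception => SupervisorStrategy.Restart
  }

  private val child: ActorRef = context.actorOf(childProps, "child")

  def receive: Receive = {
    case msg => child.forward(msg)
  }
}
```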

@@ -96,7 +96,7 @@ Scala
Java
: @@snip [PersistentActorCompileOnyTest.java]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #state }

The command handler just persists the `Cmd` payload in an `Evt`@java[. In this simple example the command handler is defined using a lambda, for the more complicated example below a `CommandHandlerBuilder` is used]:
The command handler persists the `Cmd` payload in an `Evt`@java[. In this simple example the command handler is defined using a lambda, for the more complicated example below a `CommandHandlerBuilder` is used]:

Scala
: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #command-handler }
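
Glossing over API differences between Akka versions, the handler boils down to something like the following sketch (the `data` field and the concrete types are assumptions, not taken from the snippet):

```scala
import akka.persistence.typed.scaladsl.Effect

final case class Cmd(data: String)
final case class Evt(data: String)
final case class State(events: List[String] = Nil)

// persist the command's payload as an event; the state change happens in the event handler
val commandHandler: (State, Cmd) => Effect[Evt, State] =
  (_, cmd) => Effect.persist(Evt(cmd.data))
```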

@@ -185,7 +185,7 @@ Java

The event handler is always the same independent of state. The main reason for not making the event handler
part of the `CommandHandler` is that all events must be handled and that is typically independent of what the
current state is. The event handler can of course still decide what to do based on the state if that is needed.
current state is. The event handler can still decide what to do based on the state if that is needed.

Scala
: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #event-handler }
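
And the matching event handler, as a pure function from state and event to the next state (same illustrative types as in the sketch above):

```scala
// apply the event to the state; no side effects here, so replaying events on recovery is safe
val eventHandler: (State, Evt) => State =
  (state, evt) => state.copy(events = evt.data :: state.events)
```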

@@ -594,7 +594,7 @@ public class GraphStageDocTest extends AbstractJavaTest {
promise.complete(elem);
push(out, elem);

// replace handler with one just forwarding
// replace handler with one that only forwards elements
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() {

@@ -271,7 +271,7 @@ public class TestKitDocTest extends AbstractJavaTest {
public void demonstrateProbe() {
//#test-probe
new TestKit(system) {{
// simple actor which just forwards messages
// simple actor which only forwards messages
class Forwarder extends AbstractActor {
final ActorRef target;
@SuppressWarnings("unused")

@@ -214,12 +214,12 @@ package docs.serialization {
// (beneath toBinary)
val identifier: String = Serialization.serializedActorPath(theActorRef)

// Then just serialize the identifier however you like
// Then serialize the identifier however you like

// Deserialize
// (beneath fromBinary)
val deserializedActorRef = extendedSystem.provider.resolveActorRef(identifier)
// Then just use the ActorRef
// Then use the ActorRef
//#actorref-serializer

//#external-address

@@ -427,7 +427,7 @@ class GraphStageDocSpec extends AkkaSpec {
promise.success(elem)
push(out, elem)

// replace handler with one just forwarding
// replace handler with one that only forwards elements
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, grab(in))

@@ -72,7 +72,7 @@ class RecipeCollectingMetrics extends RecipeSpec {
// the counter stream and store the final value, and also repeat this final value if no update is received between
// metrics collection rounds.
//
// To finish the recipe, we simply use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad``
// To finish the recipe, we use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad``
// stream whenever a new ``Tick`` arrives on the stream of ticks, ``reportTicks``.
//
// .. includecode:: ../code/docs/stream/cookbook/RecipeCollectingMetrics.scala#periodic-metrics-collection
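
The trigger described in those comments can be sketched as follows (stream names follow the comments; the element types and the surrounding conflate/expand handling are assumptions):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

final case class Tick()

// each tick pulls the next element from currentLoad, which the recipe keeps
// current between collection rounds via conflate/expand
def triggeredReads(reportTicks: Source[Tick, NotUsed],
                   currentLoad: Source[Long, NotUsed]): Source[Long, NotUsed] =
  reportTicks.zipWith(currentLoad)((_, load) => load)
```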