Merge branch 'michaelom-patch-2'

Konrad Malawski 2015-10-13 11:54:21 +02:00
commit bd8b5cba6a
33 changed files with 106 additions and 47 deletions

View file

@@ -14,7 +14,8 @@ import java.util.concurrent.atomic.AtomicReference;
 */
@SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since
    // Unsafe does not expose XCHG operation intrinsically before JDK 8
    @SuppressWarnings("unused")
    private volatile Node<T> _tailDoNotCallMeDirectly;
@@ -34,7 +35,7 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
        final Node<T> tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
        Node<T> next = tail.next();
        if (next == null && get() != tail) {
            // if tail != head this is not going to change until consumer makes progress
            // if tail != head this is not going to change until producer makes progress
            // we can avoid reading the head and just spin on next until it shows up
            do {
                next = tail.next();
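The comment at the top of this file is the key to the change above: appends go through a single atomic exchange on the "head" slot. A minimal Scala sketch of that idea (not the Akka code; the ``NodeQueue`` and ``Node`` names are illustrative)::

  import java.util.concurrent.atomic.AtomicReference

  final class Node[T] {
    @volatile var value: T = _
    @volatile var next: Node[T] = _
  }

  // The queue itself is the AtomicReference holding the "head" (append) slot;
  // getAndSet is the atomic exchange (XCHG) the comment says Unsafe lacks.
  final class NodeQueue[T] extends AtomicReference[Node[T]](new Node[T]) {
    def add(v: T): Unit = {
      val n = new Node[T]
      n.value = v
      getAndSet(n).next = n // one atomic swap, then link; no CAS retry loop
    }
  }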

View file

@@ -82,10 +82,6 @@ akka {
# Timeout for ActorSystem.actorOf
creation-timeout = 20s
# Frequency with which stopping actors are prodded in case they had to be
# removed from their parents
reaper-interval = 5s
# Serializes and deserializes (non-primitive) messages to ensure immutability,
# this is only intended for testing.
serialize-messages = off

View file

@@ -136,10 +136,12 @@ private[akka] class Mailboxes(
      lazy val mqType: Class[_] = getProducedMessageQueueType(mailboxType)
      if (hasMailboxRequirement && !mailboxRequirement.isAssignableFrom(mqType))
        throw new IllegalArgumentException(
          s"produced message queue type [$mqType] does not fulfill requirement for dispatcher [${id}]")
          s"produced message queue type [$mqType] does not fulfill requirement for dispatcher [$id]. " +
            s"Must be a subclass of [$mailboxRequirement].")
      if (hasRequiredType(actorClass) && !actorRequirement.isAssignableFrom(mqType))
        throw new IllegalArgumentException(
          s"produced message queue type [$mqType] does not fulfill requirement for actor class [$actorClass]")
          s"produced message queue type [$mqType] does not fulfill requirement for actor class [$actorClass]. " +
            s"Must be a subclass of [$actorRequirement].")
      mailboxType
    }
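For context, the situation the sharpened message describes can be provoked with a small, hypothetical configuration; ``my-dispatcher`` and ``my-mailbox`` are made-up names, while ``mailbox-requirement`` and ``mailbox-type`` are Akka's config keys::

  import com.typesafe.config.ConfigFactory

  val config = ConfigFactory.parseString("""
    my-dispatcher {
      type = Dispatcher
      mailbox-requirement = "akka.dispatch.DequeBasedMessageQueueSemantics"
    }
    my-mailbox {
      mailbox-type = "akka.dispatch.UnboundedMailbox"
    }
    """)
  // An actor deployed with dispatcher = my-dispatcher and mailbox = my-mailbox
  // produces an UnboundedMessageQueue, which is not deque-based, so mailbox
  // verification fails -- now with the required type named in the message.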

View file

@@ -9,6 +9,6 @@ OSGi.clusterSharding
Dependencies.clusterSharding
//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-cluster-sharding").value
MimaKeys.previousArtifact := akkaPreviousArtifact("akka-cluster-sharding").value
enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)

View file

@@ -9,6 +9,6 @@ OSGi.clusterTools
Dependencies.clusterTools
//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-cluster-tools").value
MimaKeys.previousArtifact := akkaPreviousArtifact("akka-cluster-tools").value
enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)

View file

@@ -11,7 +11,7 @@ OSGi.distributedData
Dependencies.distributedData
//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-distributed-data").value
MimaKeys.previousArtifact := akkaPreviousArtifact("akka-distributed-data-experimental").value
enablePlugins(MultiNodeScalaTest)

View file

@@ -109,6 +109,11 @@ class IncludeCode(Directive):
                return count
            nonempty = filter(lambda l: l.strip(), lines)
            if not nonempty:
                return [document.reporter.error(
                    "Snippet ({}#{}) not found!".format(filename, section),
                    line=self.lineno
                )]
            tabcounts = map(lambda l: countwhile(lambda c: c == ' ', l), nonempty)
            tabshift = min(tabcounts) if tabcounts else 0

View file

@@ -196,8 +196,8 @@ Logging of Configuration
------------------------
If the system or config property ``akka.log-config-on-start`` is set to ``on``, then the
complete configuration at INFO level when the actor system is started. This is useful
when you are uncertain of what configuration is used.
complete configuration is logged at INFO level when the actor system is started. This is
useful when you are uncertain of what configuration is used.
If in doubt, you can also easily and nicely inspect configuration objects
before or after using them to construct an actor system:
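A minimal sketch of such an inspection (not part of this diff), combined with the ``akka.log-config-on-start`` switch described above::

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  val config = ConfigFactory
    .parseString("akka.log-config-on-start = on")
    .withFallback(ConfigFactory.load())

  println(config.getConfig("akka").root.render()) // inspect before use
  val system = ActorSystem("inspected", config)   // logs full config at INFO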

View file

@@ -36,7 +36,7 @@ The above actor uses event sourcing and the support provided in ``UntypedPersist
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable.
Note how the ``persistenceId`` is defined. The name of the actor is the entity entity identifier (utf-8 URL-encoded).
Note how the ``persistenceId`` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node
@@ -259,7 +259,7 @@ Note that stopped entities will be started again when a new message is targeted
Graceful Shutdown
-----------------
You can send the message ``ClusterSharding.GracefulShutdown`` message (``ClusterSharding.gracefulShutdownInstance
You can send the ``ClusterSharding.GracefulShutdown`` message (``ClusterSharding.gracefulShutdownInstance``
in Java) to the ``ShardRegion`` actor to hand off all shards that are hosted by that ``ShardRegion`` and then the
``ShardRegion`` actor will be stopped. You can ``watch`` the ``ShardRegion`` actor to know when it is completed.
During this period other regions will buffer messages for those shards in the same way as when a rebalance is

View file

@@ -61,7 +61,7 @@ of programmatically provided parameter.
Types of dispatchers
--------------------
There are 4 different types of message dispatchers:
There are 3 different types of message dispatchers:
* Dispatcher

View file

@@ -231,7 +231,12 @@ Loggers
=======
Logging is performed asynchronously through an event bus. Log events are processed by an event handler actor
and it will receive the log events in the same order as they were emitted.
.. note::
  The event handler actor does not have a bounded inbox and is run on the default dispatcher. This means
  that logging extreme amounts of data may affect your application badly. This can be somewhat mitigated
  by using an async logging backend. (See :ref:`slf4j-directly-java`)
You can configure which event handlers are created at system start-up and listen to logging events. That is done using the
``loggers`` element in the :ref:`configuration`.
@@ -323,6 +328,18 @@ the first case and ``LoggerFactory.getLogger(String s)`` in the second).
final LoggingAdapter log = Logging.getLogger(system.eventStream(), "my.string");
.. _slf4j-directly-java:
Using the SLF4J API directly
----------------------------
If you use the SLF4J API directly in your application, remember that the logging operations will block
while the underlying infrastructure writes the log statements.
This can be avoided by configuring the logging implementation to use
a non-blocking appender. Logback provides `AsyncAppender <http://logback.qos.ch/manual/appenders.html#AsyncAppender>`_
that does this. It also contains a feature which will drop ``INFO`` and ``DEBUG`` messages if the logging
load is high.
Logging Thread, Akka Source and Actor System in MDC
---------------------------------------------------

View file

@@ -799,12 +799,12 @@ Plugin TCK
----------
In order to help developers build correct and high-quality storage plugins, we provide a Technology Compatibility Kit (`TCK <http://en.wikipedia.org/wiki/Technology_Compatibility_Kit>`_ for short).
The TCK is usable from Java as well as Scala projects; for Java you need to include the akka-persistence-tck-experimental dependency::
The TCK is usable from Java as well as Scala projects; for Java you need to include the akka-persistence-tck dependency::
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence-tck-experimental_${scala.version}</artifactId>
    <version>2.3.5</version>
    <artifactId>akka-persistence-tck_${scala.version}</artifactId>
    <version>@version@</version>
    <scope>test</scope>
  </dependency>

View file

@@ -413,6 +413,12 @@ implement it yourself either as a helper trait or simply by overriding ``persist
override def persistenceId = self.path.toStringWithoutAddress
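A sketch of the helper-trait variant mentioned above; the ``PathBasedPersistenceId`` name is hypothetical::

  import akka.persistence.PersistentActor

  trait PathBasedPersistenceId { this: PersistentActor =>
    // same scheme as the 2.3.x default: the actor path without its address
    override def persistenceId: String = self.path.toStringWithoutAddress
  }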
Failures
--------
Backend journal failures during recovery and persist are treated differently than in 2.3.x. The ``PersistenceFailure``
message is removed and the actor is unconditionally stopped. The new behavior and the reasons for it are explained in
:ref:`failures-scala`.
Persist sequence of events
--------------------------

View file

@@ -36,7 +36,7 @@ The above actor uses event sourcing and the support provided in ``PersistentActo
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable.
Note how the ``persistenceId`` is defined. The name of the actor is the entity entity identifier (utf-8 URL-encoded).
Note how the ``persistenceId`` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node
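A hedged sketch of the ``persistenceId`` definition the note describes; the ``Counter`` name and prefix are assumptions, and the actor name itself is assigned by the sharding extension::

  import java.net.URLDecoder
  import akka.persistence.PersistentActor

  class Counter extends PersistentActor {
    // self.path.name is the entity identifier, utf-8 URL-encoded by sharding
    override def persistenceId: String =
      "Counter-" + URLDecoder.decode(self.path.name, "utf-8")
    def receiveRecover: Receive = { case _ => }
    def receiveCommand: Receive = { case _ => }
  }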
@@ -262,8 +262,7 @@ Note that stopped entities will be started again when a new message is targeted
Graceful Shutdown
-----------------
You can send the message ``ClusterSharding.GracefulShutdown`` message (``ClusterSharding.gracefulShutdownInstance
in Java) to the ``ShardRegion`` actor to handoff all shards that are hosted by that ``ShardRegion`` and then the
You can send the ``ClusterSharding.GracefulShutdown`` message to the ``ShardRegion`` actor to hand off all shards that are hosted by that ``ShardRegion`` and then the
``ShardRegion`` actor will be stopped. You can ``watch`` the ``ShardRegion`` actor to know when it is completed.
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.
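A minimal sketch of that handoff sequence, using the message name as given in the text above; the ``"Counter"`` entity type is an assumption::

  import akka.actor.{ Actor, ActorRef, Terminated }
  import akka.cluster.sharding.ClusterSharding

  class RegionStopper extends Actor {
    val region: ActorRef = ClusterSharding(context.system).shardRegion("Counter")
    context.watch(region)                     // completion signal is Terminated
    region ! ClusterSharding.GracefulShutdown // hand off all hosted shards

    def receive = {
      case Terminated(`region`) =>
        context.system.terminate()            // region stopped after the handoff
    }
  }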

View file

@@ -61,7 +61,7 @@ of programmatically provided parameter.
Types of dispatchers
--------------------
There are 4 different types of message dispatchers:
There are 3 different types of message dispatchers:
* Dispatcher

View file

@@ -274,6 +274,11 @@ Loggers
Logging is performed asynchronously through an event bus. Log events are processed by an event handler actor
and it will receive the log events in the same order as they were emitted.
.. note::
  The event handler actor does not have a bounded inbox and is run on the default dispatcher. This means
  that logging extreme amounts of data may affect your application badly. This can be somewhat mitigated
  by using an async logging backend. (See :ref:`slf4j-directly-scala`)
You can configure which event handlers are created at system start-up and listen to logging events. That is done using the
``loggers`` element in the :ref:`configuration`.
Here you can also define the log level. More fine grained filtering based on the log source
@@ -359,6 +364,18 @@ the first case and ``LoggerFactory.getLogger(s: String)`` in the second).
val log = Logging(system.eventStream, "my.nice.string")
.. _slf4j-directly-scala:
Using the SLF4J API directly
----------------------------
If you use the SLF4J API directly in your application, remember that the logging operations will block
while the underlying infrastructure writes the log statements.
This can be avoided by configuring the logging implementation to use
a non-blocking appender. Logback provides `AsyncAppender <http://logback.qos.ch/manual/appenders.html#AsyncAppender>`_
that does this. It also contains a feature which will drop ``INFO`` and ``DEBUG`` messages if the logging
load is high.
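A minimal sketch of what using the SLF4J API directly looks like; the ``PaymentService`` class is hypothetical. With a blocking appender the log call stalls the calling thread, which is exactly what the ``AsyncAppender`` advice above avoids::

  import org.slf4j.{ Logger, LoggerFactory }

  class PaymentService {
    private val log: Logger = LoggerFactory.getLogger(classOf[PaymentService])

    def charge(orderId: String): Unit = {
      // blocks until the appender accepts the event unless it is asynchronous
      log.info("charging order {}", orderId)
    }
  }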
Logging Thread, Akka Source and Actor System in MDC
---------------------------------------------------

View file

@@ -858,9 +858,9 @@ Plugin TCK
----------
In order to help developers build correct and high-quality storage plugins, we provide a Technology Compatibility Kit (`TCK <http://en.wikipedia.org/wiki/Technology_Compatibility_Kit>`_ for short).
The TCK is usable from Java as well as Scala projects; for Scala you need to include the akka-persistence-tck-experimental dependency::
The TCK is usable from Java as well as Scala projects; for Scala you need to include the akka-persistence-tck dependency::
"com.typesafe.akka" %% "akka-persistence-tck-experimental" % "@version@" % "test"
"com.typesafe.akka" %% "akka-persistence-tck" % "@version@" % "test"
To include the Journal TCK tests in your test suite simply extend the provided ``JournalSpec``:
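A hedged sketch of such a suite; the plugin id ``my-journal`` and the spec name are assumptions::

  import akka.persistence.journal.JournalSpec
  import com.typesafe.config.ConfigFactory

  class MyJournalSpec extends JournalSpec(
    config = ConfigFactory.parseString(
      "akka.persistence.journal.plugin = \"my-journal\"")) {

    // opt out of capabilities the plugin does not support, as the new
    // LeveldbJournalNoAtomicPersistMultipleEventsSpec below does
    override def supportsAtomicPersistAllOfSeveralEvents = false
  }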

View file

@@ -748,7 +748,6 @@ options:
.. includecode:: code/docs/testkit/TestkitDocSpec.scala#logging-receive
.
If the aforementioned setting is not given in the :ref:`configuration`, this method will
pass through the given :class:`Receive` function unmodified, meaning that
there is no runtime cost unless actually enabled.

View file

@@ -11,7 +11,7 @@ OSGi.persistenceQuery
Dependencies.persistenceQuery
//MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value
MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-query-experimental").value
enablePlugins(ScaladocNoVerificationOfDiagrams)

View file

@@ -71,7 +71,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
        }
      } else {
        (fromSnr to toSnr).map { i ⇒
          AtomicWrite(PersistentRepr(persistentRepr(i)))
          AtomicWrite(persistentRepr(i))
        }
      }

View file

@@ -0,0 +1,18 @@
package akka.persistence.journal.leveldb

import akka.persistence.journal.JournalSpec
import akka.persistence.{ PersistenceSpec, PluginCleanup }

class LeveldbJournalNoAtomicPersistMultipleEventsSpec extends JournalSpec(
  config = PersistenceSpec.config(
    "leveldb",
    "LeveldbJournalNoAtomicPersistMultipleEventsSpec",
    extraConfig = Some("akka.persistence.journal.leveldb.native = off")))
  with PluginCleanup {

  /**
   * Setting to false to test the single message atomic write behaviour of JournalSpec
   */
  override def supportsAtomicPersistAllOfSeveralEvents = false
}

View file

@@ -9,6 +9,6 @@ OSGi.persistence
Dependencies.persistence
MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence-experimental").value
MimaKeys.previousArtifact := akkaPreviousArtifact("akka-persistence").value
fork in Test := true

View file

@@ -407,8 +407,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c
}
class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(
// TODO disable debug logging once happy with stability of this test
ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))

View file

@@ -163,8 +163,12 @@ class SteppingInMemPersistentActorStashingSpec extends PersistenceSpec(
      SteppingInmemJournal.step(journal)
      SteppingInmemJournal.step(journal)
      persistentActor ! GetState
      expectMsg(List("a", "c", "b"))
      within(3.seconds) {
        awaitAssert {
          persistentActor ! GetState
          expectMsg(List("a", "c", "b"))
        }
      }
    }
  }

View file

@@ -30,7 +30,6 @@ object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig {
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s
akka.remote.gate-invalid-addresses-for = 0.5 s
""")))
testTransport(on = true)

View file

@@ -26,8 +26,6 @@ object RemoteQuarantinePiercingSpec extends MultiNodeConfig {
ConfigFactory.parseString("""
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = INFO
akka.remote.quarantine-systems-for = 1 d
akka.remote.gate-invalid-addresses-for = 0.5 s
""")))
class Subject extends Actor {

View file

@@ -31,7 +31,6 @@ object Ticket15109Spec extends MultiNodeConfig {
## Keep it tight, otherwise reestablishing a connection takes too much time
akka.remote.transport-failure-detector.heartbeat-interval = 1 s
akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s
akka.remote.quarantine-systems-for = 1 d
akka.remote.retry-gate-closed-for = 0.5 s
""")))

View file

@@ -82,8 +82,6 @@ object RemotingSpec {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote {
transport = "akka.remote.Remoting"
retry-gate-closed-for = 1 s
log-remote-lifecycle-events = on
@@ -637,7 +635,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
val config = ConfigFactory.parseString(s"""
akka.remote.enabled-transports = ["akka.remote.test"]
akka.remote.retry-gate-closed-for = 5s
akka.remote.log-lifecylce-events = on
akka.remote.log-remote-lifecycle-events = on
#akka.loglevel = DEBUG
akka.remote.test {
@@ -717,7 +715,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
val config = ConfigFactory.parseString(s"""
akka.remote.enabled-transports = ["akka.remote.test"]
akka.remote.retry-gate-closed-for = 5s
akka.remote.log-lifecylce-events = on
akka.remote.log-remote-lifecycle-events = on
akka.remote.test {
registry-key = JMeMndLLsw

View file

@@ -42,7 +42,6 @@ object Configuration {
hostname = localhost
port = %d
security {
enable = on
trust-store = "%s"
key-store = "%s"
key-store-password = "changeme"

View file

@@ -38,7 +38,6 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
transport-failure-detector {
implementation-class = "akka.remote.PhiAccrualFailureDetector"
threshold = 7.0
max-sample-size = 100
min-std-deviation = 100 ms
acceptable-heartbeat-pause = 3 s

View file

@@ -20,7 +20,6 @@ object AkkaProtocolStressTest {
remote.log-remote-lifecycle-events = on
remote.transport-failure-detector {
threshold = 1.0
max-sample-size = 2
min-std-deviation = 1 ms
## We want lots of lost connections in this test, keep it sensitive

View file

@@ -406,7 +406,7 @@ object AkkaBuild extends Build {
def akkaPreviousArtifact(id: String): Def.Initialize[Option[sbt.ModuleID]] = Def.setting {
if (enableMiMa) {
val version: String = "2.3.11" // FIXME verify all 2.3.x versions
val version: String = "2.4.0" // FIXME verify all 2.3.x versions
val fullId = crossVersion.value match {
case _ : CrossVersion.Binary => id + "_" + scalaBinaryVersion.value
case _ : CrossVersion.Full => id + "_" + scalaVersion.value

View file

@@ -33,7 +33,11 @@ object MiMa extends AutoPlugin {
}
val mimaIgnoredProblems = {
import com.typesafe.tools.mima.core._
Seq()
// FIXME somehow we must use different filters when akkaPreviousArtifact is 2.3.x
/* Below are the filters we used when comparing to 2.3.x
Seq(
FilterAnyProblem("akka.remote.testconductor.Terminate"),
FilterAnyProblem("akka.remote.testconductor.TerminateMsg"),
@@ -572,5 +576,6 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ClusterCoreDaemon.akka$cluster$ClusterCoreDaemon$$isJoiningToUp$1")
)
*/
}
}