Merge pull request #21587 from akka/wip-merge-to-master-patriknw

Merge Artery into master
This commit is contained in:
Patrik Nordwall 2016-09-29 15:45:27 +02:00 committed by GitHub
commit 577f43233a
315 changed files with 38582 additions and 832 deletions

2
.gitignore vendored
View file

@ -1,4 +1,6 @@
*#
*.log
*.orig
*.jfr
*.iml
*.ipr

View file

@ -5,7 +5,6 @@
package akka.serialization
import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor._
import akka.dispatch.sysmsg._
@ -17,6 +16,8 @@ import scala.beans.BeanInfo
import com.typesafe.config._
import akka.pattern.ask
import org.apache.commons.codec.binary.Hex.encodeHex
import java.nio.ByteOrder
import java.nio.ByteBuffer
import akka.actor.NoSerializationVerificationNeeded
import test.akka.serialization.NoVerification
@ -249,7 +250,25 @@ class SerializeSpec extends AkkaSpec(SerializationTests.serializeConf) {
intercept[IllegalArgumentException] {
byteSerializer.toBinary("pigdog")
}.getMessage should ===("ByteArraySerializer only serializes byte arrays, not [pigdog]")
}.getMessage should ===(s"${classOf[ByteArraySerializer].getName} only serializes byte arrays, not [java.lang.String]")
}
"support ByteBuffer serialization for byte arrays" in {
val byteSerializer = ser.serializerFor(classOf[Array[Byte]]).asInstanceOf[ByteBufferSerializer]
val byteBuffer = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN)
val str = "abcdef"
val payload = str.getBytes("UTF-8")
byteSerializer.toBinary(payload, byteBuffer)
byteBuffer.position() should ===(payload.length)
byteBuffer.flip()
val deserialized = byteSerializer.fromBinary(byteBuffer, "").asInstanceOf[Array[Byte]]
byteBuffer.remaining() should ===(0)
new String(deserialized, "UTF-8") should ===(str)
intercept[IllegalArgumentException] {
byteSerializer.toBinary("pigdog", byteBuffer)
}.getMessage should ===(s"${classOf[ByteArraySerializer].getName} only serializes byte arrays, not [java.lang.String]")
}
}
}

View file

@ -83,9 +83,11 @@ akka {
actor {
# Either one of "local", "remote" or "cluster" or the
# FQCN of the ActorRefProvider to be used; the below is the built-in default,
# another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle.
provider = "akka.actor.LocalActorRefProvider"
# note that "remote" and "cluster" requires the akka-remote and akka-cluster
# artifacts to be on the classpath.
provider = "local"
# The guardian "/user" will use this class to obtain its supervisorStrategy.
# It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator.
@ -583,6 +585,19 @@ akka {
"[B" = bytes
"java.io.Serializable" = java
}
# Set this to on to enable serialization-bindings defined in
# additional-serialization-bindings. Those are by default not included
# for backwards compatibility reasons. They are enabled by default if
# akka.remote.artery.enabled=on.
enable-additional-serialization-bindings = off
# Additional serialization-bindings that replace Java serialization are
# defined in this section and are not included by default for backwards
# compatibility reasons. They can be enabled with
# enable-additional-serialization-bindings=on.
# They are enabled by default if akka.remote.artery.enabled=on.
additional-serialization-bindings {
}
# Log warnings when the default Java serialization is used to serialize messages.
# The default serializer uses Java serialization which is not very performant and should not
@ -604,7 +619,7 @@ akka {
# Identifier values from 0 to 16 are reserved for Akka internal usage.
serialization-identifiers {
"akka.serialization.JavaSerializer" = 1
"akka.serialization.ByteArraySerializer" = 4
"akka.serialization.ByteArraySerializer" = 4
}
# Configuration items which are used by the akka.actor.ActorDSL._ methods
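A minimal usage sketch, not part of this diff, showing how an application could opt into the new bindings without enabling Artery; the system name is illustrative:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object AdditionalBindingsExample extends App {
  // Opt in explicitly; with akka.remote.artery.enabled = on the
  // additional bindings are picked up without this flag.
  val config = ConfigFactory.parseString(
    "akka.actor.enable-additional-serialization-bindings = on")
    .withFallback(ConfigFactory.load())

  val system = ActorSystem("example", config)
  system.terminate()
}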

View file

@ -166,7 +166,15 @@ object ActorSystem {
import config._
final val ConfigVersion: String = getString("akka.version")
final val ProviderClass: String = getString("akka.actor.provider")
final val ProviderClass: String =
getString("akka.actor.provider") match {
case "local" classOf[LocalActorRefProvider].getName
// these two cannot be referenced by class as they may not be on the classpath
case "remote" "akka.remote.RemoteActorRefProvider"
case "cluster" "akka.cluster.ClusterActorRefProvider"
case fqcn fqcn
}
final val SupervisorStrategyClass: String = getString("akka.actor.guardian-supervisor-strategy")
final val CreationTimeout: Timeout = Timeout(config.getMillisDuration("akka.actor.creation-timeout"))
final val UnstartedPushTimeout: Timeout = Timeout(config.getMillisDuration("akka.actor.unstarted-push-timeout"))
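With this change the provider can be given as a short alias instead of an FQCN. A minimal sketch, not part of this diff, assuming the default reference.conf is on the classpath (the system name is illustrative):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// "local" resolves to LocalActorRefProvider; "remote" and "cluster" are
// resolved by FQCN string, so akka-remote / akka-cluster only need to be
// on the classpath when actually selected.
val system = ActorSystem("shorthand", ConfigFactory.parseString(
  "akka.actor.provider = local"))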
@ -831,6 +839,7 @@ private[akka] class ActorSystemImpl(
/**
* Adds a Runnable that will be executed on ActorSystem termination.
* Note that callbacks are executed in reverse order of insertion.
*
* @param r The callback to be executed on ActorSystem termination
* Throws RejectedExecutionException if called after ActorSystem has been terminated.
*/

View file

@ -76,6 +76,18 @@ object Address {
* Constructs a new Address with the specified protocol, system name, host and port
*/
def apply(protocol: String, system: String, host: String, port: Int) = new Address(protocol, system, Some(host), Some(port))
/**
* `Address` ordering type class, sorts addresses by protocol, name, host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
if (a eq b) false
else if (a.protocol != b.protocol) a.protocol.compareTo(b.protocol) < 0
else if (a.system != b.system) a.system.compareTo(b.system) < 0
else if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
}
}
private[akka] trait PathUtils {
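A small usage sketch of the new implicit ordering, not part of this diff; the addresses are illustrative:

import akka.actor.Address

val addresses = List(
  Address("akka", "sys", "hostB", 2552),
  Address("akka", "sys", "hostA", 2552))

// Picks up Address.addressOrdering: protocol, then system, then host, then port.
addresses.sorted.head // Address("akka", "sys", Some("hostA"), Some(2552))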

View file

@ -396,6 +396,17 @@ object Logging {
n.substring(i + 1)
}
/**
* Class name representation of a message.
* `ActorSelectionMessage` representation includes class name of
* wrapped message.
*/
def messageClassName(message: Any): String = message match {
case null ⇒ "null"
case ActorSelectionMessage(m, _, _) ⇒ s"ActorSelectionMessage(${m.getClass.getName})"
case m ⇒ m.getClass.getName
}
/**
* INTERNAL API
*/
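A sketch of what the new helper returns, not part of this diff, assuming it is called from code that can see akka.event.Logging:

import akka.event.Logging

Logging.messageClassName("hello") // "java.lang.String"
Logging.messageClassName(null)    // "null"
// For an ActorSelectionMessage wrapping m, the result is
// s"ActorSelectionMessage(${m.getClass.getName})".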

View file

@ -71,6 +71,29 @@ private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries:
tryCleanDirectByteBuffer(buf)
}
private final def tryCleanDirectByteBuffer(toBeDestroyed: ByteBuffer): Unit = DirectByteBufferPool.tryCleanDirectByteBuffer(toBeDestroyed)
}
/** INTERNAL API */
private[akka] object DirectByteBufferPool {
private val CleanDirectBuffer: ByteBuffer ⇒ Unit =
try {
val cleanerMethod = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner")
cleanerMethod.setAccessible(true)
val cleanMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
cleanMethod.setAccessible(true)
{ (bb: ByteBuffer) ⇒
try
if (bb.isDirect) {
val cleaner = cleanerMethod.invoke(bb)
cleanMethod.invoke(cleaner)
}
catch { case NonFatal(e) ⇒ /* ok, best effort attempt to cleanup failed */ }
}
} catch { case NonFatal(e) ⇒ _ ⇒ () /* reflection failed, use no-op fallback */ }
/**
* DirectByteBuffers are garbage collected by using a phantom reference and a
* reference queue. Every once in a while, the JVM checks the reference queue and
@ -81,16 +104,5 @@ private[akka] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries:
*
* Utilizes reflection to avoid dependency to `sun.misc.Cleaner`.
*/
private final def tryCleanDirectByteBuffer(toBeDestroyed: ByteBuffer): Unit = try {
if (toBeDestroyed.isDirect) {
val cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner")
cleanerMethod.setAccessible(true)
val cleaner = cleanerMethod.invoke(toBeDestroyed)
val cleanMethod = cleaner.getClass().getMethod("clean")
cleanMethod.setAccessible(true)
cleanMethod.invoke(cleaner)
}
} catch {
case NonFatal(_) ⇒ // attempt failed, ok
}
}
def tryCleanDirectByteBuffer(byteBuffer: ByteBuffer): Unit = CleanDirectBuffer(byteBuffer)
}
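The same resolve-reflection-once pattern as a standalone sketch, not part of this diff; it assumes a pre-Java-9 JVM, where sun.misc.Cleaner still exists:

import java.nio.ByteBuffer
import scala.util.control.NonFatal

val clean: ByteBuffer ⇒ Unit =
  try {
    val cleanerMethod = Class.forName("java.nio.DirectByteBuffer").getMethod("cleaner")
    cleanerMethod.setAccessible(true)
    val cleanMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
    cleanMethod.setAccessible(true)
    // reflection is resolved once here; the returned function only invokes
    (bb: ByteBuffer) ⇒
      try if (bb.isDirect) cleanMethod.invoke(cleanerMethod.invoke(bb))
      catch { case NonFatal(_) ⇒ /* best effort */ }
  } catch { case NonFatal(_) ⇒ (_: ByteBuffer) ⇒ () }

clean(ByteBuffer.allocateDirect(4096))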

View file

@ -68,11 +68,11 @@ object MurmurHash {
/**
* Incorporates a new value into an existing hash.
*
* @param hash the prior hash value
* @param value the new value to incorporate
* @param magicA a magic integer from the stream
* @param magicB a magic integer from a different stream
* @return the updated hash value
* @param hash the prior hash value
* @param value the new value to incorporate
* @param magicA a magic integer from the stream
* @param magicB a magic integer from a different stream
* @return the updated hash value
*/
def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int =
(hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer

View file

@ -14,6 +14,10 @@ import scala.util.{ Try, DynamicVariable, Failure }
import scala.collection.immutable
import scala.util.control.NonFatal
import scala.util.Success
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import java.util.NoSuchElementException
object Serialization {
@ -30,12 +34,20 @@ object Serialization {
private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
class Settings(val config: Config) {
val Serializers: Map[String, String] = configToMap("akka.actor.serializers")
val SerializationBindings: Map[String, String] = configToMap("akka.actor.serialization-bindings")
val Serializers: Map[String, String] = configToMap(config.getConfig("akka.actor.serializers"))
val SerializationBindings: Map[String, String] = {
val defaultBindings = config.getConfig("akka.actor.serialization-bindings")
val bindings =
if (config.getBoolean("akka.actor.enable-additional-serialization-bindings") ||
config.getBoolean("akka.remote.artery.enabled"))
defaultBindings.withFallback(config.getConfig("akka.actor.additional-serialization-bindings"))
else defaultBindings
configToMap(bindings)
}
private final def configToMap(path: String): Map[String, String] = {
private final def configToMap(cfg: Config): Map[String, String] = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k → v.toString) }
cfg.root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k → v.toString) }
}
}
@ -83,6 +95,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
val settings = new Settings(system.settings.config)
val log = Logging(system, getClass.getName)
private val manifestCache = new AtomicReference[Map[String, Option[Class[_]]]](Map.empty[String, Option[Class[_]]])
/**
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
@ -97,7 +110,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
*/
def deserialize[T](bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_ <: T]]): Try[T] =
Try {
val serializer = try serializerByIdentity(serializerId) catch {
val serializer = try getSerializerById(serializerId) catch {
case _: NoSuchElementException ⇒ throw new NotSerializableException(
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
@ -112,28 +125,66 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
*/
def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
Try {
val serializer = try serializerByIdentity(serializerId) catch {
val serializer = try getSerializerById(serializerId) catch {
case _: NoSuchElementException ⇒ throw new NotSerializableException(
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
serializer match {
case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
case s1 ⇒
if (manifest == "")
s1.fromBinary(bytes, None)
else {
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
case Success(classManifest) ⇒
s1.fromBinary(bytes, Some(classManifest))
case Failure(e) ⇒
throw new NotSerializableException(
s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].")
}
}
}
deserializeByteArray(bytes, serializer, manifest)
}
private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = {
@tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = {
manifestCache.compareAndSet(cache, cache.updated(key, value)) ||
updateCache(manifestCache.get, key, value) // recursive, try again
}
serializer match {
case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
case s1 ⇒
if (manifest == "")
s1.fromBinary(bytes, None)
else {
val cache = manifestCache.get
cache.get(manifest) match {
case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest)
case None ⇒
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
case Success(classManifest) ⇒
val classManifestOption: Option[Class[_]] = Some(classManifest)
updateCache(cache, manifest, classManifestOption)
s1.fromBinary(bytes, classManifestOption)
case Failure(e) ⇒
throw new NotSerializableException(
s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
}
}
}
}
}
/**
* Deserializes the given ByteBuffer of bytes using the specified serializer id,
* using the optional type hint to the Serializer.
* Returns either the resulting object or throws an exception if deserialization fails.
*/
def deserializeByteBuffer(buf: ByteBuffer, serializerId: Int, manifest: String): AnyRef = {
val serializer = try getSerializerById(serializerId) catch {
case _: NoSuchElementException ⇒ throw new NotSerializableException(
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
serializer match {
case ser: ByteBufferSerializer ⇒
ser.fromBinary(buf, manifest)
case _ ⇒
val bytes = Array.ofDim[Byte](buf.remaining())
buf.get(bytes)
deserializeByteArray(bytes, serializer, manifest)
}
}
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* Returns either the resulting object or an Exception if one was thrown.
@ -246,6 +297,31 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
val serializerByIdentity: Map[Int, Serializer] =
Map(NullSerializer.identifier → NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) }
/**
* Serializers with id 0 - 1023 are stored in an array for quick allocation free access
*/
private val quickSerializerByIdentity: Array[Serializer] = {
val size = 1024
val table = Array.ofDim[Serializer](size)
serializerByIdentity.foreach {
case (id, ser) ⇒ if (0 <= id && id < size) table(id) = ser
}
table
}
/**
* @throws `NoSuchElementException` if no serializer with given `id`
*/
private def getSerializerById(id: Int): Serializer = {
if (0 <= id && id < quickSerializerByIdentity.length) {
quickSerializerByIdentity(id) match {
case null ⇒ throw new NoSuchElementException(s"key not found: $id")
case ser ⇒ ser
}
} else
serializerByIdentity(id)
}
private val isJavaSerializationWarningEnabled = settings.config.getBoolean("akka.actor.warn-about-java-serializer-usage")
private val isWarningOnNoVerificationEnabled = settings.config.getBoolean("akka.actor.warn-on-no-serialization-verification")
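A hedged sketch of the lookup path these changes optimize, not part of this diff: serializer ids below 1024 now hit the preallocated array instead of the Map, and manifest class lookups are cached. The system name is illustrative.

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension

val system = ActorSystem("ser-example")
val serialization = SerializationExtension(system)

val msg = "hello"
val serializer = serialization.findSerializerFor(msg)
val bytes = serializer.toBinary(msg)

// The id and manifest travel with the bytes; deserialize resolves the
// serializer via the allocation-free getSerializerById path.
val restored = serialization.deserialize(bytes, serializer.identifier, "").get
system.terminate()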

View file

@ -5,6 +5,7 @@ package akka.serialization
*/
import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream }
import java.nio.ByteBuffer
import java.util.concurrent.Callable
import akka.util.ClassLoaderObjectInputStream
import akka.actor.ExtendedActorSystem
@ -132,6 +133,56 @@ abstract class SerializerWithStringManifest extends Serializer {
}
/**
* Serializer between an object and a `ByteBuffer` representing that object.
*
* Implementations should typically extend [[SerializerWithStringManifest]] and
* in addition to the `ByteBuffer` based `toBinary` and `fromBinary` methods also
* implement the array based `toBinary` and `fromBinary` methods. The array based
* methods will be used when `ByteBuffer` is not used, e.g. in Akka Persistence.
*
* Note that the array based methods can for example be implemented by delegation
* like this:
* {{{
* // you need to know the maximum size in bytes of the serialized messages
* val pool = new akka.io.DirectByteBufferPool(defaultBufferSize = 1024 * 1024, maxPoolEntries = 10)
*
*
* // Implement this method for compatibility with `SerializerWithStringManifest`.
* override def toBinary(o: AnyRef): Array[Byte] = {
* val buf = pool.acquire()
* try {
* toBinary(o, buf)
* buf.flip()
* val bytes = Array.ofDim[Byte](buf.remaining)
* buf.get(bytes)
* bytes
* } finally {
* pool.release(buf)
* }
* }
*
* // Implement this method for compatibility with `SerializerWithStringManifest`.
* override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
* fromBinary(ByteBuffer.wrap(bytes), manifest)
*
* }}}
*/
trait ByteBufferSerializer {
/**
* Serializes the given object into the `ByteBuffer`.
*/
def toBinary(o: AnyRef, buf: ByteBuffer): Unit
/**
* Produces an object from a `ByteBuffer`, with an optional type-hint;
* the class should be loaded using ActorSystem.dynamicAccess.
*/
def fromBinary(buf: ByteBuffer, manifest: String): AnyRef
}
/**
* Base serializer trait with serialization identifiers configuration contract,
* when globally unique serialization identifier is configured in the `reference.conf`.
@ -252,7 +303,7 @@ class NullSerializer extends Serializer {
val nullAsBytes = Array[Byte]()
def includeManifest: Boolean = false
def identifier = 0
def toBinary(o: AnyRef) = nullAsBytes
def toBinary(o: AnyRef): Array[Byte] = nullAsBytes
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
}
@ -260,7 +311,7 @@ class NullSerializer extends Serializer {
* This is a special Serializer that serializes and deserializes byte arrays only
* (just returns the byte array unchanged/uncopied)
*/
class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer {
class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer with ByteBufferSerializer {
@deprecated("Use constructor with ExtendedActorSystem", "2.4")
def this() = this(null)
@ -271,10 +322,25 @@ class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerialize
else identifierFromConfig
def includeManifest: Boolean = false
def toBinary(o: AnyRef) = o match {
def toBinary(o: AnyRef): Array[Byte] = o match {
case null ⇒ null
case o: Array[Byte] ⇒ o
case other ⇒ throw new IllegalArgumentException("ByteArraySerializer only serializes byte arrays, not [" + other + "]")
case other ⇒ throw new IllegalArgumentException(
s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]")
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes
override def toBinary(o: AnyRef, buf: ByteBuffer): Unit =
o match {
case null ⇒
case bytes: Array[Byte] ⇒ buf.put(bytes)
case other ⇒ throw new IllegalArgumentException(
s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]")
}
override def fromBinary(buf: ByteBuffer, manifest: String): AnyRef = {
val bytes = Array.ofDim[Byte](buf.remaining())
buf.get(bytes)
bytes
}
}
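A usage sketch mirroring the new spec above, not part of this diff: ByteArraySerializer now serializes straight into a caller-provided ByteBuffer. The system name is illustrative.

import java.nio.{ ByteBuffer, ByteOrder }
import akka.actor.ActorSystem
import akka.serialization.{ ByteBufferSerializer, SerializationExtension }

val system = ActorSystem("bb-example")
val ser = SerializationExtension(system)
  .serializerFor(classOf[Array[Byte]]).asInstanceOf[ByteBufferSerializer]

val buf = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN)
ser.toBinary("abc".getBytes("UTF-8"), buf)
buf.flip()
val restored = ser.fromBinary(buf, "").asInstanceOf[Array[Byte]]
system.terminate()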

View file

@ -178,11 +178,15 @@ object ByteString {
val copyLength = Math.min(buffer.remaining, offset + length)
if (copyLength > 0) {
buffer.put(bytes, offset, copyLength)
drop(copyLength)
}
copyLength
}
/** INTERNAL API: Specialized for internal use, appending ByteString1C to a ByteStringBuilder. */
private[akka] def appendToBuilder(buffer: ByteStringBuilder) = {
buffer.putByteArrayUnsafe(bytes)
}
}
/** INTERNAL API: ByteString backed by exactly one array, with start / end markers */

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
/**
* INTERNAL API: ByteString pretty printer, based on Johannes Rudolph's implementation from:
* https://github.com/jrudolph/akka/commit/c889dddf37c8635c365a79a391eb18a709f36773#diff-947cbf07996eeb823cb9850cc2e81126R19
*/
private[akka] object PrettyByteString {
private val indentDepth = 2
private val indent = " " * (indentDepth + 1)
implicit class asPretty(bs: ByteString) {
def prettyPrint(maxBytes: Int = 16 * 5): String = formatBytes(bs, maxBytes).mkString("\n")
}
def formatBytes(bs: ByteString, maxBytes: Int = 16 * 5): Iterator[String] = {
def asHex(b: Byte): String = b formatted "%02X"
def asASCII(b: Byte): Char =
if (b >= 0x20 && b < 0x7f) b.toChar
else '.'
def formatLine(bs: ByteString): String = {
val data = bs.toSeq
val hex = data.map(asHex).mkString(" ")
val ascii = data.map(asASCII).mkString
f"$indent%s $hex%-48s | $ascii"
}
def formatBytes(bs: ByteString): String =
bs.grouped(16).map(formatLine).mkString("\n")
val prefix = s"${indent}ByteString(${bs.size} bytes)"
if (bs.size <= maxBytes) Iterator(prefix + "\n", formatBytes(bs))
else
Iterator(
s"$prefix first + last $maxBytes:\n",
formatBytes(bs.take(maxBytes)),
s"\n$indent ... [${bs.size - maxBytes} bytes omitted] ...\n",
formatBytes(bs.takeRight(maxBytes)))
}
}
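A usage sketch, not part of this diff; PrettyByteString is private[akka], so this only compiles from within an akka package:

import akka.util.ByteString
import akka.util.PrettyByteString.asPretty

// Hex + ASCII dump, 16 bytes per line; input larger than maxBytes is
// truncated to the first and last maxBytes with an omission marker between.
println(ByteString("The quick brown fox jumps over the lazy dog").prettyPrint())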

View file

@ -7,7 +7,8 @@ import java.util.Locale
import scala.concurrent.duration._
object PrettyDuration {
/** INTERNAL API */
private[akka] object PrettyDuration {
/**
* JAVA API

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
/**
* Emits integers from 1 to the given `elementCount`. The `java.lang.Integer`
* objects are allocated in the constructor of the stage, so it should be created
* before the benchmark is started.
*/
class BenchTestSource(elementCount: Int) extends GraphStage[SourceShape[java.lang.Integer]] {
private val elements = Array.ofDim[java.lang.Integer](elementCount)
(1 to elementCount).foreach(n => elements(n - 1) = n)
val out: Outlet[java.lang.Integer] = Outlet("BenchTestSource")
override val shape: SourceShape[java.lang.Integer] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
var n = 0
override def onPull(): Unit = {
n += 1
if (n > elementCount)
complete(out)
else
push(out, elements(n - 1))
}
setHandler(out, this)
}
}
class BenchTestSourceSameElement[T](elements: Int, elem: T) extends GraphStage[SourceShape[T]] {
val out: Outlet[T] = Outlet("BenchTestSourceSameElement")
override val shape: SourceShape[T] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
var n = 0
override def onPull(): Unit = {
n += 1
if (n > elements)
complete(out)
else
push(out, elem)
}
setHandler(out, this)
}
}
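A usage sketch, not part of this diff: the boxed Integers are allocated in the stage constructor, so build the source before the measured section starts (a materializer is assumed in scope):

import akka.stream.scaladsl.{ Sink, Source }

val source = Source.fromGraph(new BenchTestSource(1000000))
// measured section, e.g.:
// source.runWith(Sink.ignore)(materializer)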

View file

@ -0,0 +1,229 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.io.File
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.channels.FileChannel
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import akka.remote.artery.compress._
import akka.stream.impl.ConstantFun
import org.openjdk.jmh.annotations.Scope
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor._
import akka.remote.AddressUidExtension
import akka.remote.RARP
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import akka.util.OptionVal
import akka.actor.Address
import scala.concurrent.Future
import akka.Done
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 4)
@Measurement(iterations = 5)
class CodecBenchmark {
val config = ConfigFactory.parseString(
"""
akka {
loglevel = WARNING
actor.provider = remote
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
}
"""
)
implicit val system = ActorSystem("CodecBenchmark", config)
val systemB = ActorSystem("systemB", system.settings.config)
private val envelopePool = new EnvelopeBufferPool(1024 * 1024, 128)
private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16)
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16)
val headerIn = HeaderBuilder.in(NoInboundCompressions)
val envelopeTemplateBuffer = ByteBuffer.allocate(1024 * 1024).order(ByteOrder.LITTLE_ENDIAN)
val uniqueLocalAddress = UniqueAddress(
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
AddressUidExtension(system).longAddressUid
)
val payload = Array.ofDim[Byte](1000)
private val inboundContext: InboundContext = new InboundContext {
override def localAddress: UniqueAddress = uniqueLocalAddress
override def association(uid: Long): OptionVal[OutboundContext] = OptionVal.None
// the following methods are not used in this test
override def sendControl(to: Address, message: ControlMessage): Unit = ???
override def association(remoteAddress: Address): OutboundContext = ???
override def completeHandshake(peer: UniqueAddress): Future[Done] = ???
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private var materializer: ActorMaterializer = _
private var remoteRefB: RemoteActorRef = _
private var resolvedRef: InternalActorRef = _
private var senderStringA: String = _
private var recipientStringB: String = _
@Setup
def setup(): Unit = {
val settings = ActorMaterializerSettings(system)
materializer = ActorMaterializer(settings)
val actorOnSystemA = system.actorOf(Props.empty, "a")
senderStringA = actorOnSystemA.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val actorOnSystemB = systemB.actorOf(Props.empty, "b")
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
remoteRefB =
Await.result(system.actorSelection(rootB / "user" / "b").resolveOne(5.seconds), 5.seconds)
.asInstanceOf[RemoteActorRef]
resolvedRef = actorOnSystemA.asInstanceOf[InternalActorRef]
recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB)
val envelope = new EnvelopeBuffer(envelopeTemplateBuffer)
headerIn setVersion 1
headerIn setUid 42
headerIn setSerializer 4
headerIn setSenderActorRef actorOnSystemA
headerIn setRecipientActorRef remoteRefB
headerIn setManifest ""
envelope.writeHeader(headerIn)
envelope.byteBuffer.put(payload)
envelope.byteBuffer.flip()
}
@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
Await.result(systemB.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000)
def reference(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.runWith(new LatchSink(N, latch))(materializer)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
@Benchmark
@OperationsPerInvocation(100000)
def encode(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))
.via(encoder)
.map(envelope => envelopePool.release(envelope))
.runWith(new LatchSink(N, latch))(materializer)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
@Benchmark
@OperationsPerInvocation(100000)
def decode(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val provider = RARP(system).provider
val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = {
recipient ⇒
// juggling with the refs, since we don't run the real thing
val resolved = provider.resolveActorRefWithLocalAddress(localRecipient, uniqueLocalAddress.address)
resolved
}
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map { _ =>
val envelope = envelopePool.acquire()
envelopeTemplateBuffer.rewind()
envelope.byteBuffer.put(envelopeTemplateBuffer)
envelope.byteBuffer.flip()
envelope
}
.via(decoder)
.map {
case env: ReusableInboundEnvelope => inboundEnvelopePool.release(env)
case _ =>
}
.runWith(new LatchSink(N, latch))(materializer)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
@Benchmark
@OperationsPerInvocation(100000)
def encode_decode(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.ChangeOutboundCompression] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false))
val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val provider = RARP(system).provider
val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = {
recipient ⇒
// juggling with the refs, since we don't run the real thing
val resolved = provider.resolveActorRefWithLocalAddress(localRecipient, uniqueLocalAddress.address)
resolved
}
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))
.via(encoder)
.via(decoder)
.map {
case env: ReusableInboundEnvelope => inboundEnvelopePool.release(env)
case _ =>
}
.runWith(new LatchSink(N, latch))(materializer)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
}

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations.{ OperationsPerInvocation, _ }
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class FlightRecorderBench {
@Param(Array("1", "5", "10"))
var writers: Int = 0
val Writes = 10000000
private var file: File = _
private var fileChannel: FileChannel = _
private var recorder: FlightRecorder = _
@Setup
def setup(): Unit = {
file = File.createTempFile("akka-flightrecorder", "dat")
file.deleteOnExit()
fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ)
recorder = new FlightRecorder(fileChannel)
}
@TearDown
def shutdown(): Unit = {
fileChannel.force(false)
recorder.close()
fileChannel.close()
file.delete()
}
@Benchmark
@OperationsPerInvocation(10000000)
def flight_recorder_writes(): Unit = {
val latch = new CountDownLatch(writers)
(1 to writers).foreach { _ =>
val sink = recorder.createEventSink()
new Thread {
override def run(): Unit = {
var i = Writes
while (i > 0) {
sink.hiFreq(16, 16)
i -= 1
}
latch.countDown()
}
}.start() // start a real writer thread; run() would execute sequentially on the calling thread
}
latch.await()
}
}

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import akka.stream.Attributes
import akka.stream.Inlet
import akka.stream.SinkShape
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
class LatchSink(countDownAfter: Int, latch: CountDownLatch) extends GraphStage[SinkShape[Any]] {
val in: Inlet[Any] = Inlet("LatchSink")
override val shape: SinkShape[Any] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
var n = 0
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
n += 1
if (n == countDownAfter)
latch.countDown()
grab(in)
pull(in)
}
setHandler(in, this)
}
}
class BarrierSink(countDownAfter: Int, latch: CountDownLatch, barrierAfter: Int, barrier: CyclicBarrier)
extends GraphStage[SinkShape[Any]] {
val in: Inlet[Any] = Inlet("BarrierSink")
override val shape: SinkShape[Any] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
var n = 0
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
n += 1
grab(in)
if (n == countDownAfter)
latch.countDown()
else if (n % barrierAfter == 0)
barrier.await()
pull(in)
}
setHandler(in, this)
}
}

View file

@ -0,0 +1,107 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.charset.Charset
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
class LiteralEncodingBenchmark {
private val UsAscii = Charset.forName("US-ASCII")
private val str = "akka://SomeSystem@host12:1234/user/foo"
private val buffer = ByteBuffer.allocate(128).order(ByteOrder.LITTLE_ENDIAN)
private val literalChars = Array.ofDim[Char](64)
private val literalBytes = Array.ofDim[Byte](64)
private val unsafe = akka.util.Unsafe.instance
private val stringValueFieldOffset = unsafe.objectFieldOffset(classOf[String].getDeclaredField("value"))
@Benchmark
def getBytesNewArray(): String = {
val length = str.length()
// write
buffer.clear()
val bytes = str.getBytes(UsAscii)
buffer.put(bytes)
buffer.flip()
// read
val bytes2 = Array.ofDim[Byte](length)
buffer.get(bytes2)
new String(bytes2, UsAscii)
}
@Benchmark
def getBytesReuseArray(): String = {
val length = str.length()
// write
buffer.clear()
val bytes = str.getBytes(UsAscii)
buffer.put(bytes)
buffer.flip()
// read
buffer.get(literalBytes, 0, length)
new String(literalBytes, UsAscii)
}
@Benchmark
def getChars(): String = {
val length = str.length()
// write
buffer.clear()
str.getChars(0, length, literalChars, 0)
var i = 0
while (i < length) {
literalBytes(i) = literalChars(i).asInstanceOf[Byte]
i += 1
}
buffer.put(literalBytes, 0, length)
buffer.flip()
// read
buffer.get(literalBytes, 0, length)
i = 0
while (i < length) {
// UsAscii
literalChars(i) = literalBytes(i).asInstanceOf[Char]
i += 1
}
String.valueOf(literalChars, 0, length)
}
@Benchmark
def getCharsUnsafe(): String = {
val length = str.length()
// write
buffer.clear()
val chars = unsafe.getObject(str, stringValueFieldOffset).asInstanceOf[Array[Char]]
var i = 0
while (i < length) {
literalBytes(i) = chars(i).asInstanceOf[Byte]
i += 1
}
buffer.put(literalBytes, 0, length)
buffer.flip()
// read
buffer.get(literalBytes, 0, length)
i = 0
while (i < length) {
// UsAscii
literalChars(i) = literalBytes(i).asInstanceOf[Char]
i += 1
}
String.valueOf(literalChars, 0, length)
}
}

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import scala.concurrent.Lock
import scala.util.Success
import akka.stream.impl.fusing.GraphStages
import org.reactivestreams._
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import java.util.concurrent.Semaphore
import akka.stream.OverflowStrategy
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.CountDownLatch
import akka.stream.KillSwitches
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 4)
@Measurement(iterations = 10)
class SendQueueBenchmark {
val config = ConfigFactory.parseString(
"""
"""
)
implicit val system = ActorSystem("SendQueueBenchmark", config)
var materializer: ActorMaterializer = _
@Setup
def setup(): Unit = {
val settings = ActorMaterializerSettings(system)
materializer = ActorMaterializer(settings)
}
@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000)
def queue(): Unit = {
val latch = new CountDownLatch(1)
val barrier = new CyclicBarrier(2)
val N = 100000
val burstSize = 1000
val source = Source.queue[Int](1024, OverflowStrategy.dropBuffer)
val (queue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
var n = 1
while (n <= N) {
queue.offer(n)
if (n % burstSize == 0 && n < N) {
barrier.await()
}
n += 1
}
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
killSwitch.shutdown()
}
@Benchmark
@OperationsPerInvocation(100000)
def actorRef(): Unit = {
val latch = new CountDownLatch(1)
val barrier = new CyclicBarrier(2)
val N = 100000
val burstSize = 1000
val source = Source.actorRef(1024, OverflowStrategy.dropBuffer)
val (ref, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
var n = 1
while (n <= N) {
ref ! n
if (n % burstSize == 0 && n < N) {
barrier.await()
}
n += 1
}
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
killSwitch.shutdown()
}
@Benchmark
@OperationsPerInvocation(100000)
def sendQueue(): Unit = {
val latch = new CountDownLatch(1)
val barrier = new CyclicBarrier(2)
val N = 100000
val burstSize = 1000
val queue = new ManyToOneConcurrentArrayQueue[Int](1024)
val source = Source.fromGraph(new SendQueue[Int])
val (sendQueue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
sendQueue.inject(queue)
var n = 1
while (n <= N) {
if (!sendQueue.offer(n))
println(s"offer failed $n") // should not happen
if (n % burstSize == 0 && n < N) {
barrier.await()
}
n += 1
}
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
killSwitch.shutdown()
}
}

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery.compress
import java.util.Random
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
class CountMinSketchBenchmark {
// @Param(Array("4", "8", "12", "16"))
@Param(Array("16", "256", "4096", "65536"))
var w: Int = _
@Param(Array("16", "128", "1024"))
var d: Int = _
private val seed: Int = 20160726
val rand = new Random(seed)
val preallocateIds = Array.ofDim[Int](8192)
val preallocateValues = Array.ofDim[Long](8192)
var countMinSketch: CountMinSketch = _
@Setup
def init(): Unit = {
countMinSketch = new CountMinSketch(d, w, seed)
(0 to 8191).foreach { index =>
preallocateIds(index) = rand.nextInt()
preallocateValues(index) = Math.abs(rand.nextInt())
}
}
@Benchmark
@OperationsPerInvocation(8192)
def updateRandomNumbers(blackhole: Blackhole): Unit = {
var i: Int = 0
while (i < 8192) {
blackhole.consume(countMinSketch.addObjectAndEstimateCount(preallocateIds(i), preallocateValues(i)))
i += 1
}
}
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery.compress
import java.util.concurrent.ThreadLocalRandom
import org.openjdk.jmh.annotations._
@Fork(1)
@State(Scope.Benchmark)
class InvertCompressionTableBenchmark {
/*
TODO: Possibly specialise the inversion, it's not in hot path so not doing it for now
a.r.artery.compress.CompressionTableBenchmark.invert_comp_to_decomp_1024 N/A thrpt 20 5828.963 ± 281.631 ops/s
a.r.artery.compress.CompressionTableBenchmark.invert_comp_to_decomp_256 N/A thrpt 20 29040.889 ± 345.425 ops/s
*/
def randomName = ThreadLocalRandom.current().nextInt(1000).toString
val compTable_256 = CompressionTable(17L, 2, Map(Vector.fill[String](256)(randomName).zipWithIndex: _*))
val compTable_1024 = CompressionTable(17L, 3, Map(Vector.fill[String](1024)(randomName).zipWithIndex: _*))
@Benchmark def invert_comp_to_decomp_256 = compTable_256.invert
@Benchmark def invert_comp_to_decomp_1024 = compTable_1024.invert
}

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.compress
import java.util.Random
import akka.remote.artery.compress.TopHeavyHitters
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
/**
* On Macbook pro:
* [info] Benchmark (n) Mode Cnt Score Error Units
* [info] HeavyHittersBenchmark.updateHitter 8192 thrpt 40 357 405.512 ± 3329.008 ops/s
* [info] HeavyHittersBenchmark.updateNotHitter 8192 thrpt 40 259 032 711.743 ± 7199514.142 ops/s
* [info] HeavyHittersBenchmark.updateRandomHitter 8192 thrpt 40 2 105 102.088 ± 18214.624 ops/s
*
* ===
* on our benchmarking box:
* ubuntu@ip-172-31-43-199:~/akka-ktoso$ lscpu
* Architecture: x86_64
* CPU op-mode(s): 32-bit, 64-bit
* Byte Order: Little Endian
* CPU(s): 2
* Thread(s) per core: 2
* CPU MHz: 2494.068
* Hypervisor vendor: Xen
* Virtualization type: full
* L1d cache: 32K
* L1i cache: 32K
* L2 cache: 256K
* L3 cache: 25600K
*
* ubuntu@ip-172-31-43-199:~/akka-ktoso$ cpuid | grep nm
* (simple synth) = Intel Core i9-4000 / Xeon E5-1600/E5-2600 v2 (Ivy Bridge-EP C1/M1/S1), 22nm
*
* [info] Benchmark (n) Mode Cnt Score Error Units
* [info] HeavyHittersBenchmark.updateHitter 8192 thrpt 40 309 512.584 ± 153.248 ops/s
* [info] HeavyHittersBenchmark.updateNotHitter 8192 thrpt 40 248 170 545.577 ± 1244986.765 ops/s
* [info] HeavyHittersBenchmark.updateRandomHitter 8192 thrpt 40 1 207 521.674 ± 912.676 ops/s
*/
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
class HeavyHittersBenchmark {
// @Param(Array("512", "8192"))
@Param(Array("8192"))
var n: Int = 0
private var topN: TopHeavyHitters[String] = _
val rand = new Random(1001021)
val preallocatedNums: Array[Long] = Array.ofDim(8192)
val preallocatedStrings: Array[String] = Array.ofDim(8192)
@Setup
def init(): Unit = {
topN = new TopHeavyHitters(n)
var i = 0
while (i < n) {
topN.update(i.toString, i)
preallocatedNums(i) = rand.nextLong()
preallocatedStrings(i) = i.toString
i += 1
}
}
@Benchmark
@OperationsPerInvocation(8192)
def updateNotHitter(blackhole: Blackhole): Unit = {
var i = 0
while (i < 8192) {
blackhole.consume(topN.update("NOT", 1)) // definitely not a heavy hitter
i += 1
}
}
@Benchmark
@OperationsPerInvocation(8192)
def updateExistingHitter(blackhole: Blackhole): Unit = {
var i: Int = 0
while (i < 8192) {
blackhole.consume(topN.update(preallocatedStrings(i % 16), Long.MaxValue)) // definitely a heavy hitter
i += 1
}
}
@Benchmark
def updateNewHitter(blackhole: Blackhole): Unit = {
var i = 0
while (i < 8192) {
blackhole.consume(topN.update(preallocatedStrings(i), Long.MaxValue))
i += 1
}
}
@Benchmark
@OperationsPerInvocation(8192)
def updateRandomHitter(blackhole: Blackhole): Unit = {
var i = 0
while (i < 8192) {
blackhole.consume(topN.update(preallocatedStrings(i), preallocatedNums(i))) // maybe a heavy hitter
i += 1
}
}
}

View file

@ -18,7 +18,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class FlowMapBenchmark {

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
import java.util
import java.util.concurrent.TimeUnit
import akka.remote.artery.LruBoundedCache
import org.openjdk.jmh.annotations.{ Param, _ }
import scala.util.Random
@State(Scope.Benchmark)
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
class LruBoundedCacheBench {
var javaHashMap: java.util.HashMap[String, String] = _
@Param(Array("1024", "8192"))
var count = 0
@Param(Array("128", "256"))
var stringSize = 0
private var lruCache: LruBoundedCache[String, String] = _
@Param(Array("90", "99"))
var loadFactor: Int = _
var toAdd: String = _
var toRemove: String = _
var toGet: String = _
@Setup
def setup(): Unit = {
val loadF: Double = loadFactor / 100.0
val threshold = (loadF * count).toInt
val random = Random
javaHashMap = new util.HashMap[String, String](count)
lruCache = new LruBoundedCache[String, String](count, threshold) {
override protected def compute(k: String): String = k
override protected def hash(k: String): Int = k.hashCode
override protected def isCacheable(v: String): Boolean = true
}
// Loading
for (i <- 1 to threshold) {
val value = random.nextString(stringSize)
if (i == 1) toGet = value
toRemove = value
javaHashMap.put(value, value)
lruCache.get(value)
}
toAdd = random.nextString(stringSize)
}
@Benchmark
def addOne_lruCache(): String = {
lruCache.getOrCompute(toAdd)
}
@Benchmark
def addOne_hashMap(): String = {
javaHashMap.put(toAdd, toAdd)
javaHashMap.get(toAdd)
}
@Benchmark
def addOne_hashMap_remove_put_get(): String = {
javaHashMap.remove(toRemove)
javaHashMap.put(toAdd, toAdd)
javaHashMap.get(toAdd)
}
}

View file

@ -6,11 +6,13 @@ package akka.cluster.metrics
import scala.language.postfixOps
import java.util.logging.LogManager
import org.slf4j.bridge.SLF4JBridgeHandler
import akka.testkit.AkkaSpec
import akka.actor.ExtendedActorSystem
import akka.actor.Address
import java.io.Closeable
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Actor
@ -22,6 +24,7 @@ import akka.actor.ActorLogging
import org.scalatest.mock.MockitoSugar
import akka.actor.ActorSystem
import akka.dispatch.Dispatchers
import akka.remote.RARP
/**
* Redirect different logging sources to SLF4J.
@ -132,7 +135,7 @@ trait MetricsCollectorFactory { this: AkkaSpec ⇒
*/
class MockitoSigarMetricsCollector(system: ActorSystem)
extends SigarMetricsCollector(
Address("akka.tcp", system.name),
Address(if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp", system.name),
MetricsConfig.defaultDecayFactor,
MockitoSigarProvider().createSigarInstance) {
}
@ -153,7 +156,7 @@ object MetricsConfig {
gossip-interval = 1s
}
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.actor.provider = remote
"""
/** Test w/o cluster, with collection disabled. */
@ -163,7 +166,7 @@ object MetricsConfig {
enabled = off
}
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.actor.provider = remote
"""
/** Test in cluster, with manual collection activation, collector mock, fast. */
@ -178,7 +181,7 @@ object MetricsConfig {
fallback = false
}
}
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
"""
}

View file

@ -15,7 +15,7 @@ import akka.cluster.TestMember
import akka.cluster.metrics.MetricsGossipEnvelope
class MessageSerializerSpec extends AkkaSpec(
"akka.actor.provider = akka.cluster.ClusterActorRefProvider") {
"akka.actor.provider = cluster") {
val serializer = new MessageSerializer(system.asInstanceOf[ExtendedActorSystem])

View file

@ -31,8 +31,6 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
import Shard.{ GetShardStats, ShardStats }
import Shard.{ State ⇒ EntityState, EntityStarted, EntityStopped }
private lazy val serialization = SerializationExtension(system)
private final val BufferSize = 1024 * 4
private val CoordinatorStateManifest = "AA"

View file

@ -85,7 +85,7 @@ abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String)
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {

View file

@ -55,7 +55,7 @@ abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiN
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.roles = ["backend"]

View file

@ -46,7 +46,7 @@ object ClusterShardingGetStateSpecConfig extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.metrics.enabled = off
akka.cluster.auto-down-unreachable-after = 0s

View file

@ -47,7 +47,7 @@ object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.log-dead-letters-during-shutdown = off
akka.cluster.metrics.enabled = off

View file

@ -72,7 +72,7 @@ abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) exten
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {

View file

@ -63,7 +63,7 @@ abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiN
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"

View file

@ -129,7 +129,7 @@ abstract class ClusterShardingSpecConfig(
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.roles = ["backend"]

View file

@ -27,7 +27,7 @@ import org.apache.commons.io.FileUtils
object RemoveInternalClusterShardingDataSpec {
val config = """
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb {

View file

@ -16,6 +16,7 @@ import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.cluster.pubsub._
import akka.remote.RARP
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.util.Timeout
@ -30,7 +31,7 @@ object ClusterClientSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.client.heartbeat-interval = 1s
@ -430,10 +431,13 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
runOn(remainingServerRoleNames.toSeq: _*) {
Await.ready(system.whenTerminated, 20.seconds)
// start new system on same port
val port = Cluster(system).selfAddress.port.get
val sys2 = ActorSystem(
system.name,
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + Cluster(system).selfAddress.port.get)
.withFallback(system.settings.config))
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
else s"akka.remote.netty.tcp.port=$port"
).withFallback(system.settings.config))
Cluster(sys2).join(Cluster(sys2).selfAddress)
val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterClientReceptionist(sys2).registerService(service2)
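The Artery-vs-netty port selection above recurs in several specs in this commit (also in DistributedPubSubRestartSpec and ClusterSingletonRestartSpec below). A hypothetical helper capturing the pattern, not part of this diff; like the specs, it relies on being in an akka package so RARP is visible:

import akka.actor.ActorSystem
import akka.remote.RARP
import com.typesafe.config.{ Config, ConfigFactory }

// Builds a config that restarts a system on the same port, for whichever
// transport the original system was running.
def samePortConfig(system: ActorSystem, port: Int): Config =
  ConfigFactory.parseString(
    if (RARP(system).provider.remoteSettings.Artery.Enabled)
      s"akka.remote.artery.canonical.port=$port"
    else
      s"akka.remote.netty.tcp.port=$port"
  ).withFallback(system.settings.config)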

View file

@ -19,7 +19,7 @@ object ClusterClientStopSpec extends MultiNodeConfig {
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.client {
heartbeat-interval = 1s

View file

@ -28,7 +28,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.pub-sub.max-delta-elements = 500

View file

@ -21,10 +21,12 @@ import akka.actor.ActorLogging
import akka.cluster.pubsub.DistributedPubSubMediator.Internal.Status
import akka.cluster.pubsub.DistributedPubSubMediator.Internal.Delta
import akka.actor.ActorSystem
import scala.concurrent.Await
import akka.actor.Identify
import akka.actor.RootActorPath
import akka.actor.ActorIdentity
import akka.remote.RARP
object DistributedPubSubRestartSpec extends MultiNodeConfig {
val first = role("first")
@ -136,10 +138,16 @@ class DistributedPubSubRestartSpec extends MultiNodeSpec(DistributedPubSubRestar
runOn(third) {
Await.result(system.whenTerminated, 10.seconds)
val newSystem = ActorSystem(
system.name,
ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${Cluster(system).selfAddress.port.get}").withFallback(
system.settings.config))
val newSystem = {
val port = Cluster(system).selfAddress.port.get
val config = ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
else s"akka.remote.netty.tcp.port=$port"
).withFallback(system.settings.config)
ActorSystem(system.name, config)
}
try {
// don't join the old cluster
Cluster(newSystem).join(Cluster(newSystem).selfAddress)

View file

@ -38,7 +38,7 @@ object ClusterSingletonManagerChaosSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))

View file

@ -35,7 +35,7 @@ object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = off
"""))

View file

@ -41,7 +41,7 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))

View file

@ -35,7 +35,7 @@ object ClusterSingletonManagerStartupSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))

View file

@ -21,7 +21,7 @@ public class ClusterClientTest extends JUnitSuite {
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest",
ConfigFactory.parseString(
"akka.actor.provider = \"akka.cluster.ClusterActorRefProvider\"\n" +
"akka.actor.provider = \"cluster\"\n" +
"akka.remote.netty.tcp.port=0"));
private final ActorSystem system = actorSystemResource.getSystem();

View file

@ -25,7 +25,7 @@ public class DistributedPubSubMediatorTest extends JUnitSuite {
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest",
ConfigFactory.parseString(
"akka.actor.provider = \"akka.cluster.ClusterActorRefProvider\"\n" +
"akka.actor.provider = \"cluster\"\n" +
"akka.remote.netty.tcp.port=0"));
private final ActorSystem system = actorSystemResource.getSystem();

View file

@ -15,7 +15,7 @@ case class UnwrappedMessage(msg: String)
object DistributedPubSubMediatorRouterSpec {
def config(routingLogic: String) = s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
akka.remote.log-remote-lifecycle-events = off
akka.cluster.pub-sub.routing-logic = $routingLogic

View file

@ -58,27 +58,27 @@ object ClusterSingletonProxySpec {
}
}
val cfg = """akka {
loglevel = INFO
cluster {
auto-down-unreachable-after = 10s
min-nr-of-members = 2
}
actor.provider = "akka.cluster.ClusterActorRefProvider"
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
"""
val cfg = """
akka {
loglevel = INFO
cluster {
auto-down-unreachable-after = 10s
min-nr-of-members = 2
}
actor.provider = "cluster"
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
artery.canonical {
hostname = "127.0.0.1"
port = 0
}
}
}
"""
class Singleton extends Actor with ActorLogging {

View file

@ -4,11 +4,11 @@
package akka.cluster.singleton
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.remote.RARP
import akka.testkit.AkkaSpec
import akka.testkit.TestActors
import akka.testkit.TestProbe
@ -22,6 +22,10 @@ class ClusterSingletonRestartSpec extends AkkaSpec("""
hostname = "127.0.0.1"
port = 0
}
artery.canonical {
hostname = "127.0.0.1"
port = 0
}
}
""") {
@ -64,10 +68,17 @@ class ClusterSingletonRestartSpec extends AkkaSpec("""
shutdown(sys1)
// it will be downed by the join attempts of the new incarnation
sys3 = ActorSystem(
system.name,
ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${Cluster(sys1).selfAddress.port.get}").withFallback(
system.settings.config))
sys3 = {
val sys1port = Cluster(sys1).selfAddress.port.get
val sys3Config =
ConfigFactory.parseString(
if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys1port"
else s"akka.remote.netty.tcp.port=$sys1port"
).withFallback(system.settings.config)
ActorSystem(system.name, sys3Config)
}
join(sys3, sys2)
within(5.seconds) {

View file

@ -16041,6 +16041,24 @@ public final class ClusterMessages {
* <code>required uint32 uid = 2;</code>
*/
int getUid();
// optional uint32 uid2 = 3;
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
boolean hasUid2();
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
int getUid2();
}
/**
* Protobuf type {@code UniqueAddress}
@ -16116,6 +16134,11 @@ public final class ClusterMessages {
uid_ = input.readUInt32();
break;
}
case 24: {
bitField0_ |= 0x00000004;
uid2_ = input.readUInt32();
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -16194,9 +16217,34 @@ public final class ClusterMessages {
return uid_;
}
// optional uint32 uid2 = 3;
public static final int UID2_FIELD_NUMBER = 3;
private int uid2_;
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public boolean hasUid2() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public int getUid2() {
return uid2_;
}
private void initFields() {
address_ = akka.cluster.protobuf.msg.ClusterMessages.Address.getDefaultInstance();
uid_ = 0;
uid2_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -16228,6 +16276,9 @@ public final class ClusterMessages {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt32(2, uid_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeUInt32(3, uid2_);
}
getUnknownFields().writeTo(output);
}
@ -16245,6 +16296,10 @@ public final class ClusterMessages {
size += akka.protobuf.CodedOutputStream
.computeUInt32Size(2, uid_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += akka.protobuf.CodedOutputStream
.computeUInt32Size(3, uid2_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -16375,6 +16430,8 @@ public final class ClusterMessages {
bitField0_ = (bitField0_ & ~0x00000001);
uid_ = 0;
bitField0_ = (bitField0_ & ~0x00000002);
uid2_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -16415,6 +16472,10 @@ public final class ClusterMessages {
to_bitField0_ |= 0x00000002;
}
result.uid_ = uid_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.uid2_ = uid2_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -16437,6 +16498,9 @@ public final class ClusterMessages {
if (other.hasUid()) {
setUid(other.getUid());
}
if (other.hasUid2()) {
setUid2(other.getUid2());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -16626,6 +16690,55 @@ public final class ClusterMessages {
return this;
}
// optional uint32 uid2 = 3;
private int uid2_ ;
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public boolean hasUid2() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public int getUid2() {
return uid2_;
}
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public Builder setUid2(int value) {
bitField0_ |= 0x00000004;
uid2_ = value;
onChanged();
return this;
}
/**
* <code>optional uint32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public Builder clearUid2() {
bitField0_ = (bitField0_ & ~0x00000004);
uid2_ = 0;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:UniqueAddress)
}
@ -16789,14 +16902,14 @@ public final class ClusterMessages {
"\016\n\nSerialized\020\000\022\n\n\006Double\020\001\022\t\n\005Float\020\002\022\013" +
"\n\007Integer\020\003\022\010\n\004Long\020\004\"\007\n\005Empty\"K\n\007Addres" +
"s\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004p" +
"ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" +
"ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"E\n\rUniqueAdd" +
"ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" +
" \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" +
"\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014Me" +
"mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" +
"ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" +
"\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluster.protobuf.",
"msgH\001"
" \002(\r\022\014\n\004uid2\030\003 \001(\r*D\n\022ReachabilityStatus" +
"\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022\016\n\nTerm" +
"inated\020\002*b\n\014MemberStatus\022\013\n\007Joining\020\000\022\006\n" +
"\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020" +
"\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031akka.clu",
"ster.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -16922,7 +17035,7 @@ public final class ClusterMessages {
internal_static_UniqueAddress_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_UniqueAddress_descriptor,
new java.lang.String[] { "Address", "Uid", });
new java.lang.String[] { "Address", "Uid", "Uid2", });
return null;
}
};

View file

@ -252,4 +252,6 @@ message Address {
message UniqueAddress {
required Address address = 1;
required uint32 uid = 2;
// 64 bit uids but with backward wire compatibility
optional uint32 uid2 = 3;
}

View file

@ -54,6 +54,11 @@ akka {
# `akka.cluster.DowningProvider` having a public one argument constructor accepting an `ActorSystem`
downing-provider-class = ""
# Artery only setting
# When a node has been gracefully removed, let this time pass (to allow, for example,
# cluster singleton handover to complete) and then quarantine the removed node.
quarantine-removed-node-after=30s
# By default, the leader will not move 'Joining' members to 'Up' during a network
# split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
# so they become part of the cluster even during a network split. The leader will

View file

@ -67,7 +67,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
*/
val selfUniqueAddress: UniqueAddress = system.provider match {
case c: ClusterActorRefProvider
UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).addressUid)
UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).longAddressUid)
case other throw new ConfigurationException(
s"ActorSystem [${system}] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]")
}

View file

@ -250,7 +250,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
val MaxGossipsBeforeShuttingDownMyself = 5
def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
// note that self is not initially member,

View file

@ -8,7 +8,7 @@ import scala.collection.immutable
import akka.actor.{ ActorLogging, ActorSelection, Address, Actor, RootActorPath }
import akka.cluster.ClusterEvent._
import akka.remote.FailureDetectorRegistry
import akka.remote.PriorityMessage
import akka.remote.HeartbeatMessage
import akka.actor.DeadLetterSuppression
/**
@ -36,12 +36,12 @@ private[cluster] object ClusterHeartbeatSender {
/**
* Sent at regular intervals for failure detection.
*/
final case class Heartbeat(from: Address) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
final case class Heartbeat(from: Address) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
/**
* Sent as reply to [[Heartbeat]] messages.
*/
final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
// sent to self only
case object HeartbeatTick

View file

@ -12,6 +12,7 @@ import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberWeaklyUp
import akka.remote.FailureDetectorRegistry
import akka.remote.RemoteWatcher
import akka.remote.RARP
/**
* INTERNAL API
@ -51,9 +52,12 @@ private[cluster] class ClusterRemoteWatcher(
unreachableReaperInterval,
heartbeatExpectedResponseAfter) {
private val arteryEnabled = RARP(context.system).provider.remoteSettings.Artery.Enabled
val cluster = Cluster(context.system)
import cluster.selfAddress
private final case class DelayedQuarantine(m: Member, previousStatus: MemberStatus) extends NoSerializationVerificationNeeded
var clusterNodes: Set[Address] = Set.empty
override def preStart(): Unit = {
@ -73,10 +77,11 @@ private[cluster] class ClusterRemoteWatcher(
clusterNodes = state.members.collect { case m if m.address != selfAddress m.address }
clusterNodes foreach takeOverResponsibility
unreachable = unreachable diff clusterNodes
case MemberUp(m) memberUp(m)
case MemberWeaklyUp(m) memberUp(m)
case MemberRemoved(m, previousStatus) memberRemoved(m, previousStatus)
case _: MemberEvent // not interesting
case MemberUp(m) memberUp(m)
case MemberWeaklyUp(m) memberUp(m)
case MemberRemoved(m, previousStatus) memberRemoved(m, previousStatus)
case _: MemberEvent // not interesting
case DelayedQuarantine(m, previousStatus) delayedQuarantine(m, previousStatus)
}
def memberUp(m: Member): Unit =
@ -89,12 +94,22 @@ private[cluster] class ClusterRemoteWatcher(
def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
if (m.address != selfAddress) {
clusterNodes -= m.address
if (previousStatus == MemberStatus.Down) {
quarantine(m.address, Some(m.uniqueAddress.uid))
quarantine(m.address, Some(m.uniqueAddress.uid), s"Cluster member removed, previous status [$previousStatus]")
} else if (arteryEnabled) {
// don't quarantine gracefully removed members (leaving) directly,
// give Cluster Singleton some time to exchange TakeOver/HandOver messages.
import context.dispatcher
context.system.scheduler.scheduleOnce(cluster.settings.QuarantineRemovedNodeAfter, self, DelayedQuarantine(m, previousStatus))
}
publishAddressTerminated(m.address)
}
def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit =
quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
override def watchNode(watchee: InternalActorRef) =
if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)

View file

@ -88,6 +88,9 @@ final class ClusterSettings(val config: Config, val systemName: String) {
else classOf[NoDowning].getName
}
val QuarantineRemovedNodeAfter: FiniteDuration =
cc.getMillisDuration("quarantine-removed-node-after") requiring (_ > Duration.Zero, "quarantine-removed-node-after must be > 0")
val AllowWeaklyUpMembers = cc.getBoolean("allow-weakly-up-members")
val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet

View file

@ -7,6 +7,8 @@ package akka.cluster
import akka.actor.Address
import MemberStatus._
import scala.runtime.AbstractFunction2
/**
* Represents the address, current status, and roles of a cluster member node.
*
@ -243,18 +245,43 @@ object MemberStatus {
Removed Set.empty[MemberStatus])
}
object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] {
// for binary compatibility
@deprecated("Use Long UID apply instead", since = "2.4.11")
def apply(address: Address, uid: Int) = new UniqueAddress(address, uid.toLong)
}
/**
* Member identifier consisting of address and random `uid`.
* The `uid` is needed to be able to distinguish different
* incarnations of a member with same hostname and port.
*/
@SerialVersionUID(1L)
final case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] {
override def hashCode = uid
final case class UniqueAddress(address: Address, longUid: Long) extends Ordered[UniqueAddress] {
override def hashCode = java.lang.Long.hashCode(longUid)
def compare(that: UniqueAddress): Int = {
val result = Member.addressOrdering.compare(this.address, that.address)
if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1
if (result == 0) if (this.longUid < that.longUid) -1 else if (this.longUid == that.longUid) 0 else 1
else result
}
}
// for binary compatibility
@deprecated("Use Long UID constructor instead", since = "2.4.11")
def this(address: Address, uid: Int) = this(address, uid.toLong)
@deprecated("Use longUid instead", since = "2.4.11")
def uid = longUid.toInt
/**
* For binary compatibility.
* Stops the case-class `copy(Address, Long)` method from being generated; use `apply` instead.
*/
@deprecated("Use Long UID constructor instead", since = "2.4.11")
def copy(address: Address = address, uid: Int = uid) = new UniqueAddress(address, uid)
}
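Taken together these shims keep code compiled against the old Int-based uid linking, while new code moves to the Long field. A hedged illustration of the two call paths (the values and the UniqueAddressCompat wrapper are made up for the sketch; it assumes 2.4.11+ on the classpath):

import akka.actor.Address
import akka.cluster.UniqueAddress

object UniqueAddressCompat {
  val addr = Address("akka", "sys", "host", 2552)

  // New API: 64-bit uid via the primary constructor and the longUid field.
  val a = UniqueAddress(addr, 42L)
  val full: Long = a.longUid

  // Old API still compiles, but is deprecated and truncates to 32 bits.
  val b = UniqueAddress(addr, 42) // deprecated apply(Address, Int)
  val truncated: Int = b.uid      // deprecated alias for longUid.toInt
}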

View file

@ -126,8 +126,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder =
cm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid)
private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder = {
cm.UniqueAddress.newBuilder()
.setAddress(addressToProto(uniqueAddress.address))
.setUid(uniqueAddress.longUid.toInt)
.setUid2((uniqueAddress.longUid >> 32).toInt)
}
private def uniqueAddressToProtoByteArray(uniqueAddress: UniqueAddress): Array[Byte] =
uniqueAddressToProto(uniqueAddress).build.toByteArray
@ -161,8 +165,19 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
private def addressFromProto(address: cm.Address): Address =
Address(getProtocol(address), getSystem(address), address.getHostname, address.getPort)
private def uniqueAddressFromProto(uniqueAddress: cm.UniqueAddress): UniqueAddress =
UniqueAddress(addressFromProto(uniqueAddress.getAddress), uniqueAddress.getUid)
private def uniqueAddressFromProto(uniqueAddress: cm.UniqueAddress): UniqueAddress = {
UniqueAddress(
addressFromProto(uniqueAddress.getAddress),
if (uniqueAddress.hasUid2) {
// new remote node: join the two parts of the long uid back together
(uniqueAddress.getUid2.toLong << 32) | (uniqueAddress.getUid & 0xFFFFFFFFL)
} else {
// old remote node
uniqueAddress.getUid.toLong
}
)
}
private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int](
MemberStatus.Joining cm.MemberStatus.Joining_VALUE,
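The wire-compatibility trick above splits the 64-bit longUid across the existing uint32 uid field (low word) and the new optional uid2 field (high word); old nodes simply ignore uid2 and keep reading the low word they always saw. The same split reappears in the ddata SerializationSupport further down. A small round-trip sketch of the bit manipulation (names are illustrative):

object LongUidWireFormat {
  // Split a 64-bit uid into the two 32-bit protobuf fields.
  def split(longUid: Long): (Int, Int) =
    (longUid.toInt, (longUid >> 32).toInt) // (uid = low word, uid2 = high word)

  // Join the two fields back; masking uid avoids sign extension of the low word.
  def join(uid: Int, uid2: Int): Long =
    (uid2.toLong << 32) | (uid & 0xFFFFFFFFL)

  def main(args: Array[String]): Unit = {
    val original = 0xCAFEBABE12345678L
    val (uid, uid2) = split(original)
    assert(join(uid, uid2) == original) // round-trips for any Long
  }
}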

View file

@ -59,6 +59,7 @@ abstract class ClusterDeathWatchSpec
}
"An actor watching a remote actor in the cluster" must {
"receive Terminated when watched node becomes Down/Removed" in within(20 seconds) {
awaitClusterUp(first, second, third, fourth)
enterBarrier("cluster-up")

View file

@ -5,20 +5,24 @@ package akka.cluster
// TODO remove metrics
import java.util.UUID
import language.implicitConversions
import org.scalatest.{ Suite, Outcome, Canceled }
import org.scalatest.{ Canceled, Outcome, Suite }
import org.scalatest.exceptions.TestCanceledException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.{ ActorSystem, Address }
import akka.event.Logging.ErrorLevel
import scala.concurrent.duration._
import scala.collection.immutable
import java.util.concurrent.ConcurrentHashMap
import akka.remote.DefaultFailureDetectorRegistry
import akka.actor.ActorRef
import akka.actor.Actor
@ -33,8 +37,8 @@ object MultiNodeClusterSpec {
def clusterConfig(failureDetectorPuppet: Boolean): Config =
if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig
def clusterConfig: Config = ConfigFactory.parseString("""
akka.actor.provider = akka.cluster.ClusterActorRefProvider
def clusterConfig: Config = ConfigFactory.parseString(s"""
akka.actor.provider = cluster
akka.cluster {
jmx.enabled = off
gossip-interval = 200 ms
@ -47,11 +51,18 @@ object MultiNodeClusterSpec {
akka.loglevel = INFO
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.remote.log-remote-lifecycle-events = off
akka.remote {
log-remote-lifecycle-events = off
artery.advanced.flight-recorder {
enabled=on
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
}
}
akka.loggers = ["akka.testkit.TestEventListener"]
akka.test {
single-expect-default = 5 s
}
""")
// sometimes we need to coordinate test shutdown with messages instead of barriers
@ -77,19 +88,25 @@ object MultiNodeClusterSpec {
}
}
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with FlightRecordingSupport { self: MultiNodeSpec
override def initialParticipants = roles.size
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
override def atStartup(): Unit = {
override protected def atStartup(): Unit = {
startCoroner()
muteLog()
self.atStartup()
}
override def afterTermination(): Unit = {
override protected def afterTermination(): Unit = {
self.afterTermination()
stopCoroner()
if (failed || sys.props.get("akka.remote.artery.always-dump-flight-recorder").isDefined) {
printFlightRecording()
}
deleteFlightRecorderFile()
}
override def expectedTestDuration = 60.seconds

View file

@ -89,6 +89,8 @@ abstract class NodeChurnSpec
}
"Cluster with short lived members" must {
"TODO work with artery" in (pending)
/*
"setup stable nodes" taggedAs LongRunningTest in within(15.seconds) {
val logListener = system.actorOf(Props(classOf[LogListener], testActor), "logListener")
system.eventStream.subscribe(logListener, classOf[Info])
@ -125,6 +127,8 @@ abstract class NodeChurnSpec
}
expectNoMsg(5.seconds)
}
*/
}
}

View file

@ -3,22 +3,17 @@
*/
package akka.cluster
import scala.collection.immutable
import scala.language.postfixOps
import scala.concurrent.duration._
import akka.actor.Address
import akka.cluster.MemberStatus._
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import java.util.concurrent.ThreadLocalRandom
import akka.actor.{ ActorSystem, Address }
import akka.remote.RARP
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.event.Logging.Info
import akka.actor.Actor
import akka.actor.Props
import java.util.concurrent.ThreadLocalRandom
import scala.collection.immutable
import scala.concurrent.duration._
import scala.language.postfixOps
// This test was a reproducer for issue #20639
object QuickRestartMultiJvmSpec extends MultiNodeConfig {
@ -72,10 +67,19 @@ abstract class QuickRestartSpec
else
ActorSystem(
system.name,
ConfigFactory.parseString(s"""
akka.cluster.roles = [round-$n]
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}""") // same port
.withFallback(system.settings.config))
// use the same port
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
s"""
akka.cluster.roles = [round-$n]
akka.remote.artery.canonical.port = ${Cluster(restartingSystem).selfAddress.port.get}
"""
else
s"""
akka.cluster.roles = [round-$n]
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}
"""
).withFallback(system.settings.config))
log.info("Restarting node has address: {}", Cluster(restartingSystem).selfUniqueAddress)
Cluster(restartingSystem).joinSeedNodes(seedNodes)
within(20.seconds) {

View file

@ -10,6 +10,7 @@ import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Address
import akka.actor.ActorSystem
@ -18,6 +19,7 @@ import akka.actor.Actor
import akka.actor.RootActorPath
import akka.cluster.MemberStatus._
import akka.actor.Deploy
import akka.remote.RARP
object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig {
val seed1 = role("seed1")
@ -52,8 +54,12 @@ abstract class RestartFirstSeedNodeSpec
lazy val restartedSeed1System = ActorSystem(
system.name,
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + seedNodes.head.port.get).
withFallback(system.settings.config))
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
"akka.remote.artery.canonical.port=" + seedNodes.head.port.get
else
"akka.remote.netty.tcp.port=" + seedNodes.head.port.get
).withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(seed1) {

View file

@ -5,7 +5,6 @@ package akka.cluster
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Address
@ -13,6 +12,7 @@ import akka.actor.Deploy
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.MemberStatus._
import akka.remote.RARP
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
@ -50,8 +50,12 @@ abstract class RestartNode3Spec
lazy val restartedSecondSystem = ActorSystem(
system.name,
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get).
withFallback(system.settings.config))
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
"akka.remote.artery.canonical.port=" + secondUniqueAddress.address.port.get
else
"akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get
).withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(second) {
@ -133,7 +137,7 @@ abstract class RestartNode3Spec
awaitAssert {
Cluster(system).readView.members.size should ===(3)
Cluster(system).readView.members.exists { m
m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid
m.address == secondUniqueAddress.address && m.uniqueAddress.longUid != secondUniqueAddress.longUid
}
}
}

View file

@ -141,7 +141,7 @@ abstract class RestartNodeSpec
awaitAssert {
Cluster(system).readView.members.size should ===(3)
Cluster(system).readView.members.exists { m
m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid
m.address == secondUniqueAddress.address && m.uniqueAddress.longUid != secondUniqueAddress.longUid
}
}
}

View file

@ -0,0 +1,105 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import java.io.File
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer
import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.remote.RemoteSettings
import akka.remote.artery.ArterySettings
import akka.remote.artery.TaskRunner
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.ConfigFactory
import io.aeron.driver.MediaDriver
import io.aeron.driver.ThreadingMode
import org.agrona.IoUtil
object SharedMediaDriverSupport {
private val mediaDriver = new AtomicReference[Option[MediaDriver]](None)
def loadArterySettings(config: MultiNodeConfig): ArterySettings =
(new RemoteSettings(ConfigFactory.load(config.config))).Artery
def startMediaDriver(config: MultiNodeConfig): Unit = {
val arterySettings = loadArterySettings(config)
if (arterySettings.Enabled) {
val aeronDir = arterySettings.Advanced.AeronDirectoryName
require(aeronDir.nonEmpty, "aeron-dir must be defined")
val driverContext = new MediaDriver.Context
driverContext.aeronDirectoryName(aeronDir)
driverContext.clientLivenessTimeoutNs(arterySettings.Advanced.ClientLivenessTimeout.toNanos)
driverContext.imageLivenessTimeoutNs(arterySettings.Advanced.ImageLivenessTimeout.toNanos)
driverContext.driverTimeoutMs(arterySettings.Advanced.DriverTimeout.toMillis)
val idleCpuLevel = arterySettings.Advanced.IdleCpuLevel
driverContext
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
// Check if the media driver is already started by another multi-node JVM.
// It checks more than once, with a sleep in between. The number of checks
// depends on the multi-node index (i).
@tailrec def isDriverInactive(i: Int): Boolean = {
if (i < 0) true
else {
val active = driverContext.isDriverActive(5000, new Consumer[String] {
override def accept(msg: String): Unit = {
println(msg)
}
})
if (active) false
else {
Thread.sleep(500)
isDriverInactive(i - 1)
}
}
}
try {
if (isDriverInactive(MultiNodeSpec.selfIndex)) {
val driver = MediaDriver.launchEmbedded(driverContext)
println(s"Started media driver in directory [${driver.aeronDirectoryName}]")
if (!mediaDriver.compareAndSet(None, Some(driver))) {
throw new IllegalStateException("media driver started more than once")
}
}
} catch {
case NonFatal(e)
println(s"Failed to start media driver in [${aeronDir}]: ${e.getMessage}")
}
}
}
def isMediaDriverRunningByThisNode: Boolean = mediaDriver.get.isDefined
def stopMediaDriver(config: MultiNodeConfig): Unit = {
val maybeDriver = mediaDriver.getAndSet(None)
maybeDriver.foreach { driver
val arterySettings = loadArterySettings(config)
// let other nodes shutdown first
Thread.sleep(5000)
driver.close()
try {
if (arterySettings.Advanced.DeleteAeronDirectory) {
IoUtil.delete(new File(driver.aeronDirectoryName), false)
}
} catch {
case NonFatal(e)
println(
s"Couldn't delete Aeron embedded media driver files in [${driver.aeronDirectoryName}] " +
s"due to [${e.getMessage}]")
}
}
}
}

View file

@ -45,6 +45,7 @@ import akka.actor.ActorIdentity
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
import java.lang.management.ManagementFactory
import akka.remote.RARP
/**
* This test is intended to be used as long running stress test
@ -124,9 +125,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
akka.actor.serialize-messages = off
akka.actor.serialize-creators = off
akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.actor.provider = cluster
akka.cluster {
failure-detector.acceptable-heartbeat-pause = 5s
failure-detector.acceptable-heartbeat-pause = 10s
auto-down-unreachable-after = 1s
publish-stats-interval = 1s
}
@ -134,6 +135,12 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.remote.artery.advanced {
idle-cpu-level = 1
embedded-media-driver = off
aeron-dir = "target/aeron-StressSpec"
}
akka.actor.default-dispatcher.fork-join-executor {
parallelism-min = 8
parallelism-max = 8
@ -699,8 +706,11 @@ class StressMultiJvmNode12 extends StressSpec
class StressMultiJvmNode13 extends StressSpec
abstract class StressSpec
extends MultiNodeSpec(StressMultiJvmSpec)
with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender {
extends MultiNodeSpec({
// Aeron media driver must be started before ActorSystem
SharedMediaDriverSupport.startMediaDriver(StressMultiJvmSpec)
StressMultiJvmSpec
}) with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender {
import StressMultiJvmSpec._
import ClusterEvent._
@ -726,6 +736,20 @@ abstract class StressSpec
classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys)
}
override protected def afterTermination(): Unit = {
SharedMediaDriverSupport.stopMediaDriver(StressMultiJvmSpec)
super.afterTermination()
}
Runtime.getRuntime.addShutdownHook(new Thread {
override def run(): Unit = {
if (SharedMediaDriverSupport.isMediaDriverRunningByThisNode)
println("Abrupt exit of JVM without closing media driver. This should not happen and may cause test failure.")
}
})
def isArteryEnabled: Boolean = RARP(system).provider.remoteSettings.Artery.Enabled
def jvmInfo(): String = {
val runtime = ManagementFactory.getRuntimeMXBean
val os = ManagementFactory.getOperatingSystemMXBean

View file

@ -23,13 +23,17 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
// Note that this test uses default configuration,
// not MultiNodeClusterSpec.clusterConfig
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.cluster.failure-detector.monitored-by-nr-of-members = 3
commonConfig(ConfigFactory.parseString(
"""
akka {
actor.provider = cluster
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster.failure-detector.monitored-by-nr-of-members = 3
}
"""))
}
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec
@ -38,11 +42,11 @@ class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec
abstract class SunnyWeatherSpec
extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)
abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)
with MultiNodeClusterSpec {
import SunnyWeatherMultiJvmSpec._
import ClusterEvent._
"A normal cluster" must {

View file

@ -7,17 +7,19 @@ import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import scala.concurrent.duration._
import akka.testkit._
import akka.testkit.TestEvent._
import java.util.concurrent.ThreadLocalRandom
import akka.remote.testconductor.RoleName
import akka.actor.Props
import akka.actor.Actor
import scala.util.control.NoStackTrace
import akka.remote.QuarantinedEvent
import akka.remote.{ QuarantinedEvent, RARP, RemoteActorRefProvider }
import akka.actor.ExtendedActorSystem
import akka.remote.RemoteActorRefProvider
import akka.actor.ActorRef
import akka.dispatch.sysmsg.Failed
import akka.actor.PoisonPill
@ -36,6 +38,7 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
akka.remote.system-message-buffer-size=100
akka.remote.artery.advanced.system-message-buffer-size=100
akka.remote.netty.tcp.connection-timeout = 10s
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
@ -363,13 +366,14 @@ abstract class SurviveNetworkInstabilitySpec
}
runOn(side2: _*) {
// side2 comes back but stays unreachable
val expected = ((side2 ++ side1) map address).toSet
clusterView.members.map(_.address) should ===(expected)
assertUnreachable(side1: _*)
}
enterBarrier("after-7")
assertCanTalk((side1AfterJoin): _*)
assertCanTalk(side1AfterJoin: _*)
}
}

View file

@ -20,6 +20,7 @@ import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.MultiNodeClusterSpec.EndActor
import akka.remote.RARP
object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
val first = role("first")
@ -160,18 +161,30 @@ abstract class UnreachableNodeJoinsAgainSpec
runOn(victim) {
val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val freshConfig =
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
s"""
akka.remote.artery.canonical {
hostname = ${victimAddress.host.get}
port = ${victimAddress.port.get}
}
"""
else s"""
akka.remote.netty.tcp {
hostname = ${victimAddress.host.get}
port = ${victimAddress.port.get}
}"""
).withFallback(system.settings.config)
Await.ready(system.whenTerminated, 10 seconds)
// create new ActorSystem with same host:port
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.netty.tcp {
hostname = ${victimAddress.host.get}
port = ${victimAddress.port.get}
}
""").withFallback(system.settings.config))
val freshSystem = ActorSystem(system.name, freshConfig)
try {
Cluster(freshSystem).join(masterAddress)
within(15 seconds) {
within(30 seconds) {
awaitAssert(Cluster(freshSystem).readView.members.map(_.address) should contain(victimAddress))
awaitAssert(Cluster(freshSystem).readView.members.size should ===(expectedNumberOfMembers))
awaitAssert(Cluster(freshSystem).readView.members.map(_.status) should ===(Set(MemberStatus.Up)))

View file

@ -11,16 +11,14 @@ import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.remote.RARP
import akka.testkit.AkkaSpec
object AutoDownSpec {
final case class DownCalled(address: Address)
val memberA = TestMember(Address("akka.tcp", "sys", "a", 2552), Up)
val memberB = TestMember(Address("akka.tcp", "sys", "b", 2552), Up)
val memberC = TestMember(Address("akka.tcp", "sys", "c", 2552), Up)
class AutoDownTestActor(
memberA: Member,
autoDownUnreachableAfter: FiniteDuration,
probe: ActorRef)
extends AutoDownBase(autoDownUnreachableAfter) {
@ -36,13 +34,22 @@ object AutoDownSpec {
}
}
}
class AutoDownSpec extends AkkaSpec {
class AutoDownSpec extends AkkaSpec("akka.actor.provider=remote") {
import AutoDownSpec._
val protocol =
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
val memberA = TestMember(Address(protocol, "sys", "a", 2552), Up)
val memberB = TestMember(Address(protocol, "sys", "b", 2552), Up)
val memberC = TestMember(Address(protocol, "sys", "c", 2552), Up)
def autoDownActor(autoDownUnreachableAfter: FiniteDuration): ActorRef =
system.actorOf(Props(classOf[AutoDownTestActor], autoDownUnreachableAfter, testActor))
system.actorOf(Props(classOf[AutoDownTestActor], memberA, autoDownUnreachableAfter, testActor))
"AutoDown" must {

View file

@ -14,7 +14,7 @@ import akka.cluster.routing.ClusterRouterGroupSettings
object ClusterDeployerSpec {
val deployerConf = ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.actor.deployment {
/user/service1 {
router = round-robin-pool

View file

@ -17,11 +17,12 @@ import akka.cluster.ClusterEvent._
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.actor.ActorRef
import akka.remote.RARP
import akka.testkit.TestProbe
object ClusterDomainEventPublisherSpec {
val config = """
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
"""
}
@ -29,18 +30,22 @@ object ClusterDomainEventPublisherSpec {
class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublisherSpec.config)
with BeforeAndAfterEach with ImplicitSender {
val protocol =
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
var publisher: ActorRef = _
val aUp = TestMember(Address("akka.tcp", "sys", "a", 2552), Up)
val aUp = TestMember(Address(protocol, "sys", "a", 2552), Up)
val aLeaving = aUp.copy(status = Leaving)
val aExiting = aLeaving.copy(status = Exiting)
val aRemoved = aExiting.copy(status = Removed)
val bExiting = TestMember(Address("akka.tcp", "sys", "b", 2552), Exiting)
val bExiting = TestMember(Address(protocol, "sys", "b", 2552), Exiting)
val bRemoved = bExiting.copy(status = Removed)
val cJoining = TestMember(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP"))
val cJoining = TestMember(Address(protocol, "sys", "c", 2552), Joining, Set("GRP"))
val cUp = cJoining.copy(status = Up)
val cRemoved = cUp.copy(status = Removed)
val a51Up = TestMember(Address("akka.tcp", "sys", "a", 2551), Up)
val dUp = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP"))
val a51Up = TestMember(Address(protocol, "sys", "a", 2551), Up)
val dUp = TestMember(Address(protocol, "sys", "d", 2552), Up, Set("GRP"))
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress)
val g1 = Gossip(members = SortedSet(aUp, cJoining)).seen(aUp.uniqueAddress).seen(cJoining.uniqueAddress)

View file

@ -25,7 +25,7 @@ object ClusterSpec {
publish-stats-interval = 0 s # always, when it happens
failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet
}
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.remote.netty.tcp.port = 0
#akka.loglevel = DEBUG
@ -107,7 +107,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
"allow join and leave with local address" in {
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
"""))
try {

View file

@ -38,7 +38,7 @@ class DowningProviderSpec extends WordSpec with Matchers {
"""
akka {
loglevel = WARNING
actor.provider = "akka.cluster.ClusterActorRefProvider"
actor.provider = "cluster"
remote {
netty.tcp {
hostname = "127.0.0.1"

View file

@ -21,7 +21,7 @@ object MetricsEnabledSpec {
akka.cluster.metrics.enabled = on
akka.cluster.metrics.collect-interval = 1 s
akka.cluster.metrics.gossip-interval = 1 s
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.actor.provider = remote
"""
}

View file

@ -14,7 +14,7 @@ import akka.actor.ActorLogging
object StartupWithOneThreadSpec {
val config = """
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.actor.creation-timeout = 10s
akka.remote.netty.tcp.port = 0

View file

@ -11,7 +11,7 @@ import collection.immutable.SortedSet
import akka.testkit.AkkaSpec
class ClusterMessageSerializerSpec extends AkkaSpec(
"akka.actor.provider = akka.cluster.ClusterActorRefProvider") {
"akka.actor.provider = cluster") {
val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem])

View file

@ -22,7 +22,7 @@ object ClusterRouterSupervisorSpec {
}
class ClusterRouterSupervisorSpec extends AkkaSpec("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
""") {

View file

@ -7,19 +7,24 @@ package akka.cluster.routing
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.actor.RootActorPath
import akka.remote.RARP
import akka.testkit.AkkaSpec
import akka.routing.ActorSelectionRoutee
import akka.routing.ActorRefRoutee
class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
""")) {
val a1 = Address("akka.tcp", "sys", "a1", 2551)
val b1 = Address("akka.tcp", "sys", "b1", 2551)
val c1 = Address("akka.tcp", "sys", "c1", 2551)
val d1 = Address("akka.tcp", "sys", "d1", 2551)
val protocol =
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
val a1 = Address(protocol, "sys", "a1", 2551)
val b1 = Address(protocol, "sys", "b1", 2551)
val c1 = Address(protocol, "sys", "c1", 2551)
val d1 = Address(protocol, "sys", "d1", 2551)
val routeeA = ActorSelectionRoutee(system.actorSelection(RootActorPath(a1) / "user" / "a"))
val routeeB = ActorSelectionRoutee(system.actorSelection(RootActorPath(b1) / "user" / "b"))

View file

@ -12421,6 +12421,24 @@ public final class ReplicatorMessages {
* <code>required sfixed32 uid = 2;</code>
*/
int getUid();
// optional sfixed32 uid2 = 3;
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
boolean hasUid2();
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
int getUid2();
}
/**
* Protobuf type {@code akka.cluster.ddata.UniqueAddress}
@ -12491,6 +12509,11 @@ public final class ReplicatorMessages {
uid_ = input.readSFixed32();
break;
}
case 29: {
bitField0_ |= 0x00000004;
uid2_ = input.readSFixed32();
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -12569,9 +12592,34 @@ public final class ReplicatorMessages {
return uid_;
}
// optional sfixed32 uid2 = 3;
public static final int UID2_FIELD_NUMBER = 3;
private int uid2_;
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public boolean hasUid2() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public int getUid2() {
return uid2_;
}
private void initFields() {
address_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Address.getDefaultInstance();
uid_ = 0;
uid2_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -12603,6 +12651,9 @@ public final class ReplicatorMessages {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeSFixed32(2, uid_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeSFixed32(3, uid2_);
}
getUnknownFields().writeTo(output);
}
@ -12620,6 +12671,10 @@ public final class ReplicatorMessages {
size += akka.protobuf.CodedOutputStream
.computeSFixed32Size(2, uid_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += akka.protobuf.CodedOutputStream
.computeSFixed32Size(3, uid2_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -12745,6 +12800,8 @@ public final class ReplicatorMessages {
bitField0_ = (bitField0_ & ~0x00000001);
uid_ = 0;
bitField0_ = (bitField0_ & ~0x00000002);
uid2_ = 0;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -12785,6 +12842,10 @@ public final class ReplicatorMessages {
to_bitField0_ |= 0x00000002;
}
result.uid_ = uid_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.uid2_ = uid2_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -12807,6 +12868,9 @@ public final class ReplicatorMessages {
if (other.hasUid()) {
setUid(other.getUid());
}
if (other.hasUid2()) {
setUid2(other.getUid2());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -12996,6 +13060,55 @@ public final class ReplicatorMessages {
return this;
}
// optional sfixed32 uid2 = 3;
private int uid2_ ;
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public boolean hasUid2() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public int getUid2() {
return uid2_;
}
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public Builder setUid2(int value) {
bitField0_ |= 0x00000004;
uid2_ = value;
onChanged();
return this;
}
/**
* <code>optional sfixed32 uid2 = 3;</code>
*
* <pre>
* 64 bit uids but with backward wire compatibility
* </pre>
*/
public Builder clearUid2() {
bitField0_ = (bitField0_ & ~0x00000004);
uid2_ = 0;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.UniqueAddress)
}
@ -14806,14 +14919,14 @@ public final class ReplicatorMessages {
" \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda" +
"ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" +
"\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat",
"aEnvelope\"J\n\rUniqueAddress\022,\n\007address\030\001 " +
"aEnvelope\"X\n\rUniqueAddress\022,\n\007address\030\001 " +
"\002(\0132\033.akka.cluster.ddata.Address\022\013\n\003uid\030" +
"\002 \002(\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004po" +
"rt\030\002 \002(\r\"V\n\014OtherMessage\022\027\n\017enclosedMess" +
"age\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messa" +
"geManifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elemen" +
"ts\030\001 \003(\tB#\n\037akka.cluster.ddata.protobuf." +
"msgH\001"
"\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010hostna" +
"me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" +
"\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" +
"\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" +
"GSet\022\020\n\010elements\030\001 \003(\tB#\n\037akka.cluster.d" +
"data.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -14927,7 +15040,7 @@ public final class ReplicatorMessages {
internal_static_akka_cluster_ddata_UniqueAddress_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_UniqueAddress_descriptor,
new java.lang.String[] { "Address", "Uid", });
new java.lang.String[] { "Address", "Uid", "Uid2", });
internal_static_akka_cluster_ddata_Address_descriptor =
getDescriptor().getMessageTypes().get(15);
internal_static_akka_cluster_ddata_Address_fieldAccessorTable = new

View file

@ -98,6 +98,8 @@ message Gossip {
message UniqueAddress {
required Address address = 1;
required sfixed32 uid = 2;
// 64 bit uids but with backward wire compatibility
optional sfixed32 uid2 = 3;
}
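A note on the generated parsers above: the case 24 and case 29 branches are the protobuf tag bytes for this new field, computed as tag = (field_number << 3) | wire_type, with varint wire type 0 for the cluster message's uint32 and fixed 32-bit wire type 5 for this sfixed32. A two-line check (illustrative):

// tag byte = (field_number << 3) | wire_type
assert(((3 << 3) | 0) == 24) // uid2 as uint32 (ClusterMessages): varint, wire type 0
assert(((3 << 3) | 5) == 29) // uid2 as sfixed32 (ReplicatorMessages): 32-bit, wire type 5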
message Address {

View file

@ -88,10 +88,21 @@ trait SerializationSupport {
Address(addressProtocol, system.name, address.getHostname, address.getPort)
def uniqueAddressToProto(uniqueAddress: UniqueAddress): dm.UniqueAddress.Builder =
dm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid)
dm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address))
.setUid(uniqueAddress.longUid.toInt)
.setUid2((uniqueAddress.longUid >> 32).toInt)
def uniqueAddressFromProto(uniqueAddress: dm.UniqueAddress): UniqueAddress =
UniqueAddress(addressFromProto(uniqueAddress.getAddress), uniqueAddress.getUid)
UniqueAddress(
addressFromProto(uniqueAddress.getAddress),
if (uniqueAddress.hasUid2) {
// new remote node: join the two parts of the long uid back together
(uniqueAddress.getUid2.toLong << 32) | (uniqueAddress.getUid & 0xFFFFFFFFL)
} else {
// old remote node
uniqueAddress.getUid.toLong
}
)
def resolveActorRef(path: String): ActorRef =
system.provider.resolveActorRef(path)

View file

@ -24,7 +24,7 @@ object JepsenInspiredInsertSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.remote.log-remote-lifecycle-events = ERROR

View file

@ -26,7 +26,7 @@ object PerformanceSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.remote.log-remote-lifecycle-events = ERROR

View file

@ -22,7 +22,7 @@ object ReplicatorChaosSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.cluster.roles = ["backend"]
akka.log-dead-letters-during-shutdown = off
"""))

View file

@ -21,7 +21,7 @@ object ReplicatorPruningSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off
"""))

View file

@ -20,7 +20,7 @@ object ReplicatorSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off
"""))

View file

@ -44,7 +44,7 @@ class LocalConcurrencySpec(_system: ActorSystem) extends TestKit(_system)
this(ActorSystem(
"LocalConcurrencySpec",
ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
""")))
}

View file

@ -40,7 +40,7 @@ object LotsOfDataBot {
ConfigFactory.parseString("""
passive = off
max-entries = 100000
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote {
netty.tcp {
hostname = "127.0.0.1"

View file

@ -13,6 +13,7 @@ import akka.actor.ActorRef
import akka.cluster.ddata.Replicator.Internal._
import akka.cluster.ddata.Replicator._
import akka.actor.ActorSelection
import akka.remote.RARP
object WriteAggregatorSpec {
@ -50,12 +51,16 @@ object WriteAggregatorSpec {
}
class WriteAggregatorSpec extends AkkaSpec("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
""")
with ImplicitSender {
val nodeA = Address("akka.tcp", "Sys", "a", 2552)
val protocol =
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
val nodeA = Address(protocol, "Sys", "a", 2552)
val nodeB = nodeA.copy(host = Some("b"))
val nodeC = nodeA.copy(host = Some("c"))
val nodeD = nodeA.copy(host = Some("d"))

View file

@ -23,20 +23,23 @@ import akka.cluster.ddata.Replicator.Internal._
import akka.cluster.ddata.VersionVector
import akka.testkit.TestKit
import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
"ReplicatedDataSerializerSpec",
ConfigFactory.parseString("""
akka.actor.provider=akka.cluster.ClusterActorRefProvider
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem])
val address1 = UniqueAddress(Address("akka.tcp", system.name, "some.host.org", 4711), 1)
val address2 = UniqueAddress(Address("akka.tcp", system.name, "other.host.org", 4711), 2)
val address3 = UniqueAddress(Address("akka.tcp", system.name, "some.host.org", 4712), 3)
val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"
val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1)
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3)
override def afterAll {
shutdown()

View file

@ -21,20 +21,23 @@ import akka.cluster.ddata.Replicator.Internal._
import akka.testkit.TestKit
import akka.util.ByteString
import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"ReplicatorMessageSerializerSpec",
ConfigFactory.parseString("""
akka.actor.provider=akka.cluster.ClusterActorRefProvider
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatorMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
val address1 = UniqueAddress(Address("akka.tcp", system.name, "some.host.org", 4711), 1)
val address2 = UniqueAddress(Address("akka.tcp", system.name, "other.host.org", 4711), 2)
val address3 = UniqueAddress(Address("akka.tcp", system.name, "some.host.org", 4712), 3)
val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"
val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1)
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3)
val keyA = GSetKey[String]("A")

View file

@ -153,7 +153,7 @@ A custom ``application.conf`` might look like this::
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"
provider = "cluster"
default-dispatcher {
# Throughput for default Dispatcher, set to 1 for as fair as possible

View file

@ -19,8 +19,8 @@ i.e. not necessarily the initial contact points.
provided in a more efficient way by :ref:`distributed-pub-sub-java` for actors that
belong to the same cluster.
Also, note it's necessary to change ``akka.actor.provider`` from ``akka.actor.LocalActorRefProvider``
to ``akka.remote.RemoteActorRefProvider`` or ``akka.cluster.ClusterActorRefProvider`` when using
Also, note it's necessary to change ``akka.actor.provider`` from ``local``
to ``remote`` or ``cluster`` when using
the cluster client.
The receptionist is supposed to be started on all nodes, or all nodes with specified role,

View file

@ -31,7 +31,7 @@ The ``application.conf`` configuration looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/resources/application.conf#snippet
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-java`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
settings, but with ``cluster``.
The ``akka.cluster.seed-nodes`` should normally also be added to your ``application.conf`` file.
.. note::

View file

@ -31,7 +31,7 @@ to your ``application.conf`` file::
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
@ -44,7 +44,7 @@ to your ``application.conf`` file::
As you can see in the example above there are four things you need to add to get started:
* Change provider from ``akka.actor.LocalActorRefProvider`` to ``akka.remote.RemoteActorRefProvider``
* Change provider from ``local`` to ``remote``
* Add host name - the machine you want to run the actor system on; this host
name is exactly what is passed to remote systems in order to identify this
system and consequently used for connecting back to this system if need be,

View file

@ -19,8 +19,8 @@ i.e. not necessarily the initial contact points.
provided in a more efficient way by :ref:`distributed-pub-sub-scala` for actors that
belong to the same cluster.
Also, note it's necessary to change ``akka.actor.provider`` from ``akka.actor.LocalActorRefProvider``
to ``akka.remote.RemoteActorRefProvider`` or ``akka.cluster.ClusterActorRefProvider`` when using
Also, note it's necessary to change ``akka.actor.provider`` from ``local``
to ``remote`` or ``cluster`` when using
the cluster client.
The receptionist is supposed to be started on all nodes, or all nodes with specified role,

View file

@ -25,7 +25,7 @@ The ``application.conf`` configuration looks like this:
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/resources/application.conf#snippet
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
settings, but with ``akka.cluster.ClusterActorRefProvider``.
settings, but with ``cluster``.
The ``akka.cluster.seed-nodes`` should normally also be added to your ``application.conf`` file.
.. note::

View file

@ -10,7 +10,7 @@ object ClusterDocSpec {
val config =
"""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
"""
}

Some files were not shown because too many files have changed in this diff.