Compression tables properly *used* for Outgoing Compression (#20874)

* =art now correctly compresses and 2 table mode working
* =art AGRESSIVELY optimising hashing, not convienved about correctness yet
* fix HandshakeShouldDropCompressionTableSpec
This commit is contained in:
Konrad Malawski 2016-07-04 16:42:14 +02:00 committed by Patrik Nordwall
parent 27275b4680
commit d1015c1dc6
18 changed files with 407 additions and 319 deletions

View file

@ -58,7 +58,7 @@ class CodecBenchmark {
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16) private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16)
val compressionOut = NoOutboundCompressions val compressionOut = NoOutboundCompressions
val headerIn = HeaderBuilder.in(NoopInboundCompressions) val headerIn = HeaderBuilder.in(NoInboundCompressions)
val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN)
val uniqueLocalAddress = UniqueAddress( val uniqueLocalAddress = UniqueAddress(
@ -166,7 +166,7 @@ class CodecBenchmark {
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map { _ => .map { _ =>
@ -207,7 +207,7 @@ class CodecBenchmark {
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, NoopInboundCompressions, envelopePool, inboundEnvelopePool)) resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(msg outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) .map(msg outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))

View file

@ -6,9 +6,11 @@ package akka.remote.artery
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor._ import akka.actor._
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import akka.remote.artery.compress.CompressionSettings
import akka.remote.testconductor.RoleName import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
@ -55,11 +57,13 @@ object MaxThroughputSpec extends MultiNodeConfig {
#advanced.embedded-media-driver = off #advanced.embedded-media-driver = off
#advanced.aeron-dir = "target/aeron" #advanced.aeron-dir = "target/aeron"
#advanced.compression { advanced.compression {
# enabled = on enabled = off
# actor-refs.enabled = on actor-refs {
# manifests.enabled = on enabled = on
#} advertisement-interval = 1 second
}
}
} }
} }
"""))) """)))
@ -137,11 +141,14 @@ object MaxThroughputSpec extends MultiNodeConfig {
println( println(
s"=== MaxThroughput ${self.path.name}: " + s"=== MaxThroughput ${self.path.name}: " +
f"throughput ${throughput * testSettings.senderReceiverPairs}%,.0f msg/s, " + f"throughput ${throughput * testSettings.senderReceiverPairs}%,.0f msg/s, " +
f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s, " + f"${throughput * payloadSize * testSettings.senderReceiverPairs}%,.0f bytes/s (payload), " +
f"${throughput * totalSize(context.system) * testSettings.senderReceiverPairs}%,.0f bytes/s (total" +
(if (CompressionSettings(context.system).enabled) ",compression" else "") + "), " +
s"dropped ${totalMessages - totalReceived}, " + s"dropped ${totalMessages - totalReceived}, " +
s"max round-trip $maxRoundTripMillis ms, " + s"max round-trip $maxRoundTripMillis ms, " +
s"burst size $burstSize, " + s"burst size $burstSize, " +
s"payload size $payloadSize, " + s"payload size $payloadSize, " +
s"total size ${totalSize(context.system)}, " +
s"$took ms to deliver $totalReceived messages") s"$took ms to deliver $totalReceived messages")
plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024) plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024)
context.stop(self) context.stop(self)
@ -171,7 +178,10 @@ object MaxThroughputSpec extends MultiNodeConfig {
totalMessages: Long, totalMessages: Long,
burstSize: Int, burstSize: Int,
payloadSize: Int, payloadSize: Int,
senderReceiverPairs: Int) senderReceiverPairs: Int) {
// data based on measurement
def totalSize(system: ActorSystem) = payloadSize + (if (CompressionSettings(system).enabled) 38 else 110)
}
class TestSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with ByteBufferSerializer { class TestSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with ByteBufferSerializer {
@ -270,7 +280,8 @@ abstract class MaxThroughputSpec
totalMessages = adjustedTotalMessages(20000), totalMessages = adjustedTotalMessages(20000),
burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000 burstSize = 200, // don't exceed the send queue capacity 200*5*3=3000
payloadSize = 100, payloadSize = 100,
senderReceiverPairs = 5)) senderReceiverPairs = 5)
)
def test(testSettings: TestSettings): Unit = { def test(testSettings: TestSettings): Unit = {
import testSettings._ import testSettings._

View file

@ -4,15 +4,15 @@
package akka.remote.artery.compress; package akka.remote.artery.compress;
import akka.actor.Actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import java.io.UnsupportedEncodingException;
import java.util.Random; import java.util.Random;
/** /**
* INTERNAL API: Count-Min Sketch datastructure. * INTERNAL API: Count-Min Sketch datastructure.
* *
* Not thread-safe.
*
* An Improved Data Stream Summary: The Count-Min Sketch and its Applications * An Improved Data Stream Summary: The Count-Min Sketch and its Applications
* https://web.archive.org/web/20060907232042/http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf * https://web.archive.org/web/20060907232042/http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf
* *
@ -31,14 +31,19 @@ public class CountMinSketch {
private double eps; private double eps;
private double confidence; private double confidence;
private int[] recyclableCMSHashBuckets;
public CountMinSketch(int depth, int width, int seed) { public CountMinSketch(int depth, int width, int seed) {
this.depth = depth; this.depth = depth;
this.width = width; this.width = width;
this.eps = 2.0 / width; this.eps = 2.0 / width;
this.confidence = 1 - 1 / Math.pow(2, depth); this.confidence = 1 - 1 / Math.pow(2, depth);
recyclableCMSHashBuckets = preallocateHashBucketsArray(depth);
initTablesWith(depth, width, seed); initTablesWith(depth, width, seed);
} }
@SuppressWarnings("unused")
public CountMinSketch(double epsOfTotalCount, double confidence, int seed) { public CountMinSketch(double epsOfTotalCount, double confidence, int seed) {
// 2/w = eps ; w = 2/eps // 2/w = eps ; w = 2/eps
// 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence) // 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence)
@ -46,10 +51,12 @@ public class CountMinSketch {
this.confidence = confidence; this.confidence = confidence;
this.width = (int) Math.ceil(2 / epsOfTotalCount); this.width = (int) Math.ceil(2 / epsOfTotalCount);
this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2)); this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
recyclableCMSHashBuckets = preallocateHashBucketsArray(depth);
initTablesWith(depth, width, seed); initTablesWith(depth, width, seed);
} }
CountMinSketch(int depth, int width, int size, long[] hashA, long[][] table) { @SuppressWarnings("unused")
public CountMinSketch(int depth, int width, int size, long[] hashA, long[][] table) {
this.depth = depth; this.depth = depth;
this.width = width; this.width = width;
this.eps = 2.0 / width; this.eps = 2.0 / width;
@ -57,6 +64,7 @@ public class CountMinSketch {
this.hashA = hashA; this.hashA = hashA;
this.table = table; this.table = table;
this.size = size; this.size = size;
recyclableCMSHashBuckets = preallocateHashBucketsArray(depth);
} }
private void initTablesWith(int depth, int width, int seed) { private void initTablesWith(int depth, int width, int seed) {
@ -75,11 +83,11 @@ public class CountMinSketch {
} }
/** Referred to as {@code epsilon} in the whitepaper */ /** Referred to as {@code epsilon} in the whitepaper */
public double getRelativeError() { public double relativeError() {
return eps; return eps;
} }
public double getConfidence() { public double confidence() {
return confidence; return confidence;
} }
@ -108,7 +116,7 @@ public class CountMinSketch {
size += count; size += count;
} }
public void add(String item, long count) { public void addObject(Object item, long count) {
if (count < 0) { if (count < 0) {
// Actually for negative increments we'll need to use the median // Actually for negative increments we'll need to use the median
// instead of minimum, and accuracy will suffer somewhat. // instead of minimum, and accuracy will suffer somewhat.
@ -116,10 +124,9 @@ public class CountMinSketch {
// parameter to constructor. // parameter to constructor.
throw new IllegalArgumentException("Negative increments not implemented"); throw new IllegalArgumentException("Negative increments not implemented");
} }
// TODO we could reuse the arrays MurmurHash.hashBuckets(item, recyclableCMSHashBuckets, width);
final int[] buckets = MurmurHash.hashBuckets(item, depth, width); // TODO replace with Scala's Murmur3, it's much faster
for (int i = 0; i < depth; ++i) { for (int i = 0; i < depth; ++i) {
table[i][buckets[i]] += count; table[i][recyclableCMSHashBuckets[i]] += count;
} }
size += count; size += count;
} }
@ -128,7 +135,7 @@ public class CountMinSketch {
* Similar to {@code add}, however we reuse the fact that the hask buckets have to be calculated for {@code add} * Similar to {@code add}, however we reuse the fact that the hask buckets have to be calculated for {@code add}
* already, and a separate {@code estimateCount} operation would have to calculate them again, so we do it all in one go. * already, and a separate {@code estimateCount} operation would have to calculate them again, so we do it all in one go.
*/ */
public long addAndEstimateCount(String item, long count) { public long addObjectAndEstimateCount(Object item, long count) {
if (count < 0) { if (count < 0) {
// Actually for negative increments we'll need to use the median // Actually for negative increments we'll need to use the median
// instead of minimum, and accuracy will suffer somewhat. // instead of minimum, and accuracy will suffer somewhat.
@ -136,12 +143,12 @@ public class CountMinSketch {
// parameter to constructor. // parameter to constructor.
throw new IllegalArgumentException("Negative increments not implemented"); throw new IllegalArgumentException("Negative increments not implemented");
} }
final int[] buckets = MurmurHash.hashBuckets(item, depth, width); MurmurHash.hashBuckets(item, recyclableCMSHashBuckets, width);
for (int i = 0; i < depth; ++i) { for (int i = 0; i < depth; ++i) {
table[i][buckets[i]] += count; table[i][recyclableCMSHashBuckets[i]] += count;
} }
size += count; size += count;
return estimateCount(buckets); return estimateCount(recyclableCMSHashBuckets);
} }
public long size() { public long size() {
@ -160,15 +167,6 @@ public class CountMinSketch {
return res; return res;
} }
/**
* The estimate is correct within {@code 'epsilon' * (total item count)},
* with probability {@code confidence}.
*/
public long estimateCount(String item) {
int[] buckets = MurmurHash.hashBuckets(item, depth, width);
return estimateCount(buckets);
}
/** /**
* The estimate is correct within {@code 'epsilon' * (total item count)}, * The estimate is correct within {@code 'epsilon' * (total item count)},
* with probability {@code confidence}. * with probability {@code confidence}.
@ -198,7 +196,6 @@ public class CountMinSketch {
// TODO replace with Scala's Murmur3, it's much faster // TODO replace with Scala's Murmur3, it's much faster
private static class MurmurHash { private static class MurmurHash {
// FIXME: This overload isn't actually ever used
public static int hash(Object o) { public static int hash(Object o) {
if (o == null) { if (o == null) {
return 0; return 0;
@ -288,10 +285,14 @@ public class CountMinSketch {
} }
public static int hashLong(long data) { public static int hashLong(long data) {
return hashLong(data, 0);
}
public static int hashLong(long data, int seed) {
int m = 0x5bd1e995; int m = 0x5bd1e995;
int r = 24; int r = 24;
int h = 0; int h = seed;
// int h = seed ^ length;
int k = (int) data * m; int k = (int) data * m;
k ^= k >>> r; k ^= k >>> r;
@ -314,26 +315,39 @@ public class CountMinSketch {
// http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf
// does prove to work in actual tests, and is obviously faster // does prove to work in actual tests, and is obviously faster
// than performing further iterations of murmur. // than performing further iterations of murmur.
public static int[] hashBuckets(String key, int hashCount, int max) { // public static int[] hashBuckets(String key, int hashCount, int max) {
byte[] b; // byte[] b;
try { // try {
b = key.getBytes("UTF-16");// TODO Use the Unsafe trick @patriknw used to access the backing array directly -- via Endre // b = key.getBytes("UTF-16");// TODO Use the Unsafe trick @patriknw used to access the backing array directly -- via Endre
} catch (UnsupportedEncodingException e) { // } catch (UnsupportedEncodingException e) {
throw new RuntimeException(e); // throw new RuntimeException(e);
// }
// return hashBuckets(b, hashCount, max);
// }
// static int[] hashBuckets(byte[] b, int hashCount, int max) {
// // TODO we could reuse the arrays
// int[] result = preallocateHashBucketsArray(hashCount);
// int hash1 = hash(b, b.length, 0);
// int hash2 = hash(b, b.length, hash1);
// for (int i = 0; i < hashCount; i++) {
// result[i] = Math.abs((hash1 + i * hash2) % max);
// }
// return result;
// }
/** Mutates passed in {@code hashBuckets} */
static void hashBuckets(Object item, int[] hashBuckets, int max) {
int hash1 = hash(item); // specialized hash for ActorRef and Strings
int hash2 = hashLong(hash1, hash1);
final int depth = hashBuckets.length;
for (int i = 0; i < depth; i++)
hashBuckets[i] = Math.abs((hash1 + i * hash2) % max);
} }
return hashBuckets(b, hashCount, max);
} }
static int[] hashBuckets(byte[] b, int hashCount, int max) { public int[] preallocateHashBucketsArray(int depth) {
// TODO we could reuse the arrays return new int[depth];
int[] result = new int[hashCount];
int hash1 = hash(b, b.length, 0);
int hash2 = hash(b, b.length, hash1);
for (int i = 0; i < hashCount; i++) {
result[i] = Math.abs((hash1 + i * hash2) % max);
}
return result;
}
} }
@Override @Override

View file

@ -5,9 +5,9 @@ package akka.remote.artery
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.remote.artery.compress.CompressionProtocol.CompressionMessage import akka.remote.artery.compress.CompressionProtocol.CompressionMessage
import scala.collection.JavaConverters._
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -33,7 +33,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageObserver
import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduCodec
import akka.remote.transport.AkkaPduProtobufCodec import akka.remote.transport.AkkaPduProtobufCodec
import akka.remote.artery.compress.{ InboundCompressionsImpl, CompressionProtocol } import akka.remote.artery.compress._
import akka.stream.AbruptTerminationException import akka.stream.AbruptTerminationException
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.KillSwitches import akka.stream.KillSwitches
@ -57,16 +57,20 @@ import org.agrona.IoUtil
import java.io.File import java.io.File
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.channels.{ DatagramChannel, FileChannel } import java.nio.channels.{ DatagramChannel, FileChannel }
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import io.aeron.CncFileDescriptor import io.aeron.CncFileDescriptor
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.util.OptionVal import akka.util.OptionVal
import io.aeron.driver.ThreadingMode import io.aeron.driver.ThreadingMode
import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.BusySpinIdleStrategy import org.agrona.concurrent.BusySpinIdleStrategy
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.Props import akka.actor.Props
import akka.actor.Actor import akka.actor.Actor
@ -113,7 +117,7 @@ private[akka] object AssociationState {
incarnation = 1, incarnation = 1,
uniqueRemoteAddressPromise = Promise(), uniqueRemoteAddressPromise = Promise(),
quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], quarantined = ImmutableLongMap.empty[QuarantinedTimestamp],
outboundCompression = NoOutboundCompressions) outboundCompressions = NoOutboundCompressions)
final case class QuarantinedTimestamp(nanoTime: Long) { final case class QuarantinedTimestamp(nanoTime: Long) {
override def toString: String = override def toString: String =
@ -128,7 +132,7 @@ private[akka] final class AssociationState(
val incarnation: Int, val incarnation: Int,
val uniqueRemoteAddressPromise: Promise[UniqueAddress], val uniqueRemoteAddressPromise: Promise[UniqueAddress],
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp],
val outboundCompression: OutboundCompressions) { val outboundCompressions: OutboundCompressions) {
import AssociationState.QuarantinedTimestamp import AssociationState.QuarantinedTimestamp
@ -154,6 +158,9 @@ private[akka] final class AssociationState(
} }
} }
def withCompression(compression: OutboundCompressions) =
new AssociationState(incarnation, uniqueRemoteAddressPromise, quarantined, compression)
def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompressions): AssociationState = def newIncarnation(remoteAddressPromise: Promise[UniqueAddress], compression: OutboundCompressions): AssociationState =
new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression) new AssociationState(incarnation + 1, remoteAddressPromise, quarantined, compression)
@ -164,7 +171,7 @@ private[akka] final class AssociationState(
incarnation, incarnation,
uniqueRemoteAddressPromise, uniqueRemoteAddressPromise,
quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())),
outboundCompression = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it outboundCompressions = NoOutboundCompressions) // after quarantine no compression needed anymore, drop it
case _ this case _ this
} }
@ -283,6 +290,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
@volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeron: Aeron = _
@volatile private[this] var aeronErrorLogTask: Cancellable = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _
// this is only used to allow triggering compression advertisements or state from tests
@volatile private[this] var activeCompressions = Set.empty[InboundCompressions]
override def localAddress: UniqueAddress = _localAddress override def localAddress: UniqueAddress = _localAddress
override def defaultAddress: Address = localAddress.address override def defaultAddress: Address = localAddress.address
override def addresses: Set[Address] = _addresses override def addresses: Set[Address] = _addresses
@ -416,7 +426,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.threadingMode(ThreadingMode.DEDICATED) .threadingMode(ThreadingMode.DEDICATED)
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
.receiverIdleStrategy(new BusySpinIdleStrategy) .receiverIdleStrategy(new BusySpinIdleStrategy)
.senderIdleStrategy(new BusySpinIdleStrategy); .senderIdleStrategy(new BusySpinIdleStrategy)
} else if (remoteSettings.IdleCpuLevel == 1) { } else if (remoteSettings.IdleCpuLevel == 1) {
driverContext driverContext
.threadingMode(ThreadingMode.SHARED) .threadingMode(ThreadingMode.SHARED)
@ -505,10 +515,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
} }
private def runInboundStreams(): Unit = { private def runInboundStreams(): Unit = {
val noCompressions = new NoInboundCompressions(system) // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082 val noCompressions = NoInboundCompressions // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082
val compressions = createInboundCompressions(this) val compressions = createInboundCompressions(this)
runInboundControlStream(noCompressions) runInboundControlStream(noCompressions) // TODO should understand compressions too
runInboundOrdinaryMessagesStream(compressions) runInboundOrdinaryMessagesStream(compressions)
if (largeMessageDestinationsEnabled) { if (largeMessageDestinationsEnabled) {
runInboundLargeMessagesStream() runInboundLargeMessagesStream()
@ -542,12 +552,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
m match { m match {
case CompressionProtocol.ActorRefCompressionAdvertisement(from, table) case CompressionProtocol.ActorRefCompressionAdvertisement(from, table)
log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table) log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table)
association(from.address).compression.applyActorRefCompressionTable(table) association(from.address).outboundCompression.applyActorRefCompressionTable(table)
system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
case CompressionProtocol.ClassManifestCompressionAdvertisement(from, table) case CompressionProtocol.ClassManifestCompressionAdvertisement(from, table)
log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table) log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)
association(from.address).compression.applyClassManifestCompressionTable(table) association(from.address).outboundCompression.applyClassManifestCompressionTable(table)
system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
} }
@ -592,12 +602,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
} }
private def runInboundLargeMessagesStream(): Unit = { private def runInboundLargeMessagesStream(): Unit = {
val compression = new NoInboundCompressions(system) // no compression on large message stream for now val disableCompression = NoInboundCompressions // no compression on large message stream for now
val completed = val completed =
if (remoteSettings.TestMode) { if (remoteSettings.TestMode) {
val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool) val (mgmt, c) = aeronSource(largeStreamId, largeEnvelopePool)
.via(inboundLargeFlow(compression)) .via(inboundLargeFlow(disableCompression))
.viaMat(inboundTestFlow)(Keep.right) .viaMat(inboundTestFlow)(Keep.right)
.toMat(inboundSink)(Keep.both) .toMat(inboundSink)(Keep.both)
.run()(materializer) .run()(materializer)
@ -605,7 +615,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
c c
} else { } else {
aeronSource(largeStreamId, largeEnvelopePool) aeronSource(largeStreamId, largeEnvelopePool)
.via(inboundLargeFlow(compression)) .via(inboundLargeFlow(disableCompression))
.toMat(inboundSink)(Keep.right) .toMat(inboundSink)(Keep.right)
.run()(materializer) .run()(materializer)
} }
@ -758,8 +768,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool)) Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, bufferPool))
private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions = private def createInboundCompressions(inboundContext: InboundContext): InboundCompressions =
if (remoteSettings.ArteryCompressionSettings.enabled) new InboundCompressionsImpl(system, inboundContext) if (remoteSettings.ArteryCompressionSettings.enabled) {
else new NoInboundCompressions(system) val comp = new InboundCompressionsImpl(system, inboundContext)
activeCompressions += comp
comp
} else NoInboundCompressions
def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = def createEncoder(pool: EnvelopeBufferPool, compression: OutboundCompressions): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool)) Flow.fromGraph(new Encoder(localAddress, system, compression, outboundEnvelopePool, pool))
@ -779,8 +792,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
} }
def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
val resolveActorRefWithLocalAddress: String InternalActorRef = val resolveActorRefWithLocalAddress: String InternalActorRef = {
recipient provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) recipient provider.resolveActorRefWithLocalAddress(recipient, localAddress.address)
}
Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool, Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool,
inboundEnvelopePool)) inboundEnvelopePool))
} }
@ -833,6 +847,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def outboundTestFlow(association: Association): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] = def outboundTestFlow(association: Association): Flow[OutboundEnvelope, OutboundEnvelope, TestManagementApi] =
Flow.fromGraph(new OutboundTestStage(association)) Flow.fromGraph(new OutboundTestStage(association))
/** INTERNAL API: for testing only. */
private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = {
activeCompressions.foreach {
case c: InboundCompressionsImpl if actorRef || manifest
log.info("Triggering compression table advertisement for {}", c)
if (actorRef) c.runNextActorRefAdvertisement()
if (manifest) c.runNextClassManifestAdvertisement()
case _
}
}
} }
/** /**

View file

@ -9,7 +9,9 @@ import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressionsImpl }
import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable, OutboundCompressions, OutboundCompressionsImpl }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
@ -82,7 +84,7 @@ private[remote] class Association(
// start sending (enqueuing) to the Association immediate after construction. // start sending (enqueuing) to the Association immediate after construction.
/** Accesses the currently active outbound compression. */ /** Accesses the currently active outbound compression. */
def compression: OutboundCompressions = associationState.outboundCompression def outboundCompression: OutboundCompressions = associationState.outboundCompressions
def createQueue(capacity: Int): Queue[OutboundEnvelope] = def createQueue(capacity: Int): Queue[OutboundEnvelope] =
new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)
@ -144,9 +146,14 @@ private[remote] class Association(
val current = associationState val current = associationState
current.uniqueRemoteAddressPromise.trySuccess(peer) current.uniqueRemoteAddressPromise.trySuccess(peer)
current.uniqueRemoteAddressValue() match { current.uniqueRemoteAddressValue() match {
case Some(`peer`) // our value case Some(`peer`)
// our value
if (current.outboundCompressions == NoOutboundCompressions) {
// enable outbound compression (here, since earlier we don't know the remote address)
swapState(current, current.withCompression(createOutboundCompressions(remoteAddress)))
}
case _ case _
val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressionTable(remoteAddress)) val newState = current.newIncarnation(Promise.successful(peer), createOutboundCompressions(remoteAddress))
if (swapState(current, newState)) { if (swapState(current, newState)) {
current.uniqueRemoteAddressValue() match { current.uniqueRemoteAddressValue() match {
case Some(old) case Some(old)
@ -407,11 +414,10 @@ private[remote] class Association(
} }
// TODO: Make sure that once other channels use Compression, each gets it's own // TODO: Make sure that once other channels use Compression, each gets it's own
private def createOutboundCompressionTable(remoteAddress: Address): OutboundCompressions = { private def createOutboundCompressions(remoteAddress: Address): OutboundCompressions = {
if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) { if (transport.provider.remoteSettings.ArteryCompressionSettings.enabled) {
val compression = new OutboundCompressionsImpl(transport.system, remoteAddress) val compression = new OutboundCompressionsImpl(transport.system, remoteAddress)
// FIXME should use verion number of table instead of hashCode log.debug("Creating Outbound compression table to [{}]", remoteAddress)
log.info("Creating Outbound compression table ({}) to [{}]", compression.hashCode, remoteAddress)
compression compression
} else NoOutboundCompressions } else NoOutboundCompressions
} }
@ -421,17 +427,25 @@ private[remote] class Association(
* This way the same outgoing stream will switch to using the new table without the need of restarting it. * This way the same outgoing stream will switch to using the new table without the need of restarting it.
*/ */
private object CurrentAssociationStateOutboundCompressionsProxy extends OutboundCompressions { private object CurrentAssociationStateOutboundCompressionsProxy extends OutboundCompressions {
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit =
associationState.outboundCompression.applyActorRefCompressionTable(table)
override def actorRefCompressionTableVersion: Int =
associationState.outboundCompressions.actorRefCompressionTableVersion
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = {
associationState.outboundCompressions.applyActorRefCompressionTable(table)
}
override final def compressActorRef(ref: ActorRef): Int = {
associationState.outboundCompressions.compressActorRef(ref)
}
override def classManifestCompressionTableVersion: Int =
associationState.outboundCompressions.classManifestCompressionTableVersion
override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit =
associationState.outboundCompression.applyClassManifestCompressionTable(table) associationState.outboundCompressions.applyClassManifestCompressionTable(table)
override final def compressActorRef(ref: ActorRef): Int =
associationState.outboundCompression.compressActorRef(ref)
override final def compressClassManifest(manifest: String): Int = override final def compressClassManifest(manifest: String): Int =
associationState.outboundCompression.compressClassManifest(manifest) associationState.outboundCompressions.compressClassManifest(manifest)
override def toString =
s"${Logging.simpleName(getClass)}(current delegate: ${associationState.outboundCompressions})"
} }
override def toString: String = override def toString: String =

View file

@ -9,7 +9,7 @@ import java.nio.{ ByteBuffer, ByteOrder }
import akka.actor.{ ActorRef, Address } import akka.actor.{ ActorRef, Address }
import akka.remote.artery.compress.CompressionProtocol._ import akka.remote.artery.compress.CompressionProtocol._
import akka.remote.artery.compress.{ CompressionTable, NoopInboundCompressions, NoopOutboundCompressions } import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions }
import akka.serialization.Serialization import akka.serialization.Serialization
import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer } import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer }
import akka.util.{ OptionVal, Unsafe } import akka.util.{ OptionVal, Unsafe }
@ -54,6 +54,7 @@ private[remote] object EnvelopeBuffer {
val VersionOffset = 0 // Int val VersionOffset = 0 // Int
val UidOffset = 4 // Long val UidOffset = 4 // Long
val SerializerOffset = 12 // Int val SerializerOffset = 12 // Int
val SenderActorRefTagOffset = 16 // Int val SenderActorRefTagOffset = 16 // Int
val RecipientActorRefTagOffset = 20 // Int val RecipientActorRefTagOffset = 20 // Int
val ClassManifestTagOffset = 24 // Int val ClassManifestTagOffset = 24 // Int
@ -70,41 +71,14 @@ private[remote] object EnvelopeBuffer {
val StringValueFieldOffset = Unsafe.instance.objectFieldOffset(classOf[String].getDeclaredField("value")) val StringValueFieldOffset = Unsafe.instance.objectFieldOffset(classOf[String].getDeclaredField("value"))
} }
/**
* INTERNAL API
* Decompress and cause compression advertisements.
*
* One per inbound message stream thus must demux by originUid to use the right tables.
*/
private[remote] trait InboundCompressions {
def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit
def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef]
def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit
def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String]
}
/**
* INTERNAL API
* Compress outgoing data and handle compression advertisements to fill compression table.
*
* One per outgoing message stream.
*/
private[remote] trait OutboundCompressions {
def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit
def compressActorRef(ref: ActorRef): Int
def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit
def compressClassManifest(manifest: String): Int
}
/** INTERNAL API */ /** INTERNAL API */
private[remote] object HeaderBuilder { private[remote] object HeaderBuilder {
// We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl // We really only use the Header builder on one "side" or the other, thus in order to avoid having to split its impl
// we inject no-op compression's of the "other side". // we inject no-op compression's of the "other side".
def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoopOutboundCompressions) def in(compression: InboundCompressions): HeaderBuilder = new HeaderBuilderImpl(compression, NoOutboundCompressions)
def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoopInboundCompressions, compression) def out(compression: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(NoInboundCompressions, compression)
/** INTERNAL API, FOR TESTING ONLY */ /** INTERNAL API, FOR TESTING ONLY */
private[remote] def bothWays(in: InboundCompressions, out: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(in, out) private[remote] def bothWays(in: InboundCompressions, out: OutboundCompressions): HeaderBuilder = new HeaderBuilderImpl(in, out)
@ -170,8 +144,8 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres
// Fields only available for EnvelopeBuffer // Fields only available for EnvelopeBuffer
var _version: Int = _ var _version: Int = _
var _uid: Long = _ var _uid: Long = _
var _actorRefCompressionTableVersion: Int = -1 var _actorRefCompressionTableVersion: Int = 0
var _classManifestCompressionTableVersion: Int = -1 var _classManifestCompressionTableVersion: Int = 0
var _senderActorRef: String = null var _senderActorRef: String = null
var _senderActorRefIdx: Int = -1 var _senderActorRefIdx: Int = -1
@ -247,17 +221,17 @@ private[remote] final class HeaderBuilderImpl(inboundCompression: InboundCompres
override def toString = override def toString =
"HeaderBuilderImpl(" + "HeaderBuilderImpl(" +
version + ", " + "version:" + version + ", " +
actorRefCompressionTableVersion + ", " + "actorRefCompressionTableVersion:" + actorRefCompressionTableVersion + ", " +
classManifestCompressionTableVersion + ", " + "classManifestCompressionTableVersion:" + classManifestCompressionTableVersion + ", " +
uid + ", " + "uid:" + uid + ", " +
_senderActorRef + ", " + "_senderActorRef:" + _senderActorRef + ", " +
_senderActorRefIdx + ", " + "_senderActorRefIdx:" + _senderActorRefIdx + ", " +
_recipientActorRef + ", " + "_recipientActorRef:" + _recipientActorRef + ", " +
_recipientActorRefIdx + ", " + "_recipientActorRefIdx:" + _recipientActorRefIdx + ", " +
_serializer + ", " + "_serializer:" + _serializer + ", " +
_manifest + ", " + "_manifest:" + _manifest + ", " +
_manifestIdx + ")" "_manifestIdx:" + _manifestIdx + ")"
} }

View file

@ -1,4 +1,3 @@
/** /**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com> * Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/ */
@ -12,8 +11,9 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension } import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._ import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.{ ByteString, OptionVal } import akka.util.{ ByteString, OptionVal, PrettyByteString }
import akka.actor.EmptyLocalActorRef import akka.actor.EmptyLocalActorRef
import akka.remote.artery.compress.{ InboundCompressions, OutboundCompressions, OutboundCompressionsImpl }
import akka.stream.stage.TimerGraphStageLogic import akka.stream.stage.TimerGraphStageLogic
/** /**
@ -47,6 +47,10 @@ private[remote] class Encoder(
val outboundEnvelope = grab(in) val outboundEnvelope = grab(in)
val envelope = bufferPool.acquire() val envelope = bufferPool.acquire()
// FIXME: OMG race between setting the version, and using the table!!!!
headerBuilder setActorRefCompressionTableVersion compression.actorRefCompressionTableVersion
headerBuilder setClassManifestCompressionTableVersion compression.classManifestCompressionTableVersion
// internally compression is applied by the builder: // internally compression is applied by the builder:
outboundEnvelope.recipient match { outboundEnvelope.recipient match {
case OptionVal.Some(r) headerBuilder setRecipientActorRef r case OptionVal.Some(r) headerBuilder setRecipientActorRef r
@ -147,24 +151,26 @@ private[remote] class Decoder(
val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef(originUid) match { val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef(originUid) match {
case OptionVal.Some(ref) case OptionVal.Some(ref)
OptionVal(ref.asInstanceOf[InternalActorRef]) OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined
// `get` on Path is safe because it surely is not a compressed value here
resolveRecipient(headerBuilder.recipientActorRefPath.get) resolveRecipient(headerBuilder.recipientActorRefPath.get)
case _
OptionVal.None
} }
val sender: InternalActorRef = headerBuilder.senderActorRef(originUid) match { val sender: OptionVal[InternalActorRef] = headerBuilder.senderActorRef(originUid) match {
case OptionVal.Some(ref) case OptionVal.Some(ref)
ref.asInstanceOf[InternalActorRef] OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None case OptionVal.None if headerBuilder.senderActorRefPath.isDefined
// `get` on Path is safe because it surely is not a compressed value here OptionVal(resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get))
resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get) case _
OptionVal.None
} }
// --- hit refs and manifests for heavy-hitter counting // --- hit refs and manifests for heavy-hitter counting
association match { association match {
case OptionVal.Some(assoc) case OptionVal.Some(assoc)
val remoteAddress = assoc.remoteAddress val remoteAddress = assoc.remoteAddress
compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender) if (sender.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender.get)
if (recipient.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get) if (recipient.isDefined) compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get)
compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid)) compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid))
case _ case _
@ -181,7 +187,7 @@ private[remote] class Decoder(
recipient, recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here? localAddress, // FIXME: Is this needed anymore? What should we do here?
deserializedMessage, deserializedMessage,
OptionVal.Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead sender, // FIXME: No need for an option, decode simply to deadLetters instead
originUid, originUid,
association) association)

View file

@ -3,8 +3,8 @@
*/ */
package akka.remote.artery package akka.remote.artery
import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef } import akka.actor.{ ActorRef, Address }
import akka.remote.artery.compress.CompressionTable import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, OutboundCompressions }
import akka.util.OptionVal import akka.util.OptionVal
/** /**
@ -12,11 +12,10 @@ import akka.util.OptionVal
* *
* Literarily, no compression! * Literarily, no compression!
*/ */
final class NoInboundCompressions(system: ActorSystem) extends InboundCompressions { case object NoInboundCompressions extends InboundCompressions {
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = ()
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1") if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
else if (idx == 0) OptionVal.Some(system.deadLetters) // special case deadLetters
else OptionVal.None else OptionVal.None
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = ()
@ -30,10 +29,12 @@ final class NoInboundCompressions(system: ActorSystem) extends InboundCompressio
* *
* Literarily, no compression! * Literarily, no compression!
*/ */
object NoOutboundCompressions extends OutboundCompressions { case object NoOutboundCompressions extends OutboundCompressions {
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = () override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ()
override def actorRefCompressionTableVersion: Int = 0
override def compressActorRef(ref: ActorRef): Int = -1 override def compressActorRef(ref: ActorRef): Int = -1
override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = () override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ()
override def classManifestCompressionTableVersion: Int = 0
override def compressClassManifest(manifest: String): Int = -1 override def compressClassManifest(manifest: String): Int = -1
} }

View file

@ -9,24 +9,25 @@ import java.util.function.LongFunction
import akka.actor.{ ActorRef, ActorSystem, Address } import akka.actor.{ ActorRef, ActorSystem, Address }
import akka.remote.artery._ import akka.remote.artery._
import akka.util.OptionVal import akka.util.OptionVal
import akka.remote.artery.OutboundCompressions
import org.agrona.collections.Long2ObjectHashMap import org.agrona.collections.Long2ObjectHashMap
/** INTERNAL API */ /** INTERNAL API */
private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompressions { private[remote] final class OutboundCompressionsImpl(system: ActorSystem, remoteAddress: Address) extends OutboundCompressions {
private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress) private val actorRefsOut = new OutboundActorRefCompression(system, remoteAddress)
private val classManifestsOut = new OutboundCompressionTable[String](system, remoteAddress) private val classManifestsOut = new OutboundClassManifestCompression(system, remoteAddress)
// actor ref compression --- // actor ref compression ---
override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref) override def compressActorRef(ref: ActorRef): Int = actorRefsOut.compress(ref)
override def actorRefCompressionTableVersion: Int = actorRefsOut.activeCompressionTableVersion
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit =
actorRefsOut.flipTable(table) actorRefsOut.flipTable(table)
// class manifest compression --- // class manifest compression ---
override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest) override def compressClassManifest(manifest: String): Int = classManifestsOut.compress(manifest)
override def classManifestCompressionTableVersion: Int = classManifestsOut.activeCompressionTableVersion
override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit =
classManifestsOut.flipTable(table) classManifestsOut.flipTable(table)
} }
@ -42,10 +43,9 @@ private[remote] final class InboundCompressionsImpl(
) extends InboundCompressions { ) extends InboundCompressions {
private val settings = CompressionSettings(system) private val settings = CompressionSettings(system)
private val localAddress = inboundContext.localAddress
// FIXME we also must remove the ones that won't be used anymore - when quarantine triggers // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers
private[this] val _actorRefsIn = new Long2ObjectHashMap[InboundActorRefCompression]() private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]()
private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] {
override def apply(originUid: Long): InboundActorRefCompression = { override def apply(originUid: Long): InboundActorRefCompression = {
val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max) val actorRefHitters = new TopHeavyHitters[ActorRef](settings.actorRefs.max)
@ -53,9 +53,9 @@ private[remote] final class InboundCompressionsImpl(
} }
} }
private def actorRefsIn(originUid: Long): InboundActorRefCompression = private def actorRefsIn(originUid: Long): InboundActorRefCompression =
_actorRefsIn.computeIfAbsent(originUid, createInboundActorRefsForOrigin) _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin)
private[this] val _classManifestsIn = new Long2ObjectHashMap[InboundManifestCompression]() private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]()
private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] {
override def apply(originUid: Long): InboundManifestCompression = { override def apply(originUid: Long): InboundManifestCompression = {
val manifestHitters = new TopHeavyHitters[String](settings.manifests.max) val manifestHitters = new TopHeavyHitters[String](settings.manifests.max)
@ -63,12 +63,13 @@ private[remote] final class InboundCompressionsImpl(
} }
} }
private def classManifestsIn(originUid: Long): InboundManifestCompression = private def classManifestsIn(originUid: Long): InboundManifestCompression =
_classManifestsIn.computeIfAbsent(originUid, createInboundManifestsForOrigin) _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin)
// actor ref compression --- // actor ref compression ---
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = {
actorRefsIn(originUid).decompress(tableVersion, idx) actorRefsIn(originUid).decompress(tableVersion, idx)
}
override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef): Unit = { override def hitActorRef(originUid: Long, tableVersion: Int, address: Address, ref: ActorRef): Unit = {
actorRefsIn(originUid).increment(address, ref, 1L) actorRefsIn(originUid).increment(address, ref, 1L)
} }
@ -80,20 +81,18 @@ private[remote] final class InboundCompressionsImpl(
override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String): Unit = { override def hitClassManifest(originUid: Long, tableVersion: Int, address: Address, manifest: String): Unit = {
classManifestsIn(originUid).increment(address, manifest, 1L) classManifestsIn(originUid).increment(address, manifest, 1L)
} }
// testing utilities ---
/** INTERNAL API: for testing only */
private[remote] def runNextActorRefAdvertisement() = {
import scala.collection.JavaConverters._
_actorRefsIns.values().asScala.foreach { inbound inbound.runNextTableAdvertisement() }
} }
object NoopInboundCompressions extends InboundCompressions { /** INTERNAL API: for testing only */
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () private[remote] def runNextClassManifestAdvertisement() = {
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal.None import scala.collection.JavaConverters._
_classManifestsIns.values().asScala.foreach { inbound inbound.runNextTableAdvertisement() }
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = ()
override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal.None
} }
object NoopOutboundCompressions extends OutboundCompressions {
override def compressActorRef(ref: ActorRef): Int = -1
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ()
override def compressClassManifest(manifest: String): Int = -1
override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ()
} }

View file

@ -1,11 +0,0 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery.compress
import akka.actor.Address
/** Callback invoked when a compression id allocation should be advertised to the remote actor system. */
trait AdvertiseCompressionId[T] {
def apply(remoteAddress: Address, ref: T, id: Int): Unit
}

View file

@ -0,0 +1,37 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery.compress
import akka.actor.{ ActorRef, Address }
import akka.util.OptionVal
/**
* INTERNAL API
* Decompress and cause compression advertisements.
*
* One per inbound message stream thus must demux by originUid to use the right tables.
*/
private[remote] trait InboundCompressions {
def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit
def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef]
def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit
def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String]
}
/**
* INTERNAL API
* Compress outgoing data and handle compression advertisements to fill compression table.
*
* One per outgoing message stream.
*/
private[remote] trait OutboundCompressions {
def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit
def actorRefCompressionTableVersion: Int
def compressActorRef(ref: ActorRef): Int
def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit
def classManifestCompressionTableVersion: Int
def compressClassManifest(manifest: String): Int
}

View file

@ -5,7 +5,7 @@
package akka.remote.artery.compress package akka.remote.artery.compress
/** INTERNAL API: Versioned compression table to be advertised between systems */ /** INTERNAL API: Versioned compression table to be advertised between systems */
private[akka] final case class CompressionTable[T](version: Long, map: Map[T, Int]) { private[akka] final case class CompressionTable[T](version: Int, map: Map[T, Int]) {
def invert: DecompressionTable[T] = def invert: DecompressionTable[T] =
if (map.isEmpty) DecompressionTable.empty[T].copy(version = version) if (map.isEmpty) DecompressionTable.empty[T].copy(version = version)

View file

@ -5,8 +5,16 @@
package akka.remote.artery.compress package akka.remote.artery.compress
/** INTERNAL API */ /** INTERNAL API */
private[remote] final case class DecompressionTable[T](version: Long, table: Array[T]) { private[remote] final case class DecompressionTable[T](version: Int, table: Array[T]) {
def get(idx: Int): T = table(idx) // TODO version maybe better as Long? // OR implement roll-over
private[this] val length = table.length
def get(idx: Int): T = {
if (idx >= length)
throw new IllegalArgumentException(s"Attempted decompression of unknown id: [$idx]! " +
s"Only $length ids allocated in table version [$version].")
table(idx)
}
def invert: CompressionTable[T] = def invert: CompressionTable[T] =
CompressionTable(version, Map(table.zipWithIndex: _*)) CompressionTable(version, Map(table.zipWithIndex: _*))
@ -16,7 +24,7 @@ private[remote] final case class DecompressionTable[T](version: Long, table: Arr
getClass.getName + getClass.getName +
s"(version: $version, " + s"(version: $version, " +
( (
if (table.length == 0) "[empty]" if (length == 0) "[empty]"
else s"table: [${table.zipWithIndex.map({ case (t, i) s"$i -> $t" }).mkString(",")}" else s"table: [${table.zipWithIndex.map({ case (t, i) s"$i -> $t" }).mkString(",")}"
) + "])" ) + "])"
} }

View file

@ -4,12 +4,9 @@
package akka.remote.artery.compress package akka.remote.artery.compress
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ ActorRef, ActorSystem, Address } import akka.actor.{ ActorRef, ActorSystem, Address }
import akka.event.Logging import akka.event.{ Logging, NoLogging }
import akka.remote.artery.{ InboundContext, OutboundContext } import akka.remote.artery.{ InboundContext, OutboundContext }
import akka.stream.impl.ConstantFun
import akka.util.{ OptionVal, PrettyDuration } import akka.util.{ OptionVal, PrettyDuration }
import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.duration.{ Duration, FiniteDuration }
@ -27,7 +24,7 @@ private[remote] final class InboundActorRefCompression(
settings: CompressionSettings, settings: CompressionSettings,
originUid: Long, originUid: Long,
inboundContext: InboundContext, inboundContext: InboundContext,
heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters, _.path.toSerializationFormat) { heavyHitters: TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters) {
preAllocate(system.deadLetters) preAllocate(system.deadLetters)
@ -43,9 +40,9 @@ private[remote] final class InboundActorRefCompression(
scheduleNextTableAdvertisement() scheduleNextTableAdvertisement()
override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval override protected def tableAdvertisementInterval = settings.actorRefs.advertisementInterval
def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[ActorRef]): Unit = { override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[ActorRef]): Unit = {
log.debug(s"Advertise ActorRef compression [$table] to [${association.remoteAddress}]") log.debug(s"Advertise ActorRef compression [$table], from [${inboundContext.localAddress}] to [${outboundContext.remoteAddress}]")
association.sendControl(CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table)) outboundContext.sendControl(CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table))
} }
} }
@ -54,14 +51,16 @@ final class InboundManifestCompression(
settings: CompressionSettings, settings: CompressionSettings,
originUid: Long, originUid: Long,
inboundContext: InboundContext, inboundContext: InboundContext,
heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters, ConstantFun.scalaIdentityFunction) { heavyHitters: TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters) {
scheduleNextTableAdvertisement() scheduleNextTableAdvertisement()
override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval override protected def tableAdvertisementInterval = settings.manifests.advertisementInterval
override def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[String]): Unit = { override lazy val log = NoLogging
log.debug(s"Advertise ClassManifest compression [$table] to [${association.remoteAddress}]")
association.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table)) override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[String]): Unit = {
log.debug(s"Advertise ClassManifest compression [$table] to [${outboundContext.remoteAddress}]")
outboundContext.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table))
} }
} }
@ -74,10 +73,9 @@ private[remote] abstract class InboundCompression[T >: Null](
val settings: CompressionSettings, val settings: CompressionSettings,
originUid: Long, originUid: Long,
inboundContext: InboundContext, inboundContext: InboundContext,
val heavyHitters: TopHeavyHitters[T], val heavyHitters: TopHeavyHitters[T]) {
convertKeyToString: T String) { // TODO avoid converting to string, in order to use the ActorRef.hashCode!
val log = Logging(system, "InboundCompressionTable") lazy val log = Logging(system, getClass.getSimpleName)
// TODO atomic / state machine? the InbouncCompression could even extend ActomicReference[State]! // TODO atomic / state machine? the InbouncCompression could even extend ActomicReference[State]!
@ -91,7 +89,7 @@ private[remote] abstract class InboundCompression[T >: Null](
// 2 tables are used, one is "still in use", and the // 2 tables are used, one is "still in use", and the
@volatile private[this] var activeTable = DecompressionTable.empty[T] @volatile private[this] var activeTable = DecompressionTable.empty[T]
@volatile private[this] var nextTable = DecompressionTable.empty[T] @volatile private[this] var nextTable = DecompressionTable.empty[T].copy(version = 1)
// TODO calibrate properly (h/w have direct relation to preciseness and max capacity) // TODO calibrate properly (h/w have direct relation to preciseness and max capacity)
private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt) private[this] val cms = new CountMinSketch(100, 100, System.currentTimeMillis().toInt)
@ -106,25 +104,28 @@ private[remote] abstract class InboundCompression[T >: Null](
* @throws UnknownCompressedIdException if given id is not known, this may indicate a bug such situation should not happen. * @throws UnknownCompressedIdException if given id is not known, this may indicate a bug such situation should not happen.
*/ */
// not tailrec because we allow special casing in sub-class, however recursion is always at most 1 level deep // not tailrec because we allow special casing in sub-class, however recursion is always at most 1 level deep
def decompress(tableVersion: Long, idx: Int): OptionVal[T] = { def decompress(incomingTableVersion: Long, idx: Int): OptionVal[T] = {
val activeVersion = activeTable.version // TODO move into state val activeVersion = activeTable.version
if (tableVersion == -1) OptionVal.None // no compression, bail out early if (incomingTableVersion == -1) OptionVal.None // no compression, bail out early
else if (tableVersion == activeVersion) { else if (incomingTableVersion == activeVersion) {
val value: T = activeTable.get(idx) val value: T = activeTable.get(idx)
if (settings.debug) log.debug(s"Decompress [{}] => {}", idx, value)
if (value != null) OptionVal.Some[T](value) if (value != null) OptionVal.Some[T](value)
else throw new UnknownCompressedIdException(idx) else throw new UnknownCompressedIdException(idx)
} else if (tableVersion < activeVersion) { } else if (incomingTableVersion < activeVersion) {
log.warning("Received value compressed with old table: [{}], current table version is: [{}]", tableVersion, activeVersion) log.warning("Received value compressed with old table: [{}], current table version is: [{}]", incomingTableVersion, activeVersion)
OptionVal.None OptionVal.None
} else if (tableVersion == nextTable.version) { } else if (incomingTableVersion == nextTable.version) {
flipTables() advertisementInProgress = false
decompress(tableVersion, idx) // recurse, activeTable will not be able to handle this log.debug("Received first value compressed using the next prepared compression table, flipping to it (version: {})", nextTable.version)
startUsingNextTable()
decompress(incomingTableVersion, idx) // recurse, activeTable will not be able to handle this
} else { } else {
// which means that incoming version was > nextTable.version, which likely is a bug // which means that incoming version was > nextTable.version, which likely is a bug
log.error("Inbound message is using compression table version higher than the highest allocated table on this node. " + log.error(
"This should not happen! State: activeTable: {}, nextTable, incoming tableVersion: {}", activeVersion, nextTable, tableVersion) "Inbound message is using compression table version higher than the highest allocated table on this node. " +
"This should not happen! State: activeTable: {}, nextTable: {}, incoming tableVersion: {}",
activeVersion, nextTable.version, incomingTableVersion)
OptionVal.None OptionVal.None
} }
@ -136,41 +137,11 @@ private[remote] abstract class InboundCompression[T >: Null](
*/ */
// TODO not so happy about passing around address here, but in incoming there's no other earlier place to get it? // TODO not so happy about passing around address here, but in incoming there's no other earlier place to get it?
def increment(remoteAddress: Address, value: T, n: Long): Unit = { def increment(remoteAddress: Address, value: T, n: Long): Unit = {
val key = convertKeyToString(value) val count = cms.addObjectAndEstimateCount(value, n)
if (shouldIgnore(key)) {
// ignore...
} else {
val count = cms.addAndEstimateCount(key, n)
// TODO optimise order of these, what is more expensive? // TODO optimise order of these, what is more expensive?
// TODO (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering. // TODO (now the `previous` is, but if aprox datatype there it would be faster)... Needs pondering.
val wasHeavyHitter = addAndCheckIfheavyHitterDetected(value, count) addAndCheckIfheavyHitterDetected(value, count)
if (wasHeavyHitter)
log.debug(s"Heavy hitter detected: {} [count: {}]", value, count)
// if (wasHeavyHitter && !wasCompressedPreviously(key)) {
// val idx = prepareCompressionAdvertisement()
// log.debug("Allocated compression id [" + idx + "] for [" + value + "], in association with [" + remoteAddress + "]")
// }
}
}
private def shouldIgnore(key: String) = { // TODO this is hacky, if we'd do this we trigger compression too early (before association exists, so control messages fail)
key match {
case null true
case "" true // empty class manifest for example
case _ key.endsWith("/")
}
}
// TODO this must be optimised, we really don't want to scan the entire key-set each time to make sure
private def wasCompressedPreviously(key: String): Boolean = {
var i = 0
val len = activeTable.table.length
while (i < len) {
if (activeTable.table(i) == key) return true
i += 1
}
false
} }
/** Mutates heavy hitters */ /** Mutates heavy hitters */
@ -186,10 +157,8 @@ private[remote] abstract class InboundCompression[T >: Null](
* INTERNAL / TESTING API * INTERNAL / TESTING API
* Used for manually triggering when a compression table should be advertised. * Used for manually triggering when a compression table should be advertised.
* Note that most likely you'd want to set the advertisment-interval to `0` when using this. * Note that most likely you'd want to set the advertisment-interval to `0` when using this.
*
* TODO: Technically this would be solvable by a "triggerable" scheduler.
*/ */
private[remote] def triggerNextTableAdvertisement(): Unit = // TODO expose and use in tests private[remote] def triggerNextTableAdvertisement(): Unit = // TODO use this in tests for triggering
runNextTableAdvertisement() runNextTableAdvertisement()
def scheduleNextTableAdvertisement(): Unit = def scheduleNextTableAdvertisement(): Unit =
@ -200,9 +169,9 @@ private[remote] abstract class InboundCompression[T >: Null](
log.debug("Scheduled {} advertisement in [{}] from now...", getClass.getSimpleName, PrettyDuration.format(tableAdvertisementInterval, includeNanos = false, 1)) log.debug("Scheduled {} advertisement in [{}] from now...", getClass.getSimpleName, PrettyDuration.format(tableAdvertisementInterval, includeNanos = false, 1))
} catch { } catch {
case ex: IllegalStateException case ex: IllegalStateException
log.warning("Unable to schedule {} advertisement, " + // this is usually harmless
"likely system is shutting down. " + log.debug("Unable to schedule {} advertisement, " +
"Reason: {}", getClass.getName, ex.getMessage) "likely system is shutting down. Reason: {}", getClass.getName, ex.getMessage)
} }
case _ // ignore... case _ // ignore...
} }
@ -213,6 +182,9 @@ private[remote] abstract class InboundCompression[T >: Null](
finally scheduleNextTableAdvertisement() finally scheduleNextTableAdvertisement()
} }
// FIXME use AtomicBoolean instead?
@volatile private[this] var advertisementInProgress = false
/** /**
* Entry point to advertising a new compression table. * Entry point to advertising a new compression table.
* *
@ -223,9 +195,11 @@ private[remote] abstract class InboundCompression[T >: Null](
* It must be advertised to the other side so it can start using it in its outgoing compression. * It must be advertised to the other side so it can start using it in its outgoing compression.
* Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing. * Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing.
*/ */
def runNextTableAdvertisement() = { // TODO guard against re-entrancy? private[remote] def runNextTableAdvertisement() =
if (!advertisementInProgress)
inboundContext.association(originUid) match { inboundContext.association(originUid) match {
case OptionVal.Some(association) case OptionVal.Some(association)
advertisementInProgress = true
val table = prepareCompressionAdvertisement() val table = prepareCompressionAdvertisement()
nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster? nextTable = table.invert // TODO expensive, check if building the other way wouldn't be faster?
advertiseCompressionTable(association, table) advertiseCompressionTable(association, table)
@ -235,7 +209,6 @@ private[remote] abstract class InboundCompression[T >: Null](
// so we don't build the table since we would not be able to send it anyway. // so we don't build the table since we would not be able to send it anyway.
log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid) log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid)
} }
}
/** /**
* Must be implementeed by extending classes in order to send a [[akka.remote.artery.ControlMessage]] * Must be implementeed by extending classes in order to send a [[akka.remote.artery.ControlMessage]]
@ -244,7 +217,7 @@ private[remote] abstract class InboundCompression[T >: Null](
protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit
/** Drop `activeTable` and start using the `nextTable` in its place. */ /** Drop `activeTable` and start using the `nextTable` in its place. */
private def flipTables(): Unit = { private def startUsingNextTable(): Unit = {
log.debug("Swaping active decompression table to version {}.", nextTable.version) log.debug("Swaping active decompression table to version {}.", nextTable.version)
activeTable = nextTable activeTable = nextTable
nextTable = DecompressionTable.empty nextTable = DecompressionTable.empty
@ -261,11 +234,6 @@ private[remote] abstract class InboundCompression[T >: Null](
} }
final class ExistingcompressedIdReuseAttemptException(id: Long, value: Any)
extends RuntimeException(
s"Attempted to re-allocate compressedId [$id] which is still in use for compressing [$value]! " +
s"This should never happen and is likely an implementation bug.")
final class UnknownCompressedIdException(id: Long) final class UnknownCompressedIdException(id: Long)
extends RuntimeException( extends RuntimeException(
s"Attempted de-compress unknown id [$id]! " + s"Attempted de-compress unknown id [$id]! " +

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.{ util ju } import java.{ util ju }
import akka.actor.{ ActorRef, ActorSystem, Address } import akka.actor.{ ActorRef, ActorSystem, Address }
import akka.event.Logging import akka.event.{ Logging, LoggingAdapter }
import akka.remote.artery.compress.OutboundCompression.OutboundCompressionState import akka.remote.artery.compress.OutboundCompression.OutboundCompressionState
import scala.annotation.tailrec import scala.annotation.tailrec
@ -20,9 +20,14 @@ private[remote] final class OutboundActorRefCompression(system: ActorSystem, rem
flipTable(CompressionTable( flipTable(CompressionTable(
version = 0, version = 0,
map = Map( map = Map(
system.deadLetters 0 system.deadLetters 0)))
) }
))
/** INTERNAL API */
private[remote] final class OutboundClassManifestCompression(system: ActorSystem, remoteAddress: Address)
extends OutboundCompressionTable[String](system, remoteAddress) {
flipTable(CompressionTable(version = 0, Map.empty))
} }
/** /**
@ -36,7 +41,13 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd
// TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel // TODO: The compression map may benefit from padding if we want multiple compressions to be running in parallel
private[this] val log = Logging(system, "OutboundCompressionTable") protected val log: LoggingAdapter = Logging(system, Logging.simpleName(getClass))
// TODO this exposes us to a race between setting the Version and USING the table...?
def activeCompressionTableVersion = {
val version = get.version
version
}
/** /**
* Flips the currently used compression table to the new one (iff the new one has a version number higher than the currently used one). * Flips the currently used compression table to the new one (iff the new one has a version number higher than the currently used one).
@ -44,16 +55,16 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd
// (°° ┻━┻ // (°° ┻━┻
@tailrec final def flipTable(activate: CompressionTable[T]): Unit = { @tailrec final def flipTable(activate: CompressionTable[T]): Unit = {
val state = get() val state = get()
if (state.version < activate.version) // TODO or we could demand it to be strictly `currentVersion + 1` if (activate.version > state.version) // TODO this should handle roll-over as we move to Byte
if (compareAndSet(state, prepareState(activate))) if (compareAndSet(state, prepareState(activate)))
log.debug("Successfully flipped compression table to version {}, for ourgoing connection to {}", activate.version, remoteAddress) log.debug(s"Successfully flipped compression table versions {}=>{}, for outgoing to [{}]", state.version, activate.version, remoteAddress)
else else
flipTable(activate) // retry flipTable(activate) // retry
else if (state.version == activate.version) else if (state.version == activate.version)
log.warning("Received duplicate compression table (version: {})! Ignoring it.", state.version) log.warning("Received duplicate compression table (version: {})! Ignoring it.", state.version)
else else
log.error("Received unexpected compression table with version nr [{}]! " + log.error("Received unexpected compression table with version nr [{}]! " +
"Current version number is []") "Current version number is [{}].", activate.version, state.version)
} }
@ -66,11 +77,8 @@ private[remote] class OutboundCompressionTable[T](system: ActorSystem, remoteAdd
// load factor is `1` since we will never grow this table beyond the initial size, // load factor is `1` since we will never grow this table beyond the initial size,
// this way we can avoid any rehashing from happening. // this way we can avoid any rehashing from happening.
val m = new ju.HashMap[T, Integer](size, 1.0f) // TODO could be replaced with primitive `int` specialized version val m = new ju.HashMap[T, Integer](size, 1.0f) // TODO could be replaced with primitive `int` specialized version
val it = activate.map.keysIterator activate.map.foreach {
var i = 0 case (key, value) m.put(key, value) // TODO boxing :<
while (it.hasNext) {
m.put(it.next(), i) // TODO boxing :<
i += 1
} }
OutboundCompressionState(activate.version, m) OutboundCompressionState(activate.version, m)
} }
@ -98,7 +106,7 @@ private[remote] object OutboundCompression {
// format: ON // format: ON
/** INTERNAL API */ /** INTERNAL API */
private[remote] final case class OutboundCompressionState[T](version: Long, table: ju.Map[T, Integer]) private[remote] final case class OutboundCompressionState[T](version: Int, table: ju.Map[T, Integer])
private[remote] object OutboundCompressionState { private[remote] object OutboundCompressionState {
def initial[T] = OutboundCompressionState[T](-1, ju.Collections.emptyMap()) def initial[T] = OutboundCompressionState[T](-1, ju.Collections.emptyMap())
} }

View file

@ -1,9 +1,13 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery package akka.remote.artery
import java.nio.{ ByteBuffer, ByteOrder } import java.nio.{ ByteBuffer, ByteOrder }
import akka.actor._ import akka.actor._
import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils } import akka.remote.artery.compress.{ CompressionTable, CompressionTestUtils, InboundCompressions, OutboundCompressions }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.util.{ ByteString, OptionVal } import akka.util.{ ByteString, OptionVal }
@ -28,11 +32,13 @@ class EnvelopeBufferSpec extends AkkaSpec {
val idxToManifest = manifestToIdx.map(_.swap) val idxToManifest = manifestToIdx.map(_.swap)
override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests override def applyActorRefCompressionTable(table: CompressionTable[ActorRef]): Unit = ??? // dynamic allocating not needed in these tests
override def actorRefCompressionTableVersion: Int = 0
override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1) override def compressActorRef(ref: ActorRef): Int = refToIdx.getOrElse(ref, -1)
override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = () override def hitActorRef(originUid: Long, tableVersion: Int, remote: Address, ref: ActorRef): Unit = ()
override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx)) override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx))
override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests override def applyClassManifestCompressionTable(table: CompressionTable[String]): Unit = ??? // dynamic allocating not needed in these tests
override def classManifestCompressionTableVersion: Int = 0
override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1) override def compressClassManifest(manifest: String): Int = manifestToIdx.getOrElse(manifest, -1)
override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = () override def hitClassManifest(originUid: Long, tableVersion: Int, remote: Address, manifest: String): Unit = ()
override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx)) override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx))

View file

@ -1,3 +1,7 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery package akka.remote.artery
import java.io.{ File, IOException, RandomAccessFile } import java.io.{ File, IOException, RandomAccessFile }

View file

@ -9,6 +9,9 @@ import akka.remote.artery.compress.CompressionProtocol.Events
import akka.testkit._ import akka.testkit._
import akka.util.Timeout import akka.util.Timeout
import akka.pattern.ask import akka.pattern.ask
import akka.remote.RARP
import akka.remote.artery.ArteryTransport
import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedCompressionTable }
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
@ -33,7 +36,8 @@ object HandshakeShouldDropCompressionTableSpec {
enabled = on enabled = on
actor-refs { actor-refs {
enabled = on enabled = on
advertisement-interval = 3 seconds # we'll trigger advertisement manually
advertisement-interval = 10 hours
} }
} }
} }
@ -57,57 +61,77 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
} }
"Outgoing compression table" must { "Outgoing compression table" must {
// FIXME this is failing, we must rethink how tables are identified and updated
"be dropped on system restart" in { "be dropped on system restart" in {
val messagesToExchange = 10 val messagesToExchange = 10
val systemATransport = RARP(system).provider.transport.asInstanceOf[ArteryTransport]
def systemBTransport = RARP(systemB).provider.transport.asInstanceOf[ArteryTransport]
// listen for compression table events // listen for compression table events
val aProbe = TestProbe() val aProbe = TestProbe()
val a1Probe = TestProbe() val a1Probe = TestProbe()
val b1Probe = TestProbe()(systemB) val b1Probe = TestProbe()(systemB)
system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event]) system.eventStream.subscribe(aProbe.ref, classOf[Event])
systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event]) systemB.eventStream.subscribe(b1Probe.ref, classOf[Event])
def voidSel = system.actorSelection(s"artery://systemB@localhost:$portB/user/void") def echoSel = system.actorSelection(s"artery://systemB@localhost:$portB/user/echo")
systemB.actorOf(TestActors.blackholeProps, "void") systemB.actorOf(TestActors.echoActorProps, "echo")
// cause testActor-1 to become a heavy hitter // cause testActor-1 to become a heavy hitter
(1 to messagesToExchange).foreach { i voidSel ! "hello" } // does not reply, but a hot receiver should be advertised (1 to messagesToExchange).foreach { i echoSel ! s"hello-$i" } // does not reply, but a hot receiver should be advertised
// give it enough time to advertise first table waitForEcho(this, s"hello-$messagesToExchange")
val a0 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a0 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
info("System [A] received: " + a0) info("System [A] received: " + a0)
assertCompression[ActorRef](a0.table, 1, _.toString should include(testActor.path.name)) a0.table.map.keySet should contain(testActor)
// cause a1Probe to become a heavy hitter (we want to not have it in the 2nd compression table later) // cause a1Probe to become a heavy hitter (we want to not have it in the 2nd compression table later)
(1 to messagesToExchange).foreach { i voidSel.tell("hello", a1Probe.ref) } // does not reply, but a hot receiver should be advertised (1 to messagesToExchange).foreach { i echoSel.tell(s"hello-$i", a1Probe.ref) }
// give it enough time to advertise first table waitForEcho(a1Probe, s"hello-$messagesToExchange")
val a1 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a1 = aProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
info("System [A] received: " + a1) info("System [A] received: " + a1)
assertCompression[ActorRef](a1.table, 2, _.toString should include(a1Probe.ref.path.name)) a1.table.map.keySet should contain(a1Probe.ref)
log.warning("SHUTTING DOWN system {}...", systemB) log.warning("SHUTTING DOWN system {}...", systemB)
shutdown(systemB) shutdown(systemB)
systemB = ActorSystem("systemB", configB) systemB = ActorSystem("systemB", configB)
Thread.sleep(5000) Thread.sleep(1000)
log.warning("SYSTEM READY {}...", systemB) log.warning("SYSTEM READY {}...", systemB)
val aNewProbe = TestProbe() val aNewProbe = TestProbe()
system.eventStream.subscribe(aNewProbe.ref, classOf[CompressionProtocol.Events.Event]) system.eventStream.subscribe(aNewProbe.ref, classOf[Event])
systemB.actorOf(TestActors.blackholeProps, "void") // start it again systemB.actorOf(TestActors.echoActorProps, "echo") // start it again
(1 to messagesToExchange).foreach { i voidSel ! "hello" } // does not reply, but a hot receiver should be advertised (1 to 5) foreach { _
// compression triggered again // since some messages may end up being lost
val a2 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) (1 to messagesToExchange).foreach { i echoSel ! s"hello-$i" } // does not reply, but a hot receiver should be advertised
Thread.sleep(100)
}
waitForEcho(this, s"hello-$messagesToExchange", max = 10.seconds)
systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a2 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
info("System [A] received: " + a2) info("System [A] received: " + a2)
assertCompression[ActorRef](a2.table, 1, _.toString should include(testActor.path.name)) a2.table.map.keySet should contain(testActor)
val aNew2Probe = TestProbe() val aNew2Probe = TestProbe()
(1 to messagesToExchange).foreach { i voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised (1 to messagesToExchange).foreach { i echoSel.tell(s"hello-$i", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised
// compression triggered again waitForEcho(aNew2Probe, s"hello-$messagesToExchange")
val a3 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) systemBTransport.triggerCompressionAdvertisements(actorRef = true, manifest = false)
val a3 = aNewProbe.expectMsgType[ReceivedCompressionTable[ActorRef]](10.seconds)
info("Received second compression: " + a3) info("Received second compression: " + a3)
assertCompression[ActorRef](a3.table, 2, _.toString should include(aNew2Probe.ref.path.name)) a3.table.map.keySet should contain(aNew2Probe.ref)
}
} }
def waitForEcho(probe: TestKit, m: String, max: Duration = 3.seconds): Any =
probe.fishForMessage(max = max, hint = s"waiting for '$m'") {
case `m` true
case x false
} }
def identify(_system: String, port: Int, name: String) = { def identify(_system: String, port: Int, name: String) = {