diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala index 3f3930232e..561ed83751 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/InvertCompressionTableBenchmark.scala @@ -13,14 +13,14 @@ import org.openjdk.jmh.annotations._ class InvertCompressionTableBenchmark { /* - TODO: Possibly specialise the inversion, it's not in hot path so not doing it for now + TODO: Possibly specialise the inversion, it's not in hot path so not doing it for now a.r.artery.compress.CompressionTableBenchmark.invert_comp_to_decomp_1024 N/A thrpt 20 5828.963 ± 281.631 ops/s a.r.artery.compress.CompressionTableBenchmark.invert_comp_to_decomp_256 N/A thrpt 20 29040.889 ± 345.425 ops/s */ def randomName = ThreadLocalRandom.current().nextInt(1000).toString - val compTable_256 = CompressionTable(2, Map(Vector.fill[String](256)(randomName).zipWithIndex: _*)) - val compTable_1024 = CompressionTable(3, Map(Vector.fill[String](1024)(randomName).zipWithIndex: _*)) + val compTable_256 = CompressionTable(17L, 2, Map(Vector.fill[String](256)(randomName).zipWithIndex: _*)) + val compTable_1024 = CompressionTable(17L, 3, Map(Vector.fill[String](1024)(randomName).zipWithIndex: _*)) @Benchmark def invert_comp_to_decomp_256 = compTable_256.invert @Benchmark def invert_comp_to_decomp_1024 = compTable_1024.invert diff --git a/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala b/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala index cc89de28d9..f3325d4996 100644 --- a/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala +++ b/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala @@ -22,7 +22,7 @@ class LruBoundedCacheBench { @Param(Array("128", "256")) var stringSize = 0 - var 
lruCache: LruBoundedCache[String, String] = _ + private var lruCache: LruBoundedCache[String, String] = _ @Param(Array("90", "99")) var loadFactor: Int = _ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 6e2b29184d..9966befdaa 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -139,7 +139,7 @@ object MaxThroughputSpec extends MultiNodeConfig { def waitingForCompression: Receive = { case ReceivedActorRefCompressionTable(_, table) ⇒ - if (table.map.contains(target)) { + if (table.dictionary.contains(target)) { sendBatch() // first some warmup target ! Start // then Start, which will echo back here context.setReceiveTimeout(Duration.Undefined) diff --git a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java index 516c71356a..eb176e91ba 100644 --- a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java +++ b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java @@ -1297,19 +1297,29 @@ public final class ArteryControlFormats { */ akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder(); - // required uint32 tableVersion = 2; + // required uint64 originUid = 2; /** - * required uint32 tableVersion = 2; + * required uint64 originUid = 2; + */ + boolean hasOriginUid(); + /** + * required uint64 originUid = 2; + */ + long getOriginUid(); + + // required uint32 tableVersion = 3; + /** + * required uint32 tableVersion = 3; */ boolean hasTableVersion(); /** - * required uint32 tableVersion = 2; + * required uint32 tableVersion = 3; */ int getTableVersion(); - // repeated string keys = 3; + // repeated string keys = 4; /** - * repeated string keys = 3; + * repeated string keys = 4; * *
      * actual Map is represented by separate sequences of keys and values,
@@ -1320,7 +1330,7 @@ public final class ArteryControlFormats {
     java.util.List
     getKeysList();
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1330,7 +1340,7 @@ public final class ArteryControlFormats {
      */
     int getKeysCount();
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1340,7 +1350,7 @@ public final class ArteryControlFormats {
      */
     java.lang.String getKeys(int index);
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1351,17 +1361,17 @@ public final class ArteryControlFormats {
     akka.protobuf.ByteString
         getKeysBytes(int index);
 
-    // repeated uint32 values = 4;
+    // repeated uint32 values = 5;
     /**
-     * repeated uint32 values = 4;
+     * repeated uint32 values = 5;
      */
     java.util.List getValuesList();
     /**
-     * repeated uint32 values = 4;
+     * repeated uint32 values = 5;
      */
     int getValuesCount();
     /**
-     * repeated uint32 values = 4;
+     * repeated uint32 values = 5;
      */
     int getValues(int index);
   }
@@ -1436,31 +1446,36 @@ public final class ArteryControlFormats {
             }
             case 16: {
               bitField0_ |= 0x00000002;
+              originUid_ = input.readUInt64();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
               tableVersion_ = input.readUInt32();
               break;
             }
-            case 26: {
-              if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+            case 34: {
+              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
                 keys_ = new akka.protobuf.LazyStringArrayList();
-                mutable_bitField0_ |= 0x00000004;
+                mutable_bitField0_ |= 0x00000008;
               }
               keys_.add(input.readBytes());
               break;
             }
-            case 32: {
-              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+            case 40: {
+              if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
                 values_ = new java.util.ArrayList();
-                mutable_bitField0_ |= 0x00000008;
+                mutable_bitField0_ |= 0x00000010;
               }
               values_.add(input.readUInt32());
               break;
             }
-            case 34: {
+            case 42: {
               int length = input.readRawVarint32();
               int limit = input.pushLimit(length);
-              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008) && input.getBytesUntilLimit() > 0) {
+              if (!((mutable_bitField0_ & 0x00000010) == 0x00000010) && input.getBytesUntilLimit() > 0) {
                 values_ = new java.util.ArrayList();
-                mutable_bitField0_ |= 0x00000008;
+                mutable_bitField0_ |= 0x00000010;
               }
               while (input.getBytesUntilLimit() > 0) {
                 values_.add(input.readUInt32());
@@ -1476,10 +1491,10 @@ public final class ArteryControlFormats {
         throw new akka.protobuf.InvalidProtocolBufferException(
             e.getMessage()).setUnfinishedMessage(this);
       } finally {
-        if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
+        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
           keys_ = new akka.protobuf.UnmodifiableLazyStringList(keys_);
         }
-        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+        if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
           values_ = java.util.Collections.unmodifiableList(values_);
         }
         this.unknownFields = unknownFields.build();
@@ -1536,27 +1551,43 @@ public final class ArteryControlFormats {
       return from_;
     }
 
-    // required uint32 tableVersion = 2;
-    public static final int TABLEVERSION_FIELD_NUMBER = 2;
-    private int tableVersion_;
+    // required uint64 originUid = 2;
+    public static final int ORIGINUID_FIELD_NUMBER = 2;
+    private long originUid_;
     /**
-     * required uint32 tableVersion = 2;
+     * required uint64 originUid = 2;
      */
-    public boolean hasTableVersion() {
+    public boolean hasOriginUid() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
-     * required uint32 tableVersion = 2;
+     * required uint64 originUid = 2;
+     */
+    public long getOriginUid() {
+      return originUid_;
+    }
+
+    // required uint32 tableVersion = 3;
+    public static final int TABLEVERSION_FIELD_NUMBER = 3;
+    private int tableVersion_;
+    /**
+     * required uint32 tableVersion = 3;
+     */
+    public boolean hasTableVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * required uint32 tableVersion = 3;
      */
     public int getTableVersion() {
       return tableVersion_;
     }
 
-    // repeated string keys = 3;
-    public static final int KEYS_FIELD_NUMBER = 3;
+    // repeated string keys = 4;
+    public static final int KEYS_FIELD_NUMBER = 4;
     private akka.protobuf.LazyStringList keys_;
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1569,7 +1600,7 @@ public final class ArteryControlFormats {
       return keys_;
     }
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1581,7 +1612,7 @@ public final class ArteryControlFormats {
       return keys_.size();
     }
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1593,7 +1624,7 @@ public final class ArteryControlFormats {
       return keys_.get(index);
     }
     /**
-     * repeated string keys = 3;
+     * repeated string keys = 4;
      *
      * 
      * actual Map is represented by separate sequences of keys and values,
@@ -1606,24 +1637,24 @@ public final class ArteryControlFormats {
       return keys_.getByteString(index);
     }
 
-    // repeated uint32 values = 4;
-    public static final int VALUES_FIELD_NUMBER = 4;
+    // repeated uint32 values = 5;
+    public static final int VALUES_FIELD_NUMBER = 5;
     private java.util.List values_;
     /**
-     * repeated uint32 values = 4;
+     * repeated uint32 values = 5;
      */
     public java.util.List
         getValuesList() {
       return values_;
     }
     /**
-     * repeated uint32 values = 4;
+     * repeated uint32 values = 5;
      */
     public int getValuesCount() {
       return values_.size();
     }
     /**
-     * repeated uint32 values = 4;
+     * repeated uint32 values = 5;
      */
     public int getValues(int index) {
       return values_.get(index);
@@ -1631,6 +1662,7 @@ public final class ArteryControlFormats {
 
     private void initFields() {
       from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance();
+      originUid_ = 0L;
       tableVersion_ = 0;
       keys_ = akka.protobuf.LazyStringArrayList.EMPTY;
       values_ = java.util.Collections.emptyList();
@@ -1644,6 +1676,10 @@ public final class ArteryControlFormats {
         memoizedIsInitialized = 0;
         return false;
       }
+      if (!hasOriginUid()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
       if (!hasTableVersion()) {
         memoizedIsInitialized = 0;
         return false;
@@ -1663,13 +1699,16 @@ public final class ArteryControlFormats {
         output.writeMessage(1, from_);
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeUInt32(2, tableVersion_);
+        output.writeUInt64(2, originUid_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeUInt32(3, tableVersion_);
       }
       for (int i = 0; i < keys_.size(); i++) {
-        output.writeBytes(3, keys_.getByteString(i));
+        output.writeBytes(4, keys_.getByteString(i));
       }
       for (int i = 0; i < values_.size(); i++) {
-        output.writeUInt32(4, values_.get(i));
+        output.writeUInt32(5, values_.get(i));
       }
       getUnknownFields().writeTo(output);
     }
@@ -1686,7 +1725,11 @@ public final class ArteryControlFormats {
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += akka.protobuf.CodedOutputStream
-          .computeUInt32Size(2, tableVersion_);
+          .computeUInt64Size(2, originUid_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += akka.protobuf.CodedOutputStream
+          .computeUInt32Size(3, tableVersion_);
       }
       {
         int dataSize = 0;
@@ -1834,12 +1877,14 @@ public final class ArteryControlFormats {
           fromBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000001);
-        tableVersion_ = 0;
+        originUid_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
-        keys_ = akka.protobuf.LazyStringArrayList.EMPTY;
+        tableVersion_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
-        values_ = java.util.Collections.emptyList();
+        keys_ = akka.protobuf.LazyStringArrayList.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000008);
+        values_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -1879,16 +1924,20 @@ public final class ArteryControlFormats {
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
+        result.originUid_ = originUid_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
         result.tableVersion_ = tableVersion_;
-        if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        if (((bitField0_ & 0x00000008) == 0x00000008)) {
           keys_ = new akka.protobuf.UnmodifiableLazyStringList(
               keys_);
-          bitField0_ = (bitField0_ & ~0x00000004);
+          bitField0_ = (bitField0_ & ~0x00000008);
         }
         result.keys_ = keys_;
-        if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
           values_ = java.util.Collections.unmodifiableList(values_);
-          bitField0_ = (bitField0_ & ~0x00000008);
+          bitField0_ = (bitField0_ & ~0x00000010);
         }
         result.values_ = values_;
         result.bitField0_ = to_bitField0_;
@@ -1910,13 +1959,16 @@ public final class ArteryControlFormats {
         if (other.hasFrom()) {
           mergeFrom(other.getFrom());
         }
+        if (other.hasOriginUid()) {
+          setOriginUid(other.getOriginUid());
+        }
         if (other.hasTableVersion()) {
           setTableVersion(other.getTableVersion());
         }
         if (!other.keys_.isEmpty()) {
           if (keys_.isEmpty()) {
             keys_ = other.keys_;
-            bitField0_ = (bitField0_ & ~0x00000004);
+            bitField0_ = (bitField0_ & ~0x00000008);
           } else {
             ensureKeysIsMutable();
             keys_.addAll(other.keys_);
@@ -1926,7 +1978,7 @@ public final class ArteryControlFormats {
         if (!other.values_.isEmpty()) {
           if (values_.isEmpty()) {
             values_ = other.values_;
-            bitField0_ = (bitField0_ & ~0x00000008);
+            bitField0_ = (bitField0_ & ~0x00000010);
           } else {
             ensureValuesIsMutable();
             values_.addAll(other.values_);
@@ -1942,6 +1994,10 @@ public final class ArteryControlFormats {
           
           return false;
         }
+        if (!hasOriginUid()) {
+          
+          return false;
+        }
         if (!hasTableVersion()) {
           
           return false;
@@ -2089,49 +2145,82 @@ public final class ArteryControlFormats {
         return fromBuilder_;
       }
 
-      // required uint32 tableVersion = 2;
-      private int tableVersion_ ;
+      // required uint64 originUid = 2;
+      private long originUid_ ;
       /**
-       * required uint32 tableVersion = 2;
+       * required uint64 originUid = 2;
        */
-      public boolean hasTableVersion() {
+      public boolean hasOriginUid() {
         return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       /**
-       * required uint32 tableVersion = 2;
+       * required uint64 originUid = 2;
+       */
+      public long getOriginUid() {
+        return originUid_;
+      }
+      /**
+       * required uint64 originUid = 2;
+       */
+      public Builder setOriginUid(long value) {
+        bitField0_ |= 0x00000002;
+        originUid_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * required uint64 originUid = 2;
+       */
+      public Builder clearOriginUid() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        originUid_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // required uint32 tableVersion = 3;
+      private int tableVersion_ ;
+      /**
+       * required uint32 tableVersion = 3;
+       */
+      public boolean hasTableVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * required uint32 tableVersion = 3;
        */
       public int getTableVersion() {
         return tableVersion_;
       }
       /**
-       * required uint32 tableVersion = 2;
+       * required uint32 tableVersion = 3;
        */
       public Builder setTableVersion(int value) {
-        bitField0_ |= 0x00000002;
+        bitField0_ |= 0x00000004;
         tableVersion_ = value;
         onChanged();
         return this;
       }
       /**
-       * required uint32 tableVersion = 2;
+       * required uint32 tableVersion = 3;
        */
       public Builder clearTableVersion() {
-        bitField0_ = (bitField0_ & ~0x00000002);
+        bitField0_ = (bitField0_ & ~0x00000004);
         tableVersion_ = 0;
         onChanged();
         return this;
       }
 
-      // repeated string keys = 3;
+      // repeated string keys = 4;
       private akka.protobuf.LazyStringList keys_ = akka.protobuf.LazyStringArrayList.EMPTY;
       private void ensureKeysIsMutable() {
-        if (!((bitField0_ & 0x00000004) == 0x00000004)) {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
           keys_ = new akka.protobuf.LazyStringArrayList(keys_);
-          bitField0_ |= 0x00000004;
+          bitField0_ |= 0x00000008;
          }
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2144,7 +2233,7 @@ public final class ArteryControlFormats {
         return java.util.Collections.unmodifiableList(keys_);
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2156,7 +2245,7 @@ public final class ArteryControlFormats {
         return keys_.size();
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2168,7 +2257,7 @@ public final class ArteryControlFormats {
         return keys_.get(index);
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2181,7 +2270,7 @@ public final class ArteryControlFormats {
         return keys_.getByteString(index);
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2200,7 +2289,7 @@ public final class ArteryControlFormats {
         return this;
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2219,7 +2308,7 @@ public final class ArteryControlFormats {
         return this;
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2235,7 +2324,7 @@ public final class ArteryControlFormats {
         return this;
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2245,12 +2334,12 @@ public final class ArteryControlFormats {
        */
       public Builder clearKeys() {
         keys_ = akka.protobuf.LazyStringArrayList.EMPTY;
-        bitField0_ = (bitField0_ & ~0x00000004);
+        bitField0_ = (bitField0_ & ~0x00000008);
         onChanged();
         return this;
       }
       /**
-       * repeated string keys = 3;
+       * repeated string keys = 4;
        *
        * 
        * actual Map is represented by separate sequences of keys and values,
@@ -2269,35 +2358,35 @@ public final class ArteryControlFormats {
         return this;
       }
 
-      // repeated uint32 values = 4;
+      // repeated uint32 values = 5;
       private java.util.List values_ = java.util.Collections.emptyList();
       private void ensureValuesIsMutable() {
-        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
           values_ = new java.util.ArrayList(values_);
-          bitField0_ |= 0x00000008;
+          bitField0_ |= 0x00000010;
          }
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public java.util.List
           getValuesList() {
         return java.util.Collections.unmodifiableList(values_);
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public int getValuesCount() {
         return values_.size();
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public int getValues(int index) {
         return values_.get(index);
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public Builder setValues(
           int index, int value) {
@@ -2307,7 +2396,7 @@ public final class ArteryControlFormats {
         return this;
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public Builder addValues(int value) {
         ensureValuesIsMutable();
@@ -2316,7 +2405,7 @@ public final class ArteryControlFormats {
         return this;
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public Builder addAllValues(
           java.lang.Iterable values) {
@@ -2326,11 +2415,11 @@ public final class ArteryControlFormats {
         return this;
       }
       /**
-       * repeated uint32 values = 4;
+       * repeated uint32 values = 5;
        */
       public Builder clearValues() {
         values_ = java.util.Collections.emptyList();
-        bitField0_ = (bitField0_ & ~0x00000008);
+        bitField0_ = (bitField0_ & ~0x00000010);
         onChanged();
         return this;
       }
@@ -6055,21 +6144,22 @@ public final class ArteryControlFormats {
       "\n\032ArteryControlFormats.proto\"G\n\013Quaranti" +
       "ned\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\032\n\002to\030" +
       "\002 \002(\0132\016.UniqueAddress\"5\n\022MessageWithAddr" +
-      "ess\022\037\n\007address\030\001 \002(\0132\016.UniqueAddress\"q\n\035" +
-      "CompressionTableAdvertisement\022\034\n\004from\030\001 " +
-      "\002(\0132\016.UniqueAddress\022\024\n\014tableVersion\030\002 \002(" +
-      "\r\022\014\n\004keys\030\003 \003(\t\022\016\n\006values\030\004 \003(\r\"Q\n Compr" +
-      "essionTableAdvertisementAck\022\034\n\004from\030\001 \002(" +
-      "\0132\016.UniqueAddress\022\017\n\007version\030\002 \002(\r\"\212\001\n\025S" +
-      "ystemMessageEnvelope\022\017\n\007message\030\001 \002(\014\022\024\n",
-      "\014serializerId\030\002 \002(\005\022\027\n\017messageManifest\030\003" +
-      " \001(\014\022\r\n\005seqNo\030\004 \002(\004\022\"\n\nackReplyTo\030\005 \002(\0132" +
-      "\016.UniqueAddress\"G\n\030SystemMessageDelivery" +
-      "Ack\022\r\n\005seqNo\030\001 \002(\004\022\034\n\004from\030\002 \002(\0132\016.Uniqu" +
-      "eAddress\"K\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n" +
-      "\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004port\030" +
-      "\004 \002(\r\"7\n\rUniqueAddress\022\031\n\007address\030\001 \002(\0132" +
-      "\010.Address\022\013\n\003uid\030\002 \002(\004B\017\n\013akka.remoteH\001"
+      "ess\022\037\n\007address\030\001 \002(\0132\016.UniqueAddress\"\204\001\n" +
+      "\035CompressionTableAdvertisement\022\034\n\004from\030\001" +
+      " \002(\0132\016.UniqueAddress\022\021\n\toriginUid\030\002 \002(\004\022" +
+      "\024\n\014tableVersion\030\003 \002(\r\022\014\n\004keys\030\004 \003(\t\022\016\n\006v" +
+      "alues\030\005 \003(\r\"Q\n CompressionTableAdvertise" +
+      "mentAck\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\017\n" +
+      "\007version\030\002 \002(\r\"\212\001\n\025SystemMessageEnvelope",
+      "\022\017\n\007message\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022" +
+      "\027\n\017messageManifest\030\003 \001(\014\022\r\n\005seqNo\030\004 \002(\004\022" +
+      "\"\n\nackReplyTo\030\005 \002(\0132\016.UniqueAddress\"G\n\030S" +
+      "ystemMessageDeliveryAck\022\r\n\005seqNo\030\001 \002(\004\022\034" +
+      "\n\004from\030\002 \002(\0132\016.UniqueAddress\"K\n\007Address\022" +
+      "\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hos" +
+      "tname\030\003 \002(\t\022\014\n\004port\030\004 \002(\r\"7\n\rUniqueAddre" +
+      "ss\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002" +
+      "(\004B\017\n\013akka.remoteH\001"
     };
     akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6093,7 +6183,7 @@ public final class ArteryControlFormats {
           internal_static_CompressionTableAdvertisement_fieldAccessorTable = new
             akka.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CompressionTableAdvertisement_descriptor,
-              new java.lang.String[] { "From", "TableVersion", "Keys", "Values", });
+              new java.lang.String[] { "From", "OriginUid", "TableVersion", "Keys", "Values", });
           internal_static_CompressionTableAdvertisementAck_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_CompressionTableAdvertisementAck_fieldAccessorTable = new
diff --git a/akka-remote/src/main/protobuf/ArteryControlFormats.proto b/akka-remote/src/main/protobuf/ArteryControlFormats.proto
index 5f47710670..aafff98016 100644
--- a/akka-remote/src/main/protobuf/ArteryControlFormats.proto
+++ b/akka-remote/src/main/protobuf/ArteryControlFormats.proto
@@ -24,13 +24,14 @@ message MessageWithAddress {
 // CompressionProtocol.ClassManifestCompressionAdvertisement
 message CompressionTableAdvertisement {
   required UniqueAddress from = 1;
-  required uint32 tableVersion = 2;
+  required uint64 originUid = 2;
+  required uint32 tableVersion = 3;
 
   // actual Map is represented by separate sequences of keys and values,
   // relies on both sequences using the same order so that corresponding entries can be
   // associated again when deserializing
-  repeated string keys = 3;
-  repeated uint32 values = 4;
+  repeated string keys = 4;
+  repeated uint32 values = 5;
 }
 
 // CompressionProtocol.ActorRefCompressionAdvertisementAck
diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
index c0d3e02ce0..799b114829 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
@@ -295,7 +295,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   @volatile private[this] var aeronErrorLogTask: Cancellable = _
   @volatile private[this] var areonErrorLog: AeronErrorLog = _
 
-  @volatile private[this] var inboundCompressions: Option[InboundCompressions] = None
+  @volatile private[this] var _inboundCompressions: Option[InboundCompressions] = None
+  def inboundCompressions: Option[InboundCompressions] = _inboundCompressions
 
   def bindAddress: UniqueAddress = _bindAddress
   override def localAddress: UniqueAddress = _localAddress
@@ -421,7 +422,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     runInboundStreams()
     topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData)
 
-    log.info("Remoting started; listening on address: {}", defaultAddress)
+    log.info("Remoting started; listening on address: [{}] with uid [{}]", localAddress.address, localAddress.uid)
   }
 
   private lazy val shutdownHook = new Thread {
@@ -572,7 +573,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   private def runInboundStreams(): Unit = {
     val noCompressions = NoInboundCompressions // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082
     val compressions = createInboundCompressions(this)
-    inboundCompressions = Some(compressions)
+    _inboundCompressions = Some(compressions)
 
     runInboundControlStream(noCompressions) // TODO should understand compressions too
     runInboundOrdinaryMessagesStream(compressions)
@@ -598,31 +599,43 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
             import CompressionProtocol._
             m match {
               case ActorRefCompressionAdvertisement(from, table) ⇒
-                log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table)
-                val a = association(from.address)
-                // make sure uid is same for active association
-                if (a.associationState.uniqueRemoteAddressValue().contains(from)) {
-                  import system.dispatcher
-                  a.changeActorRefCompression(table).foreach { _ ⇒
-                    a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version))
-                    system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table))
+                if (table.originUid == localAddress.uid) {
+                  log.debug("Incoming ActorRef compression advertisement from [{}], table: [{}]", from, table)
+                  val a = association(from.address)
+                  // make sure uid is same for active association
+                  if (a.associationState.uniqueRemoteAddressValue().contains(from)) {
+                    import system.dispatcher
+                    a.changeActorRefCompression(table).foreach { _ ⇒
+                      a.sendControl(ActorRefCompressionAdvertisementAck(localAddress, table.version))
+                      system.eventStream.publish(Events.ReceivedActorRefCompressionTable(from, table))
+                    }
                   }
-                }
+                } else
+                  log.debug(
+                    "Discarding incoming ActorRef compression advertisement from [{}] that was " +
+                      "prepared for another incarnation with uid [{}] than current uid [{}], table: [{}]",
+                    from, table.originUid, localAddress.uid, table)
               case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒
                 inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
               case ClassManifestCompressionAdvertisement(from, table) ⇒
-                log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)
-                val a = association(from.address)
-                // make sure uid is same for active association
-                if (a.associationState.uniqueRemoteAddressValue().contains(from)) {
-                  import system.dispatcher
-                  a.changeClassManifestCompression(table).foreach { _ ⇒
-                    a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version))
-                    system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table))
+                if (table.originUid == localAddress.uid) {
+                  log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)
+                  val a = association(from.address)
+                  // make sure uid is same for active association
+                  if (a.associationState.uniqueRemoteAddressValue().contains(from)) {
+                    import system.dispatcher
+                    a.changeClassManifestCompression(table).foreach { _ ⇒
+                      a.sendControl(ClassManifestCompressionAdvertisementAck(localAddress, table.version))
+                      system.eventStream.publish(Events.ReceivedClassManifestCompressionTable(from, table))
+                    }
                   }
-                }
+                } else
+                  log.debug(
+                    "Discarding incoming Class Manifest compression advertisement from [{}] that was " +
+                      "prepared for another incarnation with uid [{}] than current uid [{}], table: [{}]",
+                    from, table.originUid, localAddress.uid, table)
               case ClassManifestCompressionAdvertisementAck(from, tableVersion) ⇒
-                inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
+                inboundCompressions.foreach(_.confirmClassManifestCompressionAdvertisement(from.uid, tableVersion))
             }
 
           case Quarantined(from, to) if to == localAddress ⇒
@@ -722,6 +735,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
         log.error(cause, s"{} failed after shutdown. {}", streamName, cause.getMessage)
       case _: AbruptTerminationException ⇒ // ActorSystem shutdown
       case cause ⇒
+        _inboundCompressions.foreach(_.close())
+        _inboundCompressions = None
         if (restartCounter.restart()) {
           log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage)
           restart()
@@ -759,6 +774,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     } yield {
       topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
 
+      _inboundCompressions.foreach(_.close())
+      _inboundCompressions = None
+
       if (aeronErrorLogTask != null) {
         aeronErrorLogTask.cancel()
         topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
index 3f39dd575b..15fb1d2054 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
@@ -152,7 +152,7 @@ private[remote] class Association(
     timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
   }
 
-  private[artery] def clearCompression(): Future[Done] = {
+  private def clearOutboundCompression(): Future[Done] = {
     import transport.system.dispatcher
     val c = changeOutboundCompression
     val result =
@@ -162,6 +162,9 @@ private[remote] class Association(
     timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
   }
 
+  private def clearInboundCompression(originUid: Long): Unit =
+    transport.inboundCompressions.foreach(_.close(originUid))
+
   private def timeoutAfter[T](f: Future[T], timeout: FiniteDuration, e: ⇒ Throwable): Future[T] = {
     import transport.system.dispatcher
     val f2 = after(timeout, transport.system.scheduler)(Future.failed(e))
@@ -227,7 +230,7 @@ private[remote] class Association(
         // completes handshake at same time, but it's important to clear it before
         // we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess)
         import transport.system.dispatcher
-        clearCompression().map { _ ⇒
+        clearOutboundCompression().map { _ ⇒
           current.uniqueRemoteAddressPromise.trySuccess(peer)
           current.uniqueRemoteAddressValue() match {
             case Some(`peer`) ⇒
@@ -240,6 +243,7 @@ private[remote] class Association(
                     log.debug(
                       "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
                       newState.incarnation, peer.address, peer.uid, old.uid)
+                    clearInboundCompression(old.uid)
                   case None ⇒
                   // Failed, nothing to do
                 }
@@ -366,8 +370,8 @@ private[remote] class Association(
                 log.warning(
                   "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
                   remoteAddress, u, reason)
-                // clear outbound compression
-                clearCompression()
+                clearOutboundCompression()
+                clearInboundCompression(u)
                 // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
                 transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt))
                 // end delivery of system messages to that incarnation after this point
diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
index 1786f527c9..0c21194e43 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
@@ -89,8 +89,6 @@ private[remote] object EnvelopeBuffer {
 
   val UsAscii = Charset.forName("US-ASCII")
 
-  val DeadLettersCode = 0
-
   // accessing the internal char array of String when writing literal strings to ByteBuffer
   val StringValueFieldOffset = Unsafe.instance.objectFieldOffset(classOf[String].getDeclaredField("value"))
 }
@@ -106,6 +104,8 @@ private[remote] object HeaderBuilder {
 
   def out(): HeaderBuilder =
     new HeaderBuilderImpl(NoInboundCompressions, CompressionTable.empty[ActorRef], CompressionTable.empty[String])
+
+  final val DeadLettersCode = -1
 }
 
 /**
@@ -123,6 +123,8 @@ private[remote] sealed trait HeaderBuilder {
   def inboundActorRefCompressionTableVersion: Int
   def inboundClassManifestCompressionTableVersion: Int
 
+  def useOutboundCompression(on: Boolean): Unit
+
   def outboundActorRefCompression: CompressionTable[ActorRef]
   def setOutboundActorRefCompression(table: CompressionTable[ActorRef]): Unit
 
@@ -172,7 +174,14 @@ private[remote] sealed trait HeaderBuilder {
   def serializer: Int
 
   def setManifest(manifest: String): Unit
-  def manifest(originUid: Long): String
+  def manifest(originUid: Long): OptionVal[String]
+
+  /**
+   * Reset all fields that are related to an outbound message,
+   * i.e. Encoder calls this as the first thing in onPush.
+   */
+  def resetMessageFields(): Unit
+
 }
 
 /**
@@ -197,27 +206,41 @@ private[remote] final class HeaderBuilderImpl(
   inboundCompression:                    InboundCompressions,
   var _outboundActorRefCompression:      CompressionTable[ActorRef],
   var _outboundClassManifestCompression: CompressionTable[String]) extends HeaderBuilder {
+  import HeaderBuilder.DeadLettersCode
 
   private[this] val toSerializationFormat: SerializationFormatCache = new SerializationFormatCache
 
   // Fields only available for EnvelopeBuffer
-  var _version: Byte = _
-  var _flags: Byte = _
-  var _uid: Long = _
+  var _version: Byte = 0
+  var _flags: Byte = 0
+  var _uid: Long = 0
   var _inboundActorRefCompressionTableVersion: Int = 0
   var _inboundClassManifestCompressionTableVersion: Int = 0
+  var _useOutboundCompression: Boolean = true
 
   var _senderActorRef: String = null
   var _senderActorRefIdx: Int = -1
   var _recipientActorRef: String = null
   var _recipientActorRefIdx: Int = -1
 
-  var _serializer: Int = _
+  var _serializer: Int = 0
   var _manifest: String = null
   var _manifestIdx: Int = -1
 
   var _metadataContainer: ByteString = null
 
+  override def resetMessageFields(): Unit = {
+    _flags = 0
+    _senderActorRef = null
+    _senderActorRefIdx = -1
+    _recipientActorRef = null
+    _recipientActorRefIdx = -1
+
+    _serializer = 0
+    _manifest = null
+    _manifestIdx = -1
+  }
+
   override def setVersion(v: Byte) = _version = v
   override def version = _version
 
@@ -234,6 +257,9 @@ private[remote] final class HeaderBuilderImpl(
   override def inboundActorRefCompressionTableVersion: Int = _inboundActorRefCompressionTableVersion
   override def inboundClassManifestCompressionTableVersion: Int = _inboundClassManifestCompressionTableVersion
 
+  def useOutboundCompression(on: Boolean): Unit =
+    _useOutboundCompression = on
+
   def setOutboundActorRefCompression(table: CompressionTable[ActorRef]): Unit = {
     _outboundActorRefCompression = table
   }
@@ -245,39 +271,49 @@ private[remote] final class HeaderBuilderImpl(
   def outboundClassManifestCompression: CompressionTable[String] = _outboundClassManifestCompression
 
   override def setSenderActorRef(ref: ActorRef): Unit = {
-    _senderActorRefIdx = outboundActorRefCompression.compress(ref)
-    if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) // includes local address from `currentTransportInformation`
+    // serializedActorPath includes local address from `currentTransportInformation`
+    if (_useOutboundCompression) {
+      _senderActorRefIdx = outboundActorRefCompression.compress(ref)
+      if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref)
+    } else
+      _senderActorRef = Serialization.serializedActorPath(ref)
   }
   override def setNoSender(): Unit = {
     _senderActorRef = null
-    _senderActorRefIdx = EnvelopeBuffer.DeadLettersCode
+    _senderActorRefIdx = DeadLettersCode
   }
   override def isNoSender: Boolean =
-    (_senderActorRef eq null) && _senderActorRefIdx == EnvelopeBuffer.DeadLettersCode
-  override def senderActorRef(originUid: Long): OptionVal[ActorRef] =
-    if (_senderActorRef eq null)
+    (_senderActorRef eq null) && _senderActorRefIdx == DeadLettersCode
+  override def senderActorRef(originUid: Long): OptionVal[ActorRef] = {
+    // we treat deadLetters as always present, but not included in table
+    if ((_senderActorRef eq null) && !isNoSender)
       inboundCompression.decompressActorRef(originUid, inboundActorRefCompressionTableVersion, _senderActorRefIdx)
     else OptionVal.None
+  }
+
   def senderActorRefPath: OptionVal[String] =
     OptionVal(_senderActorRef)
 
   def setNoRecipient(): Unit = {
     _recipientActorRef = null
-    _recipientActorRefIdx = EnvelopeBuffer.DeadLettersCode
+    _recipientActorRefIdx = DeadLettersCode
   }
   def isNoRecipient: Boolean =
-    (_recipientActorRef eq null) && _recipientActorRefIdx == EnvelopeBuffer.DeadLettersCode
+    (_recipientActorRef eq null) && _recipientActorRefIdx == DeadLettersCode
 
   def setRecipientActorRef(ref: ActorRef): Unit = {
-    _recipientActorRefIdx = outboundActorRefCompression.compress(ref)
-    if (_recipientActorRefIdx == -1) {
+    if (_useOutboundCompression) {
+      _recipientActorRefIdx = outboundActorRefCompression.compress(ref)
+      if (_recipientActorRefIdx == -1) _recipientActorRef = toSerializationFormat.getOrCompute(ref)
+    } else
       _recipientActorRef = toSerializationFormat.getOrCompute(ref)
-    }
   }
-  def recipientActorRef(originUid: Long): OptionVal[ActorRef] =
-    if (_recipientActorRef eq null)
+  def recipientActorRef(originUid: Long): OptionVal[ActorRef] = {
+    // we treat deadLetters as always present, but not included in table
+    if ((_recipientActorRef eq null) && !isNoRecipient)
       inboundCompression.decompressActorRef(originUid, inboundActorRefCompressionTableVersion, _recipientActorRefIdx)
     else OptionVal.None
+  }
   def recipientActorRefPath: OptionVal[String] =
     OptionVal(_recipientActorRef)
 
@@ -288,16 +324,18 @@ private[remote] final class HeaderBuilderImpl(
     _serializer
 
   override def setManifest(manifest: String): Unit = {
-    _manifestIdx = outboundClassManifestCompression.compress(manifest)
-    if (_manifestIdx == -1) _manifest = manifest
+    if (_useOutboundCompression) {
+      _manifestIdx = outboundClassManifestCompression.compress(manifest)
+      if (_manifestIdx == -1) _manifest = manifest
+    } else
+      _manifest = manifest
   }
-  override def manifest(originUid: Long): String = {
-    if (_manifest ne null) _manifest
+  override def manifest(originUid: Long): OptionVal[String] = {
+    if (_manifest ne null) OptionVal.Some(_manifest)
     else {
-      _manifest = inboundCompression.decompressClassManifest(
+      inboundCompression.decompressClassManifest(
         originUid,
-        inboundClassManifestCompressionTableVersion, _manifestIdx).get
-      _manifest
+        inboundClassManifestCompressionTableVersion, _manifestIdx)
     }
   }
 
@@ -434,7 +472,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
       header._senderActorRef = null
       header._senderActorRefIdx = idx
     } else {
-      header._senderActorRef = readLiteral()
+      header._senderActorRef = emptyAsNull(readLiteral())
     }
 
     // Deserialize recipient
@@ -444,7 +482,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
       header._recipientActorRef = null
       header._recipientActorRefIdx = idx
     } else {
-      header._recipientActorRef = readLiteral()
+      header._recipientActorRef = emptyAsNull(readLiteral())
     }
 
     // Deserialize class manifest
@@ -458,6 +496,10 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
     }
   }
 
+  private def emptyAsNull(s: String): String =
+    if (s == "") null
+    else s
+
   private def readLiteral(): String = {
     val length = byteBuffer.getShort
     if (length == 0) ""
@@ -477,7 +519,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
   }
 
   private def writeLiteral(tagOffset: Int, literal: String): Unit = {
-    val length = literal.length
+    val length = if (literal eq null) 0 else literal.length
     if (length > 65535)
       throw new IllegalArgumentException("Literals longer than 65535 cannot be encoded in the envelope")
 
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
index 2ef13b9b3f..58474c3be3 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
@@ -94,6 +94,11 @@ private[remote] class Encoder(
         val outboundEnvelope = grab(in)
         val envelope = bufferPool.acquire()
 
+        headerBuilder.resetMessageFields()
+        // don't use outbound compression for ArteryMessage, e.g. handshake messages must get through
+        // without depending on compression tables being in sync when systems are restarted
+        headerBuilder.useOutboundCompression(!outboundEnvelope.message.isInstanceOf[ArteryMessage])
+
         // internally compression is applied by the builder:
         outboundEnvelope.recipient match {
           case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r
@@ -288,38 +293,62 @@ private[remote] class Decoder(
       override def onPush(): Unit = {
         messageCount += 1
         val envelope = grab(in)
+        headerBuilder.resetMessageFields()
         envelope.parseHeader(headerBuilder)
 
         val originUid = headerBuilder.uid
         val association = inboundContext.association(originUid)
 
-        val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef(originUid) match {
+        val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match {
           case OptionVal.Some(ref) ⇒
             OptionVal(ref.asInstanceOf[InternalActorRef])
           case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined ⇒
             resolveRecipient(headerBuilder.recipientActorRefPath.get)
           case _ ⇒
             OptionVal.None
+        } catch {
+          case NonFatal(e) ⇒
+            // probably version mismatch due to restarted system
+            log.warning("Couldn't decompress recipient from originUid [{}]. {}", originUid, e.getMessage)
+            OptionVal.None
         }
 
-        if (recipient.isEmpty && headerBuilder.recipientActorRefPath.isEmpty && !headerBuilder.isNoRecipient) {
-          log.debug("Dropping message for unknown recipient. It was probably sent from system [{}] with compression " +
+        val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match {
+          case OptionVal.Some(ref) ⇒
+            OptionVal(ref.asInstanceOf[InternalActorRef])
+          case OptionVal.None if headerBuilder.senderActorRefPath.isDefined ⇒
+            OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
+          case _ ⇒
+            OptionVal.None
+        } catch {
+          case NonFatal(e) ⇒
+            // probably version mismatch due to restarted system
+            log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage)
+            OptionVal.None
+        }
+
+        val classManifestOpt = try headerBuilder.manifest(originUid) catch {
+          case NonFatal(e) ⇒
+            // probably version mismatch due to restarted system
+            log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e.getMessage)
+            OptionVal.None
+        }
+
+        if ((recipient.isEmpty && headerBuilder.recipientActorRefPath.isEmpty && !headerBuilder.isNoRecipient) ||
+          (sender.isEmpty && headerBuilder.senderActorRefPath.isEmpty && !headerBuilder.isNoSender)) {
+          log.debug("Dropping message for unknown recipient/sender. It was probably sent from system [{}] with compression " +
+            "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
+            "that has already been discarded in the destination system.", originUid,
+            headerBuilder.inboundActorRefCompressionTableVersion)
+          pull(in)
+        } else if (classManifestOpt.isEmpty) {
+          log.debug("Dropping message with unknown manifest. It was probably sent from system [{}] with compression " +
             "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
             "that has already been discarded in the destination system.", originUid,
             headerBuilder.inboundActorRefCompressionTableVersion)
           pull(in)
         } else {
-
-          val sender: OptionVal[InternalActorRef] = headerBuilder.senderActorRef(originUid) match {
-            case OptionVal.Some(ref) ⇒
-              OptionVal(ref.asInstanceOf[InternalActorRef])
-            case OptionVal.None if headerBuilder.senderActorRefPath.isDefined ⇒
-              OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
-            case _ ⇒
-              OptionVal.None
-          }
-
-          val classManifest = headerBuilder.manifest(originUid)
+          val classManifest = classManifestOpt.get
 
           if ((messageCount & heavyHitterMask) == 0) {
             // --- hit refs and manifests for heavy-hitter counting
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
index 63d8c05693..801547d87f 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
@@ -230,7 +230,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
               s"Dropping message [{}] from unknown system with UID [{}]. " +
                 "This system with UID [{}] was probably restarted. " +
                 "Messages will be accepted when new handshake has been completed.",
-              env.message.getClass.getName, inboundContext.localAddress.uid, env.originUid)
+              env.message.getClass.getName, env.originUid, inboundContext.localAddress.uid)
           pull(in)
         }
       }
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala
index 70bbdff2b3..106b733c16 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/CompressionTable.scala
@@ -8,30 +8,30 @@ import java.util
 import java.util.Comparator
 
 /** INTERNAL API: Versioned compression table to be advertised between systems */
-private[remote] final case class CompressionTable[T](version: Int, map: Map[T, Int]) {
+private[remote] final case class CompressionTable[T](originUid: Long, version: Int, dictionary: Map[T, Int]) {
   import CompressionTable.NotCompressedId
 
   def compress(value: T): Int =
-    map.get(value) match {
+    dictionary.get(value) match {
       case Some(id) ⇒ id
       case None     ⇒ NotCompressedId
     }
 
   def invert: DecompressionTable[T] =
-    if (map.isEmpty) DecompressionTable.empty[T].copy(version = version)
+    if (dictionary.isEmpty) DecompressionTable.empty[T].copy(originUid = originUid, version = version)
     else {
       // TODO: these are some expensive sanity checks, about the numbers being consecutive, without gaps
       // TODO: we can remove them, make them re-map (not needed I believe though)
-      val expectedGaplessSum = Integer.valueOf((map.size * (map.size + 1)) / 2) /* Dirichlet */
-      require(map.values.min == 0, "Compression table should start allocating from 0, yet lowest allocated id was " + map.values.min)
-      require(map.values.sum + map.size == expectedGaplessSum, "Given compression map does not seem to be gap-less and starting from zero, " +
-        "which makes compressing it into an Array difficult, bailing out! Map was: " + map)
+      val expectedGaplessSum = Integer.valueOf((dictionary.size * (dictionary.size + 1)) / 2) /* Dirichlet */
+      require(dictionary.values.min == 0, "Compression table should start allocating from 0, yet lowest allocated id was " + dictionary.values.min)
+      require(dictionary.values.sum + dictionary.size == expectedGaplessSum, "Given compression map does not seem to be gap-less and starting from zero, " +
+        "which makes compressing it into an Array difficult, bailing out! Map was: " + dictionary)
 
-      val tups = Array.ofDim[(Object, Int)](map.size).asInstanceOf[Array[(T, Int)]]
-      val ts = Array.ofDim[Object](map.size).asInstanceOf[Array[T]]
+      val tups = Array.ofDim[(Object, Int)](dictionary.size).asInstanceOf[Array[(T, Int)]]
+      val ts = Array.ofDim[Object](dictionary.size).asInstanceOf[Array[T]]
 
       var i = 0
-      val mit = map.iterator
+      val mit = dictionary.iterator
       while (i < tups.length) {
         tups(i) = mit.next()
         i += 1
@@ -44,7 +44,7 @@ private[remote] final case class CompressionTable[T](version: Int, map: Map[T, I
         i += 1
       }
 
-      DecompressionTable[T](version, ts)
+      DecompressionTable[T](originUid, version, ts)
     }
 }
 /** INTERNAL API */
@@ -57,6 +57,6 @@ private[remote] object CompressionTable {
   }
   def compareBy2ndValue[T]: Comparator[Tuple2[T, Int]] = CompareBy2ndValue.asInstanceOf[Comparator[(T, Int)]]
 
-  private[this] val _empty = new CompressionTable[Any](0, Map.empty)
+  private[this] val _empty = new CompressionTable[Any](0, 0, Map.empty)
   def empty[T] = _empty.asInstanceOf[CompressionTable[T]]
 }
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
index d6ade2eae8..a53b5e7258 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/DecompressionTable.scala
@@ -5,31 +5,28 @@
 package akka.remote.artery.compress
 
 /** INTERNAL API */
-private[artery] final case class DecompressionTable[T](version: Int, table: Array[T]) {
+private[artery] final case class DecompressionTable[T](originUid: Long, version: Int, table: Array[T]) {
   // TODO version maybe better as Long? // OR implement roll-over
   private[this] val length = table.length
 
   def get(idx: Int): T = {
     if (idx >= length)
       throw new IllegalArgumentException(s"Attempted decompression of unknown id: [$idx]! " +
-        s"Only $length ids allocated in table version [$version].")
+        s"Only $length ids allocated in table version [$version] for origin [$originUid].")
     table(idx)
   }
 
   def invert: CompressionTable[T] =
-    CompressionTable(version, Map(table.zipWithIndex: _*))
+    CompressionTable(originUid, version, Map(table.zipWithIndex: _*))
 
   /** Writes complete table as String (heavy operation) */
-  def toDebugString =
-    getClass.getName +
-      s"(version: $version, " +
-      (
-        if (length == 0) "[empty]"
-        else s"table: [${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}") + "])"
+  override def toString =
+    s"DecompressionTable($originUid, $version, " +
+      s"Map(${table.zipWithIndex.map({ case (t, i) ⇒ s"$i -> $t" }).mkString(",")}))"
 }
 
 /** INTERNAL API */
 private[artery] object DecompressionTable {
-  private[this] val _empty = DecompressionTable(0, Array.empty)
+  private[this] val _empty = DecompressionTable(0, 0, Array.empty)
   def empty[T] = _empty.asInstanceOf[DecompressionTable[T]]
 }
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
index 01d7928b7a..c6deac61cf 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala
@@ -15,6 +15,11 @@ import akka.util.{ OptionVal, PrettyDuration }
 import org.agrona.collections.Long2ObjectHashMap
 
 import scala.annotation.tailrec
+import scala.concurrent.Future
+import akka.Done
+import akka.actor.Cancellable
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * INTERNAL API
@@ -30,6 +35,17 @@ private[remote] trait InboundCompressions {
   def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit
   def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String]
   def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit
+
+  /**
+   * Cancel advertisement scheduling
+   */
+  def close(): Unit
+
+  /**
+   * Remove compression and cancel advertisement scheduling for a specific origin
+   */
+  def close(originUid: Long): Unit
+
 }
 
 /**
@@ -42,14 +58,13 @@ private[remote] final class InboundCompressionsImpl(
   inboundContext: InboundContext,
   settings:       ArterySettings.Compression) extends InboundCompressions {
 
-  // TODO we also must remove the ones that won't be used anymore - when quarantine triggers?
-  //      Why is that important? Won't be naturally be removed in new advertisements since they
-  //      are not used any more?
+  private val stopped = new AtomicBoolean
+
   private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]()
   private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] {
     override def apply(originUid: Long): InboundActorRefCompression = {
       val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max)
-      new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters)
+      new InboundActorRefCompression(system, settings, originUid, inboundContext, actorRefHitters, stopped)
     }
   }
   private def actorRefsIn(originUid: Long): InboundActorRefCompression =
@@ -59,7 +74,7 @@ private[remote] final class InboundCompressionsImpl(
   private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] {
     override def apply(originUid: Long): InboundManifestCompression = {
       val manifestHitters = new TopHeavyHitters[String](settings.Manifests.Max)
-      new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters)
+      new InboundManifestCompression(system, settings, originUid, inboundContext, manifestHitters, stopped)
     }
   }
   private def classManifestsIn(originUid: Long): InboundManifestCompression =
@@ -75,7 +90,10 @@ private[remote] final class InboundCompressionsImpl(
   }
 
   override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = {
-    actorRefsIn(originUid).confirmAdvertisement(tableVersion)
+    _actorRefsIns.get(originUid) match {
+      case null ⇒ // ignore, it was closed
+      case a    ⇒ a.confirmAdvertisement(tableVersion)
+    }
   }
 
   // class manifest compression ---
@@ -86,8 +104,23 @@ private[remote] final class InboundCompressionsImpl(
     if (ArterySettings.Compression.Debug) println(s"[compress] hitClassManifest($originUid, $address, $manifest, $n)")
     classManifestsIn(originUid).increment(address, manifest, n)
   }
-  override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit =
-    actorRefsIn(originUid).confirmAdvertisement(tableVersion)
+  override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = {
+    _classManifestsIns.get(originUid) match {
+      case null ⇒ // ignore, it was closed
+      case a    ⇒ a.confirmAdvertisement(tableVersion)
+    }
+  }
+
+  override def close(): Unit = stopped.set(true)
+
+  override def close(originUid: Long): Unit = {
+    actorRefsIn(originUid).close()
+    classManifestsIn(originUid).close()
+    // FIXME This is not safe, it can be created again (concurrently), at least in theory.
+    //       However, we should make the inbound compressions owned by the Decoder and it doesn't have to be thread-safe
+    _actorRefsIns.remove(originUid)
+    _classManifestsIns.remove(originUid)
+  }
 
   // testing utilities ---
 
@@ -117,24 +150,18 @@ private[remote] final class InboundActorRefCompression(
   settings:       ArterySettings.Compression,
   originUid:      Long,
   inboundContext: InboundContext,
-  heavyHitters:   TopHeavyHitters[ActorRef]) extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters) {
-
-  preAllocate(system.deadLetters)
-
-  /* Since the table is empty here, anything we increment here becomes a heavy hitter immediately. */
-  def preAllocate(allocations: ActorRef*): Unit = {
-    allocations foreach { ref ⇒ increment(null, ref, 100000) }
-  }
+  heavyHitters:   TopHeavyHitters[ActorRef],
+  stopped:        AtomicBoolean)
+  extends InboundCompression[ActorRef](system, settings, originUid, inboundContext, heavyHitters, stopped) {
 
   override def decompress(tableVersion: Int, idx: Int): OptionVal[ActorRef] =
-    if (idx == 0) OptionVal.Some(system.deadLetters)
-    else super.decompressInternal(tableVersion, idx, 0)
+    super.decompressInternal(tableVersion, idx, 0)
 
-  scheduleNextTableAdvertisement()
   override protected def tableAdvertisementInterval = settings.ActorRefs.AdvertisementInterval
 
   override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[ActorRef]): Unit = {
-    log.debug(s"Advertise ActorRef compression [$table], from [${inboundContext.localAddress}] to [${outboundContext.remoteAddress}]")
+    log.debug(s"Advertise {} compression [{}] to [{}#{}]", Logging.simpleName(getClass), table, outboundContext.remoteAddress,
+      originUid)
     outboundContext.sendControl(CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table))
   }
 }
@@ -144,13 +171,15 @@ final class InboundManifestCompression(
   settings:       ArterySettings.Compression,
   originUid:      Long,
   inboundContext: InboundContext,
-  heavyHitters:   TopHeavyHitters[String]) extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters) {
+  heavyHitters:   TopHeavyHitters[String],
+  stopped:        AtomicBoolean)
+  extends InboundCompression[String](system, settings, originUid, inboundContext, heavyHitters, stopped) {
 
-  scheduleNextTableAdvertisement()
   override protected def tableAdvertisementInterval = settings.Manifests.AdvertisementInterval
 
   override def advertiseCompressionTable(outboundContext: OutboundContext, table: CompressionTable[String]): Unit = {
-    log.debug(s"Advertise {} compression [{}] to [{}]", Logging.simpleName(getClass), table, outboundContext.remoteAddress)
+    log.debug(s"Advertise {} compression [{}] to [{}#{}]", Logging.simpleName(getClass), table, outboundContext.remoteAddress,
+      originUid)
     outboundContext.sendControl(CompressionProtocol.ClassManifestCompressionAdvertisement(inboundContext.localAddress, table))
   }
 
@@ -198,14 +227,34 @@ private[remote] abstract class InboundCompression[T >: Null](
   val settings:     ArterySettings.Compression,
   originUid:        Long,
   inboundContext:   InboundContext,
-  val heavyHitters: TopHeavyHitters[T]) {
+  val heavyHitters: TopHeavyHitters[T],
+  stopped:          AtomicBoolean) {
 
-  lazy val log = Logging(system, getClass.getSimpleName)
+  val log = Logging(system, getClass)
 
-  private[this] val state: AtomicReference[InboundCompression.State[T]] = new AtomicReference(InboundCompression.State.empty)
+  // FIXME InboundCompressions should be owned by the Decoder stage, and then doesn't have to be thread-safe
+  private[this] val state: AtomicReference[InboundCompression.State[T]] =
+    new AtomicReference(InboundCompression.State.empty)
+  // We should not continue sending advertisements to an association that might be dead (not quarantined yet)
+  @volatile private[this] var alive = true
+  private[this] val resendCount = new AtomicInteger
 
   private[this] val cms = new CountMinSketch(16, 1024, System.currentTimeMillis().toInt)
 
+  log.debug("Initializing inbound compression for originUid [{}]", originUid)
+  val schedulerTask: Option[Cancellable] =
+    tableAdvertisementInterval match {
+      case d: FiniteDuration ⇒
+        Some(system.scheduler.schedule(d, d)(runNextTableAdvertisement)(system.dispatcher))
+      case _ ⇒
+        None
+    }
+
+  def close(): Unit = {
+    schedulerTask.foreach(_.cancel())
+    log.debug("Closed inbound compression for originUid [{}]", originUid)
+  }
+
   /* ==== COMPRESSION ==== */
 
   /** Override and specialize if needed, for default compression logic delegate to 3-param overload */
@@ -237,46 +286,53 @@ private[remote] abstract class InboundCompression[T >: Null](
       if (value != null) OptionVal.Some[T](value)
       else throw new UnknownCompressedIdException(idx)
     } else if (incomingTableVersion < activeVersion) {
-      log.debug("Received value compressed with old table: [{}], current table version is: [{}]", incomingTableVersion, activeVersion)
-      OptionVal.None
-    } else if (incomingTableVersion == current.nextTable.version) {
       log.debug(
-        "Received first value compressed using the next prepared compression table, flipping to it (version: {})",
-        current.nextTable.version)
+        "Received value from originUid [{}] compressed with old table: [{}], current table version is: [{}]",
+        originUid, incomingTableVersion, activeVersion)
+      OptionVal.None
+    } else if (current.advertisementInProgress.isDefined && incomingTableVersion == current.advertisementInProgress.get.version) {
+      log.debug(
+        "Received first value from originUid [{}] compressed using the advertised compression table, flipping to it (version: {})",
+        originUid, current.nextTable.version)
       confirmAdvertisement(incomingTableVersion)
-      decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse, activeTable will not be able to handle this
+      decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse
     } else {
       // which means that incoming version was > nextTable.version, which likely that
       // it is using a table that was built for previous incarnation of this system
       log.warning(
-        "Inbound message is using compression table version higher than the highest allocated table on this node. " +
+        "Inbound message from originUid [{}] is using unknown compression table version. " +
           "It was probably sent with compression table built for previous incarnation of this system. " +
-          "State: activeTable: {}, nextTable: {}, incoming tableVersion: {}",
-        activeVersion, current.nextTable.version, incomingTableVersion)
+          "Versions activeTable: {}, nextTable: {}, incomingTable: {}",
+        originUid, activeVersion, current.nextTable.version, incomingTableVersion)
       OptionVal.None
     }
   }
 
-  def confirmAdvertisement(tableVersion: Int): Unit = {
+  @tailrec final def confirmAdvertisement(tableVersion: Int): Unit = {
     val current = state.get
     current.advertisementInProgress match {
       case Some(inProgress) if tableVersion == inProgress.version ⇒
         if (state.compareAndSet(current, current.startUsingNextTable()))
-          log.debug("Confirmed compression table version {}", tableVersion)
+          log.debug("Confirmed compression table version [{}] for originUid [{}]", tableVersion, originUid)
+        else
+          confirmAdvertisement(tableVersion) // recur
       case Some(inProgress) if tableVersion != inProgress.version ⇒
-        log.debug("Confirmed compression table version {} but in progress {}", tableVersion, inProgress.version)
+        log.debug(
+          "Confirmed compression table version [{}] for originUid [{}] but other version in progress [{}]",
+          tableVersion, originUid, inProgress.version)
       case None ⇒ // already confirmed
     }
 
   }
 
   /**
-   * Add `n` occurance for the given key and call `heavyHittedDetected` if element has become a heavy hitter.
+   * Add `n` occurrence for the given key and call `heavyHittedDetected` if element has become a heavy hitter.
    * Empty keys are omitted.
    */
   def increment(remoteAddress: Address, value: T, n: Long): Unit = {
     val count = cms.addObjectAndEstimateCount(value, n)
     addAndCheckIfheavyHitterDetected(value, count)
+    alive = true
   }
 
   /** Mutates heavy hitters */
@@ -296,27 +352,6 @@ private[remote] abstract class InboundCompression[T >: Null](
   private[remote] def triggerNextTableAdvertisement(): Unit = // TODO use this in tests for triggering
     runNextTableAdvertisement()
 
-  def scheduleNextTableAdvertisement(): Unit =
-    tableAdvertisementInterval match {
-      case d: FiniteDuration ⇒
-        try {
-          system.scheduler.scheduleOnce(d, ScheduledTableAdvertisementRunnable)(system.dispatcher)
-          log.debug("Scheduled {} advertisement in [{}] from now...", getClass.getSimpleName, PrettyDuration.format(tableAdvertisementInterval, includeNanos = false, 1))
-        } catch {
-          case ex: IllegalStateException ⇒
-            // this is usually harmless
-            log.debug("Unable to schedule {} advertisement, " +
-              "likely system is shutting down. Reason: {}", getClass.getName, ex.getMessage)
-        }
-      case _ ⇒ // ignore...
-    }
-
-  private val ScheduledTableAdvertisementRunnable = new Runnable {
-    override def run(): Unit =
-      try runNextTableAdvertisement()
-      finally scheduleNextTableAdvertisement()
-  }
-
   /**
    * Entry point to advertising a new compression table.
    *
@@ -328,32 +363,52 @@ private[remote] abstract class InboundCompression[T >: Null](
    * Triggers compression table advertisement. May be triggered by schedule or manually, i.e. for testing.
    */
   private[remote] def runNextTableAdvertisement() = {
-    val current = state.get
-    if (ArterySettings.Compression.Debug) println(s"[compress] runNextTableAdvertisement, state = $current")
-    current.advertisementInProgress match {
-      case None ⇒
-        inboundContext.association(originUid) match {
-          case OptionVal.Some(association) ⇒
-            val table = prepareCompressionAdvertisement(current.nextTable.version)
-            // TODO expensive, check if building the other way wouldn't be faster?
-            val nextState = current.copy(nextTable = table.invert, advertisementInProgress = Some(table))
-            if (state.compareAndSet(current, nextState))
-              advertiseCompressionTable(association, table)
+    if (stopped.get) {
+      schedulerTask.foreach(_.cancel())
+    } else {
+      val current = state.get
+      if (ArterySettings.Compression.Debug) println(s"[compress] runNextTableAdvertisement, state = $current")
+      current.advertisementInProgress match {
+        case None ⇒
+          inboundContext.association(originUid) match {
+            case OptionVal.Some(association) ⇒
+              if (alive) {
+                val table = prepareCompressionAdvertisement(current.nextTable.version)
+                // TODO expensive, check if building the other way wouldn't be faster?
+                val nextState = current.copy(nextTable = table.invert, advertisementInProgress = Some(table))
+                if (state.compareAndSet(current, nextState)) {
+                  alive = false // will be set to true on first incoming message
+                  resendCount.set(0)
+                  advertiseCompressionTable(association, table)
+                }
+              } else
+                log.debug("Inbound compression table for originUid [{}] not changed, no need to advertise same.", originUid)
 
-          case OptionVal.None ⇒
-            // otherwise it's too early, association not ready yet.
-            // so we don't build the table since we would not be able to send it anyway.
-            log.warning("No Association for originUid [{}] yet, unable to advertise compression table.", originUid)
-        }
+            case OptionVal.None ⇒
+              // otherwise it's too early, association not ready yet.
+              // so we don't build the table since we would not be able to send it anyway.
+              log.debug("No Association for originUid [{}] yet, unable to advertise compression table.", originUid)
+          }
 
-      case Some(inProgress) ⇒
-        // The ActorRefCompressionAdvertisement message is resent because it can be lost
-        log.debug("Advertisment in progress for version {}, resending", inProgress.version)
-        inboundContext.association(originUid) match {
-          case OptionVal.Some(association) ⇒
-            advertiseCompressionTable(association, inProgress) // resend
-          case OptionVal.None ⇒
-        }
+        case Some(inProgress) ⇒
+          if (resendCount.incrementAndGet() <= 5) {
+            // The ActorRefCompressionAdvertisement message is resent because it can be lost
+            log.debug(
+              "Advertisement in progress for originUid [{}] version {}, resending",
+              originUid, inProgress.version)
+            inboundContext.association(originUid) match {
+              case OptionVal.Some(association) ⇒
+                advertiseCompressionTable(association, inProgress) // resend
+              case OptionVal.None ⇒
+            }
+          } else {
+            // give up, it might be dead
+            log.debug(
+              "Advertisement in progress for originUid [{}] version {} but no confirmation after retries.",
+              originUid, inProgress.version)
+            confirmAdvertisement(inProgress.version)
+          }
+      }
     }
   }
 
@@ -365,7 +420,7 @@ private[remote] abstract class InboundCompression[T >: Null](
 
   private def prepareCompressionAdvertisement(nextTableVersion: Int): CompressionTable[T] = {
     // TODO surely we can do better than that, optimise
-    CompressionTable(nextTableVersion, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*))
+    CompressionTable(originUid, nextTableVersion, Map(heavyHitters.snapshot.filterNot(_ == null).zipWithIndex: _*))
   }
 
   override def toString =
@@ -379,3 +434,26 @@ final class UnknownCompressedIdException(id: Long)
       s"This could happen if this node has started a new ActorSystem bound to the same address as previously, " +
       s"and previous messages from a remote system were still in flight (using an old compression table). " +
       s"The remote system is expected to drop the compression table and this system will advertise a new one.")
+
+/**
+ * INTERNAL API
+ *
+ * Literally, no compression!
+ */
+case object NoInboundCompressions extends InboundCompressions {
+  override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = ()
+  override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
+    if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
+    else OptionVal.None
+  override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
+
+  override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = ()
+  override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] =
+    if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
+    else OptionVal.None
+  override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
+
+  override def close(): Unit = ()
+
+  override def close(originUid: Long): Unit = ()
+}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala
deleted file mode 100644
index 7219cb0cfa..0000000000
--- a/akka-remote/src/main/scala/akka/remote/artery/compress/NoInboundCompressions.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (C) 2016 Lightbend Inc. 
- */
-package akka.remote.artery.compress
-
-import akka.actor.{ ActorRef, Address }
-import akka.util.OptionVal
-
-/**
- * INTERNAL API
- *
- * Literarily, no compression!
- */
-case object NoInboundCompressions extends InboundCompressions {
-  override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = ()
-  override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] =
-    if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
-    else OptionVal.None
-  override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
-
-  override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = ()
-  override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] =
-    if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
-    else OptionVal.None
-  override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
-}
-
diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
index 98fe74a0ad..30499f86a4 100644
--- a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
+++ b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
@@ -26,6 +26,8 @@ private[akka] object ArteryMessageSerializer {
   private val SystemMessageEnvelopeManifest = "j"
   private val SystemMessageDeliveryAckManifest = "k"
   private val SystemMessageDeliveryNackManifest = "l"
+
+  private final val DeadLettersRepresentation = ""
 }
 
 /** INTERNAL API */
@@ -93,42 +95,45 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
   def deserializeQuarantined(quarantined: ArteryControlFormats.Quarantined): Quarantined =
     Quarantined(deserializeUniqueAddress(quarantined.getFrom), deserializeUniqueAddress(quarantined.getTo))
 
-  def serializeActorRef(ref: ActorRef): String = Serialization.serializedActorPath(ref)
-  def deserializeActorRef(str: String): ActorRef = system.provider.resolveActorRef(str)
+  def serializeActorRef(ref: ActorRef): String =
+    if ((ref eq ActorRef.noSender) || (ref eq system.deadLetters)) DeadLettersRepresentation
+    else Serialization.serializedActorPath(ref)
+
+  def deserializeActorRef(str: String): ActorRef =
+    if (str == DeadLettersRepresentation) system.deadLetters
+    else system.provider.resolveActorRef(str)
 
   def serializeActorRefCompressionAdvertisement(adv: ActorRefCompressionAdvertisement): ArteryControlFormats.CompressionTableAdvertisement =
-    // FIXME: is it guaranteed that idx 0 is reserved for DeadLetters? In the best case, this knowledge should be managed in only one place.
-    serializeCompressionAdvertisement(adv)(serializeActorRef, _ != 0 /* 0 is reserved for DeadLetters and doesn't need to be serialized explicitly */ )
+    serializeCompressionAdvertisement(adv)(serializeActorRef)
 
   def deserializeActorRefCompressionAdvertisement(bytes: Array[Byte]): ActorRefCompressionAdvertisement =
-    deserializeCompressionAdvertisement(bytes, deserializeActorRef, ActorRefCompressionAdvertisement, Seq(system.deadLetters → 0) /* add DeadLetters explicitly */ )
+    deserializeCompressionAdvertisement(bytes, deserializeActorRef, ActorRefCompressionAdvertisement)
 
-  def serializeCompressionAdvertisement[T](adv: CompressionAdvertisement[T])(keySerializer: T ⇒ String, valueFilter: Int ⇒ Boolean = _ ⇒ true): ArteryControlFormats.CompressionTableAdvertisement = {
+  def serializeCompressionAdvertisement[T](adv: CompressionAdvertisement[T])(keySerializer: T ⇒ String): ArteryControlFormats.CompressionTableAdvertisement = {
     val builder =
       ArteryControlFormats.CompressionTableAdvertisement.newBuilder
         .setFrom(serializeUniqueAddress(adv.from))
+        .setOriginUid(adv.table.originUid)
         .setTableVersion(adv.table.version)
 
-    adv.table.map.foreach {
-      case (key, value) if valueFilter(value) ⇒
+    adv.table.dictionary.foreach {
+      case (key, value) ⇒
         builder
           .addKeys(keySerializer(key))
           .addValues(value)
-      case _ ⇒
     }
 
     builder.build
   }
 
-  def deserializeCompressionAdvertisement[T, U](bytes: Array[Byte], keyDeserializer: String ⇒ T, create: (UniqueAddress, CompressionTable[T]) ⇒ U, extraValues: Seq[(T, Int)] = Nil): U = {
+  def deserializeCompressionAdvertisement[T, U](bytes: Array[Byte], keyDeserializer: String ⇒ T, create: (UniqueAddress, CompressionTable[T]) ⇒ U): U = {
     val protoAdv = ArteryControlFormats.CompressionTableAdvertisement.parseFrom(bytes)
 
     val kvs =
       protoAdv.getKeysList.asScala.map(keyDeserializer).zip(
-        protoAdv.getValuesList.asScala.asInstanceOf[Iterable[Int]] /* to avoid having to call toInt explicitly */ ) ++
-        extraValues
+        protoAdv.getValuesList.asScala.asInstanceOf[Iterable[Int]] /* to avoid having to call toInt explicitly */ )
 
-    val table = CompressionTable(protoAdv.getTableVersion, kvs.toMap)
+    val table = CompressionTable(protoAdv.getOriginUid, protoAdv.getTableVersion, kvs.toMap)
     create(deserializeUniqueAddress(protoAdv.getFrom), table)
   }
 
@@ -210,4 +215,4 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
 
   def deserializeAddress(address: ArteryControlFormats.Address): Address =
     Address(address.getProtocol, address.getSystem, address.getHostname, address.getPort)
-}
\ No newline at end of file
+}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala
index c6f85a2e98..ecd049b445 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala
@@ -33,10 +33,10 @@ class EnvelopeBufferSpec extends AkkaSpec {
     val idxToManifest = manifestToIdx.map(_.swap)
 
     val outboundActorRefTable: CompressionTable[ActorRef] =
-      CompressionTable(version = 0xCAFE, refToIdx)
+      CompressionTable(17L, version = 0xCAFE, refToIdx)
 
     val outboundClassManifestTable: CompressionTable[String] =
-      CompressionTable(version = 0xBABE, manifestToIdx)
+      CompressionTable(17L, version = 0xBABE, manifestToIdx)
 
     override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = ()
     override def decompressActorRef(originUid: Long, tableVersion: Int, idx: Int): OptionVal[ActorRef] = OptionVal(idxToRef(idx))
@@ -45,6 +45,9 @@ class EnvelopeBufferSpec extends AkkaSpec {
     override def hitClassManifest(originUid: Long, remote: Address, manifest: String, n: Int): Unit = ()
     override def decompressClassManifest(originUid: Long, tableVersion: Int, idx: Int): OptionVal[String] = OptionVal(idxToManifest(idx))
     override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Int): Unit = ()
+
+    override def close(): Unit = ()
+    override def close(originUid: Long): Unit = ()
   }
 
   "EnvelopeBuffer" must {
@@ -83,7 +86,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.senderActorRefPath should ===(OptionVal.None)
       headerOut.recipientActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/compressable1")
       headerOut.recipientActorRefPath should ===(OptionVal.None)
-      headerOut.manifest(originUid) should ===("manifest1")
+      headerOut.manifest(originUid).get should ===("manifest1")
     }
 
     "be able to encode and decode headers with uncompressed literals" in {
@@ -116,7 +119,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.senderActorRef(originUid) should ===(OptionVal.None)
       headerOut.recipientActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable11"))
       headerOut.recipientActorRef(originUid) should ===(OptionVal.None)
-      headerOut.manifest(originUid) should ===("uncompressable3333")
+      headerOut.manifest(originUid).get should ===("uncompressable3333")
     }
 
     "be able to encode and decode headers with mixed literals" in {
@@ -144,7 +147,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.senderActorRefPath should ===(OptionVal.None)
       headerOut.recipientActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable1"))
       headerOut.recipientActorRef(originUid) should ===(OptionVal.None)
-      headerOut.manifest(originUid) should ===("manifest1")
+      headerOut.manifest(originUid).get should ===("manifest1")
 
       val senderRef = minimalRef("uncompressable0")
 
@@ -171,7 +174,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.senderActorRef(originUid) should ===(OptionVal.None)
       headerOut.recipientActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring")
       headerOut.recipientActorRefPath should ===(OptionVal.None)
-      headerOut.manifest(originUid) should ===("longlonglongliteralmanifest")
+      headerOut.manifest(originUid).get should ===("longlonglongliteralmanifest")
     }
 
     "be able to encode and decode headers with mixed literals and payload" in {
@@ -197,7 +200,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
       headerOut.senderActorRefPath should ===(OptionVal.None)
       headerOut.recipientActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable1"))
       headerOut.recipientActorRef(originUid) should ===(OptionVal.None)
-      headerOut.manifest(originUid) should ===("manifest1")
+      headerOut.manifest(originUid).get should ===("manifest1")
 
       ByteString.fromByteBuffer(envelope.byteBuffer) should ===(payload)
     }
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/ActorRefCompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/ActorRefCompressionIntegrationSpec.scala
deleted file mode 100644
index f59005268a..0000000000
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/ActorRefCompressionIntegrationSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (C) 2016 Lightbend Inc. 
- */
-
-package akka.remote.artery.compress
-
-import akka.actor._
-import akka.pattern.ask
-import akka.remote.artery.compress.CompressionProtocol.Events
-import akka.testkit._
-import akka.util.Timeout
-import com.typesafe.config.ConfigFactory
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.{ Eventually, PatienceConfiguration }
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-class ActorRefCompressionIntegrationSpec extends AkkaSpec(CompressionIntegrationSpec.commonConfig)
-  with ImplicitSender with BeforeAndAfter with Eventually {
-  import CompressionIntegrationSpec._
-
-  implicit val t = Timeout(3.seconds)
-  var systemB = ActorSystem("systemB", configB)
-
-  "Outgoing ActorRef compression table" must {
-    "compress chatty actor" in {
-      val messagesToExchange = 10
-
-      // listen for compression table events
-      val aProbe = TestProbe()
-      val b1Probe = TestProbe()
-      system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
-      systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
-
-      def voidSel = system.actorSelection(s"akka://systemB@localhost:$portB/user/void")
-      systemB.actorOf(TestActors.blackholeProps, "void")
-
-      // cause testActor-1 to become a heavy hitter
-      (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised
-
-      val a1 = aProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](10.seconds)
-      info("System [A] received: " + a1)
-      assertCompression[ActorRef](a1.table, 0, _ should ===(system.deadLetters))
-      assertCompression[ActorRef](a1.table, 1, _ should ===(testActor))
-    }
-  }
-
-  def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T ⇒ Unit): Unit = {
-    table.map.find(_._2 == id)
-      .orElse { throw new AssertionError(s"No key was compressed to the id [$id]! Table was: $table") }
-      .foreach(i ⇒ assertion(i._1))
-  }
-
-  def identify(_system: String, port: Int, name: String): ActorRef = {
-    val selection = system.actorSelection(s"akka://${_system}@localhost:$port/user/$name").resolveOne(3.seconds)
-    Await.result(selection, 4.seconds)
-  }
-
-  override def afterTermination(): Unit =
-    shutdownAllActorSystems()
-
-  private def shutdownAllActorSystems(): Unit = {
-    if (systemB != null) shutdown(systemB)
-  }
-}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/ClassManifestCompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/ClassManifestCompressionIntegrationSpec.scala
deleted file mode 100644
index fbd5568d52..0000000000
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/ClassManifestCompressionIntegrationSpec.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (C) 2016 Lightbend Inc. 
- */
-
-package akka.remote.artery.compress
-
-import akka.actor._
-import akka.pattern.ask
-import akka.remote.artery.compress.CompressionProtocol.Events
-import akka.testkit._
-import akka.util.Timeout
-import com.typesafe.config.ConfigFactory
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.{ Eventually, PatienceConfiguration }
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-class ClassManifestCompressionIntegrationSpec extends AkkaSpec(CompressionIntegrationSpec.commonConfig)
-  with ImplicitSender with BeforeAndAfter with Eventually {
-  import CompressionIntegrationSpec._
-
-  implicit val t = Timeout(3.seconds)
-  var systemB = ActorSystem("systemB", configB)
-
-  "Outgoing Manifest compression table" must {
-    "compress chatty manifest" in {
-      val messagesToExchange = 10
-
-      // listen for compression table events
-      val aProbe = TestProbe()
-      val b1Probe = TestProbe()
-      system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.ReceivedClassManifestCompressionTable])
-      systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.ReceivedClassManifestCompressionTable])
-      systemB.actorOf(TestActors.blackholeProps, "void-2")
-
-      Thread.sleep(1000)
-      val voidRef = Await.result(system.actorSelection(s"akka://systemB@localhost:$portB/user/void-2").resolveOne(3.second), 3.seconds)
-
-      // cause testActor-1 to become a heavy hitter
-      (1 to messagesToExchange).foreach { i ⇒ voidRef ! TestMessage("hello") } // does not reply, but a hot receiver should be advertised
-
-      eventually(PatienceConfiguration.Timeout(20.seconds)) {
-        val a1 = aProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](10.seconds)
-        info("System [A] received: " + a1)
-        assertCompression[String](a1.table, 0, _ should ===("TestMessageManifest"))
-      }
-    }
-  }
-
-  def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T ⇒ Unit): Unit = {
-    table.map.find(_._2 == id)
-      .orElse { throw new AssertionError(s"No key was compressed to the id [$id]! Table was: $table") }
-      .foreach(i ⇒ assertion(i._1))
-  }
-
-  def identify(_system: String, port: Int, name: String) = {
-    val selection =
-      system.actorSelection(s"akka://${_system}@localhost:$port/user/$name")
-    val ActorIdentity(1, ref) = Await.result(selection ? Identify(1), 3.seconds)
-    ref.get
-  }
-
-  override def afterTermination(): Unit =
-    shutdownAllActorSystems()
-
-  private def shutdownAllActorSystems(): Unit = {
-    if (systemB != null) shutdown(systemB)
-  }
-}
-
-import akka.actor.ExtendedActorSystem
-import akka.serialization.SerializerWithStringManifest
-
-final case class TestMessage(name: String)
-
-class TestMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest {
-
-  val TestMessageManifest = "TestMessageManifest"
-
-  override val identifier: Int = 101
-
-  override def manifest(o: AnyRef): String =
-    o match {
-      case _: TestMessage ⇒ TestMessageManifest
-    }
-
-  override def toBinary(o: AnyRef): Array[Byte] = o match {
-    case msg: TestMessage ⇒ msg.name.getBytes
-  }
-
-  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
-    manifest match {
-      case TestMessageManifest ⇒ TestMessage(new String(bytes))
-      case unknown             ⇒ throw new Exception("Unknown manifest: " + unknown)
-    }
-  }
-}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala
index 47f7523603..49337d6bb1 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala
@@ -4,20 +4,27 @@
 
 package akka.remote.artery.compress
 
-import akka.testkit.SocketUtil
 import com.typesafe.config.ConfigFactory
+import akka.actor._
+import akka.pattern.ask
+import akka.remote.artery.compress.CompressionProtocol.Events
+import akka.testkit._
+import akka.util.Timeout
+import org.scalatest.BeforeAndAfter
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import akka.actor.ExtendedActorSystem
+import akka.serialization.SerializerWithStringManifest
+import akka.remote.artery.ArteryMultiNodeSpec
 
 object CompressionIntegrationSpec {
-  // need the port before systems are started
-  val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
 
   val commonConfig = ConfigFactory.parseString(s"""
      akka {
        loglevel = INFO
 
        actor {
-         provider = "akka.remote.RemoteActorRefProvider"
-
          serializers {
            test-message = "akka.remote.artery.compress.TestMessageSerializer"
          }
@@ -25,19 +33,255 @@ object CompressionIntegrationSpec {
            "akka.remote.artery.compress.TestMessage" = test-message
          }
        }
-       remote.artery.enabled = on
-       remote.artery.canonical.hostname = localhost
-       remote.artery.canonical.port = 0
-       remote.artery.advanced.handshake-timeout = 10s
 
        remote.artery.advanced.compression {
-         actor-refs.advertisement-interval = 3 seconds
-         manifests.advertisement-interval = 3 seconds
+         actor-refs.advertisement-interval = 2 seconds
+         manifests.advertisement-interval = 2 seconds
        }
-
      }
   """)
 
-  val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
-    .withFallback(commonConfig)
+}
+
+class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrationSpec.commonConfig)
+  with ImplicitSender {
+  import CompressionIntegrationSpec._
+
+  val systemB = newRemoteSystem(name = Some("systemB"))
+  val messagesToExchange = 10
+
+  "Compression table" must {
+    "be advertised for chatty ActorRef and manifest" in {
+      // listen for compression table events
+      val aManifestProbe = TestProbe()(system)
+      val bManifestProbe = TestProbe()(systemB)
+      system.eventStream.subscribe(aManifestProbe.ref, classOf[CompressionProtocol.Events.ReceivedClassManifestCompressionTable])
+      systemB.eventStream.subscribe(bManifestProbe.ref, classOf[CompressionProtocol.Events.ReceivedClassManifestCompressionTable])
+      val aRefProbe = TestProbe()(system)
+      val bRefProbe = TestProbe()(systemB)
+      system.eventStream.subscribe(aRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
+      systemB.eventStream.subscribe(bRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
+
+      val echoRefB = systemB.actorOf(TestActors.echoActorProps, "echo")
+
+      system.actorSelection(rootActorPath(systemB) / "user" / "echo") ! Identify(None)
+      val echoRefA = expectMsgType[ActorIdentity].ref.get
+
+      // cause TestMessage manifest to become a heavy hitter
+      // cause echo to become a heavy hitter
+      (1 to messagesToExchange).foreach { i ⇒ echoRefA ! TestMessage("hello") }
+      receiveN(messagesToExchange) // the replies
+
+      within(10.seconds) {
+        // on system A side
+        awaitAssert {
+          val a1 = aManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
+          info("System [A] received: " + a1)
+          a1.table.version should be >= (1)
+          a1.table.dictionary.keySet should contain("TestMessageManifest")
+        }
+        awaitAssert {
+          val a1 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+          info("System [A] received: " + a1)
+          a1.table.version should be >= (1)
+          a1.table.dictionary.keySet should contain(echoRefA) // recipient
+          a1.table.dictionary.keySet should contain(testActor) // sender
+        }
+
+        // on system B side
+        awaitAssert {
+          val b1 = bManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
+          info("System [B] received: " + b1)
+          b1.table.version should be >= (1)
+          b1.table.dictionary.keySet should contain("TestMessageManifest")
+        }
+        awaitAssert {
+          val b1 = bRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+          info("System [B] received: " + b1)
+          b1.table.version should be >= (1)
+          b1.table.dictionary.keySet should contain(echoRefB)
+        }
+      }
+
+      // and if we continue sending new advertisements with higher version number are advertised
+      within(20.seconds) {
+        val ignore = TestProbe()(system)
+        awaitAssert {
+          echoRefA.tell(TestMessage("hello2"), ignore.ref)
+          val a2 = aManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
+          info("System [A] received more: " + a2)
+          a2.table.version should be >= (3)
+        }
+        awaitAssert {
+          echoRefA.tell(TestMessage("hello2"), ignore.ref)
+          val a2 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+          info("System [A] received more: " + a2)
+          a2.table.version should be >= (3)
+        }
+
+        awaitAssert {
+          echoRefA.tell(TestMessage("hello3"), ignore.ref)
+          val b2 = bManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
+          info("System [B] received more: " + b2)
+          b2.table.version should be >= (3)
+        }
+        awaitAssert {
+          echoRefA.tell(TestMessage("hello3"), ignore.ref)
+          val b2 = bRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+          info("System [B] received more: " + b2)
+          b2.table.version should be >= (3)
+        }
+      }
+    }
+  }
+
+  "handle noSender sender" in {
+    val aRefProbe = TestProbe()(system)
+    system.eventStream.subscribe(aRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
+
+    val probeB = TestProbe()(systemB)
+    systemB.actorOf(TestActors.forwardActorProps(probeB.ref), "fw1")
+
+    system.actorSelection(rootActorPath(systemB) / "user" / "fw1") ! Identify(None)
+    val fwRefA = expectMsgType[ActorIdentity].ref.get
+
+    fwRefA.tell(TestMessage("hello-fw1-a"), ActorRef.noSender)
+    probeB.expectMsg(TestMessage("hello-fw1-a"))
+
+    within(10.seconds) {
+      // on system A side
+      awaitAssert {
+        val a1 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+        info("System [A] received: " + a1)
+        a1.table.dictionary.keySet should contain(fwRefA) // recipient
+        a1.table.dictionary.keySet should not contain (system.deadLetters) // sender
+      }
+    }
+
+    fwRefA.tell(TestMessage("hello-fw1-b"), ActorRef.noSender)
+    probeB.expectMsg(TestMessage("hello-fw1-b"))
+  }
+
+  "handle deadLetters sender" in {
+    val aRefProbe = TestProbe()(system)
+    system.eventStream.subscribe(aRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
+
+    val probeB = TestProbe()(systemB)
+    systemB.actorOf(TestActors.forwardActorProps(probeB.ref), "fw2")
+
+    system.actorSelection(rootActorPath(systemB) / "user" / "fw2") ! Identify(None)
+    val fwRefA = expectMsgType[ActorIdentity].ref.get
+
+    fwRefA.tell(TestMessage("hello-fw2-a"), system.deadLetters)
+    probeB.expectMsg(TestMessage("hello-fw2-a"))
+
+    within(10.seconds) {
+      // on system A side
+      awaitAssert {
+        val a1 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+        info("System [A] received: " + a1)
+        a1.table.dictionary.keySet should contain(fwRefA) // recipient
+        a1.table.dictionary.keySet should not contain (system.deadLetters) // sender
+      }
+    }
+
+    fwRefA.tell(TestMessage("hello-fw2-b"), system.deadLetters)
+    probeB.expectMsg(TestMessage("hello-fw2-b"))
+  }
+
+  "work when starting new ActorSystem with same hostname:port" in {
+    val port = address(systemB).port.get
+    shutdown(systemB)
+    val systemB2 = newRemoteSystem(
+      extraConfig = Some(s"akka.remote.artery.canonical.port=$port"),
+      name = Some("systemB"))
+
+    // listen for compression table events
+    val aManifestProbe = TestProbe()(system)
+    val bManifestProbe = TestProbe()(systemB2)
+    system.eventStream.subscribe(aManifestProbe.ref, classOf[CompressionProtocol.Events.ReceivedClassManifestCompressionTable])
+    systemB2.eventStream.subscribe(bManifestProbe.ref, classOf[CompressionProtocol.Events.ReceivedClassManifestCompressionTable])
+    val aRefProbe = TestProbe()(system)
+    val bRefProbe = TestProbe()(systemB2)
+    system.eventStream.subscribe(aRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
+    systemB2.eventStream.subscribe(bRefProbe.ref, classOf[CompressionProtocol.Events.ReceivedActorRefCompressionTable])
+
+    val echoRefB2 = systemB2.actorOf(TestActors.echoActorProps, "echo2")
+
+    // messages to the new system might be dropped, before new handshake is completed
+    within(5.seconds) {
+      awaitAssert {
+        val p = TestProbe()(system)
+        system.actorSelection(rootActorPath(systemB2) / "user" / "echo2").tell(Identify(None), p.ref)
+        p.expectMsgType[ActorIdentity](1.second).ref.get
+      }
+    }
+
+    system.actorSelection(rootActorPath(systemB2) / "user" / "echo2") ! Identify(None)
+    val echoRefA = expectMsgType[ActorIdentity].ref.get
+
+    // cause TestMessage manifest to become a heavy hitter
+    (1 to messagesToExchange).foreach { i ⇒ echoRefA ! TestMessage("hello") }
+    receiveN(messagesToExchange) // the replies
+
+    within(10.seconds) {
+      // on system A side
+      awaitAssert {
+        val a2 = aManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
+        info("System [A] received: " + a2)
+        a2.table.version should be >= (1)
+        a2.table.version should be < (3)
+        a2.table.dictionary.keySet should contain("TestMessageManifest")
+      }
+      awaitAssert {
+        val a2 = aRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+        info("System [A] received: " + a2)
+        a2.table.version should be >= (1)
+        a2.table.version should be < (3)
+        a2.table.dictionary.keySet should contain(echoRefA) // recipient
+        a2.table.dictionary.keySet should contain(testActor) // sender
+      }
+
+      // on system B2 side
+      awaitAssert {
+        val b2 = bManifestProbe.expectMsgType[Events.ReceivedClassManifestCompressionTable](2.seconds)
+        info("System [B2] received: " + b2)
+        b2.table.version should be >= (1)
+        b2.table.dictionary.keySet should contain("TestMessageManifest")
+      }
+      awaitAssert {
+        val b2 = bRefProbe.expectMsgType[Events.ReceivedActorRefCompressionTable](2.seconds)
+        info("System [B2] received: " + b2)
+        b2.table.version should be >= (1)
+        b2.table.dictionary.keySet should contain(echoRefB2)
+      }
+    }
+
+  }
+
+}
+
+final case class TestMessage(name: String)
+
+class TestMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest {
+
+  val TestMessageManifest = "TestMessageManifest"
+
+  override val identifier: Int = 101
+
+  override def manifest(o: AnyRef): String =
+    o match {
+      case _: TestMessage ⇒ TestMessageManifest
+    }
+
+  override def toBinary(o: AnyRef): Array[Byte] = o match {
+    case msg: TestMessage ⇒ msg.name.getBytes
+  }
+
+  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
+    manifest match {
+      case TestMessageManifest ⇒ TestMessage(new String(bytes))
+      case unknown             ⇒ throw new Exception("Unknown manifest: " + unknown)
+    }
+  }
 }
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala
index 27b61c03d2..ae8c9f17ea 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTableSpec.scala
@@ -10,12 +10,14 @@ class CompressionTableSpec extends AkkaSpec {
 
   "CompressionTable" must {
     "should invert" in {
-      val decomp = CompressionTable(1, Map("0" → 0, "1" → 1, "2" → 2, "3" → 3)).invert
+      val decomp = CompressionTable(17L, 1, Map("0" → 0, "1" → 1, "2" → 2, "3" → 3)).invert
       decomp.table should ===(Array("0", "1", "2", "3"))
+      decomp.originUid should ===(17L)
+      decomp.version should ===(1)
     }
 
     "enforce to start allocating from 0th index" in {
-      val compressionTable = CompressionTable(1, Map("1" → 1, "3" → 3)) // missing 0 is a gap too
+      val compressionTable = CompressionTable(17L, 1, Map("1" → 1, "3" → 3)) // missing 0 is a gap too
 
       val ex = intercept[IllegalArgumentException] {
         compressionTable.invert
@@ -24,7 +26,7 @@ class CompressionTableSpec extends AkkaSpec {
     }
 
     "should not allow having gaps in compression ids (inversion would fail)" in {
-      val compressionTable = CompressionTable(1, Map("0" → 0, "1" → 1, "3" → 3)) // missing 0 is a gap too
+      val compressionTable = CompressionTable(17L, 1, Map("0" → 0, "1" → 1, "3" → 3)) // missing 2 is a gap, inversion requires contiguous ids
 
       val ex = intercept[IllegalArgumentException] {
         compressionTable.invert
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala
deleted file mode 100644
index 7bdd8973e8..0000000000
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionTestKit.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright (C) 2016 Lightbend Inc. 
- */
-
-package akka.remote.artery.compress
-
-/* INTERNAL API */
-private[akka] trait CompressionTestKit {
-  def assertCompression[T](table: CompressionTable[T], id: Int, assertion: T ⇒ Unit): Unit = {
-    table.map.find(_._2 == id)
-      .orElse { throw new AssertionError(s"No key was compressed to the id [$id]! Table was: $table") }
-      .foreach(i ⇒ assertion(i._1))
-  }
-}
-
-/* INTERNAL API */
-private[akka] object CompressionTestKit extends CompressionTestKit
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala
index 4cf2c69646..869d8c0ff1 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala
@@ -47,8 +47,7 @@ object HandshakeShouldDropCompressionTableSpec {
 }
 
 class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDropCompressionTableSpec.commonConfig)
-  with ImplicitSender with BeforeAndAfter
-  with CompressionTestKit {
+  with ImplicitSender with BeforeAndAfter {
   import HandshakeShouldDropCompressionTableSpec._
 
   implicit val t = Timeout(3.seconds)
@@ -81,7 +80,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
 
       val a0 = aProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
       info("System [A] received: " + a0)
-      a0.table.map.keySet should contain(testActor)
+      a0.table.dictionary.keySet should contain(testActor)
 
       // cause a1Probe to become a heavy hitter (we want to not have it in the 2nd compression table later)
       (1 to messagesToExchange).foreach { i ⇒ echoSel.tell(s"hello-$i", a1Probe.ref) }
@@ -90,7 +89,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
 
       val a1 = aProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
       info("System [A] received: " + a1)
-      a1.table.map.keySet should contain(a1Probe.ref)
+      a1.table.dictionary.keySet should contain(a1Probe.ref)
 
       log.warning("SHUTTING DOWN system {}...", systemB)
       shutdown(systemB)
@@ -112,7 +111,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
 
       val a2 = aNewProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
       info("System [A] received: " + a2)
-      a2.table.map.keySet should contain(testActor)
+      a2.table.dictionary.keySet should contain(testActor)
 
       val aNew2Probe = TestProbe()
       (1 to messagesToExchange).foreach { i ⇒ echoSel.tell(s"hello-$i", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised
@@ -121,7 +120,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
 
       val a3 = aNewProbe.expectMsgType[ReceivedActorRefCompressionTable](10.seconds)
       info("Received second compression: " + a3)
-      a3.table.map.keySet should contain(aNew2Probe.ref)
+      a3.table.dictionary.keySet should contain(aNew2Probe.ref)
     }
   }
 
diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala
index ac04610e9b..3c18d06f58 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/compress/OutboundCompressionSpec.scala
@@ -20,11 +20,11 @@ class OutboundCompressionSpec extends AkkaSpec {
     }
 
     "compress previously registered actor ref" in {
-      val table = CompressionTable(1, Map(system.deadLetters → 0, alice → 1))
+      val table = CompressionTable(17L, 1, Map(system.deadLetters → 0, alice → 1))
       table.compress(alice) should ===(1) // compressed
       table.compress(bob) should ===(-1) // not compressed
 
-      val table2 = table.copy(2, map = table.map.updated(bob, 2))
+      val table2 = table.copy(2, dictionary = table.dictionary.updated(bob, 2))
       table2.compress(alice) should ===(1) // compressed
       table2.compress(bob) should ===(2) // compressed
     }
diff --git a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
index 34bc13b1de..2d247067b9 100644
--- a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
@@ -24,14 +24,13 @@ class ArteryMessageSerializerSpec extends AkkaSpec {
       "ActorSystemTerminatingAck" → ActorSystemTerminatingAck(uniqueAddress()),
       "HandshakeReq" → HandshakeReq(uniqueAddress()),
       "HandshakeRsp" → HandshakeRsp(uniqueAddress()),
-      "ActorRefCompressionAdvertisement" → ActorRefCompressionAdvertisement(uniqueAddress(), CompressionTable(123, Map(actorA → 123, actorB → 456, system.deadLetters → 0))),
+      "ActorRefCompressionAdvertisement" → ActorRefCompressionAdvertisement(uniqueAddress(), CompressionTable(17L, 123, Map(actorA → 123, actorB → 456, system.deadLetters → 0))),
       "ActorRefCompressionAdvertisementAck" → ActorRefCompressionAdvertisementAck(uniqueAddress(), 23),
-      "ClassManifestCompressionAdvertisement" → ClassManifestCompressionAdvertisement(uniqueAddress(), CompressionTable(42, Map("a" → 535, "b" → 23))),
+      "ClassManifestCompressionAdvertisement" → ClassManifestCompressionAdvertisement(uniqueAddress(), CompressionTable(17L, 42, Map("a" → 535, "b" → 23))),
       "ClassManifestCompressionAdvertisementAck" → ClassManifestCompressionAdvertisementAck(uniqueAddress(), 23),
       "SystemMessageDelivery.SystemMessageEnvelop" → SystemMessageDelivery.SystemMessageEnvelope("test", 1234567890123L, uniqueAddress()),
       "SystemMessageDelivery.Ack" → SystemMessageDelivery.Ack(98765432109876L, uniqueAddress()),
-      "SystemMessageDelivery.Nack" → SystemMessageDelivery.Nack(98765432109876L, uniqueAddress())
-    ).foreach {
+      "SystemMessageDelivery.Nack" → SystemMessageDelivery.Nack(98765432109876L, uniqueAddress())).foreach {
         case (scenario, item) ⇒
           s"resolve serializer for $scenario" in {
             val serializer = SerializationExtension(system)