diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 2e1df09fc8..1d97121216 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -317,6 +317,6 @@ final case class UniqueAddress(address: Address, longUid: Long) extends Ordered[ * Stops `copy(Address, Long)` copy from being generated, use `apply` instead. */ @deprecated("Use Long UID constructor instead", since = "2.4.11") - def copy(address: Address = address, uid: Int = uid) = new UniqueAddress(address, uid) + def copy(address: Address = address, uid: Int = uid) = new UniqueAddress(address, uid.toLong) } diff --git a/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes b/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes new file mode 100644 index 0000000000..4a7d80ac34 --- /dev/null +++ b/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes @@ -0,0 +1,4 @@ +# #26542 compiler warnings (internal API's) +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.ORSet.clear") +ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.cluster.ddata.PruningState.addSeen") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.PruningState.addSeen") diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index 1b3af52425..8f7bb0bed1 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -482,7 +482,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( case ORMap.DeltaGroup(ops) => ops.foreach { processDelta.orElse { - case ORMap.DeltaGroup(args) => + case ORMap.DeltaGroup(_) => throw new IllegalStateException("Cannot nest DeltaGroups") } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index 127b3540bc..5bd4ae402c 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -181,7 +181,7 @@ final class ORMultiMap[A, B] private[akka] ( */ @InternalApi private[akka] def put(node: UniqueAddress, key: A, value: Set[B]): ORMultiMap[A, B] = { val newUnderlying = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas) { existing => - value.foldLeft(existing.clear(node)) { (s, element) => + value.foldLeft(existing.clear()) { (s, element) => s.add(node, element) } } @@ -216,7 +216,7 @@ final class ORMultiMap[A, B] private[akka] ( @InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMultiMap[A, B] = { if (withValueDeltas) { val u = underlying.updated(node, key, ORSet.empty[B], valueDeltas = true) { existing => - existing.clear(node) + existing.clear() } new ORMultiMap(u.removeKey(node, key), withValueDeltas) } else { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index 07c8c1b985..dfb584beb8 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -6,10 +6,9 @@ package akka.cluster.ddata import scala.annotation.tailrec import scala.collection.immutable - import akka.cluster.Cluster import akka.cluster.UniqueAddress -import akka.util.HashCode +import akka.util.{ unused, HashCode } import akka.annotation.InternalApi object ORSet { @@ -392,15 +391,15 @@ final class ORSet[A] private[akka] ( * This has the same result as using [[#remove]] for each * element, but it is more efficient. */ - def clear(node: SelfUniqueAddress): ORSet[A] = clear(node.uniqueAddress) + def clear(@unused node: SelfUniqueAddress): ORSet[A] = clear() @deprecated("Use `remove` that takes a `SelfUniqueAddress` parameter instead.", since = "2.5.20") - def clear(node: Cluster): ORSet[A] = clear(node.selfUniqueAddress) + def clear(@unused node: Cluster): ORSet[A] = clear() /** * INTERNAL API */ - @InternalApi private[akka] def clear(node: UniqueAddress): ORSet[A] = { + @InternalApi private[akka] def clear(): ORSet[A] = { val newFullState = new ORSet[A](elementsMap = Map.empty, vvector) val clearOp = ORSet.FullStateDeltaOp(newFullState) val newDelta = delta match { @@ -461,7 +460,7 @@ final class ORSet[A] private[akka] ( case (acc, op: ORSet.AddDeltaOp[A]) => acc.dryMerge(op.underlying, addDeltaOp = true) case (acc, op: ORSet.RemoveDeltaOp[A]) => acc.mergeRemoveDelta(op) case (acc, op: ORSet.FullStateDeltaOp[A]) => acc.dryMerge(op.underlying, addDeltaOp = false) - case (acc, op: ORSet.DeltaGroup[A]) => + case (_, _: ORSet.DeltaGroup[A]) => throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested") } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala index e641dbeec0..c6fda6780f 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/PruningState.scala @@ -8,6 +8,7 @@ import akka.actor.Address import akka.cluster.Member import akka.cluster.UniqueAddress import akka.annotation.InternalApi +import akka.util.unused /** * INTERNAL API @@ -21,6 +22,7 @@ import akka.annotation.InternalApi } final case class PruningPerformed(obsoleteTime: Long) extends PruningState { def isObsolete(currentTime: Long): Boolean = obsoleteTime <= currentTime + def addSeen(@unused node: Address): PruningState = this } } @@ -44,5 +46,5 @@ import akka.annotation.InternalApi this } - def addSeen(node: Address): PruningState = this + def addSeen(node: Address): PruningState } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 802ed97244..cc47fafb13 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -1569,7 +1569,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog getData(key) match { case someEnvelope @ Some(envelope) if envelope eq writeEnvelope => someEnvelope case Some(DataEnvelope(DeletedData, _, _)) => Some(DeletedEnvelope) // already deleted - case Some(envelope @ DataEnvelope(existing, _, _)) => + case Some(envelope @ DataEnvelope(existingData @ _, _, _)) => try { // DataEnvelope will mergeDelta when needed val merged = envelope.merge(writeEnvelope).addSeen(selfAddress) @@ -1918,8 +1918,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def hasSubscriber(subscriber: ActorRef): Boolean = - (subscribers.exists { case (k, s) => s.contains(subscriber) }) || - (newSubscribers.exists { case (k, s) => s.contains(subscriber) }) + subscribers.exists { case (_, s) => s.contains(subscriber) } || + newSubscribers.exists { case (_, s) => s.contains(subscriber) } def receiveTerminated(ref: ActorRef): Unit = { if (ref == durableStore) { @@ -2006,7 +2006,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val knownNodes = nodes.union(weaklyUpNodes).union(removedNodes.keySet.map(_.address)) val newRemovedNodes = dataEntries.foldLeft(Set.empty[UniqueAddress]) { - case (acc, (_, (envelope @ DataEnvelope(data: RemovedNodePruning, _, _), _))) => + case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _))) => acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n.address))) case (acc, _) => acc @@ -2037,7 +2037,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (envelope.needPruningFrom(removed)) { envelope.data match { - case dataWithRemovedNodePruning: RemovedNodePruning => + case _: RemovedNodePruning => envelope.pruning.get(removed) match { case None => init() case Some(PruningInitialized(owner, _)) if owner != selfUniqueAddress => init() @@ -2256,19 +2256,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case _: Replicator.UpdateSuccess[_] => gotLocalStoreReply = true if (isDone) reply(isTimeout = false) - case f: Replicator.StoreFailure[_] => + case _: Replicator.StoreFailure[_] => gotLocalStoreReply = true gotWriteNackFrom += selfUniqueAddress.address if (isDone) reply(isTimeout = false) case SendToSecondary => - deltaMsg match { - case None => - case Some(d) => - // Deltas must be applied in order and we can't keep track of ordering of - // simultaneous updates so there is a chance that the delta could not be applied. - // Try again with the full state to the primary nodes that have not acked. - primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg } + if (deltaMsg.isDefined) { + // Deltas must be applied in order and we can't keep track of ordering of + // simultaneous updates so there is a chance that the delta could not be applied. + // Try again with the full state to the primary nodes that have not acked. + primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg } } secondaryNodes.foreach { replica(_) ! writeMsg } case ReceiveTimeout => diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 10f0e7a396..5dbff116f9 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -37,7 +37,7 @@ private object ReplicatedDataSerializer { abstract class KeyComparator[A <: GeneratedMessage] extends Comparator[A] { /** - * Get the key from the entry. The key may be a String, Integer, Long, or Any + * Get the key from the entry. The key may be a String, Int, Long, or OtherMessage * @param entry The protobuf entry used with Map types * @return The Key */ @@ -54,6 +54,9 @@ private object ReplicatedDataSerializer { case (k1: Long, k2) => -1 case (k1, k2: Long) => 1 case (k1: OtherMessage, k2: OtherMessage) => OtherMessageComparator.compare(k1, k2) + case (k1, k2) => + throw new IllegalStateException( + s"Invalid keys (${k1.getClass}, ${k2.getClass}): must be of type String, Int, Long or OtherMessage") } } @@ -541,7 +544,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) b.addEntries(createEntry(rd.ORSetDeltaOp.Remove, u)) case ORSet.FullStateDeltaOp(u) => b.addEntries(createEntry(rd.ORSetDeltaOp.Full, u)) - case ORSet.DeltaGroup(u) => + case ORSet.DeltaGroup(_) => throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested") } b.build() @@ -899,7 +902,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) case ORMap.UpdateDeltaOp(op, m, zt) => b.addEntries( createEntry(rd.ORMapDeltaOp.ORMapUpdate, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, m, zt.value)) - case ORMap.DeltaGroup(u) => + case ORMap.DeltaGroup(_) => throw new IllegalArgumentException("ORMap.DeltaGroup should not be nested") } b.build() diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index f44eb0fd2d..201b0616f8 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -261,7 +261,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def statusToProto(status: Status): dm.Status = { val b = dm.Status.newBuilder() b.setChunk(status.chunk).setTotChunks(status.totChunks) - val entries = status.digests.foreach { + status.digests.foreach { case (key, digest) => b.addEntries(dm.Status.Entry.newBuilder().setKey(key).setDigest(ByteString.copyFrom(digest.toArray))) } @@ -278,7 +278,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) private def gossipToProto(gossip: Gossip): dm.Gossip = { val b = dm.Gossip.newBuilder().setSendBack(gossip.sendBack) - val entries = gossip.updatedData.foreach { + gossip.updatedData.foreach { case (key, data) => b.addEntries(dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data))) } @@ -296,7 +296,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) val b = dm.DeltaPropagation.newBuilder().setFromNode(uniqueAddressToProto(deltaPropagation.fromNode)) if (deltaPropagation.reply) b.setReply(deltaPropagation.reply) - val entries = deltaPropagation.deltas.foreach { + deltaPropagation.deltas.foreach { case (key, Delta(data, fromSeqNr, toSeqNr)) => val b2 = dm.DeltaPropagation.Entry .newBuilder() @@ -331,11 +331,14 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case _: ReadAll => -1 } + val timoutInMillis = get.consistency.timeout.toMillis + require(timoutInMillis <= 0XFFFFFFFFL, "Timeouts must fit in a 32-bit unsigned int") + val b = dm.Get .newBuilder() .setKey(otherMessageToProto(get.key)) .setConsistency(consistencyValue) - .setTimeout(get.consistency.timeout.toMillis.toInt) + .setTimeout(timoutInMillis.toInt) get.request.foreach(o => b.setRequest(otherMessageToProto(o))) b.build() @@ -345,7 +348,11 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) val get = dm.Get.parseFrom(bytes) val key = otherMessageFromProto(get.getKey).asInstanceOf[KeyR] val request = if (get.hasRequest()) Some(otherMessageFromProto(get.getRequest)) else None - val timeout = Duration(get.getTimeout, TimeUnit.MILLISECONDS) + // 32-bit unsigned protobuf integers are mapped to + // 32-bit signed Java ints, using the leftmost bit as sign. + val timeout = + if (get.getTimeout < 0) Duration(Int.MaxValue.toLong + (get.getTimeout - Int.MaxValue), TimeUnit.MILLISECONDS) + else Duration(get.getTimeout.toLong, TimeUnit.MILLISECONDS) val consistency = get.getConsistency match { case 0 => ReadMajority(timeout) case -1 => ReadAll(timeout) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/FlagSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/FlagSpec.scala index d02088034b..b7788f2717 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/FlagSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/FlagSpec.scala @@ -34,11 +34,15 @@ class FlagSpec extends WordSpec with Matchers { val f1 = Flag.Disabled.switchOn val Flag(value1) = f1 val value2: Boolean = value1 + value2 should be(true) + Changed(FlagKey("key"))(f1) match { case c @ Changed(FlagKey("key")) => val Flag(value3) = c.dataValue val value4: Boolean = value3 value4 should be(true) + case changed => + fail(s"Failed to match [$changed]") } } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala index 34b4a4d814..aeea896af4 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GCounterSpec.scala @@ -172,11 +172,15 @@ class GCounterSpec extends WordSpec with Matchers { val c1 = GCounter.empty.increment(node1).increment(node2) val GCounter(value1) = c1 val value2: BigInt = value1 + value2 should be(2L) + Changed(GCounterKey("key"))(c1) match { case c @ Changed(GCounterKey("key")) => val GCounter(value3) = c.dataValue val value4: BigInt = value3 value4 should be(2L) + case _ => + fail("Failed to update") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala index a5eaeda440..ad18a58007 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala @@ -141,14 +141,18 @@ class GSetSpec extends WordSpec with Matchers { "have unapply extractor" in { val s1 = GSet.empty + "a" + "b" - val s2: GSet[String] = s1 + val _: GSet[String] = s1 val GSet(elements1) = s1 val elements2: Set[String] = elements1 + elements2 should be(Set("a", "b")) + Changed(GSetKey[String]("key"))(s1) match { case c @ Changed(GSetKey("key")) => val GSet(elements3) = c.dataValue val elements4: Set[String] = elements3 elements4 should be(Set("a", "b")) + case changed => + fail(s"Failed to match [$changed]") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala index 6d6b659151..9a223eef46 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala @@ -13,8 +13,8 @@ import org.scalatest.WordSpec class LWWMapSpec extends WordSpec with Matchers { import LWWRegister.defaultClock - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) "A LWWMap" must { @@ -72,11 +72,15 @@ class LWWMapSpec extends WordSpec with Matchers { val m1 = LWWMap.empty[String, Long].put(node1, "a", 1L, defaultClock[Long]) val LWWMap(entries1) = m1 val entries2: Map[String, Long] = entries1 + entries2 should be(Map("a" -> 1L)) + Changed(LWWMapKey[String, Long]("key"))(m1) match { case c @ Changed(LWWMapKey("key")) => val LWWMap(entries3) = c.dataValue val entries4: Map[String, Long] = entries3 entries4 should be(Map("a" -> 1L)) + case changed => + fail(s"Failed to match [$changed]") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala index 9be4c5b8af..251cdac31f 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWRegisterSpec.scala @@ -13,8 +13,8 @@ import org.scalatest.WordSpec class LWWRegisterSpec extends WordSpec with Matchers { import LWWRegister.defaultClock - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) "A LWWRegister" must { "use latest of successive assignments" in { @@ -71,11 +71,15 @@ class LWWRegisterSpec extends WordSpec with Matchers { val r1 = LWWRegister(node1, "a", defaultClock[String]) val LWWRegister(value1) = r1 val value2: String = value1 + value2 should be("a") + Changed(LWWRegisterKey[String]("key"))(r1) match { case c @ Changed(LWWRegisterKey("key")) => val LWWRegister(value3) = c.dataValue val value4: String = value3 value4 should be("a") + case changed => + fail(s"Failed to match [$changed]") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala index 0f9807418d..b9bb18dc64 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala @@ -13,8 +13,8 @@ import org.scalatest.WordSpec class ORMapSpec extends WordSpec with Matchers { - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) "A ORMap" must { @@ -153,7 +153,7 @@ class ORMapSpec extends WordSpec with Matchers { val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A") val deltaVersion = m1.delta.get match { - case ORMap.PutDeltaOp(delta, v, dt) => + case ORMap.PutDeltaOp(delta, _, _) => delta match { case AddDeltaOp(u) => if (u.elementsMap.contains("a")) @@ -618,7 +618,7 @@ class ORMapSpec extends WordSpec with Matchers { val merged1: ORMap[String, ORSet[String]] = m1.merge(m2) - val m3 = merged1.updated(node1, "b", ORSet.empty[String])(_.clear(node1).add(node1, "B2")) + val m3 = merged1.updated(node1, "b", ORSet.empty[String])(_.clear().add(node1, "B2")) val merged2 = merged1.merge(m3) merged2.entries("a").elements should be(Set("A")) @@ -687,14 +687,18 @@ class ORMapSpec extends WordSpec with Matchers { "have unapply extractor" in { val m1 = ORMap.empty.put(node1, "a", Flag(true)).put(node2, "b", Flag(false)) - val m2: ORMap[String, Flag] = m1 + val _: ORMap[String, Flag] = m1 val ORMap(entries1) = m1 val entries2: Map[String, Flag] = entries1 + entries2 should be(Map("a" -> Flag(true), "b" -> Flag(false))) + Changed(ORMapKey[String, Flag]("key"))(m1) match { case c @ Changed(ORMapKey("key")) => val ORMap(entries3) = c.dataValue val entries4: Map[String, ReplicatedData] = entries3 entries4 should be(Map("a" -> Flag(true), "b" -> Flag(false))) + case changed => + fail(s"Failed to match [$changed]") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala index 3d0068588d..908db3eb1a 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala @@ -552,14 +552,18 @@ class ORMultiMapSpec extends WordSpec with Matchers { "have unapply extractor" in { val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L)) - val m2: ORMultiMap[String, Long] = m1 + val _: ORMultiMap[String, Long] = m1 val ORMultiMap(entries1) = m1 val entries2: Map[String, Set[Long]] = entries1 + entries2 should be(Map("a" -> Set(1L, 2L), "b" -> Set(3L))) + Changed(ORMultiMapKey[String, Long]("key"))(m1) match { case c @ Changed(ORMultiMapKey("key")) => val ORMultiMap(entries3) = c.dataValue val entries4: Map[String, Set[Long]] = entries3 entries4 should be(Map("a" -> Set(1L, 2L), "b" -> Set(3L))) + case changed => + fail(s"Failed to match [$changed]") } } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala index d730d8569d..15b0bed584 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala @@ -335,7 +335,7 @@ class ORSetSpec extends WordSpec with Matchers { "work for clear" in { val s1 = ORSet.empty[String] val s2 = s1.add(node1, "a").add(node1, "b") - val s3 = s2.resetDelta.clear(node1) + val s3 = s2.resetDelta.clear() val s4 = s3.resetDelta.add(node1, "c") s2.merge(s3) should ===(s3) s2.mergeDelta(s3.delta.get) should ===(s3) @@ -600,16 +600,19 @@ class ORSetSpec extends WordSpec with Matchers { "have unapply extractor" in { val s1 = ORSet.empty.add(node1, "a").add(node2, "b") - val s2: ORSet[String] = s1 + val _: ORSet[String] = s1 val ORSet(elements1) = s1 // `unapply[A](s: ORSet[A])` is used here val elements2: Set[String] = elements1 + elements2 should be(Set("a", "b")) Changed(ORSetKey[String]("key"))(s1) match { case c @ Changed(ORSetKey("key")) => - val x: ORSet[String] = c.dataValue + val _: ORSet[String] = c.dataValue val ORSet(elements3) = c.dataValue val elements4: Set[String] = elements3 elements4 should be(Set("a", "b")) + case changed => + fail(s"Failed to match [$changed]") } val msg: Any = Changed(ORSetKey[String]("key"))(s1) @@ -621,6 +624,8 @@ class ORSetSpec extends WordSpec with Matchers { // but trait Set is invariant in type A. You may wish to investigate a wildcard type such as _ <: Any. (SLS 3.2.10) val elements4: Set[Any] = elements3 elements4 should be(Set("a", "b")) + case changed => + fail(s"Failed to match [$changed]") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala index cac9e3c750..e35881f319 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala @@ -12,8 +12,8 @@ import org.scalatest.WordSpec class PNCounterMapSpec extends WordSpec with Matchers { - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) "A PNCounterMap" must { @@ -74,11 +74,15 @@ class PNCounterMapSpec extends WordSpec with Matchers { val m1 = PNCounterMap.empty.increment(node1, "a", 1).increment(node2, "b", 2) val PNCounterMap(entries1) = m1 val entries2: Map[String, BigInt] = entries1 + entries2 should be(Map("a" -> 1L, "b" -> 2L)) + Changed(PNCounterMapKey[String]("key"))(m1) match { case c @ Changed(PNCounterMapKey("key")) => val PNCounterMap(entries3) = c.dataValue val entries4: Map[String, BigInt] = entries3 entries4 should be(Map("a" -> 1L, "b" -> 2L)) + case _ => + fail("Did not match") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala index 3826eec682..b120bfff29 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterSpec.scala @@ -192,11 +192,15 @@ class PNCounterSpec extends WordSpec with Matchers { val c1 = PNCounter.empty.increment(node1).increment(node1).decrement(node2) val PNCounter(value1) = c1 val value2: BigInt = value1 + value2 should be(1L) + Changed(PNCounterKey("key"))(c1) match { case c @ Changed(PNCounterKey("key")) => val PNCounter(value3) = c.dataValue val value4: BigInt = value3 value4 should be(1L) + case changed => + fail(s"Failed to match [$changed]") } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala index 23084e2f42..d9c95db95e 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala @@ -18,10 +18,10 @@ class VersionVectorSpec with Matchers with BeforeAndAfterAll { - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) - val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3) - val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) + val node3 = UniqueAddress(node1.address.copy(port = Some(2553)), 3L) + val node4 = UniqueAddress(node1.address.copy(port = Some(2554)), 4L) override def afterAll: Unit = { shutdown() @@ -83,7 +83,7 @@ class VersionVectorSpec } "pass misc comparison test 3" in { - var vv1_1 = VersionVector() + val vv1_1 = VersionVector() val vv2_1 = vv1_1 + node1 val vv1_2 = VersionVector() @@ -253,9 +253,9 @@ class VersionVectorSpec val a1 = a + node1 val b1 = b + node2 - var a2 = a1 + node1 - var c = a2.merge(b1) - var c1 = c + node3 + val a2 = a1 + node1 + val c = a2.merge(b1) + val c1 = c + node3 (c1 > a2) should be(true) (c1 > b1) should be(true) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index c2fa51d70b..2a4783f78b 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -17,7 +17,6 @@ import akka.cluster.ddata.Replicator._ import akka.remote.RARP import scala.concurrent.Future -import akka.cluster.Cluster object WriteAggregatorSpec { @@ -61,7 +60,7 @@ object WriteAggregatorSpec { context.actorSelection(probes(address).path) override def senderAddress(): Address = - probes.find { case (a, r) => r == sender() }.get._1 + probes.find { case (_, r) => r == sender() }.get._1 } def writeAckAdapterProps(replica: ActorRef): Props = @@ -126,7 +125,6 @@ class WriteAggregatorSpec extends AkkaSpec(s""" * Create a tuple for each node with the WriteAckAdapter and the TestProbe */ def probes(): Map[Address, TestMock] = { - val probe = TestProbe() nodes.toSeq.map(_ -> TestMock()).toMap } @@ -160,7 +158,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" val t = timeout / 5 - 50.milliseconds.dilated import system.dispatcher Future.sequence { - Seq(Future { testProbes(nodeC).expectNoMsg(t) }, Future { testProbes(nodeD).expectNoMsg(t) }) + Seq(Future { testProbes(nodeC).expectNoMessage(t) }, Future { testProbes(nodeD).expectNoMessage(t) }) }.futureValue testProbes(nodeC).expectMsgType[Write] testProbes(nodeC).lastSender ! WriteAck @@ -210,9 +208,9 @@ class WriteAggregatorSpec extends AkkaSpec(s""" } "WriteAggregator with delta" must { - implicit val cluster = Cluster(system) - val fullState1 = ORSet.empty[String] + "a" + "b" - val fullState2 = fullState1.resetDelta + "c" + implicit val node = DistributedData(system).selfUniqueAddress + val fullState1 = ORSet.empty[String] :+ "a" :+ "b" + val fullState2 = fullState1.resetDelta :+ "c" val delta = Delta(DataEnvelope(fullState2.delta.get), 2L, 2L) "send deltas first" in { @@ -265,8 +263,8 @@ class WriteAggregatorSpec extends AkkaSpec(s""" testProbes(nodeA).lastSender ! WriteAck testProbes(nodeD).expectMsgType[Write] testProbes(nodeD).lastSender ! WriteAck - testProbes(nodeB).expectNoMsg(100.millis) - testProbes(nodeC).expectNoMsg(100.millis) + testProbes(nodeB).expectNoMessage(100.millis) + testProbes(nodeC).expectNoMessage(100.millis) expectMsg(UpdateSuccess(WriteAggregatorSpec.KeyB, None)) watch(aggr) @@ -320,7 +318,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteAck - expectNoMsg(200.millis) + expectNoMessage(200.millis) // the local write aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala index c2deecbac0..9d88e56b9e 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala @@ -143,7 +143,10 @@ class ReplicatedDataSerializerSpec .tell(Identify("2"), testActor) val echo2 = expectMsgType[ActorIdentity].ref.get - val msg = ORSet.empty[ActorRef].add(Cluster(system), echo1).add(Cluster(system), echo2) + val msg = ORSet + .empty[ActorRef] + .add(Cluster(system).selfUniqueAddress, echo1) + .add(Cluster(system).selfUniqueAddress, echo2) echo2.tell(msg, testActor) val reply = expectMsgType[ORSet[ActorRef]] reply.elements should ===(Set(echo1, echo2)) @@ -157,8 +160,8 @@ class ReplicatedDataSerializerSpec checkSerialization(ORSet().add(address1, "a").delta.get) checkSerialization(ORSet().add(address1, "a").resetDelta.remove(address2, "a").delta.get) checkSerialization(ORSet().add(address1, "a").remove(address2, "a").delta.get) - checkSerialization(ORSet().add(address1, "a").resetDelta.clear(address2).delta.get) - checkSerialization(ORSet().add(address1, "a").clear(address2).delta.get) + checkSerialization(ORSet().add(address1, "a").resetDelta.clear().delta.get) + checkSerialization(ORSet().add(address1, "a").clear().delta.get) } "serialize large GSet" in { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index 5e5e525253..df0089fdb7 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -4,6 +4,8 @@ package akka.cluster.ddata.protobuf +import java.lang.IllegalArgumentException + import scala.concurrent.duration._ import org.scalatest.BeforeAndAfterAll import org.scalatest.Matchers @@ -19,7 +21,7 @@ import akka.cluster.ddata.PruningState.PruningPerformed import akka.cluster.ddata.Replicator._ import akka.cluster.ddata.Replicator.Internal._ import akka.testkit.TestKit -import akka.util.ByteString +import akka.util.{ unused, ByteString } import akka.cluster.UniqueAddress import akka.remote.RARP import com.typesafe.config.ConfigFactory @@ -79,6 +81,14 @@ class ReplicatorMessageSerializerSpec checkSerialization(Get(keyA, ReadLocal)) checkSerialization(Get(keyA, ReadMajority(2.seconds), Some("x"))) + checkSerialization(Get(keyA, ReadMajority((Int.MaxValue.toLong + 50).milliseconds), Some("x"))) + try { + serializer.toBinary(Get(keyA, ReadMajority((Int.MaxValue.toLong * 3).milliseconds), Some("x"))) + fail("Our protobuf protocol does not support timeouts larger than unsigned ints") + } catch { + case e: IllegalArgumentException => + e.getMessage should include("unsigned int") + } checkSerialization(GetSuccess(keyA, None)(data1)) checkSerialization(GetSuccess(keyA, Some("x"))(data1)) checkSerialization(NotFound(keyA, Some("x"))) @@ -223,7 +233,7 @@ class ReplicatorMessageSerializerSpec "suppory getOrAdd" in { var n = 0 - def createValue(a: Read): AnyRef = { + def createValue(@unused a: Read): AnyRef = { n += 1 new AnyRef { override val toString = "v" + n diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 754ced560d..cb1cd96325 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -80,6 +80,7 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "-Yno-adapted-args", "-Ywarn-numeric-widen", // end + "-deprecation", "-Xfuture", "-Xlint", "-Ywarn-dead-code", @@ -90,6 +91,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "-Ywarn-unused:_", "-Ypartial-unification", "-Ywarn-extra-implicit", - "-Ywarn-numeric-widen") + ) }