diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 3fed3aabb9..784d87c3b6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -15,6 +15,7 @@ import akka.annotation.InternalApi import akka.persistence.AtLeastOnceDelivery.Internal.Delivery import akka.util.ccompat._ +@ccompatUsedUntil213 object AtLeastOnceDelivery { /** diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 19d20d119a..8d22e0cbfd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -12,6 +12,7 @@ import akka.annotation.InternalApi import akka.dispatch.Envelope import akka.event.{ Logging, LoggingAdapter } import akka.util.Helpers.ConfigOps +import com.github.ghik.silencer.silent import com.typesafe.config.ConfigFactory import scala.collection.immutable @@ -87,6 +88,7 @@ private[persistence] trait Eventsourced private var journalBatch = Vector.empty[PersistentEnvelope] // no longer used, but kept for binary compatibility + @silent private val maxMessageBatchSize = { val journalPluginConfig = this match { case c: RuntimePluginConfig => c.journalPluginConfig @@ -249,8 +251,8 @@ private[persistence] trait Eventsourced require(persistenceId.trim.nonEmpty, s"persistenceId cannot be empty for PersistentActor [${self.path}]") // Fail fast on missing plugins. - val j = journal; - val s = snapshotStore + journal + snapshotStore requestRecoveryPermit() super.aroundPreStart() } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 4c313db05e..2e6246ffe9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -404,7 +404,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { log.debug(s"Create plugin: $pluginActorName $pluginClassName") val pluginClass = system.dynamicAccess.getClassFor[Any](pluginClassName).get val pluginDispatcherId = pluginConfig.getString("plugin-dispatcher") - val pluginActorArgs = try { + val pluginActorArgs: List[AnyRef] = try { Reflect.findConstructor(pluginClass, List(pluginConfig, configPath)) // will throw if not found List(pluginConfig, configPath) } catch { diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala b/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala index 380800f03b..45499a2dd0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistencePlugin.scala @@ -82,11 +82,11 @@ private[akka] abstract class PersistencePlugin[ScalaDsl, JavaDsl, T: ClassTag](s (classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: (classOf[String], configPath) :: Nil) .recoverWith { - case x: NoSuchMethodException => + case _: NoSuchMethodException => instantiate((classOf[ExtendedActorSystem], system) :: (classOf[Config], pluginConfig) :: Nil) } - .recoverWith { case x: NoSuchMethodException => instantiate((classOf[ExtendedActorSystem], system) :: Nil) } - .recoverWith { case x: NoSuchMethodException => instantiate(Nil) } + .recoverWith { case _: NoSuchMethodException => instantiate((classOf[ExtendedActorSystem], system) :: Nil) } + .recoverWith { case _: NoSuchMethodException => instantiate(Nil) } .recoverWith { case ex: Exception => Failure.apply( diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index 812472d21d..2a7dc387a6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -10,6 +10,7 @@ import akka.persistence.fsm.PersistentFSM.FSMState import akka.persistence.serialization.Message import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer } import akka.util.JavaDurationConverters +import com.github.ghik.silencer.silent import com.typesafe.config.Config import scala.annotation.varargs @@ -45,7 +46,7 @@ private[akka] class SnapshotAfter(config: Config) extends Extension { */ val isSnapshotAfterSeqNo: Long => Boolean = snapshotAfterValue match { case Some(snapShotAfterValue) => seqNo: Long => seqNo % snapShotAfterValue == 0 - case None => seqNo: Long => false //always false, if snapshotAfter is not specified in config + case None => _: Long => false //always false, if snapshotAfter is not specified in config } } @@ -118,10 +119,11 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent /** * Discover the latest recorded state */ + @silent override def receiveRecover: Receive = { case domainEventTag(event) => startWith(stateName, applyEvent(event, stateData)) case StateChangeEvent(stateIdentifier, timeout) => startWith(statesMap(stateIdentifier), stateData, timeout) - case SnapshotOffer(_, PersistentFSMSnapshot(stateIdentifier, data: D, timeout)) => + case SnapshotOffer(_, PersistentFSMSnapshot(stateIdentifier, data: D @unchecked, timeout)) => startWith(statesMap(stateIdentifier), data, timeout) case RecoveryCompleted => initialize() @@ -168,7 +170,7 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent nextData = applyEvent(event, nextData) doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr) applyStateOnLastHandler() - case StateChangeEvent(stateIdentifier, timeout) => + case _: StateChangeEvent => doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr) applyStateOnLastHandler() } diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala index f7d49bd461..8785ba09cb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSMBase.scala @@ -10,6 +10,8 @@ import akka.japi.pf.{ FSMTransitionHandlerBuilder, UnitMatch, UnitPFBuilder } import language.implicitConversions import scala.collection.mutable import akka.routing.{ Deafen, Listen, Listeners } +import akka.util.unused + import scala.concurrent.duration.FiniteDuration /** @@ -384,7 +386,7 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging * unhandled event handler */ private val handleEventDefault: StateFunction = { - case Event(value, stateData) => + case Event(value, _) => log.warning("unhandled event " + value + " in state " + stateName) stay } @@ -454,7 +456,7 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging processEvent(event, source) } - private[akka] def processEvent(event: Event, source: AnyRef): Unit = { + private[akka] def processEvent(event: Event, @unused source: AnyRef): Unit = { val stateFunc = stateFunctions(currentState.stateName) val nextState = if (stateFunc.isDefinedAt(event)) { stateFunc(event) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala index 4b70f879a9..41c6fbdafc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala @@ -20,6 +20,7 @@ import scala.util.Try /** * `EventAdapters` serves as a per-journal collection of bound event adapters. */ +@ccompatUsedUntil213 class EventAdapters( map: ConcurrentHashMap[Class[_], EventAdapter], bindings: immutable.Seq[(Class[_], EventAdapter)], @@ -92,13 +93,14 @@ private[akka] object EventAdapters { // bindings is a Seq of tuple representing the mapping from Class to handler. // It is primarily ordered by the most specific classes first, and secondly in the configured order. val bindings: immutable.Seq[ClassHandler] = { - val bs = for ((k: FQN, as: BoundAdapters) <- adapterBindings) - yield - if (as.size == 1) (system.dynamicAccess.getClassFor[Any](k).get, handlers(as.head)) - else - ( - system.dynamicAccess.getClassFor[Any](k).get, - NoopWriteEventAdapter(CombinedReadEventAdapter(as.map(handlers)))) + val bs = + for ((k: FQN, as: BoundAdapters) <- adapterBindings) + yield + if (as.size == 1) (system.dynamicAccess.getClassFor[Any](k).get, handlers(as.head)) + else + ( + system.dynamicAccess.getClassFor[Any](k).get, + NoopWriteEventAdapter(CombinedReadEventAdapter(as.map(handlers)))) sort(bs) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala index 7f775169b9..38319682ac 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/PersistencePluginProxy.scala @@ -140,7 +140,7 @@ final class PersistencePluginProxy(config: Config) extends Actor with Stash with context.become(initTimedOut) unstashAll() // will trigger appropriate failures case Terminated(_) => - case msg => + case _ => stash() } @@ -197,21 +197,21 @@ final class PersistencePluginProxy(config: Config) extends Actor with Stash with case r: NonPersistentRepr => persistentActor ! LoopMessageSuccess(r.payload, actorInstanceId) } - case ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) => + case ReplayMessages(_, _, _, _, persistentActor) => persistentActor ! ReplayMessagesFailure(timeoutException) - case DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) => + case DeleteMessagesTo(_, toSequenceNr, persistentActor) => persistentActor ! DeleteMessagesFailure(timeoutException, toSequenceNr) } case req: SnapshotProtocol.Request => req match { // exhaustive match - case LoadSnapshot(persistenceId, criteria, toSequenceNr) => + case _: LoadSnapshot => sender() ! LoadSnapshotFailed(timeoutException) - case SaveSnapshot(metadata, snapshot) => + case SaveSnapshot(metadata, _) => sender() ! SaveSnapshotFailure(metadata, timeoutException) case DeleteSnapshot(metadata) => sender() ! DeleteSnapshotFailure(metadata, timeoutException) - case DeleteSnapshots(persistenceId, criteria) => + case DeleteSnapshots(_, criteria) => sender() ! DeleteSnapshotsFailure(criteria, timeoutException) } @@ -219,7 +219,7 @@ final class PersistencePluginProxy(config: Config) extends Actor with Stash with becomeIdentifying(address) case Terminated(_) => - case other => + case _ => val e = timeoutException() log.error(e, "Failed PersistencePluginProxy request: {}", e.getMessage) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala index a7eeca4ce3..38a7fc5e79 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala @@ -168,7 +168,7 @@ private[akka] class ReplayFilter( persistentActor.tell(ReplayMessagesFailure(cause), Actor.noSender) context.become { case _: ReplayedMessage => // discard - case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) => + case _: RecoverySuccess | _: ReplayMessagesFailure => context.stop(self) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index d1098eb45f..03aca23371 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -16,6 +16,7 @@ import scala.util.Failure /** * Java API: abstract journal, optimized for asynchronous, non-blocking writes. */ +@ccompatUsedUntil213 abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal with AsyncWritePlugin { import SAsyncWriteJournal.successUnit import context.dispatcher diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index 540153c36b..c8990738c8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -5,9 +5,9 @@ package akka.persistence.journal.leveldb import org.iq80.leveldb.DBIterator - import akka.actor.Actor import akka.util.ByteString.UTF_8 +import akka.util.unused /** * INTERNAL API. @@ -69,7 +69,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore = numericId } - protected def newPersistenceIdAdded(id: String): Unit = () + override protected def newPersistenceIdAdded(@unused id: String): Unit = () override def preStart(): Unit = { idMap = readIdMap() diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index cdcad180d0..29a242fb5e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -31,7 +31,7 @@ private[persistence] class LeveldbJournal(cfg: Config) extends AsyncWriteJournal else context.system.settings.config.getConfig("akka.persistence.journal.leveldb") override def receivePluginInternal: Receive = receiveCompactionInternal.orElse { - case r @ ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) => + case ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) => import context.dispatcher val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) asyncReadHighestSequenceNr(tagAsPersistenceId(tag), readHighestSequenceNrFrom) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index fc5e6a89f6..60c2a5307a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -42,7 +42,7 @@ class SharedLeveldbStore(cfg: Config) extends LeveldbStore { catch { case NonFatal(e) => Future.failed(e) } case f @ Failure(_) => // exception from preparePersistentBatch => rejected - Future.successful(messages.collect { case a: AtomicWrite => f }) + Future.successful(messages.collect { case _: AtomicWrite => f }) }).map { results => if (results.nonEmpty && results.size != atomicWriteCount) throw new IllegalStateException( diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 2e3581223d..2ea3c8a3da 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -27,6 +27,7 @@ trait Message extends Serializable /** * Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFSM.StateChangeEvent]] messages. */ +@ccompatUsedUntil213 class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import PersistentRepr.Undefined @@ -46,12 +47,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer * message's payload to a matching `akka.serialization.Serializer`. */ def toBinary(o: AnyRef): Array[Byte] = o match { - case p: PersistentRepr => persistentMessageBuilder(p).build().toByteArray - case a: AtomicWrite => atomicWriteBuilder(a).build().toByteArray - case a: AtLeastOnceDeliverySnapshot => atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray - case s: StateChangeEvent => stateChangeBuilder(s).build.toByteArray - case p: PersistentFSMSnapshot[Any] => persistentFSMSnapshotBuilder(p).build.toByteArray - case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") + case p: PersistentRepr => persistentMessageBuilder(p).build().toByteArray + case a: AtomicWrite => atomicWriteBuilder(a).build().toByteArray + case a: AtLeastOnceDeliverySnapshot => atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray + case s: StateChangeEvent => stateChangeBuilder(s).build.toByteArray + case p: PersistentFSMSnapshot[Any @unchecked] => persistentFSMSnapshotBuilder(p).build.toByteArray + case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index 14cb15a8b7..abe72652c9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -27,12 +27,6 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer private lazy val serialization = SerializationExtension(system) - private lazy val transportInformation: Option[Serialization.Information] = { - val address = system.provider.getDefaultAddress - if (address.hasLocalScope) None - else Some(Serialization.Information(address, system)) - } - /** * Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching * `akka.serialization.Serializer`. diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index e56c1e3398..3c0ce1686e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -26,6 +26,7 @@ import java.nio.file.Files * * Local filesystem backed snapshot store. */ +@ccompatUsedUntil213 private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotStore with ActorLogging { private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r private val persistenceIdStartIdx = 9 // Persistence ID starts after the "snapshot-" substring @@ -179,8 +180,11 @@ private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotSt dir } + // system default encoding is a bad idea but kept for backwards compatibility + private val defaultSystemEncoding = System.getProperty("file.encoding") + private final class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { - val encodedPersistenceId = URLEncoder.encode(persistenceId) + val encodedPersistenceId = URLEncoder.encode(persistenceId, defaultSystemEncoding) def accept(dir: File, name: String): Boolean = { val persistenceIdEndIdx = name.lastIndexOf('-', name.lastIndexOf('-') - 1) @@ -191,7 +195,7 @@ private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotSt private final class SnapshotSeqNrFilenameFilter(md: SnapshotMetadata) extends FilenameFilter { private final def matches(pid: String, snr: String, tms: String): Boolean = { - pid.equals(URLEncoder.encode(md.persistenceId)) && + pid.equals(URLEncoder.encode(md.persistenceId, defaultSystemEncoding)) && Try(snr.toLong == md.sequenceNr && (md.timestamp == 0L || tms.toLong == md.timestamp)).getOrElse(false) } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala index 4a0abaa8f7..278393cb24 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryCrashSpec.scala @@ -48,7 +48,7 @@ object AtLeastOnceDeliveryCrashSpec { override def receiveCommand: Receive = { case Message => persist(Message)(_ => send()) case CrashMessage => - persist(CrashMessage) { evt => + persist(CrashMessage) { _ => } } @@ -80,9 +80,9 @@ class AtLeastOnceDeliveryCrashSpec system.stop(superVisor) deathProbe.expectTerminated(superVisor) - testProbe.expectNoMsg(250.millis) + testProbe.expectNoMessage(250.millis) createCrashActorUnderSupervisor() - testProbe.expectNoMsg(1.second) + testProbe.expectNoMessage(1.second) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala index 1300d62c2d..4922b48ee8 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -109,7 +109,7 @@ object AtLeastOnceDeliveryFailureSpec { add(i) deliver(destination.path)(deliveryId => Msg(deliveryId, i)) - case MsgConfirmed(deliveryId, i) => + case MsgConfirmed(deliveryId, _) => confirmDelivery(deliveryId) } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index a2fc219661..ade27ffbd5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -145,7 +145,7 @@ object AtLeastOnceDeliverySpec { var allReceived = Set.empty[Long] def receive = { - case a @ Action(id, payload) => + case a @ Action(id, _) => // discard duplicates (naive impl) if (!allReceived.contains(id)) { log.debug("Destination got {}, all count {}", a, allReceived.size + 1) @@ -201,7 +201,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c snd.tell(Req("a"), probe.ref) probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a")) - probeA.expectNoMsg(1.second) + probeA.expectNoMessage(1.second) } s"re-deliver lost messages (using actorSelection: $deliverUsingActorSelection)" taggedAs (TimingTest) in { @@ -236,7 +236,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c probeA.expectMsg(Action(4, "a-4")) // and then re-delivered probeA.expectMsg(Action(3, "a-3")) - probeA.expectNoMsg(1.second) + probeA.expectNoMessage(1.second) } } @@ -276,7 +276,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c probe.expectMsg(ReqAck) probeA.expectMsg(Action(5, "a-5")) - probeA.expectNoMsg(1.second) + probeA.expectNoMessage(1.second) } "re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" taggedAs (TimingTest) in { @@ -313,7 +313,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c Action(5, "a-5"), // re-delivered Action(4, "a-4")) // re-delivered, 3rd time - probeA.expectNoMsg(1.second) + probeA.expectNoMessage(1.second) } "restore state from snapshot" taggedAs (TimingTest) in { @@ -351,7 +351,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c probe.expectMsg(ReqAck) probeA.expectMsg(Action(5, "a-5")) - probeA.expectNoMsg(1.second) + probeA.expectNoMessage(1.second) } "warn about unconfirmed messages" taggedAs (TimingTest) in { @@ -423,14 +423,16 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends PersistenceSpec(c // initially all odd messages should go through for (n <- 1 to N if n % 2 == 1) probeA.expectMsg(Action(n, s"a-$n")) - probeA.expectNoMsg(100.millis) + probeA.expectNoMessage(100.millis) // at each redelivery round, 2 (even) messages are sent, the first goes through // without throttling, at each round half of the messages would go through var toDeliver = (1 to N).filter(_ % 2 == 0).map(_.toLong).toSet - for (n <- 1 to N if n % 2 == 0) { - toDeliver -= probeA.expectMsgType[Action].id - probeA.expectNoMsg(100.millis) + for (n <- 1 to N) { + if (n % 2 == 0) { + toDeliver -= probeA.expectMsgType[Action].id + probeA.expectNoMessage(100.millis) + } } toDeliver should ===(Set.empty[Long]) diff --git a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala index b8b3033b0c..708d2b676c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala @@ -10,6 +10,7 @@ import akka.actor._ import akka.persistence.EndToEndEventAdapterSpec.NewA import akka.persistence.journal.{ EventAdapter, EventSeq } import akka.testkit.{ EventFilter, TestProbe } +import akka.util.unused import com.typesafe.config.{ Config, ConfigFactory } import org.apache.commons.io.FileUtils import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } @@ -27,7 +28,7 @@ object EndToEndEventAdapterSpec { case class JSON(payload: Any) - class AEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + class AEndToEndAdapter(@unused system: ExtendedActorSystem) extends EventAdapter { override def manifest(event: Any): String = event.getClass.getCanonicalName override def toJournal(event: Any): Any = @@ -37,7 +38,7 @@ object EndToEndEventAdapterSpec { case _ => EventSeq.empty } } - class NewAEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + class NewAEndToEndAdapter(@unused system: ExtendedActorSystem) extends EventAdapter { override def manifest(event: Any): String = event.getClass.getCanonicalName override def toJournal(event: Any): Any = @@ -47,7 +48,7 @@ object EndToEndEventAdapterSpec { case _ => EventSeq.empty } } - class BEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + class BEndToEndAdapter(@unused system: ExtendedActorSystem) extends EventAdapter { override def manifest(event: Any): String = event.getClass.getCanonicalName override def toJournal(event: Any): Any = @@ -57,7 +58,7 @@ object EndToEndEventAdapterSpec { case _ => EventSeq.empty } } - class NewBEndToEndAdapter(system: ExtendedActorSystem) extends EventAdapter { + class NewBEndToEndAdapter(@unused system: ExtendedActorSystem) extends EventAdapter { override def manifest(event: Any): String = event.getClass.getCanonicalName override def toJournal(event: Any): Any = @@ -68,7 +69,7 @@ object EndToEndEventAdapterSpec { } } - class EndToEndAdapterActor(name: String, override val journalPluginId: String, probe: Option[ActorRef]) + class EndToEndAdapterActor(name: String, override val journalPluginId: String, @unused probe: Option[ActorRef]) extends NamedPersistentActor(name) with PersistentActor { diff --git a/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala index f795bd5141..e858d8e31d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventAdapterSpec.scala @@ -35,9 +35,9 @@ object EventAdapterSpec { val Minor = Set("minor") override def toJournal(event: Any): Any = event match { - case e @ UserDataChanged(_, age) if age > 18 => Tagged(e, Adult) - case e @ UserDataChanged(_, age) => Tagged(e, Minor) - case e => NotTagged(e) + case e: UserDataChanged if e.age > 18 => Tagged(e, Adult) + case e @ UserDataChanged(_, _) => Tagged(e, Minor) + case e => NotTagged(e) } override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single { event match { @@ -51,7 +51,7 @@ object EventAdapterSpec { class ReplayPassThroughAdapter extends UserAgeTaggingAdapter { override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single { event match { - case m: JournalModel => event // don't unpack, just pass through the JournalModel + case m: JournalModel => m // don't unpack, just pass through the JournalModel } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala index b9d007a922..da9b4a7747 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala @@ -25,7 +25,7 @@ object EventSourcedActorDeleteFailureSpec { Future.failed(new SimulatedException("Boom! Unable to delete events!")) } - class DoesNotHandleDeleteFailureActor(name: String, probe: ActorRef) extends PersistentActor { + class DoesNotHandleDeleteFailureActor(name: String) extends PersistentActor { override def persistenceId = name override def receiveCommand: Receive = { case DeleteTo(n) => deleteMessages(n) @@ -60,7 +60,7 @@ class EventSourcedActorDeleteFailureSpec "A persistent actor" must { "have default warn logging be triggered, when deletion failed" in { - val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name, testActor)) + val persistentActor = system.actorOf(Props(classOf[DoesNotHandleDeleteFailureActor], name)) system.eventStream.subscribe(testActor, classOf[Logging.Warning]) persistentActor ! DeleteTo(Long.MaxValue) val message = expectMsgType[Warning].message.toString diff --git a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala index 185601f72a..559e5fe79d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala @@ -36,7 +36,7 @@ object EventSourcedActorFailureSpec { override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( recoveryCallback: PersistentRepr => Unit): Future[Unit] = { - val highest = highestSequenceNr(persistenceId) + val readFromStore = read(persistenceId, fromSequenceNr, toSequenceNr, max) if (readFromStore.isEmpty) Future.successful(()) @@ -73,7 +73,7 @@ object EventSourcedActorFailureSpec { class OnRecoveryFailurePersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior.orElse { - case c @ Cmd(txt) => persist(Evt(txt))(updateState) + case Cmd(txt) => persist(Evt(txt))(updateState) } override protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Unit = @@ -102,8 +102,7 @@ object EventSourcedActorFailureSpec { } - class FailingRecovery(name: String, recoveryFailureProbe: Option[ActorRef]) extends ExamplePersistentActor(name) { - def this(name: String) = this(name, None) + class FailingRecovery(name: String) extends ExamplePersistentActor(name) { override val receiveCommand: Receive = commonBehavior.orElse { case Cmd(data) => persist(Evt(s"${data}"))(updateState) diff --git a/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala index f98c3cf485..48e7cd769a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ManyRecoveriesSpec.scala @@ -23,7 +23,7 @@ object ManyRecoveriesSpec { override def persistenceId = name override def receiveRecover: Receive = { - case Evt(s) => + case Evt(_) => latch.foreach(Await.ready(_, 10.seconds)) } override def receiveCommand: Receive = { diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index 54cf4819f5..dd8b659ece 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -80,7 +80,7 @@ object PerformanceSpec { class MixedTestPersistentActor(name: String) extends PerformanceTestPersistentActor(name) { var counter = 0 - val handler: Any => Unit = { evt => + val handler: Any => Unit = { _ => if (lastSequenceNr % 1000 == 0) print(".") if (lastSequenceNr == failAt) throw new TestException("boom") } @@ -108,7 +108,7 @@ object PerformanceSpec { case "c" => persist("c")(_ => context.unbecome()) unstashAll() - case other => stash() + case _ => stash() } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala index affdc63e38..9a238a92c8 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorBoundedStashingSpec.scala @@ -131,7 +131,7 @@ class DiscardStrategyPersistentActorBoundedStashingSpec //so, 11 to 20 discard to deadletter ((1 + capacity) to (2 * capacity)).foreach(i => expectMsg(DeadLetter(Cmd(i), testActor, persistentActor))) //allow "a" and 1 to 10 write complete - (1 to (1 + capacity)).foreach(i => SteppingInmemJournal.step(journal)) + (1 to (1 + capacity)).foreach(_ => SteppingInmemJournal.step(journal)) persistentActor ! GetState @@ -157,9 +157,9 @@ class ReplyToStrategyPersistentActorBoundedStashingSpec //internal stash overflow after 10 (1 to (2 * capacity)).foreach(persistentActor ! Cmd(_)) //so, 11 to 20 reply to with "Reject" String - ((1 + capacity) to (2 * capacity)).foreach(i => expectMsg("RejectToStash")) + ((1 + capacity) to (2 * capacity)).foreach(_ => expectMsg("RejectToStash")) //allow "a" and 1 to 10 write complete - (1 to (1 + capacity)).foreach(i => SteppingInmemJournal.step(journal)) + (1 to (1 + capacity)).foreach(_ => SteppingInmemJournal.step(journal)) persistentActor ! GetState diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala index 27fb2333ba..4e48b6bafb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala @@ -150,7 +150,7 @@ class PersistentActorJournalProtocolSpec extends AkkaSpec(config) with ImplicitS subject ! Persist(1, "a-1") val w1 = expectWrite(subject, Msgs("a-1")) subject ! Persist(2, "a-2") - expectNoMsg(300.millis) + expectNoMessage(300.millis) journal.msgAvailable should ===(false) confirm(w1) expectMsg(Done(1, 1)) @@ -167,7 +167,7 @@ class PersistentActorJournalProtocolSpec extends AkkaSpec(config) with ImplicitS subject ! Persist(1, Persist(2, "a-1")) val w1 = expectWrite(subject, Msgs(Persist(2, "a-1"))) subject ! Persist(3, "a-2") - expectNoMsg(300.millis) + expectNoMessage(300.millis) journal.msgAvailable should ===(false) confirm(w1) expectMsg(Done(1, 1)) @@ -218,11 +218,11 @@ class PersistentActorJournalProtocolSpec extends AkkaSpec(config) with ImplicitS subject ! Multi(commands(0, 10): _*) subject ! Multi(commands(10, 20): _*) val w0 = expectWrite(subject, Msgs("a" +: commands(20, 30): _*)) - journal.expectNoMsg(300.millis) + journal.expectNoMessage(300.millis) confirm(w0) (1 to 11).foreach(x => expectMsg(Done(-1, x))) val w1 = expectWrite(subject, msgs(0, 20): _*) - journal.expectNoMsg(300.millis) + journal.expectNoMessage(300.millis) confirm(w1) expectDone(0, 20) val w2 = expectWrite(subject, msgs(20, 30): _*) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala index 9ead77376e..bf13f9f094 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorRecoveryTimeoutSpec.scala @@ -131,8 +131,7 @@ class PersistentActorRecoveryTimeoutSpec // now replay, but don't give the journal any tokens to replay events // so that we cause the timeout to trigger - val replaying = - system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor], timeout, probe.ref)) + system.actorOf(Props(classOf[PersistentActorRecoveryTimeoutSpec.TestReceiveTimeoutActor], timeout, probe.ref)) // initial read highest SteppingInmemJournal.step(journal) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 4bb253517e..e4b43f5e81 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import akka.persistence.PersistentActorSpec._ import akka.testkit.{ EventFilter, ImplicitSender, TestLatch, TestProbe } +import com.github.ghik.silencer.silent import com.typesafe.config.{ Config, ConfigFactory } import scala.collection.immutable.Seq @@ -32,8 +33,8 @@ object PersistentActorSpec { var askedForDelete: Option[ActorRef] = None val updateState: Receive = { - case Evt(data) => events = data :: events - case d @ Some(ref: ActorRef) => askedForDelete = d.asInstanceOf[Some[ActorRef]] + case Evt(data) => events = data :: events + case d @ Some(_: ActorRef) => askedForDelete = d.asInstanceOf[Some[ActorRef]] } val commonBehavior: Receive = { @@ -351,7 +352,7 @@ object PersistentActorSpec { case Cmd(data) => sender() ! data - (1 to 3).foreach { i => + (1 to 3).foreach { _ => persistAsync(Evt(s"$data-${incCounter()}")) { evt => sender() ! ("a" + evt.data.toString.drop(1)) // c-1 => a-1, as in "ack" } @@ -399,6 +400,7 @@ object PersistentActorSpec { extends AsyncPersistSameEventTwicePersistentActor(name) with InmemRuntimePluginConfig + @silent // compiler knows persistAll(Nil)(lambda) will never invoke lambda class PersistAllNilPersistentActor(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = commonBehavior.orElse { @@ -430,16 +432,16 @@ object PersistentActorSpec { case Cmd(data) => sender() ! data - persist(Evt(data + "-e1")) { evt => + persist(Evt(s"$data-e1")) { evt => sender() ! s"${evt.data}-${incCounter()}" } // this should be happily executed - persistAsync(Evt(data + "-ea2")) { evt => + persistAsync(Evt(s"$data-ea2")) { evt => sender() ! s"${evt.data}-${incCounter()}" } - persist(Evt(data + "-e3")) { evt => + persist(Evt(s"$data-e3")) { evt => sender() ! s"${evt.data}-${incCounter()}" } } @@ -468,11 +470,11 @@ object PersistentActorSpec { case Cmd(data) => sender() ! data - persist(Evt(data + "-e1")) { evt => + persist(Evt(s"$data-e1")) { evt => sender() ! s"${evt.data}-${incCounter()}" } - persistAsync(Evt(data + "-ea2")) { evt => + persistAsync(Evt(s"$data-ea2")) { evt => sender() ! s"${evt.data}-${incCounter()}" } } @@ -494,8 +496,6 @@ object PersistentActorSpec { with InmemRuntimePluginConfig class AsyncPersistHandlerCorrelationCheck(name: String) extends ExamplePersistentActor(name) { - var counter = 0 - val receiveCommand: Receive = commonBehavior.orElse { case Cmd(data) => persistAsync(Evt(data)) { evt => @@ -505,11 +505,6 @@ object PersistentActorSpec { sender() ! "done" } } - - private def incCounter(): Int = { - counter += 1 - counter - } } class AsyncPersistHandlerCorrelationCheckWithLevelDbRuntimePluginConfig(name: String, val providedConfig: Config) extends AsyncPersistHandlerCorrelationCheck(name) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala index 2109492913..e16ceee6d5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorStashingSpec.scala @@ -8,7 +8,9 @@ import akka.actor.SupervisorStrategy.Resume import akka.actor.{ Actor, ActorRef, OneForOneStrategy, Props } import akka.persistence.journal.SteppingInmemJournal import akka.testkit.ImplicitSender +import akka.util.unused import com.typesafe.config.Config + import scala.concurrent.duration._ import scala.reflect.ClassTag @@ -21,8 +23,8 @@ object PersistentActorStashingSpec { var askedForDelete: Option[ActorRef] = None val updateState: Receive = { - case Evt(data) => events = data :: events - case d @ Some(ref: ActorRef) => askedForDelete = d.asInstanceOf[Some[ActorRef]] + case Evt(data) => events = data :: events + case d @ Some(_: ActorRef) => askedForDelete = d.asInstanceOf[Some[ActorRef]] } val commonBehavior: Receive = { @@ -71,7 +73,7 @@ object PersistentActorStashingSpec { } val processC: Receive = unstashBehavior.orElse { - case other => stash() + case _ => stash() } def unstashBehavior: Receive = { @@ -103,7 +105,7 @@ object PersistentActorStashingSpec { } val otherCommandHandler: Receive = unstashBehavior.orElse { - case other => stash() + case _ => stash() } def unstashBehavior: Receive = { @@ -172,7 +174,7 @@ object PersistentActorStashingSpec { case _ => // ignore } - def stashWithinHandler(evt: Evt) = { + def stashWithinHandler(@unused evt: Evt) = { stash() } diff --git a/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala index fc9c91518d..8bb0b79d8a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala @@ -73,12 +73,12 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(s" permitter.tell(RequestRecoveryPermit, p4.ref) permitter.tell(RequestRecoveryPermit, p5.ref) - p4.expectNoMsg(100.millis) - p5.expectNoMsg(10.millis) + p4.expectNoMessage(100.millis) + p5.expectNoMessage(10.millis) permitter.tell(ReturnRecoveryPermit, p2.ref) p4.expectMsg(RecoveryPermitGranted) - p5.expectNoMsg(100.millis) + p5.expectNoMessage(100.millis) permitter.tell(ReturnRecoveryPermit, p1.ref) p5.expectMsg(RecoveryPermitGranted) @@ -104,7 +104,7 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(s" val persistentActor = system.actorOf(testProps("p4", p4.ref)) p4.watch(persistentActor) persistentActor ! "stop" - p4.expectNoMsg(200.millis) + p4.expectNoMessage(200.millis) permitter.tell(ReturnRecoveryPermit, p3.ref) p4.expectMsg(RecoveryCompleted) @@ -121,17 +121,17 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(s" requestPermit(p3) val persistentActor = system.actorOf(testProps("p4", p4.ref)) - p4.expectNoMsg(100.millis) + p4.expectNoMessage(100.millis) permitter.tell(RequestRecoveryPermit, p5.ref) - p5.expectNoMsg(100.millis) + p5.expectNoMessage(100.millis) // PoisonPill is not stashed persistentActor ! PoisonPill p4.expectMsg("postStop") // persistentActor didn't hold a permit so still - p5.expectNoMsg(100.millis) + p5.expectNoMessage(100.millis) permitter.tell(ReturnRecoveryPermit, p1.ref) p5.expectMsg(RecoveryPermitGranted) @@ -150,7 +150,7 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(s" requestPermit(p3) permitter.tell(RequestRecoveryPermit, p4.ref) - p4.expectNoMsg(100.millis) + p4.expectNoMessage(100.millis) actor ! PoisonPill p4.expectMsg(RecoveryPermitGranted) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index c98158e5d4..57b2ce17b0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -142,7 +142,6 @@ class SnapshotFailureRobustnessSpec "fail recovery and stop actor when no snapshot could be loaded" in { val sPersistentActor = system.actorOf(Props(classOf[SaveSnapshotTestPersistentActor], name, testActor)) - val persistenceId = name expectMsg(RecoveryCompleted) sPersistentActor ! Cmd("ok") @@ -189,7 +188,6 @@ class SnapshotFailureRobustnessSpec } "receive failure message when bulk deleting snapshot fails" in { val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) - val persistenceId = name expectMsg(RecoveryCompleted) p ! Cmd("hello") diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index c4fb4f13be..98f782bd40 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -59,9 +59,9 @@ class SnapshotRecoveryLocalStoreSpec "A persistent actor which is persisted at the same time as another actor whose persistenceId is an extension of the first " must { "recover state only from its own correct snapshot file" in { - val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) - expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) => } + expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, _, _), _) => } expectMsg(RecoveryCompleted) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index daef8d5882..cc067c29de 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -92,7 +92,7 @@ class SnapshotSerializationSpec sPersistentActor ! "blahonga" expectMsg(0) - val lPersistentActor = system.actorOf(Props(classOf[TestPersistentActor], name, testActor)) + system.actorOf(Props(classOf[TestPersistentActor], name, testActor)) expectMsgPF() { case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) => state should ===(new MySnapshot("blahonga")) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index ac85750099..6109ab19d6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -34,9 +34,9 @@ object SnapshotSpec { override def recovery: Recovery = _recovery override def receiveRecover: Receive = { - case payload: String => probe ! s"${payload}-${lastSequenceNr}" - case offer @ SnapshotOffer(md, s) => probe ! offer - case other => probe ! other + case payload: String => probe ! s"${payload}-${lastSequenceNr}" + case offer: SnapshotOffer => probe ! offer + case other => probe ! other } override def receiveCommand = { @@ -45,8 +45,8 @@ object SnapshotSpec { persist(payload) { _ => probe ! s"${payload}-${lastSequenceNr}" } - case offer @ SnapshotOffer(md, s) => probe ! offer - case other => probe ! other + case offer: SnapshotOffer => probe ! offer + case other => probe ! other } } @@ -105,7 +105,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn "A persistentActor" must { "recover state starting from the most recent snapshot" in { - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(), testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(), testActor)) val persistenceId = name expectMsgPF() { @@ -118,9 +118,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg(RecoveryCompleted) } "recover completely if snapshot is not handled" in { - val persistentActor = - system.actorOf(Props(classOf[IgnoringSnapshotTestPersistentActor], name, Recovery(), testActor)) - val persistenceId = name + system.actorOf(Props(classOf[IgnoringSnapshotTestPersistentActor], name, Recovery(), testActor)) expectMsg("a-1") expectMsg("b-2") @@ -131,8 +129,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound" in { - val persistentActor = - system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(toSequenceNr = 3), testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, Recovery(toSequenceNr = 3), testActor)) val persistenceId = name expectMsgPF() { @@ -160,7 +157,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn } "recover state starting from the most recent snapshot matching criteria" in { val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) val persistenceId = name expectMsgPF() { @@ -176,7 +173,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn } "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) val persistenceId = name expectMsgPF() { @@ -189,7 +186,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn } "recover state from scratch if snapshot based recovery is disabled" in { val recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3) - val persistentActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) + system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], name, recovery, testActor)) expectMsg("a-1") expectMsg("b-2") @@ -219,11 +216,10 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! Delete1(metadata) deleteProbe.expectMsgType[DeleteSnapshot] - expectMsgPF() { case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) => } + expectMsgPF() { case DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) => } // recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages - val persistentActor2 = - system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) + system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, 0), null)) { case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 2, _), state) => @@ -248,7 +244,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn val criteria = SnapshotSelectionCriteria(maxSequenceNr = 4) persistentActor1 ! DeleteN(criteria) expectMsgPF() { - case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) => + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, _), state) => state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) } expectMsg(RecoveryCompleted) @@ -256,8 +252,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsgPF() { case DeleteSnapshotsSuccess(`criteria`) => } // recover persistentActor from replayed messages (all snapshots deleted) - val persistentActor2 = - system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) + system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, recovery, testActor)) expectMsg("a-1") expectMsg("b-2") diff --git a/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala index b9a79b8d1e..50b19c72dd 100644 --- a/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/TimerPersistentActorSpec.scala @@ -34,7 +34,7 @@ object TimerPersistentActorSpec { override def receiveCommand: Receive = { case Scheduled(msg, replyTo) => replyTo ! msg - case AutoReceivedMessageWrapper(msg) => + case AutoReceivedMessageWrapper(_) => timers.startSingleTimer("PoisonPill", PoisonPill, Duration.Zero) case msg => timers.startSingleTimer("key", Scheduled(msg, sender()), Duration.Zero) diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index d034449fd0..f4fab6743c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -198,7 +198,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) expectMsg(CurrentState(fsmRef, LookingAround, None)) expectMsg(Transition(fsmRef, LookingAround, Shopping, Some(1 second))) - expectNoMsg(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM + expectNoMessage(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM fsmRef ! PoisonPill expectTerminated(fsmRef) @@ -215,7 +215,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) expectMsg(Transition(recoveredFsmRef, Shopping, Inactive, Some(2 seconds))) } - expectNoMsg(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM + expectNoMessage(0.6 seconds) // arbitrarily chosen delay, less than the timeout, before stopping the FSM recoveredFsmRef ! PoisonPill expectTerminated(recoveredFsmRef) @@ -238,7 +238,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) probe.expectMsg(3.seconds, "LookingAround -> LookingAround") fsmRef ! "stay" // causes stay() - probe.expectNoMsg(3.seconds) + probe.expectNoMessage(3.seconds) } "not persist state change event when staying in the same state" in { @@ -318,7 +318,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) expectMsg(NonEmptyShoppingCart(List(shirt, shoes, coat))) - expectNoMsg(1 second) + expectNoMessage(1 second) fsmRef ! PoisonPill expectTerminated(fsmRef) @@ -336,7 +336,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor)) expectMsgPF() { - case SnapshotOffer(SnapshotMetadata(name, _, timestamp), PersistentFSMSnapshot(stateIdentifier, cart, None)) => + case SnapshotOffer(SnapshotMetadata(_, _, timestamp), PersistentFSMSnapshot(stateIdentifier, cart, None)) => stateIdentifier should ===(Paid.identifier) cart should ===(NonEmptyShoppingCart(List(shirt, shoes, coat))) timestamp should be > 0L @@ -356,7 +356,7 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) fsm ! TimeoutFSM.OverrideTimeoutToInf p.expectMsg(TimeoutFSM.OverrideTimeoutToInf) - p.expectNoMsg(1.seconds) + p.expectNoMessage(1.seconds) } @@ -461,7 +461,7 @@ object PersistentFSMSpec { when(LookingAround) { case Event("stay", _) => stay - case Event(e, _) => goto(LookingAround) + case Event(_, _) => goto(LookingAround) } onTransition { diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 5ab5848ffb..8912a6013b 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -25,6 +25,7 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "akka-protobuf", "akka-stream-typed", "akka-cluster-typed", + "akka-persistence", "akka-cluster-tools", "akka-stream")