Deprecate classic remoting classes (#27237)

* Sprinkle deprecated annotation to public classes for classic remoting
* Use right dispatcher setting for a few remoting actors
* New Artery events for QuarantinedEvent, GracefulShutdownQuarantinedEvent
  and ThisActorSystemQuarantinedEvent because old were not good
  and would cause binary compatibility trouble when we remove classic
* silence more deprecation warnings
This commit is contained in:
Patrik Nordwall 2019-07-10 14:26:51 +02:00 committed by Johan Andrén
parent 83d0f8bb05
commit 8d69388d0a
46 changed files with 313 additions and 78 deletions

View file

@ -12,16 +12,18 @@ import akka.cluster.ClusterEvent._
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.Done
import akka.pattern.ask
import akka.remote.QuarantinedEvent
import akka.remote.{ QuarantinedEvent => ClassicQuarantinedEvent }
import akka.remote.artery.QuarantinedEvent
import akka.util.Timeout
import com.typesafe.config.Config
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.control.NonFatal
import com.github.ghik.silencer.silent
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
*/
@ -402,7 +404,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}
override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])
subscribeQuarantinedEvent()
cluster.downingProvider.downingActorProps.foreach { props =>
val propsWithDispatcher =
@ -424,6 +426,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}
}
@silent
private def subscribeQuarantinedEvent(): Unit = {
context.system.eventStream.subscribe(self, classOf[QuarantinedEvent])
context.system.eventStream.subscribe(self, classOf[ClassicQuarantinedEvent])
}
private def isClusterBootstrapUsed: Boolean = {
val conf = context.system.settings.config
conf.hasPath("akka.management.cluster.bootstrap") &&
@ -537,12 +545,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case InitJoin(joiningNodeConfig) =>
logInfo("Received InitJoin message from [{}] to [{}]", sender(), selfAddress)
initJoin(joiningNodeConfig)
case Join(node, roles) => joining(node, roles)
case ClusterUserAction.Down(address) => downing(address)
case ClusterUserAction.Leave(address) => leaving(address)
case SendGossipTo(address) => sendGossipTo(address)
case msg: SubscriptionMessage => publisher.forward(msg)
case QuarantinedEvent(address, uid) => quarantined(UniqueAddress(address, uid))
case Join(node, roles) => joining(node, roles)
case ClusterUserAction.Down(address) => downing(address)
case ClusterUserAction.Leave(address) => leaving(address)
case SendGossipTo(address) => sendGossipTo(address)
case msg: SubscriptionMessage => publisher.forward(msg)
case QuarantinedEvent(ua) => quarantined(UniqueAddress(ua))
case ClassicQuarantinedEvent(address, uid) => quarantined(UniqueAddress(address, uid))
case ClusterUserAction.JoinTo(address) =>
logInfo("Trying to join [{}] when already part of a cluster, ignoring", address)
case JoinSeedNodes(nodes) =>

View file

@ -288,6 +288,9 @@ object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] {
@deprecated("Use Long UID apply instead", since = "2.4.11")
def apply(address: Address, uid: Int) = new UniqueAddress(address, uid.toLong)
def apply(remoteUniqueAddress: akka.remote.UniqueAddress): UniqueAddress =
new UniqueAddress(remoteUniqueAddress.address, remoteUniqueAddress.uid)
}
/**

View file

@ -4,24 +4,24 @@
package akka.cluster
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.Terminated
import akka.remote.artery.QuarantinedEvent
import akka.remote.RARP
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import scala.concurrent.duration._
import akka.testkit._
import akka.remote.testconductor.RoleName
import akka.actor.Props
import akka.actor.Actor
import scala.util.control.NoStackTrace
import akka.remote.{ QuarantinedEvent, RemoteActorRefProvider }
import akka.actor.ExtendedActorSystem
import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.actor.Terminated
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -90,6 +90,31 @@ abstract class SurviveNetworkInstabilitySpec
override def expectedTestDuration = 3.minutes
private val remoteSettings = RARP(system).provider.remoteSettings
@silent
def quarantinedEventClass: Class[_] =
if (remoteSettings.Artery.Enabled)
classOf[QuarantinedEvent]
else
classOf[akka.remote.QuarantinedEvent]
@silent
def quarantinedEventFrom(event: Any): Address = {
event match {
case QuarantinedEvent(uniqueAddress) => uniqueAddress.address
case akka.remote.QuarantinedEvent(address, _) => address
}
}
@silent
def sysMsgBufferSize: Int =
if (RARP(system).provider.remoteSettings.Artery.Enabled)
remoteSettings.Artery.Advanced.SysMsgBufferSize
else
remoteSettings.SysMsgBufferSize
def assertUnreachable(subjects: RoleName*): Unit = {
val expected = subjects.toSet.map(address)
awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(expected))
@ -266,12 +291,6 @@ abstract class SurviveNetworkInstabilitySpec
enterBarrier("watcher-created")
runOn(second) {
val sysMsgBufferSize = system
.asInstanceOf[ExtendedActorSystem]
.provider
.asInstanceOf[RemoteActorRefProvider]
.remoteSettings
.SysMsgBufferSize
val refs = Vector.fill(sysMsgBufferSize + 1)(system.actorOf(Props[Echo])).toSet
system.actorSelection(node(third) / "user" / "watcher") ! Targets(refs)
expectMsg(TargetsRegistered)
@ -290,7 +309,7 @@ abstract class SurviveNetworkInstabilitySpec
// system messages and quarantine
system.actorSelection("/user/watcher") ! "boom"
within(10.seconds) {
expectMsgType[QuarantinedEvent].address should ===(address(second))
quarantinedEventFrom(expectMsgClass(quarantinedEventClass)) should ===(address(second))
}
system.eventStream.unsubscribe(testActor, classOf[QuarantinedEvent])
}

View file

@ -156,6 +156,11 @@ is completely different. It will require a full cluster shutdown and new startup
If using SSL then `tcp-tls` needs to be enabled and setup. See @ref[Artery docs for SSL](../remoting-artery.md#configuring-ssl-tls-for-akka-remoting)
for how to do this.
The following events that are published to the `eventStream` have changed:
* classic `akka.remote.QuarantinedEvent` is `akka.remote.artery.QuarantinedEvent` in Artery
* classic `akka.remote.GracefulShutdownQuarantinedEvent` is `akka.remote.artery.GracefulShutdownQuarantinedEvent` in Artery
* classic `akka.remote.ThisActorSystemQuarantinedEvent` is `akka.remote.artery.ThisActorSystemQuarantinedEvent` in Artery
#### Migration from 2.5.x Artery to 2.6.x Artery

View file

@ -7,6 +7,7 @@ package akka.remote
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
@ -18,6 +19,7 @@ import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.testkit._
import akka.util.unused
import com.github.ghik.silencer.silent
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -95,6 +97,7 @@ object TransportFailSpec {
* This was fixed by not stopping the ReliableDeliverySupervisor so that the
* receive buffer was preserved.
*/
@silent // deprecated
abstract class TransportFailSpec extends RemotingMultiNodeSpec(TransportFailConfig) {
import TransportFailConfig._
import TransportFailSpec._

View file

@ -7,7 +7,7 @@ package akka.remote.artery
import akka.actor.{ ActorIdentity, Identify, _ }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.{ AddressUidExtension, RARP, RemotingMultiNodeSpec, ThisActorSystemQuarantinedEvent }
import akka.remote.{ AddressUidExtension, RARP, RemotingMultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory

View file

@ -5,13 +5,15 @@
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor._
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.remote.{ QuarantinedEvent, RARP, RemotingMultiNodeSpec }
import akka.remote.{ RARP, RemotingMultiNodeSpec }
import akka.remote.testkit.MultiNodeConfig
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
object SurviveNetworkPartitionSpec extends MultiNodeConfig {
@ -33,6 +35,7 @@ object SurviveNetworkPartitionSpec extends MultiNodeConfig {
class SurviveNetworkPartitionSpecMultiJvmNode1 extends SurviveNetworkPartitionSpec
class SurviveNetworkPartitionSpecMultiJvmNode2 extends SurviveNetworkPartitionSpec
@silent // deprecated
abstract class SurviveNetworkPartitionSpec extends RemotingMultiNodeSpec(SurviveNetworkPartitionSpec) {
import SurviveNetworkPartitionSpec._
@ -98,7 +101,7 @@ abstract class SurviveNetworkPartitionSpec extends RemotingMultiNodeSpec(Survive
watch(ref)
// keep the network partition for a while, longer than give-up-system-message-after
expectNoMessage(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 1.second)
qProbe.expectMsgType[QuarantinedEvent](5.seconds).address should ===(node(second).address)
qProbe.expectMsgType[QuarantinedEvent](5.seconds).uniqueAddress.address should ===(node(second).address)
expectTerminated(ref)
}

View file

@ -12,10 +12,11 @@ import akka.remote.transport.ThrottlerTransportAdapter.ForceDisassociateExplicit
import akka.remote.{ RARP, RemotingMultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
object RemoteGatePiercingSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
@ -46,6 +47,7 @@ object RemoteGatePiercingSpec extends MultiNodeConfig {
class RemoteGatePiercingSpecMultiJvmNode1 extends RemoteGatePiercingSpec
class RemoteGatePiercingSpecMultiJvmNode2 extends RemoteGatePiercingSpec
@silent // deprecated
abstract class RemoteGatePiercingSpec extends RemotingMultiNodeSpec(RemoteGatePiercingSpec) {
import RemoteGatePiercingSpec._

View file

@ -12,10 +12,11 @@ import akka.remote.transport.ThrottlerTransportAdapter.ForceDisassociateExplicit
import akka.remote.{ RARP, RemotingMultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
object RemoteNodeRestartGateSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
@ -43,6 +44,7 @@ object RemoteNodeRestartGateSpec extends MultiNodeConfig {
class RemoteNodeRestartGateSpecMultiJvmNode1 extends RemoteNodeRestartGateSpec
class RemoteNodeRestartGateSpecMultiJvmNode2 extends RemoteNodeRestartGateSpec
@silent // deprecated
abstract class RemoteNodeRestartGateSpec extends RemotingMultiNodeSpec(RemoteNodeRestartGateSpec) {
import RemoteNodeRestartGateSpec._

View file

@ -12,10 +12,11 @@ import akka.remote.transport.ThrottlerTransportAdapter.ForceDisassociateExplicit
import akka.remote.{ RARP, RemotingMultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
object Ticket15109Spec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
@ -45,6 +46,7 @@ object Ticket15109Spec extends MultiNodeConfig {
class Ticket15109SpecMultiJvmNode1 extends Ticket15109Spec
class Ticket15109SpecMultiJvmNode2 extends Ticket15109Spec
@silent // deprecated
abstract class Ticket15109Spec extends RemotingMultiNodeSpec(Ticket15109Spec) {
import Ticket15109Spec._

View file

@ -7,6 +7,7 @@ package akka.remote
import scala.collection.immutable._
import akka.AkkaException
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object SeqNo {
implicit val ord: Ordering[SeqNo] = new Ordering[SeqNo] {
@ -21,6 +22,7 @@ object SeqNo {
/**
* Implements a 64 bit sequence number with proper wrap-around ordering.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class SeqNo(rawValue: Long) extends Ordered[SeqNo] {
/**
@ -42,6 +44,7 @@ final case class SeqNo(rawValue: Long) extends Ordered[SeqNo] {
override def toString = String.valueOf(rawValue)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object HasSequenceNumber {
implicit def seqOrdering[T <: HasSequenceNumber]: Ordering[T] = new Ordering[T] {
def compare(x: T, y: T) = x.seq.compare(y.seq)
@ -52,6 +55,7 @@ object HasSequenceNumber {
* Messages that are to be buffered in [[akka.remote.AckedSendBuffer]] or [[akka.remote.AckedReceiveBuffer]] has
* to implement this interface to provide the sequence needed by the buffers.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
trait HasSequenceNumber {
/**
@ -66,13 +70,16 @@ trait HasSequenceNumber {
* @param cumulativeAck Represents the highest sequence number received.
* @param nacks Set of sequence numbers between the last delivered one and cumulativeAck that has been not yet received.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class Ack(cumulativeAck: SeqNo, nacks: Set[SeqNo] = Set.empty) {
override def toString = s"ACK[$cumulativeAck, ${nacks.mkString("{", ", ", "}")}]"
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class ResendBufferCapacityReachedException(c: Int)
extends AkkaException(s"Resend buffer capacity of [$c] has been reached.")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class ResendUnfulfillableException
extends AkkaException(
"Unable to fulfill resend request since negatively acknowledged payload is no longer in buffer. " +
@ -89,6 +96,7 @@ class ResendUnfulfillableException
* @param maxSeq The maximum sequence number that has been stored in this buffer. Messages having lower sequence number
* will be not stored but rejected with [[java.lang.IllegalArgumentException]]
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class AckedSendBuffer[T <: HasSequenceNumber](
capacity: Int,
nonAcked: IndexedSeq[T] = Vector.empty[T],
@ -144,6 +152,7 @@ final case class AckedSendBuffer[T <: HasSequenceNumber](
* @param cumulativeAck The highest sequence number received so far.
* @param buf Buffer of messages that are waiting for delivery
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class AckedReceiveBuffer[T <: HasSequenceNumber](
lastDelivered: SeqNo = SeqNo(-1),
cumulativeAck: SeqNo = SeqNo(-1),

View file

@ -38,6 +38,7 @@ import scala.concurrent.Future
import scala.concurrent.duration.Duration
import akka.util.OptionVal
import com.github.ghik.silencer.silent
/**
* INTERNAL API
@ -53,6 +54,7 @@ private[remote] trait InboundMessageDispatcher {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class DefaultMessageDispatcher(
private val system: ExtendedActorSystem,
private val provider: RemoteActorRefProvider,
@ -243,6 +245,7 @@ private[remote] object ReliableDeliverySupervisor {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class ReliableDeliverySupervisor(
handleOrActive: Option[AkkaProtocolHandle],
val localAddress: Address,
@ -533,6 +536,7 @@ private[remote] class ReliableDeliverySupervisor(
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] abstract class EndpointActor(
val localAddress: Address,
val remoteAddress: Address,
@ -559,6 +563,7 @@ private[remote] abstract class EndpointActor(
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] object EndpointWriter {
def props(
@ -615,6 +620,7 @@ private[remote] object EndpointWriter {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class EndpointWriter(
handleOrActive: Option[AkkaProtocolHandle],
localAddress: Address,
@ -1029,6 +1035,7 @@ private[remote] class EndpointWriter(
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] object EndpointReader {
def props(
@ -1060,6 +1067,7 @@ private[remote] object EndpointReader {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class EndpointReader(
localAddress: Address,
remoteAddress: Address,

View file

@ -237,7 +237,7 @@ private[akka] class RemoteActorRefProvider(
rootGuardian,
remotingTerminator,
_log,
untrustedMode = remoteSettings.UntrustedMode)
untrustedMode = remoteSettings.untrustedMode)
local.registerExtraNames(Map(("remote", d)))
d
},

View file

@ -5,7 +5,9 @@
package akka.remote
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import akka.actor.ActorSelectionMessage
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
@ -14,12 +16,14 @@ import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.event.Logging
import akka.routing.RouterEnvelope
import com.github.ghik.silencer.silent
/**
* INTERNAL API
* Extension that keeps track of remote metrics, such
* as max size of different message types.
*/
@silent // deprecated
private[akka] object RemoteMetricsExtension extends ExtensionId[RemoteMetrics] with ExtensionIdProvider {
override def get(system: ActorSystem): RemoteMetrics = super.get(system)
@ -55,6 +59,7 @@ private[akka] class RemoteMetricsOff extends RemoteMetrics {
/**
* INTERNAL API
*/
@silent // deprecated
private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteMetrics {
private val logFrameSizeExceeding: Int =

View file

@ -5,7 +5,6 @@
package akka.remote
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
@ -16,7 +15,9 @@ import akka.actor.Props
import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.ConfigurationException
import akka.annotation.InternalApi
import akka.remote.artery.ArterySettings
import com.github.ghik.silencer.silent
final class RemoteSettings(val config: Config) {
import config._
@ -26,20 +27,32 @@ final class RemoteSettings(val config: Config) {
val WarnAboutDirectUse: Boolean = getBoolean("akka.remote.warn-about-direct-use")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val LogReceive: Boolean = getBoolean("akka.remote.classic.log-received-messages")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val LogSend: Boolean = getBoolean("akka.remote.classic.log-sent-messages")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val LogFrameSizeExceeding: Option[Int] = {
if (config.getString("akka.remote.classic.log-frame-size-exceeding").toLowerCase == "off") None
else Some(getBytes("akka.remote.classic.log-frame-size-exceeding").toInt)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val UntrustedMode: Boolean = getBoolean("akka.remote.classic.untrusted-mode")
/**
* INTERNAL API
*/
@silent
@InternalApi private[akka] def untrustedMode: Boolean =
if (Artery.Enabled) Artery.UntrustedMode else UntrustedMode
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val TrustedSelectionPaths: Set[String] =
immutableSeq(getStringList("akka.remote.classic.trusted-selection-paths")).toSet
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val RemoteLifecycleEventsLogLevel: LogLevel = toRootLowerCase(
getString("akka.remote.classic.log-remote-lifecycle-events")) match {
case "on" => Logging.DebugLevel
@ -51,32 +64,46 @@ final class RemoteSettings(val config: Config) {
}
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val Dispatcher: String = getString("akka.remote.classic.use-dispatcher")
def configureDispatcher(props: Props): Props = if (Dispatcher.isEmpty) props else props.withDispatcher(Dispatcher)
@silent
def configureDispatcher(props: Props): Props =
if (Artery.Enabled) {
if (Artery.Advanced.Dispatcher.isEmpty) props else props.withDispatcher(Artery.Advanced.Dispatcher)
} else {
if (Dispatcher.isEmpty) props else props.withDispatcher(Dispatcher)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val ShutdownTimeout: Timeout = {
Timeout(config.getMillisDuration("akka.remote.classic.shutdown-timeout"))
}.requiring(_.duration > Duration.Zero, "shutdown-timeout must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val FlushWait: FiniteDuration = {
config.getMillisDuration("akka.remote.classic.flush-wait-on-shutdown")
}.requiring(_ > Duration.Zero, "flush-wait-on-shutdown must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val StartupTimeout: Timeout = {
Timeout(config.getMillisDuration("akka.remote.classic.startup-timeout"))
}.requiring(_.duration > Duration.Zero, "startup-timeout must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val RetryGateClosedFor: FiniteDuration = {
config.getMillisDuration("akka.remote.classic.retry-gate-closed-for")
}.requiring(_ >= Duration.Zero, "retry-gate-closed-for must be >= 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val UsePassiveConnections: Boolean = getBoolean("akka.remote.classic.use-passive-connections")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val BackoffPeriod: FiniteDuration = {
config.getMillisDuration("akka.remote.classic.backoff-interval")
}.requiring(_ > Duration.Zero, "backoff-interval must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val LogBufferSizeExceeding: Int = {
val key = "akka.remote.classic.log-buffer-size-exceeding"
config.getString(key).toLowerCase match {
@ -85,26 +112,32 @@ final class RemoteSettings(val config: Config) {
}
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val SysMsgAckTimeout: FiniteDuration = {
config.getMillisDuration("akka.remote.classic.system-message-ack-piggyback-timeout")
}.requiring(_ > Duration.Zero, "system-message-ack-piggyback-timeout must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val SysResendTimeout: FiniteDuration = {
config.getMillisDuration("akka.remote.classic.resend-interval")
}.requiring(_ > Duration.Zero, "resend-interval must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val SysResendLimit: Int = {
config.getInt("akka.remote.classic.resend-limit")
}.requiring(_ > 0, "resend-limit must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val SysMsgBufferSize: Int = {
getInt("akka.remote.classic.system-message-buffer-size")
}.requiring(_ > 0, "system-message-buffer-size must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val InitialSysMsgDeliveryTimeout: FiniteDuration = {
config.getMillisDuration("akka.remote.classic.initial-system-message-delivery-timeout")
}.requiring(_ > Duration.Zero, "initial-system-message-delivery-timeout must be > 0")
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val QuarantineSilentSystemTimeout: FiniteDuration = {
val key = "akka.remote.classic.quarantine-after-silence"
config.getString(key).toLowerCase match {
@ -114,12 +147,14 @@ final class RemoteSettings(val config: Config) {
}
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val QuarantineDuration: FiniteDuration = {
config
.getMillisDuration("akka.remote.classic.prune-quarantine-marker-after")
.requiring(_ > Duration.Zero, "prune-quarantine-marker-after must be > 0 ms")
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val CommandAckTimeout: Timeout = {
Timeout(config.getMillisDuration("akka.remote.classic.command-ack-timeout"))
}.requiring(_.duration > Duration.Zero, "command-ack-timeout must be > 0")
@ -149,6 +184,7 @@ final class RemoteSettings(val config: Config) {
transportConfig)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
val Adapters: Map[String, String] = configToMap(getConfig("akka.remote.classic.adapters"))
private def transportNames: immutable.Seq[String] =

View file

@ -77,6 +77,7 @@ private[akka] trait HeartbeatMessage extends PriorityMessage
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] object Remoting {
final val EndpointManagerName = "endpointManager"
@ -132,6 +133,7 @@ private[remote] object Remoting {
/**
* INTERNAL API
*/
@silent // deprecated
@ccompatUsedUntil213
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
extends RemoteTransport(_system, _provider) {
@ -285,6 +287,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] object EndpointManager {
// Messages between Remoting and EndpointManager
@ -477,6 +480,7 @@ private[remote] object EndpointManager {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
extends Actor
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

View file

@ -13,12 +13,14 @@ import scala.runtime.AbstractFunction2
@silent
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
sealed trait RemotingLifecycleEvent extends Serializable {
def logLevel: Logging.LogLevel
}
@silent
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
sealed trait AssociationEvent extends RemotingLifecycleEvent {
def localAddress: Address
def remoteAddress: Address
@ -31,6 +33,7 @@ sealed trait AssociationEvent extends RemotingLifecycleEvent {
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class AssociatedEvent(localAddress: Address, remoteAddress: Address, inbound: Boolean)
extends AssociationEvent {
@ -40,6 +43,7 @@ final case class AssociatedEvent(localAddress: Address, remoteAddress: Address,
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class DisassociatedEvent(localAddress: Address, remoteAddress: Address, inbound: Boolean)
extends AssociationEvent {
protected override def eventName: String = "Disassociated"
@ -47,6 +51,7 @@ final case class DisassociatedEvent(localAddress: Address, remoteAddress: Addres
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class AssociationErrorEvent(
cause: Throwable,
localAddress: Address,
@ -60,6 +65,7 @@ final case class AssociationErrorEvent(
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class RemotingListenEvent(listenAddresses: Set[Address]) extends RemotingLifecycleEvent {
@silent
def getListenAddresses: java.util.Set[Address] =
@ -69,12 +75,14 @@ final case class RemotingListenEvent(listenAddresses: Set[Address]) extends Remo
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
case object RemotingShutdownEvent extends RemotingLifecycleEvent {
override def logLevel: Logging.LogLevel = Logging.InfoLevel
override val toString: String = "Remoting shut down"
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleEvent {
def getCause: Throwable = cause
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
@ -82,6 +90,7 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE
}
// For binary compatibility
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object QuarantinedEvent extends AbstractFunction2[Address, Int, QuarantinedEvent] {
@deprecated("Use long uid apply", "2.4.x")
@ -89,6 +98,7 @@ object QuarantinedEvent extends AbstractFunction2[Address, Int, QuarantinedEvent
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class QuarantinedEvent(address: Address, longUid: Long) extends RemotingLifecycleEvent {
override def logLevel: Logging.LogLevel = Logging.WarningLevel
@ -114,6 +124,7 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot
* The `uniqueAddress` was quarantined but it was due to normal shutdown or cluster leaving/exiting.
*/
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class GracefulShutdownQuarantinedEvent(uniqueAddress: UniqueAddress, reason: String)
extends RemotingLifecycleEvent {
override def logLevel: Logging.LogLevel = Logging.InfoLevel
@ -123,6 +134,7 @@ final case class GracefulShutdownQuarantinedEvent(uniqueAddress: UniqueAddress,
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class ThisActorSystemQuarantinedEvent(localAddress: Address, remoteAddress: Address)
extends RemotingLifecycleEvent {
override def logLevel: LogLevel = Logging.WarningLevel
@ -132,6 +144,7 @@ final case class ThisActorSystemQuarantinedEvent(localAddress: Address, remoteAd
/**
* INTERNAL API
*/
@silent
private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logLevel: Logging.LogLevel) {
def notifyListeners(message: RemotingLifecycleEvent): Unit = {
system.eventStream.publish(message)

View file

@ -38,7 +38,6 @@ import akka.remote.AddressUidExtension
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransport
import akka.remote.ThisActorSystemQuarantinedEvent
import akka.remote.UniqueAddress
import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.Encoder.OutboundCompressionAccess
@ -59,6 +58,7 @@ import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.util.{ unused, OptionVal, WildcardIndex }
import com.github.ghik.silencer.silent
/**
* INTERNAL API
@ -627,7 +627,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
// and can result in forming two separate clusters (cluster split).
// Instead, the downing strategy should act on ThisActorSystemQuarantinedEvent, e.g.
// use it as a STONITH signal.
val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address)
val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress, from)
system.eventStream.publish(lifecycleEvent)
case _ => // not interesting
@ -741,6 +741,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
private[remote] def isShutdown: Boolean = hasBeenShutdown.get()
@silent // ThrottleMode from classic is deprecated, we can replace when removing classic
override def managementCommand(cmd: Any): Future[Boolean] = {
cmd match {
case SetThrottle(address, direction, Blackhole) =>

View file

@ -5,8 +5,6 @@
package akka.remote.artery
import java.net.ConnectException
import akka.util.PrettyDuration._
import java.util.Queue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
@ -18,40 +16,45 @@ import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.{ Done, NotUsed }
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.Dropped
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote._
import akka.remote.DaemonMsgCreate
import akka.remote.QuarantinedEvent
import akka.remote.artery.aeron.AeronSink.GaveUpMessageException
import akka.remote.artery.ArteryTransport.{ AeronTerminated, ShuttingDown }
import akka.remote.PriorityMessage
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.ArteryTransport.AeronTerminated
import akka.remote.artery.ArteryTransport.ShuttingDown
import akka.remote.artery.Encoder.OutboundCompressionAccess
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
import akka.remote.artery.aeron.AeronSink.GaveUpMessageException
import akka.remote.artery.compress.CompressionTable
import akka.stream.AbruptTerminationException
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.stream.SharedKillSwitch
import akka.stream.StreamTcpException
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.MergeHub
import akka.stream.scaladsl.Source
import akka.util.{ OptionVal, Unsafe, WildcardIndex }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.stream.SharedKillSwitch
import scala.util.control.NoStackTrace
import akka.actor.Cancellable
import akka.actor.Dropped
import akka.stream.StreamTcpException
import akka.util.OptionVal
import akka.util.PrettyDuration._
import akka.util.Unsafe
import akka.util.WildcardIndex
import akka.util.ccompat._
import com.github.ghik.silencer.silent
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
/**
* INTERNAL API
@ -496,7 +499,7 @@ private[remote] class Association(
remoteAddress,
u,
reason)
transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u))
transport.system.eventStream.publish(QuarantinedEvent(UniqueAddress(remoteAddress, u)))
}
flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u")
clearOutboundCompression()

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import akka.remote.UniqueAddress
final case class QuarantinedEvent(uniqueAddress: UniqueAddress) {
override val toString: String =
s"QuarantinedEvent: Association to [${uniqueAddress.address}] having UID [${uniqueAddress.uid}] is" +
"irrecoverably failed. UID is now quarantined and all messages to this UID will be delivered to dead letters. " +
"Remote ActorSystem must be restarted to recover from this situation."
}
final case class GracefulShutdownQuarantinedEvent(uniqueAddress: UniqueAddress, reason: String) {
override val toString: String =
s"GracefulShutdownQuarantinedEvent: Association to [${uniqueAddress.address}] having UID [${uniqueAddress.uid}] " +
s"has been stopped. All messages to this UID will be delivered to dead letters. Reason: $reason"
}
final case class ThisActorSystemQuarantinedEvent(localAddress: UniqueAddress, remoteAddress: UniqueAddress) {
override val toString: String =
s"ThisActorSystemQuarantinedEvent: The remote system [$remoteAddress] has quarantined this system [$localAddress]."
}

View file

@ -17,6 +17,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.transport.AssociationHandle.DisassociateInfo
import akka.actor.DeadLetterSuppression
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
trait TransportAdapterProvider {
/**
@ -25,6 +26,7 @@ trait TransportAdapterProvider {
def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class TransportAdapters(system: ExtendedActorSystem) extends Extension {
val settings = RARP(system).provider.remoteSettings
@ -44,6 +46,7 @@ class TransportAdapters(system: ExtendedActorSystem) extends Extension {
}
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with ExtensionIdProvider {
override def get(system: ActorSystem): TransportAdapters = super.get(system)
override def lookup = TransportAdaptersExtension
@ -51,6 +54,7 @@ object TransportAdaptersExtension extends ExtensionId[TransportAdapters] with Ex
new TransportAdapters(system)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
trait SchemeAugmenter {
protected def addedSchemeIdentifier: String
@ -69,6 +73,7 @@ trait SchemeAugmenter {
/**
* An adapter that wraps a transport and provides interception
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
abstract class AbstractTransportAdapter(protected val wrappedTransport: Transport)(implicit val ec: ExecutionContext)
extends Transport
with SchemeAugmenter {
@ -124,6 +129,7 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
abstract class AbstractTransportAdapterHandle(
val originalLocalAddress: Address,
val originalRemoteAddress: Address,
@ -140,6 +146,7 @@ abstract class AbstractTransportAdapterHandle(
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object ActorTransportAdapter {
sealed trait TransportOperation extends NoSerializationVerificationNeeded
@ -155,6 +162,7 @@ object ActorTransportAdapter {
implicit val AskTimeout = Timeout(5.seconds)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
extends AbstractTransportAdapter(wrappedTransport)(system.dispatchers.internalDispatcher) {
@ -191,6 +199,7 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
} yield stopResult && wrappedStopResult
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
abstract class ActorTransportAdapterManager extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import ActorTransportAdapter.{ ListenUnderlying, ListenerRegistered }

View file

@ -12,6 +12,7 @@ import akka.util.ByteString
import akka.protobuf.InvalidProtocolBufferException
import akka.protobuf.{ ByteString => PByteString }
import akka.util.OptionVal
import com.github.ghik.silencer.silent
/**
* INTERNAL API
@ -25,6 +26,7 @@ private[remote] class PduCodecException(msg: String, cause: Throwable) extends A
* Companion object of the [[akka.remote.transport.AkkaPduCodec]] trait. Contains the representation case classes
* of decoded Akka Protocol Data Units (PDUs).
*/
@silent // deprecated
private[remote] object AkkaPduCodec {
/**
@ -55,6 +57,7 @@ private[remote] object AkkaPduCodec {
*
* A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s.
*/
@silent // deprecated
private[remote] trait AkkaPduCodec {
import AkkaPduCodec._
@ -114,6 +117,7 @@ private[remote] trait AkkaPduCodec {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
import AkkaPduCodec._

View file

@ -63,6 +63,7 @@ private[remote] class AkkaProtocolSettings(config: Config) {
}
}
@silent // deprecated
private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead?
val AkkaScheme: String = "akka"
val AkkaOverhead: Int = 0 //Don't know yet
@ -100,6 +101,7 @@ final case class HandshakeInfo(origin: Address, uid: Int, cookie: Option[String]
* @param codec
* the codec that will be used to encode/decode Akka PDUs
*/
@silent // deprecated
private[remote] class AkkaProtocolTransport(
wrappedTransport: Transport,
private val system: ActorSystem,
@ -129,6 +131,7 @@ private[remote] class AkkaProtocolTransport(
}
}
@silent // deprecated
private[transport] class AkkaProtocolManager(
private val wrappedTransport: Transport,
private val settings: AkkaProtocolSettings)
@ -205,6 +208,7 @@ private[transport] class AkkaProtocolManager(
}
@silent // deprecated
private[remote] class AkkaProtocolHandle(
_localAddress: Address,
_remoteAddress: Address,
@ -222,6 +226,7 @@ private[remote] class AkkaProtocolHandle(
def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info)
}
@silent // deprecated
private[remote] object ProtocolStateActor {
sealed trait AssociationState
@ -324,6 +329,7 @@ private[remote] object ProtocolStateActor {
failureDetector).withDeploy(Deploy.local)
}
@silent // deprecated
private[remote] class ProtocolStateActor(
initialData: InitialProtocolStateData,
private val localHandshakeInfo: HandshakeInfo,

View file

@ -20,8 +20,10 @@ import scala.concurrent.{ Future, Promise }
import scala.util.control.NoStackTrace
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class FailureInjectorException(msg: String) extends AkkaException(msg) with NoStackTrace
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class FailureInjectorProvider extends TransportAdapterProvider {
override def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
@ -58,6 +60,7 @@ private[remote] object FailureInjectorTransportAdapter {
/**
* INTERNAL API
*/
@silent
private[remote] class FailureInjectorTransportAdapter(
wrappedTransport: Transport,
val extendedSystem: ExtendedActorSystem)
@ -158,6 +161,7 @@ private[remote] class FailureInjectorTransportAdapter(
/**
* INTERNAL API
*/
@silent
private[remote] final case class FailureInjectorHandle(
_wrappedHandle: AssociationHandle,
private val gremlinAdapter: FailureInjectorTransportAdapter)

View file

@ -25,6 +25,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
* requested to do. This class is not optimized for performance and MUST not be used as an in-memory transport in
* production systems.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class TestTransport(
val localAddress: Address,
final val registry: AssociationRegistry,
@ -170,6 +171,7 @@ class TestTransport(
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object TestTransport {
type Behavior[A, B] = (A) => Future[B]
@ -440,6 +442,7 @@ object TestTransport {
up via a string key. Until we find a better way to inject an AssociationRegistry to multiple actor systems it is
strongly recommended to use long, randomly generated strings to key the registry to avoid interference between tests.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object AssociationRegistry {
private final val registries = scala.collection.mutable.Map[String, AssociationRegistry]()
@ -450,6 +453,7 @@ object AssociationRegistry {
def clear(): Unit = this.synchronized { registries.clear() }
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
final case class TestAssociationHandle(
localAddress: Address,
remoteAddress: Address,

View file

@ -34,6 +34,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.RARP
import com.github.ghik.silencer.silent
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class ThrottlerProvider extends TransportAdapterProvider {
override def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
@ -41,6 +42,7 @@ class ThrottlerProvider extends TransportAdapterProvider {
}
// not deprecating this because Direction is widely used, we can change testkit anyway when removing classic
object ThrottlerTransportAdapter {
val SchemeIdentifier = "trttl"
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)
@ -203,6 +205,7 @@ object ThrottlerTransportAdapter {
def unthrottledThrottleMode(): ThrottleMode = Unthrottled
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem)
extends ActorTransportAdapter(_wrappedTransport, _system) {
@ -229,6 +232,7 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
/**
* INTERNAL API
*/
@silent
private[transport] object ThrottlerManager {
final case class Checkin(origin: Address, handle: ThrottlerHandle) extends NoSerializationVerificationNeeded
@ -246,6 +250,7 @@ private[transport] object ThrottlerManager {
/**
* INTERNAL API
*/
@silent
private[transport] class ThrottlerManager(wrappedTransport: Transport)
extends ActorTransportAdapterManager
with ActorLogging {
@ -415,6 +420,7 @@ private[transport] object ThrottledAssociation {
/**
* INTERNAL API
*/
@silent
private[transport] class ThrottledAssociation(
val manager: ActorRef,
val associationHandler: AssociationEventListener,
@ -578,6 +584,7 @@ private[transport] class ThrottledAssociation(
/**
* INTERNAL API
*/
@silent
private[transport] final case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef)
extends AbstractTransportAdapterHandle(_wrappedHandle, SchemeIdentifier) {

View file

@ -14,6 +14,7 @@ import akka.actor.DeadLetterSuppression
import akka.event.LoggingAdapter
import com.github.ghik.silencer.silent
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object Transport {
trait AssociationEvent extends NoSerializationVerificationNeeded
@ -66,6 +67,7 @@ object Transport {
* Transport implementations that are loaded dynamically by the remoting must have a constructor that accepts a
* [[com.typesafe.config.Config]] and an [[akka.actor.ExtendedActorSystem]] as parameters.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
trait Transport {
import akka.remote.transport.Transport._
@ -149,6 +151,7 @@ trait Transport {
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object AssociationHandle {
/**
@ -215,6 +218,7 @@ object AssociationHandle {
* returned by [[akka.remote.transport.AssociationHandle#readHandlerPromise]]. Incoming data is not processed until
* this registration takes place.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
trait AssociationHandle {
/**

View file

@ -8,6 +8,7 @@ import akka.japi.Util._
import com.typesafe.config.Config
import org.jboss.netty.handler.ssl.SslHandler
import akka.util.ccompat._
import com.github.ghik.silencer.silent
/**
* INTERNAL API
@ -42,6 +43,7 @@ private[akka] class SSLSettings(config: Config) {
* The `SSLEngine` is created via the configured [[SSLEngineProvider]].
*/
@ccompatUsedUntil213
@silent // deprecated
private[akka] object NettySSLSupport {
/**

View file

@ -56,6 +56,7 @@ import org.jboss.netty.handler.codec.frame.LengthFieldPrepender
import org.jboss.netty.handler.ssl.SslHandler
import org.jboss.netty.util.HashedWheelTimer
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object NettyFutureBridge {
def apply(nettyFuture: ChannelFuture): Future[Channel] = {
val p = Promise[Channel]()
@ -92,6 +93,7 @@ object NettyFutureBridge {
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class NettyTransportException(msg: String, cause: Throwable)
extends RuntimeException(msg, cause)
with OnlyCauseStackTrace {
@ -99,12 +101,14 @@ class NettyTransportException(msg: String, cause: Throwable)
}
@SerialVersionUID(1L)
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class NettyTransportExceptionNoStack(msg: String, cause: Throwable)
extends NettyTransportException(msg, cause)
with NoStackTrace {
def this(msg: String) = this(msg, null)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class NettyTransportSettings(config: Config) {
import akka.util.Helpers.ConfigOps
@ -206,6 +210,7 @@ class NettyTransportSettings(config: Config) {
/**
* INTERNAL API
*/
@silent // deprecated
private[netty] trait CommonHandlers extends NettyHelpers {
protected val transport: NettyTransport
@ -248,6 +253,7 @@ private[netty] trait CommonHandlers extends NettyHelpers {
/**
* INTERNAL API
*/
@silent // deprecated
private[netty] abstract class ServerHandler(
protected final val transport: NettyTransport,
private final val associationListenerFuture: Future[AssociationEventListener])
@ -279,6 +285,7 @@ private[netty] abstract class ServerHandler(
/**
* INTERNAL API
*/
@silent // deprecated
private[netty] abstract class ClientHandler(protected final val transport: NettyTransport, remoteAddress: Address)
extends NettyClientHelpers
with CommonHandlers {
@ -327,6 +334,7 @@ private[transport] object NettyTransport {
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)
}
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedActorSystem) extends Transport {
def this(system: ExtendedActorSystem, conf: Config) = this(new NettyTransportSettings(conf), system)

View file

@ -27,6 +27,7 @@ import javax.net.ssl.SSLEngine
import javax.net.ssl.TrustManager
import javax.net.ssl.TrustManagerFactory
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
trait SSLEngineProvider {
def createServerSSLEngine(): SSLEngine
@ -40,6 +41,7 @@ trait SSLEngineProvider {
*
* Subclass may override protected methods to replace certain parts, such as key and trust manager.
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class ConfigSSLEngineProvider(protected val log: MarkerLoggingAdapter, private val settings: SSLSettings)
extends SSLEngineProvider {

View file

@ -14,9 +14,10 @@ import java.net.InetSocketAddress
import akka.event.LoggingAdapter
import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
import org.jboss.netty.channel._
import scala.concurrent.{ Future, Promise }
import com.github.ghik.silencer.silent
/**
* INTERNAL API
*/
@ -28,6 +29,7 @@ private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEvent
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] trait TcpHandlers extends CommonHandlers {
protected def log: LoggingAdapter
@ -63,6 +65,7 @@ private[remote] trait TcpHandlers extends CommonHandlers {
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class TcpServerHandler(
_transport: NettyTransport,
_associationListenerFuture: Future[AssociationEventListener],
@ -78,6 +81,7 @@ private[remote] class TcpServerHandler(
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress: Address, val log: LoggingAdapter)
extends ClientHandler(_transport, remoteAddress)
with TcpHandlers {
@ -90,6 +94,7 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress
/**
* INTERNAL API
*/
@silent // deprecated
private[remote] class TcpAssociationHandle(
val localAddress: Address,
val remoteAddress: Address,

View file

@ -8,6 +8,9 @@ import akka.testkit.AkkaSpec
import scala.annotation.tailrec
import java.util.concurrent.ThreadLocalRandom
import com.github.ghik.silencer.silent
@silent // deprecated
object AckedDeliverySpec {
final case class Sequenced(seq: SeqNo, body: String) extends HasSequenceNumber {
@ -16,6 +19,7 @@ object AckedDeliverySpec {
}
@silent // deprecated
class AckedDeliverySpec extends AkkaSpec {
import AckedDeliverySpec._

View file

@ -5,13 +5,17 @@
package akka.remote
import language.postfixOps
import akka.testkit.AkkaSpec
import scala.concurrent.duration._
import akka.remote.transport.AkkaProtocolSettings
import akka.util.{ Helpers }
import akka.util.Helpers
import akka.util.Helpers.ConfigOps
import akka.remote.transport.netty.{ NettyTransportSettings, SSLSettings }
import com.github.ghik.silencer.silent
@silent // classic deprecated
class RemoteConfigSpec extends AkkaSpec("""
akka.actor.provider = remote
akka.remote.classic.netty.tcp.port = 0

View file

@ -22,6 +22,7 @@ import akka.remote.artery.RemoteDeploymentSpec
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestProbe
import com.github.ghik.silencer.silent
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -56,10 +57,14 @@ abstract class RemoteFeaturesSpec(c: Config) extends ArteryMultiNodeSpec(c) with
protected val remoteSystem1 = newRemoteSystem(name = Some("RS1"), extraConfig = Some(common(useUnsafe)))
Seq(system, remoteSystem1).foreach(
muteDeadLetters(
akka.remote.transport.AssociationHandle.Disassociated.getClass,
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass)(_))
@silent // deprecated
private def mute(): Unit = {
Seq(system, remoteSystem1).foreach(
muteDeadLetters(
akka.remote.transport.AssociationHandle.Disassociated.getClass,
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass)(_))
}
mute()
import akka.remote.artery.RemoteWatcherSpec.TestRemoteWatcher
protected val monitor = system.actorOf(Props(new TestRemoteWatcher), "monitor1")

View file

@ -21,7 +21,9 @@ import scala.concurrent.duration._
import scala.reflect.classTag
import akka.remote.transport.netty.ConfigSSLEngineProvider
import com.github.ghik.silencer.silent
@silent // deprecated
object Configuration {
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
// The certificate will expire in 2109
@ -116,6 +118,7 @@ class Ticket1978CrappyRSAWithMD5OnlyHereToMakeSureThingsWorkSpec
class Ticket1978NonExistingRNGSecureSpec
extends Ticket1978CommunicationSpec(CipherConfig(false, AkkaSpec.testConf, "NonExistingRNG", 12345, 12346, None))
@silent // deprecated
abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig)
extends AkkaSpec(cipherConfig.config)
with ImplicitSender {

View file

@ -11,7 +11,6 @@ import akka.actor.RootActorPath
import scala.concurrent.duration._
import akka.testkit.SocketUtil
import akka.remote.QuarantinedEvent
import akka.remote.RARP
import com.github.ghik.silencer.silent

View file

@ -4,12 +4,12 @@
package akka.remote.artery
import language.postfixOps
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.testkit._
import akka.actor._
import akka.remote._
import akka.testkit._
import com.typesafe.config.ConfigFactory
object RemoteWatcherSpec {
@ -74,8 +74,8 @@ class RemoteWatcherSpec
.withFallback(ArterySpecSupport.defaultConfig))
with ImplicitSender {
import RemoteWatcherSpec._
import RemoteWatcher._
import RemoteWatcherSpec._
override def expectedTestDuration = 2.minutes
@ -83,11 +83,6 @@ class RemoteWatcherSpec
val remoteAddress = address(remoteSystem)
def remoteAddressUid = AddressUidExtension(remoteSystem).longAddressUid
Seq(system, remoteSystem).foreach(
muteDeadLetters(
akka.remote.transport.AssociationHandle.Disassociated.getClass,
akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass)(_))
override def afterTermination(): Unit = {
shutdown(remoteSystem)
super.afterTermination()

View file

@ -13,6 +13,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
@silent // classic deprecated
class RemoteDeathWatchSpec
extends AkkaSpec(ConfigFactory.parseString("""
akka {

View file

@ -9,9 +9,10 @@ import akka.remote.EndpointException
import akka.remote.transport._
import akka.testkit._
import com.typesafe.config._
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
// relies on test transport
object RemoteDeploymentWhitelistSpec {
@ -99,6 +100,7 @@ object RemoteDeploymentWhitelistSpec {
}
}
@silent // deprecated
class RemoteDeploymentWhitelistSpec
extends AkkaSpec(RemoteDeploymentWhitelistSpec.cfg)
with ImplicitSender

View file

@ -5,9 +5,11 @@
package akka.remote.classic
import akka.remote.RemoteSettings
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
@silent // deprecated
class RemoteSettingsSpec extends WordSpec with Matchers {
"Remote settings" must {

View file

@ -67,6 +67,7 @@ object RemoteWatcherSpec {
}
@silent // deprecated
class RemoteWatcherSpec extends AkkaSpec("""akka {
loglevel = INFO
log-dead-letters-during-shutdown = false

View file

@ -24,10 +24,11 @@ import akka.remote.{ FailureDetector, WireFormats }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.util.{ ByteString, OptionVal }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.{ Await, Promise }
import com.github.ghik.silencer.silent
object AkkaProtocolSpec {
class TestFailureDetector extends FailureDetector {
@ -42,6 +43,7 @@ object AkkaProtocolSpec {
}
@silent // deprecated
class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = remote """) with ImplicitSender {
val conf = ConfigFactory.parseString("""

View file

@ -12,9 +12,11 @@ import akka.remote.transport.Transport._
import akka.remote.transport.{ AssociationRegistry => _, _ }
import akka.testkit.{ AkkaSpec, DefaultTimeout, ImplicitSender }
import akka.util.ByteString
import scala.concurrent.{ Await, Future }
import com.github.ghik.silencer.silent
@silent // deprecated
abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
extends AkkaSpec("""
akka.remote.artery.enabled = false

View file

@ -12,10 +12,11 @@ import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.{ EndpointException, QuarantinedEvent, RARP }
import akka.testkit.{ AkkaSpec, DefaultTimeout, EventFilter, ImplicitSender, TestEvent, TimingTest, _ }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
object SystemMessageDeliveryStressTest {
val msgCount = 5000
val burstSize = 100
@ -98,6 +99,7 @@ object SystemMessageDeliveryStressTest {
}
@silent // deprecated
abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(SystemMessageDeliveryStressTest.baseConfig))
with ImplicitSender

View file

@ -11,9 +11,11 @@ import akka.remote.transport.Transport._
import akka.remote.transport.{ AssociationHandle, TestTransport }
import akka.testkit._
import akka.util.ByteString
import scala.concurrent._
import com.github.ghik.silencer.silent
@silent // deprecated
class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
val addressA: Address = Address("test", "testsytemA", "testhostA", 4321)

View file

@ -11,10 +11,11 @@ import akka.remote.transport.{ TestTransport, ThrottlerTransportAdapter }
import akka.remote.{ EndpointException, RemoteActorRefProvider }
import akka.testkit.{ AkkaSpec, DefaultTimeout, EventFilter, ImplicitSender, TestEvent, TimingTest }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
object ThrottlerTransportAdapterSpec {
val configA: Config =
ConfigFactory.parseString("""
@ -69,6 +70,7 @@ object ThrottlerTransportAdapterSpec {
final case class Lost(msg: String)
}
@silent // deprecated
class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSender with DefaultTimeout {
val systemB = ActorSystem("systemB", system.settings.config)
@ -155,6 +157,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
override def afterTermination(): Unit = shutdown(systemB)
}
@silent // deprecated
class ThrottlerTransportAdapterGenericSpec extends GenericTransportSpec(withAkkaProtocol = true) {
def transportName = "ThrottlerTransportAdapter"