From 06467547390c417506990fe0d21f50d6d0ea24cc Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 22 Mar 2024 10:23:09 +0100 Subject: [PATCH] forward fit Akka compat changes (#1211) * Handle mixed akka/pekko protocol names * add extra changes needed to get akka cluster support * add default for pekko.remote.akka.version (#1112) * add default for pekko.cluster.akka.version * refactor configs * Update reference.conf * add validations for config settings * Update RemoteSettings.scala * Update RemoteSettingsSpec.scala * scalafmt --------- Co-authored-by: Matthew de Detrich --- .../org/apache/pekko/cluster/ConfigUtil.scala | 54 +++++++++++++++++++ .../pekko/cluster/SeedNodeProcess.scala | 21 +++++++- .../pekko/remote/RemoteFeaturesSpec.scala | 3 +- remote/src/main/resources/reference.conf | 27 ++++++++++ .../remote/BoundAddressesExtension.scala | 4 +- .../pekko/remote/RemoteActorRefProvider.scala | 36 +++++++++---- .../apache/pekko/remote/RemoteSettings.scala | 25 +++++++++ .../pekko/remote/artery/ArteryTransport.scala | 6 +-- .../transport/PekkoProtocolTransport.scala | 15 +++--- .../remote/classic/RemoteDeathWatchSpec.scala | 3 +- .../remote/classic/RemoteSettingsSpec.scala | 31 ++++++++++- 11 files changed, 199 insertions(+), 26 deletions(-) create mode 100644 cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala new file mode 100644 index 0000000000..6c2bd2254e --- /dev/null +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType } + +import scala.annotation.nowarn + +private[cluster] object ConfigUtil { + + @nowarn("msg=deprecated") + def addAkkaConfig(cfg: Config, akkaVersion: String): Config = { + import scala.collection.JavaConverters._ + val innerSet = cfg.entrySet().asScala + .filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT) + .map { entry => + entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue) + } + var newConfig = cfg + innerSet.foreach { case (key, value) => + newConfig = newConfig.withValue(key, value) + } + newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion)) + } + + private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = { + if (cv.valueType() == ConfigValueType.STRING) { + val str = cv.unwrapped().toString + if (str.startsWith("org.apache.pekko")) { + ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka")) + } else { + cv + } + } else { + cv + } + } + +} diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala index ab090e3615..67a60b8189 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala @@ -47,6 +47,21 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " + "This node will be shutdown!" + private lazy val needsAkkaConfig: Boolean = { + context.system.settings.config + .getStringList("pekko.remote.accept-protocol-names") + .contains("akka") + } + + private lazy val akkaVersion: String = { + val cfg = context.system.settings.config + if (cfg.hasPath("akka.version")) { + cfg.getString("akka.version") + } else { + cfg.getString("pekko.remote.akka.version") + } + } + private def stopOrBecome(behavior: Option[Actor.Receive]): Unit = behavior match { case Some(done) => context.become(done) // JoinSeedNodeProcess @@ -65,8 +80,12 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon val configToValidate = JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config) + val adjustedConfig = if (needsAkkaConfig) + ConfigUtil.addAkkaConfig(configToValidate, akkaVersion) + else configToValidate + seedNodes.foreach { a => - context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) + context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(adjustedConfig) } } diff --git a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala index 58e97b1681..8b7c1b9262 100644 --- a/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala +++ b/remote-tests/src/multi-jvm/scala/org/apache/pekko/remote/RemoteFeaturesSpec.scala @@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig) remotePath, Nobody, None, - None) + None, + Set("pekko", "akka")) rar.start() rar diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf index 20b1cc5517..9049be1ef1 100644 --- a/remote/src/main/resources/reference.conf +++ b/remote/src/main/resources/reference.conf @@ -175,6 +175,33 @@ pekko { # is 'off'. Set this to 'off' to suppress these. warn-unsafe-watch-outside-cluster = on + # When receiving requests from other remote actors, what are the valid + # prefixes to check against. Useful for when dealing with rolling cluster + # migrations with compatible systems such as Lightbend's Akka. + # By default, we only support "pekko" protocol. + # If you want to also support Akka, change this config to: + # pekko.remote.accept-protocol-names = ["pekko", "akka"] + # A ConfigurationException will be thrown at runtime if the array is empty + # or contains values other than "pekko" and/or "akka". + accept-protocol-names = ["pekko"] + + # The protocol name to use when sending requests to other remote actors. + # Useful when dealing with rolling migration, i.e. temporarily change + # the protocol name to match another compatible actor implementation + # such as Lightbend's "akka" (whilst making sure accept-protocol-names + # contains "akka") so that you can gracefully migrate all nodes to Apache + # Pekko and then change the protocol-name back to "pekko" once all + # nodes have been are running on Apache Pekko. + # A ConfigurationException will be thrown at runtime if the value is not + # set to "pekko" or "akka". + protocol-name = "pekko" + + # When pekko.remote.accept-protocol-names contains "akka", then we + # need to know the Akka version. If you include the Akka jars on the classpath, + # we can use the akka.version from their configuration. This configuration + # setting is only used if we can't find an akka.version setting. + akka.version = "2.6.21" + # Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf # [Hayashibara et al]) used for remote death watch. # The default PhiAccrualFailureDetector will trigger if there are no heartbeats within diff --git a/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala b/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala index 4139ada218..5c5d8ac074 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/BoundAddressesExtension.scala @@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension { + private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config) + /** * Returns a mapping from a protocol to a set of bound addresses. */ def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match { - case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address)) + case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address)) case remoting: Remoting => remoting.boundAddresses case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}") } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala index 62a2268292..ea877ecd9f 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteActorRefProvider.scala @@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider( val rpath = (RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements) .withUid(path.uid) - new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) + new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d), + remoteSettings.AcceptProtocolNames) } else { warnIfNotRemoteActorRef(path) local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async) @@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider( RootActorPath(address), Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.error(e, "No root guardian at [{}]", address) @@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider( RootActorPath(address) / elems, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider( rootPath, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider( path, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = remoteSettings.AcceptProtocolNames) } catch { case NonFatal(e) => log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage) @@ -672,7 +677,8 @@ private[pekko] class RemoteActorRef private[pekko] ( val path: ActorPath, val getParent: InternalActorRef, props: Option[Props], - deploy: Option[Deploy]) + deploy: Option[Deploy], + val acceptProtocolNames: Set[String]) extends InternalActorRef with RemoteRef { @@ -680,10 +686,17 @@ private[pekko] class RemoteActorRef private[pekko] ( throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]") remote match { - case t: ArteryTransport => - // detect mistakes such as using "pekko.tcp" with Artery - if (path.address.protocol != t.localAddress.address.protocol) - throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]") + case _: ArteryTransport => + // detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names + if (!acceptProtocolNames.contains(path.address.protocol)) { + val expectedString = if (acceptProtocolNames.size == 1) + "expected" + else + "expected one of" + + throw new IllegalArgumentException( + s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]") + } case _ => } @volatile private[remote] var cachedAssociation: artery.Association = null @@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] ( s.headOption match { case None => this case Some("..") => getParent.getChild(name) - case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None) + case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None, + acceptProtocolNames = acceptProtocolNames) } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala b/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala index dd2fe5b8fd..f106284ca5 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala @@ -199,6 +199,31 @@ final class RemoteSettings(val config: Config) { @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters")) + private val AllowableProtocolNames = Set("pekko", "akka") + + val ProtocolName: String = { + val setting = getString("pekko.remote.protocol-name") + if (!AllowableProtocolNames.contains(setting)) { + throw new ConfigurationException("The only allowed values for pekko.remote.protocol-name " + + "are \"pekko\" and \"akka\".") + } + setting + } + + val AcceptProtocolNames: Set[String] = { + val set = immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet + if (set.isEmpty) { + throw new ConfigurationException("pekko.remote.accept-protocol-names setting must not be empty. " + + "The setting is an array and the only acceptable values are \"pekko\" and \"akka\".") + } + val filteredSet = set.filterNot(AllowableProtocolNames.contains) + if (filteredSet.nonEmpty) { + throw new ConfigurationException("pekko.remote.accept-protocol-names is an array setting " + + "that only accepts the values \"pekko\" and \"akka\".") + } + set + } + private def transportNames: immutable.Seq[String] = immutableSeq(getStringList("pekko.remote.classic.enabled-transports")) diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala index fb66fba6a4..44103d42ed 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala @@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr val (port, boundPort) = bindInboundStreams() _localAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port), + Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port), AddressUidExtension(system).longAddressUid) _addresses = Set(_localAddress.address) _bindAddress = UniqueAddress( - Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort), + Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort), AddressUidExtension(system).longAddressUid) flightRecorder.transportUniqueAddressSet(_localAddress) @@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr */ private[remote] object ArteryTransport { - val ProtocolName = "pekko" - // Note that the used version of the header format for outbound messages is defined in // `ArterySettings.Version` because that may depend on configuration settings. // This is the highest supported version on receiving (decoding) side. diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala index c590258c8e..7568d0b286 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/PekkoProtocolTransport.scala @@ -71,11 +71,11 @@ private[remote] class PekkoProtocolSettings(config: Config) { } val ManagerNamePrefix: String = config.getString("pekko.remote.classic.manager-name-prefix") + val PekkoScheme: String = new RemoteSettings(config).ProtocolName } @nowarn("msg=deprecated") private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead? - val PekkoScheme: String = "pekko" val PekkoOverhead: Int = 0 // Don't know yet val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0) @@ -124,7 +124,7 @@ private[remote] class PekkoProtocolTransport( private val codec: PekkoPduCodec) extends ActorTransportAdapter(wrappedTransport, system) { - override val addedSchemeIdentifier: String = PekkoScheme + override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd) @@ -232,8 +232,9 @@ private[remote] class PekkoProtocolHandle( _wrappedHandle: AssociationHandle, val handshakeInfo: HandshakeInfo, private val stateActor: ActorRef, - private val codec: PekkoPduCodec) - extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) { + private val codec: PekkoPduCodec, + override val addedSchemeIdentifier: String) + extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) { override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) @@ -716,7 +717,8 @@ private[remote] class ProtocolStateActor( wrappedHandle, handshakeInfo, self, - codec)) + codec, + settings.PekkoScheme)) readHandlerPromise.future } @@ -736,7 +738,8 @@ private[remote] class ProtocolStateActor( wrappedHandle, handshakeInfo, self, - codec))) + codec, + settings.PekkoScheme))) readHandlerPromise.future } diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala index c710966b90..c448719f3e 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteDeathWatchSpec.scala @@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off extinctPath, Nobody, props = None, - deploy = None) + deploy = None, + acceptProtocolNames = Set("pekko", "akka")) val probe = TestProbe() probe.watch(extinctRef) diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteSettingsSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteSettingsSpec.scala index cb513e1454..b2dee383db 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteSettingsSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/classic/RemoteSettingsSpec.scala @@ -18,7 +18,10 @@ import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import org.apache.pekko.remote.RemoteSettings +import org.apache.pekko +import pekko.ConfigurationException +import pekko.remote.RemoteSettings +import pekko.testkit.PekkoSpec @nowarn("msg=deprecated") class RemoteSettingsSpec extends AnyWordSpec with Matchers { @@ -34,6 +37,32 @@ class RemoteSettingsSpec extends AnyWordSpec with Matchers { .parseString("pekko.remote.classic.log-frame-size-exceeding = 100b") .withFallback(ConfigFactory.load())).LogFrameSizeExceeding shouldEqual Some(100) } + "fail if unknown protocol name is used" in { + val cfg = ConfigFactory.parseString("pekko.remote.protocol-name=unknown") + .withFallback(PekkoSpec.testConf) + val ex = intercept[ConfigurationException] { + new RemoteSettings(ConfigFactory.load(cfg)) + } + ex.getMessage shouldEqual + """The only allowed values for pekko.remote.protocol-name are "pekko" and "akka".""" + } + "fail if empty accept-protocol-names is used" in { + val cfg = ConfigFactory.parseString("pekko.remote.accept-protocol-names=[]") + .withFallback(PekkoSpec.testConf) + val ex = intercept[ConfigurationException] { + new RemoteSettings(ConfigFactory.load(cfg)) + } + ex.getMessage should startWith("pekko.remote.accept-protocol-names setting must not be empty") + } + "fail if invalid accept-protocol-names value is used" in { + val cfg = ConfigFactory.parseString("""pekko.remote.accept-protocol-names=["pekko", "unknown"]""") + .withFallback(PekkoSpec.testConf) + val ex = intercept[ConfigurationException] { + new RemoteSettings(ConfigFactory.load(cfg)) + } + ex.getMessage shouldEqual + """pekko.remote.accept-protocol-names is an array setting that only accepts the values "pekko" and "akka".""" + } } }