forward fit Akka compat changes (#1211)
* Handle mixed akka/pekko protocol names * add extra changes needed to get akka cluster support * add default for pekko.remote.akka.version (#1112) * add default for pekko.cluster.akka.version * refactor configs * Update reference.conf * add validations for config settings * Update RemoteSettings.scala * Update RemoteSettingsSpec.scala * scalafmt --------- Co-authored-by: Matthew de Detrich <matthew.dedetrich@aiven.io>
This commit is contained in:
parent
4bb851db12
commit
0646754739
11 changed files with 199 additions and 26 deletions
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.pekko.cluster
|
||||
|
||||
import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType }
|
||||
|
||||
import scala.annotation.nowarn
|
||||
|
||||
private[cluster] object ConfigUtil {
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
|
||||
import scala.collection.JavaConverters._
|
||||
val innerSet = cfg.entrySet().asScala
|
||||
.filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT)
|
||||
.map { entry =>
|
||||
entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue)
|
||||
}
|
||||
var newConfig = cfg
|
||||
innerSet.foreach { case (key, value) =>
|
||||
newConfig = newConfig.withValue(key, value)
|
||||
}
|
||||
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
|
||||
}
|
||||
|
||||
private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = {
|
||||
if (cv.valueType() == ConfigValueType.STRING) {
|
||||
val str = cv.unwrapped().toString
|
||||
if (str.startsWith("org.apache.pekko")) {
|
||||
ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka"))
|
||||
} else {
|
||||
cv
|
||||
}
|
||||
} else {
|
||||
cv
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -47,6 +47,21 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
|
|||
"Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
|
||||
"This node will be shutdown!"
|
||||
|
||||
private lazy val needsAkkaConfig: Boolean = {
|
||||
context.system.settings.config
|
||||
.getStringList("pekko.remote.accept-protocol-names")
|
||||
.contains("akka")
|
||||
}
|
||||
|
||||
private lazy val akkaVersion: String = {
|
||||
val cfg = context.system.settings.config
|
||||
if (cfg.hasPath("akka.version")) {
|
||||
cfg.getString("akka.version")
|
||||
} else {
|
||||
cfg.getString("pekko.remote.akka.version")
|
||||
}
|
||||
}
|
||||
|
||||
private def stopOrBecome(behavior: Option[Actor.Receive]): Unit =
|
||||
behavior match {
|
||||
case Some(done) => context.become(done) // JoinSeedNodeProcess
|
||||
|
|
@ -65,8 +80,12 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
|
|||
val configToValidate =
|
||||
JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config)
|
||||
|
||||
val adjustedConfig = if (needsAkkaConfig)
|
||||
ConfigUtil.addAkkaConfig(configToValidate, akkaVersion)
|
||||
else configToValidate
|
||||
|
||||
seedNodes.foreach { a =>
|
||||
context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate)
|
||||
context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(adjustedConfig)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig)
|
|||
remotePath,
|
||||
Nobody,
|
||||
None,
|
||||
None)
|
||||
None,
|
||||
Set("pekko", "akka"))
|
||||
|
||||
rar.start()
|
||||
rar
|
||||
|
|
|
|||
|
|
@ -175,6 +175,33 @@ pekko {
|
|||
# is 'off'. Set this to 'off' to suppress these.
|
||||
warn-unsafe-watch-outside-cluster = on
|
||||
|
||||
# When receiving requests from other remote actors, what are the valid
|
||||
# prefixes to check against. Useful for when dealing with rolling cluster
|
||||
# migrations with compatible systems such as Lightbend's Akka.
|
||||
# By default, we only support "pekko" protocol.
|
||||
# If you want to also support Akka, change this config to:
|
||||
# pekko.remote.accept-protocol-names = ["pekko", "akka"]
|
||||
# A ConfigurationException will be thrown at runtime if the array is empty
|
||||
# or contains values other than "pekko" and/or "akka".
|
||||
accept-protocol-names = ["pekko"]
|
||||
|
||||
# The protocol name to use when sending requests to other remote actors.
|
||||
# Useful when dealing with rolling migration, i.e. temporarily change
|
||||
# the protocol name to match another compatible actor implementation
|
||||
# such as Lightbend's "akka" (whilst making sure accept-protocol-names
|
||||
# contains "akka") so that you can gracefully migrate all nodes to Apache
|
||||
# Pekko and then change the protocol-name back to "pekko" once all
|
||||
# nodes have been are running on Apache Pekko.
|
||||
# A ConfigurationException will be thrown at runtime if the value is not
|
||||
# set to "pekko" or "akka".
|
||||
protocol-name = "pekko"
|
||||
|
||||
# When pekko.remote.accept-protocol-names contains "akka", then we
|
||||
# need to know the Akka version. If you include the Akka jars on the classpath,
|
||||
# we can use the akka.version from their configuration. This configuration
|
||||
# setting is only used if we can't find an akka.version setting.
|
||||
akka.version = "2.6.21"
|
||||
|
||||
# Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
|
||||
# [Hayashibara et al]) used for remote death watch.
|
||||
# The default PhiAccrualFailureDetector will trigger if there are no heartbeats within
|
||||
|
|
|
|||
|
|
@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with
|
|||
|
||||
class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension {
|
||||
|
||||
private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config)
|
||||
|
||||
/**
|
||||
* Returns a mapping from a protocol to a set of bound addresses.
|
||||
*/
|
||||
def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match {
|
||||
case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address))
|
||||
case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address))
|
||||
case remoting: Remoting => remoting.boundAddresses
|
||||
case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider(
|
|||
val rpath =
|
||||
(RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements)
|
||||
.withUid(path.uid)
|
||||
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
|
||||
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d),
|
||||
remoteSettings.AcceptProtocolNames)
|
||||
} else {
|
||||
warnIfNotRemoteActorRef(path)
|
||||
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
|
||||
|
|
@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider(
|
|||
RootActorPath(address),
|
||||
Nobody,
|
||||
props = None,
|
||||
deploy = None)
|
||||
deploy = None,
|
||||
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
log.error(e, "No root guardian at [{}]", address)
|
||||
|
|
@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider(
|
|||
RootActorPath(address) / elems,
|
||||
Nobody,
|
||||
props = None,
|
||||
deploy = None)
|
||||
deploy = None,
|
||||
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
|
||||
|
|
@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider(
|
|||
rootPath,
|
||||
Nobody,
|
||||
props = None,
|
||||
deploy = None)
|
||||
deploy = None,
|
||||
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
|
||||
|
|
@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider(
|
|||
path,
|
||||
Nobody,
|
||||
props = None,
|
||||
deploy = None)
|
||||
deploy = None,
|
||||
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
|
||||
|
|
@ -672,7 +677,8 @@ private[pekko] class RemoteActorRef private[pekko] (
|
|||
val path: ActorPath,
|
||||
val getParent: InternalActorRef,
|
||||
props: Option[Props],
|
||||
deploy: Option[Deploy])
|
||||
deploy: Option[Deploy],
|
||||
val acceptProtocolNames: Set[String])
|
||||
extends InternalActorRef
|
||||
with RemoteRef {
|
||||
|
||||
|
|
@ -680,10 +686,17 @@ private[pekko] class RemoteActorRef private[pekko] (
|
|||
throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]")
|
||||
|
||||
remote match {
|
||||
case t: ArteryTransport =>
|
||||
// detect mistakes such as using "pekko.tcp" with Artery
|
||||
if (path.address.protocol != t.localAddress.address.protocol)
|
||||
throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]")
|
||||
case _: ArteryTransport =>
|
||||
// detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names
|
||||
if (!acceptProtocolNames.contains(path.address.protocol)) {
|
||||
val expectedString = if (acceptProtocolNames.size == 1)
|
||||
"expected"
|
||||
else
|
||||
"expected one of"
|
||||
|
||||
throw new IllegalArgumentException(
|
||||
s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]")
|
||||
}
|
||||
case _ =>
|
||||
}
|
||||
@volatile private[remote] var cachedAssociation: artery.Association = null
|
||||
|
|
@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] (
|
|||
s.headOption match {
|
||||
case None => this
|
||||
case Some("..") => getParent.getChild(name)
|
||||
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None)
|
||||
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None,
|
||||
acceptProtocolNames = acceptProtocolNames)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -199,6 +199,31 @@ final class RemoteSettings(val config: Config) {
|
|||
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
|
||||
val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters"))
|
||||
|
||||
private val AllowableProtocolNames = Set("pekko", "akka")
|
||||
|
||||
val ProtocolName: String = {
|
||||
val setting = getString("pekko.remote.protocol-name")
|
||||
if (!AllowableProtocolNames.contains(setting)) {
|
||||
throw new ConfigurationException("The only allowed values for pekko.remote.protocol-name " +
|
||||
"are \"pekko\" and \"akka\".")
|
||||
}
|
||||
setting
|
||||
}
|
||||
|
||||
val AcceptProtocolNames: Set[String] = {
|
||||
val set = immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet
|
||||
if (set.isEmpty) {
|
||||
throw new ConfigurationException("pekko.remote.accept-protocol-names setting must not be empty. " +
|
||||
"The setting is an array and the only acceptable values are \"pekko\" and \"akka\".")
|
||||
}
|
||||
val filteredSet = set.filterNot(AllowableProtocolNames.contains)
|
||||
if (filteredSet.nonEmpty) {
|
||||
throw new ConfigurationException("pekko.remote.accept-protocol-names is an array setting " +
|
||||
"that only accepts the values \"pekko\" and \"akka\".")
|
||||
}
|
||||
set
|
||||
}
|
||||
|
||||
private def transportNames: immutable.Seq[String] =
|
||||
immutableSeq(getStringList("pekko.remote.classic.enabled-transports"))
|
||||
|
||||
|
|
|
|||
|
|
@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
|
|||
val (port, boundPort) = bindInboundStreams()
|
||||
|
||||
_localAddress = UniqueAddress(
|
||||
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
|
||||
Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port),
|
||||
AddressUidExtension(system).longAddressUid)
|
||||
_addresses = Set(_localAddress.address)
|
||||
|
||||
_bindAddress = UniqueAddress(
|
||||
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
|
||||
Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
|
||||
AddressUidExtension(system).longAddressUid)
|
||||
|
||||
flightRecorder.transportUniqueAddressSet(_localAddress)
|
||||
|
|
@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
|
|||
*/
|
||||
private[remote] object ArteryTransport {
|
||||
|
||||
val ProtocolName = "pekko"
|
||||
|
||||
// Note that the used version of the header format for outbound messages is defined in
|
||||
// `ArterySettings.Version` because that may depend on configuration settings.
|
||||
// This is the highest supported version on receiving (decoding) side.
|
||||
|
|
|
|||
|
|
@ -71,11 +71,11 @@ private[remote] class PekkoProtocolSettings(config: Config) {
|
|||
}
|
||||
|
||||
val ManagerNamePrefix: String = config.getString("pekko.remote.classic.manager-name-prefix")
|
||||
val PekkoScheme: String = new RemoteSettings(config).ProtocolName
|
||||
}
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead?
|
||||
val PekkoScheme: String = "pekko"
|
||||
val PekkoOverhead: Int = 0 // Don't know yet
|
||||
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)
|
||||
|
||||
|
|
@ -124,7 +124,7 @@ private[remote] class PekkoProtocolTransport(
|
|||
private val codec: PekkoPduCodec)
|
||||
extends ActorTransportAdapter(wrappedTransport, system) {
|
||||
|
||||
override val addedSchemeIdentifier: String = PekkoScheme
|
||||
override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName
|
||||
|
||||
override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd)
|
||||
|
||||
|
|
@ -232,8 +232,9 @@ private[remote] class PekkoProtocolHandle(
|
|||
_wrappedHandle: AssociationHandle,
|
||||
val handshakeInfo: HandshakeInfo,
|
||||
private val stateActor: ActorRef,
|
||||
private val codec: PekkoPduCodec)
|
||||
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) {
|
||||
private val codec: PekkoPduCodec,
|
||||
override val addedSchemeIdentifier: String)
|
||||
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) {
|
||||
|
||||
override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))
|
||||
|
||||
|
|
@ -716,7 +717,8 @@ private[remote] class ProtocolStateActor(
|
|||
wrappedHandle,
|
||||
handshakeInfo,
|
||||
self,
|
||||
codec))
|
||||
codec,
|
||||
settings.PekkoScheme))
|
||||
readHandlerPromise.future
|
||||
}
|
||||
|
||||
|
|
@ -736,7 +738,8 @@ private[remote] class ProtocolStateActor(
|
|||
wrappedHandle,
|
||||
handshakeInfo,
|
||||
self,
|
||||
codec)))
|
||||
codec,
|
||||
settings.PekkoScheme)))
|
||||
readHandlerPromise.future
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off
|
|||
extinctPath,
|
||||
Nobody,
|
||||
props = None,
|
||||
deploy = None)
|
||||
deploy = None,
|
||||
acceptProtocolNames = Set("pekko", "akka"))
|
||||
|
||||
val probe = TestProbe()
|
||||
probe.watch(extinctRef)
|
||||
|
|
|
|||
|
|
@ -18,7 +18,10 @@ import com.typesafe.config.ConfigFactory
|
|||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
|
||||
import org.apache.pekko.remote.RemoteSettings
|
||||
import org.apache.pekko
|
||||
import pekko.ConfigurationException
|
||||
import pekko.remote.RemoteSettings
|
||||
import pekko.testkit.PekkoSpec
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
class RemoteSettingsSpec extends AnyWordSpec with Matchers {
|
||||
|
|
@ -34,6 +37,32 @@ class RemoteSettingsSpec extends AnyWordSpec with Matchers {
|
|||
.parseString("pekko.remote.classic.log-frame-size-exceeding = 100b")
|
||||
.withFallback(ConfigFactory.load())).LogFrameSizeExceeding shouldEqual Some(100)
|
||||
}
|
||||
"fail if unknown protocol name is used" in {
|
||||
val cfg = ConfigFactory.parseString("pekko.remote.protocol-name=unknown")
|
||||
.withFallback(PekkoSpec.testConf)
|
||||
val ex = intercept[ConfigurationException] {
|
||||
new RemoteSettings(ConfigFactory.load(cfg))
|
||||
}
|
||||
ex.getMessage shouldEqual
|
||||
"""The only allowed values for pekko.remote.protocol-name are "pekko" and "akka"."""
|
||||
}
|
||||
"fail if empty accept-protocol-names is used" in {
|
||||
val cfg = ConfigFactory.parseString("pekko.remote.accept-protocol-names=[]")
|
||||
.withFallback(PekkoSpec.testConf)
|
||||
val ex = intercept[ConfigurationException] {
|
||||
new RemoteSettings(ConfigFactory.load(cfg))
|
||||
}
|
||||
ex.getMessage should startWith("pekko.remote.accept-protocol-names setting must not be empty")
|
||||
}
|
||||
"fail if invalid accept-protocol-names value is used" in {
|
||||
val cfg = ConfigFactory.parseString("""pekko.remote.accept-protocol-names=["pekko", "unknown"]""")
|
||||
.withFallback(PekkoSpec.testConf)
|
||||
val ex = intercept[ConfigurationException] {
|
||||
new RemoteSettings(ConfigFactory.load(cfg))
|
||||
}
|
||||
ex.getMessage shouldEqual
|
||||
"""pekko.remote.accept-protocol-names is an array setting that only accepts the values "pekko" and "akka"."""
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue