+rem #24265 protobuf serializer for Address and UniqueAddress in akka… (#24267)

* +rem #24265 protobuf serializer for Address and UniqueAddress in akka-remote

* remove the duplication, by using previously existing type

* fixed doc link

* make it easier to enable the additional no-java-serialization bindings

* fixed akka-actor failure due to changes

* cleanup

* Update reference.conf

* Update serialization.md

* Update reference.conf
This commit is contained in:
Konrad `ktoso` Malawski 2018-01-10 19:28:51 +09:00 committed by GitHub
parent eba7473199
commit 59a48c728b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 1132 additions and 1027 deletions

View file

@ -11,8 +11,9 @@ import java.util.concurrent.TimeUnit
import akka.Done
import akka.actor._
import akka.dispatch.Dispatchers
import akka.remote.WireFormats.AddressData
import akka.remote.routing.RemoteRouterConfig
import akka.remote.{ ContainerFormats, RemoteScope, RemoteWatcher, WireFormats }
import akka.remote._
import akka.routing._
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions }
@ -48,6 +49,8 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
case hbrsp: RemoteWatcher.HeartbeatRsp serializeHeartbeatRsp(hbrsp)
case rs: RemoteScope serializeRemoteScope(rs)
case LocalScope ParameterlessSerializedMessage
case a: Address serializeAddressData(a)
case u: UniqueAddress serializeClassicUniqueAddress(u)
case c: Config serializeConfig(c)
case dr: DefaultResizer serializeDefaultResizer(dr)
case fc: FromConfig serializeFromConfig(fc)
@ -135,6 +138,35 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
c.root.render(ConfigRenderOptions.concise()).getBytes(StandardCharsets.UTF_8)
}
private def protoForAddressData(address: Address): AddressData.Builder =
address match {
case Address(protocol, actorSystem, Some(host), Some(port))
WireFormats.AddressData.newBuilder()
.setSystem(actorSystem)
.setHostname(host)
.setPort(port)
.setProtocol(protocol)
case _ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.")
}
private def protoForAddress(address: Address): ArteryControlFormats.Address.Builder =
address match {
case Address(protocol, actorSystem, Some(host), Some(port))
ArteryControlFormats.Address.newBuilder()
.setSystem(actorSystem)
.setHostname(host)
.setPort(port)
.setProtocol(protocol)
case _ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.")
}
private def serializeAddressData(address: Address): Array[Byte] =
protoForAddressData(address).build().toByteArray
private def serializeClassicUniqueAddress(uniqueAddress: UniqueAddress): Array[Byte] =
ArteryControlFormats.UniqueAddress.newBuilder()
.setUid(uniqueAddress.uid)
.setAddress(protoForAddress(uniqueAddress.address))
.build().toByteArray
private def serializeDefaultResizer(dr: DefaultResizer): Array[Byte] = {
val builder = WireFormats.DefaultResizer.newBuilder()
builder.setBackoffRate(dr.backoffRate)
@ -256,6 +288,8 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val KillManifest = "K"
private val RemoteWatcherHBManifest = "RWHB"
private val DoneManifest = "DONE"
private val AddressManifest = "AD"
private val UniqueAddressManifest = "UD"
private val RemoteWatcherHBRespManifest = "RWHR"
private val ActorInitializationExceptionManifest = "AIEX"
private val LocalScopeManifest = "LS"
@ -285,6 +319,8 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
KillManifest ((_) Kill),
RemoteWatcherHBManifest ((_) RemoteWatcher.Heartbeat),
DoneManifest ((_) Done),
AddressManifest deserializeAddressData,
UniqueAddressManifest deserializeUniqueAddress,
RemoteWatcherHBRespManifest deserializeHeartbeatRsp,
ActorInitializationExceptionManifest deserializeActorInitializationException,
LocalScopeManifest ((_) LocalScope),
@ -316,6 +352,8 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
case Kill KillManifest
case RemoteWatcher.Heartbeat RemoteWatcherHBManifest
case Done DoneManifest
case _: Address AddressManifest
case _: UniqueAddress UniqueAddressManifest
case _: RemoteWatcher.HeartbeatRsp RemoteWatcherHBRespManifest
case LocalScope LocalScopeManifest
case _: RemoteScope RemoteScopeManifest
@ -387,6 +425,36 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private def deserializeStatusFailure(bytes: Array[Byte]): Status.Failure =
Status.Failure(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)).asInstanceOf[Throwable])
private def deserializeAddressData(bytes: Array[Byte]): Address =
addressFromDataProto(WireFormats.AddressData.parseFrom(bytes))
private def addressFromDataProto(a: WireFormats.AddressData): Address = {
Address(
a.getProtocol,
a.getSystem,
// technicaly the presence of hostname and port are guaranteed, see our serializeAddressData
if (a.hasHostname) Some(a.getHostname) else None,
if (a.hasPort) Some(a.getPort) else None
)
}
private def addressFromProto(a: ArteryControlFormats.Address): Address = {
Address(
a.getProtocol,
a.getSystem,
// technicaly the presence of hostname and port are guaranteed, see our serializeAddressData
if (a.hasHostname) Some(a.getHostname) else None,
if (a.hasPort) Some(a.getPort) else None
)
}
private def deserializeUniqueAddress(bytes: Array[Byte]): UniqueAddress = {
val u = ArteryControlFormats.UniqueAddress.parseFrom(bytes)
UniqueAddress(
addressFromProto(u.getAddress),
u.getUid
)
}
private def deserializeHeartbeatRsp(bytes: Array[Byte]): RemoteWatcher.HeartbeatRsp = {
RemoteWatcher.HeartbeatRsp(ContainerFormats.WatcherHeartbeatResponse.parseFrom(bytes).getUid.toInt)
}