Use protobuf3 for internal serialization (#27366)

* Only load akka protobuf serializer if protobuf on the classpath
This commit is contained in:
Christopher Batey 2019-08-15 16:43:19 +01:00 committed by Patrik Nordwall
parent bbff92ade6
commit 6c13949aec
77 changed files with 59524 additions and 36063 deletions

View file

@ -0,0 +1,2 @@
# Upgrade to protobuf 3
ProblemFilters.exclude[Problem]("akka.cluster.ddata.protobuf.*")

View file

@ -2,6 +2,8 @@
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
syntax = "proto2";
package akka.cluster.ddata;
option java_package = "akka.cluster.ddata.protobuf.msg";

View file

@ -2,6 +2,8 @@
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
syntax = "proto2";
package akka.cluster.ddata;
option java_package = "akka.cluster.ddata.protobuf.msg";

View file

@ -8,12 +8,11 @@ import java.{ util, lang => jl }
import java.util.ArrayList
import java.util.Collections
import java.util.Comparator
import java.util.TreeSet
import scala.annotation.tailrec
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable
import scala.collection.immutable
import akka.actor.ExtendedActorSystem
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator.Internal._
@ -21,14 +20,15 @@ import akka.cluster.ddata.protobuf.msg.{ ReplicatedDataMessages => rd }
import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm }
import akka.serialization.SerializerWithStringManifest
import akka.serialization.BaseSerializer
import akka.protobuf.{ ByteString, GeneratedMessage }
import akka.protobufv3.internal.{ ByteString, GeneratedMessageV3 }
import akka.util.ByteString.UTF_8
import java.io.NotSerializableException
import java.util
import com.github.ghik.silencer.silent
import akka.actor.ActorRef
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
import akka.protobufv3.internal.GeneratedMessageV3
import akka.serialization.Serialization
import akka.util.ccompat._
@ -37,7 +37,7 @@ private object ReplicatedDataSerializer {
/*
* Generic superclass to allow to compare Entry types used in protobuf.
*/
abstract class KeyComparator[A <: GeneratedMessage] extends Comparator[A] {
abstract class KeyComparator[A <: GeneratedMessageV3] extends Comparator[A] {
/**
* Get the key from the entry. The key may be a String, Int, Long, or OtherMessage
@ -95,14 +95,16 @@ private object ReplicatedDataSerializer {
}
sealed trait ProtoMapEntryWriter[
Entry <: GeneratedMessage, EntryBuilder <: GeneratedMessage.Builder[EntryBuilder], Value <: GeneratedMessage] {
Entry <: GeneratedMessageV3,
EntryBuilder <: GeneratedMessageV3.Builder[EntryBuilder],
Value <: GeneratedMessageV3] {
def setStringKey(builder: EntryBuilder, key: String, value: Value): Entry
def setLongKey(builder: EntryBuilder, key: Long, value: Value): Entry
def setIntKey(builder: EntryBuilder, key: Int, value: Value): Entry
def setOtherKey(builder: EntryBuilder, key: dm.OtherMessage, value: Value): Entry
}
sealed trait ProtoMapEntryReader[Entry <: GeneratedMessage, A <: GeneratedMessage] {
sealed trait ProtoMapEntryReader[Entry <: GeneratedMessageV3, A <: GeneratedMessageV3] {
def hasStringKey(entry: Entry): Boolean
def getStringKey(entry: Entry): String
def hasIntKey(entry: Entry): Boolean
@ -659,16 +661,16 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
private def getEntries[
IKey,
IValue,
EntryBuilder <: GeneratedMessage.Builder[EntryBuilder],
PEntry <: GeneratedMessage,
PValue <: GeneratedMessage](
EntryBuilder <: GeneratedMessageV3.Builder[EntryBuilder],
PEntry <: GeneratedMessageV3,
PValue <: GeneratedMessageV3](
input: Map[IKey, IValue],
createBuilder: () => EntryBuilder,
valueConverter: IValue => PValue)(
implicit comparator: Comparator[PEntry],
eh: ProtoMapEntryWriter[PEntry, EntryBuilder, PValue]): java.lang.Iterable[PEntry] = {
// The resulting Iterable needs to be ordered deterministically in order to create same signature upon serializing same data
val protoEntries = new TreeSet[PEntry](comparator)
val protoEntries = new util.TreeSet[PEntry](comparator)
input.foreach {
case (key: String, value) => protoEntries.add(eh.setStringKey(createBuilder(), key, valueConverter(value)))
case (key: Int, value) => protoEntries.add(eh.setIntKey(createBuilder(), key, valueConverter(value)))
@ -689,7 +691,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
def ormapFromBinary(bytes: Array[Byte]): ORMap[Any, ReplicatedData] =
ormapFromProto(rd.ORMap.parseFrom(decompress(bytes)))
def mapTypeFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](
def mapTypeFromProto[PEntry <: GeneratedMessageV3, A <: GeneratedMessageV3, B <: ReplicatedData](
input: util.List[PEntry],
valueCreator: A => B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
input.asScala.map { entry =>
@ -710,7 +712,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
new ORMap(keys = orsetFromProto(ormap.getKeys), entries, ORMap.VanillaORMapTag)
}
def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](
def singleMapEntryFromProto[PEntry <: GeneratedMessageV3, A <: GeneratedMessageV3, B <: ReplicatedData](
input: util.List[PEntry],
valueCreator: A => B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
val map = mapTypeFromProto(input, valueCreator)
@ -721,7 +723,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
map
}
def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entryOption: Option[PEntry])(
def singleKeyEntryFromProto[PEntry <: GeneratedMessageV3, A <: GeneratedMessageV3](entryOption: Option[PEntry])(
implicit eh: ProtoMapEntryReader[PEntry, A]): Any =
entryOption match {
case Some(entry) =>

View file

@ -21,7 +21,7 @@ import akka.serialization.Serialization
import akka.serialization.SerializerWithStringManifest
import akka.serialization.BaseSerializer
import akka.util.{ ByteString => AkkaByteString }
import akka.protobuf.ByteString
import akka.protobufv3.internal.ByteString
import akka.cluster.ddata.Key.KeyR
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

View file

@ -17,8 +17,8 @@ import akka.actor.ExtendedActorSystem
import akka.cluster.UniqueAddress
import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm }
import akka.serialization._
import akka.protobuf.ByteString
import akka.protobuf.MessageLite
import akka.protobufv3.internal.ByteString
import akka.protobufv3.internal.MessageLite
import akka.cluster.ddata.VersionVector
import akka.util.ccompat._

View file

@ -7,7 +7,6 @@ package akka.cluster.ddata
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
@ -18,12 +17,13 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.scalatest.CancelAfterFailure
final case class DurableDataSpecConfig(writeBehind: Boolean) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString(s"""
commonConfig(ConfigFactory.parseString(s"""akka.loglevel = DEBUG
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.log-dead-letters-during-shutdown = off
akka.cluster.distributed-data.durable.keys = ["durable*"]
@ -46,7 +46,7 @@ object DurableDataSpec {
def receive = {
case LoadAll =>
if (failLoad)
throw new LoadFailed("failed to load durable distributed-data") with NoStackTrace
throw new LoadFailed("TestDurableStore: failed to load durable distributed-data") with NoStackTrace
else
sender() ! LoadAllCompleted
@ -74,7 +74,9 @@ class DurableDataWriteBehindSpecMultiJvmNode2 extends DurableDataSpec(DurableDat
abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
extends MultiNodeSpec(multiNodeConfig)
with STMultiNodeSpec
with ImplicitSender {
with ImplicitSender
with CancelAfterFailure {
import DurableDataSpec._
import Replicator._
import multiNodeConfig._
@ -207,6 +209,8 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
val r = newReplicator()
runOn(first) {
// FIXME
log.debug("sending message with sender: {}", implicitly[ActorRef])
r ! Update(KeyC, ORSet.empty[String], WriteLocal)(_ :+ myself.name)
expectMsg(UpdateSuccess(KeyC, None))
}
@ -224,8 +228,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
system.stop(r)
expectTerminated(r)
var r2: ActorRef = null
awaitAssert { r2 = newReplicator() } // try until name is free
val r2 = awaitAssert { newReplicator() } // try until name is free
awaitAssert {
r2 ! GetKeyIds
expectMsgType[GetKeyIdsResult].keyIds should !==(Set.empty[String])
@ -308,7 +311,10 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
}
}
system.log.info("Setup complete")
enterBarrierAfterTestStep()
system.log.info("All setup complete")
}
"stop Replicator if Load fails" in {