Timout await extension initialization, #28301 (#28340)

* use the existing creation-timeout config
* and error message about the typical mistake of accessing
  SerializationExtension from constructor of serializer
* mention in docs
This commit is contained in:
Patrik Nordwall 2019-12-17 14:19:42 +01:00 committed by Johan Andrén
parent ffe769b4df
commit 00fc33d0a5
6 changed files with 55 additions and 6 deletions

View file

@ -532,6 +532,30 @@ class NoVerificationWarningOffSpec
} }
} }
class SerializerDeadlockSpec extends AkkaSpec {
"SerializationExtension" must {
"not be accessed from constructor of serializer" in {
intercept[IllegalStateException] {
val sys = ActorSystem(
"SerializerDeadlockSpec",
ConfigFactory.parseString("""
akka {
actor {
creation-timeout = 1s
serializers {
test = "akka.serialization.DeadlockSerializer"
}
}
}
"""))
shutdown(sys)
}.getMessage should include("SerializationExtension from its constructor")
}
}
}
protected[akka] class NoopSerializer extends Serializer { protected[akka] class NoopSerializer extends Serializer {
def includeManifest: Boolean = false def includeManifest: Boolean = false
@ -560,3 +584,17 @@ protected[akka] class NoopSerializer2 extends Serializer {
protected[akka] final case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable { protected[akka] final case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable {
override def fillInStackTrace = null override def fillInStackTrace = null
} }
class DeadlockSerializer(system: ExtendedActorSystem) extends Serializer {
// not allowed
SerializationExtension(system)
def includeManifest: Boolean = false
def identifier = 9999
def toBinary(o: AnyRef): Array[Byte] = Array.empty[Byte]
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null
}

View file

@ -119,7 +119,8 @@ akka {
# In addition to the default there is akka.actor.StoppingSupervisorStrategy. # In addition to the default there is akka.actor.StoppingSupervisorStrategy.
guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy" guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"
# Timeout for ActorSystem.actorOf # Timeout for Extension creation and a few other potentially blocking
# initialization tasks.
creation-timeout = 20s creation-timeout = 20s
# Serializes and deserializes (non-primitive) messages to ensure immutability, # Serializes and deserializes (non-primitive) messages to ensure immutability,
@ -140,6 +141,7 @@ akka {
# CallingThreadDispatcher for a top-level actor. # CallingThreadDispatcher for a top-level actor.
unstarted-push-timeout = 10s unstarted-push-timeout = 10s
# TypedActor deprecated since 2.6.0.
typed { typed {
# Default timeout for the depredated TypedActor (not the new actor APIs in 2.6) methods with non-void return type # Default timeout for the depredated TypedActor (not the new actor APIs in 2.6) methods with non-void return type
timeout = 5s timeout = 5s

View file

@ -31,6 +31,7 @@ import scala.util.control.{ ControlThrowable, NonFatal }
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.event.Logging.DefaultLogger import akka.event.Logging.DefaultLogger
import akka.serialization.SerializationExtension
object BootstrapSetup { object BootstrapSetup {
@ -1133,7 +1134,13 @@ private[akka] class ActorSystemImpl(
private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
case c: CountDownLatch => case c: CountDownLatch =>
blocking { blocking {
c.await() val awaitMillis = settings.CreationTimeout.duration.toMillis
if (!c.await(awaitMillis, TimeUnit.MILLISECONDS))
throw new IllegalStateException(
s"Initialization of [$ext] took more than [$awaitMillis ms]. " +
(if (ext == SerializationExtension)
"A serializer must not access the SerializationExtension from its constructor. Use lazy init."
else "Could be deadlock due to cyclic initialization of extensions."))
} }
findExtension(ext) //Registration in process, await completion and retry findExtension(ext) //Registration in process, await completion and retry
case t: Throwable => throw t //Initialization failed, throw same again case t: Throwable => throw t //Initialization failed, throw same again

View file

@ -127,6 +127,10 @@ bytes to different objects.
Then you only need to fill in the blanks, bind it to a name in your configuration and then Then you only need to fill in the blanks, bind it to a name in your configuration and then
list which classes that should be serialized using it. list which classes that should be serialized using it.
The serializers are initialized eagerly by the `SerializationExtension` when the `ActorSystem` is started and
therefore a serializer itself must not access the `SerializationExtension` from its constructor. Instead, it
should access the `SerializationExtension` lazily.
<a id="string-manifest-serializer"></a> <a id="string-manifest-serializer"></a>
### Serializer with String Manifest ### Serializer with String Manifest

View file

@ -196,8 +196,7 @@ public class SerializationDocTest {
akka.actor.typed.ActorSystem.create(Behaviors.empty(), "example"); akka.actor.typed.ActorSystem.create(Behaviors.empty(), "example");
// Get the Serialization Extension // Get the Serialization Extension
Serialization serialization = Serialization serialization = SerializationExtension.get(system);
SerializationExtension.get(akka.actor.typed.javadsl.Adapter.toClassic(system));
// #programmatic-typed // #programmatic-typed
} }
} }

View file

@ -214,12 +214,11 @@ package docs.serialization {
def demonstrateTypedActorSystem(): Unit = { def demonstrateTypedActorSystem(): Unit = {
//#programmatic-typed //#programmatic-typed
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter._
val system = ActorSystem(Behaviors.empty, "example") val system = ActorSystem(Behaviors.empty, "example")
// Get the Serialization Extension // Get the Serialization Extension
val serialization = SerializationExtension(system.toClassic) val serialization = SerializationExtension(system)
//#programmatic-typed //#programmatic-typed
} }