Avoid deadlock on concurrent extension with materializer init (#28046)

* Avoid deadlock on concurrent extension with materializer init #28037

* Wrap blocking extension await with blocking context
This commit is contained in:
Johan Andrén 2019-10-25 18:05:59 +02:00 committed by Christopher Batey
parent ab3f63899c
commit a56a53734f
5 changed files with 72 additions and 4 deletions

View file

@ -25,6 +25,7 @@ import scala.collection.immutable
import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.blocking
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
import scala.util.control.{ ControlThrowable, NonFatal }
import scala.util.{ Failure, Success, Try }
@ -1125,7 +1126,10 @@ private[akka] class ActorSystemImpl(
@tailrec
private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
case c: CountDownLatch =>
c.await(); findExtension(ext) //Registration in process, await completion and retry
blocking {
c.await()
}
findExtension(ext) //Registration in process, await completion and retry
case t: Throwable => throw t //Initialization failed, throw same again
case other =>
other.asInstanceOf[T] //could be a T or null, in which case we return the null as T

View file

@ -13,6 +13,7 @@ import akka.dispatch.MailboxType
import akka.actor.ActorRef
import akka.actor.ActorRefWithCell
import akka.actor.Actor
import akka.stream.impl.MaterializerGuardian
/**
* INTERNAL API
@ -34,7 +35,7 @@ private[akka] final case class StreamTestDefaultMailbox()
s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]")
// StreamTcpManager is allowed to use another dispatcher
assert(
!actorClass.getName.startsWith("akka.stream."),
actorClass == classOf[MaterializerGuardian] || !actorClass.getName.startsWith("akka.stream."),
s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " +
"Did you forget to define `props.withDispatcher` when creating the actor? " +
"Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " +

View file

@ -5,23 +5,83 @@
package akka.stream
import akka.Done
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
import akka.stream.ActorMaterializerSpec.ActorWithMaterializer
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.testkit.TestKit
import akka.testkit.{ ImplicitSender, TestProbe }
import com.github.ghik.silencer.silent
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Try }
object IndirectMaterializerCreation extends ExtensionId[IndirectMaterializerCreation] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): IndirectMaterializerCreation =
new IndirectMaterializerCreation(system)
def lookup(): ExtensionId[IndirectMaterializerCreation] = this
}
@silent
class IndirectMaterializerCreation(ex: ExtendedActorSystem) extends Extension {
// extension instantiation blocked on materializer (which has Await.result inside)
implicit val mat = ActorMaterializer()(ex)
def futureThing(n: Int): Future[Int] = {
Source.single(n).runWith(Sink.head)
}
}
@silent
class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
"ActorMaterializer" must {
"not suffer from deadlock" in {
val n = 4
implicit val deadlockSystem = ActorSystem(
"ActorMaterializerSpec-deadlock",
ConfigFactory.parseString(s"""
akka.actor.default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = $n
parallelism-factor = 0.5
parallelism-max = $n
}
}
# undo stream testkit specific dispatcher and run "normally"
akka.actor.default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox"
akka.stream.materializer.dispatcher = "akka.actor.default-dispatcher"
"""))
try {
import deadlockSystem.dispatcher
// tricky part here is that the concurrent access is to the extension
// so the threads are indirectly blocked and not waiting for the Await.result(ask) directly.
val result = Future.sequence((1 to (n + 1)).map(n =>
Future {
IndirectMaterializerCreation(deadlockSystem).mat
}))
// with starvation these fail
result.futureValue.size should ===(n + 1)
} finally {
TestKit.shutdownActorSystem(deadlockSystem)
}
}
"report shutdown status properly" in {
val m = ActorMaterializer.create(system)

View file

@ -11,6 +11,7 @@ import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
import akka.stream.impl.MaterializerGuardian
import scala.concurrent.Await
@ -52,7 +53,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
private val materializerGuardian = system.systemActorOf(
MaterializerGuardian
.props(systemMaterializerPromise, materializerSettings)
.withDispatcher(materializerSettings.dispatcher)
// #28037 run on internal dispatcher to make sure default dispatcher starvation doesn't stop materializer creation
.withDispatcher(Dispatchers.InternalDispatcherId)
.withDeploy(Deploy.local),
"Materializers")

View file

@ -105,8 +105,9 @@ import com.github.ghik.silencer.silent
attributes: Attributes): PhasedFusingActorMaterializer = {
val haveShutDown = new AtomicBoolean(false)
val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val supervisorProps =
StreamSupervisor.props(attributes, haveShutDown).withDispatcher(context.props.dispatcher).withDeploy(Deploy.local)
StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local)
// FIXME why do we need a global unique name for the child?
val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())