diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 0b5d0889b1..99c8be2804 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -25,6 +25,7 @@ import scala.collection.immutable import scala.compat.java8.FutureConverters import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration +import scala.concurrent.blocking import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } import scala.util.control.{ ControlThrowable, NonFatal } import scala.util.{ Failure, Success, Try } @@ -1125,7 +1126,10 @@ private[akka] class ActorSystemImpl( @tailrec private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { case c: CountDownLatch => - c.await(); findExtension(ext) //Registration in process, await completion and retry + blocking { + c.await() + } + findExtension(ext) //Registration in process, await completion and retry case t: Throwable => throw t //Initialization failed, throw same again case other => other.asInstanceOf[T] //could be a T or null, in which case we return the null as T diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala index 1fed257d32..1f4ae41f4e 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala @@ -13,6 +13,7 @@ import akka.dispatch.MailboxType import akka.actor.ActorRef import akka.actor.ActorRefWithCell import akka.actor.Actor +import akka.stream.impl.MaterializerGuardian /** * INTERNAL API @@ -34,7 +35,7 @@ private[akka] final case class StreamTestDefaultMailbox() s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]") // StreamTcpManager is allowed to use another dispatcher assert( - !actorClass.getName.startsWith("akka.stream."), + actorClass == classOf[MaterializerGuardian] || !actorClass.getName.startsWith("akka.stream."), s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " + "Did you forget to define `props.withDispatcher` when creating the actor? " + "Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " + diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index be159e44a5..177b4df4d2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -5,23 +5,83 @@ package akka.stream import akka.Done +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider import akka.actor.{ Actor, ActorSystem, PoisonPill, Props } import akka.stream.ActorMaterializerSpec.ActorWithMaterializer import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.scaladsl.{ Sink, Source } import akka.stream.testkit.{ StreamSpec, TestPublisher } +import akka.testkit.TestKit import akka.testkit.{ ImplicitSender, TestProbe } import com.github.ghik.silencer.silent +import com.typesafe.config.ConfigFactory import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.{ Failure, Try } +object IndirectMaterializerCreation extends ExtensionId[IndirectMaterializerCreation] with ExtensionIdProvider { + def createExtension(system: ExtendedActorSystem): IndirectMaterializerCreation = + new IndirectMaterializerCreation(system) + + def lookup(): ExtensionId[IndirectMaterializerCreation] = this +} + +@silent +class IndirectMaterializerCreation(ex: ExtendedActorSystem) extends Extension { + // extension instantiation blocked on materializer (which has Await.result inside) + implicit val mat = ActorMaterializer()(ex) + + def futureThing(n: Int): Future[Int] = { + Source.single(n).runWith(Sink.head) + } + +} + @silent class ActorMaterializerSpec extends StreamSpec with ImplicitSender { "ActorMaterializer" must { + "not suffer from deadlock" in { + val n = 4 + implicit val deadlockSystem = ActorSystem( + "ActorMaterializerSpec-deadlock", + ConfigFactory.parseString(s""" + akka.actor.default-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = $n + parallelism-factor = 0.5 + parallelism-max = $n + } + } + # undo stream testkit specific dispatcher and run "normally" + akka.actor.default-mailbox.mailbox-type = "akka.dispatch.UnboundedMailbox" + akka.stream.materializer.dispatcher = "akka.actor.default-dispatcher" + """)) + try { + import deadlockSystem.dispatcher + + // tricky part here is that the concurrent access is to the extension + // so the threads are indirectly blocked and not waiting for the Await.result(ask) directly. + val result = Future.sequence((1 to (n + 1)).map(n => + Future { + IndirectMaterializerCreation(deadlockSystem).mat + })) + + // with starvation these fail + result.futureValue.size should ===(n + 1) + + } finally { + TestKit.shutdownActorSystem(deadlockSystem) + } + } + "report shutdown status properly" in { val m = ActorMaterializer.create(system) diff --git a/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala b/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala index fee8ae3ab9..1916c7d5b7 100644 --- a/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala @@ -11,6 +11,7 @@ import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.annotation.InternalApi +import akka.dispatch.Dispatchers import akka.stream.impl.MaterializerGuardian import scala.concurrent.Await @@ -52,7 +53,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension { private val materializerGuardian = system.systemActorOf( MaterializerGuardian .props(systemMaterializerPromise, materializerSettings) - .withDispatcher(materializerSettings.dispatcher) + // #28037 run on internal dispatcher to make sure default dispatcher starvation doesn't stop materializer creation + .withDispatcher(Dispatchers.InternalDispatcherId) .withDeploy(Deploy.local), "Materializers") diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 800ff9fdb0..e0c7291941 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -105,8 +105,9 @@ import com.github.ghik.silencer.silent attributes: Attributes): PhasedFusingActorMaterializer = { val haveShutDown = new AtomicBoolean(false) + val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher val supervisorProps = - StreamSupervisor.props(attributes, haveShutDown).withDispatcher(context.props.dispatcher).withDeploy(Deploy.local) + StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local) // FIXME why do we need a global unique name for the child? val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())