use blocking-io-dispatcher correctly in OutputStreamSource, #20666

* it used the materializer dispatcher for the blocking take
* it was leaking threads (blocking threads) when when materializer was shutdown abruptly
This commit is contained in:
Patrik Nordwall 2016-05-31 08:47:22 +02:00
parent be448e9fbb
commit b97a72c773
4 changed files with 74 additions and 20 deletions

View file

@ -29,7 +29,7 @@ public class OutputStreamSinkTest extends StreamTest {
} }
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSink", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSinkTest",
Utils.UnboundedMailboxConfig()); Utils.UnboundedMailboxConfig());
@Test @Test
public void mustSignalFailureViaIoResult() throws Exception { public void mustSignalFailureViaIoResult() throws Exception {
@ -44,7 +44,7 @@ public class OutputStreamSinkTest extends StreamTest {
} }
}; };
final CompletionStage<IOResult> resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer); final CompletionStage<IOResult> resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer);
final IOResult result = resultFuture.toCompletableFuture().get(300, TimeUnit.MILLISECONDS); final IOResult result = resultFuture.toCompletableFuture().get(3000, TimeUnit.MILLISECONDS);
assertFalse(result.wasSuccessful()); assertFalse(result.wasSuccessful());
assertTrue(result.getError().getMessage().equals("Can't accept more data.")); assertTrue(result.getError().getMessage().equals("Can't accept more data."));

View file

@ -29,11 +29,11 @@ public class OutputStreamSourceTest extends StreamTest {
} }
@ClassRule @ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSource", public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("OutputStreamSourceTest2",
Utils.UnboundedMailboxConfig()); Utils.UnboundedMailboxConfig());
@Test @Test
public void mustSendEventsViaOutputStream() throws Exception { public void mustSendEventsViaOutputStream() throws Exception {
final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); final FiniteDuration timeout = FiniteDuration.create(3000, TimeUnit.MILLISECONDS);
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout); final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout);
@ -45,6 +45,8 @@ public class OutputStreamSourceTest extends StreamTest {
})).run(materializer); })).run(materializer);
s.write("a".getBytes()); s.write("a".getBytes());
assertEquals(ByteString.fromString("a"), probe.receiveOne(timeout)); assertEquals(ByteString.fromString("a"), probe.receiveOne(timeout));
s.close(); s.close();

View file

@ -31,7 +31,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings) implicit val materializer = ActorMaterializer(settings)
val timeout = 300.milliseconds val timeout = 3.seconds
val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte]) val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte])
val byteString = ByteString(bytesArray) val byteString = ByteString(bytesArray)
@ -41,6 +41,16 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
def expectSuccess[T](f: Future[T], value: T) = def expectSuccess[T](f: Future[T], value: T) =
Await.result(f, remainingOrDefault) should be(value) Await.result(f, remainingOrDefault) should be(value)
def assertNoBlockedThreads(): Unit = {
def threadsBlocked =
ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).toSeq
.filter(t t.getThreadName.startsWith("OutputStreamSourceSpec") &&
t.getLockName != null &&
t.getLockName.startsWith("java.util.concurrent.locks.AbstractQueuedSynchronizer"))
awaitAssert(threadsBlocked should ===(Seq()), 3.seconds)
}
"OutputStreamSource" must { "OutputStreamSource" must {
"read bytes from OutputStream" in assertAllStagesStopped { "read bytes from OutputStream" in assertAllStagesStopped {
val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run
@ -156,11 +166,11 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
.withAttributes(inputBuffer(0, 0)) .withAttributes(inputBuffer(0, 0))
.runWith(Sink.head) .runWith(Sink.head)
/* /*
With Sink.head we test the code path in which the source With Sink.head we test the code path in which the source
itself throws an exception when being materialized. If itself throws an exception when being materialized. If
Sink.ignore is used, the same exception is thrown by Sink.ignore is used, the same exception is thrown by
Materializer. Materializer.
*/ */
} }
} }
@ -175,13 +185,22 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
sub.request(1) sub.request(1)
sub.cancel() sub.cancel()
def threadsBlocked = assertNoBlockedThreads()
ManagementFactory.getThreadMXBean.dumpAllThreads(true, true).toSeq }
.filter(t t.getThreadName.startsWith("OutputStreamSourceSpec") &&
t.getLockName != null &&
t.getLockName.startsWith("java.util.concurrent.locks.AbstractQueuedSynchronizer"))
awaitAssert(threadsBlocked should ===(Seq()), 3.seconds) "not leave blocked threads when materializer shutdown" in {
val materializer2 = ActorMaterializer(settings)
val (outputStream, probe) = StreamConverters.asOutputStream(timeout)
.toMat(TestSink.probe[ByteString])(Keep.both).run()(materializer2)
val sub = probe.expectSubscription()
// triggers a blocking read on the queue
// and then shutdown the materializer before we got anything
sub.request(1)
materializer2.shutdown()
assertNoBlockedThreads()
} }
} }
} }

View file

@ -6,18 +6,21 @@ package akka.stream.impl.io
import java.io.{ IOException, OutputStream } import java.io.{ IOException, OutputStream }
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue }
import akka.stream.{ Outlet, SourceShape, Attributes } import akka.stream.{ Outlet, SourceShape, Attributes }
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.OutputStreamSourceStage._ import akka.stream.impl.io.OutputStreamSourceStage._
import akka.stream.stage._ import akka.stream.stage._
import akka.util.ByteString import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Await, Future, Promise }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.ActorAttributes.Dispatcher
import scala.concurrent.ExecutionContext
import akka.stream.ActorMaterializer
private[stream] object OutputStreamSourceStage { private[stream] object OutputStreamSourceStage {
sealed trait AdapterToStageMessage sealed trait AdapterToStageMessage
@ -40,6 +43,9 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = {
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val dispatcherId = inheritedAttributes.get[Dispatcher](IODispatcher).dispatcher
require(maxBuffer > 0, "Buffer size must be greater than 0") require(maxBuffer > 0, "Buffer size must be greater than 0")
val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer)
@ -49,6 +55,9 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
var flush: Option[Promise[Unit]] = None var flush: Option[Promise[Unit]] = None
var close: Option[Promise[Unit]] = None var close: Option[Promise[Unit]] = None
private var dispatcher: ExecutionContext = null // set in preStart
private var blockingThread: Thread = null // for postStop interrupt
private val downstreamCallback: AsyncCallback[Try[ByteString]] = private val downstreamCallback: AsyncCallback[Try[ByteString]] =
getAsyncCallback { getAsyncCallback {
case Success(elem) onPush(elem) case Success(elem) onPush(elem)
@ -102,6 +111,11 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
sendResponseIfNeed() sendResponseIfNeed()
} }
override def preStart(): Unit = {
dispatcher = ActorMaterializer.downcast(materializer).system.dispatchers.lookup(dispatcherId)
super.preStart()
}
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
override def onDownstreamFinish(): Unit = { override def onDownstreamFinish(): Unit = {
//assuming there can be no further in messages //assuming there can be no further in messages
@ -112,10 +126,29 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
completeStage() completeStage()
} }
override def onPull(): Unit = { override def onPull(): Unit = {
implicit val ex = interpreter.materializer.executionContext implicit val ec = dispatcher
Future(dataQueue.take()).onComplete(downstreamCallback.invoke) Future {
// keep track of the thread for postStop interrupt
blockingThread = Thread.currentThread()
try {
dataQueue.take()
} catch {
case _: InterruptedException
Thread.interrupted()
ByteString()
} finally {
blockingThread = null
}
}.onComplete(downstreamCallback.invoke)
} }
}) })
override def postStop(): Unit = {
// interrupt any pending blocking take
if (blockingThread != null)
blockingThread.interrupt()
super.postStop()
}
} }
(logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout)) (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout))
} }