From 800923522a840d7af7e6196a805917986bea8036 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 30 Jan 2018 00:27:40 +0900 Subject: [PATCH] =str #24413 make sure to check if available to push (#24434) * =str #24413 make sure to check if available to push * address review * Update SourceRefBenchmark.scala --- .../akka/stream/SourceRefBenchmark.scala | 98 +++++++++++++++++++ .../stream/impl/streamref/SourceRefImpl.scala | 17 ++-- 2 files changed, 108 insertions(+), 7 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala diff --git a/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala new file mode 100644 index 0000000000..aacf2a0acf --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/SourceRefBenchmark.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ + +package akka.stream + +import java.util.concurrent.{ Semaphore, TimeUnit } + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.impl.fusing.GraphStages +import akka.stream.scaladsl._ +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ +import org.reactivestreams._ + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Success + +/* + Just a brief reference run (3.1 GHz Intel Core i7, MacBook Pro late 2017): + [info] SourceRefBenchmark.source_ref_100k_elements thrpt 10 724650.336 ± 233643.256 ops/s + */ +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class SourceRefBenchmark { + + val config = ConfigFactory.parseString( + """ + akka { + log-config-on-start = off + log-dead-letters-during-shutdown = off + loglevel = "WARNING" + }""".stripMargin + ).withFallback(ConfigFactory.load()) + + implicit val system = ActorSystem("test", config) + + implicit val materializer: ActorMaterializer = ActorMaterializer() + + final val successMarker = Success(1) + final val successFailure = Success(new Exception) + + // safe to be benchmark scoped because the flows we construct in this bench are stateless + var sourceRef: SourceRef[Int] = _ + + // @Param(Array("16", "32", "128")) + // var initialInputBufferSize = 0 + + @Setup(Level.Invocation) + def setup(): Unit = { + val sourcePublisher = new Publisher[Int] { + override def subscribe(s: Subscriber[_ >: Int]): Unit = { + val sub = new Subscription { + var counter = 0 // Piggyback on caller thread, no need for volatile + + override def request(n: Long): Unit = { + var i = n + while (i > 0) { + s.onNext(counter) + counter += 1 + if (counter == 100000) { + s.onComplete() + return + } + i -= 1 + } + } + + override def cancel(): Unit = () + } + + s.onSubscribe(sub) + } + } + + sourceRef = Await.result(Source.fromPublisher(sourcePublisher).runWith(StreamRefs.sourceRef()), 10.seconds) + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(100000) + def source_ref_100k_elements(): Unit = { + val lock = new Semaphore(1) // todo rethink what is the most lightweight way to await for a streams completion + lock.acquire() + + sourceRef.source.runWith(Sink.onComplete(_ ⇒ lock.release())) + + lock.acquire() + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index a5e62d7c8b..b344e6dee7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -119,7 +119,7 @@ private[stream] final class SourceRefStageImpl[Out]( } } - def scheduleDemandRedelivery() = + def scheduleDemandRedelivery(): Unit = scheduleOnce(DemandRedeliveryTimerKey, settings.demandRedeliveryInterval) override protected def onTimer(timerKey: Any): Unit = timerKey match { @@ -183,17 +183,20 @@ private[stream] final class SourceRefStageImpl[Out]( } def tryPush(): Unit = - if (receiveBuffer.nonEmpty) push(out, receiveBuffer.dequeue()) - else if ( /* buffer is empty && */ completed) completeStage() + if (receiveBuffer.nonEmpty && isAvailable(out)) push(out, receiveBuffer.dequeue()) + else if (receiveBuffer.isEmpty && completed) completeStage() private def onReceiveElement(payload: Out): Unit = { localRemainingRequested -= 1 - if (receiveBuffer.isEmpty && isAvailable(out)) + if (receiveBuffer.isEmpty && isAvailable(out)) { push(out, payload) - else if (receiveBuffer.isFull) - throw new IllegalStateException(s"Attempted to overflow buffer! Capacity: ${receiveBuffer.capacity}, incoming element: $payload, localRemainingRequested: ${localRemainingRequested}, localCumulativeDemand: ${localCumulativeDemand}") - else + } else if (receiveBuffer.isFull) { + throw new IllegalStateException(s"Attempted to overflow buffer! " + + s"Capacity: ${receiveBuffer.capacity}, incoming element: $payload, " + + s"localRemainingRequested: $localRemainingRequested, localCumulativeDemand: $localCumulativeDemand") + } else { receiveBuffer.enqueue(payload) + } } /** @throws InvalidPartnerActorException when partner ref is invalid */