diff --git a/akka-bench-jmh/src/main/scala/akka/stream/EmptySourceBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/EmptySourceBenchmark.scala new file mode 100644 index 0000000000..d149a56ec0 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/EmptySourceBenchmark.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ + +package akka.stream + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import akka.{ Done, NotUsed } +import org.openjdk.jmh.annotations._ + +import scala.concurrent._ +import scala.concurrent.duration._ + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class EmptySourceBenchmark { + implicit val system = ActorSystem("EmptySourceBenchmark") + val materializerSettings = ActorMaterializerSettings(system).withDispatcher("akka.test.stream-dispatcher") + implicit val materializer = ActorMaterializer(materializerSettings) + + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + val setup = Source.empty[String].toMat(Sink.ignore)(Keep.right) + + @Benchmark def empty(): Unit = + Await.result(setup.run(), Duration.Inf) + + + /* + (not serious benchmark, just sanity check: run on macbook 15, late 2013) + + While it was a PublisherSource: + [info] EmptySourceBenchmark.empty thrpt 10 11.219 ± 6.498 ops/ms + + Rewrite to GraphStage: + [info] EmptySourceBenchmark.empty thrpt 10 17.556 ± 2.865 ops/ms + + */ +} diff --git a/akka-docs/rst/scala/code/docs/actor/LOLSPEC.scala b/akka-docs/rst/scala/code/docs/actor/LOLSPEC.scala new file mode 100644 index 0000000000..5abd0e45a0 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/actor/LOLSPEC.scala @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package docs.actor + +import akka.actor.ActorLogging +import scala.language.postfixOps + +import akka.Done +import akka.actor.{ ActorRef, CoordinatedShutdown } +import akka.testkit._ +import akka.util._ + +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.actor.Actor +import akka.actor.Props +import akka.testkit.{ ImplicitSender, TestKit } + +class LOLSPEC extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { + + "schedule a one-off task" in { + val miku = system.actorOf(Props(new Actor { + def receive = { + case x => + println(s"sender() = ${sender()}") + } + })) + + system.eventStream.subscribe(miku, classOf[Object]) + + system.eventStream.publish("Hello!") + } + +} + diff --git a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala index c867b44a3e..058c591aca 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala @@ -4,11 +4,11 @@ package akka.remote.serialization -import akka.actor.{Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, OneForOneStrategy, Props, SupervisorStrategy} -import akka.remote.{DaemonMsgCreate, RemoteScope} -import akka.routing.{FromConfig, RoundRobinPool} +import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, OneForOneStrategy, Props, SupervisorStrategy } +import akka.remote.{ DaemonMsgCreate, RemoteScope } +import akka.routing.{ FromConfig, RoundRobinPool } import akka.serialization.SerializationExtension -import akka.testkit.{AkkaSpec, TestKit} +import akka.testkit.{ AkkaSpec, TestKit } import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 5f6293f251..585a43617e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -3,12 +3,15 @@ */ package akka.stream.impl -import org.reactivestreams.{ Subscriber, Publisher, Subscription } +import akka.annotation.InternalApi +import org.reactivestreams.{ Publisher, Subscriber, Subscription } + import scala.concurrent.{ ExecutionContext, Promise } /** * INTERNAL API */ +@InternalApi private[akka] case object EmptyPublisher extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 6f0443dea1..3db2be2bf1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -16,6 +16,8 @@ import scala.concurrent.{ Future, Promise } import akka.Done import java.util.concurrent.CompletionStage +import akka.annotation.InternalApi + import scala.compat.java8.FutureConverters._ import scala.util.Try import scala.util.control.NonFatal @@ -396,3 +398,22 @@ final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphSt override def toString = "LazySource" } +/** INTERNAL API */ +@InternalApi +final object EmptySource extends GraphStage[SourceShape[Nothing]] { + val out = Outlet[Nothing]("EmptySource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes = DefaultAttributes.lazySource + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def preStart(): Unit = completeStage() + override def onPull(): Unit = completeStage() + + setHandler(out, this) + } + + override def toString = "EmptySource" +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 930b7fda25..e34d09ebf8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -331,11 +331,7 @@ object Source { */ def empty[T]: Source[T, NotUsed] = _empty private[this] val _empty: Source[Nothing, NotUsed] = - new Source( - new PublisherSource[Nothing]( - EmptyPublisher, - DefaultAttributes.emptySource, - shape("EmptySource"))) + Source.fromGraph(EmptySource) /** * Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element