diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 1b167aa9d2..612df3b927 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -652,7 +652,7 @@ akka { # `akka.actor.serialization-identifiers."FQCN" = ID` # where `FQCN` is fully qualified class name of the serializer implementation # and `ID` is globally unique serializer identifier number. - # Identifier values from 0 to 16 are reserved for Akka internal usage. + # Identifier values from 0 to 40 are reserved for Akka internal usage. serialization-identifiers { "akka.serialization.JavaSerializer" = 1 "akka.serialization.ByteArraySerializer" = 4 diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 873577f3b5..bf3cfcae97 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -5,13 +5,14 @@ package akka.actor.dungeon import scala.annotation.tailrec -import akka.dispatch.{ Mailbox, Envelope } +import akka.dispatch.{ Envelope, Mailbox } import akka.dispatch.sysmsg._ import akka.event.Logging.Error import akka.util.Unsafe import akka.actor._ import akka.serialization.SerializationExtension -import scala.util.control.NonFatal + +import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.Exception.Catcher import akka.dispatch.MailboxType import akka.dispatch.ProducesMessageQueue @@ -106,7 +107,11 @@ private[akka] trait Dispatch { this: ActorCell ⇒ system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "interrupted during message send")) Thread.currentThread.interrupt() case NonFatal(e) ⇒ - system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send")) + val message = e match { + case n: NoStackTrace ⇒ "swallowing exception during message send: " + n.getMessage + case _ ⇒ "swallowing exception during message send" // stack trace includes message + } + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), message)) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 928d25a5b4..5032ec9c29 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -39,7 +39,7 @@ trait Serializer { /** * Completely unique value to identify this implementation of Serializer, used to optimize network traffic. - * Values from 0 to 16 are reserved for Akka internal usage. + * Values from 0 to 40 are reserved for Akka internal usage. */ def identifier: Int @@ -105,7 +105,7 @@ abstract class SerializerWithStringManifest extends Serializer { /** * Completely unique value to identify this implementation of Serializer, used to optimize network traffic. - * Values from 0 to 16 are reserved for Akka internal usage. + * Values from 0 to 40 are reserved for Akka internal usage. */ def identifier: Int diff --git a/akka-bench-jmh/src/main/scala/akka/stream/EmptySourceBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/EmptySourceBenchmark.scala new file mode 100644 index 0000000000..d149a56ec0 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/EmptySourceBenchmark.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ + +package akka.stream + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import akka.{ Done, NotUsed } +import org.openjdk.jmh.annotations._ + +import scala.concurrent._ +import scala.concurrent.duration._ + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class EmptySourceBenchmark { + implicit val system = ActorSystem("EmptySourceBenchmark") + val materializerSettings = ActorMaterializerSettings(system).withDispatcher("akka.test.stream-dispatcher") + implicit val materializer = ActorMaterializer(materializerSettings) + + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + val setup = Source.empty[String].toMat(Sink.ignore)(Keep.right) + + @Benchmark def empty(): Unit = + Await.result(setup.run(), Duration.Inf) + + + /* + (not serious benchmark, just sanity check: run on macbook 15, late 2013) + + While it was a PublisherSource: + [info] EmptySourceBenchmark.empty thrpt 10 11.219 ± 6.498 ops/ms + + Rewrite to GraphStage: + [info] EmptySourceBenchmark.empty thrpt 10 17.556 ± 2.865 ops/ms + + */ +} diff --git a/akka-docs/rst/dev/documentation.rst b/akka-docs/rst/dev/documentation.rst index 936da82f96..e493d92ef3 100644 --- a/akka-docs/rst/dev/documentation.rst +++ b/akka-docs/rst/dev/documentation.rst @@ -144,3 +144,72 @@ If you get the error "unknown locale: UTF-8" when generating the documentation t export LANG=en_US.UTF-8 export LC_ALL=en_US.UTF-8 +Installing Sphinx on Linux +-------------------------- +Install Python with your package manager: + +:: + + apt-get install python # for Debian based systems + yum install python # for CentOS/RHEL systems + +This will automatically add Python executable to your $PATH and pip is a part of the default Python installation. Remember you need `sudo` rights to run this command. + +More information in case of trouble: +https://packaging.python.org/install_requirements_linux/ + +Install Sphinx: + +:: + + apt-get install python-sphinx # for Debian based systems + #alternatively + pip install sphinx + +For other Linux systems please check Sphinx website: +http://www.sphinx-doc.org/en/stable/install.html#other-linux-distributions + +Install TextLive: + +:: + + apt-get install texlive-latex-base texlive-latex-extra texlive-latex-recommended + # additionally you may need xzdec + apt-get install xzdec + +In case you get the following error: + + + + Unknown directive ...containerchecksum c59200574a316416a23695c258edf3a32531fbda43ccdc09360ee105c3f07f9fb77df17c4ba4c2ea4f3a5ea6667e064b51e3d8c2fe6c984ba3e71b4e32716955... , please fix it! at /usr/share/texlive/tlpkg/TeXLive/TLPOBJ.pm line 210, <$retfh> line 5579. + +you need to specify you want to continue using the 2015 version: + +:: + + tlmgr option repository ftp://tug.org/historic/systems/texlive/2015/tlnet-final + +Add missing tex packages: + +:: + + sudo tlmgr update --self + sudo tlmgr install titlesec + sudo tlmgr install framed + sudo tlmgr install threeparttable + sudo tlmgr install wrapfig + sudo tlmgr install helvetic + sudo tlmgr install courier + sudo tlmgr install multirow + sudo tlmgr install capt-of + sudo tlmgr install needspace + sudo tlmgr install eqparbox + sudo tlmgr install environ + sudo tlmgr install trimspaces + +If you get the error "unknown locale: UTF-8" when generating the documentation the solution is to define the following environment variables: + +:: + + export LANG=en_US.UTF-8 + export LC_ALL=en_US.UTF-8 diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index f7ef257843..7f942c2412 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -125,7 +125,7 @@ seed nodes in the existing cluster. If you don't configure seed nodes you need to join the cluster programmatically or manually. -Manual joining can be performed by using ref:`cluster_jmx_java` or :ref:`cluster_http_java`. +Manual joining can be performed by using :ref:`cluster_jmx_java` or :ref:`cluster_http_java`. Joining programmatically can be performed with ``Cluster.get(system).join``. Unsuccessful join attempts are automatically retried after the time period defined in configuration property ``retry-unsuccessful-join-after``. Retries can be disabled by setting the property to ``off``. @@ -173,9 +173,9 @@ can be performed automatically or manually. By default it must be done manually, It can also be performed programmatically with ``Cluster.get(system).down(address)``. A pre-packaged solution for the downing problem is provided by -`Split Brain Resolver `_, +`Split Brain Resolver `_, which is part of the `Lightbend Reactive Platform `_. -If you don’t use RP, you should anyway carefully read the `documentation `_ +If you don’t use RP, you should anyway carefully read the `documentation `_ of the Split Brain Resolver and make sure that the solution you are using handles the concerns described there. diff --git a/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java b/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java index d6b0ac7c20..fc136366ee 100644 --- a/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java +++ b/akka-docs/rst/java/code/docs/remoting/RemoteDeploymentDocTest.java @@ -4,6 +4,7 @@ package docs.remoting; import akka.testkit.AkkaJUnitActorSystemResource; +import docs.AbstractJavaTest; import org.junit.ClassRule; import org.junit.Test; @@ -21,7 +22,7 @@ import akka.remote.RemoteScope; import akka.actor.AbstractActor; -public class RemoteDeploymentDocTest { +public class RemoteDeploymentDocTest extends AbstractJavaTest { public static class SampleActor extends AbstractActor { @Override diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java index f60c87c320..46e07d4197 100644 --- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java +++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java @@ -29,7 +29,7 @@ public class SerializationDocTest { // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, - // 0 - 16 is reserved by Akka itself + // 0 - 40 is reserved by Akka itself @Override public int identifier() { return 1234567; } @@ -80,7 +80,7 @@ public class SerializationDocTest { // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, - // 0 - 16 is reserved by Akka itself + // 0 - 40 is reserved by Akka itself @Override public int identifier() { return 1234567; } diff --git a/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java b/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java index 6e41c5557e..e58a55318c 100644 --- a/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import docs.AbstractJavaTest; import scala.concurrent.duration.Duration; //#other-imports @@ -29,7 +30,7 @@ import org.junit.*; * This class is not meant to be run as a test in the test suite, but it * is set up such that it can be run interactively from within an IDE. */ -public class QuickStartDocTest { +public class QuickStartDocTest extends AbstractJavaTest { static //#create-materializer diff --git a/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java b/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java index 7c775f4be1..d48034863d 100644 --- a/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java +++ b/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java @@ -14,10 +14,11 @@ import akka.testkit.TestProbe; import com.typesafe.config.ConfigFactory; +import docs.AbstractJavaTest; import org.junit.ClassRule; import org.junit.Test; -public class ParentChildTest { +public class ParentChildTest extends AbstractJavaTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("TestKitDocTest", diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java index 472cf33068..3e9107daf6 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java @@ -6,6 +6,8 @@ package docs.testkit; import static org.junit.Assert.*; import akka.testkit.*; +import docs.AbstractJavaTest; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -24,7 +26,7 @@ import scala.concurrent.Future; import akka.testkit.TestActor.AutoPilot; import scala.concurrent.duration.Duration; -public class TestKitDocTest { +public class TestKitDocTest extends AbstractJavaTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = @@ -76,7 +78,7 @@ public class TestKitDocTest { final TestActorRef ref = TestActorRef.create(system, props, "myActor"); try { ref.receive(new Exception("expected")); - fail("expected an exception to be thrown"); + Assert.fail("expected an exception to be thrown"); } catch (Exception e) { assertEquals("expected", e.getMessage()); } diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java index a99ec38c6a..d2706e1b26 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitSampleTest.java @@ -4,6 +4,7 @@ package docs.testkit; //#fullsample +import docs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -16,7 +17,7 @@ import akka.actor.AbstractActor; import akka.testkit.JavaTestKit; import scala.concurrent.duration.Duration; -public class TestKitSampleTest { +public class TestKitSampleTest extends AbstractJavaTest { public static class SomeActor extends AbstractActor { ActorRef target = null; diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index b19b241d19..3618ccb16c 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -219,10 +219,10 @@ maximum stash capacity in the mailbox configuration:: Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, you may need to define a small stash capacity to ensure that the total number of stashed messages in the system -don't consume too much memory. Additionally, The persistent actor defines three strategies to handle failure when the +doesn't consume too much memory. Additionally, the persistent actor defines three strategies to handle failure when the internal stash capacity is exceeded. The default overflow strategy is the ``ThrowOverflowExceptionStrategy``, which -discards the current received message and throws a ``StashOverflowException``, causing actor restart if default -supervision strategy is used. you can override the ``internalStashOverflowStrategy`` method to return +discards the current received message and throws a ``StashOverflowException``, causing actor restart if the default +supervision strategy is used. You can override the ``internalStashOverflowStrategy`` method to return ``DiscardToDeadLetterStrategy`` or ``ReplyToStrategy`` for any "individual" persistent actor, or define the "default" for all persistent actors by providing FQCN, which must be a subclass of ``StashOverflowStrategyConfigurator``, in the persistence configuration:: @@ -233,7 +233,7 @@ persistence configuration:: The ``DiscardToDeadLetterStrategy`` strategy also has a pre-packaged companion configurator ``akka.persistence.DiscardConfigurator``. -You can also query default strategy via the Akka persistence extension singleton:: +You can also query the default strategy via the Akka persistence extension singleton:: Persistence.get(getContext().system()).defaultInternalStashOverflowStrategy(); diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 93131361c5..bc3fd378d8 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -119,7 +119,7 @@ seed nodes in the existing cluster. If you don't configure seed nodes you need to join the cluster programmatically or manually. -Manual joining can be performed by using ref:`cluster_jmx_scala` or :ref:`cluster_http_scala`. +Manual joining can be performed by using :ref:`cluster_jmx_scala` or :ref:`cluster_http_scala`. Joining programmatically can be performed with ``Cluster(system).join``. Unsuccessful join attempts are automatically retried after the time period defined in configuration property ``retry-unsuccessful-join-after``. Retries can be disabled by setting the property to ``off``. @@ -168,9 +168,9 @@ can be performed automatically or manually. By default it must be done manually, It can also be performed programmatically with ``Cluster(system).down(address)``. A pre-packaged solution for the downing problem is provided by -`Split Brain Resolver `_, +`Split Brain Resolver `_, which is part of the `Lightbend Reactive Platform `_. -If you don’t use RP, you should anyway carefully read the `documentation `_ +If you don’t use RP, you should anyway carefully read the `documentation `_ of the Split Brain Resolver and make sure that the solution you are using handles the concerns described there. diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala index 369e51c48c..85787c4eec 100644 --- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala @@ -25,7 +25,7 @@ package docs.serialization { // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, - // 0 - 16 is reserved by Akka itself + // 0 - 40 is reserved by Akka itself def identifier = 1234567 // "toBinary" serializes the given object to an Array of Bytes @@ -58,7 +58,7 @@ package docs.serialization { // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, - // 0 - 16 is reserved by Akka itself + // 0 - 40 is reserved by Akka itself def identifier = 1234567 // The manifest (type hint) that will be provided in the fromBinary method diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 4b346d28c4..e8628b9d84 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -108,8 +108,8 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { trait HiddenDefinitions { //#graph-dsl-broadcast - val writeAuthors: Sink[Author, Unit] = ??? - val writeHashtags: Sink[Hashtag, Unit] = ??? + val writeAuthors: Sink[Author, NotUsed] = ??? + val writeHashtags: Sink[Hashtag, NotUsed] = ??? //#graph-dsl-broadcast } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index f2168a9a22..728d68f442 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -218,10 +218,10 @@ maximum stash capacity in the mailbox configuration:: Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding, you may need to define a small stash capacity to ensure that the total number of stashed messages in the system -don't consume too much memory. Additionally, The persistent actor defines three strategies to handle failure when the +doesn't consume too much memory. Additionally, the persistent actor defines three strategies to handle failure when the internal stash capacity is exceeded. The default overflow strategy is the ``ThrowOverflowExceptionStrategy``, which -discards the current received message and throws a ``StashOverflowException``, causing actor restart if default -supervision strategy is used. you can override the ``internalStashOverflowStrategy`` method to return +discards the current received message and throws a ``StashOverflowException``, causing actor restart if the default +supervision strategy is used. You can override the ``internalStashOverflowStrategy`` method to return ``DiscardToDeadLetterStrategy`` or ``ReplyToStrategy`` for any "individual" persistent actor, or define the "default" for all persistent actors by providing FQCN, which must be a subclass of ``StashOverflowStrategyConfigurator``, in the persistence configuration:: @@ -232,7 +232,7 @@ persistence configuration:: The ``DiscardToDeadLetterStrategy`` strategy also has a pre-packaged companion configurator ``akka.persistence.DiscardConfigurator``. -You can also query default strategy via the Akka persistence extension singleton:: +You can also query the default strategy via the Akka persistence extension singleton:: Persistence(context.system).defaultInternalStashOverflowStrategy diff --git a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala index 33168fc15d..058c591aca 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/DaemonMsgCreateSerializerSpec.scala @@ -4,16 +4,15 @@ package akka.remote.serialization -import language.postfixOps -import akka.serialization.SerializationExtension -import com.typesafe.config.ConfigFactory -import akka.testkit.AkkaSpec -import akka.actor.{ Actor, ActorRef, Address, Deploy, ExtendedActorSystem, OneForOneStrategy, Props, SupervisorStrategy } +import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, OneForOneStrategy, Props, SupervisorStrategy } import akka.remote.{ DaemonMsgCreate, RemoteScope } import akka.routing.{ FromConfig, RoundRobinPool } -import akka.util.ByteString +import akka.serialization.SerializationExtension +import akka.testkit.{ AkkaSpec, TestKit } +import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ +import scala.language.postfixOps object DaemonMsgCreateSerializerSpec { @@ -81,33 +80,46 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { } "deserialize the old wire format with just class and field for props parameters (if possible)" in { - val serializer = new DaemonMsgCreateSerializer(system.asInstanceOf[ExtendedActorSystem]) + val system = ActorSystem("DaemonMsgCreateSerializer-old-wire-format", ConfigFactory.parseString( + """ + # in 2.4 this is off by default, but in 2.5+ its on so we wouldn't + # get the right set of serializers (and since the old wire protocol doesn't + # contain serializer ids that will go unnoticed with unpleasant consequences) + akka.actor.enable-additional-serialization-bindings = off + """)) - // the oldSnapshot was created with the version of DemonMsgCreateSerializer in Akka 2.4.17. See issue #22224. - // It was created with: - /* + try { + val serializer = new DaemonMsgCreateSerializer(system.asInstanceOf[ExtendedActorSystem]) + + // the oldSnapshot was created with the version of DemonMsgCreateSerializer in Akka 2.4.17. See issue #22224. + // It was created with: + /* import org.apache.commons.codec.binary.Hex.encodeHex val bytes = serializer.toBinary( DaemonMsgCreate(Props(classOf[MyActorWithParam], "a string"), Deploy.local, "/user/test", system.actorFor("/user"))) println(String.valueOf(encodeHex(bytes))) */ - val oldBytesHex = - "0a6a12020a001a48616b6b612e72656d6f74652e73657269616c697a6174696f" + - "6e2e4461656d6f6e4d736743726561746553657269616c697a6572537065632" + - "44d794163746f7257697468506172616d22086120737472696e672a106a6176" + - "612e6c616e672e537472696e67122f0a00222baced000573720016616b6b612" + - "e6163746f722e4c6f63616c53636f706524000000000000000102000078701a" + - "0a2f757365722f74657374222b0a29616b6b613a2f2f4461656d6f6e4d73674" + - "3726561746553657269616c697a6572537065632f75736572" + val oldBytesHex = + "0a7112020a001a48616b6b612e72656d6f74652e73657269616c697a617" + + "4696f6e2e4461656d6f6e4d736743726561746553657269616c697a6572" + + "53706563244d794163746f7257697468506172616d220faced000574000" + + "86120737472696e672a106a6176612e6c616e672e537472696e67122f0a" + + "00222baced000573720016616b6b612e6163746f722e4c6f63616c53636" + + "f706524000000000000000102000078701a0a2f757365722f7465737422" + + "2b0a29616b6b613a2f2f4461656d6f6e4d7367437265617465536572696" + + "16c697a6572537065632f75736572" - import org.apache.commons.codec.binary.Hex.decodeHex - val oldBytes = decodeHex(oldBytesHex.toCharArray) - val result = serializer.fromBinary(oldBytes, classOf[DaemonMsgCreate]) + import org.apache.commons.codec.binary.Hex.decodeHex + val oldBytes = decodeHex(oldBytesHex.toCharArray) + val result = serializer.fromBinary(oldBytes, classOf[DaemonMsgCreate]) - result match { - case dmc: DaemonMsgCreate ⇒ - dmc.props.args should ===(Seq("a string": Any)) + result match { + case dmc: DaemonMsgCreate ⇒ + dmc.props.args should ===(Seq("a string": Any)) + } + } finally { + TestKit.shutdownActorSystem(system) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 5f6293f251..585a43617e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -3,12 +3,15 @@ */ package akka.stream.impl -import org.reactivestreams.{ Subscriber, Publisher, Subscription } +import akka.annotation.InternalApi +import org.reactivestreams.{ Publisher, Subscriber, Subscription } + import scala.concurrent.{ ExecutionContext, Promise } /** * INTERNAL API */ +@InternalApi private[akka] case object EmptyPublisher extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 6f0443dea1..3db2be2bf1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -16,6 +16,8 @@ import scala.concurrent.{ Future, Promise } import akka.Done import java.util.concurrent.CompletionStage +import akka.annotation.InternalApi + import scala.compat.java8.FutureConverters._ import scala.util.Try import scala.util.control.NonFatal @@ -396,3 +398,22 @@ final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphSt override def toString = "LazySource" } +/** INTERNAL API */ +@InternalApi +final object EmptySource extends GraphStage[SourceShape[Nothing]] { + val out = Outlet[Nothing]("EmptySource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes = DefaultAttributes.lazySource + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def preStart(): Unit = completeStage() + override def onPull(): Unit = completeStage() + + setHandler(out, this) + } + + override def toString = "EmptySource" +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index ce2018d953..2e7a51a3c8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -328,10 +328,8 @@ object Source { */ def empty[T]: Source[T, NotUsed] = _empty private[this] val _empty: Source[Nothing, NotUsed] = - fromGraph(new PublisherSource[Nothing]( - EmptyPublisher, - DefaultAttributes.emptySource, - shape("EmptySource"))) + Source.fromGraph(EmptySource) + /** * Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element * will be emitted by the Source. diff --git a/akka-typed/src/main/scala/akka/typed/Ask.scala b/akka-typed/src/main/scala/akka/typed/Ask.scala index ac94a9cd77..e27e7c74ae 100644 --- a/akka-typed/src/main/scala/akka/typed/Ask.scala +++ b/akka-typed/src/main/scala/akka/typed/Ask.scala @@ -45,7 +45,9 @@ object AskPattern { } private class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { - val (ref: ActorRef[U], future: Future[U], promiseRef: PromiseActorRef) = + + // Note: _promiseRef mustn't have a type pattern, since it can be null + private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = if (untyped.isTerminated) ( adapter.ActorRefAdapter[U](untyped.provider.deadLetters), @@ -59,6 +61,10 @@ object AskPattern { val b = adapter.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } + + val ref: ActorRef[U] = _ref + val future: Future[U] = _future + val promiseRef: PromiseActorRef = _promiseRef } private def askUntyped[T, U](target: ActorRef[T], untyped: InternalActorRef, timeout: Timeout, f: ActorRef[U] ⇒ T): Future[U] = { diff --git a/akka-typed/src/test/scala/akka/typed/AskSpec.scala b/akka-typed/src/test/scala/akka/typed/AskSpec.scala new file mode 100644 index 0000000000..ada32b825c --- /dev/null +++ b/akka-typed/src/test/scala/akka/typed/AskSpec.scala @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +import org.scalatest.concurrent.ScalaFutures + +import akka.util.Timeout +import akka.pattern.AskTimeoutException + +import ScalaDSL._ +import AskPattern._ + +object AskSpec { + + sealed trait Msg + final case class Foo(s: String)(val replyTo: ActorRef[String]) extends Msg + final case class Stop(replyTo: ActorRef[Unit]) extends Msg +} + +class AskSpec extends TypedSpec with ScalaFutures { + + import AskSpec._ + + trait Common { + + def system: ActorSystem[TypedSpec.Command] + + implicit def executor: ExecutionContext = + system.executionContext + + val behavior: Behavior[Msg] = Total[Msg] { + case foo @ Foo(_) ⇒ + foo.replyTo ! "foo" + Same + case Stop(r) ⇒ + r ! () + Stopped + } + + def `must fail the future if the actor is already terminated`(): Unit = { + val fut = for { + ref ← system ? TypedSpec.Create(behavior, "test1") + _ ← ref ? Stop + answer ← ref.?(Foo("bar"))(Timeout(1.second), implicitly) + } yield answer + (fut.recover { case _: AskTimeoutException ⇒ "" }).futureValue should ===("") + } + + def `must succeed when the actor is alive`(): Unit = { + val fut = for { + ref ← system ? TypedSpec.Create(behavior, "test2") + answer ← ref ? Foo("bar") + } yield answer + fut.futureValue should ===("foo") + } + } + + object `Ask pattern (native)` extends Common with NativeSystem + + object `Ask pattern (adapted)` extends Common with AdaptedSystem { + + import AskSpec._ + + /** See issue #19947 (MatchError with adapted ActorRef) */ + def `must fail the future if the actor doesn't exist`(): Unit = { + val noSuchActor: ActorRef[Msg] = system match { + case adaptedSys: adapter.ActorSystemAdapter[_] ⇒ + adapter.actorRefAdapter(adaptedSys.untyped.provider.resolveActorRef("/foo/bar")) + case _ ⇒ + fail("this test must only run in an adapted actor system") + } + val fut = for { + answer ← noSuchActor.?(Foo("bar"))(Timeout(1.second), implicitly) + } yield answer + (fut.recover { case _: AskTimeoutException ⇒ "" }).futureValue should ===("") + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 565864e283..ca74ba1c35 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { val sslConfigVersion = "0.2.1" val slf4jVersion = "1.7.23" val scalaXmlVersion = "1.0.6" - val aeronVersion = "1.2.0" + val aeronVersion = "1.2.3" val Versions = Seq( crossScalaVersions := Seq("2.11.8", "2.12.1"), diff --git a/project/build.properties b/project/build.properties index 43b8278c68..27e88aa115 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.11 +sbt.version=0.13.13