Quarkus 是构建基于 Java 的应用程序的基础; 无论是桌面、服务器还是云。 可以在 https://fedoramagazine.org/using-the-quarkus-framework-on-fedora-silverblue-just-a-quick-look/ 找到一篇关于使用的优秀文章。 本文是使用 Quarkus 和 Mutiny 编码异步进程的入门读物。
那么什么是兵变? Mutiny 允许在事件驱动的流中传输对象。 流可能来自本地进程或远程数据库,如数据库。 Mutiny 流由 Uni 或 Multi 对象完成。 我们使用 Uni 流式传输一个对象——一个包含许多整数的 List。 订阅模式启动流。
执行传统程序并在继续之前返回结果。 Mutiny 可以轻松支持非阻塞代码同时运行进程。 RxJava、ReactiveX 甚至原生 Java 都是备选方案。 Mutiny 易于使用(暴露的 API 很少),它是许多 Quarkus 扩展的默认设置。 使用的两个扩展是 quarkus-mutiny 和 quarkus-vertx。 Vert.x 是 Quarkus 封装的底层框架。 Promise 类由 quarkus-vertx 提供。 当进程完成时,promise 会返回一个 Uni 流。 首先,安装 Java JDK 和 Maven。
引导程序
最低要求是带有 Maven 的 Java-11 或 Java-17。
使用 Java-11:
$ sudo dnf install -y java-11-openjdk-devel maven
使用 Java-17:
$ sudo dnf install -y java-17-openjdk-devel maven
使用下面的 Maven 调用引导 Quarkus 和 Mutiny。 不包含扩展 quarkus-vertx 以演示如何添加其他扩展。 在执行之前找到适当的目录。 目录 mutiny-demo 将与初始应用程序一起创建。
$ mvn io.quarkus.platform:quarkus-maven-plugin:2.6.2.Final:create -DprojectGroupId=fedoramag -DprojectArtifactId=mutiny-demo -DprojectVersion=1.0.0 -DclassName="org.demo.mag.Startup" -Dextensions="mutiny" -DbuildTool=gradle
现在 Gradle 已启动,可以添加其他扩展。 在 mutiny-demo 目录中执行:
$ ./gradlew addExtension --extensions="quarkus-vertx"
要查看所有可用的扩展,请执行:
$ ./gradlew listExtensions
要执行所有已定义的 Gradle 任务:
$ ./gradlew tasks
叛变代码
这 Quarkus 引导程序上的 className 条目是 org.demo.mag.Startup,它创建文件 src/main/java/org/demo/map/Startup.java。 将内容替换为以下代码:
package org.demo.mag; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.function.IntSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.QuarkusApplication; import io.quarkus.runtime.annotations.QuarkusMain; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.tuples.Tuple2; import io.vertx.mutiny.core.Promise; @QuarkusMain public class Startup implements QuarkusApplication { public static void main(String... args) { Quarkus.run(Startup.class, args); } @Override public int run(String... args) throws InterruptedException, ExecutionException { final Promise<String> finalMessage = Promise.promise(); final String elapsedTime = "Elapsed time for asynchronous method: %d milliseconds"; final int[] syncResults = {0}; Application.runTraditionalMethod(); final Long millis = System.currentTimeMillis(); Promise<List<Integer>> promiseRange = Application.getRange(115000); Promise<Tuple2<Promise<List<Integer>>, Promise<List<Integer>>>> promiseCombined = Application.getCombined(10000, 15000); Promise<List<Integer>> promiseReverse = Application.getReverse(24000); /* * Retrieve the Uni stream and on the complete event obtain the List<Integer> */ promiseRange.future().onItem().invoke(list -> { System.out.println("Primes Range: " + list.size()); if(syncResults[0] == 1) { finalMessage.complete(String.format(elapsedTime, System.currentTimeMillis() - millis)); } { syncResults[0] = 2; } return; }).subscribeAsCompletionStage(); promiseReverse.future().onItem().invoke(list -> { System.out.println("Primes Reverse: " + list.size()); return; }).subscribeAsCompletionStage(); /* * Notice that this finishes before the other two prime generators(smaller lists). */ promiseCombined.future().onItem().invoke(p -> { /* * Notice that "Combined Range" displays first */ p.getItem2().future().invoke(reverse -> { System.out.println("Combined Reverse: " + reverse.size()); return; }).subscribeAsCompletionStage(); p.getItem1().future().invoke(range -> { System.out.println("Combined Range: " + range.size()); /* * Nesting promises to get multple results together */ p.getItem2().future().invoke(reverse -> { System.out.println(String.format("Asserting that expected primes are equal: %d -- %d", range.get(0), reverse.get(reverse.size() - 1))); assert range.get(0) == reverse.get(reverse.size() - 1) : "Generated primes incorrect"; if(syncResults[0] == 2) { finalMessage.complete(String.format(elapsedTime, System.currentTimeMillis() - millis)); } else { syncResults[0] = 1; } return; }).subscribeAsCompletionStage(); return; }).subscribeAsCompletionStage(); return; }).subscribeAsCompletionStage(); // Note: on very fast machines this may not display first. System.out.println("This should display first - indicating asynchronous code."); // blocking for final message String elapsedMessage = finalMessage.futureAndAwait(); System.out.println(elapsedMessage); return 0; } public static class Application { public static Promise<List<Integer>> getRange(int n) { final Promise<List<Integer>> promise = Promise.promise(); // non-blocking - this is only for demonstration(emulating some remote call) new Thread(() -> { try { /* * RangeGeneratedPrimes.primes is blocking, only returns when done */ promise.complete(RangeGeneratedPrimes.primes(n)); } catch (Exception exception) { Thread.currentThread().interrupt(); } }).start(); return promise; } public static Promise<List<Integer>> getReverse(int n) { final Promise<List<Integer>> promise = Promise.promise(); new Thread(() -> { try { // Generating a new object stream promise.complete(ReverseGeneratedPrimes.primes(n)); } catch (Exception exception) { Thread.currentThread().interrupt(); } }).start(); return promise; } public static Promise<Tuple2<Promise<List<Integer>>, Promise<List<Integer>>>> getCombined(int ran, int rev) { final Promise<Tuple2<Promise<List<Integer>>, Promise<List<Integer>>>> promise = Promise.promise(); new Thread(() -> { try { Uni.combine().all() /* * Notice that these are running concurrently */ .unis(Uni.createFrom().item(Application.getRange(ran)), Uni.createFrom().item(Application.getReverse(rev))) .asTuple().onItem().call(tuple -> { promise.complete(tuple); return Uni.createFrom().nullItem(); }) .onFailure().invoke(Throwable::printStackTrace) .subscribeAsCompletionStage(); } catch (Exception exception) { Thread.currentThread().interrupt(); } }).start(); return promise; } public static void runTraditionalMethod() { Long millis = System.currentTimeMillis(); System.out.println("Traditiona1-1: " + RangeGeneratedPrimes.primes(115000).size()); System.out.println("Traditiona1-2: " + RangeGeneratedPrimes.primes(10000).size()); System.out.println("Traditiona1-3: " + ReverseGeneratedPrimes.primes(15000).size()); System.out.println("Traditiona1-4: " + ReverseGeneratedPrimes.primes(24000).size()); System.out.println(String.format("Elapsed time for traditional method: %d millisecondsn", System.currentTimeMillis() - millis)); } } public interface Primes { static List<Integer> primes(int n) { return null; }; } public abstract static class PrimeBase { static boolean isPrime(int number) { return IntStream.rangeClosed(2, (int) (Math.sqrt(number))) .allMatch(n -> number % n != 0); } } public static class RangeGeneratedPrimes extends PrimeBase implements Primes { public static List<Integer> primes(int n) { return IntStream.rangeClosed(2, n) .filter(x -> isPrime(x)).boxed() .collect(Collectors.toList()); } } public static class ReverseGeneratedPrimes extends PrimeBase implements Primes { public static List<Integer> primes(int n) { List<Integer> list = IntStream.generate(getReverseList(n)).limit(n - 1) .filter(x -> isPrime(x)).boxed() .collect(Collectors.toList()); return list; } private static IntSupplier getReverseList(int startValue) { IntSupplier reverse = new IntSupplier() { private int start = startValue; public int getAsInt() { return this.start--; } }; return reverse; } } }
测试
Quarkus 安装默认展示 quarkus-resteasy 扩展。 我们没有使用它,将 src/test/java/org/demo/mag/StartupTest.java 的内容替换为:
package org.demo.mag; import io.quarkus.test.junit.QuarkusTest; import io.vertx.mutiny.core.Promise; import java.util.List; import org.demo.mag.Startup; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @QuarkusTest public class StartupTest { Promise<List<Integer>> promise = Promise.promise(); Promise<Void> promiseAndAwait = Promise.promise(); List<Integer> testValue; @Tag("DEV") @Test public void testVerifyAsync() { Assertions.assertEquals( null , testValue); promise.future().onItem().invoke(list -> { testValue = list; promiseAndAwait.complete(); }).subscribeAsCompletionStage(); Assertions.assertEquals(null, testValue); promise.complete(Startup.ReverseGeneratedPrimes.primes(100)); promiseAndAwait.futureAndAwait(); Assertions.assertNotNull(testValue); Assertions.assertEquals(2, testValue.get(testValue.size()-1)); } }
选修的
要减少下载量,请从 build.gradle 文件。
implementation 'io.quarkus:quarkus-resteasy' testImplementation 'io.rest-assured:rest-assured'
安装和执行
下一步是构建项目。 这包括下载所有依赖项以及编译和执行 Startup.java 程序。 为简洁起见,所有内容都包含在一个文件中。
$ ./gradlew quarkusDev
上述命令从 Quarkus 和程序生成横幅和控制台输出。
这是开发模式。 注意提示:“按 [space] 重启”。 要查看编辑,请按空格键和回车键重新编译和执行。 Enter q 退出。
要构建 Uber jar(包括所有依赖项),请执行:
$ ./gradlew quarkusBuild -Dquarkus.package.type=uber-jar
这会在名为 mutiny-demo-1.0.0-runner.jar 的构建目录中创建一个 jar。 要运行 jar 文件,请输入以下命令。
$ java -jar ./build/mutiny-demo-1.0.0-runner.jar
要删除横幅和控制台日志,请将以下行添加到 src/main/resources/application.properties 文件中。
%prod.quarkus.log.console.enable=false %prod.quarkus.banner.enabled=false
输出可能类似于以下内容。
Traditional-1: 9592 Traditional-2: 1229 Traditional-3: 2262 Traditional-4: 2762 Elapsed time for traditional method: 67 milliseconds Combined Range: 1229 This should display first - indicating asynchronous code. Combined Reverse: 2262 Primes Reverse: 2762 Asserting that expected primes are equal: 2 -- 2 Primes Range: 9592 Elapsed time for asynchronous method: 52 milliseconds
您仍然会在开发模式下获得横幅并登录。
更进一步,Quarkus 可以使用 GraalVM 生成开箱即用的可执行文件。
$ ./gradlew build -Dquarkus.package.type=native
上述命令生成的可执行文件将是 ./build/mutiny-demo-1.0.0-runner。
默认的 GraalVM 是下载的容器。 要覆盖它,请将环境变量 GRAALVM_HOME 设置为本地安装。 不要忘记使用以下命令安装本机映像。
$ ${GRAALVM_HOME}/bin/gu install native-image
编码
该代码为一个范围生成素数,在限制和两者的组合上反转。 为了 example,考虑范围:“Promise> promiseRange = Application.getRange(115000);”。
这将生成 1 到 115000 之间的所有素数,并显示该范围内的素数数。 它首先执行,但最后显示其结果。 主方法末尾附近的代码——System.out.println(“这应该首先显示——表示异步代码。”); — 首先显示。 这是个 example 的异步代码。 我们可以同时运行多个进程。 但是,完成的顺序是不可预测的。 传统的调用是有序的,完成后即可收集结果。
在返回结果之前可以阻止执行。 代码正是这样做来显示异步经过时间消息。 在 main 方法的末尾,我们有:“String elapsedMessage = finalMessage.futureAndAwait();”。 消息来自 promiseRange 或 promiseCombined——两个运行时间最长的进程。 但即使这样也不能保证。 底层操作系统的状态未知。 其他过程之一可能最后完成。 通常,异步调用被嵌套以协调结果。 这在 promiseCombined promise 中得到了证明,用于评估范围和反转素数的结果。
结论
传统方法与异步方法之间的比较表明,异步方法在现代计算机上的速度最高可提高 25%。 没有资源和计算能力的旧 CPU 使用传统方法可以更快地产生结果。 如果一台计算机有很多内核,为什么不使用它们呢‽
更多文档可以在以下网站上找到。