夸库斯和叛变

Quarkus 是构建基于 Java 的应用程序的基础; 无论是桌面、服务器还是云。 可以在 https://fedoramagazine.org/using-the-quarkus-framework-on-fedora-silverblue-just-a-quick-look/ 找到一篇关于使用的优秀文章。 本文是使用 Quarkus 和 Mutiny 编码异步进程的入门读物。

那么什么是兵变? Mutiny 允许在事件驱动的流中传输对象。 流可能来自本地进程或远程数据库,如数据库。 Mutiny 流由 Uni 或 Multi 对象完成。 我们使用 Uni 流式传输一个对象——一个包含许多整数的 List。 订阅模式启动流。

执行传统程序并在继续之前返回结果。 Mutiny 可以轻松支持非阻塞代码同时运行进程。 RxJava、ReactiveX 甚至原生 Java 都是备选方案。 Mutiny 易于使用(暴露的 API 很少),它是许多 Quarkus 扩展的默认设置。 使用的两个扩展是 quarkus-mutiny 和 quarkus-vertx。 Vert.x 是 Quarkus 封装的底层框架。 Promise 类由 quarkus-vertx 提供。 当进程完成时,promise 会返回一个 Uni 流。 首先,安装 Java JDK 和 Maven。

引导程序

最低要求是带有 Maven 的 Java-11 或 Java-17。

使用 Java-11

$ sudo dnf install -y java-11-openjdk-devel maven

使用 Java-17

$ sudo dnf install -y java-17-openjdk-devel maven

使用下面的 Maven 调用引导 Quarkus 和 Mutiny。 不包含扩展 quarkus-vertx 以演示如何添加其他扩展。 在执行之前找到适当的目录。 目录 mutiny-demo 将与初始应用程序一起创建。

$ mvn io.quarkus.platform:quarkus-maven-plugin:2.6.2.Final:create 
       -DprojectGroupId=fedoramag   
       -DprojectArtifactId=mutiny-demo  
       -DprojectVersion=1.0.0  
       -DclassName="org.demo.mag.Startup"  
       -Dextensions="mutiny" 
       -DbuildTool=gradle

现在 Gradle 已启动,可以添加其他扩展。 在 mutiny-demo 目录中执行:

$ ./gradlew addExtension --extensions="quarkus-vertx"

要查看所有可用的扩展,请执行:

$ ./gradlew listExtensions

要执行所有已定义的 Gradle 任务:

$ ./gradlew tasks

叛变代码

Quarkus 引导程序上的 className 条目是 org.demo.mag.Startup,它创建文件 src/main/java/org/demo/map/Startup.java。 将内容替换为以下代码:

package org.demo.mag;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.QuarkusApplication;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.vertx.mutiny.core.Promise;

@QuarkusMain
public class Startup implements QuarkusApplication {
    public static void main(String... args) {
        Quarkus.run(Startup.class, args);
    }

    @Override
    public int run(String... args) throws InterruptedException, ExecutionException {
        final Promise<String> finalMessage = Promise.promise();
        final String elapsedTime = "Elapsed time for asynchronous method: %d milliseconds";
        final int[] syncResults = {0};

        Application.runTraditionalMethod();

        final Long millis = System.currentTimeMillis();
        Promise<List<Integer>> promiseRange = Application.getRange(115000);
        Promise<Tuple2<Promise<List<Integer>>, Promise<List<Integer>>>> promiseCombined = Application.getCombined(10000, 15000);
        Promise<List<Integer>> promiseReverse = Application.getReverse(24000);
        /*
        *   Retrieve the Uni stream and on the complete event obtain the List<Integer>
        */
        promiseRange.future().onItem().invoke(list -> {
            System.out.println("Primes Range: " + list.size());
            if(syncResults[0] == 1) {
                finalMessage.complete(String.format(elapsedTime, System.currentTimeMillis() - millis));
            } {
                syncResults[0] = 2;
            }
            return;
        }).subscribeAsCompletionStage();
        
        promiseReverse.future().onItem().invoke(list -> {
            System.out.println("Primes Reverse: " + list.size());
            return;
        }).subscribeAsCompletionStage();
        /*
        *   Notice that this finishes before the other two prime generators(smaller lists).
        */
        promiseCombined.future().onItem().invoke(p -> {
            /*
            *   Notice that "Combined Range" displays first
            */
            p.getItem2().future().invoke(reverse -> {
                System.out.println("Combined Reverse: " + reverse.size());
                return;
            }).subscribeAsCompletionStage();
            
            p.getItem1().future().invoke(range -> {
                System.out.println("Combined Range: " + range.size());
                /*
                * Nesting promises to get multple results together
                */
                p.getItem2().future().invoke(reverse -> {
                    System.out.println(String.format("Asserting that expected primes are equal: %d -- %d", range.get(0), reverse.get(reverse.size() - 1)));
                    assert range.get(0) == reverse.get(reverse.size() - 1)
                        : "Generated primes incorrect";
                    if(syncResults[0] == 2) {
                        finalMessage.complete(String.format(elapsedTime, System.currentTimeMillis() - millis));
                    } else {
                        syncResults[0] = 1;
                    }
                    return;
                }).subscribeAsCompletionStage();
                return;
            }).subscribeAsCompletionStage();
            return;
        }).subscribeAsCompletionStage();
        // Note: on very fast machines this may not display first.
        System.out.println("This should display first - indicating asynchronous code.");
        // blocking for final message
        String elapsedMessage = finalMessage.futureAndAwait();
        System.out.println(elapsedMessage);

        return 0;
    }

    public static class Application {

        public static Promise<List<Integer>> getRange(int n) {
            final Promise<List<Integer>> promise = Promise.promise();
            // non-blocking - this is only for demonstration(emulating some remote call)
            new Thread(() -> {
                try {
                    /*
                    * RangeGeneratedPrimes.primes is blocking, only returns when done
                    */
                    promise.complete(RangeGeneratedPrimes.primes(n));
                } catch (Exception exception) {
                    Thread.currentThread().interrupt();
                }
            }).start();
            
            return promise;
        }

        public static Promise<List<Integer>> getReverse(int n) {
            final Promise<List<Integer>> promise = Promise.promise();

            new Thread(() -> {
                try {
                    // Generating a new object stream
                    promise.complete(ReverseGeneratedPrimes.primes(n));
                } catch (Exception exception) {
                    Thread.currentThread().interrupt();
                }
            }).start();
            
            return promise;
        }

        public static Promise<Tuple2<Promise<List<Integer>>, Promise<List<Integer>>>> getCombined(int ran, int rev) {
            final Promise<Tuple2<Promise<List<Integer>>, Promise<List<Integer>>>> promise = Promise.promise();

            new Thread(() -> {
                try {
                    Uni.combine().all()
                        /*
                        *   Notice that these are running concurrently
                        */
                        .unis(Uni.createFrom().item(Application.getRange(ran)),
                                Uni.createFrom().item(Application.getReverse(rev)))
                        .asTuple().onItem().call(tuple -> {
                            promise.complete(tuple);
                            return Uni.createFrom().nullItem();
                        })
                        .onFailure().invoke(Throwable::printStackTrace)
                        .subscribeAsCompletionStage();
                } catch (Exception exception) {
                    Thread.currentThread().interrupt();
                }
            }).start();
            
            return promise;
        }

        public static void runTraditionalMethod() {
            Long millis = System.currentTimeMillis();
            System.out.println("Traditiona1-1: " + RangeGeneratedPrimes.primes(115000).size());
            System.out.println("Traditiona1-2: " + RangeGeneratedPrimes.primes(10000).size());
            System.out.println("Traditiona1-3: " + ReverseGeneratedPrimes.primes(15000).size());
            System.out.println("Traditiona1-4: " + ReverseGeneratedPrimes.primes(24000).size());
            System.out.println(String.format("Elapsed time for traditional method: %d millisecondsn", System.currentTimeMillis() - millis));
        }
    }

    public interface Primes {
        static List<Integer> primes(int n) {
            return null;
        };
    }

    public abstract static class PrimeBase {
        static boolean isPrime(int number) {
            return IntStream.rangeClosed(2, (int) (Math.sqrt(number)))
                    .allMatch(n -> number % n != 0);
        }
    }

    public static class RangeGeneratedPrimes extends PrimeBase implements Primes {
        public static List<Integer> primes(int n) {
            return IntStream.rangeClosed(2, n)
                    .filter(x -> isPrime(x)).boxed()
                    .collect(Collectors.toList());
        }
    }

    public static class ReverseGeneratedPrimes extends PrimeBase implements Primes {
        public static List<Integer> primes(int n) {
            List<Integer> list = IntStream.generate(getReverseList(n)).limit(n - 1)
                    .filter(x -> isPrime(x)).boxed()
                    .collect(Collectors.toList());

            return list;
        }

        private static IntSupplier getReverseList(int startValue) {
            IntSupplier reverse = new IntSupplier() {
                private int start = startValue;

                public int getAsInt() {
                    return this.start--;
                }
            };

            return reverse;
        }
    }
}

测试

Quarkus 安装默认展示 quarkus-resteasy 扩展。 我们没有使用它,将 src/test/java/org/demo/mag/StartupTest.java 的内容替换为:

package org.demo.mag;

import io.quarkus.test.junit.QuarkusTest;
import io.vertx.mutiny.core.Promise;

import java.util.List;

import org.demo.mag.Startup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@QuarkusTest
public class StartupTest {
    Promise<List<Integer>> promise = Promise.promise();
    Promise<Void> promiseAndAwait = Promise.promise();
    List<Integer> testValue;

    @Tag("DEV")
    @Test
    public void testVerifyAsync() {
        Assertions.assertEquals( null , testValue);
        promise.future().onItem().invoke(list -> {
            testValue = list;
            promiseAndAwait.complete();
        }).subscribeAsCompletionStage();
        Assertions.assertEquals(null, testValue);
        promise.complete(Startup.ReverseGeneratedPrimes.primes(100));
        promiseAndAwait.futureAndAwait();
        Assertions.assertNotNull(testValue);
        Assertions.assertEquals(2, testValue.get(testValue.size()-1));
    }
}

选修的

要减少下载量,请从 build.gradle 文件。

	implementation 'io.quarkus:quarkus-resteasy'
	testImplementation 'io.rest-assured:rest-assured'

安装和执行

下一步是构建项目。 这包括下载所有依赖项以及编译和执行 Startup.java 程序。 为简洁起见,所有内容都包含在一个文件中。

$ ./gradlew quarkusDev

上述命令从 Quarkus 和程序生成横幅和控制台输出。

这是开发模式。 注意提示:“按 [space] 重启”。 要查看编​​辑,请按空格键和回车键重新编译和执行。 Enter q 退出。

要构建 Uber jar(包括所有依赖项),请执行:

$ ./gradlew quarkusBuild -Dquarkus.package.type=uber-jar

这会在名为 mutiny-demo-1.0.0-runner.jar 的构建目录中创建一个 jar。 要运行 jar 文件,请输入以下命令。

$ java -jar ./build/mutiny-demo-1.0.0-runner.jar

要删除横幅和控制台日志,请将以下行添加到 src/main/resources/application.properties 文件中。

%prod.quarkus.log.console.enable=false
%prod.quarkus.banner.enabled=false

输出可能类似于以下内容。

	Traditional-1: 9592
	Traditional-2: 1229
	Traditional-3: 2262
	Traditional-4: 2762
	Elapsed time for traditional method: 67 milliseconds

	Combined Range: 1229
	This should display first - indicating asynchronous code.
	Combined Reverse: 2262
	Primes Reverse: 2762
	Asserting that expected primes are equal: 2 -- 2
	Primes Range: 9592
	Elapsed time for asynchronous method: 52 milliseconds

您仍然会在开发模式下获得横幅并登录。

更进一步,Quarkus 可以使用 GraalVM 生成开箱即用的可执行文件。

$ ./gradlew build -Dquarkus.package.type=native

上述命令生成的可执行文件将是 ./build/mutiny-demo-1.0.0-runner。

默认的 GraalVM 是下载的容器。 要覆盖它,请将环境变量 GRAALVM_HOME 设置为本地安装。 不要忘记使用以下命令安装本机映像。

$ ${GRAALVM_HOME}/bin/gu install native-image

编码

该代码为一个范围生成素数,在限制和两者的组合上反转。 为了 example,考虑范围:“Promise> promiseRange = Application.getRange(115000);”。

这将生成 1 到 115000 之间的所有素数,并显示该范围内的素数数。 它首先执行,但最后显示其结果。 主方法末尾附近的代码——System.out.println(“这应该首先显示——表示异步代码。”); — 首先显示。 这是个 example 的异步代码。 我们可以同时运行多个进程。 但是,完成的顺序是不可预测的。 传统的调用是有序的,完成后即可收集结果。

在返回结果之前可以阻止执行。 代码正是这样做来显示异步经过时间消息。 在 main 方法的末尾,我们有:“String elapsedMessage = finalMessage.futureAndAwait();”。 消息来自 promiseRange 或 promiseCombined——两个运行时间最长的进程。 但即使这样也不能保证。 底层操作系统的状态未知。 其他过程之一可能最后完成。 通常,异步调用被嵌套以协调结果。 这在 promiseCombined promise 中得到了证明,用于评估范围和反转素数的结果。

结论

传统方法与异步方法之间的比较表明,异步方法在现代计算机上的速度最高可提高 25%。 没有资源和计算能力的旧 CPU 使用传统方法可以更快地产生结果。 如果一台计算机有很多内核,为什么不使用它们呢‽

更多文档可以在以下网站上找到。