如何使用 Kafka 流进行有状态和无状态数据处理

一、介绍

Kafka Streams是一个Java库,用于使用Apache Kafka构建流处理应用程序。它可以在 Java 应用程序中使用或嵌入,以处理 Kafka 主题中的流数据。它是一个独立的库,仅依赖于 Kafka,并将其用作高可用性和可靠性的基础。

Kafka Streams 有两种类型的 API:

  1. Streams DSL – 一个高级API
  2. 处理器 – 低级 API

本指南介绍了高级 Streams DSL API,它提供了一个函数式编程模型,只需几行代码即可简洁地编写流处理拓扑。Streams DSL API提供了许多抽象,例如等。KStreamsKTable

分解它的一种方法是将这些 API 提供的功能分类如下:

  1. 无状态操作
  2. 有状态操作

本指南将包括代码示例,以演示无状态操作(如 and)和有状态计算(如 and)。mapfilteraggregatecount

二、Kafka Streams 无状态函数

本节介绍了以下方法:KStream

  • 地图
  • 滤波器
  • groupBy 和 groupByKey
  • 通过和到
  • 打印和速览
  • 合并

2.1、地图

map可以通过应用函数将单个记录转换为 A。它可用于转换键和值。如果只想转换值,请使用方法。该方法可以返回多条记录 ()。KStreammapValuesflatMapKeyValue

让我们看几个例子。

map可用于将每个记录的键和值转换为小写:KStreamString

KStream<String, String> words = builder.stream("words");

words.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {

    @Override

    public KeyValue<String, String> apply(String key, String val) {

            return new KeyValue<>(key.toLowerCase(), val.toLowerCase());

        }

    });

或者,仅用于处理值:mapValues

words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    });

使用,您可以进一步将值分解为值集合。flatMapValues

stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {

    @Override

    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String key, String val) {

        String[] values = val.split(",");

        return Arrays.asList(values)

                    .stream()

                    .map(value -> new KeyValue<>(key, value))

                    .collect(Collectors.toList());

            }

    })

2.2、滤波器

通常,该方法仅包括满足特定条件的记录。它由用于排除记录的方法补充。在这两种情况下,过滤标准都是使用 aobject 定义的。filterKStreamfilterNotPredicate

例如,要仅处理特定信用卡类型的交易:

KStream<String, String> transactions = builder.stream("user-transactions");

transactions.filter(new Predicate<String, Transaction>() {

    @Override

    public boolean test(String userID, Transaction tx) {

            return tx.cardType().equals("VISA");

        }

    })

要忽略/排除所有尚未设置密码的用户,请执行以下操作:

KStream<String, String> users = builder.stream("users");

users.filterNot(new Predicate<String, User>() {

    @Override

    public boolean test(String userID, User user) {

            return user.isPwdSet();

        }

    })

2.3、群

分组操作通常用于转换 ato ato 的内容以执行有状态计算(本指南稍后将介绍)。这可以使用更通用的方法来实现。KStreamKGroupedStreamgroupByKeygroup

虽然使用很简单,但请注意,acan 可以与使用不同的键一起使用。例如,您可以使用它根据卡类型对用户交易进行分组:groupByKeyKeyValueMappergroupBy

KStream<String, User> transactions = builder.stream("transactions");



KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {

            @Override

            public String apply(String txID, User user) {

                return user.getCardType();

            }

    });

2.4、往返和通过

该方法与您到目前为止遇到的其他一些操作不同。它非常简单,但非常强大,因为它允许我们将记录具体化(存储)到 Kafka 中的另一个主题 – 只需一个简单的方法调用!toKStream

它返回 aresult 而不是 a(or) – 此类操作也称为终端方法。voidKStreamKTable

在此示例中,所有小写单词都发送到一个主题,该主题在转换后使用操作(前面已介绍过)。lowercase-wordsmapValues

KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .to("lowercase-words");

through是另一个简单但功能强大的操作,通常用于在构建流管道时补充该方法。要继续上面的例子 – 比如说,在一个主题中存储所有小写单词后,您需要删除所有具有特定字符(例如连字符)的单词,并将最终结果存储在另一个 Kafka 主题中。无需使用方法并创建一个 newfrom 主题,可以像这样简化代码:to-toKStreamlowercase-words

KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .through("lowercase-words")

    .filter(new Predicate<String, String>() {

        @Override

        public boolean test(String k, String v) {

                return v.contains("-");

            }

        })

    .to("processed-words");

2.5、打印和速览

如果你想记录记录(用于调试目的),是一个方便的方法(它也是一个终端操作,就像)。也可以使用 aobject(接受)配置此方法的行为。KStreamprinttoPrintedprint

例如,要记录标准输出终端 ato 的值:KStream

KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .print(Printed.withLabel("demo").toSysOut());

该方法在功能方面类似,但它不是终端操作。相反,它允许调用方使用 ato 定义特定操作并返回相同的实例。peekprintForeachActionKStream

在此示例中,我们只需将键和值记录到标准输出:

KStream<String, String> words = builder.stream("words");



words.mapValues(new ValueMapper<String, String>() {

    @Override

    public String apply(String val) {

            return val.toLowerCase();

        }

    })

    .peek(new ForeachAction<String, String>() {

        @Override

        public void apply(String k, String v) {

                System.out.println("key is "+k+", value is "+v);

            }

        })

    .to("lowercase-words");

2.6、合并

如果有两个流并且需要合并它们,请使用。merge

KStream<String, String> fte = builder.stream("fte");

KStream<String, String> contractor = builder.stream("contractors");



fte.merge(contractor).to("all-employees");

三、Kafka Streams 有状态函数

本节将介绍聚合操作(,和)以及Kafka Streams中的窗口化概述。所有这些操作的副作用是“状态”(因此称为有状态操作),了解它的存储位置以及如何管理它非常重要。aggregatecountreduce

与这些操作关联的状态存储在本地“状态存储”中 – 内存中或磁盘上。“数据局部性”使处理效率大大提高。您还可以配置应用程序,以便此状态存储数据也发送 Kafka 主题。这对于高可用性和容错非常重要,因为如果发生应用程序问题或崩溃,可以从 Kafka 恢复数据。

让我们回顾一下其中的一些有状态操作。

3.1、计数

KGroupedStream支持此操作。通过此操作,通过单一方法,可以方便地计算特定键的记录数。

继续前面介绍的示例。一旦我们按卡类型对交易进行分组,我们就可以简单地用于获取每种卡类型的交易数量。groupBycount

KStream<String, User> transactions = builder.stream("transactions");



KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {

            @Override

            public String apply(String txID, User user) {

                return user.getCardType();

            }

    });



KTable<String, Long> txPerCardType = grouped.count();

为了在本地存储此状态(计数),count 接受 的实例,该实例可以按如下方式使用:Materialized

KTable<String, Long> txPerCardType = grouped.count(Materialized.as("tx-per-card-type"));

3.2、递增

aggregate在流式数据集上执行移动平均线等计算时派上用场。这需要处理状态,并且必须考虑当前值和计算聚合的当前值。

一个很好的理解方法是实际使用它来实现操作。收到第一条记录时,该 用于初始化状态(在此示例中,计数设置为零)并使用第一条记录调用。之后,接管 – 在此示例中,每当收到记录时,当前计数都会递增 1。aggregatecountInitializerAggregator

    KStream<String, String> stream = builder.stream("transactions");



    KTable<String, Result> aggregate = stream.groupByKey()

            .aggregate(new Initializer<Result>() {

                @Override

                public Result apply() {

                    return new Result("", 0);

                }

            }, new Aggregator<String, String, Result>() {

                @Override

                public Result apply(String k, String v, Result count) {

                    Integer currentCount = count.getCount();

                    return new Result(k, currentCount + 1);

                }

            });

3.3、减少

reduce操作可用于组合值流并实现等。您可以将操作视为的通用版本。summinmaxaggregatereduce

 四、使用 Kafka Streams 进行窗口化

例如,网站分析的常见要求是具有有关每小时唯一页面浏览量、每分钟点击次数等的指标,使您可以限制要在某个时间范围内执行的流处理操作。Windowing

支持的时间窗口包括:滑动、翻滚、跳跃和基于会话的时间窗口。

要计算每小时的唯一页面浏览量,您可以使用 60 分钟的翻转时间窗口。因此,产品从下午 1 点到下午 2 点的页面浏览量将被汇总,之后将开始一个新的时间段。下面是如何实现此目的的示例:

KStream<Product, Long> views = builder.stream("product-views");



views.groupByKey()

    .windowedBy(SessionWindows.with(Duration.ofMinutes(60)))

    .toStream()

    .to("views-per-hour");

 五、结论

本指南介绍了 Kafka Streams 和 API 的类型。接下来是常用的无状态和有状态操作的介绍,以及示例。您可以参考Kafka Streams Javadocs和Kafka 文档进行进一步阅读。

赞(0)
未经允许不得转载:主机百科 » 如何使用 Kafka 流进行有状态和无状态数据处理