一、介绍
Kafka Streams是一个Java库,用于使用Apache Kafka构建流处理应用程序。它可以在 Java 应用程序中使用或嵌入,以处理 Kafka 主题中的流数据。它是一个独立的库,仅依赖于 Kafka,并将其用作高可用性和可靠性的基础。
Kafka Streams 有两种类型的 API:
- Streams DSL – 一个高级API
- 处理器 – 低级 API
本指南介绍了高级 Streams DSL API,它提供了一个函数式编程模型,只需几行代码即可简洁地编写流处理拓扑。Streams DSL API提供了许多抽象,例如等。KStreams
KTable
分解它的一种方法是将这些 API 提供的功能分类如下:
- 无状态操作
- 有状态操作
本指南将包括代码示例,以演示无状态操作(如 and)和有状态计算(如 and)。map
filter
aggregate
count
二、Kafka Streams 无状态函数
本节介绍了以下方法:KStream
- 地图
- 滤波器
- groupBy 和 groupByKey
- 通过和到
- 打印和速览
- 合并
2.1、地图
map
可以通过应用函数将单个记录转换为 A。它可用于转换键和值。如果只想转换值,请使用方法。该方法可以返回多条记录 ()。KStream
mapValues
flatMap
KeyValue
让我们看几个例子。
map
可用于将每个记录的键和值转换为小写:KStream
String
KStream<String, String> words = builder.stream("words");
words.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String val) {
return new KeyValue<>(key.toLowerCase(), val.toLowerCase());
}
});
或者,仅用于处理值:mapValues
words.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String val) {
return val.toLowerCase();
}
});
使用,您可以进一步将值分解为值集合。flatMapValues
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
@Override
public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String key, String val) {
String[] values = val.split(",");
return Arrays.asList(values)
.stream()
.map(value -> new KeyValue<>(key, value))
.collect(Collectors.toList());
}
})
2.2、滤波器
通常,该方法仅包括满足特定条件的记录。它由用于排除记录的方法补充。在这两种情况下,过滤标准都是使用 aobject 定义的。filter
KStream
filterNot
Predicate
例如,要仅处理特定信用卡类型的交易:
KStream<String, String> transactions = builder.stream("user-transactions");
transactions.filter(new Predicate<String, Transaction>() {
@Override
public boolean test(String userID, Transaction tx) {
return tx.cardType().equals("VISA");
}
})
要忽略/排除所有尚未设置密码的用户,请执行以下操作:
KStream<String, String> users = builder.stream("users");
users.filterNot(new Predicate<String, User>() {
@Override
public boolean test(String userID, User user) {
return user.isPwdSet();
}
})
2.3、群
分组操作通常用于转换 ato ato 的内容以执行有状态计算(本指南稍后将介绍)。这可以使用更通用的方法来实现。KStream
KGroupedStream
groupByKey
group
虽然使用很简单,但请注意,acan 可以与使用不同的键一起使用。例如,您可以使用它根据卡类型对用户交易进行分组:groupByKey
KeyValueMapper
groupBy
KStream<String, User> transactions = builder.stream("transactions");
KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {
@Override
public String apply(String txID, User user) {
return user.getCardType();
}
});
2.4、往返和通过
该方法与您到目前为止遇到的其他一些操作不同。它非常简单,但非常强大,因为它允许我们将记录具体化(存储)到 Kafka 中的另一个主题 – 只需一个简单的方法调用!to
KStream
它返回 aresult 而不是 a(or) – 此类操作也称为终端方法。void
KStream
KTable
在此示例中,所有小写单词都发送到一个主题,该主题在转换后使用操作(前面已介绍过)。lowercase-words
mapValues
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String val) {
return val.toLowerCase();
}
})
.to("lowercase-words");
through
是另一个简单但功能强大的操作,通常用于在构建流管道时补充该方法。要继续上面的例子 – 比如说,在一个主题中存储所有小写单词后,您需要删除所有具有特定字符(例如连字符)的单词,并将最终结果存储在另一个 Kafka 主题中。无需使用方法并创建一个 newfrom 主题,可以像这样简化代码:to
-
to
KStream
lowercase-words
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String val) {
return val.toLowerCase();
}
})
.through("lowercase-words")
.filter(new Predicate<String, String>() {
@Override
public boolean test(String k, String v) {
return v.contains("-");
}
})
.to("processed-words");
2.5、打印和速览
如果你想记录记录(用于调试目的),是一个方便的方法(它也是一个终端操作,就像)。也可以使用 aobject(接受)配置此方法的行为。KStream
print
to
Printed
print
例如,要记录标准输出终端 ato 的值:KStream
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String val) {
return val.toLowerCase();
}
})
.print(Printed.withLabel("demo").toSysOut());
该方法在功能方面类似,但它不是终端操作。相反,它允许调用方使用 ato 定义特定操作并返回相同的实例。peek
print
ForeachAction
KStream
在此示例中,我们只需将键和值记录到标准输出:
KStream<String, String> words = builder.stream("words");
words.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(String val) {
return val.toLowerCase();
}
})
.peek(new ForeachAction<String, String>() {
@Override
public void apply(String k, String v) {
System.out.println("key is "+k+", value is "+v);
}
})
.to("lowercase-words");
2.6、合并
如果有两个流并且需要合并它们,请使用。merge
KStream<String, String> fte = builder.stream("fte");
KStream<String, String> contractor = builder.stream("contractors");
fte.merge(contractor).to("all-employees");
三、Kafka Streams 有状态函数
本节将介绍聚合操作(,和)以及Kafka Streams中的窗口化概述。所有这些操作的副作用是“状态”(因此称为有状态操作),了解它的存储位置以及如何管理它非常重要。aggregate
count
reduce
与这些操作关联的状态存储在本地“状态存储”中 – 内存中或磁盘上。“数据局部性”使处理效率大大提高。您还可以配置应用程序,以便此状态存储数据也发送 Kafka 主题。这对于高可用性和容错非常重要,因为如果发生应用程序问题或崩溃,可以从 Kafka 恢复数据。
让我们回顾一下其中的一些有状态操作。
3.1、计数
KGroupedStream
支持此操作。通过此操作,通过单一方法,可以方便地计算特定键的记录数。
继续前面介绍的示例。一旦我们按卡类型对交易进行分组,我们就可以简单地用于获取每种卡类型的交易数量。groupBy
count
KStream<String, User> transactions = builder.stream("transactions");
KGroupedStream<String, String> grouped = transactions.groupBy(new KeyValueMapper<String, User, String>() {
@Override
public String apply(String txID, User user) {
return user.getCardType();
}
});
KTable<String, Long> txPerCardType = grouped.count();
为了在本地存储此状态(计数),count 接受 的实例,该实例可以按如下方式使用:Materialized
KTable<String, Long> txPerCardType = grouped.count(Materialized.as("tx-per-card-type"));
3.2、递增
aggregate
在流式数据集上执行移动平均线等计算时派上用场。这需要处理状态,并且必须考虑当前值和计算聚合的当前值。
一个很好的理解方法是实际使用它来实现操作。收到第一条记录时,该 用于初始化状态(在此示例中,计数设置为零)并使用第一条记录调用。之后,接管 – 在此示例中,每当收到记录时,当前计数都会递增 1。aggregate
count
Initializer
Aggregator
KStream<String, String> stream = builder.stream("transactions");
KTable<String, Result> aggregate = stream.groupByKey()
.aggregate(new Initializer<Result>() {
@Override
public Result apply() {
return new Result("", 0);
}
}, new Aggregator<String, String, Result>() {
@Override
public Result apply(String k, String v, Result count) {
Integer currentCount = count.getCount();
return new Result(k, currentCount + 1);
}
});
3.3、减少
reduce
操作可用于组合值流并实现等。您可以将操作视为的通用版本。sum
min
max
aggregate
reduce
四、使用 Kafka Streams 进行窗口化
例如,网站分析的常见要求是具有有关每小时唯一页面浏览量、每分钟点击次数等的指标,使您可以限制要在某个时间范围内执行的流处理操作。Windowing
支持的时间窗口包括:滑动、翻滚、跳跃和基于会话的时间窗口。
要计算每小时的唯一页面浏览量,您可以使用 60 分钟的翻转时间窗口。因此,产品从下午 1 点到下午 2 点的页面浏览量将被汇总,之后将开始一个新的时间段。下面是如何实现此目的的示例:
KStream<Product, Long> views = builder.stream("product-views");
views.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMinutes(60)))
.toStream()
.to("views-per-hour");
五、结论
本指南介绍了 Kafka Streams 和 API 的类型。接下来是常用的无状态和有状态操作的介绍,以及示例。您可以参考Kafka Streams Javadocs和Kafka 文档进行进一步阅读。