一、介绍
MongoDB 是一个 NoSQL数据库,将数据存储在由字段值对组成的文档中。这些是使用 BSON(JSON 的二进制表示形式)存储的。MongoDB 文档类似于关系数据库表中的行,文档字段(键值对)类似于列。MongoDB 文档是集合的一部分,一个或多个集合是数据库的一部分。MongoDB API支持常规 CRUD 操作(创建,读取,更新,删除)以及聚合,地理空间查询和文本搜索功能。MongoDB 还提供高可用性和复制。您可以在多个MongoDB 服务器(称为副本集)之间分发数据,以提供冗余和分片。
本文将通过一个完整的示例引导您了解如何在 Docker 中设置 MongoDB 副本集、部署客户端应用程序以及探索更改流。它将涵盖以下内容:
- MongoDB 和更改流的概述。
- 如何使用 MongoDB Go 驱动程序。
- 应用程序设置和部署
- 测试应用程序并了解如何使用恢复令牌。
MongoDB 为许多客户端驱动程序提供官方支持,包括 Java,Python,Node.js,PHP,Ruby,Swift,C,C++,C# 和 Go。在本文中,您将使用 MongoDB 的 Go 驱动程序来构建更改流处理器和 REST API 应用程序。
1.1、MongoDB 更改流
使用更改流,您可以实时访问MongoDB数据更新。应用程序不需要担心处理 oplog 和更改流的低级操作细节。可以订阅集合、数据库或整个部署上的所有数据更改。由于更改流使用聚合框架,因此应用程序还可以筛选特定更改或转换通知。可以将更改流用于副本集(这是本文将使用的内容)和分片设置。
更改流仅在更新操作期间返回字段的差异(这是默认行为)。但您可以将其配置为返回更新文档的最新提交版本。还可以提供一个或多个管道阶段的数组来控制更改流输出。例如 -、、 等。$addFields
$match
$project
1.2、恢复令牌
恢复令牌允许应用程序保存进度并防止潜在的数据丢失。如果更改流应用程序崩溃,它将无法在此期间检测到数据库更改。恢复令牌可用于从应用程序停止的位置(崩溃之前)继续处理并接收所有过去的事件。更改流可以帮助构建分离且可缩放的体系结构,并使实现提取-转换-加载 (ETL)、通知、同步服务等变得更加容易。
二、应用概述
本文演示了以下应用程序:
- 更改流侦听器:它使用监视 API 订阅 MongoDB 集合的更改事件流。创建或更新文档后,此应用程序将立即收到实时通知。它还利用恢复令牌在重新启动或崩溃后从特定时间点继续处理。
- REST API:它公开了一个HTTP端点,以便在MongoDB中创建文档。
三、准备工作
- 使用本地 Linux 工作站,或将 Vultr 云服务器部署为工作站。
- 确保 Docker 已安装在工作站上。您将需要它来构建和运行 Docker 映像。
- 在工作站上安装 curl,一个流行的命令行 HTTP 客户端。
- 在工作站上安装最新版本的 Go 编程语言(版本 1.18 或更高版本)。
四、准备 MongoDB 更改流应用程序
4.1、初始化项目
创建一个目录并切换到该目录:
mkdir mongo-change-streams
cd mongo-change-streams
创建一个新的 Go 模块:
go mod init mongo-change-streams
这将创建一个新文件
go.mod
创建一个新文件:main.go
touch main.go
4.2、导入库
要导入所需的 Go 模块,请将以下内容添加到文件中:main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
除了 Go 标准库包之外,您还将从 MongoDB Go 驱动程序导入以下包:
go.mongodb.org/mongo-driver/mongo
– 提供核心的MongoDB功能。go.mongodb.org/mongo-driver/bson
– 包 bson 是一个用于读取、写入和操作 BSON 的库。go.mongodb.org/mongo-driver/mongo/options
– 包选项定义了MongoDB Go驱动程序的可选配置。
4.3、添加函数init
将以下代码添加到文件中:main.go
var mongoConnectString string
var mongoDatabase string
var mongoCollection string
const msgFormat = "export RESUME_TOKEN=%s"
func init() {
mongoConnectString = os.Getenv("MONGODB_URI")
if mongoConnectString == "" {
log.Fatal("missing environment variable", "MONGODB_URI")
}
mongoDatabase = os.Getenv("MONGODB_DATABASE")
if mongoDatabase == "" {
log.Fatal("missing environment variable", "MONGODB_DATABASE")
}
mongoCollection = os.Getenv("MONGODB_COLLECTION")
if mongoCollection == "" {
log.Fatal("missing environment variable", "MONGODB_COLLECTION")
}
}
该函数分别从 、 和环境变量中检索 MongoDB 连接字符串、集合名称和数据库名称。init
MONGODB_URI
MONGODB_COLLECTION
MONGODB_DATABASE
4.4、添加函数main
将函数添加到文件:main
main.go
func main() {
client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))
if err != nil {
log.Fatal("failed to create mongo client", err)
}
fmt.Println("created client object")
ctx, cancel := context.WithCancel(context.Background())
err = client.Connect(ctx)
if err != nil {
log.Fatal("failed to connect to mongo", err)
}
fmt.Println("connected to mongodb")
coll := client.Database(mongoDatabase).Collection(mongoCollection)
defer func() {
err = client.Disconnect(context.Background())
if err != nil {
fmt.Println("failed to close mongo connection")
}
}()
match := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace" }}}}}}}
project := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}
pipeline := mongo.Pipeline{match, project}
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
tokenFromEnv := os.Getenv("RESUME_TOKEN")
if tokenFromEnv != "" {
fmt.Println("resume token in enviroment variable", tokenFromEnv)
t := bson.M{"_data": tokenFromEnv}
opts.SetResumeAfter(t)
fmt.Println("set resume token to watch client")
}
cs, err := coll.Watch(ctx, pipeline, opts)
if err != nil {
log.Fatal("failed to start change stream watch: ", err)
}
fmt.Println("watch established")
defer func() {
fmt.Println("resume token ", cs.ResumeToken().String())
fmt.Println("use resume token in the next run with following command -", fmt.Sprintf(msgFormat, cs.ResumeToken().Lookup("_data").StringValue()))
close, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := cs.Close(close)
if err != nil {
fmt.Println("failed to close change stream")
}
fmt.Println("closed change stream")
}()
go func() {
fmt.Println("started change stream...")
for cs.Next(ctx) {
re := cs.Current.Index(1)
fmt.Println("change stream event" + re.Value().String())
}
}()
exit := make(chan os.Signal, 1)
signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
fmt.Println("waiting for program exit signal")
<-exit
fmt.Println("program exit initiated")
cancel()
}
- 用于创建新对象。
mongo.NewClient
*mongo.Client
- 使用Connect启动与MongoDB的连接。
- 调用数据库以获取 ,然后调用集合以获取 的句柄。
mongo.Database
mongo.Collection
- 延迟函数用于在程序结束时断开和关闭对象。
*mongo.Client
- A 是使用匹配和项目阶段创建的。
mongo.Pipeline
- 如果使用环境变量提供恢复令牌,则用于配置更改流监视客户端。
RESUME_TOKEN
- 获取使用手表。作为程序退出过程的一部分,另一个 defer 函数用于提供恢复令牌信息并关闭更改流。
mongo.ChangeStream
- 更改流侦听器作为 goroutine 启动。它使用 Next 侦听更改流事件,用于获取文档,并将事件记录到控制台。
Current.Index
- 最后,设置一个 Go 通道,以便在程序中断时收到通知并干净地退出。
五、准备 MongoDB REST API 应用程序
创建一个新文件:api.go
touch api.go
5.1、导入库
要导入所需的 Go 模块,请将以下内容添加到文件中:api.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strconv"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
除了 Go 标准库包之外,我们还从 MongoDB Go 驱动程序导入以下包:
go.mongodb.org/mongo-driver/mongo
– 提供核心的MongoDB功能。go.mongodb.org/mongo-driver/mongo/options
– 包选项定义了MongoDB Go驱动程序的可选配置。
5.2、添加函数init
将以下代码添加到文件中:api.go
var coll *mongo.Collection
var mongoConnectString string
var mongoDatabase string
var mongoCollection string
func init() {
mongoConnectString = os.Getenv("MONGODB_URI")
if mongoConnectString == "" {
log.Fatal("missing environment variable", "MONGODB_URI")
}
mongoDatabase = os.Getenv("MONGODB_DATABASE")
if mongoDatabase == "" {
log.Fatal("missing environment variable", "MONGODB_DATABASE")
}
mongoCollection = os.Getenv("MONGODB_COLLECTION")
if mongoCollection == "" {
log.Fatal("missing environment variable", "MONGODB_COLLECTION")
}
client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))
if err != nil {
log.Fatal("failed to create mongo client", err)
}
fmt.Println("created mongo client object")
err = client.Connect(context.Background())
if err != nil {
log.Fatal("failed to connect to mongo", err)
}
fmt.Println("connected to mongo")
coll = client.Database(mongoDatabase).Collection(mongoCollection)
}
该函数分别从 、 和环境变量中检索 MongoDB 连接字符串、集合名称和数据库名称。它调用数据库方法来获取 ,然后调用集合以获取 .init
MONGODB_URI
MONGODB_COLLECTION
MONGODB_DATABASE
mongo.Database
mongo.Collection
5.3、添加函数main
将函数添加到文件:main
api.go
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
res, err := coll.InsertOne(context.Background(), map[string]string{"user": "user-" + strconv.Itoa(int(time.Now().Unix()))})
if err != nil {
log.Fatal("mongo insert failed", err)
}
fmt.Println("created record", res.InsertedID)
})
fmt.Println("started http server...")
log.Fatal(http.ListenAndServe(":8080", nil))
}
main 函数注册一个 HTTP 处理程序,以便在调用时将记录插入到 MongoDB 集合中。HTTP 服务器在端口 8080 上启动。
六、准备 Docker 映像
6.1、添加 Docker 文件
创建一个名为 的文件:Dockerfile.change_stream_app
touch Dockerfile.change_stream_app
在 中输入以下内容:Dockerfile.change_stream_app
FROM golang:1.18-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./
RUN go build -o /mongodb-app
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /mongodb-app /mongodb-app
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/mongodb-app"]
- 这是一个多阶段,用作第一阶段的基础映像。
Dockerfile
golang:1.18-buster
- 复制应用程序文件,运行 ,然后生成应用程序二进制文件。
go mod download
- 在第二阶段,用作基础映像。
gcr.io/distroless/base-debian10
- 从第一阶段复制二进制文件,并将其配置为运行应用程序。
ENTRYPOINT
创建一个名为 的文件:Dockerfile.rest_api
touch Dockerfile.rest_api
在 中输入以下内容:Dockerfile.rest_api
FROM golang:1.18-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY api.go ./
RUN go build -o /mongodb-api
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /mongodb-api /mongodb-api
EXPOSE 8080
USER nonroot:nonroot
ENTRYPOINT ["/mongodb-api"]
- 这是一个多阶段,用作第一阶段的基础映像。
Dockerfile
golang:1.18-buster
- 复制应用程序文件,运行 ,然后生成应用程序二进制文件。
go mod download
- 在第二阶段,用作基础映像。
gcr.io/distroless/base-debian10
- 从第一阶段复制二进制文件,并将其配置为运行应用程序。
ENTRYPOINT
6.2、构建 Docker 镜像
拉动模块:
go mod tidy
为更改流应用程序构建 Docker 映像:
docker build -t mongo-change-streams -f Dockerfile.change_stream_app .
为 REST API 应用程序构建 Docker 镜像:
docker build -t mongo-rest-api -f Dockerfile.rest_api .
七、在 Docker 中启动 MongoDB 集群
创建一个 Docker 网络:
docker network create mongodb-cluster
启动第一个节点 mongo1:
docker run -d --rm -p 27017:27017 --name mongo1 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo1
启动第二个节点 mongo2:
docker run -d --rm -p 27018:27017 --name mongo2 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo2
启动第三个节点 mongo3:
docker run -d --rm -p 27019:27017 --name mongo3 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo3
配置副本集:
docker exec -it mongo1 mongosh --eval "rs.initiate({
_id: \"myReplicaSet\",
members: [
{_id: 0, host: \"mongo1\"},
{_id: 1, host: \"mongo2\"},
{_id: 2, host: \"mongo3\"}
]
})"
您应该看到以下输出:
{ ok: 1 }
八、启动两个应用程序
在终端中,启动更改流应用程序:
export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet
docker run --network mongodb-cluster -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN=$RESUME_TOKEN mongo-change-streams
您将看到类似于以下内容的输出:
created client object
connected to mongodb
watch established
started change stream...
waiting for program exit signal
在另一个终端中,启动 REST API 应用程序:
export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet
docker run --network mongodb-cluster -p 8080:8080 -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection mongo-rest-api
您将看到类似于以下内容的输出:
created mongo client object
connected to mongo
started http server...
九、测试应用程序
在MongoDB中创建一些记录。为此,调用 REST API 应用程序(从另一个终端)公开的 HTTP 端点:
curl -i localhost:8080
重复上述过程两到三次。在所有情况下,您都应该得到HTTP / 1.1 200 OK响应。
导航到终端并检查 REST API 应用程序日志。您应该会看到已创建的记录。请注意,在您的情况下,对象 ID 可能有所不同:
created record ObjectID("639c092160078afff212209b")
created record ObjectID("639c097660078afff212209c")
created record ObjectID("639c097760078afff212209d")
导航到终端并检查更改流应用程序日志。您应该看到与上述相同的记录(由 REST API 应用程序创建)。这些是由更改流侦听器进程自动检测到的。请注意,在您的情况下,对象 ID 可能有所不同:
change stream event{"_id": {"$oid":"639c092160078afff212209b"},"user": "user-1671170337"}
change stream event{"_id": {"$oid":"639c097660078afff212209c"},"user": "user-1671170422"}
change stream event{"_id": {"$oid":"639c097760078afff212209d"},"user": "user-1671170423"}
9.1、使用更改流恢复令牌
首先,关闭更改流应用程序 – 在运行该应用程序的相应终端上按 +。CTRLC
您应该会看到与此类似的日志。请注意,令牌可能因您的情况而异。
resume token {"_data": "82639C09A4000000012B0229296E04"}
use this token in the next run with following command - export RESUME_TOKEN=82639C09A4000000012B0229296E04
日志消息突出显示了如果要利用 Resume 令牌时应使用的命令。记下该命令。
通过调用 REST API 应用程序公开的 HTTP 端点向 MongoDB 添加一些记录:
curl -i localhost:8080
重复几次。
导航到终端并检查 REST API 应用程序日志。您应该会看到已创建的记录。请注意,在您的情况下,对象 ID 可能有所不同:
created record ObjectID("639c09ed60078afff212209e")
created record ObjectID("639c09ee60078afff212209f")
重新启动更改流应用程序。这一次,通过将 Resume 令牌作为环境变量传递来使用它。您可以使用上面的日志输出中的命令:
export RESUME_TOKEN=82639C09A4000000012B0229296E04
export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet
docker run --network mongodb-cluster -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN=$RESUME_TOKEN mongo-change-streams
您应该会看到与此类似的日志。请注意,令牌和对象 ID 在您的情况下可能会有所不同。
token passed in as enviroment variable 82639C09A4000000012B0229296E04
set token to watch client option
watch established
started change stream...
change stream event{"_id": {"$oid":"639c09ed60078afff212209e"},"user": "user-1671170541"}
change stream event{"_id": {"$oid":"639c09ee60078afff212209f"},"user": "user-1671170542"}
验证您收到的记录是否与关闭更改流应用程序时添加的记录相同。
您可以通过调用 REST API 应用程序公开的 HTTP 端点来继续向 MongoDB 添加记录:
curl -i localhost:8080
正如预期的那样,更改流应用程序将实时检测和记录这些:
您应该看到类似于以下内容的日志:
change stream event{"_id": {"$oid":"639c0a5c60078afff21220a0"},"user": "user-1671170652"}
change stream event{"_id": {"$oid":"639c0a5d60078afff21220a1"},"user": "user-1671170653"}
十、收尾
最后,要停止这两个应用程序,请在各自的终端中按 +。CTRLC
同时删除 MongoDB 集群实例和 Docker 网络:
docker rm -f mongo1 mongo2 mongo3
docker network rm mongodb-cluster
十一、结论
在本文中,您将了解 MongoDB 和更改流的概述。您在 Docker 中设置了一个 MongoDB 副本集,部署了客户端应用程序,并通过端到端示例探索了更改流和恢复令牌。