如何将 MongoDB 更改流与 Go 驱动程序一起使用

一、介绍

MongoDB 是一个 NoSQL数据库,将数据存储在由字段值对组成的文档中。这些是使用 BSON(JSON 的二进制表示形式)存储的。MongoDB 文档类似于关系数据库表中的行,文档字段(键值对)类似于列。MongoDB 文档是集合的一部分,一个或多个集合数据库的一部分。MongoDB API支持常规 CRUD 操作(创建,读取,更新,删除)以及聚合,地理空间查询和文本搜索功能。MongoDB 还提供高可用性和复制。您可以在多个MongoDB 服务器(称为副本集)之间分发数据,以提供冗余和分片。

本文将通过一个完整的示例引导您了解如何在 Docker 中设置 MongoDB 副本集、部署客户端应用程序以及探索更改流。它将涵盖以下内容:

  • MongoDB 和更改流的概述。
  • 如何使用 MongoDB Go 驱动程序。
  • 应用程序设置和部署
  • 测试应用程序并了解如何使用恢复令牌。

MongoDB 为许多客户端驱动程序提供官方支持,包括 Java,Python,Node.js,PHP,Ruby,Swift,C,C++,C# 和 Go。在本文中,您将使用 MongoDB 的 Go 驱动程序来构建更改流处理器和 REST API 应用程序。

1.1、MongoDB 更改流

使用更改流,您可以实时访问MongoDB数据更新。应用程序不需要担心处理 oplog 和更改流的低级操作细节。可以订阅集合、数据库或整个部署上的所有数据更改。由于更改流使用聚合框架,因此应用程序还可以筛选特定更改或转换通知。可以将更改流用于副本集(这是本文将使用的内容)和分片设置。

更改流仅在更新操作期间返回字段的差异(这是默认行为)。但您可以将其配置为返回更新文档的最新提交版本。还可以提供一个或多个管道阶段的数组来控制更改流输出。例如 -、、 等。$addFields$match$project

1.2、恢复令牌

恢复令牌允许应用程序保存进度并防止潜在的数据丢失。如果更改流应用程序崩溃,它将无法在此期间检测到数据库更改。恢复令牌可用于从应用程序停止的位置(崩溃之前)继续处理并接收所有过去的事件。更改流可以帮助构建分离且可缩放的体系结构,并使实现提取-转换-加载 (ETL)、通知、同步服务等变得更加容易。

二、应用概述

本文演示了以下应用程序:

  1. 更改流侦听器:它使用监视 API 订阅 MongoDB 集合的更改事件流。创建或更新文档后,此应用程序将立即收到实时通知。它还利用恢复令牌在重新启动或崩溃后从特定时间点继续处理。
  2. REST API:它公开了一个HTTP端点,以便在MongoDB中创建文档。

三、准备工作

  1. 使用本地 Linux 工作站,或将 Vultr 云服务器部署为工作站。
  2. 确保 Docker 已安装在工作站上。您将需要它来构建和运行 Docker 映像。
  3. 在工作站上安装 curl,一个流行的命令行 HTTP 客户端。
  4. 在工作站上安装最新版本的 Go 编程语言(版本 1.18 或更高版本)。

四、准备 MongoDB 更改流应用程序

4.1、初始化项目

创建一个目录并切换到该目录:

mkdir mongo-change-streams

cd mongo-change-streams

创建一个新的 Go 模块:

go mod init mongo-change-streams

这将创建一个新文件go.mod

创建一个新文件:main.go

touch main.go

4.2、导入库

要导入所需的 Go 模块,请将以下内容添加到文件中:main.go

package main



import (

  "context"

  "fmt"

  "log"

  "os"

  "os/signal"

  "syscall"

  "time"



  "go.mongodb.org/mongo-driver/bson"

  "go.mongodb.org/mongo-driver/mongo"

  "go.mongodb.org/mongo-driver/mongo/options"

)

除了 Go 标准库包之外,您还将从 MongoDB Go 驱动程序导入以下包:

  • go.mongodb.org/mongo-driver/mongo– 提供核心的MongoDB功能。
  • go.mongodb.org/mongo-driver/bson– 包 bson 是一个用于读取、写入和操作 BSON 的库。
  • go.mongodb.org/mongo-driver/mongo/options– 包选项定义了MongoDB Go驱动程序的可选配置。

4.3、添加函数init

将以下代码添加到文件中:main.go

var mongoConnectString string

var mongoDatabase string

var mongoCollection string

const msgFormat = "export RESUME_TOKEN=%s"



func init() {

  mongoConnectString = os.Getenv("MONGODB_URI")

  if mongoConnectString == "" {

    log.Fatal("missing environment variable", "MONGODB_URI")

  }



  mongoDatabase = os.Getenv("MONGODB_DATABASE")

  if mongoDatabase == "" {

    log.Fatal("missing environment variable", "MONGODB_DATABASE")

  }



  mongoCollection = os.Getenv("MONGODB_COLLECTION")

  if mongoCollection == "" {

    log.Fatal("missing environment variable", "MONGODB_COLLECTION")

  }

}

该函数分别从 、 和环境变量中检索 MongoDB 连接字符串、集合名称和数据库名称。initMONGODB_URIMONGODB_COLLECTIONMONGODB_DATABASE

4.4、添加函数main

将函数添加到文件:mainmain.go

func main() {

  client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))

  if err != nil {

    log.Fatal("failed to create mongo client", err)

  }



  fmt.Println("created client object")



  ctx, cancel := context.WithCancel(context.Background())



  err = client.Connect(ctx)

  if err != nil {

    log.Fatal("failed to connect to mongo", err)

  }



  fmt.Println("connected to mongodb")



  coll := client.Database(mongoDatabase).Collection(mongoCollection)



  defer func() {

    err = client.Disconnect(context.Background())

    if err != nil {

      fmt.Println("failed to close mongo connection")

    }

  }()



  match := bson.D{{"$match", bson.D{{"operationType", bson.D{{"$in", bson.A{"insert", "update", "replace" }}}}}}}

  project := bson.D{{"$project", bson.M{"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}}}

  pipeline := mongo.Pipeline{match, project}



  opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)



  tokenFromEnv := os.Getenv("RESUME_TOKEN")



  if tokenFromEnv != "" {

    fmt.Println("resume token in enviroment variable", tokenFromEnv)



    t := bson.M{"_data": tokenFromEnv}

    opts.SetResumeAfter(t)



    fmt.Println("set resume token to watch client")

  }



  cs, err := coll.Watch(ctx, pipeline, opts)

  if err != nil {

    log.Fatal("failed to start change stream watch: ", err)

  }



  fmt.Println("watch established")



  defer func() {

    fmt.Println("resume token ", cs.ResumeToken().String())

    fmt.Println("use resume token in the next run with following command -", fmt.Sprintf(msgFormat, cs.ResumeToken().Lookup("_data").StringValue()))



    close, cancel := context.WithTimeout(context.Background(), 5*time.Second)

    defer cancel()



    err := cs.Close(close)

    if err != nil {

      fmt.Println("failed to close change stream")

    }



    fmt.Println("closed change stream")

  }()



  go func() {

    fmt.Println("started change stream...")



    for cs.Next(ctx) {

      re := cs.Current.Index(1)

      fmt.Println("change stream event" + re.Value().String())

    }

  }()



  exit := make(chan os.Signal, 1)

  signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)



  fmt.Println("waiting for program exit signal")



  <-exit

  fmt.Println("program exit initiated")

  cancel()

}
  • 用于创建新对象。mongo.NewClient*mongo.Client
  • 使用Connect启动与MongoDB的连接。
  • 调用数据库以获取 ,然后调用集合以获取 的句柄。mongo.Databasemongo.Collection
  • 延迟函数用于在程序结束时断开和关闭对象。*mongo.Client
  • A 是使用匹配项目阶段创建的。mongo.Pipeline
  • 如果使用环境变量提供恢复令牌,则用于配置更改流监视客户端。RESUME_TOKEN
  • 获取使用手表。作为程序退出过程的一部分,另一个 defer 函数用于提供恢复令牌信息并关闭更改流。mongo.ChangeStream
  • 更改流侦听器作为 goroutine 启动。它使用 Next 侦听更改流事件,用于获取文档,并将事件记录到控制台。Current.Index
  • 最后,设置一个 Go 通道,以便在程序中断时收到通知并干净地退出。

五、准备 MongoDB REST API 应用程序

创建一个新文件:api.go

touch api.go

5.1、导入库

要导入所需的 Go 模块,请将以下内容添加到文件中:api.go

package main



import (

  "context"

  "fmt"

  "log"

  "net/http"

  "os"

  "strconv"

  "time"



  "go.mongodb.org/mongo-driver/mongo"

  "go.mongodb.org/mongo-driver/mongo/options"

)

除了 Go 标准库包之外,我们还从 MongoDB Go 驱动程序导入以下包:

  • go.mongodb.org/mongo-driver/mongo– 提供核心的MongoDB功能。
  • go.mongodb.org/mongo-driver/mongo/options– 包选项定义了MongoDB Go驱动程序的可选配置。

5.2、添加函数init

将以下代码添加到文件中:api.go

var coll *mongo.Collection

var mongoConnectString string

var mongoDatabase string

var mongoCollection string



func init() {

  mongoConnectString = os.Getenv("MONGODB_URI")

  if mongoConnectString == "" {

    log.Fatal("missing environment variable", "MONGODB_URI")

  }



  mongoDatabase = os.Getenv("MONGODB_DATABASE")

  if mongoDatabase == "" {

    log.Fatal("missing environment variable", "MONGODB_DATABASE")

  }



  mongoCollection = os.Getenv("MONGODB_COLLECTION")

  if mongoCollection == "" {

    log.Fatal("missing environment variable", "MONGODB_COLLECTION")

  }



  client, err := mongo.NewClient(options.Client().ApplyURI(mongoConnectString))

  if err != nil {

    log.Fatal("failed to create mongo client", err)

  }



  fmt.Println("created mongo client object")



  err = client.Connect(context.Background())

  if err != nil {

    log.Fatal("failed to connect to mongo", err)

  }



  fmt.Println("connected to mongo")



  coll = client.Database(mongoDatabase).Collection(mongoCollection)

}

该函数分别从 、 和环境变量中检索 MongoDB 连接字符串、集合名称和数据库名称。它调用数据库方法来获取 ,然后调用集合以获取 .initMONGODB_URIMONGODB_COLLECTIONMONGODB_DATABASEmongo.Databasemongo.Collection

5.3、添加函数main

将函数添加到文件:mainapi.go

func main() {

  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {



    res, err := coll.InsertOne(context.Background(), map[string]string{"user": "user-" + strconv.Itoa(int(time.Now().Unix()))})

    if err != nil {

      log.Fatal("mongo insert failed", err)

    }



    fmt.Println("created record", res.InsertedID)

  })



  fmt.Println("started http server...")

  log.Fatal(http.ListenAndServe(":8080", nil))

}

main 函数注册一个 HTTP 处理程序,以便在调用时将记录插入到 MongoDB 集合中。HTTP 服务器在端口 8080 上启动。

六、准备 Docker 映像

6.1、添加 Docker 文件

创建一个名为 的文件:Dockerfile.change_stream_app

touch Dockerfile.change_stream_app

在 中输入以下内容:Dockerfile.change_stream_app

FROM golang:1.18-buster AS build



WORKDIR /app

COPY go.mod ./

COPY go.sum ./



RUN go mod download



COPY main.go ./

RUN go build -o /mongodb-app



FROM gcr.io/distroless/base-debian10

WORKDIR /

COPY --from=build /mongodb-app /mongodb-app

EXPOSE 8080

USER nonroot:nonroot

ENTRYPOINT ["/mongodb-app"]
  • 这是一个多阶段,用作第一阶段的基础映像。Dockerfilegolang:1.18-buster
  • 复制应用程序文件,运行 ,然后生成应用程序二进制文件。go mod download
  • 在第二阶段,用作基础映像。gcr.io/distroless/base-debian10
  • 从第一阶段复制二进制文件,并将其配置为运行应用程序。ENTRYPOINT

创建一个名为 的文件:Dockerfile.rest_api

touch Dockerfile.rest_api

在 中输入以下内容:Dockerfile.rest_api

FROM golang:1.18-buster AS build



WORKDIR /app

COPY go.mod ./

COPY go.sum ./



RUN go mod download



COPY api.go ./

RUN go build -o /mongodb-api



FROM gcr.io/distroless/base-debian10

WORKDIR /

COPY --from=build /mongodb-api /mongodb-api

EXPOSE 8080

USER nonroot:nonroot

ENTRYPOINT ["/mongodb-api"]
  • 这是一个多阶段,用作第一阶段的基础映像。Dockerfilegolang:1.18-buster
  • 复制应用程序文件,运行 ,然后生成应用程序二进制文件。go mod download
  • 在第二阶段,用作基础映像。gcr.io/distroless/base-debian10
  • 从第一阶段复制二进制文件,并将其配置为运行应用程序。ENTRYPOINT

6.2、构建 Docker 镜像

拉动模块:

go mod tidy

为更改流应用程序构建 Docker 映像:

docker build -t mongo-change-streams -f Dockerfile.change_stream_app .

为 REST API 应用程序构建 Docker 镜像:

docker build -t mongo-rest-api -f Dockerfile.rest_api .

 七、在 Docker 中启动 MongoDB 集群

创建一个 Docker 网络:

docker network create mongodb-cluster

启动第一个节点 mongo1

docker run -d --rm -p 27017:27017 --name mongo1 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo1

启动第二个节点 mongo2

docker run -d --rm -p 27018:27017 --name mongo2 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo2

启动第三个节点 mongo3

docker run -d --rm -p 27019:27017 --name mongo3 --network mongodb-cluster mongo mongod --replSet myReplicaSet --bind_ip localhost,mongo3

配置副本集:

docker exec -it mongo1 mongosh --eval "rs.initiate({

_id: \"myReplicaSet\",

members: [

  {_id: 0, host: \"mongo1\"},

  {_id: 1, host: \"mongo2\"},

  {_id: 2, host: \"mongo3\"}

]

})"

您应该看到以下输出:

{ ok: 1 }

 八、启动两个应用程序

在终端中,启动更改流应用程序:

export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet



docker run --network mongodb-cluster -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN=$RESUME_TOKEN mongo-change-streams

您将看到类似于以下内容的输出:

created client object

connected to mongodb

watch established

started change stream...

waiting for program exit signal

在另一个终端中,启动 REST API 应用程序:

export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet



docker run --network mongodb-cluster -p 8080:8080 -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection mongo-rest-api

您将看到类似于以下内容的输出:

created mongo client object

connected to mongo

started http server...

 九、测试应用程序

在MongoDB中创建一些记录。为此,调用 REST API 应用程序(从另一个终端)公开的 HTTP 端点:

curl -i localhost:8080

重复上述过程两到三次。在所有情况下,您都应该得到HTTP / 1.1 200 OK响应。

导航到终端并检查 REST API 应用程序日志。您应该会看到已创建的记录。请注意,在您的情况下,对象 ID 可能有所不同:

created record ObjectID("639c092160078afff212209b")

created record ObjectID("639c097660078afff212209c")

created record ObjectID("639c097760078afff212209d")

导航到终端并检查更改流应用程序日志。您应该看到与上述相同的记录(由 REST API 应用程序创建)。这些是由更改流侦听器进程自动检测到的。请注意,在您的情况下,对象 ID 可能有所不同:

change stream event{"_id": {"$oid":"639c092160078afff212209b"},"user": "user-1671170337"}

change stream event{"_id": {"$oid":"639c097660078afff212209c"},"user": "user-1671170422"}

change stream event{"_id": {"$oid":"639c097760078afff212209d"},"user": "user-1671170423"}

9.1、使用更改流恢复令牌

首先,关闭更改流应用程序 – 在运行该应用程序的相应终端上按 +。CTRLC

您应该会看到与此类似的日志。请注意,令牌可能因您的情况而异。

resume token  {"_data": "82639C09A4000000012B0229296E04"}

use this token in the next run with following command - export RESUME_TOKEN=82639C09A4000000012B0229296E04

日志消息突出显示了如果要利用 Resume 令牌时应使用的命令。记下该命令。

通过调用 REST API 应用程序公开的 HTTP 端点向 MongoDB 添加一些记录:

curl -i localhost:8080

重复几次。

导航到终端并检查 REST API 应用程序日志。您应该会看到已创建的记录。请注意,在您的情况下,对象 ID 可能有所不同:

created record ObjectID("639c09ed60078afff212209e")

created record ObjectID("639c09ee60078afff212209f")

重新启动更改流应用程序。这一次,通过将 Resume 令牌作为环境变量传递来使用它。您可以使用上面的日志输出中的命令:

export RESUME_TOKEN=82639C09A4000000012B0229296E04



export MONGODB_URI=mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=myReplicaSet



docker run --network mongodb-cluster -e MONGODB_URI=$MONGODB_URI -e MONGODB_DATABASE=test_db -e MONGODB_COLLECTION=test_collection -e RESUME_TOKEN=$RESUME_TOKEN mongo-change-streams

您应该会看到与此类似的日志。请注意,令牌和对象 ID 在您的情况下可能会有所不同。

token passed in as enviroment variable 82639C09A4000000012B0229296E04

set token to watch client option

watch established

started change stream...

change stream event{"_id": {"$oid":"639c09ed60078afff212209e"},"user": "user-1671170541"}

change stream event{"_id": {"$oid":"639c09ee60078afff212209f"},"user": "user-1671170542"}

验证您收到的记录是否与关闭更改流应用程序时添加的记录相同。

您可以通过调用 REST API 应用程序公开的 HTTP 端点来继续向 MongoDB 添加记录:

curl -i localhost:8080

正如预期的那样,更改流应用程序将实时检测和记录这些:

您应该看到类似于以下内容的日志:

change stream event{"_id": {"$oid":"639c0a5c60078afff21220a0"},"user": "user-1671170652"}

change stream event{"_id": {"$oid":"639c0a5d60078afff21220a1"},"user": "user-1671170653"}

 十、收尾

最后,要停止这两个应用程序,请在各自的终端中按 +。CTRLC

同时删除 MongoDB 集群实例和 Docker 网络:

docker rm -f mongo1 mongo2 mongo3

docker network rm mongodb-cluster

 十一、结论

在本文中,您将了解 MongoDB 和更改流的概述。您在 Docker 中设置了一个 MongoDB 副本集,部署了客户端应用程序,并通过端到端示例探索了更改流和恢复令牌。

赞(0)
未经允许不得转载:主机百科 » 如何将 MongoDB 更改流与 Go 驱动程序一起使用