
go操作mongodb
判断bson.M 是否存在某字段
在 Go 中,bson.M 是 MongoDB 官方驱动(如 go.mongodb.org/mongo-driver)中用于表示 BSON 文档的类型,其底层是一个 map[string]interface{}:
go
type M map[string]interface{}如果要判断嵌套字段(如 "address.city"),需要逐层判断:
go
doc := bson.M{
"address": bson.M{
"city": "Beijing",
},
}
if addr, ok := doc["address"]; ok {
if addrMap, ok := addr.(bson.M); ok {
if city, ok := addrMap["city"]; ok {
fmt.Println("city 存在,值为:", city)
}
}
}封装一个安全的嵌套字段判断函数(可选)
go
func hasNestedField(doc bson.M, path string) bool {
keys := strings.Split(path, ".")
current := doc
for _, key := range keys {
val, exists := current[key]
if !exists {
return false
}
if m, ok := val.(bson.M); ok {
current = m
} else if len(keys) > 1 { // 还有下一级,但当前不是 map
return false
}
}
return true
}使用:
go
if hasNestedField(doc, "address.city") {
fmt.Println("address.city 存在")
}用map接收
go
coll := bytedoc.GetCleanDataCollectionByName(namespace)
// ✅ 正确声明并初始化一个 map 来接收数据
var data map[string]interface{}
// 执行查询并解码到 map
err := coll.FindOne(ctx, bson.M{"id": id}).Decode(&data)
if err != nil {
if err == mongo.ErrNoDocuments {
fmt.Println("未找到文档")
} else {
log.Fatal("查询出错:", err)
}
return
}
// 使用数据
fmt.Printf("查询结果: %+v\n", data)
// 示例:访问字段
if name, ok := data["name"]; ok {
fmt.Println("姓名:", name)
}
if age, ok := data["age"].(float64); ok {
fmt.Println("年龄:", int(age))
}map[string]interface{}是 Go 中接收任意结构 JSON/BSON 数据的通用方式。- MongoDB 驱动会自动将 BSON 文档解码为
map[string]interface{}。 - 所有字段名默认是 小写(除非你在文档中明确使用了其他大小写)。
go
package main
import (
"fmt"
"go.mongodb.org/mongo-driver/bson"
)
func main() {
doc := bson.M{
"name": "Alice",
"age": 30,
// "email" 字段不存在
}
// 判断字段是否存在
if value, exists := doc["email"]; exists {
fmt.Println("email 存在,值为:", value)
} else {
fmt.Println("email 不存在")
}
// 判断 name 是否存在
if _, exists := doc["name"]; exists {
fmt.Println("name 存在")
} else {
fmt.Println("name 不存在")
}
}插入对象
直接插入结构体
go
user := User{
Name: "Alice",
Email: "alice@example.com",
Age: 28,
Active: true,
LastSeen: time.Now(),
}
result, err := collection.InsertOne(ctx, user)任意对象转成bson.M对象
它的目的是:将任意 Go 数据(如 struct、map 等)转换为 bson.M 类型,以便进行灵活的操作(比如修改字段、增删 key)。
go
type User struct {
Name string `json:"name" bson:"name"`
Age int `json:"age" bson:"age"`
}
data := User{Name: "Alice", Age: 25}
// ❌ 不能直接转
// var docMap bson.M = data // 编译错误!
// ✅ 正确方式:先序列化成 BSON 二进制,再反序列化成 bson.M
bsonData, _ := bson.Marshal(data) // struct → BSON bytes
var docMap bson.M
bson.Unmarshal(bsonData, &docMap) // BSON bytes → bson.M
// 结果:docMap = {"name": "Alice", "age": 25}这种写法常见于:
- 从外部接收 JSON/struct 数据,想存入 MongoDB
- 需要动态修改字段(比如加个时间戳、改个字段名)
- 做数据清洗、中转、日志记录等
go
// 比如你想加个字段
docMap["created_at"] = time.Now()
docMap["source"] = "api_v2"
// 然后插入数据库
collection.InsertOne(ctx, docMap)json转bson.M
如果你控制输入源,可以直接解析到 bson.M:
go
var docMap bson.M
json.Unmarshal(jsonBytes, &docMap) // JSON → bson.M插入bson
go
doc := bson.M{
"id": "123",
"content": "hello",
"tags": []string{"a", "b"},
}
collection.InsertOne(ctx, doc)获取链接
go
package main
import (
"context"
"fmt"
"log"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// User 结构体,对应 MongoDB 中的文档
type User struct {
ID primitive.ObjectID `bson:"_id,omitempty"`
Name string `bson:"name"`
Email string `bson:"email"`
Age int `bson:"age,omitempty"`
Active bool `bson:"active,omitempty"`
LastSeen time.Time `bson:"last_seen,omitempty"`
}
func connect() (*mongo.Client, context.Context, context.CancelFunc) {
// 设置上下文,超时 10 秒
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// MongoDB 连接 URI(根据你的配置修改)
uri := "mongodb://localhost:27017" // 本地
// uri := "mongodb://user:pass@host:port/db" // 远程或带认证
// 创建 MongoDB 客户端
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
log.Fatal("Failed to connect to MongoDB:", err)
}
// 测试连接
err = client.Ping(ctx, nil)
if err != nil {
log.Fatal("Failed to ping MongoDB:", err)
}
fmt.Println("Connected to MongoDB!")
return client, ctx, cancel
}
func main() {
client, ctx, cancel := connect()
defer cancel()
defer func() {
if err := client.Disconnect(ctx); err != nil {
log.Fatal("Failed to disconnect:", err)
}
}()
}查询
根据id查询
go
func findUserByID(client *mongo.Client, ctx context.Context, idStr string) {
collection := client.Database("testdb").Collection("users")
// 将字符串 ID 转换为 ObjectID
objectID, err := primitive.ObjectIDFromHex(idStr)
if err != nil {
log.Fatal("Invalid ID format:", err)
}
var user User
// 查询
err = collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&user)
if err != nil {
if err == mongo.ErrNoDocuments {
fmt.Printf("❌ 未找到 ID 为 %s 的用户\n", idStr)
} else {
log.Fatal("查询出错:", err)
}
return
}
fmt.Printf("✅ 查询到用户: %+v\n", user)
}查询所有数据
通过游标查询
优点:内存友好,适合处理大量数据(不会一次性加载到内存)。
go
func findUsersAsListManual(client *mongo.Client, ctx context.Context) []User {
collection := client.Database("testdb").Collection("users")
cursor, err := collection.Find(ctx, bson.M{})
if err != nil {
log.Fatal("Find failed:", err)
}
defer cursor.Close(ctx)
var users []User
for cursor.Next(ctx) {
var user User
if err := cursor.Decode(&user); err != nil {
log.Fatal("Decode error:", err)
}
users = append(users, user)
}
if err := cursor.Err(); err != nil {
log.Fatal("Cursor error:", err)
}
return users
}cursor.All(ctx, &users) 会自动遍历游标,把所有文档解码到 users 切片中。
go
func findUsersAsList(client *mongo.Client, ctx context.Context) []User {
collection := client.Database("testdb").Collection("users")
cursor, err := collection.Find(ctx, bson.M{})
if err != nil {
log.Fatal("Find failed:", err)
}
defer cursor.Close(ctx)
var users []User
if err = cursor.All(ctx, &users); err != nil {
log.Fatal("Decode all error:", err)
}
return users
}简单查询
go
func findUserByName(client *mongo.Client, ctx context.Context, name string) {
collection := client.Database("testdb").Collection("users")
var user User
err := collection.FindOne(ctx, bson.M{"name": name}).Decode(&user)
if err != nil {
if err == mongo.ErrNoDocuments {
fmt.Printf("User '%s' not found\n", name)
} else {
log.Fatal("Query error:", err)
}
return
}
fmt.Printf("Found user: %+v\n", user)
}更新
指定字段更新
go
func updateUser(client *mongo.Client, ctx context.Context, name string) {
collection := client.Database("testdb").Collection("users")
update := bson.M{
"$set": bson.M{
"age": 29,
"active": false,
"last_seen": time.Now(),
},
}
result, err := collection.UpdateOne(ctx, bson.M{"name": name}, update)
if err != nil {
log.Fatal("Update failed:", err)
}
fmt.Printf("Modified %v document(s)\n", result.ModifiedCount)
}全量更新
使用 ReplaceOne + Upsert: true
go
// UpsertUser 根据 User 的 ID 判断是否存在:存在则替换(更新),不存在则插入
func UpsertUser(client *mongo.Client, ctx context.Context, user User) error {
collection := client.Database("testdb").Collection("users")
var filter bson.M
// 方式1:按 _id 判断是否存在(推荐:主键唯一)
if user.ID.IsZero() {
// 如果没有 ID,则按 Name 和 Email 联合判断(或其他业务唯一键)
filter = bson.M{"name": user.Name, "email": user.Email}
} else {
filter = bson.M{"_id": user.ID}
}
upsert := true
// 执行 upsert 操作
result, err := collection.ReplaceOne(
ctx,
filter,
user,
&options.ReplaceOptions{Upsert: &upsert},
)
if err != nil {
return fmt.Errorf("failed to upsert user: %w", err)
}
if result.UpsertedID != nil {
fmt.Printf("Inserted new user with ID: %v\n", result.UpsertedID)
} else {
fmt.Printf("Updated existing user, ModifiedCount: %v\n", result.ModifiedCount)
}
return nil
}删除
删除1个
go
func deleteUser(client *mongo.Client, ctx context.Context, name string) {
collection := client.Database("testdb").Collection("users")
result, err := collection.DeleteOne(ctx, bson.M{"name": name})
if err != nil {
log.Fatal("Delete failed:", err)
}
fmt.Printf("Deleted %v document(s)\n", result.DeletedCount)
}删除多个
go
func deleteUsersByName(client *mongo.Client, ctx context.Context, name string) {
collection := client.Database("testdb").Collection("users")
// 删除所有 name 匹配的文档
result, err := collection.DeleteMany(ctx, bson.M{"name": name})
if err != nil {
log.Fatal("Delete failed:", err)
}
fmt.Printf("Deleted %v document(s)\n", result.DeletedCount)
}删除重复数据
需要group之后删除多余重复的数据,有点复杂
go
func ChannelMonitorByByteDocRemoveDuplicates(ctx context.Context, req *open_clean_message.ByteDocWashRequest) error {
query := bson.M{}
// ids 条件
if req.Ids != nil && len(req.Ids) > 0 {
query["id"] = bson.M{"$in": req.Ids}
}
// date 条件(合并)
if req.StartDate != nil || req.EndDate != nil {
dateFilter := bson.M{}
if req.StartDate != nil {
dateFilter["$gte"] = req.StartDate
}
if req.EndDate != nil {
dateFilter["$lte"] = req.EndDate
}
query["date"] = dateFilter
}
// 定义聚合结果的结构体
type AggregateResult struct {
ID string `bson:"id"` // 对应 $project 中的 id: "$_id"
IDs []string `bson:"_ids"` // 收集的 MongoDB _id 字符串列表
Count int `bson:"count"` // 重复数量
}
// 3. 构建聚合管道:先 $match 过滤,再 $group 去重
pipeline := []bson.M{}
// 第一步:加入 query 作为 $match 阶段(如果 query 不为空)
if len(query) > 0 {
pipeline = append(pipeline, bson.M{
"$match": query,
})
}
// 第二步:分组统计
pipeline = append(pipeline, bson.M{
"$group": bson.M{
"_id": "$id",
"_ids": bson.M{"$push": "$_id"},
"count": bson.M{"$sum": 1},
},
})
// 第三步:只保留重复的(count > 1)
pipeline = append(pipeline, bson.M{
"$match": bson.M{
"count": bson.M{"$gt": 1},
},
})
// 第四步:投影输出
pipeline = append(pipeline, bson.M{
"$project": bson.M{
"_id": 0,
"id": "$_id",
"_ids": 1,
"count": 1,
},
})
opts := options.Aggregate().
SetAllowDiskUse(true)
// 执行聚合
coll := bytedoc.GetCleanDataCollectionByName("表名")
cursor, err := coll.Aggregate(ctx, pipeline, opts)
if err != nil {
logs.CtxError(ctx, "ChannelMonitorByByteDocRemoveDuplicates Aggregate err %v ; query : %v", err, query)
return err
}
defer cursor.Close(ctx)
var results []AggregateResult
if err = cursor.All(ctx, &results); err != nil {
logs.CtxError(ctx, "ChannelMonitorByByteDocRemoveDuplicates err %v ; query : %v", err, query)
return err
}
logs.CtxInfo(ctx, "ChannelMonitorByByteDocRemoveDuplicates query : %v , results %v", json_util.JsonForLog(pipeline), json_util.JsonForLog(results))
for _, doc := range results {
if doc.Count <= 1 {
continue
}
// 保留第一个,删除其余
idsToDelete := doc.IDs[1:] // 跳过第一个
if len(idsToDelete) == 0 {
continue
}
// 🔹 打印调试信息(类似 MongoDB Shell 的 print)
logs.CtxInfo(ctx, "正在处理重复 id: %s,总共找到重复文档: %d 条,将删除以下 _id: %v", doc.ID, doc.Count, idsToDelete)
var idsToDelete2 []primitive.ObjectID
for _, idStr := range idsToDelete {
objectID, err := primitive.ObjectIDFromHex(idStr)
if err != nil {
logs.CtxInfo(ctx, "无效的 ObjectID: %s", idStr)
continue
}
idsToDelete2 = append(idsToDelete2, objectID)
}
if len(idsToDelete2) == 0 {
continue
}
// 执行删除
result, err := coll.DeleteMany(
ctx,
bson.M{"_id": bson.M{"$in": idsToDelete2}},
)
if err != nil {
return fmt.Errorf("删除重复数据失败 (id=%s): %v", doc.ID, err)
}
logs.CtxInfo(ctx, "成功删除: %d 条文档", result.DeletedCount)
}
return nil
}从gson中获取值
go
package mongodb_data_util
import (
"fmt"
"strconv"
"strings"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// GetStringFromBson 从 bson map 中获取指定 key 的值并返回字符串。
// 如果值为 nil 或不存在,返回空字符串。
// 若值不是 string 类型,则使用 fmt.Sprintf 转为字符串。
func GetStringFromBson(bson map[string]interface{}, key string) string {
if val, exists := bson[key]; exists && val != nil {
if s, ok := val.(string); ok {
return s
}
return fmt.Sprintf("%v", val)
}
return ""
}
// GetStringSliceFromBson 从 bson 中提取字符串切片。
// 支持 []string、primitive.A 和 []interface{} 类型。
// 非字符串元素将被忽略。
func GetStringSliceFromBson(bson map[string]interface{}, key string) []string {
var result []string
temp := bson[key]
if temp == nil {
return result
}
switch v := temp.(type) {
case []string:
result = v
case primitive.A:
for _, item := range v {
if s, ok := item.(string); ok {
result = append(result, s)
}
}
case []interface{}:
for _, item := range v {
if s, ok := item.(string); ok {
result = append(result, s)
}
}
}
return result
}
// GetFloat64FromBson 从 bson 中安全获取 float64 值。
// 支持数字类型、字符串(尝试解析),不支持则返回 0.0。
func GetFloat64FromBson(bson map[string]interface{}, key string) float64 {
temp := bson[key]
if temp == nil {
return 0.0
}
switch v := temp.(type) {
case float64:
return v
case float32:
return float64(v)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
return float64(reflectValue(v))
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
return f
}
}
return 0.0
}
// GetBoolFromBson 从 bson 中安全获取布尔值。
// 支持 bool、数字(非零为 true)、字符串("true", "1", "on", "yes" 等)。
// 默认返回 false。
func GetBoolFromBson(bson map[string]interface{}, key string) bool {
temp := bson[key]
if temp == nil {
return false
}
switch v := temp.(type) {
case bool:
return v
case string:
switch strings.ToLower(v) {
case "true", "1", "on", "yes":
return true
case "false", "0", "off", "no":
return false
}
case int, int8, int16, int32, int64:
return reflectValue(v) != 0
case uint, uint8, uint16, uint32, uint64:
return reflectValue(v) != 0
case float32:
return v != 0.0
case float64:
return v != 0.0
}
return false
}
// GetInt64FromBson 从 bson 中安全获取 int64 值。
// 支持整型、浮点型(截断)、字符串(解析)等类型。
// 若无法转换或不存在,返回 0。
func GetInt64FromBson(bson map[string]interface{}, key string) int64 {
temp := bson[key]
if temp == nil {
return 0
}
switch v := temp.(type) {
case int64:
return v
case int32:
return int64(v)
case int16:
return int64(v)
case int8:
return int64(v)
case int:
return int64(v)
case uint64:
return int64(v) // 注意:可能溢出
case uint32:
return int64(v)
case uint16:
return int64(v)
case uint8:
return int64(v)
case float64:
return int64(v) // 截断小数
case float32:
return int64(v)
case string:
if i, err := strconv.ParseInt(v, 10, 64); err == nil {
return i
}
}
return 0
}
// GetMapFromBson 从 bson 中提取 map[string]interface{}。
// 支持原生 map 和 bson.M。
// 若不存在或类型不符,返回空 map。
func GetMapFromBson(bsonM map[string]interface{}, key string) map[string]interface{} {
val, exists := bsonM[key]
if !exists || val == nil {
return make(map[string]interface{})
}
if m, ok := val.(map[string]interface{}); ok {
return m
}
if m, ok := val.(bson.M); ok {
return map[string]interface{}(m)
}
return make(map[string]interface{})
}
// 辅助函数:统一获取数值的 int64 表示(用于类型判断)
func reflectValue(v interface{}) int64 {
switch val := v.(type) {
case int:
return int64(val)
case int8:
return int64(val)
case int16:
return int64(val)
case int32:
return int64(val)
case int64:
return val
case uint:
return int64(val)
case uint8:
return int64(val)
case uint16:
return int64(val)
case uint32:
return int64(val)
case uint64:
return int64(val)
default:
return 0
}
}
