Skip to content
鼓励作者:欢迎打赏犒劳

go操作mongodb

判断bson.M 是否存在某字段

在 Go 中,bson.M 是 MongoDB 官方驱动(如 go.mongodb.org/mongo-driver)中用于表示 BSON 文档的类型,其底层是一个 map[string]interface{}:

go
type M map[string]interface{}

如果要判断嵌套字段(如 "address.city"),需要逐层判断:

go
doc := bson.M{
    "address": bson.M{
        "city": "Beijing",
    },
}

if addr, ok := doc["address"]; ok {
    if addrMap, ok := addr.(bson.M); ok {
        if city, ok := addrMap["city"]; ok {
            fmt.Println("city 存在,值为:", city)
        }
    }
}

封装一个安全的嵌套字段判断函数(可选)

go
func hasNestedField(doc bson.M, path string) bool {
    keys := strings.Split(path, ".")
    current := doc

    for _, key := range keys {
        val, exists := current[key]
        if !exists {
            return false
        }
        if m, ok := val.(bson.M); ok {
            current = m
        } else if len(keys) > 1 { // 还有下一级,但当前不是 map
            return false
        }
    }
    return true
}

使用:

go
if hasNestedField(doc, "address.city") {
    fmt.Println("address.city 存在")
}

用map接收

go
coll := bytedoc.GetCleanDataCollectionByName(namespace)

// ✅ 正确声明并初始化一个 map 来接收数据
var data map[string]interface{}

// 执行查询并解码到 map
err := coll.FindOne(ctx, bson.M{"id": id}).Decode(&data)
if err != nil {
    if err == mongo.ErrNoDocuments {
        fmt.Println("未找到文档")
    } else {
        log.Fatal("查询出错:", err)
    }
    return
}

// 使用数据
fmt.Printf("查询结果: %+v\n", data)

// 示例:访问字段
if name, ok := data["name"]; ok {
    fmt.Println("姓名:", name)
}
if age, ok := data["age"].(float64); ok {
    fmt.Println("年龄:", int(age))
}
  • map[string]interface{} 是 Go 中接收任意结构 JSON/BSON 数据的通用方式。
  • MongoDB 驱动会自动将 BSON 文档解码为 map[string]interface{}
  • 所有字段名默认是 小写(除非你在文档中明确使用了其他大小写)。
go
package main

import (
    "fmt"
    "go.mongodb.org/mongo-driver/bson"
)

func main() {
    doc := bson.M{
        "name": "Alice",
        "age":  30,
        // "email" 字段不存在
    }

    // 判断字段是否存在
    if value, exists := doc["email"]; exists {
        fmt.Println("email 存在,值为:", value)
    } else {
        fmt.Println("email 不存在")
    }

    // 判断 name 是否存在
    if _, exists := doc["name"]; exists {
        fmt.Println("name 存在")
    } else {
        fmt.Println("name 不存在")
    }
}

插入对象

直接插入结构体

go
user := User{
    Name:     "Alice",
    Email:    "alice@example.com",
    Age:      28,
    Active:   true,
    LastSeen: time.Now(),
}

result, err := collection.InsertOne(ctx, user)

任意对象转成bson.M对象

它的目的是:将任意 Go 数据(如 struct、map 等)转换为 bson.M 类型,以便进行灵活的操作(比如修改字段、增删 key)。

go
type User struct {
    Name string `json:"name" bson:"name"`
    Age  int    `json:"age" bson:"age"`
}

data := User{Name: "Alice", Age: 25}

// ❌ 不能直接转
// var docMap bson.M = data  // 编译错误!

// ✅ 正确方式:先序列化成 BSON 二进制,再反序列化成 bson.M
bsonData, _ := bson.Marshal(data)   // struct → BSON bytes
var docMap bson.M
bson.Unmarshal(bsonData, &docMap)   // BSON bytes → bson.M

// 结果:docMap = {"name": "Alice", "age": 25}

这种写法常见于:

  • 从外部接收 JSON/struct 数据,想存入 MongoDB
  • 需要动态修改字段(比如加个时间戳、改个字段名)
  • 做数据清洗、中转、日志记录等
go
// 比如你想加个字段
docMap["created_at"] = time.Now()
docMap["source"] = "api_v2"

// 然后插入数据库
collection.InsertOne(ctx, docMap)

json转bson.M

如果你控制输入源,可以直接解析到 bson.M:

go
var docMap bson.M
json.Unmarshal(jsonBytes, &docMap)  // JSON → bson.M

插入bson

go
doc := bson.M{
    "id":      "123",
    "content": "hello",
    "tags":    []string{"a", "b"},
}
collection.InsertOne(ctx, doc)

获取链接

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
// User 结构体,对应 MongoDB 中的文档
type User struct {
	ID       primitive.ObjectID `bson:"_id,omitempty"`
	Name     string             `bson:"name"`
	Email    string             `bson:"email"`
	Age      int                `bson:"age,omitempty"`
	Active   bool               `bson:"active,omitempty"`
	LastSeen time.Time          `bson:"last_seen,omitempty"`
}
func connect() (*mongo.Client, context.Context, context.CancelFunc) {
	// 设置上下文,超时 10 秒
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

	// MongoDB 连接 URI(根据你的配置修改)
	uri := "mongodb://localhost:27017" // 本地
	// uri := "mongodb://user:pass@host:port/db" // 远程或带认证

	// 创建 MongoDB 客户端
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
	if err != nil {
		log.Fatal("Failed to connect to MongoDB:", err)
	}

	// 测试连接
	err = client.Ping(ctx, nil)
	if err != nil {
		log.Fatal("Failed to ping MongoDB:", err)
	}

	fmt.Println("Connected to MongoDB!")
	return client, ctx, cancel
}

func main() {
	client, ctx, cancel := connect()
	defer cancel()
	defer func() {
		if err := client.Disconnect(ctx); err != nil {
			log.Fatal("Failed to disconnect:", err)
		}
	}()
}

查询

根据id查询

go
func findUserByID(client *mongo.Client, ctx context.Context, idStr string) {
	collection := client.Database("testdb").Collection("users")

	// 将字符串 ID 转换为 ObjectID
	objectID, err := primitive.ObjectIDFromHex(idStr)
	if err != nil {
		log.Fatal("Invalid ID format:", err)
	}

	var user User
	// 查询
	err = collection.FindOne(ctx, bson.M{"_id": objectID}).Decode(&user)
	if err != nil {
		if err == mongo.ErrNoDocuments {
			fmt.Printf("❌ 未找到 ID 为 %s 的用户\n", idStr)
		} else {
			log.Fatal("查询出错:", err)
		}
		return
	}

	fmt.Printf("✅ 查询到用户: %+v\n", user)
}

查询所有数据

通过游标查询

优点:内存友好,适合处理大量数据(不会一次性加载到内存)。

go
func findUsersAsListManual(client *mongo.Client, ctx context.Context) []User {
	collection := client.Database("testdb").Collection("users")

	cursor, err := collection.Find(ctx, bson.M{})
	if err != nil {
		log.Fatal("Find failed:", err)
	}
	defer cursor.Close(ctx)

	var users []User
	for cursor.Next(ctx) {
		var user User
		if err := cursor.Decode(&user); err != nil {
			log.Fatal("Decode error:", err)
		}
		users = append(users, user)
	}

	if err := cursor.Err(); err != nil {
		log.Fatal("Cursor error:", err)
	}

	return users
}

cursor.All(ctx, &users) 会自动遍历游标,把所有文档解码到 users 切片中。

go
func findUsersAsList(client *mongo.Client, ctx context.Context) []User {
	collection := client.Database("testdb").Collection("users")

	cursor, err := collection.Find(ctx, bson.M{})
	if err != nil {
		log.Fatal("Find failed:", err)
	}
	defer cursor.Close(ctx)

	var users []User
	if err = cursor.All(ctx, &users); err != nil {
		log.Fatal("Decode all error:", err)
	}

	return users
}

简单查询

go
func findUserByName(client *mongo.Client, ctx context.Context, name string) {
	collection := client.Database("testdb").Collection("users")

	var user User
	err := collection.FindOne(ctx, bson.M{"name": name}).Decode(&user)
	if err != nil {
		if err == mongo.ErrNoDocuments {
			fmt.Printf("User '%s' not found\n", name)
		} else {
			log.Fatal("Query error:", err)
		}
		return
	}

	fmt.Printf("Found user: %+v\n", user)
}

更新

指定字段更新

go
func updateUser(client *mongo.Client, ctx context.Context, name string) {
	collection := client.Database("testdb").Collection("users")

	update := bson.M{
		"$set": bson.M{
			"age":       29,
			"active":    false,
			"last_seen": time.Now(),
		},
	}

	result, err := collection.UpdateOne(ctx, bson.M{"name": name}, update)
	if err != nil {
		log.Fatal("Update failed:", err)
	}

	fmt.Printf("Modified %v document(s)\n", result.ModifiedCount)
}

全量更新

使用 ReplaceOne + Upsert: true

go
// UpsertUser 根据 User 的 ID 判断是否存在:存在则替换(更新),不存在则插入
func UpsertUser(client *mongo.Client, ctx context.Context, user User) error {
	collection := client.Database("testdb").Collection("users")

	var filter bson.M

	// 方式1:按 _id 判断是否存在(推荐:主键唯一)
	if user.ID.IsZero() {
		// 如果没有 ID,则按 Name 和 Email 联合判断(或其他业务唯一键)
		filter = bson.M{"name": user.Name, "email": user.Email}
	} else {
		filter = bson.M{"_id": user.ID}
	}

	upsert := true
	// 执行 upsert 操作
	result, err := collection.ReplaceOne(
		ctx,
		filter,
		user,
		&options.ReplaceOptions{Upsert: &upsert},
	)
	if err != nil {
		return fmt.Errorf("failed to upsert user: %w", err)
	}

	if result.UpsertedID != nil {
		fmt.Printf("Inserted new user with ID: %v\n", result.UpsertedID)
	} else {
		fmt.Printf("Updated existing user, ModifiedCount: %v\n", result.ModifiedCount)
	}

	return nil
}

删除

删除1个

go
func deleteUser(client *mongo.Client, ctx context.Context, name string) {
	collection := client.Database("testdb").Collection("users")

	result, err := collection.DeleteOne(ctx, bson.M{"name": name})
	if err != nil {
		log.Fatal("Delete failed:", err)
	}

	fmt.Printf("Deleted %v document(s)\n", result.DeletedCount)
}

删除多个

go
func deleteUsersByName(client *mongo.Client, ctx context.Context, name string) {
	collection := client.Database("testdb").Collection("users")

	// 删除所有 name 匹配的文档
	result, err := collection.DeleteMany(ctx, bson.M{"name": name})
	if err != nil {
		log.Fatal("Delete failed:", err)
	}

	fmt.Printf("Deleted %v document(s)\n", result.DeletedCount)
}

删除重复数据

需要group之后删除多余重复的数据,有点复杂

go
func ChannelMonitorByByteDocRemoveDuplicates(ctx context.Context, req *open_clean_message.ByteDocWashRequest) error {
	query := bson.M{}

	// ids 条件
	if req.Ids != nil && len(req.Ids) > 0 {
		query["id"] = bson.M{"$in": req.Ids}
	}

	// date 条件(合并)
	if req.StartDate != nil || req.EndDate != nil {
		dateFilter := bson.M{}
		if req.StartDate != nil {
			dateFilter["$gte"] = req.StartDate
		}
		if req.EndDate != nil {
			dateFilter["$lte"] = req.EndDate
		}
		query["date"] = dateFilter
	}
	// 定义聚合结果的结构体
	type AggregateResult struct {
		ID    string   `bson:"id"`    // 对应 $project 中的 id: "$_id"
		IDs   []string `bson:"_ids"`  // 收集的 MongoDB _id 字符串列表
		Count int      `bson:"count"` // 重复数量
	}
	// 3. 构建聚合管道:先 $match 过滤,再 $group 去重
	pipeline := []bson.M{}
	// 第一步:加入 query 作为 $match 阶段(如果 query 不为空)
	if len(query) > 0 {
		pipeline = append(pipeline, bson.M{
			"$match": query,
		})
	}
	// 第二步:分组统计
	pipeline = append(pipeline, bson.M{
		"$group": bson.M{
			"_id":   "$id",
			"_ids":  bson.M{"$push": "$_id"},
			"count": bson.M{"$sum": 1},
		},
	})

	// 第三步:只保留重复的(count > 1)
	pipeline = append(pipeline, bson.M{
		"$match": bson.M{
			"count": bson.M{"$gt": 1},
		},
	})
	// 第四步:投影输出
	pipeline = append(pipeline, bson.M{
		"$project": bson.M{
			"_id":   0,
			"id":    "$_id",
			"_ids":  1,
			"count": 1,
		},
	})
	opts := options.Aggregate().
		SetAllowDiskUse(true)
	// 执行聚合
	coll := bytedoc.GetCleanDataCollectionByName("表名")
	cursor, err := coll.Aggregate(ctx, pipeline, opts)
	if err != nil {
		logs.CtxError(ctx, "ChannelMonitorByByteDocRemoveDuplicates Aggregate err %v ; query : %v", err, query)
		return err
	}
	defer cursor.Close(ctx)
	var results []AggregateResult
	if err = cursor.All(ctx, &results); err != nil {
		logs.CtxError(ctx, "ChannelMonitorByByteDocRemoveDuplicates err %v ; query : %v", err, query)
		return err
	}
	logs.CtxInfo(ctx, "ChannelMonitorByByteDocRemoveDuplicates query : %v ,  results %v", json_util.JsonForLog(pipeline), json_util.JsonForLog(results))
	for _, doc := range results {
		if doc.Count <= 1 {
			continue
		}
		// 保留第一个,删除其余
		idsToDelete := doc.IDs[1:] // 跳过第一个

		if len(idsToDelete) == 0 {
			continue
		}
		// 🔹 打印调试信息(类似 MongoDB Shell 的 print)
		logs.CtxInfo(ctx, "正在处理重复 id: %s,总共找到重复文档: %d 条,将删除以下 _id: %v", doc.ID, doc.Count, idsToDelete)
		var idsToDelete2 []primitive.ObjectID

		for _, idStr := range idsToDelete {
			objectID, err := primitive.ObjectIDFromHex(idStr)
			if err != nil {
				logs.CtxInfo(ctx, "无效的 ObjectID: %s", idStr)
				continue
			}
			idsToDelete2 = append(idsToDelete2, objectID)
		}
		if len(idsToDelete2) == 0 {
			continue
		}
		// 执行删除
		result, err := coll.DeleteMany(
			ctx,
			bson.M{"_id": bson.M{"$in": idsToDelete2}},
		)
		if err != nil {
			return fmt.Errorf("删除重复数据失败 (id=%s): %v", doc.ID, err)
		}
		logs.CtxInfo(ctx, "成功删除: %d 条文档", result.DeletedCount)
	}
	return nil
}

从gson中获取值

go
package mongodb_data_util

import (
	"fmt"
	"strconv"
	"strings"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// GetStringFromBson 从 bson map 中获取指定 key 的值并返回字符串。
// 如果值为 nil 或不存在,返回空字符串。
// 若值不是 string 类型,则使用 fmt.Sprintf 转为字符串。
func GetStringFromBson(bson map[string]interface{}, key string) string {
	if val, exists := bson[key]; exists && val != nil {
		if s, ok := val.(string); ok {
			return s
		}
		return fmt.Sprintf("%v", val)
	}
	return ""
}

// GetStringSliceFromBson 从 bson 中提取字符串切片。
// 支持 []string、primitive.A 和 []interface{} 类型。
// 非字符串元素将被忽略。
func GetStringSliceFromBson(bson map[string]interface{}, key string) []string {
	var result []string
	temp := bson[key]
	if temp == nil {
		return result
	}

	switch v := temp.(type) {
	case []string:
		result = v
	case primitive.A:
		for _, item := range v {
			if s, ok := item.(string); ok {
				result = append(result, s)
			}
		}
	case []interface{}:
		for _, item := range v {
			if s, ok := item.(string); ok {
				result = append(result, s)
			}
		}
	}
	return result
}

// GetFloat64FromBson 从 bson 中安全获取 float64 值。
// 支持数字类型、字符串(尝试解析),不支持则返回 0.0。
func GetFloat64FromBson(bson map[string]interface{}, key string) float64 {
	temp := bson[key]
	if temp == nil {
		return 0.0
	}

	switch v := temp.(type) {
	case float64:
		return v
	case float32:
		return float64(v)
	case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
		return float64(reflectValue(v))
	case string:
		if f, err := strconv.ParseFloat(v, 64); err == nil {
			return f
		}
	}
	return 0.0
}

// GetBoolFromBson 从 bson 中安全获取布尔值。
// 支持 bool、数字(非零为 true)、字符串("true", "1", "on", "yes" 等)。
// 默认返回 false。
func GetBoolFromBson(bson map[string]interface{}, key string) bool {
	temp := bson[key]
	if temp == nil {
		return false
	}

	switch v := temp.(type) {
	case bool:
		return v
	case string:
		switch strings.ToLower(v) {
		case "true", "1", "on", "yes":
			return true
		case "false", "0", "off", "no":
			return false
		}
	case int, int8, int16, int32, int64:
		return reflectValue(v) != 0
	case uint, uint8, uint16, uint32, uint64:
		return reflectValue(v) != 0
	case float32:
		return v != 0.0
	case float64:
		return v != 0.0
	}
	return false
}

// GetInt64FromBson 从 bson 中安全获取 int64 值。
// 支持整型、浮点型(截断)、字符串(解析)等类型。
// 若无法转换或不存在,返回 0。
func GetInt64FromBson(bson map[string]interface{}, key string) int64 {
	temp := bson[key]
	if temp == nil {
		return 0
	}

	switch v := temp.(type) {
	case int64:
		return v
	case int32:
		return int64(v)
	case int16:
		return int64(v)
	case int8:
		return int64(v)
	case int:
		return int64(v)
	case uint64:
		return int64(v) // 注意:可能溢出
	case uint32:
		return int64(v)
	case uint16:
		return int64(v)
	case uint8:
		return int64(v)
	case float64:
		return int64(v) // 截断小数
	case float32:
		return int64(v)
	case string:
		if i, err := strconv.ParseInt(v, 10, 64); err == nil {
			return i
		}
	}
	return 0
}

// GetMapFromBson 从 bson 中提取 map[string]interface{}。
// 支持原生 map 和 bson.M。
// 若不存在或类型不符,返回空 map。
func GetMapFromBson(bsonM map[string]interface{}, key string) map[string]interface{} {
	val, exists := bsonM[key]
	if !exists || val == nil {
		return make(map[string]interface{})
	}

	if m, ok := val.(map[string]interface{}); ok {
		return m
	}
	if m, ok := val.(bson.M); ok {
		return map[string]interface{}(m)
	}
	return make(map[string]interface{})
}

// 辅助函数:统一获取数值的 int64 表示(用于类型判断)
func reflectValue(v interface{}) int64 {
	switch val := v.(type) {
	case int:
		return int64(val)
	case int8:
		return int64(val)
	case int16:
		return int64(val)
	case int32:
		return int64(val)
	case int64:
		return val
	case uint:
		return int64(val)
	case uint8:
		return int64(val)
	case uint16:
		return int64(val)
	case uint32:
		return int64(val)
	case uint64:
		return int64(val)
	default:
		return 0
	}
}

如有转载或 CV 的请标注本站原文地址