Optimizing the insertion of hundreds of millions of records into MongoDB with Go

Background

The MongoDB cluster already holds 1.5 billion documents, and a batch of nearly 800 million records (containing duplicates), currently stored in SQLite, needs to be imported on top of them.
Requirements:
1: If a record already exists, update it; otherwise insert it. MongoDB's upsert operation handles this.
2: Keep the total processing time as short as possible.

Approach 1

Read the data and run an upsert for every single record.
This satisfies requirement 1 but not requirement 2: the estimated processing time is 30-40 days.
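
A minimal sketch of the per-record upsert, assuming the same MongoDB Go driver imports and field names (lowercased sha256 as _id, md5) used in the snippets below:

// upsertOne writes one record with an upsert: the lowercased sha256 is the
// document _id, so an existing document is updated and a missing one inserted.
func upsertOne(ctx context.Context, coll *mongo.Collection, sha256, md5 string) error {
	filter := bson.M{"_id": strings.ToLower(sha256)}
	update := bson.M{"$set": bson.M{"md5": md5}}
	_, err := coll.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
	return err
}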

Approach 2

Batch the writes with the collection.BulkWrite function.
This satisfies requirement 1; the estimated processing time is 20-30 days.

Core code:

var (
	list   []mongo.WriteModel
	upsert = true
)
sha256Data := bson.M{
	"$set": bson.D{
		{"_id", strings.ToLower(v.Sha256)},
		{"md5", v.Md5},
	},
}
model := mongo.NewUpdateOneModel()
model.Upsert = &upsert
wm := model.SetFilter(bson.M{"_id": strings.ToLower(v.Sha256)}).SetUpdate(sha256Data)
list = append(list, wm)

collection.BulkWrite(cxt, list)
Approach 3

Optimize approach 2: still batch with collection.BulkWrite, but pass options.
With this change requirement 1 is still satisfied, and the estimated processing time drops to 5-10 days.

//Unordered Writes to mongos
//To improve write performance to sharded clusters, use bulkWrite() with the optional parameter ordered set to false. mongos can attempt to send the writes to multiple shards simultaneously. For empty collections, first pre-split the collection as described in Split Chunks in a Sharded Cluster.
op := options.BulkWriteOptions{}
ordered := false
op.Ordered = &ordered // unordered writes are faster: https://docs.mongodb.com/manual/core/bulk-write-operations/
var (
	list   []mongo.WriteModel
	upsert = true
)
sha256Data := bson.M{
	"$set": bson.D{
		{"_id", strings.ToLower(v.Sha256)},
		{"md5", v.Md5},
	},
}
model := mongo.NewUpdateOneModel()
model.Upsert = &upsert
wm := model.SetFilter(bson.M{"_id": strings.ToLower(v.Sha256)}).SetUpdate(sha256Data)
list = append(list, wm)
collection.BulkWrite(cxt, list, &op)
Approach 4

Continue optimizing: switch the code to a producer-consumer model with a buffered channel (20 slots) so that more BulkWrite calls hit MongoDB concurrently.
The drawback is that when several goroutines run upserts concurrently, MongoDB reports duplicate key errors (E11000 duplicate key error).
The reason: an upsert first runs a query and then, if nothing matches, an insert, and these two steps are not atomic. When the same record is handled by two goroutines, both can query the same _id, both see that the collection has no such document, and both attempt the insert, so one of them fails.
Workaround: when the error occurs, retry the operation, as in the sketch below.
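
A minimal sketch of that retry loop in a consumer goroutine (the function and channel names here are illustrative; the full version appears in the final listing):

// drainTasks keeps retrying each batch until BulkWrite succeeds. Because every
// model is an upsert, replaying a batch after an E11000 duplicate key error is
// safe: the documents inserted by the competing goroutine simply get updated.
func drainTasks(coll *mongo.Collection, tasks <-chan []mongo.WriteModel, op *options.BulkWriteOptions) {
	for batch := range tasks {
		for {
			if _, err := coll.BulkWrite(context.Background(), batch, op); err != nil {
				log.Println("BulkWrite failed, retrying:", err)
				time.Sleep(time.Second)
				continue
			}
			break
		}
	}
}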

Approach 5

First deduplicate the data inside SQLite and generate a table without duplicate rows. This effectively removes the time lost to the duplicate-key retries and their for loop. Estimated time: around 15 hours.

create table test as select * from metadata  group by sha256;

Note that if the table is very large, SQLite reports error: database or disk is full.
Workaround: https://stackoverflow.com/questions/5274202/sqlite3-database-or-disk-is-full-the-database-disk-image-is-malformed

To avoid getting "database or disk is full" in the first place, try this if you have lots of RAM:
sqlite> pragma temp_store = 2; 
That tells SQLite to put temp files in memory. (The "database or disk is full" message does not mean either that the database is full or that the disk is full! It means the temp directory is full.) I have 256G of RAM but only 2G of /tmp, so this works great for me. The more RAM you have, the bigger db files you can work with.
If you haven't got a lot of ram, try this:
sqlite> pragma temp_store = 1; 
sqlite> pragma temp_store_directory = '/directory/with/lots/of/space'; 
temp_store_directory is deprecated (which is silly, since temp_store is not deprecated and requires temp_store_directory), so be wary of using this in code.
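
If the deduplication is driven from Go over the same go-sqlite3 connection, applying the pragma before the create table statement might look like this sketch (dedupSQLite and the temp directory path are illustrative, not part of the original code):

// dedupSQLite points SQLite's temp files at a directory with plenty of free
// space (or use "pragma temp_store = 2" to keep them in RAM) and then builds
// the deduplicated table.
func dedupSQLite(db *sql.DB) error {
	stmts := []string{
		"pragma temp_store = 1",
		"pragma temp_store_directory = '/data/sqlite-tmp'",
		"create table test as select * from metadata group by sha256",
	}
	for _, s := range stmts {
		if _, err := db.Exec(s); err != nil {
			return err
		}
	}
	return nil
}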

One more caveat: the table created this way has no indexes, and as the data volume grows, paging through it gets slower and slower.

Approach 6

Continue optimizing on top of approach 5: because the rows in the generated table may be unordered, add an order by on the id when creating it.
Create the table:

create table test as select * from metadata group by sha256 order by metadata_id;
create unique index idx_metadata_id on test(metadata_id);

The pagination query is changed to where metadata_id > begin limit 10000, which makes full use of the index to speed things up.
Note: begin is the metadata_id to start after and end is the last metadata_id to process. Because the ids are not necessarily contiguous, look both values up in the database first, as shown below.
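
One way to look up those boundary ids (a small sketch; the table and column names follow the SQL above):

// idRange fetches the smallest and largest metadata_id so the keyset
// pagination loop (metadata_id > begin ... limit N) knows where to start and stop.
func idRange(db *sql.DB) (begin, end int64, err error) {
	err = db.QueryRow("select min(metadata_id), max(metadata_id) from test").Scan(&begin, &end)
	return
}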
The final optimized code:

package main
import (
	"context"
	"database/sql"
	"flag"
	"fmt"
	"log"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/chennqqi/tqdm/render"
	_ "github.com/mattn/go-sqlite3"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

var (
	dealCount int64
	filename  string
	mongoDSN  string
	limit     int
	begin     int64
	end       int64
)

func main() {
	flag.StringVar(&filename, "f", "", "path of the SQLite file to import")
	flag.IntVar(&limit, "limit", 100, "page size")
	flag.Int64Var(&begin, "begin", 5297946, "metadata_id to start after (exclusive)")
	flag.Int64Var(&end, "end", 1090671226, "last metadata_id to process")
	flag.StringVar(&mongoDSN, "mongoDSN", "", "mongo connection string")
	flag.Parse()

	db, err := sql.Open("sqlite3", filename)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	database, err := initDatabase(mongoDSN)
	if err != nil {
		fmt.Println(err)
		return
	}
	// initDatabase (returns a *mongo.Client) and the METADATA struct that mirrors
	// the SQLite columns are defined elsewhere and omitted from this listing.
	resultColl := database.Database("cs").Collection("result")
	process(db, resultColl)
}

var taskChannel = make(chan *task, 100)

func process(db *sql.DB, resultColl *mongo.Collection) error {
	var w sync.WaitGroup
	w.Add(100)
	for i := 0; i < 100; i++ {
		go func() {
			defer w.Done()
			for t := range taskChannel {
				processChannel(t, resultColl)
			}
		}()
	}

	r := render.MakeRendererFunc(os.Stdout)
	now := time.Now()
	for begin <= end {
		var (
			upsert       = true
			dataSha256   []mongo.WriteModel
			timeTemplate = "2006-01-02 15:04:05 +0000 UTC"
			ordered      = false
			op           = options.BulkWriteOptions{}
		)
		op.Ordered = &ordered
		// Keyset pagination: strictly greater than the last id already processed.
		rows, err := db.Query("select * from sample1 where metadata_id>? limit ?", begin, limit)
		if err != nil {
			fmt.Println("db.Query failed, processed up to metadata_id:", begin)
			panic(err)
		}
		now2 := time.Now()
		rowCount := 0
		for rows.Next() {
			v := METADATA{}
			if err := rows.Scan(&v.MetadataId, &v.ObjectId, &v.KeyHash, &v.FileName, &v.Extension, &v.Bytes, &v.Mtime, &v.Md5, &v.Sha1, &v.Sha256); err != nil {
				panic(err)
			}
			begin = v.MetadataId
			rowCount++
			atomic.AddInt64(&dealCount, 1)
			if v.Sha256 == "" {
				fmt.Println(v)
				panic("empty sha256")
			}
			var timestamp int64 = 0
			if v.Mtime == nil {
				timestamp = time.Now().Unix()
			} else {
				t := fmt.Sprintf("%v", v.Mtime)
				stamp, _ := time.ParseInLocation(timeTemplate, t, time.Local)
				timestamp = stamp.Unix()
			}
			fname := ""
			if v.Extension == "" {
				fname = v.FileName
			} else {
				fname = v.FileName + "." + v.Extension
			}
			{
				sha256Data := bson.M{
					"$set": bson.D{
						{"_id", strings.ToLower(v.Sha256)},
						{"md5", v.Md5},{"sha1", v.Sha1},
						{"status", 1},{"ct", time.Now().Unix()},
					},
					"$addToSet": bson.D{
						{"names", fname},
					},
				}
				model := mongo.NewUpdateOneModel()
				model.Upsert = &upsert
				wm := model.SetFilter(bson.M{"_id": strings.ToLower(v.Sha256)}).SetUpdate(sha256Data)
				dataSha256 = append(dataSha256, wm)
			}
		}
		rows.Close()
		fmt.Println("rows.Next() took:", time.Since(now2))
		if rowCount == 0 {
			// No rows beyond the current begin: the table is exhausted.
			break
		}
		t := task{
			dataSha256: dataSha256,
		}
		taskChannel <- &t
		fmt.Printf("\t begin : %v  end : %v  dealCount : %v \n ", begin, end, dealCount)
		r(render.FormatProgressBar(uint(end), uint(begin), time.Since(now)))
	}
	fmt.Printf("\nprocessed %d rows, %d failures, total time %v\n", dealCount, 0, time.Since(now))
	close(taskChannel)
	w.Wait()
	return nil
}
type task struct {
	dataSha256 []mongo.WriteModel
}
func processChannel(t *task, resultColl *mongo.Collection) {
	ordered := false
	op := options.BulkWriteOptions{}
	op.Ordered = &ordered
	cxt := context.Background()
	now3 := time.Now()
	if len(t.dataSha256) > 0 {
		for {
			if _, err := resultColl.BulkWrite(cxt, t.dataSha256, &op); err != nil {
				time.Sleep(1 * time.Second)
				fmt.Println(err)
			} else {
				break
			}
		}
	}
	fmt.Println("mongo BulkWrite took:", time.Since(now3))
}
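
With the flags defined in main, a run might look like this (the connection string and file name are placeholders): go run main.go -f metadata.db -mongoDSN "mongodb://127.0.0.1:27017" -begin 5297946 -end 1090671226 -limit 10000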
