1
0
Эх сурвалжийг харах

Using goroutines with BatchSet to speed up the process of importing.

Gwyneth Llewelyn 7 жил өмнө
parent
commit
446bed38fc
1 өөрчлөгдсөн 60 нэмэгдсэн , 13 устгасан
  1. 60 13
      gosl.go

+ 60 - 13
gosl.go

@@ -39,6 +39,8 @@ type avatarUUID struct {
 	Grid string
 } 
 
+var BATCH_BLOCK = 100000	// how many entries to write to the database as a block; the bigger, the faster, but the more memory it consumes
+
 /*
 				  .__			 
   _____ _____  |__| ____  
@@ -117,7 +119,8 @@ func main() {
 	Opt.Dir = *myDir
 	Opt.ValueDir = Opt.Dir
 	if *noMemory {
-		Opt.TableLoadingMode = options.FileIO // use standard file I/O operations for tables instead of LoadRAM 
+		Opt.TableLoadingMode = options.FileIO // use standard file I/O operations for tables instead of LoadRAM
+		BATCH_BLOCK = 10000	// try to import less at each time, it will take longer but hopefully work
 		log.Info("Trying to avoid too much memory consumption")	
 	}
 	kv, err := badger.NewKV(&Opt)
@@ -252,6 +255,7 @@ func handler(w http.ResponseWriter, r *http.Request) {
 }
 // searchKVname searches the KV database for an avatar name.
 func searchKVname(avatarName string) (UUID string, grid string) {
+	time_start := time.Now()
 	kv, err := badger.NewKV(&Opt)
 	defer kv.Close()
 	var item badger.KVItem
@@ -266,19 +270,23 @@ func searchKVname(avatarName string) (UUID string, grid string) {
 		log.Errorf("Error while unparsing UUID for name: %s - %v\n", avatarName, err)
 		return NullUUID, ""
 	}
+	time_end := time.Now()
+	diffTime := time_end.Sub(time_start)
+	log.Debugf("Time to lookup '%s': %v\n", avatarName, diffTime)
 	return val.UUID, val.Grid
 }
 // searchKVUUID searches the KV database for an avatar key.
 func searchKVUUID(avatarKey string) (name string, grid string) {
+	time_start := time.Now()
 	kv, err := badger.NewKV(&Opt)
 	checkErr(err) // should probably panic
-
 	itOpt := badger.DefaultIteratorOptions
+	itOpt.PrefetchValues = true
+	itOpt.PrefetchSize = 1000	// attempt to get this a little bit more efficient; we have many small entries, so this is not too much
 	itr := kv.NewIterator(itOpt)
 	var val = avatarUUID{ NullUUID, "" }
 	var found string
 	checks := 0
-	time_start := time.Now()
 	for itr.Rewind(); itr.Valid(); itr.Next() {
 		item := itr.Item()
 		if err = item.Value(func(v []byte) {
@@ -310,13 +318,14 @@ func importDatabase(filename string) {
 	defer f.Close()
 	gr := bzip2.NewReader(f) // open bzip2 reader
 	cr := csv.NewReader(gr)  // open csv reader and feed the bzip2 reader into it
-	limit := 0
 	kv, err := badger.NewKV(&Opt)
 	checkErrPanic(err) // should probably panic		
 	defer kv.Close()
 	time_start := time.Now()
-	// probably better to get several chunks and use BatchSet concurrently, as recommended per the Badger instructions
-	for {
+	var oneEntry badger.Entry	// make sure this one has at least some memory assigned to it
+	limit := 0	// outside of for loop so that we can count how many entries we had in total
+	var batch = make([]badger.Entry, BATCH_BLOCK) // this is the actual data, or else we get pointers to nil
+	for ;;limit++ {
 		record, err := cr.Read()
 		if err == io.EOF {
 			break
@@ -324,25 +333,63 @@ func importDatabase(filename string) {
 		if err != nil {
 			log.Fatal(err)
 		}
-		//fmt.Println("Key:", record[0], "Name:", record[1])			
 		jsonNewEntry, err := json.Marshal(avatarUUID{ record[0], "Production" }) // W-Hat keys come all from the main LL grid, known as 'Production'
 		if err != nil {
 			log.Warning(err)
 		} else {
-			kv.Set([]byte(record[1]), []byte(jsonNewEntry), 0x00)
+			oneEntry = badger.Entry{ Key: []byte(record[1]), Value: []byte(jsonNewEntry)}
+			batch[limit % BATCH_BLOCK] = oneEntry
 		}
-		limit++
-		if limit % 1000000 == 0 {
-			time_end := time.Now()
-			diffTime := time_end.Sub(time_start)
-			log.Info("Read", limit, "records (or thereabouts) in", diffTime)
+		if limit % BATCH_BLOCK == 0 && limit != 0 { // we do not run on the first time, and then only every BATCH_BLOCK times
+			log.Debug("Processing:", limit)
+			go writeOneBatch(kv, batch)
+			batch = nil // clear all entries, start a new batch
+			runtime.GC()
+			batch = make([]badger.Entry, BATCH_BLOCK)
 		}
 	}
+	writeOneBatch(kv, batch)	// NOTE(gwyneth): these are the final ones (i.e. from the last round number of BATCH_BLOCK up to the end) 
+								// and we do not run them as goroutine because the function might terminate and close our channel before
+								// this finishes
+								// BUG(gwyneth): if BATCH_BLOCK is too small, or we finish exactly on the modulo, we may still have a
+								// racing condition! 
+	batch = nil // flag the garbage collector that we are finished with this array
 	time_end := time.Now()
 	diffTime := time_end.Sub(time_start)
 	log.Info("Total read", limit, "records (or thereabouts) in", diffTime)
 }
 
+// writeOneBatch copies all entries in this batch into the correct kind of array to push them out as a batch.
+// Since this is called as a goroutine, there will be thousands of those batches around!
+func writeOneBatch(kv *badger.KV, batch []badger.Entry) {
+	if kv == nil {
+		log.Panic("kv should NEVER be nil")
+		return
+	}
+	if batch == nil || len(batch) == 0 {
+		log.Panic("batch should NEVER be nil or have zero elements")
+		return
+	}
+	time_start := time.Now()
+	var entries = make([]*badger.Entry, BATCH_BLOCK)	// prepare entries for BatchSet
+	for i := 0; i < BATCH_BLOCK; i++ {
+		entries[i] = &batch[i]
+	}
+	if entries == nil || len(entries) == 0{
+		log.Panic("entries should NEVER be nil or have zero elements")
+		return
+	}
+	checkErr(kv.BatchSet(entries))
+		for _, e := range entries {
+		checkErr(e.Error)
+	}
+	entries = nil // flag the garbage collector that we're finished with this array
+	runtime.GC()
+	time_end := time.Now()
+	diffTime := time_end.Sub(time_start)
+	log.Debug("goroutine sent a batch of", BATCH_BLOCK, "records in", diffTime)
+}
+
 // NOTE(gwyneth):Auxiliary functions which I'm always using...
 
 // checkErrPanic logs a fatal error and panics.