Browse Source

Attempting to change all code to work with the new version of Badger (changed everything!)

Gwyneth Llewelyn 7 năm trước cách đây
mục cha
commit
37808f31a7
1 tập tin đã thay đổi với 81 bổ sung83 xóa
  1. 81 83
      gosl.go

+ 81 - 83
gosl.go

@@ -13,7 +13,6 @@ import (
 	"github.com/op/go-logging"
 	"gopkg.in/natefinch/lumberjack.v2"
 	"io"
-//	"io/ioutil"
 	"net/http"
 	"net/http/fcgi"
 	"os"
@@ -21,7 +20,6 @@ import (
 	"regexp"
 	"runtime"
 	"strings"
-	"sync"
 	"time"
 )
 
@@ -40,7 +38,7 @@ type avatarUUID struct {
 	Grid string
 } 
 
-var BATCH_BLOCK = 100000	// how many entries to write to the database as a block; the bigger, the faster, but the more memory it consumes
+var BATCH_BLOCK = 1000000	// how many entries to write to the database as a block; the bigger, the faster, but the more memory it consumes
 							// the authors say that 100000 is way too much
 
 /*
@@ -136,12 +134,16 @@ func main() {
 		var err error
 
 		log.Info("gosl started and logging is set up. Proceeding to test KV database.")
-		kv, err := badger.NewKV(&Opt)
+		kv, err := badger.Open(Opt)
 		checkErrPanic(err) // should probably panic, cannot prep new database
 		var testValue = avatarUUID{ NullUUID, "all grids" }
 		jsonTestValue, err := json.Marshal(testValue)
 		checkErrPanic(err) // something went VERY wrong
-		kv.Set([]byte(testAvatarName), jsonTestValue, 0x00)
+		txn := kv.NewTransaction(true)
+		err = txn.Set([]byte(testAvatarName), jsonTestValue, 0x00)
+		checkErrPanic(err)
+		err = txn.Commit(nil)
+		checkErrPanic(err)
 		log.Debugf("SET %+v (json: %v)\n", testValue, string(jsonTestValue))
 		kv.Close()
 		key, grid := searchKVname(testAvatarName)
@@ -233,13 +235,18 @@ func handler(w http.ResponseWriter, r *http.Request) {
 	if name != "" {
 		if key != "" {
 			// we received both: add a new entry
-			kv, err := badger.NewKV(&Opt)
+			kv, err := badger.Open(Opt)
 			checkErrPanic(err) // should probably panic
 			valueToInsert.UUID = key
 			valueToInsert.Grid = r.Header.Get("X-Secondlife-Shard")
 			jsonValueToInsert, err := json.Marshal(valueToInsert)
 			checkErr(err)
-			kv.Set([]byte(name), jsonValueToInsert, 0x00)
+			txn := kv.NewTransaction(true)
+			defer txn.Discard()
+			err = txn.Set([]byte(name), jsonValueToInsert, 0x00)
+			checkErrPanic(err)
+			err = txn.Commit(nil)
+			checkErrPanic(err)
 			kv.Close()
 			messageToSL += "Added new entry for '" + name + "' which is: " + valueToInsert.UUID + " from grid: '" + valueToInsert.Grid + "'"
 		} else {
@@ -271,18 +278,30 @@ func handler(w http.ResponseWriter, r *http.Request) {
 // searchKVname searches the KV database for an avatar name.
 func searchKVname(avatarName string) (UUID string, grid string) {
 	time_start := time.Now()
-	kv, err := badger.NewKV(&Opt)
+	kv, err := badger.Open(Opt)
 	defer kv.Close()
-	var item badger.KVItem
-	if err := kv.Get([]byte(avatarName), &item); err != nil {
-		log.Errorf("Error while getting name: '%s' - (%v)\n", avatarName, err)
-		return NullUUID, ""
-	}
+//	var item badger.Item
 	var val = avatarUUID{ NullUUID, "" }
-	if err = item.Value(func(v []byte) error {
-			return json.Unmarshal(v, &val)
-		}); err != nil {
-		log.Errorf("Error while unparsing UUID for name: '%s' (%v)\n", avatarName, err)
+//	txn := kv.NewTransaction(true)
+//	defer txn.Discard()
+	err = kv.View(func(txn *badger.Txn) error {
+	    item, err := txn.Get([]byte(avatarName))
+		if err != nil {
+			return err
+    	}    	
+    	data, err := item.Value()
+		if err != nil {
+			log.Errorf("Error '%s' while getting data from %v\n", err, item)
+			return err
+    	}    	
+    	err = json.Unmarshal(data, &val)
+		if err != nil {
+			log.Errorf("Error while unparsing UUID for name: '%s' (%v)\n", avatarName, err)
+			return err
+    	}
+    	return nil
+	})	
+	if err != nil {
 		return NullUUID, ""
 	}
 	time_end := time.Now()
@@ -293,7 +312,7 @@ func searchKVname(avatarName string) (UUID string, grid string) {
 // searchKVUUID searches the KV database for an avatar key.
 func searchKVUUID(avatarKey string) (name string, grid string) {
 	time_start := time.Now()
-	kv, err := badger.NewKV(&Opt)
+	kv, err := badger.Open(Opt)
 	checkErr(err) // should probably panic
 	itOpt := badger.DefaultIteratorOptions
 	if !*noMemory {
@@ -302,26 +321,38 @@ func searchKVUUID(avatarKey string) (name string, grid string) {
 	} else {
 		itOpt.PrefetchValues = false
 	}
-	itr := kv.NewIterator(itOpt)
+	checks := 0
 	var val = avatarUUID{ NullUUID, "" }
 	var found string
-	checks := 0
-	for itr.Rewind(); itr.Valid(); itr.Next() {
-		item := itr.Item()
-		if err = item.Value(func(v []byte) error {
-				return json.Unmarshal(v, &val)
-			}); err == nil {
+	txn := kv.NewTransaction(true)
+	defer txn.Discard()
+
+	err = kv.View(func(txn *badger.Txn) error {				
+		itr := txn.NewIterator(itOpt)
+		defer itr.Close()		
+		for itr.Rewind(); itr.Valid(); itr.Next() {
+			item := itr.Item()
+			data, err := item.Value()
+			if err != nil {
+				log.Errorf("Error '%s' while getting data from %v\n", err, item)
+				return err
+	    	}    	
+	    	err = json.Unmarshal(data, &val)
+			if err != nil {
+				log.Errorf("Error '%s' while unparsing UUID for data: %v\n", err, data)
+				return err
+	    	}
 			checks++	//Just to see how many checks we made, for statistical purposes
 			if avatarKey == val.UUID {
 				found = string(item.Key())
 				break
 			}
-		}	
-	}
+		}
+		return nil
+	})
 	time_end := time.Now()
 	diffTime := time_end.Sub(time_start)
 	log.Debugf("Made %d checks for '%s' in %v\n", checks, avatarKey, diffTime)
-	itr.Close()
 	kv.Close()
 	return found, val.Grid
 }
@@ -339,85 +370,52 @@ func importDatabase(filename string) {
 	cr := csv.NewReader(gr)  // open csv reader and feed the bzip2 reader into it
 	
 	// prepare connection to KV database
-	kv, err := badger.NewKV(&Opt)
+	kv, err := badger.Open(Opt)
 	checkErrPanic(err) // should probably panic		
 	defer kv.Close()
 	time_start := time.Now() // we want to get an idea on how long this takes
 	
-	// prepare stuff for asynchronous key reading; see https://godoc.org/github.com/dgraph-io/badger#KV.BatchSetAsync
-	wg := new(sync.WaitGroup)
-
-//	var oneEntry badger.Entry	// make sure this one has at least some memory assigned to it
 	limit := 0	// outside of for loop so that we can count how many entries we had in total
-	batch := make([]*badger.Entry, 0, BATCH_BLOCK) // this is the actual data
-
-	// define a error function for async reads
-	f := func(err error) {
-	    defer wg.Done()
-	    if err != nil {
-	        // At this point you can retry writing keys or send error over a channel to handle
-	        // in some other goroutine.
-	        log.Errorf("Got error: %+v\n", err)
-	    }
-	
-	    // Check for error in entries which could be non-nil if the user supplies a CasCounter.
-	    for _, e := range batch {
-	        if e.Error != nil {
-	            log.Errorf("Got error: %+v\n", e.Error)
-	        }
-	    }
-	
-	    // You can do cleanup now. Like deleting keys from cache.
-	    log.Info("All async sets complete.")
-	}
 
+	txn := kv.NewTransaction(true) // start new transaction; we will commit only every BATCH_BLOCK entries
+	defer txn.Discard()
 	for ;;limit++ {
 		record, err := cr.Read()
 		if err == io.EOF {
 			break
-		}
-		if err != nil {
+		} else if err != nil {
 			log.Fatal(err)
 		}
 		jsonNewEntry, err := json.Marshal(avatarUUID{ record[0], "Production" }) // W-Hat keys come all from the main LL grid, known as 'Production'
 		if err != nil {
 			log.Warning(err)
-		} else {
-//			oneEntry = badger.Entry{ Key: []byte(record[1]), Value: []byte(jsonNewEntry)}
-			batch = append(batch, &badger.Entry{
-				Key:   []byte(record[1]),
-				Value: []byte(jsonNewEntry),
-    		})
+		} else {			 
+			err = txn.Set([]byte(record[1]), jsonNewEntry, 0x00)
+			if err != nil {
+			    log.Fatal(err)
+			}
 		}
 		if limit % BATCH_BLOCK == 0 && limit != 0 { // we do not run on the first time, and then only every BATCH_BLOCK times
-			log.Debug("Processing:", limit)
-//			go writeOneBatch(kv, batch)
-			wg.Add(1)
-			kv.BatchSetAsync(batch, f)
-			log.Debug("Finished")
-			wg.Wait()
-			batch = nil // clear all entries, start a new batch
-			runtime.GC()
-			batch = make([]*badger.Entry, 0, BATCH_BLOCK)
+			log.Info("Processing:", limit)
+			err = txn.Commit(nil)
+			if err != nil {
+			    log.Fatal(err)
+			}
+			txn = kv.NewTransaction(true) // start a new transaction
+			defer txn.Discard()
 		}
 	}
-	// NOTE(gwyneth): these are the final ones (i.e. from the last round number of BATCH_BLOCK up to the end) 
-	// and we do not run them as goroutine because the function might terminate and close our channel before
-	// this finishes
-	// BUG(gwyneth): if BATCH_BLOCK is too small, or we finish exactly on the modulo, we may still have a
-	// racing condition! 
-	wg.Add(1)
-	kv.BatchSetAsync(batch, f)
-	log.Debug("Finished last batch")
-	wg.Wait()
-	batch = nil // flag the garbage collector that we are finished with this array
-	runtime.GC()
+	// commit last batch
+	err = txn.Commit(nil)
+	if err != nil {
+	    log.Fatal(err)
+	}
+	kv.PurgeOlderVersions()
 	time_end := time.Now()
 	diffTime := time_end.Sub(time_start)
 	log.Info("Total read", limit, "records (or thereabouts) in", diffTime)
 }
 
-
 // NOTE(gwyneth):Auxiliary functions which I'm always using...
 
 // checkErrPanic logs a fatal error and panics.