@@ -21,6 +21,7 @@ import (
	"regexp"
	"runtime"
	"strings"
+	"sync"
	"time"
)

@@ -40,6 +41,7 @@ type avatarUUID struct {
}

var BATCH_BLOCK = 100000 // how many entries to write to the database as a block; the bigger, the faster, but the more memory it consumes
+					// the authors say that 100000 is way too much

/*
 .__
@@ -49,7 +51,9 @@ var BATCH_BLOCK = 100000 // how many entries to write to the database as a block
|__|_| (____ /__|___| /
      \/      \/      \/
*/
-
+
+var noMemory *bool
+
// main() starts here.
func main() {
	// Flag setup
@@ -58,7 +62,7 @@ func main() {
	var isServer = flag.Bool("server", false, "Run as server on port " + *myPort)
	var isShell = flag.Bool("shell", false, "Run as an interactive shell")
	var importFilename = flag.String("import", "", "Import database from W-Hat (use the csv.bz2 version)")
-	var noMemory = flag.Bool("nomemory", false, "Attempt to use only disk to save memory (important for shared webservers)")
+	noMemory = flag.Bool("nomemory", false, "Attempt to use only disk to save memory (important for shared webservers)")

	// default is FastCGI

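The flag moves to package scope so that searchKVUUID(), changed further down, can test it as well. A minimal sketch of this package-level flag pattern, with hypothetical names, assuming only the standard library flag package:

    package main

    import (
        "flag"
        "fmt"
    )

    var verbose *bool // package-level, so every function in the package can read it

    func main() {
        verbose = flag.Bool("verbose", false, "enable verbose logging")
        flag.Parse() // must run before the pointer is dereferenced
        if *verbose {
            fmt.Println("verbose mode on")
        }
    }
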
@@ -118,10 +122,14 @@ func main() {
	}
	Opt.Dir = *myDir
	Opt.ValueDir = Opt.Dir
+	//Opt.TableLoadingMode = options.MemoryMap
+	Opt.TableLoadingMode = options.FileIO
+
	if *noMemory {
		// Opt.TableLoadingMode = options.FileIO // use standard file I/O operations for tables instead of LoadRAM
-		Opt.TableLoadingMode = options.MemoryMap // MemoryMap indicates that that the file must be memory-mapped - https://github.com/dgraph-io/badger/issues/224#issuecomment-329643771
-		BATCH_BLOCK = 10000 // try to import less at each time, it will take longer but hopefully work
+//		Opt.TableLoadingMode = options.MemoryMap // MemoryMap indicates that the file must be memory-mapped - https://github.com/dgraph-io/badger/issues/224#issuecomment-329643771
+		Opt.TableLoadingMode = options.FileIO
+		BATCH_BLOCK = 1000 // try to import fewer entries at a time; it will take longer but hopefully work
		log.Info("Trying to avoid too much memory consumption")
	}
	kv, err := badger.NewKV(&Opt)
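Badger's TableLoadingMode trades memory for speed: the default LoadToRAM keeps tables resident, options.MemoryMap maps them into virtual memory, and options.FileIO serves them through plain file reads, the cheapest in memory and the slowest. A minimal sketch of opening a store this way, assuming the pre-1.0 Badger KV API that this patch targets:

    package main

    import (
        "log"

        "github.com/dgraph-io/badger"
        "github.com/dgraph-io/badger/options"
    )

    func main() {
        opt := badger.DefaultOptions
        opt.Dir = "/tmp/badger-example" // hypothetical path
        opt.ValueDir = opt.Dir
        opt.TableLoadingMode = options.FileIO // plain file I/O: slowest mode, smallest footprint
        kv, err := badger.NewKV(&opt)
        if err != nil {
            log.Fatal(err)
        }
        defer kv.Close()
    }
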
@@ -282,8 +290,12 @@ func searchKVUUID(avatarKey string) (name string, grid string) {
	kv, err := badger.NewKV(&Opt)
	checkErr(err) // should probably panic
	itOpt := badger.DefaultIteratorOptions
-	itOpt.PrefetchValues = true
-	itOpt.PrefetchSize = 1000 // attempt to get this a little bit more efficient; we have many small entries, so this is not too much
+	if !*noMemory {
+		itOpt.PrefetchValues = true
+		itOpt.PrefetchSize = 1000 // attempt to get this a little bit more efficient; we have many small entries, so this is not too much
+	} else {
+		itOpt.PrefetchValues = false
+	}
	itr := kv.NewIterator(itOpt)
	var val = avatarUUID{ NullUUID, "" }
	var found string
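With PrefetchValues switched off, the iterator materializes keys only and defers value reads until they are explicitly requested, which is what makes the noMemory branch cheaper. A keys-only sketch under the same pre-1.0 Badger API assumption:

    // countKeys walks the whole store reading keys only; values stay on disk.
    func countKeys(kv *badger.KV) int {
        itOpt := badger.DefaultIteratorOptions
        itOpt.PrefetchValues = false // no value buffers are filled during iteration
        itr := kv.NewIterator(itOpt)
        defer itr.Close()
        count := 0
        for itr.Rewind(); itr.Valid(); itr.Next() {
            _ = itr.Item().Key() // the value would only be loaded on demand
            count++
        }
        return count
    }
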
@@ -312,20 +324,47 @@ func searchKVUUID(avatarKey string) (name string, grid string) {
// One could theoretically set a cron job to get this file, save it on disk periodically, and keep the database up-to-date
// see https://stackoverflow.com/questions/24673335/how-do-i-read-a-gzipped-csv-file for the actual usage of these complicated things!
func importDatabase(filename string) {
-	f, err := os.Open(filename)
+	filehandler, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
-	defer f.Close()
-	gr := bzip2.NewReader(f) // open bzip2 reader
+	defer filehandler.Close()
+	gr := bzip2.NewReader(filehandler) // open bzip2 reader
	cr := csv.NewReader(gr) // open csv reader and feed the bzip2 reader into it
+
+	// prepare connection to the KV database
	kv, err := badger.NewKV(&Opt)
	checkErrPanic(err) // should probably panic
	defer kv.Close()
-	time_start := time.Now()
-	var oneEntry badger.Entry // make sure this one has at least some memory assigned to it
+	time_start := time.Now() // we want to get an idea of how long this takes
+
+	// prepare stuff for asynchronous key writing; see https://godoc.org/github.com/dgraph-io/badger#KV.BatchSetAsync
+	wg := new(sync.WaitGroup)
+
+//	var oneEntry badger.Entry // make sure this one has at least some memory assigned to it
	limit := 0 // outside of for loop so that we can count how many entries we had in total
-	var batch = make([]badger.Entry, BATCH_BLOCK) // this is the actual data, or else we get pointers to nil
+	batch := make([]*badger.Entry, 0, BATCH_BLOCK) // this is the actual data
+
+	// define an error callback for the async writes
+	f := func(err error) {
+		defer wg.Done()
+		if err != nil {
+			// At this point you can retry writing keys or send the error over a channel to handle
+			// in some other goroutine.
+			log.Errorf("Got error: %+v\n", err)
+		}
+
+		// Check for errors in entries, which could be non-nil if the user supplies a CasCounter.
+		for _, e := range batch {
+			if e.Error != nil {
+				log.Errorf("Got error: %+v\n", e.Error)
+			}
+		}
+
+		// You can do cleanup now, like deleting keys from a cache.
+		log.Info("All async sets complete.")
+	}
+
	for ;;limit++ {
		record, err := cr.Read()
		if err == io.EOF {
@@ -338,58 +377,40 @@ func importDatabase(filename string) {
		if err != nil {
			log.Warning(err)
		} else {
-			oneEntry = badger.Entry{ Key: []byte(record[1]), Value: []byte(jsonNewEntry)}
-			batch[limit % BATCH_BLOCK] = oneEntry
+//			oneEntry = badger.Entry{ Key: []byte(record[1]), Value: []byte(jsonNewEntry)}
+			batch = append(batch, &badger.Entry{
+				Key: []byte(record[1]),
+				Value: []byte(jsonNewEntry),
+			})
		}
		if limit % BATCH_BLOCK == 0 && limit != 0 { // we do not run on the first time, and then only every BATCH_BLOCK times
			log.Debug("Processing:", limit)
-			go writeOneBatch(kv, batch)
+//			go writeOneBatch(kv, batch)
+			wg.Add(1)
+			kv.BatchSetAsync(batch, f)
+			log.Debug("Finished")
+			wg.Wait()
			batch = nil // clear all entries, start a new batch
			runtime.GC()
-			batch = make([]badger.Entry, BATCH_BLOCK)
+			batch = make([]*badger.Entry, 0, BATCH_BLOCK)
		}
	}
-	writeOneBatch(kv, batch) // NOTE(gwyneth): these are the final ones (i.e. from the last round number of BATCH_BLOCK up to the end)
-	// and we do not run them as goroutine because the function might terminate and close our channel before
-	// this finishes
-	// BUG(gwyneth): if BATCH_BLOCK is too small, or we finish exactly on the modulo, we may still have a
-	// racing condition!
+	// NOTE(gwyneth): these are the final ones (i.e. from the last round number of BATCH_BLOCK up to the end),
+	// and we do not run them as a goroutine because the function might terminate and close our channel before
+	// this finishes
+	// BUG(gwyneth): if BATCH_BLOCK is too small, or we finish exactly on the modulo, we may still have a
+	// race condition!
+	wg.Add(1)
+	kv.BatchSetAsync(batch, f)
+	log.Debug("Finished last batch")
+	wg.Wait()
	batch = nil // flag the garbage collector that we are finished with this array
+	runtime.GC()
	time_end := time.Now()
	diffTime := time_end.Sub(time_start)
	log.Info("Total read", limit, "records (or thereabouts) in", diffTime)
}

-// writeOneBatch copies all entries in this batch into the correct kind of array to push them out as a batch.
-// Since this is called as a goroutine, there will be thousands of those batches around!
-func writeOneBatch(kv *badger.KV, batch []badger.Entry) {
-	if kv == nil {
-		log.Panic("kv should NEVER be nil")
-		return
-	}
-	if batch == nil || len(batch) == 0 {
-		log.Panic("batch should NEVER be nil or have zero elements")
-		return
-	}
-	time_start := time.Now()
-	var entries = make([]*badger.Entry, BATCH_BLOCK) // prepare entries for BatchSet
-	for i := 0; i < BATCH_BLOCK; i++ {
-		entries[i] = &batch[i]
-	}
-	if entries == nil || len(entries) == 0{
-		log.Panic("entries should NEVER be nil or have zero elements")
-		return
-	}
-	checkErr(kv.BatchSet(entries))
-	for _, e := range entries {
-		checkErr(e.Error)
-	}
-	entries = nil // flag the garbage collector that we're finished with this array
-	runtime.GC()
-	time_end := time.Now()
-	diffTime := time_end.Sub(time_start)
-	log.Debug("goroutine sent a batch of", BATCH_BLOCK, "records in", diffTime)
-}

// NOTE(gwyneth):Auxiliary functions which I'm always using...
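Taken together, the rewritten import loop reduces to the pattern below: append entries to a slice of *badger.Entry, hand the slice to BatchSetAsync together with a completion callback, and hold a sync.WaitGroup so the slice is not recycled before Badger has consumed it. A condensed sketch under the same pre-1.0 API assumption (a logrus-style logger is assumed for log.Errorf):

    // writeBatchAndWait is a hypothetical helper condensing the pattern above.
    func writeBatchAndWait(kv *badger.KV, batch []*badger.Entry) {
        wg := new(sync.WaitGroup)
        wg.Add(1)
        kv.BatchSetAsync(batch, func(err error) {
            defer wg.Done() // runs once Badger has flushed the whole batch
            if err != nil {
                log.Errorf("batch write failed: %+v", err)
            }
        })
        wg.Wait() // only now is it safe to reuse or drop the batch slice
    }

Note that because wg.Wait() follows each BatchSetAsync call immediately, the writes are effectively serialized; the asynchronous form is used here for its completion callback rather than for overlapping batches.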