go-importer/db_service.go

186 lines
3.4 KiB
Go
Raw Permalink Normal View History

2022-08-12 08:59:03 +00:00
package main
import (
"context"
gm "db_service/gorm_models"
"db_service/models"
helper "db_service/pkg"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
)
// dbService fans the entries of mainCategories out to a set of worker
// goroutines. For every category whose CouchDB database ("ty_db_" + the slug of
// the category's first translation) exists, a worker starts iterate over that
// database, then waits for all of its iterate calls to finish and prints the
// completion signals they sent.
//
// tayyarla is defined elsewhere in the package and is called once before any
// work starts (presumably it populates mainCategories — TODO confirm).
//
// The function blocks until every worker has finished, logs the total elapsed
// time and always returns nil.
func dbService() error {
	fmt.Println("dbService started")
	tayyarla()
	start := time.Now()

	categories := make(chan gm.Category)
	var workers sync.WaitGroup

	// One worker per category, so every send below finds a receiver.
	for c := 0; c < len(mainCategories); c++ {
		workers.Add(1)
		go func() {
			defer workers.Done()

			// Buffered so that up to max iterate goroutines can report
			// completion before this worker starts draining responses.
			const max = 20
			var iterators sync.WaitGroup
			responses := make(chan bool, max)

			for item := range categories {
				// A category without translations has no slug to build the
				// database name from; indexing [0] would panic.
				if len(item.Translations) == 0 {
					log.Println("dbService: category has no translations, skipping")
					continue
				}

				db := "ty_db_" + item.Translations[0].Slug
				// NOTE(review): this URL and its credentials are hard-coded,
				// while the other CouchDB calls in this file read the
				// "couch_db_source" environment variable — confirm whether
				// they are meant to point at the same server.
				if !helper.CheckDBExists("http://admin:adminadmin@localhost:5984/" + db) {
					continue
				}

				// Register only the iterate calls that are actually started;
				// counting skipped databases would make Wait block forever.
				iterators.Add(1)
				go iterate(db, responses, iterators.Done)
			}

			// Close responses once every iterate call has exited so the
			// range loop below terminates.
			go func() {
				defer close(responses)
				iterators.Wait()
			}()

			for response := range responses {
				fmt.Println("response", response)
			}
		}()
	}

	// Producer: hand every category to the workers, then signal end of input.
	for _, cat := range mainCategories {
		categories <- cat
	}
	close(categories)

	workers.Wait()

	elapsed := time.Since(start)
	log.Println("DB Time Elapsed:", elapsed)
	return nil
}
// func init() {
// panic("unimplemented")
// }
// iterate walks the documents of the CouchDB database db one at a time,
// printing each product's name and logging how long each fetch took.
//
// onExit is always invoked when iterate returns. A true value is sent on
// finished only when every document was fetched successfully; if the database
// reports zero documents, or a fetch fails (the error is printed), iterate
// returns without sending anything.
func iterate(db string, finished chan<- bool, onExit func()) {
	// The context itself is not consumed yet; its cancel func is released on
	// every return path by the deferred call.
	_, cancel := context.WithCancel(context.Background())
	defer cancel()
	defer onExit()

	// Ask CouchDB how many documents the database holds.
	total := getTotalDocumentCount(db)
	if total == 0 {
		return
	}

	for idx := 0; idx < total; idx++ {
		fetchStarted := time.Now()

		product, err := getDataFromCouchDb(idx, db)
		if err != nil {
			fmt.Println(err)
			return
		}

		//TODO: insert product to mysql (GORM persistence is not wired in yet)
		fmt.Println("product: ", product.Name, " db: ", db)
		log.Println("Couch DB. Time elapsed:", time.Since(fetchStarted), " DB: ", db)
	}

	finished <- true
}
// getTotalDocumentCount gets the total number of documents and returns
func getTotalDocumentCount(db string) int {
var response models.DBDocCountResponse
url := os.Getenv("couch_db_source") + db
body, err := helper.SendRequest("GET", url, nil, "", true)
if err != nil {
log.Println(err.Error())
return 0
}
err = json.Unmarshal(body, &response)
if err != nil {
log.Println(err.Error())
return 0
}
return response.DocCount
}
// getDataFromCouchDb fetches a single document from the CouchDB database db.
// It queries _all_docs with include_docs=true, limit=1 and skip=i under the
// base URL held in the "couch_db_source" environment variable, and returns the
// Doc of the first row in the response.
//
// An error is returned when the request fails, when the response body cannot
// be decoded, or when the response contains no rows (for example when i is
// past the last document). On error the returned Product is the zero value.
func getDataFromCouchDb(i int, db string) (models.Product, error) {
	var model models.Product
	var response models.BagistoModelResponse

	url := os.Getenv("couch_db_source") + db + fmt.Sprintf("/_all_docs?include_docs=true&limit=1&skip=%d", i)

	body, err := helper.SendRequest("GET", url, nil, "", true)
	if err != nil {
		return model, fmt.Errorf("requesting document %d of %q: %w", i, db, err)
	}

	if err := json.Unmarshal(body, &response); err != nil {
		return model, fmt.Errorf("decoding document %d of %q: %w", i, db, err)
	}

	// Without this guard an empty result set would panic on Rows[0] and take
	// the whole process down, since this runs inside a goroutine.
	if len(response.Rows) == 0 {
		return model, fmt.Errorf("no rows returned for document %d of %q", i, db)
	}

	return response.Rows[0].Doc, nil
}
//https://gorm.io/docs/query.html