// go-importer/inserter.go
//
// Listing metadata carried over from the code-hosting page this file was
// copied from: 211 lines, 3.8 KiB, Go.

package main
import (
"context"
gm "db_service/gorm_models"
cb "db_service/models"
helper "db_service/pkg"
"fmt"
"log"
"os"
"sync"
"time"
"gorm.io/gorm"
)
// Package-level state shared by the import pipeline in this file.
var (
	// mainCategories lists the categories to import. bashlat starts one
	// consumer goroutine and one producer goroutine per entry.
	mainCategories []gm.Category

	// baza is the gorm database handle. Nothing visible in this file assigns
	// it; the setup code in tayyarla is commented out.
	baza *gorm.DB

	// resultingChannel is the unbuffered channel over which categories are
	// handed from the producer goroutines to the consumer goroutines.
	resultingChannel = make(chan gm.Category)

	// producer_wg is waited on by bashlat before it closes resultingChannel.
	producer_wg sync.WaitGroup

	// consumer_wg is waited on by bashlat before it returns.
	consumer_wg sync.WaitGroup
)
// insertToBaza is a placeholder for storing product in the database.
// It does not touch product or any database yet and always returns true.
func insertToBaza(product cb.Product) bool {
	return true
}
// tayyarla is the setup step that bashlat calls before starting the pipeline.
// It is currently a no-op: the database connection and the category loading
// below are commented out, so mainCategories and baza are left unchanged.
func tayyarla() {
	// NOTE(review): if this code is re-enabled, "baza, err :=" would declare a
	// new local baza that shadows the package-level variable, leaving the
	// package-level handle nil. Declare err separately and assign with "=".
	// The gorm.Open("mysql", ...) call shape presumably also needs checking
	// against the imported gorm.io/gorm package — TODO confirm.
	// baza, err := gorm.Open("mysql", os.Getenv("database_url"))
	// if err != nil {
	// panic(err)
	// }
	// mainCategories = gm.GetMainCategories(baza)
}
// bashlat runs the whole import pipeline and blocks until it has finished.
//
// It calls tayyarla, then starts one consumer goroutine and one producer
// goroutine per entry in mainCategories. Producers send their category on
// resultingChannel. Consumers receive categories, derive the CouchDB database
// name "ty_db_<slug>" from the first translation, and start iterateCategories
// for every database that helper.CheckDBExists reports as existing. Each
// consumer prints the responses of the iterations it started. Once all
// producers have sent, resultingChannel is closed so the consumers can drain
// and exit. The elapsed time is logged at the end.
//
// The returned error is always nil.
func bashlat() error {
	fmt.Println("Insert service starts")
	start := time.Now()
	tayyarla()

	// Read once; the value is the same for every category.
	couchURL := os.Getenv("couchdb_url")

	fmt.Println("consuming")
	for range mainCategories {
		// Each WaitGroup tracks the goroutines it is named after, so Add and
		// Done are paired on the same group.
		consumer_wg.Add(1)
		go func() {
			defer consumer_wg.Done()

			// Buffer size for iteration responses.
			const maxResponses = 20
			// iterations tracks the iterateCategories goroutines started by
			// this consumer.
			var iterations sync.WaitGroup
			responses := make(chan bool, maxResponses)

			for item := range resultingChannel {
				if len(item.Translations) == 0 {
					// Without a translation there is no slug to build the
					// database name from; indexing would panic.
					log.Println("skipping category without translations")
					continue
				}
				db := "ty_db_" + item.Translations[0].Slug
				if !helper.CheckDBExists(couchURL + db) {
					continue
				}
				// Register only iterations that are actually started.
				// Counting skipped categories too would leave Wait blocked
				// forever and deadlock this consumer.
				iterations.Add(1)
				go iterateCategories(db, responses, iterations.Done)
			}

			// Close responses once every started iteration has exited, which
			// ends the range loop below.
			go func() {
				defer close(responses)
				iterations.Wait()
			}()

			for response := range responses {
				fmt.Println("response", response)
			}
		}()
	}

	fmt.Println("producing")
	for _, cat := range mainCategories {
		producer_wg.Add(1)
		go func(category gm.Category) {
			defer producer_wg.Done()
			resultingChannel <- category
		}(cat)
	}

	// All sends must be complete before the channel is closed; closing it is
	// what lets the consumers leave their receive loops.
	producer_wg.Wait()
	close(resultingChannel)
	consumer_wg.Wait()

	log.Println("DB Time Elapsed:", time.Since(start))
	return nil
}
// func consumer() {
// fmt.Println("consuming")
// for c := 0; c < len(mainCategories); c++ {
// producer_wg.Add(1)
// go func() {
// defer consumer_wg.Done()
// // WaitGroup for iterate functions
// const max = 20
// var wg sync.WaitGroup
// responses := make(chan bool, max)
// for item := range resultingChannel {
// wg.Add(1)
// db := "ty_db_" + item.Translations[0].Slug
// dbExists := helper.CheckDBExists(os.Getenv("couchdb_url") + db)
// if dbExists {
// go iterateCategories(db, responses, func() { wg.Done() })
// }
// }
// // close iterate function responses
// go func() {
// defer close(responses)
// wg.Wait()
// }()
// for response := range responses {
// fmt.Println("response", response)
// }
// }()
// }
// }
// func producer() {
// fmt.Println("producing")
// for _, cat := range mainCategories {
// consumer_wg.Add(1)
// go func(category gm.Category) {
// defer producer_wg.Done()
// resultingChannel <- category
// }(cat)
// }
// producer_wg.Wait()
// close(resultingChannel)
// consumer_wg.Wait()
// }
// iterateCategories fetches the documents of the CouchDB database db one by
// one and prints each product's name together with the fetch duration.
//
// onExit is deferred and therefore runs on every return path. true is sent on
// finished only after all documents were fetched without error; when the
// database reports zero documents, or a fetch fails (the error is printed),
// the function returns without sending anything.
func iterateCategories(db string, finished chan<- bool, onExit func()) {
	// The derived context is not passed to anything yet; only its cancel
	// function is kept. The deferred call releases it on every return path.
	_, cancel := context.WithCancel(context.Background())
	defer cancel()
	defer onExit()

	// Total number of products stored in the couch database.
	docCount := getTotalDocumentCount(db)
	if docCount == 0 {
		return
	}

	for idx := 0; idx < docCount; idx++ {
		fetchStart := time.Now()
		product, err := getDataFromCouchDb(idx, db)
		if err != nil {
			fmt.Println(err)
			return
		}

		// TODO: insert product to mysql (not implemented yet; the earlier
		// draft sketched this with gorm and noted to be careful with
		// deferred queries inside transactions).
		fmt.Println("product: ", product.Name, " db: ", db)
		log.Println("Couch DB. Time elapsed:", time.Since(fetchStart), " DB: ", db)
	}

	finished <- true
}