package main import ( "context" gm "db_service/gorm_models" cb "db_service/models" helper "db_service/pkg" "fmt" "log" "os" "sync" "time" "gorm.io/gorm" ) var mainCategories []gm.Category var baza *gorm.DB var resultingChannel = make(chan gm.Category) var producer_wg sync.WaitGroup var consumer_wg sync.WaitGroup func insertToBaza(product cb.Product) bool { return true } func tayyarla() { // baza, err := gorm.Open("mysql", os.Getenv("database_url")) // if err != nil { // panic(err) // } // mainCategories = gm.GetMainCategories(baza) } func bashlat() error { fmt.Println("Insert service starts") start := time.Now() tayyarla() fmt.Println("consuming") for c := 0; c < len(mainCategories); c++ { producer_wg.Add(1) go func() { defer consumer_wg.Done() // WaitGroup for iterate functions const max = 20 var wg sync.WaitGroup responses := make(chan bool, max) for item := range resultingChannel { wg.Add(1) db := "ty_db_" + item.Translations[0].Slug dbExists := helper.CheckDBExists(os.Getenv("couchdb_url") + db) if dbExists { go iterateCategories(db, responses, func() { wg.Done() }) } } // close iterate function responses go func() { defer close(responses) wg.Wait() }() for response := range responses { fmt.Println("response", response) } }() } fmt.Println("producing") for _, cat := range mainCategories { consumer_wg.Add(1) go func(category gm.Category) { defer producer_wg.Done() resultingChannel <- category }(cat) } producer_wg.Wait() close(resultingChannel) consumer_wg.Wait() elapsed := time.Since(start) log.Println("DB Time Elapsed:", elapsed) return nil } // func consumer() { // fmt.Println("consuming") // for c := 0; c < len(mainCategories); c++ { // producer_wg.Add(1) // go func() { // defer consumer_wg.Done() // // WaitGroup for iterate functions // const max = 20 // var wg sync.WaitGroup // responses := make(chan bool, max) // for item := range resultingChannel { // wg.Add(1) // db := "ty_db_" + item.Translations[0].Slug // dbExists := helper.CheckDBExists(os.Getenv("couchdb_url") + db) // if dbExists { // go iterateCategories(db, responses, func() { wg.Done() }) // } // } // // close iterate function responses // go func() { // defer close(responses) // wg.Wait() // }() // for response := range responses { // fmt.Println("response", response) // } // }() // } // } // func producer() { // fmt.Println("producing") // for _, cat := range mainCategories { // consumer_wg.Add(1) // go func(category gm.Category) { // defer producer_wg.Done() // resultingChannel <- category // }(cat) // } // producer_wg.Wait() // close(resultingChannel) // consumer_wg.Wait() // } // iterate categories func iterateCategories(db string, finished chan<- bool, onExit func()) { _, cancel := context.WithCancel(context.Background()) defer cancel() defer onExit() // start := time.Now() // get total product count from couch database totalDocCount := getTotalDocumentCount(db) if totalDocCount == 0 { cancel() return } // total document counter i := 0 for i < totalDocCount { cdbTime := time.Now() product, err := getDataFromCouchDb(i, db) i++ if err != nil { fmt.Println(err) cancel() return } //TODO: insert product to mysql fmt.Println("product: ", product.Name, " db: ", db) log.Println("Couch DB. Time elapsed:", time.Since(cdbTime), " DB: ", db) // gorm.AutoMigrate(&product) // // if there is an error inserting, handle it // if err != nil { // gorm.Close() // panic(err.Error()) // } // be careful deferring Queries if you are using transactions // gorm.Close() } finished <- true }