package main import ( "context" gm "db_service/gorm_models" "db_service/models" helper "db_service/pkg" "encoding/json" "fmt" "log" "os" "sync" "time" ) func dbService() error { fmt.Println("dbService started") tayyarla() start := time.Now() var resultingChannel = make(chan gm.Category) var producer_wg sync.WaitGroup var consumer_wg sync.WaitGroup // consumer for c := 0; c < len(mainCategories); c++ { producer_wg.Add(1) go func() { defer consumer_wg.Done() // WaitGroup for iterate functions const max = 20 var wg sync.WaitGroup responses := make(chan bool, max) for item := range resultingChannel { wg.Add(1) db := "ty_db_" + item.Translations[0].Slug dbExists := helper.CheckDBExists("http://admin:adminadmin@localhost:5984/" + db) if dbExists { go iterate(db, responses, func() { wg.Done() }) } } // close iterate function responses go func() { defer close(responses) wg.Wait() }() for response := range responses { fmt.Println("response", response) } }() } // producer for _, cat := range mainCategories { consumer_wg.Add(1) go func(category gm.Category) { defer producer_wg.Done() resultingChannel <- category }(cat) } producer_wg.Wait() close(resultingChannel) consumer_wg.Wait() elapsed := time.Since(start) log.Println("DB Time Elapsed:", elapsed) return nil } // func init() { // panic("unimplemented") // } // iterate categories func iterate(db string, finished chan<- bool, onExit func()) { _, cancel := context.WithCancel(context.Background()) defer cancel() defer onExit() // start := time.Now() // get total product count from couch database totalDocCount := getTotalDocumentCount(db) if totalDocCount == 0 { cancel() return } // total document counter i := 0 for i < totalDocCount { cdbTime := time.Now() product, err := getDataFromCouchDb(i, db) i++ if err != nil { fmt.Println(err) cancel() return } //TODO: insert product to mysql fmt.Println("product: ", product.Name, " db: ", db) log.Println("Couch DB. Time elapsed:", time.Since(cdbTime), " DB: ", db) // gorm.AutoMigrate(&product) // // if there is an error inserting, handle it // if err != nil { // gorm.Close() // panic(err.Error()) // } // be careful deferring Queries if you are using transactions // gorm.Close() } finished <- true } // getTotalDocumentCount gets the total number of documents and returns func getTotalDocumentCount(db string) int { var response models.DBDocCountResponse url := os.Getenv("couch_db_source") + db body, err := helper.SendRequest("GET", url, nil, "", true) if err != nil { log.Println(err.Error()) return 0 } err = json.Unmarshal(body, &response) if err != nil { log.Println(err.Error()) return 0 } return response.DocCount } // getDataFromCouchDb gets data from local couch db func getDataFromCouchDb(i int, db string) (models.Product, error) { var model models.Product var response models.BagistoModelResponse url := os.Getenv("couch_db_source") + db + fmt.Sprintf("/_all_docs?include_docs=true&limit=1&skip=%v", i) body, err := helper.SendRequest("GET", url, nil, "", true) if err != nil { return model, err } err = json.Unmarshal(body, &response) if err != nil { log.Println(err.Error()) return model, err } return response.Rows[0].Doc, nil } //https://gorm.io/docs/query.html