From d93dac626ee78fbf07516289249942dccb5a03bc Mon Sep 17 00:00:00 2001 From: Rokas Puzonas Date: Mon, 13 Nov 2023 22:35:21 +0200 Subject: [PATCH] update lab2 solution --- lab2/main.go | 127 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 98 insertions(+), 29 deletions(-) diff --git a/lab2/main.go b/lab2/main.go index 24b882b..243a10e 100644 --- a/lab2/main.go +++ b/lab2/main.go @@ -5,11 +5,10 @@ import ( "fmt" "log" "os" - "sync" "time" ) -const processThreadCount = 3 +var Terminator = DataEntry{ Name: "Terminator", Sugar: -1, Criteria: -1 }; type DataEntry struct { Name string `json:"name"` @@ -55,20 +54,90 @@ func processEntry(entry DataEntry, output chan DataEntry) { } } -func processEntries(input chan DataEntry, output chan DataEntry, waitGroup *sync.WaitGroup) { - for entry := range input { - fmt.Println("Started to filter:", entry) - processEntry(entry, output) - fmt.Println("Finished to filter:", entry) +func dataThread(fromMain, toWorker chan DataEntry, workerRequest chan bool, arraySize int, workerCount int) { + var gotTerminator = false + var storage []DataEntry = make([]DataEntry, arraySize) + var used = 0 + + var appendtoStorage = func(entry DataEntry) { + if (entry == Terminator) { + gotTerminator = true + return + } + + storage[used] = entry + used += 1 + } + + var getFromStorage = func() DataEntry { + var entry = storage[used-1] + used -= 1 + return entry + } + + for { + if (used == arraySize) { + <-workerRequest + toWorker <- getFromStorage() + } else if (used == 0) { + appendtoStorage(<-fromMain) + } else { + select { + case entry := <-fromMain: + appendtoStorage(entry) + case <-workerRequest: + toWorker <- getFromStorage() + } + } + + if (gotTerminator) { + for i := 0; i < used; i++ { + <-workerRequest + toWorker <- storage[i] + } + for i := 0; i < workerCount; i++ { + <-workerRequest + toWorker <- Terminator + } + break + } + } - waitGroup.Done(); } -func collectResults(channel chan DataEntry, output *SortedList, waitGroup *sync.WaitGroup) { - for entry := range channel { - output.Append(entry) +func workerThread(fromData, toResult chan DataEntry, toDataRequest chan bool) { + for { + toDataRequest <-true + var entry = <-fromData + if entry == Terminator { + toResult <-Terminator + break + } + fmt.Println("Started to filter:", entry) + processEntry(entry, toResult) + fmt.Println("Finished to filter:", entry) } - waitGroup.Done() +} + +func resultThread(fromWorker chan DataEntry, toMain chan SortedList, workerCount int, maxResults int) { + result := makeSortedList(maxResults) + finishedWorkers := 0 + + for { + if finishedWorkers == workerCount { + break + } + + var entry = <-fromWorker + if entry == Terminator { + finishedWorkers += 1 + continue + } + + result.Append(entry) + } + + toMain <-result } func main() { @@ -90,26 +159,25 @@ func main() { log.Fatal(err) } - outputList := makeSortedList(len(entries)) - waitGroup := sync.WaitGroup{} - outputWaitGroup := sync.WaitGroup{} - inputChannel := make(chan DataEntry); - outputChannel := make(chan DataEntry); + mainToData := make(chan DataEntry) + dataToWorker := make(chan DataEntry) + workerToDataRequest := make(chan bool) + workerToResult := make(chan DataEntry) + resultToMain := make(chan SortedList) - for i := 0; i < processThreadCount; i++ { - go processEntries(inputChannel, outputChannel, &waitGroup) - waitGroup.Add(1) + var workerCount = 5 + + for i := 0; i < workerCount; i++ { + go workerThread(dataToWorker, workerToResult, workerToDataRequest) } - outputWaitGroup.Add(1) - go collectResults(outputChannel, &outputList, &outputWaitGroup) + go dataThread(mainToData, dataToWorker, workerToDataRequest, 3, workerCount) + go resultThread(workerToResult, resultToMain, workerCount, len(entries)) for _, entry := range entries { - inputChannel <- entry + mainToData <-entry } - close(inputChannel) - waitGroup.Wait() - close(outputChannel) - outputWaitGroup.Wait() + mainToData <-Terminator + var result = <-resultToMain outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644) if err != nil { @@ -117,11 +185,12 @@ func main() { } defer outputFile.Close() - outputListBytes, err := json.MarshalIndent(outputList.GetSlice(), "", "\t") + outputListBytes, err := json.MarshalIndent(result.GetSlice(), "", "\t") if err != nil { log.Fatal(err) } outputFile.Write(outputListBytes) - fmt.Printf("Finished, %d entries\n", outputList.count) + fmt.Printf("Initial amount %d\n", len(entries)) + fmt.Printf("Finished, %d entries\n", result.count) }