update lab2 solution
This commit is contained in:
parent
91dd59c0f0
commit
d93dac626e
129
lab2/main.go
129
lab2/main.go
@ -5,11 +5,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const processThreadCount = 3
|
var Terminator = DataEntry{ Name: "Terminator", Sugar: -1, Criteria: -1 };
|
||||||
|
|
||||||
type DataEntry struct {
|
type DataEntry struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@ -55,20 +54,90 @@ func processEntry(entry DataEntry, output chan DataEntry) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func processEntries(input chan DataEntry, output chan DataEntry, waitGroup *sync.WaitGroup) {
|
func dataThread(fromMain, toWorker chan DataEntry, workerRequest chan bool, arraySize int, workerCount int) {
|
||||||
for entry := range input {
|
var gotTerminator = false
|
||||||
fmt.Println("Started to filter:", entry)
|
var storage []DataEntry = make([]DataEntry, arraySize)
|
||||||
processEntry(entry, output)
|
var used = 0
|
||||||
fmt.Println("Finished to filter:", entry)
|
|
||||||
}
|
var appendtoStorage = func(entry DataEntry) {
|
||||||
waitGroup.Done();
|
if (entry == Terminator) {
|
||||||
|
gotTerminator = true
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectResults(channel chan DataEntry, output *SortedList, waitGroup *sync.WaitGroup) {
|
storage[used] = entry
|
||||||
for entry := range channel {
|
used += 1
|
||||||
output.Append(entry)
|
|
||||||
}
|
}
|
||||||
waitGroup.Done()
|
|
||||||
|
var getFromStorage = func() DataEntry {
|
||||||
|
var entry = storage[used-1]
|
||||||
|
used -= 1
|
||||||
|
return entry
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if (used == arraySize) {
|
||||||
|
<-workerRequest
|
||||||
|
toWorker <- getFromStorage()
|
||||||
|
} else if (used == 0) {
|
||||||
|
appendtoStorage(<-fromMain)
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case entry := <-fromMain:
|
||||||
|
appendtoStorage(entry)
|
||||||
|
case <-workerRequest:
|
||||||
|
toWorker <- getFromStorage()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (gotTerminator) {
|
||||||
|
for i := 0; i < used; i++ {
|
||||||
|
<-workerRequest
|
||||||
|
toWorker <- storage[i]
|
||||||
|
}
|
||||||
|
for i := 0; i < workerCount; i++ {
|
||||||
|
<-workerRequest
|
||||||
|
toWorker <- Terminator
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func workerThread(fromData, toResult chan DataEntry, toDataRequest chan bool) {
|
||||||
|
for {
|
||||||
|
toDataRequest <-true
|
||||||
|
var entry = <-fromData
|
||||||
|
if entry == Terminator {
|
||||||
|
toResult <-Terminator
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fmt.Println("Started to filter:", entry)
|
||||||
|
processEntry(entry, toResult)
|
||||||
|
fmt.Println("Finished to filter:", entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func resultThread(fromWorker chan DataEntry, toMain chan SortedList, workerCount int, maxResults int) {
|
||||||
|
result := makeSortedList(maxResults)
|
||||||
|
finishedWorkers := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
if finishedWorkers == workerCount {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
var entry = <-fromWorker
|
||||||
|
if entry == Terminator {
|
||||||
|
finishedWorkers += 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Append(entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
toMain <-result
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -90,26 +159,25 @@ func main() {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
outputList := makeSortedList(len(entries))
|
mainToData := make(chan DataEntry)
|
||||||
waitGroup := sync.WaitGroup{}
|
dataToWorker := make(chan DataEntry)
|
||||||
outputWaitGroup := sync.WaitGroup{}
|
workerToDataRequest := make(chan bool)
|
||||||
inputChannel := make(chan DataEntry);
|
workerToResult := make(chan DataEntry)
|
||||||
outputChannel := make(chan DataEntry);
|
resultToMain := make(chan SortedList)
|
||||||
|
|
||||||
for i := 0; i < processThreadCount; i++ {
|
var workerCount = 5
|
||||||
go processEntries(inputChannel, outputChannel, &waitGroup)
|
|
||||||
waitGroup.Add(1)
|
for i := 0; i < workerCount; i++ {
|
||||||
|
go workerThread(dataToWorker, workerToResult, workerToDataRequest)
|
||||||
}
|
}
|
||||||
outputWaitGroup.Add(1)
|
go dataThread(mainToData, dataToWorker, workerToDataRequest, 3, workerCount)
|
||||||
go collectResults(outputChannel, &outputList, &outputWaitGroup)
|
go resultThread(workerToResult, resultToMain, workerCount, len(entries))
|
||||||
|
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
inputChannel <- entry
|
mainToData <-entry
|
||||||
}
|
}
|
||||||
close(inputChannel)
|
mainToData <-Terminator
|
||||||
waitGroup.Wait()
|
var result = <-resultToMain
|
||||||
close(outputChannel)
|
|
||||||
outputWaitGroup.Wait()
|
|
||||||
|
|
||||||
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
|
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -117,11 +185,12 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer outputFile.Close()
|
defer outputFile.Close()
|
||||||
|
|
||||||
outputListBytes, err := json.MarshalIndent(outputList.GetSlice(), "", "\t")
|
outputListBytes, err := json.MarshalIndent(result.GetSlice(), "", "\t")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
outputFile.Write(outputListBytes)
|
outputFile.Write(outputListBytes)
|
||||||
|
|
||||||
fmt.Printf("Finished, %d entries\n", outputList.count)
|
fmt.Printf("Initial amount %d\n", len(entries))
|
||||||
|
fmt.Printf("Finished, %d entries\n", result.count)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user