diff --git a/lab1a/go/README.md b/lab1a/go/README.md index df99b32..1f65c05 100644 --- a/lab1a/go/README.md +++ b/lab1a/go/README.md @@ -1,5 +1,5 @@ # Lab1 ```shell -go run limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt +go run sorted_list.go limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt ``` diff --git a/lab1a/go/limited_buffer.go b/lab1a/go/limited_buffer.go index 17d6046..ca67bf3 100644 --- a/lab1a/go/limited_buffer.go +++ b/lab1a/go/limited_buffer.go @@ -10,21 +10,12 @@ type LimitedBuffer struct { mutex *sync.Mutex updateCond *sync.Cond - estimatedSize int - estimatedSizeMutex *sync.Mutex - - consumedCount int - consumedMutex *sync.Mutex - - processedCount int - processedMutex *sync.Mutex + estimatedCount int + reservedCount int } func makeLimitedBuffer(bufferSize int) LimitedBuffer { var container = make([]DataEntry, bufferSize) - var estimatedSizeMutex = sync.Mutex{} - var processedMutex = sync.Mutex{} - var consumedMutex = sync.Mutex{} var itemsMutex = sync.Mutex{} var cond = sync.NewCond(&itemsMutex) @@ -32,9 +23,6 @@ func makeLimitedBuffer(bufferSize int) LimitedBuffer { items: container, mutex: &itemsMutex, updateCond: cond, - estimatedSizeMutex: &estimatedSizeMutex, - processedMutex: &processedMutex, - consumedMutex: &consumedMutex, } } @@ -63,43 +51,14 @@ func (buffer *LimitedBuffer) Remove() DataEntry { return item } -func (buffer *LimitedBuffer) IsDone() bool { - return buffer.estimatedSize == buffer.processedCount -} +func (buffer *LimitedBuffer) ReserveEntry() bool { + buffer.mutex.Lock() + defer buffer.mutex.Unlock() -func (buffer *LimitedBuffer) WaitUntilDone() { - for !buffer.IsDone() {} -} - -func (buffer *LimitedBuffer) ConsumeEstimatedEntry() bool { - buffer.estimatedSizeMutex.Lock() - defer buffer.estimatedSizeMutex.Unlock() - - if (buffer.estimatedSize == buffer.consumedCount) { + if buffer.estimatedCount == buffer.reservedCount { return false } else { - buffer.consumedCount += 1 + buffer.reservedCount++ return true } } - -func (buffer *LimitedBuffer) IsEstimatedEmpty() bool { - buffer.estimatedSizeMutex.Lock() - defer buffer.estimatedSizeMutex.Unlock() - - return buffer.estimatedSize == 0 -} - -func (buffer *LimitedBuffer) UpdateEstimated(change int) { - buffer.estimatedSizeMutex.Lock() - defer buffer.estimatedSizeMutex.Unlock() - - buffer.estimatedSize += change -} - -func (buffer *LimitedBuffer) MarkAsProcessed() { - buffer.processedMutex.Lock() - defer buffer.processedMutex.Unlock() - - buffer.processedCount += 1 -} diff --git a/lab1a/go/main.go b/lab1a/go/main.go index a6389d4..5fa1441 100644 --- a/lab1a/go/main.go +++ b/lab1a/go/main.go @@ -5,16 +5,12 @@ import ( "fmt" "log" "os" - "sort" "sync" "time" ) -const inputBufferSize = 10 -const inputMonitorCount = 3 - -const outputBufferSize = 10 -const outputMonitorCount = 1 +const bufferSize = 10 +const threadCount = 3 type DataEntry struct { Name string `json:"name"` @@ -22,48 +18,24 @@ type DataEntry struct { Criteria int `json:"criteria"` } -func filterEntry(entry DataEntry) bool { +func processEntry(entry DataEntry, outputEntries *SortedList, outputMutex *sync.Mutex) { time.Sleep(time.Millisecond * 100 + time.Millisecond * time.Duration(entry.Criteria)) - return entry.Sugar > float32(entry.Criteria) + if entry.Sugar > float32(entry.Criteria) { + outputMutex.Lock() + fmt.Println("Output:", entry) + outputEntries.Append(entry) + outputMutex.Unlock() + } } -func outputEntry(outputEntries *[]DataEntry, entry DataEntry) { - time.Sleep(time.Millisecond * 200) - fmt.Println("Output:", entry) - *outputEntries = append(*outputEntries, entry) - sort.Slice(*outputEntries, func(i, j int) bool { - return (*outputEntries)[i].Sugar < (*outputEntries)[j].Sugar - }) -} - -func filterEntries(inputBuffer *LimitedBuffer, outputBuffer *LimitedBuffer) { - for inputBuffer.ConsumeEstimatedEntry() { - entry := inputBuffer.Remove() +func processEntries(monitor *LimitedBuffer, outputEntries *SortedList, outputMutex *sync.Mutex, waitGroup *sync.WaitGroup) { + for monitor.ReserveEntry() { + entry := monitor.Remove() fmt.Println("Started to filter:", entry) - isFiltered := filterEntry(entry) + processEntry(entry, outputEntries, outputMutex) fmt.Println("Finished to filter:", entry) - if (isFiltered) { - outputBuffer.UpdateEstimated(+1) - outputBuffer.Insert(entry) - } - - inputBuffer.MarkAsProcessed() - } -} - -func outputEntries(outputBuffer *LimitedBuffer, inputBuffer *LimitedBuffer, outputList *[]DataEntry, outputMutex *sync.Mutex) { - for !inputBuffer.IsDone() { - for outputBuffer.ConsumeEstimatedEntry() { - entry := outputBuffer.Remove() - outputMutex.Lock() - fmt.Println("Started to output:", entry) - outputEntry(outputList, entry) - fmt.Println("Finished to output:", entry) - outputMutex.Unlock() - - outputBuffer.MarkAsProcessed() - } } + waitGroup.Done() } func main() { @@ -85,25 +57,22 @@ func main() { log.Fatal(err) } - outputList := []DataEntry{} - - inputBuffer := makeLimitedBuffer(inputBufferSize) - outputBuffer := makeLimitedBuffer(outputBufferSize) + outputList := makeSortedList(len(entries)) outputMutex := sync.Mutex{} - inputBuffer.UpdateEstimated(len(entries)) + waitGroup := sync.WaitGroup{} - for i := 0; i < inputMonitorCount; i++ { - go filterEntries(&inputBuffer, &outputBuffer) - } - for i := 0; i < outputMonitorCount; i++ { - go outputEntries(&outputBuffer, &inputBuffer, &outputList, &outputMutex) + monitor := makeLimitedBuffer(bufferSize) + monitor.estimatedCount = len(entries) + + for i := 0; i < threadCount; i++ { + go processEntries(&monitor, &outputList, &outputMutex, &waitGroup) + waitGroup.Add(1) } for _, entry := range entries { - inputBuffer.Insert(entry); + monitor.Insert(entry); } - inputBuffer.WaitUntilDone() - outputBuffer.WaitUntilDone() + waitGroup.Wait() outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644) if err != nil { @@ -111,7 +80,7 @@ func main() { } defer outputFile.Close() - outputListBytes, err := json.MarshalIndent(outputList, "", "\t") + outputListBytes, err := json.MarshalIndent(outputList.GetSlice(), "", "\t") if err != nil { log.Fatal(err) } diff --git a/lab1a/go/sorted_list.go b/lab1a/go/sorted_list.go new file mode 100644 index 0000000..c64eb83 --- /dev/null +++ b/lab1a/go/sorted_list.go @@ -0,0 +1,31 @@ +package main + +type SortedList struct { + items []DataEntry + count int +} + +func makeSortedList(capacity int) SortedList { + var items = make([]DataEntry, capacity) + return SortedList{ + items: items, + } +} + +func (self *SortedList) Append(item DataEntry) { + if len(self.items) == self.count { + panic("SortedList is full") + } + + self.items[self.count] = item + self.count++ + for i := self.count-1; i >= 1; i-- { + if self.items[i].Sugar > self.items[i-1].Sugar { break } + + self.items[i], self.items[i-1] = self.items[i-1], self.items[i] // Swap elements [i] and [i-1] + } +} + +func (self *SortedList) GetSlice() []DataEntry { + return self.items[:self.count] +}