update lab1a go solution
This commit is contained in:
parent
062dfd9685
commit
cd9c0f52e9
@ -1,5 +1,5 @@
|
||||
# Lab1
|
||||
|
||||
```shell
|
||||
go run limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt
|
||||
go run sorted_list.go limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt
|
||||
```
|
||||
|
@ -10,21 +10,12 @@ type LimitedBuffer struct {
|
||||
mutex *sync.Mutex
|
||||
updateCond *sync.Cond
|
||||
|
||||
estimatedSize int
|
||||
estimatedSizeMutex *sync.Mutex
|
||||
|
||||
consumedCount int
|
||||
consumedMutex *sync.Mutex
|
||||
|
||||
processedCount int
|
||||
processedMutex *sync.Mutex
|
||||
estimatedCount int
|
||||
reservedCount int
|
||||
}
|
||||
|
||||
func makeLimitedBuffer(bufferSize int) LimitedBuffer {
|
||||
var container = make([]DataEntry, bufferSize)
|
||||
var estimatedSizeMutex = sync.Mutex{}
|
||||
var processedMutex = sync.Mutex{}
|
||||
var consumedMutex = sync.Mutex{}
|
||||
|
||||
var itemsMutex = sync.Mutex{}
|
||||
var cond = sync.NewCond(&itemsMutex)
|
||||
@ -32,9 +23,6 @@ func makeLimitedBuffer(bufferSize int) LimitedBuffer {
|
||||
items: container,
|
||||
mutex: &itemsMutex,
|
||||
updateCond: cond,
|
||||
estimatedSizeMutex: &estimatedSizeMutex,
|
||||
processedMutex: &processedMutex,
|
||||
consumedMutex: &consumedMutex,
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,43 +51,14 @@ func (buffer *LimitedBuffer) Remove() DataEntry {
|
||||
return item
|
||||
}
|
||||
|
||||
func (buffer *LimitedBuffer) IsDone() bool {
|
||||
return buffer.estimatedSize == buffer.processedCount
|
||||
}
|
||||
func (buffer *LimitedBuffer) ReserveEntry() bool {
|
||||
buffer.mutex.Lock()
|
||||
defer buffer.mutex.Unlock()
|
||||
|
||||
func (buffer *LimitedBuffer) WaitUntilDone() {
|
||||
for !buffer.IsDone() {}
|
||||
}
|
||||
|
||||
func (buffer *LimitedBuffer) ConsumeEstimatedEntry() bool {
|
||||
buffer.estimatedSizeMutex.Lock()
|
||||
defer buffer.estimatedSizeMutex.Unlock()
|
||||
|
||||
if (buffer.estimatedSize == buffer.consumedCount) {
|
||||
if buffer.estimatedCount == buffer.reservedCount {
|
||||
return false
|
||||
} else {
|
||||
buffer.consumedCount += 1
|
||||
buffer.reservedCount++
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (buffer *LimitedBuffer) IsEstimatedEmpty() bool {
|
||||
buffer.estimatedSizeMutex.Lock()
|
||||
defer buffer.estimatedSizeMutex.Unlock()
|
||||
|
||||
return buffer.estimatedSize == 0
|
||||
}
|
||||
|
||||
func (buffer *LimitedBuffer) UpdateEstimated(change int) {
|
||||
buffer.estimatedSizeMutex.Lock()
|
||||
defer buffer.estimatedSizeMutex.Unlock()
|
||||
|
||||
buffer.estimatedSize += change
|
||||
}
|
||||
|
||||
func (buffer *LimitedBuffer) MarkAsProcessed() {
|
||||
buffer.processedMutex.Lock()
|
||||
defer buffer.processedMutex.Unlock()
|
||||
|
||||
buffer.processedCount += 1
|
||||
}
|
||||
|
@ -5,16 +5,12 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const inputBufferSize = 10
|
||||
const inputMonitorCount = 3
|
||||
|
||||
const outputBufferSize = 10
|
||||
const outputMonitorCount = 1
|
||||
const bufferSize = 10
|
||||
const threadCount = 3
|
||||
|
||||
type DataEntry struct {
|
||||
Name string `json:"name"`
|
||||
@ -22,48 +18,24 @@ type DataEntry struct {
|
||||
Criteria int `json:"criteria"`
|
||||
}
|
||||
|
||||
func filterEntry(entry DataEntry) bool {
|
||||
func processEntry(entry DataEntry, outputEntries *SortedList, outputMutex *sync.Mutex) {
|
||||
time.Sleep(time.Millisecond * 100 + time.Millisecond * time.Duration(entry.Criteria))
|
||||
return entry.Sugar > float32(entry.Criteria)
|
||||
if entry.Sugar > float32(entry.Criteria) {
|
||||
outputMutex.Lock()
|
||||
fmt.Println("Output:", entry)
|
||||
outputEntries.Append(entry)
|
||||
outputMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func outputEntry(outputEntries *[]DataEntry, entry DataEntry) {
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
fmt.Println("Output:", entry)
|
||||
*outputEntries = append(*outputEntries, entry)
|
||||
sort.Slice(*outputEntries, func(i, j int) bool {
|
||||
return (*outputEntries)[i].Sugar < (*outputEntries)[j].Sugar
|
||||
})
|
||||
}
|
||||
|
||||
func filterEntries(inputBuffer *LimitedBuffer, outputBuffer *LimitedBuffer) {
|
||||
for inputBuffer.ConsumeEstimatedEntry() {
|
||||
entry := inputBuffer.Remove()
|
||||
func processEntries(monitor *LimitedBuffer, outputEntries *SortedList, outputMutex *sync.Mutex, waitGroup *sync.WaitGroup) {
|
||||
for monitor.ReserveEntry() {
|
||||
entry := monitor.Remove()
|
||||
fmt.Println("Started to filter:", entry)
|
||||
isFiltered := filterEntry(entry)
|
||||
processEntry(entry, outputEntries, outputMutex)
|
||||
fmt.Println("Finished to filter:", entry)
|
||||
if (isFiltered) {
|
||||
outputBuffer.UpdateEstimated(+1)
|
||||
outputBuffer.Insert(entry)
|
||||
}
|
||||
|
||||
inputBuffer.MarkAsProcessed()
|
||||
}
|
||||
}
|
||||
|
||||
func outputEntries(outputBuffer *LimitedBuffer, inputBuffer *LimitedBuffer, outputList *[]DataEntry, outputMutex *sync.Mutex) {
|
||||
for !inputBuffer.IsDone() {
|
||||
for outputBuffer.ConsumeEstimatedEntry() {
|
||||
entry := outputBuffer.Remove()
|
||||
outputMutex.Lock()
|
||||
fmt.Println("Started to output:", entry)
|
||||
outputEntry(outputList, entry)
|
||||
fmt.Println("Finished to output:", entry)
|
||||
outputMutex.Unlock()
|
||||
|
||||
outputBuffer.MarkAsProcessed()
|
||||
}
|
||||
}
|
||||
waitGroup.Done()
|
||||
}
|
||||
|
||||
func main() {
|
||||
@ -85,25 +57,22 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
outputList := []DataEntry{}
|
||||
|
||||
inputBuffer := makeLimitedBuffer(inputBufferSize)
|
||||
outputBuffer := makeLimitedBuffer(outputBufferSize)
|
||||
outputList := makeSortedList(len(entries))
|
||||
outputMutex := sync.Mutex{}
|
||||
inputBuffer.UpdateEstimated(len(entries))
|
||||
waitGroup := sync.WaitGroup{}
|
||||
|
||||
for i := 0; i < inputMonitorCount; i++ {
|
||||
go filterEntries(&inputBuffer, &outputBuffer)
|
||||
}
|
||||
for i := 0; i < outputMonitorCount; i++ {
|
||||
go outputEntries(&outputBuffer, &inputBuffer, &outputList, &outputMutex)
|
||||
monitor := makeLimitedBuffer(bufferSize)
|
||||
monitor.estimatedCount = len(entries)
|
||||
|
||||
for i := 0; i < threadCount; i++ {
|
||||
go processEntries(&monitor, &outputList, &outputMutex, &waitGroup)
|
||||
waitGroup.Add(1)
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
inputBuffer.Insert(entry);
|
||||
monitor.Insert(entry);
|
||||
}
|
||||
inputBuffer.WaitUntilDone()
|
||||
outputBuffer.WaitUntilDone()
|
||||
waitGroup.Wait()
|
||||
|
||||
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
@ -111,7 +80,7 @@ func main() {
|
||||
}
|
||||
defer outputFile.Close()
|
||||
|
||||
outputListBytes, err := json.MarshalIndent(outputList, "", "\t")
|
||||
outputListBytes, err := json.MarshalIndent(outputList.GetSlice(), "", "\t")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
31
lab1a/go/sorted_list.go
Normal file
31
lab1a/go/sorted_list.go
Normal file
@ -0,0 +1,31 @@
|
||||
package main
|
||||
|
||||
type SortedList struct {
|
||||
items []DataEntry
|
||||
count int
|
||||
}
|
||||
|
||||
func makeSortedList(capacity int) SortedList {
|
||||
var items = make([]DataEntry, capacity)
|
||||
return SortedList{
|
||||
items: items,
|
||||
}
|
||||
}
|
||||
|
||||
func (self *SortedList) Append(item DataEntry) {
|
||||
if len(self.items) == self.count {
|
||||
panic("SortedList is full")
|
||||
}
|
||||
|
||||
self.items[self.count] = item
|
||||
self.count++
|
||||
for i := self.count-1; i >= 1; i-- {
|
||||
if self.items[i].Sugar > self.items[i-1].Sugar { break }
|
||||
|
||||
self.items[i], self.items[i-1] = self.items[i-1], self.items[i] // Swap elements [i] and [i-1]
|
||||
}
|
||||
}
|
||||
|
||||
func (self *SortedList) GetSlice() []DataEntry {
|
||||
return self.items[:self.count]
|
||||
}
|
Loading…
Reference in New Issue
Block a user