// Lab1 (original lab1/go/main.go, deleted by this change): reads DataEntry
// records from a JSON file, filters them through a pool of worker
// goroutines connected by buffered channels, and appends the names of
// entries that pass the filter to a text file.
//
// Usage: go run main.go <data-file> <output-file>
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"
)

const inputQueueSize = 10
const inputMonitorCount = 3

const outputQueueSize = 10
const outputMonitorCount = 3

// DataEntry is one record of the input JSON array.
type DataEntry struct {
	Name     string  `json:"name"`
	Sugar    float32 `json:"sugar"`
	Criteria float32 `json:"criteria"`
}

// filterEntry reports whether the entry passes the filter (sugar above
// the criteria). The sleep simulates a slow computation; NOTE(review):
// the float32 Criteria is truncated to whole milliseconds by the
// time.Duration conversion.
func filterEntry(entry DataEntry) bool {
	time.Sleep(time.Millisecond*100 + time.Millisecond*time.Duration(entry.Criteria))
	return entry.Sugar > entry.Criteria
}

// outputEntry appends the entry's name (newline-terminated) to the result
// file. The sleep simulates a slow output device.
func outputEntry(file *os.File, entry DataEntry) {
	time.Sleep(time.Millisecond * 200)
	fmt.Println("Output:", entry)
	file.WriteString(entry.Name)
	file.WriteString("\n")
}

// filterEntries is a filter worker: it drains the input channel and
// forwards entries that pass filterEntry to the output channel.
// outputGroup is incremented BEFORE the send and before inputGroup.Done(),
// so by the time main's inputGroup.Wait() returns, every forwarded entry
// is already accounted for in outputGroup.
func filterEntries(inputGroup, outputGroup *sync.WaitGroup, input <-chan DataEntry, output chan<- DataEntry) {
	for entry := range input {
		fmt.Println("Started to filter:", entry)
		passed := filterEntry(entry)
		fmt.Println("Finished to filter:", entry)
		if passed {
			outputGroup.Add(1)
			output <- entry
		}
		inputGroup.Done()
	}
}

// outputEntries is an output worker: it writes every received entry to
// the file and signals completion on the group. (Fixed typo: the original
// was named "ouputEntries".)
func outputEntries(file *os.File, group *sync.WaitGroup, channel <-chan DataEntry) {
	for entry := range channel {
		outputEntry(file, entry)
		group.Done()
	}
}

func main() {
	if len(os.Args) != 3 {
		// Original usage string was mangled to a bare " "; restored the
		// placeholders it clearly intended.
		fmt.Println("Usage:", os.Args[0], "<data-file> <output-file>")
		os.Exit(-1)
	}

	dataFilename := os.Args[1]
	outputFilename := os.Args[2]
	fileData, err := os.ReadFile(dataFilename)
	if err != nil {
		log.Fatal(err)
	}

	entries := []DataEntry{}
	if err = json.Unmarshal(fileData, &entries); err != nil {
		log.Fatal(err)
	}

	outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer outputFile.Close()

	inputChannel := make(chan DataEntry, inputQueueSize)
	outputChannel := make(chan DataEntry, outputQueueSize)

	// inputsGroup counts un-filtered entries; outputsGroup counts entries
	// forwarded to the output stage but not yet written. (Fixed typo:
	// originals were "inputsGroups"/"ouputsGroups".)
	var inputsGroup sync.WaitGroup
	var outputsGroup sync.WaitGroup
	inputsGroup.Add(len(entries))

	for i := 0; i < inputMonitorCount; i++ {
		go filterEntries(&inputsGroup, &outputsGroup, inputChannel, outputChannel)
	}
	for i := 0; i < outputMonitorCount; i++ {
		go outputEntries(outputFile, &outputsGroup, outputChannel)
	}

	for _, entry := range entries {
		inputChannel <- entry
	}
	close(inputChannel)
	// All filter workers finish before we wait on the output stage, so
	// every outputsGroup.Add has already happened; only then is it safe
	// to close the output channel.
	inputsGroup.Wait()
	outputsGroup.Wait()
	close(outputChannel)

	fmt.Println("Finished")
}
// lab1a (lab1a/go/limited_buffer.go + lab1a/go/main.go, merged: both files
// are `package main` and are compiled together per the README's
// `go run limited_buffer.go main.go ...`): reads DataEntry records from a
// JSON file, filters them through worker goroutines using a hand-rolled
// bounded buffer, keeps the surviving entries sorted by sugar content, and
// writes them to a JSON file.
//
// Usage: go run limited_buffer.go main.go <data-file> <output-file>
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sort"
	"sync"
	"time"
)

const inputBufferSize = 10
const inputMonitorCount = 3

const outputBufferSize = 10
const outputMonitorCount = 1

// DataEntry is one record of the input JSON array.
type DataEntry struct {
	Name     string  `json:"name"`
	Sugar    float32 `json:"sugar"`
	Criteria int     `json:"criteria"`
}

// LimitedBuffer is a fixed-capacity FIFO ring buffer guarded by a mutex +
// condition variable. Alongside the ring it tracks three counters:
//   - estimatedSize: entries expected to pass through the buffer,
//   - consumedCount: entries claimed by a worker via ConsumeEstimatedEntry,
//   - processedCount: entries fully handled (MarkAsProcessed).
// The stage is "done" once processedCount catches up with estimatedSize.
//
// NOTE(review): consumedCount is in practice guarded by
// estimatedSizeMutex (see ConsumeEstimatedEntry); consumedMutex is
// declared but never locked. Kept for struct compatibility.
type LimitedBuffer struct {
	items       []DataEntry
	readIndex   int
	writeIndex  int
	currentSize int
	mutex       *sync.Mutex
	updateCond  *sync.Cond

	estimatedSize      int
	estimatedSizeMutex *sync.Mutex

	consumedCount int
	consumedMutex *sync.Mutex

	processedCount int
	processedMutex *sync.Mutex
}

// makeLimitedBuffer builds an empty buffer with the given fixed capacity.
func makeLimitedBuffer(bufferSize int) LimitedBuffer {
	itemsMutex := &sync.Mutex{}
	return LimitedBuffer{
		items:              make([]DataEntry, bufferSize),
		mutex:              itemsMutex,
		updateCond:         sync.NewCond(itemsMutex),
		estimatedSizeMutex: &sync.Mutex{},
		processedMutex:     &sync.Mutex{},
		consumedMutex:      &sync.Mutex{},
	}
}

// Insert blocks while the ring is full, then appends item at the write
// cursor and wakes all waiters.
func (buffer *LimitedBuffer) Insert(item DataEntry) {
	buffer.mutex.Lock()
	defer buffer.mutex.Unlock()
	for buffer.currentSize == len(buffer.items) {
		buffer.updateCond.Wait()
	}
	buffer.items[buffer.writeIndex] = item
	buffer.writeIndex = (buffer.writeIndex + 1) % len(buffer.items)
	buffer.currentSize++
	buffer.updateCond.Broadcast()
}

// Remove blocks while the ring is empty, then pops the oldest item and
// wakes all waiters.
func (buffer *LimitedBuffer) Remove() DataEntry {
	buffer.mutex.Lock()
	defer buffer.mutex.Unlock()
	for buffer.currentSize == 0 {
		buffer.updateCond.Wait()
	}
	item := buffer.items[buffer.readIndex]
	buffer.readIndex = (buffer.readIndex + 1) % len(buffer.items)
	buffer.currentSize--
	buffer.updateCond.Broadcast()
	return item
}

// IsDone reports whether every expected entry has been fully processed.
// Both counters are read under their mutexes; the original read them
// unsynchronized, which is a data race under `go test -race`.
func (buffer *LimitedBuffer) IsDone() bool {
	buffer.estimatedSizeMutex.Lock()
	estimated := buffer.estimatedSize
	buffer.estimatedSizeMutex.Unlock()

	buffer.processedMutex.Lock()
	processed := buffer.processedCount
	buffer.processedMutex.Unlock()

	return estimated == processed
}

// WaitUntilDone blocks until IsDone reports true. It polls with a short
// sleep; the original spun in a tight empty loop, pinning a CPU core and
// potentially starving the very goroutines it was waiting for.
func (buffer *LimitedBuffer) WaitUntilDone() {
	for !buffer.IsDone() {
		time.Sleep(time.Millisecond)
	}
}

// ConsumeEstimatedEntry atomically claims one not-yet-claimed expected
// entry. It returns false when every expected entry is already claimed,
// which is how workers decide to stop pulling from the buffer.
func (buffer *LimitedBuffer) ConsumeEstimatedEntry() bool {
	buffer.estimatedSizeMutex.Lock()
	defer buffer.estimatedSizeMutex.Unlock()

	if buffer.estimatedSize == buffer.consumedCount {
		return false
	}
	buffer.consumedCount++
	return true
}

// IsEstimatedEmpty reports whether no entries are expected at all.
func (buffer *LimitedBuffer) IsEstimatedEmpty() bool {
	buffer.estimatedSizeMutex.Lock()
	defer buffer.estimatedSizeMutex.Unlock()

	return buffer.estimatedSize == 0
}

// UpdateEstimated adjusts the number of entries expected to pass through
// the buffer by change (may be negative).
func (buffer *LimitedBuffer) UpdateEstimated(change int) {
	buffer.estimatedSizeMutex.Lock()
	defer buffer.estimatedSizeMutex.Unlock()

	buffer.estimatedSize += change
}

// MarkAsProcessed records that one claimed entry has been fully handled.
func (buffer *LimitedBuffer) MarkAsProcessed() {
	buffer.processedMutex.Lock()
	defer buffer.processedMutex.Unlock()

	buffer.processedCount++
}

// filterEntry reports whether the entry passes the filter (sugar above
// the criteria). The sleep simulates a slow computation.
func filterEntry(entry DataEntry) bool {
	time.Sleep(time.Millisecond*100 + time.Millisecond*time.Duration(entry.Criteria))
	return entry.Sugar > float32(entry.Criteria)
}

// outputEntry appends the entry to the result list and re-sorts the list
// by ascending sugar. The sleep simulates a slow output device. Callers
// serialize access to outputEntries via outputMutex.
func outputEntry(outputEntries *[]DataEntry, entry DataEntry) {
	time.Sleep(time.Millisecond * 200)
	fmt.Println("Output:", entry)
	*outputEntries = append(*outputEntries, entry)
	sort.Slice(*outputEntries, func(i, j int) bool {
		return (*outputEntries)[i].Sugar < (*outputEntries)[j].Sugar
	})
}

// filterEntries is a filter worker: it claims entries from the input
// buffer until none remain, and forwards those that pass the filter to
// the output buffer. The output entry is registered (UpdateEstimated +
// Insert) BEFORE MarkAsProcessed on the input side — outputEntries'
// termination logic depends on this ordering.
func filterEntries(inputBuffer *LimitedBuffer, outputBuffer *LimitedBuffer) {
	for inputBuffer.ConsumeEstimatedEntry() {
		entry := inputBuffer.Remove()
		fmt.Println("Started to filter:", entry)
		passed := filterEntry(entry)
		fmt.Println("Finished to filter:", entry)
		if passed {
			outputBuffer.UpdateEstimated(+1)
			outputBuffer.Insert(entry)
		}

		inputBuffer.MarkAsProcessed()
	}
}

// outputEntries is the consumer loop: while the filtering stage is still
// running it repeatedly drains the output buffer into outputList.
//
// The done flag is sampled BEFORE the drain. Filter workers register an
// entry (UpdateEstimated + Insert) before calling MarkAsProcessed, so
// once IsDone is observed every surviving entry is already registered and
// the drain that follows cannot miss one. The original checked IsDone
// AFTER draining, so a worker could register its last entry between the
// drain and the check; the consumer would exit with an entry still queued
// and main would spin forever in outputBuffer.WaitUntilDone().
func outputEntries(outputBuffer *LimitedBuffer, inputBuffer *LimitedBuffer, outputList *[]DataEntry, outputMutex *sync.Mutex) {
	for {
		done := inputBuffer.IsDone()
		for outputBuffer.ConsumeEstimatedEntry() {
			entry := outputBuffer.Remove()
			outputMutex.Lock()
			fmt.Println("Started to output:", entry)
			outputEntry(outputList, entry)
			fmt.Println("Finished to output:", entry)
			outputMutex.Unlock()

			outputBuffer.MarkAsProcessed()
		}
		if done {
			return
		}
		time.Sleep(time.Millisecond) // avoid busy-spinning while idle
	}
}

func main() {
	if len(os.Args) != 3 {
		// Original usage string was mangled to a bare " "; restored the
		// placeholders it clearly intended.
		fmt.Println("Usage:", os.Args[0], "<data-file> <output-file>")
		os.Exit(-1)
	}

	dataFilename := os.Args[1]
	outputFilename := os.Args[2]
	fileData, err := os.ReadFile(dataFilename)
	if err != nil {
		log.Fatal(err)
	}

	entries := []DataEntry{}
	if err = json.Unmarshal(fileData, &entries); err != nil {
		log.Fatal(err)
	}

	outputList := []DataEntry{}

	inputBuffer := makeLimitedBuffer(inputBufferSize)
	outputBuffer := makeLimitedBuffer(outputBufferSize)
	outputMutex := sync.Mutex{}
	// All input entries are known up front.
	inputBuffer.UpdateEstimated(len(entries))

	for i := 0; i < inputMonitorCount; i++ {
		go filterEntries(&inputBuffer, &outputBuffer)
	}
	for i := 0; i < outputMonitorCount; i++ {
		go outputEntries(&outputBuffer, &inputBuffer, &outputList, &outputMutex)
	}

	for _, entry := range entries {
		inputBuffer.Insert(entry)
	}
	inputBuffer.WaitUntilDone()
	outputBuffer.WaitUntilDone()

	outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer outputFile.Close()

	outputListBytes, err := json.MarshalIndent(outputList, "", "\t")
	if err != nil {
		log.Fatal(err)
	}
	outputFile.Write(outputListBytes)

	fmt.Println("Finished")
}