Compare commits
No commits in common. "15772e30d9551abd185eb7eee24f65ba99a35a2d" and "062dfd96850c6bb2884b97a0f5ceb28dee56b1e9" have entirely different histories.
15772e30d9
...
062dfd9685
@ -10,20 +10,26 @@ class LimitedBuffer {
|
|||||||
std::mutex items_mutex;
|
std::mutex items_mutex;
|
||||||
std::condition_variable items_cv;
|
std::condition_variable items_cv;
|
||||||
|
|
||||||
int reserved_count;
|
|
||||||
public:
|
public:
|
||||||
int estimated_count;
|
int estimated_size;
|
||||||
|
int consumed_count;
|
||||||
|
int processed_count;
|
||||||
|
std::mutex processed_mutex;
|
||||||
|
|
||||||
LimitedBuffer(int capacity) {
|
LimitedBuffer(int capacity) {
|
||||||
this->capacity = capacity;
|
this->capacity = capacity;
|
||||||
estimated_count = 0;
|
estimated_size = 0;
|
||||||
reserved_count = 0;
|
consumed_count = 0;
|
||||||
|
processed_count = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void insert(DataEntry entry);
|
void insert(DataEntry entry);
|
||||||
DataEntry remove();
|
DataEntry remove();
|
||||||
|
|
||||||
bool reserve_entry();
|
void update_estimated(int change);
|
||||||
|
bool consume_estimated();
|
||||||
|
void mark_processed();
|
||||||
|
bool is_done();
|
||||||
};
|
};
|
||||||
|
|
||||||
void LimitedBuffer::insert(DataEntry entry) {
|
void LimitedBuffer::insert(DataEntry entry) {
|
||||||
@ -44,13 +50,27 @@ DataEntry LimitedBuffer::remove() {
|
|||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool LimitedBuffer::reserve_entry() {
|
void LimitedBuffer::update_estimated(int change) {
|
||||||
std::unique_lock<std::mutex> guard(items_mutex);
|
std::unique_lock<std::mutex> guard(processed_mutex);
|
||||||
|
estimated_size += change;
|
||||||
|
}
|
||||||
|
|
||||||
if (reserved_count == estimated_count) {
|
bool LimitedBuffer::consume_estimated() {
|
||||||
|
std::unique_lock<std::mutex> guard(processed_mutex);
|
||||||
|
|
||||||
|
if (consumed_count == estimated_size) {
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
reserved_count++;
|
consumed_count++;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void LimitedBuffer::mark_processed() {
|
||||||
|
std::unique_lock<std::mutex> guard(processed_mutex);
|
||||||
|
processed_count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LimitedBuffer::is_done() {
|
||||||
|
return estimated_size == processed_count;
|
||||||
|
}
|
||||||
|
@ -11,32 +11,57 @@
|
|||||||
using namespace std;
|
using namespace std;
|
||||||
using json = nlohmann::json;
|
using json = nlohmann::json;
|
||||||
|
|
||||||
const int bufferSize = 10;
|
const int inputBufferSize = 10;
|
||||||
const int threadCount = 1;
|
const int inputMonitorCount = 3;
|
||||||
|
|
||||||
|
const int outputBufferSize = 10;
|
||||||
|
const int outputMonitorCount = 3;
|
||||||
|
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "limited_buffer.cpp"
|
#include "limited_buffer.cpp"
|
||||||
|
|
||||||
void processEntry(DataEntry *entry, vector<DataEntry> *entries, mutex *outputMutex) {
|
bool filterEntry(DataEntry *entry) {
|
||||||
this_thread::sleep_for(chrono::milliseconds(100 + entry->criteria));
|
this_thread::sleep_for(chrono::milliseconds(100 + entry->criteria));
|
||||||
if (entry->sugar > entry->criteria) {
|
return entry->sugar > entry->criteria;
|
||||||
outputMutex->lock();
|
}
|
||||||
cout << "Output: " << entry->name << endl;
|
|
||||||
// TODO: make this a sorted add
|
void outputEntry(vector<DataEntry> *entries, DataEntry *entry) {
|
||||||
entries->push_back(*entry);
|
this_thread::sleep_for(chrono::milliseconds(200));
|
||||||
sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) {
|
cout << "Output: " << entry->name << endl;
|
||||||
return a.sugar < b.sugar;
|
entries->push_back(*entry);
|
||||||
});
|
sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) {
|
||||||
outputMutex->unlock();
|
return a.sugar < b.sugar;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void filterEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer) {
|
||||||
|
while (!inputBuffer->is_done()) {
|
||||||
|
if (inputBuffer->consume_estimated()) {
|
||||||
|
DataEntry entry = inputBuffer->remove();
|
||||||
|
cout << "Started to filter: " << entry.name << endl;
|
||||||
|
bool is_filtered = filterEntry(&entry);
|
||||||
|
cout << "Finished to filter: " << entry.name << endl;
|
||||||
|
|
||||||
|
if (is_filtered) {
|
||||||
|
outputBuffer->update_estimated(+1);
|
||||||
|
outputBuffer->insert(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
inputBuffer->mark_processed();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void processEntries(LimitedBuffer *monitor, vector<DataEntry> *outputList, mutex *outputMutex) {
|
void outputEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer, vector<DataEntry> *outputList, mutex *outputMutex) {
|
||||||
while (monitor->reserve_entry()) {
|
while (!inputBuffer->is_done() || !outputBuffer->is_done()) {
|
||||||
DataEntry entry = monitor->remove();
|
if (outputBuffer->consume_estimated()) {
|
||||||
cout << "Started to filter: " << entry.name << endl;
|
DataEntry entry = outputBuffer->remove();
|
||||||
processEntry(&entry, outputList, outputMutex);
|
cout << "Started to output: " << entry.name << endl;
|
||||||
cout << "Finished to filter: " << entry.name << endl;
|
outputEntry(outputList, &entry);
|
||||||
|
cout << "Finished to output: " << entry.name << endl;
|
||||||
|
|
||||||
|
outputBuffer->mark_processed();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,17 +89,22 @@ int main(int argc, char **argv) {
|
|||||||
vector<DataEntry> outputList;
|
vector<DataEntry> outputList;
|
||||||
mutex outputListMutex;
|
mutex outputListMutex;
|
||||||
|
|
||||||
LimitedBuffer monitor(bufferSize);
|
LimitedBuffer inputBuffer(inputBufferSize);
|
||||||
monitor.estimated_count = entries.size();
|
LimitedBuffer outputBuffer(outputBufferSize);
|
||||||
|
inputBuffer.update_estimated(entries.size());
|
||||||
|
|
||||||
vector<thread> threads;
|
vector<thread> threads;
|
||||||
for (int i = 0; i < threadCount; i++) {
|
for (int i = 0; i < inputMonitorCount; i++) {
|
||||||
threads.push_back(thread(processEntries, &monitor, &outputList, &outputListMutex));
|
threads.push_back(thread(filterEntries, &inputBuffer, &outputBuffer));
|
||||||
|
}
|
||||||
|
for (int i = 0; i < outputMonitorCount; i++) {
|
||||||
|
threads.push_back(thread(outputEntries, &inputBuffer, &outputBuffer, &outputList, &outputListMutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < entries.size(); i++) {
|
for (int i = 0; i < entries.size(); i++) {
|
||||||
monitor.insert(entries[i]);
|
inputBuffer.insert(entries[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < threads.size(); i++) {
|
for (int i = 0; i < threads.size(); i++) {
|
||||||
threads[i].join();
|
threads[i].join();
|
||||||
}
|
}
|
||||||
@ -100,3 +130,9 @@ int main(int argc, char **argv) {
|
|||||||
cout << "Finished" << endl;
|
cout << "Finished" << endl;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void execute(const string &name) {
|
||||||
|
cout << name << ": one" << endl;
|
||||||
|
cout << name << ": two" << endl;
|
||||||
|
cout << name << ": three" << endl;
|
||||||
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
# Lab1
|
# Lab1
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
go run sorted_list.go limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt
|
go run limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt
|
||||||
```
|
```
|
||||||
|
@ -10,12 +10,21 @@ type LimitedBuffer struct {
|
|||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
updateCond *sync.Cond
|
updateCond *sync.Cond
|
||||||
|
|
||||||
estimatedCount int
|
estimatedSize int
|
||||||
reservedCount int
|
estimatedSizeMutex *sync.Mutex
|
||||||
|
|
||||||
|
consumedCount int
|
||||||
|
consumedMutex *sync.Mutex
|
||||||
|
|
||||||
|
processedCount int
|
||||||
|
processedMutex *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeLimitedBuffer(bufferSize int) LimitedBuffer {
|
func makeLimitedBuffer(bufferSize int) LimitedBuffer {
|
||||||
var container = make([]DataEntry, bufferSize)
|
var container = make([]DataEntry, bufferSize)
|
||||||
|
var estimatedSizeMutex = sync.Mutex{}
|
||||||
|
var processedMutex = sync.Mutex{}
|
||||||
|
var consumedMutex = sync.Mutex{}
|
||||||
|
|
||||||
var itemsMutex = sync.Mutex{}
|
var itemsMutex = sync.Mutex{}
|
||||||
var cond = sync.NewCond(&itemsMutex)
|
var cond = sync.NewCond(&itemsMutex)
|
||||||
@ -23,6 +32,9 @@ func makeLimitedBuffer(bufferSize int) LimitedBuffer {
|
|||||||
items: container,
|
items: container,
|
||||||
mutex: &itemsMutex,
|
mutex: &itemsMutex,
|
||||||
updateCond: cond,
|
updateCond: cond,
|
||||||
|
estimatedSizeMutex: &estimatedSizeMutex,
|
||||||
|
processedMutex: &processedMutex,
|
||||||
|
consumedMutex: &consumedMutex,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,14 +63,43 @@ func (buffer *LimitedBuffer) Remove() DataEntry {
|
|||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
func (buffer *LimitedBuffer) ReserveEntry() bool {
|
func (buffer *LimitedBuffer) IsDone() bool {
|
||||||
buffer.mutex.Lock()
|
return buffer.estimatedSize == buffer.processedCount
|
||||||
defer buffer.mutex.Unlock()
|
}
|
||||||
|
|
||||||
if buffer.estimatedCount == buffer.reservedCount {
|
func (buffer *LimitedBuffer) WaitUntilDone() {
|
||||||
|
for !buffer.IsDone() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (buffer *LimitedBuffer) ConsumeEstimatedEntry() bool {
|
||||||
|
buffer.estimatedSizeMutex.Lock()
|
||||||
|
defer buffer.estimatedSizeMutex.Unlock()
|
||||||
|
|
||||||
|
if (buffer.estimatedSize == buffer.consumedCount) {
|
||||||
return false
|
return false
|
||||||
} else {
|
} else {
|
||||||
buffer.reservedCount++
|
buffer.consumedCount += 1
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (buffer *LimitedBuffer) IsEstimatedEmpty() bool {
|
||||||
|
buffer.estimatedSizeMutex.Lock()
|
||||||
|
defer buffer.estimatedSizeMutex.Unlock()
|
||||||
|
|
||||||
|
return buffer.estimatedSize == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (buffer *LimitedBuffer) UpdateEstimated(change int) {
|
||||||
|
buffer.estimatedSizeMutex.Lock()
|
||||||
|
defer buffer.estimatedSizeMutex.Unlock()
|
||||||
|
|
||||||
|
buffer.estimatedSize += change
|
||||||
|
}
|
||||||
|
|
||||||
|
func (buffer *LimitedBuffer) MarkAsProcessed() {
|
||||||
|
buffer.processedMutex.Lock()
|
||||||
|
defer buffer.processedMutex.Unlock()
|
||||||
|
|
||||||
|
buffer.processedCount += 1
|
||||||
|
}
|
||||||
|
@ -5,12 +5,16 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const bufferSize = 10
|
const inputBufferSize = 10
|
||||||
const threadCount = 3
|
const inputMonitorCount = 3
|
||||||
|
|
||||||
|
const outputBufferSize = 10
|
||||||
|
const outputMonitorCount = 1
|
||||||
|
|
||||||
type DataEntry struct {
|
type DataEntry struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@ -18,24 +22,48 @@ type DataEntry struct {
|
|||||||
Criteria int `json:"criteria"`
|
Criteria int `json:"criteria"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func processEntry(entry DataEntry, outputEntries *SortedList, outputMutex *sync.Mutex) {
|
func filterEntry(entry DataEntry) bool {
|
||||||
time.Sleep(time.Millisecond * 100 + time.Millisecond * time.Duration(entry.Criteria))
|
time.Sleep(time.Millisecond * 100 + time.Millisecond * time.Duration(entry.Criteria))
|
||||||
if entry.Sugar > float32(entry.Criteria) {
|
return entry.Sugar > float32(entry.Criteria)
|
||||||
outputMutex.Lock()
|
}
|
||||||
fmt.Println("Output:", entry)
|
|
||||||
outputEntries.Append(entry)
|
func outputEntry(outputEntries *[]DataEntry, entry DataEntry) {
|
||||||
outputMutex.Unlock()
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
fmt.Println("Output:", entry)
|
||||||
|
*outputEntries = append(*outputEntries, entry)
|
||||||
|
sort.Slice(*outputEntries, func(i, j int) bool {
|
||||||
|
return (*outputEntries)[i].Sugar < (*outputEntries)[j].Sugar
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterEntries(inputBuffer *LimitedBuffer, outputBuffer *LimitedBuffer) {
|
||||||
|
for inputBuffer.ConsumeEstimatedEntry() {
|
||||||
|
entry := inputBuffer.Remove()
|
||||||
|
fmt.Println("Started to filter:", entry)
|
||||||
|
isFiltered := filterEntry(entry)
|
||||||
|
fmt.Println("Finished to filter:", entry)
|
||||||
|
if (isFiltered) {
|
||||||
|
outputBuffer.UpdateEstimated(+1)
|
||||||
|
outputBuffer.Insert(entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
inputBuffer.MarkAsProcessed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func processEntries(monitor *LimitedBuffer, outputEntries *SortedList, outputMutex *sync.Mutex, waitGroup *sync.WaitGroup) {
|
func outputEntries(outputBuffer *LimitedBuffer, inputBuffer *LimitedBuffer, outputList *[]DataEntry, outputMutex *sync.Mutex) {
|
||||||
for monitor.ReserveEntry() {
|
for !inputBuffer.IsDone() {
|
||||||
entry := monitor.Remove()
|
for outputBuffer.ConsumeEstimatedEntry() {
|
||||||
fmt.Println("Started to filter:", entry)
|
entry := outputBuffer.Remove()
|
||||||
processEntry(entry, outputEntries, outputMutex)
|
outputMutex.Lock()
|
||||||
fmt.Println("Finished to filter:", entry)
|
fmt.Println("Started to output:", entry)
|
||||||
|
outputEntry(outputList, entry)
|
||||||
|
fmt.Println("Finished to output:", entry)
|
||||||
|
outputMutex.Unlock()
|
||||||
|
|
||||||
|
outputBuffer.MarkAsProcessed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
waitGroup.Done()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -57,22 +85,25 @@ func main() {
|
|||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
outputList := makeSortedList(len(entries))
|
outputList := []DataEntry{}
|
||||||
|
|
||||||
|
inputBuffer := makeLimitedBuffer(inputBufferSize)
|
||||||
|
outputBuffer := makeLimitedBuffer(outputBufferSize)
|
||||||
outputMutex := sync.Mutex{}
|
outputMutex := sync.Mutex{}
|
||||||
waitGroup := sync.WaitGroup{}
|
inputBuffer.UpdateEstimated(len(entries))
|
||||||
|
|
||||||
monitor := makeLimitedBuffer(bufferSize)
|
for i := 0; i < inputMonitorCount; i++ {
|
||||||
monitor.estimatedCount = len(entries)
|
go filterEntries(&inputBuffer, &outputBuffer)
|
||||||
|
}
|
||||||
for i := 0; i < threadCount; i++ {
|
for i := 0; i < outputMonitorCount; i++ {
|
||||||
go processEntries(&monitor, &outputList, &outputMutex, &waitGroup)
|
go outputEntries(&outputBuffer, &inputBuffer, &outputList, &outputMutex)
|
||||||
waitGroup.Add(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
monitor.Insert(entry);
|
inputBuffer.Insert(entry);
|
||||||
}
|
}
|
||||||
waitGroup.Wait()
|
inputBuffer.WaitUntilDone()
|
||||||
|
outputBuffer.WaitUntilDone()
|
||||||
|
|
||||||
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
|
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -80,7 +111,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer outputFile.Close()
|
defer outputFile.Close()
|
||||||
|
|
||||||
outputListBytes, err := json.MarshalIndent(outputList.GetSlice(), "", "\t")
|
outputListBytes, err := json.MarshalIndent(outputList, "", "\t")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -1,31 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
type SortedList struct {
|
|
||||||
items []DataEntry
|
|
||||||
count int
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeSortedList(capacity int) SortedList {
|
|
||||||
var items = make([]DataEntry, capacity)
|
|
||||||
return SortedList{
|
|
||||||
items: items,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *SortedList) Append(item DataEntry) {
|
|
||||||
if len(self.items) == self.count {
|
|
||||||
panic("SortedList is full")
|
|
||||||
}
|
|
||||||
|
|
||||||
self.items[self.count] = item
|
|
||||||
self.count++
|
|
||||||
for i := self.count-1; i >= 1; i-- {
|
|
||||||
if self.items[i].Sugar > self.items[i-1].Sugar { break }
|
|
||||||
|
|
||||||
self.items[i], self.items[i-1] = self.items[i-1], self.items[i] // Swap elements [i] and [i-1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *SortedList) GetSlice() []DataEntry {
|
|
||||||
return self.items[:self.count]
|
|
||||||
}
|
|
1
lab1a/rust/.gitignore
vendored
Normal file
1
lab1a/rust/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
target
|
122
lab1a/rust/Cargo.lock
generated
Normal file
122
lab1a/rust/Cargo.lock
generated
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
# This file is automatically @generated by Cargo.
|
||||||
|
# It is not intended for manual editing.
|
||||||
|
version = 3
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "a-rust"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"crossbeam-channel",
|
||||||
|
"crossbeam-utils",
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "anyhow"
|
||||||
|
version = "1.0.75"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cfg-if"
|
||||||
|
version = "1.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam-channel"
|
||||||
|
version = "0.5.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"crossbeam-utils",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam-utils"
|
||||||
|
version = "0.8.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itoa"
|
||||||
|
version = "1.0.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro2"
|
||||||
|
version = "1.0.67"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
|
||||||
|
dependencies = [
|
||||||
|
"unicode-ident",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "quote"
|
||||||
|
version = "1.0.33"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ryu"
|
||||||
|
version = "1.0.15"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde"
|
||||||
|
version = "1.0.188"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
|
||||||
|
dependencies = [
|
||||||
|
"serde_derive",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_derive"
|
||||||
|
version = "1.0.188"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_json"
|
||||||
|
version = "1.0.107"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
|
||||||
|
dependencies = [
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "syn"
|
||||||
|
version = "2.0.36"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "91e02e55d62894af2a08aca894c6577281f76769ba47c94d5756bec8ac6e7373"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"unicode-ident",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unicode-ident"
|
||||||
|
version = "1.0.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
|
12
lab1a/rust/Cargo.toml
Normal file
12
lab1a/rust/Cargo.toml
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
[package]
|
||||||
|
name = "a-rust"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.75"
|
||||||
|
crossbeam-channel = "0.5.8"
|
||||||
|
crossbeam-utils = "0.8.16"
|
||||||
|
serde_json = "1.0.107"
|
3
lab1a/rust/README.md
Normal file
3
lab1a/rust/README.md
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# Lab1a
|
||||||
|
|
||||||
|
THIS IS NOT FINISHED
|
7
lab1a/rust/src/common.rs
Normal file
7
lab1a/rust/src/common.rs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DataEntry {
|
||||||
|
pub name: String,
|
||||||
|
pub sugar: f32,
|
||||||
|
pub criteria: i32
|
||||||
|
}
|
59
lab1a/rust/src/limited_buffer.rs
Normal file
59
lab1a/rust/src/limited_buffer.rs
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
use crate::common::DataEntry;
|
||||||
|
|
||||||
|
pub struct LimitedBuffer {
|
||||||
|
items: Vec<DataEntry>,
|
||||||
|
capacity: usize,
|
||||||
|
|
||||||
|
estimated_size: i32,
|
||||||
|
consumed_count: u32,
|
||||||
|
processed_count: u32
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LimitedBuffer {
|
||||||
|
pub fn new(capacity: usize) -> LimitedBuffer {
|
||||||
|
return LimitedBuffer{
|
||||||
|
items: Vec::with_capacity(capacity),
|
||||||
|
capacity,
|
||||||
|
estimated_size: 0,
|
||||||
|
consumed_count: 0,
|
||||||
|
processed_count: 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn try_insert(&mut self, entry: DataEntry) -> bool {
|
||||||
|
if self.items.len() == self.capacity {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
self.items.push(entry);
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove(&mut self) -> Option<DataEntry> {
|
||||||
|
if self.items.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(self.items.remove(0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_done(&self) -> bool {
|
||||||
|
return (self.estimated_size as u32) == self.processed_count
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn consume_estimated(&mut self) -> bool {
|
||||||
|
if (self.estimated_size as u32) == self.consumed_count {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
self.consumed_count += 1;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_estimated(&mut self, change: i32) {
|
||||||
|
self.estimated_size += change
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mark_processed(&mut self) {
|
||||||
|
self.processed_count += 1
|
||||||
|
}
|
||||||
|
}
|
118
lab1a/rust/src/main.rs
Normal file
118
lab1a/rust/src/main.rs
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
use std::{env::args, process::exit, thread, time::Duration, sync::{Arc, Mutex}};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
mod limited_buffer;
|
||||||
|
mod common;
|
||||||
|
|
||||||
|
use common::DataEntry;
|
||||||
|
use limited_buffer::LimitedBuffer;
|
||||||
|
|
||||||
|
// This was a mistake
|
||||||
|
|
||||||
|
const INPUT_BUFFER_SIZE: usize = 2;
|
||||||
|
const INPUT_MONITOR_COUNT: i32 = 2;
|
||||||
|
|
||||||
|
const OUTPUT_BUFFER_SIZE: usize = 10;
|
||||||
|
const OUTPUT_MONITOR_COUNT: i32 = 1;
|
||||||
|
|
||||||
|
fn filter_entry(entry: &DataEntry) -> bool {
|
||||||
|
thread::sleep(Duration::from_millis(100 + entry.criteria as u64));
|
||||||
|
return entry.sugar > entry.criteria as f32
|
||||||
|
}
|
||||||
|
|
||||||
|
fn output_entry(output_list: &mut Vec<DataEntry>, entry: DataEntry) {
|
||||||
|
thread::sleep(Duration::from_millis(200));
|
||||||
|
output_list.push(entry);
|
||||||
|
output_list.sort_by(|a, b| a.sugar.total_cmp(&b.sugar))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn filter_entries(input: Arc<Mutex<LimitedBuffer>>, output: Arc<Mutex<LimitedBuffer>>) {
|
||||||
|
while !input.lock().unwrap().is_done() {
|
||||||
|
if input.lock().unwrap().consume_estimated() {
|
||||||
|
let entry = input.lock().unwrap().remove();
|
||||||
|
if entry.is_none() { continue; }
|
||||||
|
let entry = entry.unwrap();
|
||||||
|
|
||||||
|
println!("Started to filter: {}", entry.name);
|
||||||
|
let is_filtered = filter_entry(&entry);
|
||||||
|
println!("Finished to filter: {}", entry.name);
|
||||||
|
|
||||||
|
if is_filtered {
|
||||||
|
// output.lock().unwrap().update_estimated(1);
|
||||||
|
// while !output.lock().unwrap().try_insert(entry) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
input.lock().unwrap().mark_processed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ouput_entries(input: Arc<Mutex<LimitedBuffer>>, output: Arc<Mutex<LimitedBuffer>>, output_list: Arc<Mutex<Vec<DataEntry>>>) {
|
||||||
|
while !input.lock().unwrap().is_done() {
|
||||||
|
let entry = input.lock().unwrap().remove();
|
||||||
|
if entry.is_none() { continue; }
|
||||||
|
let entry = entry.unwrap();
|
||||||
|
output.lock().unwrap().consume_estimated();
|
||||||
|
|
||||||
|
let entry_name = entry.name.clone();
|
||||||
|
println!("Started to output: {}", entry_name);
|
||||||
|
{
|
||||||
|
let mut output_list = output_list.lock().unwrap();
|
||||||
|
output_entry(&mut output_list, entry);
|
||||||
|
}
|
||||||
|
println!("Finished to output: {}", entry_name);
|
||||||
|
|
||||||
|
output.lock().unwrap().mark_processed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
let args = args().collect::<Vec<_>>();
|
||||||
|
if args.len() != 3 {
|
||||||
|
eprintln!("Usage: {} <data-file> <output-file>", args[0]);
|
||||||
|
exit(-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
let entries = vec![
|
||||||
|
DataEntry{ name: "foo1".into(), sugar: 100.0, criteria: 100 },
|
||||||
|
DataEntry{ name: "foo2".into(), sugar: 100.0, criteria: 100 },
|
||||||
|
DataEntry{ name: "foo3".into(), sugar: 100.0, criteria: 100 },
|
||||||
|
DataEntry{ name: "foo4".into(), sugar: 100.0, criteria: 100 },
|
||||||
|
DataEntry{ name: "foo5".into(), sugar: 100.0, criteria: 100 }
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut input_threads = vec![];
|
||||||
|
// let mut output_threads = vec![];
|
||||||
|
|
||||||
|
// let output_list = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let input_buffer = Arc::new(Mutex::new(LimitedBuffer::new(INPUT_BUFFER_SIZE)));
|
||||||
|
let output_buffer = Arc::new(Mutex::new(LimitedBuffer::new(OUTPUT_BUFFER_SIZE)));
|
||||||
|
input_buffer.lock().unwrap().update_estimated(entries.len() as i32);
|
||||||
|
|
||||||
|
for _ in 0..INPUT_MONITOR_COUNT {
|
||||||
|
let input_buffer = input_buffer.clone();
|
||||||
|
let output_buffer = output_buffer.clone();
|
||||||
|
input_threads.push(thread::spawn(|| filter_entries(input_buffer, output_buffer)));
|
||||||
|
}
|
||||||
|
// for _ in 0..OUTPUT_MONITOR_COUNT {
|
||||||
|
// let input_buffer = input_buffer.clone();
|
||||||
|
// let output_buffer = output_buffer.clone();
|
||||||
|
// let output_list = output_list.clone();
|
||||||
|
// output_threads.push(thread::spawn(|| ouput_entries(input_buffer, output_buffer, output_list)));
|
||||||
|
// }
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
input_buffer.lock().unwrap().try_insert(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
for handle in input_threads {
|
||||||
|
handle.join().expect("Failed to join input thread");
|
||||||
|
}
|
||||||
|
// for handle in output_threads {
|
||||||
|
// handle.join().expect("Failed to join output thread");
|
||||||
|
// }
|
||||||
|
|
||||||
|
println!("Finished");
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user