1
0

Compare commits

...

3 Commits

Author SHA1 Message Date
062dfd9685 solve lab1a in cpp 2023-09-17 18:39:40 +03:00
04ab320661 give up on lab1a rust 2023-09-17 15:12:51 +03:00
b3eb803fb9 update lab1a go solution 2023-09-17 13:21:24 +03:00
25 changed files with 23674 additions and 109 deletions

View File

@ -1,5 +0,0 @@
# Lab1
```shell
go run main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt
```

View File

@ -1,104 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
)
const inputQueueSize = 10
const inputMonitorCount = 3
const outputQueueSize = 10
const outputMonitorCount = 3
type DataEntry struct {
Name string `json:"name"`
Sugar float32 `json:"sugar"`
Criteria float32 `json:"criteria"`
}
func filterEntry(entry DataEntry) bool {
time.Sleep(time.Millisecond * 100 + time.Millisecond * time.Duration(entry.Criteria))
return entry.Sugar > entry.Criteria
}
func outputEntry(file *os.File, entry DataEntry) {
time.Sleep(time.Millisecond * 200)
fmt.Println("Output:", entry)
file.WriteString(entry.Name)
file.WriteString("\n")
}
func filterEntries(inputGroup, outputGroup *sync.WaitGroup, input <-chan DataEntry, output chan<- DataEntry) {
for entry := range input {
fmt.Println("Started to filter:", entry)
isFiltered := filterEntry(entry)
fmt.Println("Finished to filter:", entry)
if (isFiltered) {
outputGroup.Add(1)
output <- entry
}
inputGroup.Done()
}
}
func ouputEntries(file *os.File, group *sync.WaitGroup, channel <-chan DataEntry) {
for entry := range channel {
outputEntry(file, entry)
group.Done()
}
}
func main() {
if len(os.Args) != 3 {
fmt.Println("Usage:", os.Args[0], "<data-file> <output-file>")
os.Exit(-1)
}
dataFilename := os.Args[1]
outputFilename := os.Args[2]
fileData, err := os.ReadFile(dataFilename)
if err != nil {
log.Fatal(err)
}
entries := []DataEntry{}
err = json.Unmarshal(fileData, &entries)
if err != nil {
log.Fatal(err)
}
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
defer outputFile.Close()
inputChannel := make(chan DataEntry, inputQueueSize)
outputChannel := make(chan DataEntry, outputQueueSize)
var inputsGroups = sync.WaitGroup{}
var ouputsGroups = sync.WaitGroup{}
inputsGroups.Add(len(entries))
for i := 0; i < inputMonitorCount; i++ {
go filterEntries(&inputsGroups, &ouputsGroups, inputChannel, outputChannel)
}
for i := 0; i < outputMonitorCount; i++ {
go ouputEntries(outputFile, &ouputsGroups, outputChannel)
}
for _, entry := range entries {
inputChannel <- entry
}
close(inputChannel)
inputsGroups.Wait()
ouputsGroups.Wait()
close(outputChannel)
fmt.Println("Finished")
}

1
lab1a/cpp/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
build

8
lab1a/cpp/CMakeLists.txt Normal file
View File

@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 3.12)
project(lab1a)
set(CMAKE_CXX_STANDARD 20)
add_executable(lab1a main.cpp)
TARGET_LINK_LIBRARIES(lab1a pthread)

9
lab1a/cpp/README.md Normal file
View File

@ -0,0 +1,9 @@
# Lab1a
```cpp
mkdir -p build
cd build
cmake ..
make
./lab1a
```

8
lab1a/cpp/common.h Normal file
View File

@ -0,0 +1,8 @@
#pragma once
#include <string>
struct DataEntry {
std::string name;
float sugar;
int criteria;
};

22875
lab1a/cpp/json.hpp Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,76 @@
#include <vector>
#include <mutex>
#include <condition_variable>
#include "common.h"
class LimitedBuffer {
std::vector<DataEntry> items;
int capacity;
std::mutex items_mutex;
std::condition_variable items_cv;
public:
int estimated_size;
int consumed_count;
int processed_count;
std::mutex processed_mutex;
LimitedBuffer(int capacity) {
this->capacity = capacity;
estimated_size = 0;
consumed_count = 0;
processed_count = 0;
}
void insert(DataEntry entry);
DataEntry remove();
void update_estimated(int change);
bool consume_estimated();
void mark_processed();
bool is_done();
};
void LimitedBuffer::insert(DataEntry entry) {
std::unique_lock<std::mutex> guard(items_mutex);
items_cv.wait(guard, [&](){ return items.size() < this->capacity; });
items.push_back(entry);
items_cv.notify_all();
}
DataEntry LimitedBuffer::remove() {
std::unique_lock<std::mutex> guard(items_mutex);
items_cv.wait(guard, [&](){ return items.size() > 0; });
DataEntry entry = items[items.size()-1];
items.pop_back();
items_cv.notify_all();
return entry;
}
void LimitedBuffer::update_estimated(int change) {
std::unique_lock<std::mutex> guard(processed_mutex);
estimated_size += change;
}
bool LimitedBuffer::consume_estimated() {
std::unique_lock<std::mutex> guard(processed_mutex);
if (consumed_count == estimated_size) {
return false;
} else {
consumed_count++;
return true;
}
}
void LimitedBuffer::mark_processed() {
std::unique_lock<std::mutex> guard(processed_mutex);
processed_count++;
}
bool LimitedBuffer::is_done() {
return estimated_size == processed_count;
}

138
lab1a/cpp/main.cpp Normal file
View File

@ -0,0 +1,138 @@
#include <algorithm>
#include <chrono>
#include <thread>
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include "json.hpp"
using namespace std;
using json = nlohmann::json;
const int inputBufferSize = 10;
const int inputMonitorCount = 3;
const int outputBufferSize = 10;
const int outputMonitorCount = 3;
#include "common.h"
#include "limited_buffer.cpp"
bool filterEntry(DataEntry *entry) {
this_thread::sleep_for(chrono::milliseconds(100 + entry->criteria));
return entry->sugar > entry->criteria;
}
void outputEntry(vector<DataEntry> *entries, DataEntry *entry) {
this_thread::sleep_for(chrono::milliseconds(200));
cout << "Output: " << entry->name << endl;
entries->push_back(*entry);
sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) {
return a.sugar < b.sugar;
});
}
void filterEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer) {
while (!inputBuffer->is_done()) {
if (inputBuffer->consume_estimated()) {
DataEntry entry = inputBuffer->remove();
cout << "Started to filter: " << entry.name << endl;
bool is_filtered = filterEntry(&entry);
cout << "Finished to filter: " << entry.name << endl;
if (is_filtered) {
outputBuffer->update_estimated(+1);
outputBuffer->insert(entry);
}
inputBuffer->mark_processed();
}
}
}
void outputEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer, vector<DataEntry> *outputList, mutex *outputMutex) {
while (!inputBuffer->is_done() || !outputBuffer->is_done()) {
if (outputBuffer->consume_estimated()) {
DataEntry entry = outputBuffer->remove();
cout << "Started to output: " << entry.name << endl;
outputEntry(outputList, &entry);
cout << "Finished to output: " << entry.name << endl;
outputBuffer->mark_processed();
}
}
}
int main(int argc, char **argv) {
if (argc != 3) {
cout << "Usage: " << argv[0] << " <data-file> <output-file>" << endl;
return -1;
}
char *inputPath = argv[1];
char *outputPath = argv[2];
std::ifstream f(inputPath);
json data = json::parse(f);
vector<DataEntry> entries;
for (auto it : data) {
entries.push_back({
.name = it["name"],
.sugar = it["sugar"],
.criteria = it["criteria"],
});
}
vector<DataEntry> outputList;
mutex outputListMutex;
LimitedBuffer inputBuffer(inputBufferSize);
LimitedBuffer outputBuffer(outputBufferSize);
inputBuffer.update_estimated(entries.size());
vector<thread> threads;
for (int i = 0; i < inputMonitorCount; i++) {
threads.push_back(thread(filterEntries, &inputBuffer, &outputBuffer));
}
for (int i = 0; i < outputMonitorCount; i++) {
threads.push_back(thread(outputEntries, &inputBuffer, &outputBuffer, &outputList, &outputListMutex));
}
for (int i = 0; i < entries.size(); i++) {
inputBuffer.insert(entries[i]);
}
for (int i = 0; i < threads.size(); i++) {
threads[i].join();
}
std::ofstream outputFile(outputPath);
outputFile << "[" << endl;
for (int i = 0; i < outputList.size(); i++) {
DataEntry *entry = &outputList[i];
json json_entry;
json_entry["name"] = entry->name;
json_entry["sugar"] = entry->sugar;
json_entry["criteria"] = entry->criteria;
std::string json_str = json_entry.dump();
outputFile << "\t" << json_str;
if (i < outputList.size()-1) {
outputFile << ",";
}
outputFile << endl;
}
outputFile << "]" << endl;
cout << "Finished" << endl;
return 0;
}
void execute(const string &name) {
cout << name << ": one" << endl;
cout << name << ": two" << endl;
cout << name << ": three" << endl;
}

6
lab1a/cpp/run.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/sh
mkdir -p build
cd build
cmake ..
make
./lab1a ../$1 ../$2

5
lab1a/go/README.md Normal file
View File

@ -0,0 +1,5 @@
# Lab1
```shell
go run limited_buffer.go main.go ../IF-1-1_PuzonasR_L1_dat_1.json IF-1-1_PuzonasR_L1_rez.txt
```

105
lab1a/go/limited_buffer.go Normal file
View File

@ -0,0 +1,105 @@
package main
import "sync"
type LimitedBuffer struct {
items []DataEntry
readIndex int
writeIndex int
currentSize int
mutex *sync.Mutex
updateCond *sync.Cond
estimatedSize int
estimatedSizeMutex *sync.Mutex
consumedCount int
consumedMutex *sync.Mutex
processedCount int
processedMutex *sync.Mutex
}
func makeLimitedBuffer(bufferSize int) LimitedBuffer {
var container = make([]DataEntry, bufferSize)
var estimatedSizeMutex = sync.Mutex{}
var processedMutex = sync.Mutex{}
var consumedMutex = sync.Mutex{}
var itemsMutex = sync.Mutex{}
var cond = sync.NewCond(&itemsMutex)
return LimitedBuffer{
items: container,
mutex: &itemsMutex,
updateCond: cond,
estimatedSizeMutex: &estimatedSizeMutex,
processedMutex: &processedMutex,
consumedMutex: &consumedMutex,
}
}
func (buffer *LimitedBuffer) Insert(item DataEntry) {
buffer.mutex.Lock()
for buffer.currentSize == len(buffer.items) {
buffer.updateCond.Wait()
}
buffer.items[buffer.writeIndex] = item
buffer.writeIndex = (buffer.writeIndex + 1) % len(buffer.items)
buffer.currentSize++
buffer.updateCond.Broadcast()
buffer.mutex.Unlock()
}
func (buffer *LimitedBuffer) Remove() DataEntry {
buffer.mutex.Lock()
for buffer.currentSize == 0 {
buffer.updateCond.Wait()
}
var item = buffer.items[buffer.readIndex]
buffer.readIndex = (buffer.readIndex + 1) % len(buffer.items)
buffer.currentSize--
buffer.updateCond.Broadcast()
buffer.mutex.Unlock()
return item
}
func (buffer *LimitedBuffer) IsDone() bool {
return buffer.estimatedSize == buffer.processedCount
}
func (buffer *LimitedBuffer) WaitUntilDone() {
for !buffer.IsDone() {}
}
func (buffer *LimitedBuffer) ConsumeEstimatedEntry() bool {
buffer.estimatedSizeMutex.Lock()
defer buffer.estimatedSizeMutex.Unlock()
if (buffer.estimatedSize == buffer.consumedCount) {
return false
} else {
buffer.consumedCount += 1
return true
}
}
func (buffer *LimitedBuffer) IsEstimatedEmpty() bool {
buffer.estimatedSizeMutex.Lock()
defer buffer.estimatedSizeMutex.Unlock()
return buffer.estimatedSize == 0
}
func (buffer *LimitedBuffer) UpdateEstimated(change int) {
buffer.estimatedSizeMutex.Lock()
defer buffer.estimatedSizeMutex.Unlock()
buffer.estimatedSize += change
}
func (buffer *LimitedBuffer) MarkAsProcessed() {
buffer.processedMutex.Lock()
defer buffer.processedMutex.Unlock()
buffer.processedCount += 1
}

121
lab1a/go/main.go Normal file
View File

@ -0,0 +1,121 @@
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"sort"
"sync"
"time"
)
const inputBufferSize = 10
const inputMonitorCount = 3
const outputBufferSize = 10
const outputMonitorCount = 1
type DataEntry struct {
Name string `json:"name"`
Sugar float32 `json:"sugar"`
Criteria int `json:"criteria"`
}
func filterEntry(entry DataEntry) bool {
time.Sleep(time.Millisecond * 100 + time.Millisecond * time.Duration(entry.Criteria))
return entry.Sugar > float32(entry.Criteria)
}
func outputEntry(outputEntries *[]DataEntry, entry DataEntry) {
time.Sleep(time.Millisecond * 200)
fmt.Println("Output:", entry)
*outputEntries = append(*outputEntries, entry)
sort.Slice(*outputEntries, func(i, j int) bool {
return (*outputEntries)[i].Sugar < (*outputEntries)[j].Sugar
})
}
func filterEntries(inputBuffer *LimitedBuffer, outputBuffer *LimitedBuffer) {
for inputBuffer.ConsumeEstimatedEntry() {
entry := inputBuffer.Remove()
fmt.Println("Started to filter:", entry)
isFiltered := filterEntry(entry)
fmt.Println("Finished to filter:", entry)
if (isFiltered) {
outputBuffer.UpdateEstimated(+1)
outputBuffer.Insert(entry)
}
inputBuffer.MarkAsProcessed()
}
}
func outputEntries(outputBuffer *LimitedBuffer, inputBuffer *LimitedBuffer, outputList *[]DataEntry, outputMutex *sync.Mutex) {
for !inputBuffer.IsDone() {
for outputBuffer.ConsumeEstimatedEntry() {
entry := outputBuffer.Remove()
outputMutex.Lock()
fmt.Println("Started to output:", entry)
outputEntry(outputList, entry)
fmt.Println("Finished to output:", entry)
outputMutex.Unlock()
outputBuffer.MarkAsProcessed()
}
}
}
func main() {
if len(os.Args) != 3 {
fmt.Println("Usage:", os.Args[0], "<data-file> <output-file>")
os.Exit(-1)
}
dataFilename := os.Args[1]
outputFilename := os.Args[2]
fileData, err := os.ReadFile(dataFilename)
if err != nil {
log.Fatal(err)
}
entries := []DataEntry{}
err = json.Unmarshal(fileData, &entries)
if err != nil {
log.Fatal(err)
}
outputList := []DataEntry{}
inputBuffer := makeLimitedBuffer(inputBufferSize)
outputBuffer := makeLimitedBuffer(outputBufferSize)
outputMutex := sync.Mutex{}
inputBuffer.UpdateEstimated(len(entries))
for i := 0; i < inputMonitorCount; i++ {
go filterEntries(&inputBuffer, &outputBuffer)
}
for i := 0; i < outputMonitorCount; i++ {
go outputEntries(&outputBuffer, &inputBuffer, &outputList, &outputMutex)
}
for _, entry := range entries {
inputBuffer.Insert(entry);
}
inputBuffer.WaitUntilDone()
outputBuffer.WaitUntilDone()
outputFile, err := os.OpenFile(outputFilename, os.O_TRUNC | os.O_CREATE | os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
defer outputFile.Close()
outputListBytes, err := json.MarshalIndent(outputList, "", "\t")
if err != nil {
log.Fatal(err)
}
outputFile.Write(outputListBytes)
fmt.Println("Finished")
}

1
lab1a/rust/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
target

122
lab1a/rust/Cargo.lock generated Normal file
View File

@ -0,0 +1,122 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "a-rust"
version = "0.1.0"
dependencies = [
"anyhow",
"crossbeam-channel",
"crossbeam-utils",
"serde_json",
]
[[package]]
name = "anyhow"
version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
dependencies = [
"cfg-if",
]
[[package]]
name = "itoa"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "proc-macro2"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [
"proc-macro2",
]
[[package]]
name = "ryu"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "serde"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "syn"
version = "2.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e02e55d62894af2a08aca894c6577281f76769ba47c94d5756bec8ac6e7373"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"

12
lab1a/rust/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "a-rust"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.75"
crossbeam-channel = "0.5.8"
crossbeam-utils = "0.8.16"
serde_json = "1.0.107"

3
lab1a/rust/README.md Normal file
View File

@ -0,0 +1,3 @@
# Lab1a
THIS IS NOT FINISHED

7
lab1a/rust/src/common.rs Normal file
View File

@ -0,0 +1,7 @@
#[derive(Debug)]
pub struct DataEntry {
pub name: String,
pub sugar: f32,
pub criteria: i32
}

View File

@ -0,0 +1,59 @@
use crate::common::DataEntry;
pub struct LimitedBuffer {
items: Vec<DataEntry>,
capacity: usize,
estimated_size: i32,
consumed_count: u32,
processed_count: u32
}
impl LimitedBuffer {
pub fn new(capacity: usize) -> LimitedBuffer {
return LimitedBuffer{
items: Vec::with_capacity(capacity),
capacity,
estimated_size: 0,
consumed_count: 0,
processed_count: 0
}
}
pub fn try_insert(&mut self, entry: DataEntry) -> bool {
if self.items.len() == self.capacity {
return false
}
self.items.push(entry);
return true
}
pub fn remove(&mut self) -> Option<DataEntry> {
if self.items.is_empty() {
None
} else {
Some(self.items.remove(0))
}
}
pub fn is_done(&self) -> bool {
return (self.estimated_size as u32) == self.processed_count
}
pub fn consume_estimated(&mut self) -> bool {
if (self.estimated_size as u32) == self.consumed_count {
return false;
} else {
self.consumed_count += 1;
return true;
}
}
pub fn update_estimated(&mut self, change: i32) {
self.estimated_size += change
}
pub fn mark_processed(&mut self) {
self.processed_count += 1
}
}

118
lab1a/rust/src/main.rs Normal file
View File

@ -0,0 +1,118 @@
use std::{env::args, process::exit, thread, time::Duration, sync::{Arc, Mutex}};
use anyhow::Result;
mod limited_buffer;
mod common;
use common::DataEntry;
use limited_buffer::LimitedBuffer;
// This was a mistake
const INPUT_BUFFER_SIZE: usize = 2;
const INPUT_MONITOR_COUNT: i32 = 2;
const OUTPUT_BUFFER_SIZE: usize = 10;
const OUTPUT_MONITOR_COUNT: i32 = 1;
fn filter_entry(entry: &DataEntry) -> bool {
thread::sleep(Duration::from_millis(100 + entry.criteria as u64));
return entry.sugar > entry.criteria as f32
}
fn output_entry(output_list: &mut Vec<DataEntry>, entry: DataEntry) {
thread::sleep(Duration::from_millis(200));
output_list.push(entry);
output_list.sort_by(|a, b| a.sugar.total_cmp(&b.sugar))
}
fn filter_entries(input: Arc<Mutex<LimitedBuffer>>, output: Arc<Mutex<LimitedBuffer>>) {
while !input.lock().unwrap().is_done() {
if input.lock().unwrap().consume_estimated() {
let entry = input.lock().unwrap().remove();
if entry.is_none() { continue; }
let entry = entry.unwrap();
println!("Started to filter: {}", entry.name);
let is_filtered = filter_entry(&entry);
println!("Finished to filter: {}", entry.name);
if is_filtered {
// output.lock().unwrap().update_estimated(1);
// while !output.lock().unwrap().try_insert(entry) {}
}
input.lock().unwrap().mark_processed()
}
}
}
fn ouput_entries(input: Arc<Mutex<LimitedBuffer>>, output: Arc<Mutex<LimitedBuffer>>, output_list: Arc<Mutex<Vec<DataEntry>>>) {
while !input.lock().unwrap().is_done() {
let entry = input.lock().unwrap().remove();
if entry.is_none() { continue; }
let entry = entry.unwrap();
output.lock().unwrap().consume_estimated();
let entry_name = entry.name.clone();
println!("Started to output: {}", entry_name);
{
let mut output_list = output_list.lock().unwrap();
output_entry(&mut output_list, entry);
}
println!("Finished to output: {}", entry_name);
output.lock().unwrap().mark_processed()
}
}
fn main() -> Result<()> {
let args = args().collect::<Vec<_>>();
if args.len() != 3 {
eprintln!("Usage: {} <data-file> <output-file>", args[0]);
exit(-1)
}
let entries = vec![
DataEntry{ name: "foo1".into(), sugar: 100.0, criteria: 100 },
DataEntry{ name: "foo2".into(), sugar: 100.0, criteria: 100 },
DataEntry{ name: "foo3".into(), sugar: 100.0, criteria: 100 },
DataEntry{ name: "foo4".into(), sugar: 100.0, criteria: 100 },
DataEntry{ name: "foo5".into(), sugar: 100.0, criteria: 100 }
];
let mut input_threads = vec![];
// let mut output_threads = vec![];
// let output_list = Arc::new(Mutex::new(Vec::new()));
let input_buffer = Arc::new(Mutex::new(LimitedBuffer::new(INPUT_BUFFER_SIZE)));
let output_buffer = Arc::new(Mutex::new(LimitedBuffer::new(OUTPUT_BUFFER_SIZE)));
input_buffer.lock().unwrap().update_estimated(entries.len() as i32);
for _ in 0..INPUT_MONITOR_COUNT {
let input_buffer = input_buffer.clone();
let output_buffer = output_buffer.clone();
input_threads.push(thread::spawn(|| filter_entries(input_buffer, output_buffer)));
}
// for _ in 0..OUTPUT_MONITOR_COUNT {
// let input_buffer = input_buffer.clone();
// let output_buffer = output_buffer.clone();
// let output_list = output_list.clone();
// output_threads.push(thread::spawn(|| ouput_entries(input_buffer, output_buffer, output_list)));
// }
for entry in entries {
input_buffer.lock().unwrap().try_insert(entry);
}
for handle in input_threads {
handle.join().expect("Failed to join input thread");
}
// for handle in output_threads {
// handle.join().expect("Failed to join output thread");
// }
println!("Finished");
Ok(())
}