1
0

update cpp solution

This commit is contained in:
Rokas Puzonas 2023-09-22 22:21:20 +03:00
parent 5a16ea9ff6
commit 15772e30d9
2 changed files with 32 additions and 88 deletions

View File

@ -10,26 +10,20 @@ class LimitedBuffer {
std::mutex items_mutex; std::mutex items_mutex;
std::condition_variable items_cv; std::condition_variable items_cv;
int reserved_count;
public: public:
int estimated_size; int estimated_count;
int consumed_count;
int processed_count;
std::mutex processed_mutex;
LimitedBuffer(int capacity) { LimitedBuffer(int capacity) {
this->capacity = capacity; this->capacity = capacity;
estimated_size = 0; estimated_count = 0;
consumed_count = 0; reserved_count = 0;
processed_count = 0;
} }
void insert(DataEntry entry); void insert(DataEntry entry);
DataEntry remove(); DataEntry remove();
void update_estimated(int change); bool reserve_entry();
bool consume_estimated();
void mark_processed();
bool is_done();
}; };
void LimitedBuffer::insert(DataEntry entry) { void LimitedBuffer::insert(DataEntry entry) {
@ -50,27 +44,13 @@ DataEntry LimitedBuffer::remove() {
return entry; return entry;
} }
void LimitedBuffer::update_estimated(int change) { bool LimitedBuffer::reserve_entry() {
std::unique_lock<std::mutex> guard(processed_mutex); std::unique_lock<std::mutex> guard(items_mutex);
estimated_size += change;
}
bool LimitedBuffer::consume_estimated() { if (reserved_count == estimated_count) {
std::unique_lock<std::mutex> guard(processed_mutex);
if (consumed_count == estimated_size) {
return false; return false;
} else { } else {
consumed_count++; reserved_count++;
return true; return true;
} }
} }
void LimitedBuffer::mark_processed() {
std::unique_lock<std::mutex> guard(processed_mutex);
processed_count++;
}
bool LimitedBuffer::is_done() {
return estimated_size == processed_count;
}

View File

@ -11,57 +11,32 @@
using namespace std; using namespace std;
using json = nlohmann::json; using json = nlohmann::json;
const int inputBufferSize = 10; const int bufferSize = 10;
const int inputMonitorCount = 3; const int threadCount = 1;
const int outputBufferSize = 10;
const int outputMonitorCount = 3;
#include "common.h" #include "common.h"
#include "limited_buffer.cpp" #include "limited_buffer.cpp"
bool filterEntry(DataEntry *entry) { void processEntry(DataEntry *entry, vector<DataEntry> *entries, mutex *outputMutex) {
this_thread::sleep_for(chrono::milliseconds(100 + entry->criteria)); this_thread::sleep_for(chrono::milliseconds(100 + entry->criteria));
return entry->sugar > entry->criteria; if (entry->sugar > entry->criteria) {
} outputMutex->lock();
cout << "Output: " << entry->name << endl;
void outputEntry(vector<DataEntry> *entries, DataEntry *entry) { // TODO: make this a sorted add
this_thread::sleep_for(chrono::milliseconds(200)); entries->push_back(*entry);
cout << "Output: " << entry->name << endl; sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) {
entries->push_back(*entry); return a.sugar < b.sugar;
sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) { });
return a.sugar < b.sugar; outputMutex->unlock();
});
}
void filterEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer) {
while (!inputBuffer->is_done()) {
if (inputBuffer->consume_estimated()) {
DataEntry entry = inputBuffer->remove();
cout << "Started to filter: " << entry.name << endl;
bool is_filtered = filterEntry(&entry);
cout << "Finished to filter: " << entry.name << endl;
if (is_filtered) {
outputBuffer->update_estimated(+1);
outputBuffer->insert(entry);
}
inputBuffer->mark_processed();
}
} }
} }
void outputEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer, vector<DataEntry> *outputList, mutex *outputMutex) { void processEntries(LimitedBuffer *monitor, vector<DataEntry> *outputList, mutex *outputMutex) {
while (!inputBuffer->is_done() || !outputBuffer->is_done()) { while (monitor->reserve_entry()) {
if (outputBuffer->consume_estimated()) { DataEntry entry = monitor->remove();
DataEntry entry = outputBuffer->remove(); cout << "Started to filter: " << entry.name << endl;
cout << "Started to output: " << entry.name << endl; processEntry(&entry, outputList, outputMutex);
outputEntry(outputList, &entry); cout << "Finished to filter: " << entry.name << endl;
cout << "Finished to output: " << entry.name << endl;
outputBuffer->mark_processed();
}
} }
} }
@ -89,22 +64,17 @@ int main(int argc, char **argv) {
vector<DataEntry> outputList; vector<DataEntry> outputList;
mutex outputListMutex; mutex outputListMutex;
LimitedBuffer inputBuffer(inputBufferSize); LimitedBuffer monitor(bufferSize);
LimitedBuffer outputBuffer(outputBufferSize); monitor.estimated_count = entries.size();
inputBuffer.update_estimated(entries.size());
vector<thread> threads; vector<thread> threads;
for (int i = 0; i < inputMonitorCount; i++) { for (int i = 0; i < threadCount; i++) {
threads.push_back(thread(filterEntries, &inputBuffer, &outputBuffer)); threads.push_back(thread(processEntries, &monitor, &outputList, &outputListMutex));
}
for (int i = 0; i < outputMonitorCount; i++) {
threads.push_back(thread(outputEntries, &inputBuffer, &outputBuffer, &outputList, &outputListMutex));
} }
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
inputBuffer.insert(entries[i]); monitor.insert(entries[i]);
} }
for (int i = 0; i < threads.size(); i++) { for (int i = 0; i < threads.size(); i++) {
threads[i].join(); threads[i].join();
} }
@ -130,9 +100,3 @@ int main(int argc, char **argv) {
cout << "Finished" << endl; cout << "Finished" << endl;
return 0; return 0;
} }
void execute(const string &name) {
cout << name << ": one" << endl;
cout << name << ": two" << endl;
cout << name << ": three" << endl;
}