diff --git a/lab1a/cpp/limited_buffer.cpp b/lab1a/cpp/limited_buffer.cpp index 3cefa55..1c93738 100644 --- a/lab1a/cpp/limited_buffer.cpp +++ b/lab1a/cpp/limited_buffer.cpp @@ -10,26 +10,20 @@ class LimitedBuffer { std::mutex items_mutex; std::condition_variable items_cv; + int reserved_count; public: - int estimated_size; - int consumed_count; - int processed_count; - std::mutex processed_mutex; + int estimated_count; LimitedBuffer(int capacity) { this->capacity = capacity; - estimated_size = 0; - consumed_count = 0; - processed_count = 0; + estimated_count = 0; + reserved_count = 0; } void insert(DataEntry entry); DataEntry remove(); - void update_estimated(int change); - bool consume_estimated(); - void mark_processed(); - bool is_done(); + bool reserve_entry(); }; void LimitedBuffer::insert(DataEntry entry) { @@ -50,27 +44,13 @@ DataEntry LimitedBuffer::remove() { return entry; } -void LimitedBuffer::update_estimated(int change) { - std::unique_lock guard(processed_mutex); - estimated_size += change; -} +bool LimitedBuffer::reserve_entry() { + std::unique_lock guard(items_mutex); -bool LimitedBuffer::consume_estimated() { - std::unique_lock guard(processed_mutex); - - if (consumed_count == estimated_size) { + if (reserved_count == estimated_count) { return false; } else { - consumed_count++; + reserved_count++; return true; } } - -void LimitedBuffer::mark_processed() { - std::unique_lock guard(processed_mutex); - processed_count++; -} - -bool LimitedBuffer::is_done() { - return estimated_size == processed_count; -} diff --git a/lab1a/cpp/main.cpp b/lab1a/cpp/main.cpp index 714923d..47f2868 100644 --- a/lab1a/cpp/main.cpp +++ b/lab1a/cpp/main.cpp @@ -11,57 +11,32 @@ using namespace std; using json = nlohmann::json; -const int inputBufferSize = 10; -const int inputMonitorCount = 3; - -const int outputBufferSize = 10; -const int outputMonitorCount = 3; +const int bufferSize = 10; +const int threadCount = 1; #include "common.h" #include "limited_buffer.cpp" -bool filterEntry(DataEntry *entry) { +void processEntry(DataEntry *entry, vector *entries, mutex *outputMutex) { this_thread::sleep_for(chrono::milliseconds(100 + entry->criteria)); - return entry->sugar > entry->criteria; -} - -void outputEntry(vector *entries, DataEntry *entry) { - this_thread::sleep_for(chrono::milliseconds(200)); - cout << "Output: " << entry->name << endl; - entries->push_back(*entry); - sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) { - return a.sugar < b.sugar; - }); -} - -void filterEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer) { - while (!inputBuffer->is_done()) { - if (inputBuffer->consume_estimated()) { - DataEntry entry = inputBuffer->remove(); - cout << "Started to filter: " << entry.name << endl; - bool is_filtered = filterEntry(&entry); - cout << "Finished to filter: " << entry.name << endl; - - if (is_filtered) { - outputBuffer->update_estimated(+1); - outputBuffer->insert(entry); - } - - inputBuffer->mark_processed(); - } + if (entry->sugar > entry->criteria) { + outputMutex->lock(); + cout << "Output: " << entry->name << endl; + // TODO: make this a sorted add + entries->push_back(*entry); + sort(entries->begin(), entries->end(), [](DataEntry &a, DataEntry &b) { + return a.sugar < b.sugar; + }); + outputMutex->unlock(); } } -void outputEntries(LimitedBuffer *inputBuffer, LimitedBuffer *outputBuffer, vector *outputList, mutex *outputMutex) { - while (!inputBuffer->is_done() || !outputBuffer->is_done()) { - if (outputBuffer->consume_estimated()) { - DataEntry entry = outputBuffer->remove(); - cout << "Started to output: " << entry.name << endl; - outputEntry(outputList, &entry); - cout << "Finished to output: " << entry.name << endl; - - outputBuffer->mark_processed(); - } +void processEntries(LimitedBuffer *monitor, vector *outputList, mutex *outputMutex) { + while (monitor->reserve_entry()) { + DataEntry entry = monitor->remove(); + cout << "Started to filter: " << entry.name << endl; + processEntry(&entry, outputList, outputMutex); + cout << "Finished to filter: " << entry.name << endl; } } @@ -89,22 +64,17 @@ int main(int argc, char **argv) { vector outputList; mutex outputListMutex; - LimitedBuffer inputBuffer(inputBufferSize); - LimitedBuffer outputBuffer(outputBufferSize); - inputBuffer.update_estimated(entries.size()); + LimitedBuffer monitor(bufferSize); + monitor.estimated_count = entries.size(); vector threads; - for (int i = 0; i < inputMonitorCount; i++) { - threads.push_back(thread(filterEntries, &inputBuffer, &outputBuffer)); - } - for (int i = 0; i < outputMonitorCount; i++) { - threads.push_back(thread(outputEntries, &inputBuffer, &outputBuffer, &outputList, &outputListMutex)); + for (int i = 0; i < threadCount; i++) { + threads.push_back(thread(processEntries, &monitor, &outputList, &outputListMutex)); } for (int i = 0; i < entries.size(); i++) { - inputBuffer.insert(entries[i]); + monitor.insert(entries[i]); } - for (int i = 0; i < threads.size(); i++) { threads[i].join(); } @@ -130,9 +100,3 @@ int main(int argc, char **argv) { cout << "Finished" << endl; return 0; } - -void execute(const string &name) { - cout << name << ": one" << endl; - cout << name << ": two" << endl; - cout << name << ": three" << endl; -}