first draft of multithreaded sample transforms

This commit is contained in:
Rokas Puzonas 2025-05-03 22:32:28 +03:00
parent e48887cdee
commit b9e0a7a2d6
4 changed files with 311 additions and 61 deletions

View File

@ -187,6 +187,23 @@ pub const SampleList = struct {
return self.buffer[0..self.len]; return self.buffer[0..self.len];
} }
pub fn recomputeMinMax(self: *Block) void {
if (self.len == 0) {
self.min = null;
self.max = 0;
}
var min = self.buffer[0];
var max = self.buffer[0];
for (self.samplesSlice()[1..]) |sample| {
min = @min(min, sample);
max = @max(max, sample);
}
self.min = min;
self.max = max;
}
pub fn append(self: *Block, samples: []const f64) usize { pub fn append(self: *Block, samples: []const f64) usize {
const unused_slice = self.buffer[self.len..]; const unused_slice = self.buffer[self.len..];
@ -285,6 +302,18 @@ pub const SampleList = struct {
self.* = SampleList.init(allocator); self.* = SampleList.init(allocator);
} }
fn appendEmptyBlock(self: *SampleList) !void {
const allocator = self.arena.child_allocator;
const buffer_allocator = self.arena.allocator();
const block_buffer = try buffer_allocator.create(Block.Buffer);
errdefer buffer_allocator.destroy(block_buffer);
try self.blocks.append(allocator, Block{
.buffer = block_buffer
});
}
fn getBlockAtSample(self: *SampleList, sample_index: usize) ?*Block { fn getBlockAtSample(self: *SampleList, sample_index: usize) ?*Block {
return self.getBlock(@divFloor(sample_index, Block.capacity)); return self.getBlock(@divFloor(sample_index, Block.capacity));
} }
@ -297,21 +326,35 @@ pub const SampleList = struct {
return null; return null;
} }
pub fn getSample(self: *SampleList, sample_index: usize) ?*f64 {
const block_id = @divFloor(sample_index, Block.capacity);
if (self.getBlock(block_id)) |block| {
const i = @mod(sample_index, Block.capacity);
if (i <= block.len) {
return &block.buffer[i];
}
}
return null;
}
pub fn reserveEmptyBlocks(self: *SampleList, total_block_count: usize) !void {
for (self.blocks.items) |*block| {
block.clear();
}
while (self.blocks.items.len < total_block_count) {
try self.appendEmptyBlock();
}
}
pub fn append(self: *SampleList, samples: []const f64) !void { pub fn append(self: *SampleList, samples: []const f64) !void {
if (samples.len == 0) return; if (samples.len == 0) return;
var appended_count: usize = 0; var appended_count: usize = 0;
while (appended_count < samples.len) { while (appended_count < samples.len) {
if (self.blocks.items.len == 0 or self.blocks.getLast().len == Block.capacity) { if (self.blocks.items.len == 0 or self.blocks.getLast().len == Block.capacity) {
const allocator = self.arena.child_allocator; try self.appendEmptyBlock();
const buffer_allocator = self.arena.allocator();
const block_buffer = try buffer_allocator.create(Block.Buffer);
errdefer buffer_allocator.destroy(block_buffer);
try self.blocks.append(allocator, Block{
.buffer = block_buffer
});
} }
const last_block = &self.blocks.items[self.blocks.items.len - 1]; const last_block = &self.blocks.items[self.blocks.items.len - 1];
@ -756,6 +799,7 @@ pub const Project = struct {
} }
pub fn appendSamples(self: *Project, allocator: Allocator, sample_list_id: Id, samples: []const f64) !void { pub fn appendSamples(self: *Project, allocator: Allocator, sample_list_id: Id, samples: []const f64) !void {
_ = allocator;
if (samples.len == 0) return; if (samples.len == 0) return;
const sample_list = self.sample_lists.get(sample_list_id).?; const sample_list = self.sample_lists.get(sample_list_id).?;
@ -767,7 +811,6 @@ pub const Project = struct {
try sample_list.append(samples); try sample_list.append(samples);
try self.refreshTransformedSamplesBySampleList(allocator, sample_list_id);
self.refreshMarkedRanges(sample_list_id, affected_range); self.refreshMarkedRanges(sample_list_id, affected_range);
} }
@ -781,7 +824,6 @@ pub const Project = struct {
sample_list.clear(allocator); sample_list.clear(allocator);
try self.refreshTransformedSamplesBySampleList(allocator, sample_list_id);
self.refreshMarkedRanges(sample_list_id, affected_range); self.refreshMarkedRanges(sample_list_id, affected_range);
} }
@ -821,38 +863,6 @@ pub const Project = struct {
// } // }
} }
fn refreshTransformedSamplesBySampleList(self: *Project, allocator: Allocator, sample_list_id: Id) !void {
var view_iter = self.views.idIterator();
while (view_iter.next()) |view_id| {
const view_sample_list_id = self.getViewSampleListId(view_id);
if (view_sample_list_id.eql(sample_list_id)) {
try self.refreshTransformedSamplesByView(allocator, view_id);
}
}
}
fn refreshTransformedSamplesByView(self: *Project, allocator: Allocator, view_id: Id) !void {
const view = self.views.get(view_id).?;
if (view.sliding_window) |sliding_window| {
if (view.transformed_samples == null) {
view.transformed_samples = try self.addSampleList(allocator);
}
// var transformed_sample_list = self.sample_lists.get(view.transformed_samples.?).?;
// const samples = self.getViewSamples(view_id);
// try transformed_sample_list.append(samples);
_ = sliding_window;
} else {
if (view.transformed_samples) |sample_list_id| {
self.removeSampleList(sample_list_id);
view.transformed_samples = null;
}
}
}
pub fn readSamplesFromFile(self: *Project, allocator: Allocator, sample_list_id: Id, file: std.fs.File, sample_count: usize) !void { pub fn readSamplesFromFile(self: *Project, allocator: Allocator, sample_list_id: Id, file: std.fs.File, sample_count: usize) !void {
var bytes_left: usize = sample_count * 8; var bytes_left: usize = sample_count * 8;
var buffer: [SampleList.Block.capacity * 8]u8 = undefined; var buffer: [SampleList.Block.capacity * 8]u8 = undefined;
@ -867,17 +877,6 @@ pub const Project = struct {
} }
} }
pub fn updateSlidingWindow(self: *Project, view_id: Id, allocator: Allocator, sliding_window: ?f64) !void {
const view = self.views.get(view_id).?;
if (view.sliding_window == sliding_window) {
return;
}
view.sliding_window = sliding_window;
try self.refreshTransformedSamplesByView(allocator, view_id);
}
pub fn appendMarkedRange(self: *Project, view_id: Id, axis: UI.Axis, range: RangeF64) ?*View.MarkedRange { pub fn appendMarkedRange(self: *Project, view_id: Id, axis: UI.Axis, range: RangeF64) ?*View.MarkedRange {
const view = self.views.get(view_id) orelse return null; const view = self.views.get(view_id) orelse return null;
@ -1223,6 +1222,10 @@ pub const Command = union(enum) {
start_output: Id, // Channel id start_output: Id, // Channel id
add_file_from_picker, add_file_from_picker,
reload_file: Id, // File id reload_file: Id, // File id
update_sliding_window: struct {
view_id: Id,
sliding_window: ?f64
}
}; };
pub const CollectionTask = struct { pub const CollectionTask = struct {
@ -1237,6 +1240,105 @@ pub const CollectionTask = struct {
} }
}; };
const WorkJob = struct {
const Stage = enum {
init,
calculate_blocks
};
view_id: Id,
stage: Stage = .init,
sliding_window: ?f64 = null,
mutex: std.Thread.Mutex = .{},
running_thread_jobs: std.ArrayListUnmanaged(SampleList.Block.Id) = .{},
processed_up_to: u32 = 0,
pub fn appendRunningThread(self: *WorkJob, allocator: Allocator, block_id: SampleList.Block.Id) !void {
self.mutex.lock();
defer self.mutex.unlock();
try self.running_thread_jobs.append(allocator, block_id);
}
pub fn removeRunningThread(self: *WorkJob, block_id: SampleList.Block.Id) void {
self.mutex.lock();
defer self.mutex.unlock();
if (std.mem.indexOfScalar(SampleList.Block.Id, self.running_thread_jobs.items, block_id)) |index| {
_ = self.running_thread_jobs.swapRemove(index);
}
}
pub fn getRunningThreadCount(self: *WorkJob) usize {
self.mutex.lock();
defer self.mutex.unlock();
return self.running_thread_jobs.items.len;
}
pub fn containsRunningThread(self: *WorkJob, block_id: SampleList.Block.Id) bool {
self.mutex.lock();
defer self.mutex.unlock();
return std.mem.indexOfScalar(SampleList.Block.Id, self.running_thread_jobs.items, block_id) != null;
}
pub fn deinit(self: *WorkJob, allocator: Allocator) void {
self.running_thread_jobs.deinit(allocator);
}
pub fn update(self: *WorkJob, id: Id, app: *App) !bool {
const project = &app.project;
const view = project.views.get(self.view_id) orelse return true;
const sample_list_id = project.getViewSampleListId(self.view_id);
const sample_list = project.sample_lists.get(sample_list_id).?;
if (view.sliding_window != self.sliding_window) return true;
switch (self.stage) {
.init => {
if (self.sliding_window == null) {
if (view.transformed_samples) |transformed_samples_id| {
project.removeSampleList(transformed_samples_id);
}
view.transformed_samples = null;
return true;
} else {
if (view.transformed_samples == null) {
view.transformed_samples = try project.addSampleList(app.allocator);
}
const transformed_samples = project.sample_lists.get(view.transformed_samples.?).?;
try transformed_samples.reserveEmptyBlocks(sample_list.blocks.items.len);
self.stage = .calculate_blocks;
}
},
.calculate_blocks => {
const max_block_to_process = 32;
while (self.getRunningThreadCount() < app.work_thread_pool.threads.len and self.processed_up_to < sample_list.blocks.items.len) {
const block_id = self.processed_up_to;
const block_count = @min(sample_list.blocks.items.len - self.processed_up_to, max_block_to_process);
self.processed_up_to += block_count;
try app.work_thread_pool.spawn(transformedSamplesWorker, .{ app, id, block_id, block_count });
try self.appendRunningThread(app.allocator, block_id);
}
if (self.processed_up_to == sample_list.blocks.items.len and self.getRunningThreadCount() == 0) {
return true;
}
}
}
return false;
}
};
allocator: Allocator, allocator: Allocator,
ui: UI, ui: UI,
double_pass_ui: bool = true, double_pass_ui: bool = true,
@ -1261,15 +1363,22 @@ command_queue: std.BoundedArray(Command, 16) = .{},
file_picker_id: ?Platform.FilePickerId = null, file_picker_id: ?Platform.FilePickerId = null,
work_jobs: GenerationalArray(WorkJob) = .{},
work_thread_pool: std.Thread.Pool,
pub fn init(self: *App, allocator: Allocator) !void { pub fn init(self: *App, allocator: Allocator) !void {
self.* = App{ self.* = App{
.allocator = allocator, .allocator = allocator,
.ui = UI.init(allocator), .ui = UI.init(allocator),
.main_screen = undefined, .main_screen = undefined,
.collection_thread = undefined, .collection_thread = undefined,
.channel_from_device = undefined .channel_from_device = undefined,
.work_thread_pool = undefined
}; };
try self.initUI(); try self.initUI();
try self.work_thread_pool.init(.{
.allocator = allocator
});
if (NIDaq.Api.init()) |ni_daq_api| { if (NIDaq.Api.init()) |ni_daq_api| {
self.ni_daq_api = ni_daq_api; self.ni_daq_api = ni_daq_api;
@ -1301,6 +1410,7 @@ pub fn init(self: *App, allocator: Allocator) !void {
} }
pub fn deinit(self: *App) void { pub fn deinit(self: *App) void {
self.work_thread_pool.deinit();
self.deinitProject(); self.deinitProject();
self.should_close = true; self.should_close = true;
@ -1316,6 +1426,13 @@ pub fn deinit(self: *App) void {
if (self.ni_daq_api) |*ni_daq_api| { if (self.ni_daq_api) |*ni_daq_api| {
ni_daq_api.deinit(); ni_daq_api.deinit();
} }
{
var iter = self.work_jobs.iterator();
while (iter.next()) |work_job| {
work_job.deinit(self.allocator);
}
}
} }
fn deinitProject(self: *App) void { fn deinitProject(self: *App) void {
@ -1357,7 +1474,9 @@ fn loadProject(self: *App) !void {
errdefer loaded.deinit(self.allocator); errdefer loaded.deinit(self.allocator);
try loaded.initFromFile(self.allocator, save_location); try loaded.initFromFile(self.allocator, save_location);
self.deinitUI();
self.deinitProject(); self.deinitProject();
self.project = loaded.*; self.project = loaded.*;
var file_iter = self.project.files.idIterator(); var file_iter = self.project.files.idIterator();
@ -1381,7 +1500,6 @@ fn loadProject(self: *App) !void {
}; };
} }
self.deinitUI();
self.initUI() catch @panic("Failed to initialize UI, can't recover"); self.initUI() catch @panic("Failed to initialize UI, can't recover");
} }
@ -1525,9 +1643,125 @@ pub fn tick(self: *App) !void {
self.loadFile(file_id) catch |e| { self.loadFile(file_id) catch |e| {
log.err("Failed to load file: {}", .{ e }); log.err("Failed to load file: {}", .{ e });
}; };
},
.update_sliding_window => |args| {
const view = self.project.views.get(args.view_id) orelse continue;
view.sliding_window = args.sliding_window;
_ = self.work_jobs.insert(WorkJob{
.view_id = args.view_id,
.sliding_window = args.sliding_window
}) catch |e| {
log.err("Failed to create a work job: {}", .{ e });
continue;
};
} }
} }
} }
{
var work_job_iter = self.work_jobs.idIterator();
while (work_job_iter.next()) |work_job_id| {
const work_job = self.work_jobs.get(work_job_id).?;
const job_done = try work_job.update(work_job_id, self);
if (job_done) {
if (work_job.getRunningThreadCount() == 0) {
// std.debug.print("job done {}\n", .{work_job_id});
work_job.deinit(self.allocator);
self.work_jobs.remove(work_job_id);
}
}
}
}
}
fn transformedSamplesWorker(self: *App, work_job_id: Id, starting_block_id: SampleList.Block.Id, block_count: usize) void {
const work_job = self.work_jobs.get(work_job_id) orelse return;
defer work_job.removeRunningThread(starting_block_id);
const allocator = self.allocator;
var timer = std.time.Timer.start() catch unreachable;
const view = self.project.views.get(work_job.view_id) orelse return;
const transformed_samples_id = view.transformed_samples orelse return;
const transformed_samples = self.project.sample_lists.get(transformed_samples_id) orelse return;
const sliding_window_f64: f64 = @ceil(view.sliding_window.?);
const sliding_window: usize = @intFromFloat(sliding_window_f64);
const sample_list_id = self.project.getViewSampleListId(work_job.view_id);
const sample_list = self.project.sample_lists.get(sample_list_id) orelse return;
var running_sum: f64 = 0;
var last_samples = std.ArrayList(f64).init(allocator);
defer last_samples.deinit();
last_samples.ensureTotalCapacityPrecise(sliding_window) catch return;
for (0..@intFromFloat(@ceil(sliding_window_f64/SampleList.Block.capacity))) |block_offset| {
if (block_offset >= starting_block_id) {
break;
}
const source_block = sample_list.getBlock(starting_block_id - (block_offset + 1)).?;
for (0..source_block.len) |i| {
const sample = source_block.buffer[source_block.len - (i + 1)];
if (last_samples.items.len == last_samples.capacity) {
_ = last_samples.orderedRemove(0);
}
last_samples.appendAssumeCapacity(sample);
}
}
for (0..block_count) |i| {
const block_id = starting_block_id + i;
const source_block = sample_list.getBlock(block_id).?;
const transformed_block = transformed_samples.getBlock(block_id).?;
for (0..source_block.len) |j| {
const sample = source_block.buffer[j];
transformed_block.buffer[j] = sample * 0.1;
if (last_samples.items.len == last_samples.capacity) {
running_sum -= last_samples.orderedRemove(0);
}
last_samples.appendAssumeCapacity(sample);
running_sum += sample;
transformed_block.buffer[j] = running_sum / @as(f64, @floatFromInt(last_samples.items.len));
}
transformed_block.len = source_block.len;
transformed_block.recomputeMinMax();
}
// for (0..(SampleList.Block.capacity * block_count)) |offset| {
// const i = starting_block_id * SampleList.Block.capacity + offset;
// const transformed_sample = &transformed_samples.getBlock(@divFloor(i, SampleList.Block.capacity)).?.buffer[@mod(i, SampleList.Block.capacity)];
// if (i >= 3) {
// const zero: f64 = 0;
// const sample1 = (sample_list.getSample(i) orelse &zero).*;
// const sample2 = (sample_list.getSample(i-1) orelse &zero).*;
// const sample3 = (sample_list.getSample(i-2) orelse &zero).*;
// const sample4 = (sample_list.getSample(i-3) orelse &zero).*;
// // if (sample_list.getSample(i)) |sample| {
// // }
// transformed_sample.* = @tan(sample1 + sample2 + sample3 + sample4);
// } else {
// transformed_sample.* = 0;
// }
// }
// for (0..block_count) |i| {
// transformed_samples.getBlock(starting_block_id + i).?.len = sample_list.getBlock(starting_block_id + i).?.len;
// }
const duration = timer.read();
_ = duration;
// std.debug.print("finished {d:.5}ms\n", .{ @as(f64, @floatFromInt(duration)) / std.time.ns_per_ms });
} }
pub fn pushCommand(self: *App, command: Command) void { pub fn pushCommand(self: *App, command: Command) void {
@ -1893,6 +2127,9 @@ fn loadSavedSamples(self: *App, channel_id: Id) !void {
return error.NotMultipleOf8; return error.NotMultipleOf8;
} }
const sample_list = self.project.sample_lists.get(channel.collected_samples_id).?;
sample_list.clear(self.allocator);
try self.project.readSamplesFromFile(self.allocator, channel.collected_samples_id, samples_file, @divExact(byte_count, 8)); try self.project.readSamplesFromFile(self.allocator, channel.collected_samples_id, samples_file, @divExact(byte_count, 8));
const stat = try dir.statFile(saved_samples_location); const stat = try dir.statFile(saved_samples_location);

View File

@ -109,7 +109,11 @@ fn showGraph(ctx: Context, view_id: Id) *UI.Box {
{ // Render graph { // Render graph
const sample_list_id = app.project.getViewSampleListId(view_id); var sample_list_id = app.project.getViewSampleListId(view_id);
if (view.transformed_samples) |transformed_samples_id| {
sample_list_id = transformed_samples_id;
}
const sample_list = app.project.sample_lists.get(sample_list_id).?; const sample_list = app.project.sample_lists.get(sample_list_id).?;
Graph.drawCached(&view.graph_cache, graph_box.persistent.size, view_opts.*, sample_list); Graph.drawCached(&view.graph_cache, graph_box.persistent.size, view_opts.*, sample_list);
if (view.graph_cache.texture) |texture| { if (view.graph_cache.texture) |texture| {

View File

@ -78,8 +78,6 @@ pub fn main() !void {
raylib_h.SetTraceLogCallback(raylibTraceLogCallback); raylib_h.SetTraceLogCallback(raylibTraceLogCallback);
rl.setTraceLogLevel(toRaylibLogLevel(std.options.log_level)); rl.setTraceLogLevel(toRaylibLogLevel(std.options.log_level));
// std.debug.print("{any}\n", .{ std.mem.bytesAsSlice(f64, std.mem.sliceAsBytes(&[_]f64{ 1, 2, 3}) ) });
const icon_png = @embedFile("./assets/icon.png"); const icon_png = @embedFile("./assets/icon.png");
var icon_image = rl.loadImageFromMemory(".png", icon_png); var icon_image = rl.loadImageFromMemory(".png", icon_png);
defer icon_image.unload(); defer icon_image.unload();

View File

@ -388,10 +388,21 @@ fn showViewSettings(self: *MainScreen, view_id: Id) !void {
} }
_ = ui.label("Duration: {s}", .{ duration_str }); _ = ui.label("Duration: {s}", .{ duration_str });
try project.updateSlidingWindow(view_id, self.app.allocator, try ui.numberInput(f64, .{ const new_sliding_window = try ui.numberInput(f64, .{
.key = ui.keyFromString("Sliding window"), .key = ui.keyFromString("Sliding window"),
.storage = &self.sliding_window_input .storage = &self.sliding_window_input
})); });
if (new_sliding_window != view.sliding_window) {
if (new_sliding_window == null or new_sliding_window.? > 0) {
self.app.pushCommand(.{
.update_sliding_window = .{
.view_id = view_id,
.sliding_window = new_sliding_window
}
});
}
}
} }
fn showMarkedRange(self: *MainScreen, view_id: Id, index: usize) void { fn showMarkedRange(self: *MainScreen, view_id: Id, index: usize) void {