From 9a83bcfbb5cb6020208388ed64901b7afd528ada Mon Sep 17 00:00:00 2001 From: Rokas Puzonas Date: Tue, 19 Nov 2024 22:24:24 +0200 Subject: [PATCH] move sampling to thread --- build.zig | 11 ++- src/main.zig | 271 +++++++++++++++++++++++++++++++++++---------------- 2 files changed, 197 insertions(+), 85 deletions(-) diff --git a/build.zig b/build.zig index 354a939..eeca6eb 100644 --- a/build.zig +++ b/build.zig @@ -36,16 +36,21 @@ pub fn build(b: *std.Build) !void { .target = target, .optimize = optimize, }); + exe.linkLibrary(raylib_dep.artifact("raylib")); exe.root_module.addImport("raylib", raylib_dep.module("raylib")); const external_compiler_support_dir = try std.process.getEnvVarOwned(b.allocator, "NIEXTCCOMPILERSUPP"); exe.addSystemIncludePath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "include" }) }); - exe.addLibraryPath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "lib64", "msvc" }) }); + + const lib_folder_name = if (target.result.ptrBitWidth() == 64) "lib64" else "lib32"; + exe.addLibraryPath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, lib_folder_name, "msvc" }) }); exe.linkSystemLibrary("nidaqmx"); - exe.linkSystemLibrary("odbccp32"); - exe.linkSystemLibrary("odbc32"); + + if (target.result.os.tag == .windows) { + exe.subsystem = if (optimize == .Debug) .Console else .Windows; + } b.installArtifact(exe); const run_cmd = b.addRunArtifact(exe); diff --git a/src/main.zig b/src/main.zig index 08abbd4..c9b961f 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,10 +1,17 @@ const std = @import("std"); +const builtin = @import("builtin"); const rl = @import("raylib"); const c = @cImport({ @cInclude("stdint.h"); @cDefine("__int64", "long long"); @cInclude("NIDAQmx.h"); }); +const windows = @cImport({ + @cDefine("_WIN32_WINNT", "0x0500"); + @cInclude("windows.h"); +}); + +const log = std.log; const Allocator = std.mem.Allocator; const FontFace = @import("font-face.zig"); @@ -20,18 +27,24 @@ fn toTraceLogLevel(log_level: std.log.Level) rl.TraceLogLevel { }; } -fn checkDAQmxError(error_code: i32) !void { - if (error_code != 0) { - var error_msg: [512:0]u8 = .{ 0 } ** 512; - if (c.DAQmxGetErrorString(error_code, &error_msg, error_msg.len) == 0) { - std.debug.print("DAQmx error ({}): {s}\n", .{error_code, error_msg}); - } else { - std.debug.print("DAQmx error ({}): Unknown (Buffer too small for error message)\n", .{error_code}); - } +fn logDAQmxError(error_code: i32) void { + if (error_code == 0) { + return; } + var error_msg: [512:0]u8 = .{ 0 } ** 512; + if (c.DAQmxGetErrorString(error_code, &error_msg, error_msg.len) == 0) { + log.err("DAQmx ({}): {s}\n", .{error_code, error_msg}); + } else { + log.err("DAQmx ({}): Unknown (Buffer too small for error message)\n", .{error_code}); + } +} + +fn checkDAQmxError(error_code: i32, err: anyerror) !void { + logDAQmxError(error_code); + if (error_code < 0) { - return error.DAQError; + return err; } } @@ -57,7 +70,10 @@ fn listDeviceNames(allocator: Allocator) ![][:0]u8 { const device_names_list = try allocator.alloc(u8, @intCast(required_size)); defer allocator.free(device_names_list); - try checkDAQmxError( c.DAQmxGetSysDevNames(device_names_list.ptr, @intCast(device_names_list.len)) ); + try checkDAQmxError( + c.DAQmxGetSysDevNames(device_names_list.ptr, @intCast(device_names_list.len)), + error.getDeviceNames + ); const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable; assert(nullbyte == required_size - 1); @@ -74,7 +90,10 @@ fn listDeviceAIPhysicalChannels(allocator: Allocator, device: [:0]u8) ![][:0]u8 const device_names_list = try allocator.alloc(u8, @intCast(required_size)); defer allocator.free(device_names_list); - try checkDAQmxError( c.DAQmxGetDevAIPhysicalChans(device, device_names_list.ptr, @intCast(device_names_list.len)) ); + try checkDAQmxError( + c.DAQmxGetDevAIPhysicalChans(device, device_names_list.ptr, @intCast(device_names_list.len)), + error.getAIChannels + ); const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable; assert(nullbyte == required_size - 1); @@ -114,14 +133,119 @@ const Channel = struct { } }; +const Application = struct { + allocator: Allocator, + channels: std.ArrayList(Channel), + + task_handle: c.TaskHandle = null, + read_thread: ?std.Thread = null, + read_timeout_s: f64 = 0.01, + last_read_size: u32 = 0, + last_read_at_ns: i128 = 0, + samples_mutex: std.Thread.Mutex = .{}, + dropped_samples: u32 = 0, + + fn init(allocator: Allocator) Application { + return Application{ + .allocator = allocator, + .channels = std.ArrayList(Channel).init(allocator) + }; + } + + fn deinit(self: *Application) void { + self.stopReadingThread(); + + for (self.channels.items) |channel| { + channel.deinit(); + } + self.channels.deinit(); + } + + fn appendChannel(self: *Application) !*Channel { + try self.channels.append(Channel.init(self.allocator)); + return &self.channels.items[self.channels.items.len-1]; + } + + fn startReadingThread(self: *Application) !void { + assert(self.read_thread == null); + self.read_thread = try std.Thread.spawn(.{ .allocator = self.allocator }, readThreadMain, .{ self }); + } + + fn stopReadingThread(self: *Application) void { + if (self.read_thread) |thread| { + self.read_thread = null; + thread.join(); + } + } + + fn readChannels(self: *Application) !void { + var read_buffer: [1024]f64 = undefined; + var read: i32 = 0; + const err = c.DAQmxReadAnalogF64(self.task_handle, -1, self.read_timeout_s, c.DAQmx_Val_GroupByChannel, &read_buffer, @intCast(read_buffer.len), &read, null); + if (err == 0 and read > 0) { + const read_u32 = @as(u32, @intCast(read)); + + self.samples_mutex.lock(); + defer self.samples_mutex.unlock(); + for (0.., self.channels.items) |i, *channel| { + const channel_samples = read_buffer[(i*read_u32)..((i+1)*read_u32)]; + try channel.samples.appendSlice(channel_samples); + } + + self.last_read_size = read_u32; + self.last_read_at_ns = std.time.nanoTimestamp(); + } else if (err != 0 and err != c.DAQmxErrorSamplesNotYetAvailable) { + + if (err == c.DAQmxErrorSamplesNoLongerAvailable) { + self.dropped_samples += 1; + log.err("Dropped samples!", .{}); + } else { + try checkDAQmxError(err, error.readAnalog); + } + } + } + + fn readThreadMain(app: *Application) void { + while (app.read_thread != null) { + var is_done: c_uint = 0; + _ = c.DAQmxIsTaskDone(app.task_handle, &is_done); + if (is_done != 0) { + log.info("Detected that task is done, stopped reading", .{}); + break; + } + + app.readChannels() catch |e| { + log.err("readChannels() {}\n", .{e}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + }; + } + } +}; + +pub fn nanoToSeconds(ns: i128) f32 { + return @as(f32, @floatFromInt(ns)) / std.time.ns_per_s; +} + pub fn main() !void { + const start_time = std.time.nanoTimestamp(); + + // if (builtin.os.tag == .windows) { + // const hWnd = windows.GetConsoleWindow(); + // std.debug.print("{any}\n", .{hWnd}); + // //_ = windows.ShowWindow(hWnd, windows.SW_HIDE); + // _ = windows.ShowWindow(hWnd, windows.SW_SHOW); + // } + rl.setTraceLogLevel(toTraceLogLevel(std.log.default_level)); var gpa = std.heap.GeneralPurposeAllocator(.{}){}; const allocator = gpa.allocator(); defer _ = gpa.deinit(); - + var app = Application.init(allocator); + defer app.deinit(); const devices = try listDeviceNames(allocator); defer freeNameList(allocator, devices); @@ -139,50 +263,34 @@ pub fn main() !void { const sample_rate: f64 = 5000; - var task_handle: c.TaskHandle = null; - try checkDAQmxError(c.DAQmxCreateTask("", &task_handle)); - defer checkDAQmxError(c.DAQmxClearTask(task_handle)) catch unreachable; + try checkDAQmxError(c.DAQmxCreateTask("", &app.task_handle), error.createTask); + defer checkDAQmxError(c.DAQmxClearTask(app.task_handle), error.clearTask) catch unreachable; - var channels = std.ArrayList(Channel).init(allocator); - defer channels.deinit(); - defer { - for (channels.items) |channel| { - channel.deinit(); + for (devices) |device| { + const channel_names = try listDeviceAIPhysicalChannels(allocator, device); + defer freeNameList(allocator, channel_names); + + for (0.., channel_names) |i, channel_name| { + var channel = try app.appendChannel(); + channel.color = rl.Color.fromHSV(@as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(channel_names.len)) * 360, 0.75, 0.8); + channel.min_sample = -10.0; + channel.max_sample = 10.0; + try checkDAQmxError( + c.DAQmxCreateAIVoltageChan(app.task_handle, channel_name, "", c.DAQmx_Val_Cfg_Default, channel.min_sample, channel.max_sample, c.DAQmx_Val_Volts, null), + error.createAIVoltageChannel + ); } } - var channel1 = Channel.init(allocator); - channel1.color = rl.Color.red; - channel1.min_sample = -10.0; - channel1.max_sample = 10.0; - try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai1", "", c.DAQmx_Val_Cfg_Default, channel1.min_sample, channel1.max_sample, c.DAQmx_Val_Volts, null)); - try channels.append(channel1); + try checkDAQmxError( + c.DAQmxCfgSampClkTiming(app.task_handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0), + error.configureSampleRate + ); - var channel2 = Channel.init(allocator); - channel2.color = rl.Color.green; - channel2.min_sample = -10.0; - channel2.max_sample = 10.0; - try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai2", "", c.DAQmx_Val_Cfg_Default, channel2.min_sample, channel2.max_sample, c.DAQmx_Val_Volts, null)); - try channels.append(channel2); + try checkDAQmxError(c.DAQmxStartTask(app.task_handle), error.startTask); + defer checkDAQmxError(c.DAQmxStopTask(app.task_handle), error.stopTask) catch unreachable; - var channel3 = Channel.init(allocator); - channel3.color = rl.Color.blue; - channel3.min_sample = -10.0; - channel3.max_sample = 10.0; - try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai3", "", c.DAQmx_Val_Cfg_Default, channel3.min_sample, channel3.max_sample, c.DAQmx_Val_Volts, null)); - try channels.append(channel3); - - var channel4 = Channel.init(allocator); - channel4.color = rl.Color.yellow; - channel4.min_sample = -10.0; - channel4.max_sample = 10.0; - try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai4", "", c.DAQmx_Val_Cfg_Default, channel4.min_sample, channel4.max_sample, c.DAQmx_Val_Volts, null)); - try channels.append(channel4); - - try checkDAQmxError(c.DAQmxCfgSampClkTiming(task_handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0)); - - try checkDAQmxError(c.DAQmxStartTask(task_handle)); - defer checkDAQmxError(c.DAQmxStopTask(task_handle)) catch unreachable; + try app.startReadingThread(); rl.initWindow(800, 450, "DAQ view"); defer rl.closeWindow(); @@ -194,8 +302,7 @@ pub fn main() !void { .font = rl.getFontDefault() }; - var last_read_size: u32 = 0; - var last_read_at: f64 = 0; + var zoom: f64 = 1.0; while (!rl.windowShouldClose()) { rl.beginDrawing(); @@ -212,12 +319,13 @@ pub fn main() !void { rl.Color.gray ); - for (channels.items) |channel| { + app.samples_mutex.lock(); + for (app.channels.items) |channel| { const samples = channel.samples; const min_sample: f32 = @floatCast(channel.min_sample); const max_sample: f32 = @floatCast(channel.max_sample); - const max_visible_samples: u32 = @intFromFloat(sample_rate * 5); + const max_visible_samples: u32 = @intFromFloat(sample_rate * 20); var shown_samples = samples.items; if (shown_samples.len > max_visible_samples) { shown_samples = samples.items[(samples.items.len-max_visible_samples-1)..(samples.items.len-1)]; @@ -244,35 +352,53 @@ pub fn main() !void { const start_pos = Vec2.init( (offset_i + i) / max_visible_samples * window_width, - remap(min_sample, max_sample, 0, window_height, @floatCast(min_slice_sample)) + remap(min_sample, max_sample, 0, window_height, @as(f32, @floatCast(min_slice_sample)) * @as(f32, @floatCast(zoom))) ); const end_pos = Vec2.init( (offset_i + i) / max_visible_samples * window_width, - remap(min_sample, max_sample, 0, window_height, @floatCast(max_slice_sample)) + remap(min_sample, max_sample, 0, window_height, @as(f32, @floatCast(max_slice_sample)) * @as(f32, @floatCast(zoom))) ); rl.drawLineV(start_pos, end_pos, color); } } } + app.samples_mutex.unlock(); - const now = rl.getTime(); + if (rl.isKeyPressedRepeat(rl.KeyboardKey.key_e) or rl.isKeyPressed(rl.KeyboardKey.key_e)) { + zoom *= 1.1; + } + if (rl.isKeyPressedRepeat(rl.KeyboardKey.key_q) or rl.isKeyPressed(rl.KeyboardKey.key_q)) { + zoom *= 0.9; + } + + const now_ns = std.time.nanoTimestamp(); + const now_since_start = nanoToSeconds(now_ns - start_time); { var y: f32 = 10; - try font_face.drawTextAlloc(allocator, "Time: {d:.03}", .{now}, Vec2.init(10, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Time: {d:.03}", .{now_since_start}, Vec2.init(10, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Last read size (bytes): {d}", .{last_read_size * @as(u32, @intCast(channels.items.len)) * @sizeOf(f64)}, Vec2.init(20, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Zoom: {d:.03}", .{zoom}, Vec2.init(10, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Last read count per channel: {d}", .{last_read_size}, Vec2.init(20, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Dropped samples: {d:.03}", .{app.dropped_samples}, Vec2.init(10, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Time since last read: {d:.05}", .{now - last_read_at}, Vec2.init(20, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Last read size (bytes): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len)) * @sizeOf(f64)}, Vec2.init(20, y), rl.Color.black); y += 10; - for (1.., channels.items) |i, channel| { + try font_face.drawTextAlloc(allocator, "Last read count (doubles): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len))}, Vec2.init(20, y), rl.Color.black); + y += 10; + + try font_face.drawTextAlloc(allocator, "Last read count per channel: {d}", .{app.last_read_size}, Vec2.init(20, y), rl.Color.black); + y += 10; + + try font_face.drawTextAlloc(allocator, "Time since last read: {d:.05}", .{nanoToSeconds(now_ns - app.last_read_at_ns)}, Vec2.init(20, y), rl.Color.black); + y += 10; + + for (1.., app.channels.items) |i, channel| { const sample_count = channel.samples.items.len; y += 10; @@ -282,30 +408,11 @@ pub fn main() !void { try font_face.drawTextAlloc(allocator, "Sample count: {}", .{sample_count}, Vec2.init(20, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now}, Vec2.init(20, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now_since_start}, Vec2.init(20, y), rl.Color.black); y += 10; } } rl.drawFPS(@as(i32, @intFromFloat(window_width)) - 100, 10); - - var read_buffer: [1024]f64 = undefined; - var read: i32 = 0; - const err = c.DAQmxReadAnalogF64(task_handle, -1, 0, c.DAQmx_Val_GroupByChannel, &read_buffer, @intCast(read_buffer.len), &read, null); - if (err == 0 and read > 0) { - const read_u32 = @as(u32, @intCast(read)); - - for (0.., channels.items) |i, *channel| { - const channel_samples = read_buffer[(i*read_u32)..((i+1)*read_u32)]; - try channel.samples.appendSlice(channel_samples); - } - - last_read_size = read_u32; - last_read_at = now; - } else if (err != 0 and err != c.DAQmxErrorSamplesNotYetAvailable) { - // TODO: Handle error c.DAQmxErrorSamplesNoLongerAvailable - // This error occurs, when application is not reading data fast enough and the DAQmx internal buffer fills up. - try checkDAQmxError(err); - } } }