move sampling to thread

This commit is contained in:
Rokas Puzonas 2024-11-19 22:24:24 +02:00
parent 3838293c9d
commit 9a83bcfbb5
2 changed files with 197 additions and 85 deletions

View File

@ -36,16 +36,21 @@ pub fn build(b: *std.Build) !void {
.target = target, .target = target,
.optimize = optimize, .optimize = optimize,
}); });
exe.linkLibrary(raylib_dep.artifact("raylib")); exe.linkLibrary(raylib_dep.artifact("raylib"));
exe.root_module.addImport("raylib", raylib_dep.module("raylib")); exe.root_module.addImport("raylib", raylib_dep.module("raylib"));
const external_compiler_support_dir = try std.process.getEnvVarOwned(b.allocator, "NIEXTCCOMPILERSUPP"); const external_compiler_support_dir = try std.process.getEnvVarOwned(b.allocator, "NIEXTCCOMPILERSUPP");
exe.addSystemIncludePath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "include" }) }); exe.addSystemIncludePath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "include" }) });
exe.addLibraryPath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "lib64", "msvc" }) });
const lib_folder_name = if (target.result.ptrBitWidth() == 64) "lib64" else "lib32";
exe.addLibraryPath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, lib_folder_name, "msvc" }) });
exe.linkSystemLibrary("nidaqmx"); exe.linkSystemLibrary("nidaqmx");
exe.linkSystemLibrary("odbccp32");
exe.linkSystemLibrary("odbc32"); if (target.result.os.tag == .windows) {
exe.subsystem = if (optimize == .Debug) .Console else .Windows;
}
b.installArtifact(exe); b.installArtifact(exe);
const run_cmd = b.addRunArtifact(exe); const run_cmd = b.addRunArtifact(exe);

View File

@ -1,10 +1,17 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const rl = @import("raylib"); const rl = @import("raylib");
const c = @cImport({ const c = @cImport({
@cInclude("stdint.h"); @cInclude("stdint.h");
@cDefine("__int64", "long long"); @cDefine("__int64", "long long");
@cInclude("NIDAQmx.h"); @cInclude("NIDAQmx.h");
}); });
const windows = @cImport({
@cDefine("_WIN32_WINNT", "0x0500");
@cInclude("windows.h");
});
const log = std.log;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const FontFace = @import("font-face.zig"); const FontFace = @import("font-face.zig");
@ -20,18 +27,24 @@ fn toTraceLogLevel(log_level: std.log.Level) rl.TraceLogLevel {
}; };
} }
fn checkDAQmxError(error_code: i32) !void { fn logDAQmxError(error_code: i32) void {
if (error_code != 0) { if (error_code == 0) {
var error_msg: [512:0]u8 = .{ 0 } ** 512; return;
if (c.DAQmxGetErrorString(error_code, &error_msg, error_msg.len) == 0) {
std.debug.print("DAQmx error ({}): {s}\n", .{error_code, error_msg});
} else {
std.debug.print("DAQmx error ({}): Unknown (Buffer too small for error message)\n", .{error_code});
}
} }
var error_msg: [512:0]u8 = .{ 0 } ** 512;
if (c.DAQmxGetErrorString(error_code, &error_msg, error_msg.len) == 0) {
log.err("DAQmx ({}): {s}\n", .{error_code, error_msg});
} else {
log.err("DAQmx ({}): Unknown (Buffer too small for error message)\n", .{error_code});
}
}
fn checkDAQmxError(error_code: i32, err: anyerror) !void {
logDAQmxError(error_code);
if (error_code < 0) { if (error_code < 0) {
return error.DAQError; return err;
} }
} }
@ -57,7 +70,10 @@ fn listDeviceNames(allocator: Allocator) ![][:0]u8 {
const device_names_list = try allocator.alloc(u8, @intCast(required_size)); const device_names_list = try allocator.alloc(u8, @intCast(required_size));
defer allocator.free(device_names_list); defer allocator.free(device_names_list);
try checkDAQmxError( c.DAQmxGetSysDevNames(device_names_list.ptr, @intCast(device_names_list.len)) ); try checkDAQmxError(
c.DAQmxGetSysDevNames(device_names_list.ptr, @intCast(device_names_list.len)),
error.getDeviceNames
);
const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable; const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable;
assert(nullbyte == required_size - 1); assert(nullbyte == required_size - 1);
@ -74,7 +90,10 @@ fn listDeviceAIPhysicalChannels(allocator: Allocator, device: [:0]u8) ![][:0]u8
const device_names_list = try allocator.alloc(u8, @intCast(required_size)); const device_names_list = try allocator.alloc(u8, @intCast(required_size));
defer allocator.free(device_names_list); defer allocator.free(device_names_list);
try checkDAQmxError( c.DAQmxGetDevAIPhysicalChans(device, device_names_list.ptr, @intCast(device_names_list.len)) ); try checkDAQmxError(
c.DAQmxGetDevAIPhysicalChans(device, device_names_list.ptr, @intCast(device_names_list.len)),
error.getAIChannels
);
const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable; const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable;
assert(nullbyte == required_size - 1); assert(nullbyte == required_size - 1);
@ -114,14 +133,119 @@ const Channel = struct {
} }
}; };
const Application = struct {
allocator: Allocator,
channels: std.ArrayList(Channel),
task_handle: c.TaskHandle = null,
read_thread: ?std.Thread = null,
read_timeout_s: f64 = 0.01,
last_read_size: u32 = 0,
last_read_at_ns: i128 = 0,
samples_mutex: std.Thread.Mutex = .{},
dropped_samples: u32 = 0,
fn init(allocator: Allocator) Application {
return Application{
.allocator = allocator,
.channels = std.ArrayList(Channel).init(allocator)
};
}
fn deinit(self: *Application) void {
self.stopReadingThread();
for (self.channels.items) |channel| {
channel.deinit();
}
self.channels.deinit();
}
fn appendChannel(self: *Application) !*Channel {
try self.channels.append(Channel.init(self.allocator));
return &self.channels.items[self.channels.items.len-1];
}
fn startReadingThread(self: *Application) !void {
assert(self.read_thread == null);
self.read_thread = try std.Thread.spawn(.{ .allocator = self.allocator }, readThreadMain, .{ self });
}
fn stopReadingThread(self: *Application) void {
if (self.read_thread) |thread| {
self.read_thread = null;
thread.join();
}
}
fn readChannels(self: *Application) !void {
var read_buffer: [1024]f64 = undefined;
var read: i32 = 0;
const err = c.DAQmxReadAnalogF64(self.task_handle, -1, self.read_timeout_s, c.DAQmx_Val_GroupByChannel, &read_buffer, @intCast(read_buffer.len), &read, null);
if (err == 0 and read > 0) {
const read_u32 = @as(u32, @intCast(read));
self.samples_mutex.lock();
defer self.samples_mutex.unlock();
for (0.., self.channels.items) |i, *channel| {
const channel_samples = read_buffer[(i*read_u32)..((i+1)*read_u32)];
try channel.samples.appendSlice(channel_samples);
}
self.last_read_size = read_u32;
self.last_read_at_ns = std.time.nanoTimestamp();
} else if (err != 0 and err != c.DAQmxErrorSamplesNotYetAvailable) {
if (err == c.DAQmxErrorSamplesNoLongerAvailable) {
self.dropped_samples += 1;
log.err("Dropped samples!", .{});
} else {
try checkDAQmxError(err, error.readAnalog);
}
}
}
fn readThreadMain(app: *Application) void {
while (app.read_thread != null) {
var is_done: c_uint = 0;
_ = c.DAQmxIsTaskDone(app.task_handle, &is_done);
if (is_done != 0) {
log.info("Detected that task is done, stopped reading", .{});
break;
}
app.readChannels() catch |e| {
log.err("readChannels() {}\n", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
}
};
}
}
};
pub fn nanoToSeconds(ns: i128) f32 {
return @as(f32, @floatFromInt(ns)) / std.time.ns_per_s;
}
pub fn main() !void { pub fn main() !void {
const start_time = std.time.nanoTimestamp();
// if (builtin.os.tag == .windows) {
// const hWnd = windows.GetConsoleWindow();
// std.debug.print("{any}\n", .{hWnd});
// //_ = windows.ShowWindow(hWnd, windows.SW_HIDE);
// _ = windows.ShowWindow(hWnd, windows.SW_SHOW);
// }
rl.setTraceLogLevel(toTraceLogLevel(std.log.default_level)); rl.setTraceLogLevel(toTraceLogLevel(std.log.default_level));
var gpa = std.heap.GeneralPurposeAllocator(.{}){}; var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator(); const allocator = gpa.allocator();
defer _ = gpa.deinit(); defer _ = gpa.deinit();
var app = Application.init(allocator);
defer app.deinit();
const devices = try listDeviceNames(allocator); const devices = try listDeviceNames(allocator);
defer freeNameList(allocator, devices); defer freeNameList(allocator, devices);
@ -139,50 +263,34 @@ pub fn main() !void {
const sample_rate: f64 = 5000; const sample_rate: f64 = 5000;
var task_handle: c.TaskHandle = null; try checkDAQmxError(c.DAQmxCreateTask("", &app.task_handle), error.createTask);
try checkDAQmxError(c.DAQmxCreateTask("", &task_handle)); defer checkDAQmxError(c.DAQmxClearTask(app.task_handle), error.clearTask) catch unreachable;
defer checkDAQmxError(c.DAQmxClearTask(task_handle)) catch unreachable;
var channels = std.ArrayList(Channel).init(allocator); for (devices) |device| {
defer channels.deinit(); const channel_names = try listDeviceAIPhysicalChannels(allocator, device);
defer { defer freeNameList(allocator, channel_names);
for (channels.items) |channel| {
channel.deinit(); for (0.., channel_names) |i, channel_name| {
var channel = try app.appendChannel();
channel.color = rl.Color.fromHSV(@as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(channel_names.len)) * 360, 0.75, 0.8);
channel.min_sample = -10.0;
channel.max_sample = 10.0;
try checkDAQmxError(
c.DAQmxCreateAIVoltageChan(app.task_handle, channel_name, "", c.DAQmx_Val_Cfg_Default, channel.min_sample, channel.max_sample, c.DAQmx_Val_Volts, null),
error.createAIVoltageChannel
);
} }
} }
var channel1 = Channel.init(allocator); try checkDAQmxError(
channel1.color = rl.Color.red; c.DAQmxCfgSampClkTiming(app.task_handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0),
channel1.min_sample = -10.0; error.configureSampleRate
channel1.max_sample = 10.0; );
try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai1", "", c.DAQmx_Val_Cfg_Default, channel1.min_sample, channel1.max_sample, c.DAQmx_Val_Volts, null));
try channels.append(channel1);
var channel2 = Channel.init(allocator); try checkDAQmxError(c.DAQmxStartTask(app.task_handle), error.startTask);
channel2.color = rl.Color.green; defer checkDAQmxError(c.DAQmxStopTask(app.task_handle), error.stopTask) catch unreachable;
channel2.min_sample = -10.0;
channel2.max_sample = 10.0;
try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai2", "", c.DAQmx_Val_Cfg_Default, channel2.min_sample, channel2.max_sample, c.DAQmx_Val_Volts, null));
try channels.append(channel2);
var channel3 = Channel.init(allocator); try app.startReadingThread();
channel3.color = rl.Color.blue;
channel3.min_sample = -10.0;
channel3.max_sample = 10.0;
try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai3", "", c.DAQmx_Val_Cfg_Default, channel3.min_sample, channel3.max_sample, c.DAQmx_Val_Volts, null));
try channels.append(channel3);
var channel4 = Channel.init(allocator);
channel4.color = rl.Color.yellow;
channel4.min_sample = -10.0;
channel4.max_sample = 10.0;
try checkDAQmxError(c.DAQmxCreateAIVoltageChan(task_handle, "Dev1/ai4", "", c.DAQmx_Val_Cfg_Default, channel4.min_sample, channel4.max_sample, c.DAQmx_Val_Volts, null));
try channels.append(channel4);
try checkDAQmxError(c.DAQmxCfgSampClkTiming(task_handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0));
try checkDAQmxError(c.DAQmxStartTask(task_handle));
defer checkDAQmxError(c.DAQmxStopTask(task_handle)) catch unreachable;
rl.initWindow(800, 450, "DAQ view"); rl.initWindow(800, 450, "DAQ view");
defer rl.closeWindow(); defer rl.closeWindow();
@ -194,8 +302,7 @@ pub fn main() !void {
.font = rl.getFontDefault() .font = rl.getFontDefault()
}; };
var last_read_size: u32 = 0; var zoom: f64 = 1.0;
var last_read_at: f64 = 0;
while (!rl.windowShouldClose()) { while (!rl.windowShouldClose()) {
rl.beginDrawing(); rl.beginDrawing();
@ -212,12 +319,13 @@ pub fn main() !void {
rl.Color.gray rl.Color.gray
); );
for (channels.items) |channel| { app.samples_mutex.lock();
for (app.channels.items) |channel| {
const samples = channel.samples; const samples = channel.samples;
const min_sample: f32 = @floatCast(channel.min_sample); const min_sample: f32 = @floatCast(channel.min_sample);
const max_sample: f32 = @floatCast(channel.max_sample); const max_sample: f32 = @floatCast(channel.max_sample);
const max_visible_samples: u32 = @intFromFloat(sample_rate * 5); const max_visible_samples: u32 = @intFromFloat(sample_rate * 20);
var shown_samples = samples.items; var shown_samples = samples.items;
if (shown_samples.len > max_visible_samples) { if (shown_samples.len > max_visible_samples) {
shown_samples = samples.items[(samples.items.len-max_visible_samples-1)..(samples.items.len-1)]; shown_samples = samples.items[(samples.items.len-max_visible_samples-1)..(samples.items.len-1)];
@ -244,35 +352,53 @@ pub fn main() !void {
const start_pos = Vec2.init( const start_pos = Vec2.init(
(offset_i + i) / max_visible_samples * window_width, (offset_i + i) / max_visible_samples * window_width,
remap(min_sample, max_sample, 0, window_height, @floatCast(min_slice_sample)) remap(min_sample, max_sample, 0, window_height, @as(f32, @floatCast(min_slice_sample)) * @as(f32, @floatCast(zoom)))
); );
const end_pos = Vec2.init( const end_pos = Vec2.init(
(offset_i + i) / max_visible_samples * window_width, (offset_i + i) / max_visible_samples * window_width,
remap(min_sample, max_sample, 0, window_height, @floatCast(max_slice_sample)) remap(min_sample, max_sample, 0, window_height, @as(f32, @floatCast(max_slice_sample)) * @as(f32, @floatCast(zoom)))
); );
rl.drawLineV(start_pos, end_pos, color); rl.drawLineV(start_pos, end_pos, color);
} }
} }
} }
app.samples_mutex.unlock();
const now = rl.getTime(); if (rl.isKeyPressedRepeat(rl.KeyboardKey.key_e) or rl.isKeyPressed(rl.KeyboardKey.key_e)) {
zoom *= 1.1;
}
if (rl.isKeyPressedRepeat(rl.KeyboardKey.key_q) or rl.isKeyPressed(rl.KeyboardKey.key_q)) {
zoom *= 0.9;
}
const now_ns = std.time.nanoTimestamp();
const now_since_start = nanoToSeconds(now_ns - start_time);
{ {
var y: f32 = 10; var y: f32 = 10;
try font_face.drawTextAlloc(allocator, "Time: {d:.03}", .{now}, Vec2.init(10, y), rl.Color.black); try font_face.drawTextAlloc(allocator, "Time: {d:.03}", .{now_since_start}, Vec2.init(10, y), rl.Color.black);
y += 10; y += 10;
try font_face.drawTextAlloc(allocator, "Last read size (bytes): {d}", .{last_read_size * @as(u32, @intCast(channels.items.len)) * @sizeOf(f64)}, Vec2.init(20, y), rl.Color.black); try font_face.drawTextAlloc(allocator, "Zoom: {d:.03}", .{zoom}, Vec2.init(10, y), rl.Color.black);
y += 10; y += 10;
try font_face.drawTextAlloc(allocator, "Last read count per channel: {d}", .{last_read_size}, Vec2.init(20, y), rl.Color.black); try font_face.drawTextAlloc(allocator, "Dropped samples: {d:.03}", .{app.dropped_samples}, Vec2.init(10, y), rl.Color.black);
y += 10; y += 10;
try font_face.drawTextAlloc(allocator, "Time since last read: {d:.05}", .{now - last_read_at}, Vec2.init(20, y), rl.Color.black); try font_face.drawTextAlloc(allocator, "Last read size (bytes): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len)) * @sizeOf(f64)}, Vec2.init(20, y), rl.Color.black);
y += 10; y += 10;
for (1.., channels.items) |i, channel| { try font_face.drawTextAlloc(allocator, "Last read count (doubles): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len))}, Vec2.init(20, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Last read count per channel: {d}", .{app.last_read_size}, Vec2.init(20, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Time since last read: {d:.05}", .{nanoToSeconds(now_ns - app.last_read_at_ns)}, Vec2.init(20, y), rl.Color.black);
y += 10;
for (1.., app.channels.items) |i, channel| {
const sample_count = channel.samples.items.len; const sample_count = channel.samples.items.len;
y += 10; y += 10;
@ -282,30 +408,11 @@ pub fn main() !void {
try font_face.drawTextAlloc(allocator, "Sample count: {}", .{sample_count}, Vec2.init(20, y), rl.Color.black); try font_face.drawTextAlloc(allocator, "Sample count: {}", .{sample_count}, Vec2.init(20, y), rl.Color.black);
y += 10; y += 10;
try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now}, Vec2.init(20, y), rl.Color.black); try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now_since_start}, Vec2.init(20, y), rl.Color.black);
y += 10; y += 10;
} }
} }
rl.drawFPS(@as(i32, @intFromFloat(window_width)) - 100, 10); rl.drawFPS(@as(i32, @intFromFloat(window_width)) - 100, 10);
var read_buffer: [1024]f64 = undefined;
var read: i32 = 0;
const err = c.DAQmxReadAnalogF64(task_handle, -1, 0, c.DAQmx_Val_GroupByChannel, &read_buffer, @intCast(read_buffer.len), &read, null);
if (err == 0 and read > 0) {
const read_u32 = @as(u32, @intCast(read));
for (0.., channels.items) |i, *channel| {
const channel_samples = read_buffer[(i*read_u32)..((i+1)*read_u32)];
try channel.samples.appendSlice(channel_samples);
}
last_read_size = read_u32;
last_read_at = now;
} else if (err != 0 and err != c.DAQmxErrorSamplesNotYetAvailable) {
// TODO: Handle error c.DAQmxErrorSamplesNoLongerAvailable
// This error occurs, when application is not reading data fast enough and the DAQmx internal buffer fills up.
try checkDAQmxError(err);
}
} }
} }