diff --git a/.gitignore b/.gitignore index 7c046b6..c6f34c7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .zig-cache -zig-out \ No newline at end of file +zig-out +.vscode \ No newline at end of file diff --git a/build.zig b/build.zig index eeca6eb..49f795a 100644 --- a/build.zig +++ b/build.zig @@ -5,61 +5,51 @@ pub fn build(b: *std.Build) !void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); - var lib: *Module = undefined; - { - lib = b.createModule(.{ - .root_source_file = b.path("src/root.zig"), - .target = target, - .optimize = optimize, - }); + const raylib_dep = b.dependency("raylib-zig", .{ + .target = target, + .optimize = optimize, + }); - const unit_tests = b.addTest(.{ - .root_source_file = b.path("src/root.zig"), - .target = target, - .optimize = optimize, - }); + const exe = b.addExecutable(.{ + .name = "daq-view", + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + }); - const run_unit_tests = b.addRunArtifact(unit_tests); - const test_step = b.step("test", "Run unit tests"); - test_step.dependOn(&run_unit_tests.step); + exe.linkLibrary(raylib_dep.artifact("raylib")); + exe.root_module.addImport("raylib", raylib_dep.module("raylib")); + + const external_compiler_support_dir = try std.process.getEnvVarOwned(b.allocator, "NIEXTCCOMPILERSUPP"); + exe.addSystemIncludePath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "include" }) }); + + const lib_folder_name = if (target.result.ptrBitWidth() == 64) "lib64" else "lib32"; + exe.addLibraryPath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, lib_folder_name, "msvc" }) }); + + exe.linkSystemLibrary("nidaqmx"); + + if (target.result.os.tag == .windows) { + exe.subsystem = if (optimize == .Debug) .Console else .Windows; } - { - const raylib_dep = b.dependency("raylib-zig", .{ - .target = target, - .optimize = optimize, - }); - - const exe = b.addExecutable(.{ - .name = "daq-view", - .root_source_file = b.path("src/main.zig"), - .target = target, - .optimize = optimize, - }); - - exe.linkLibrary(raylib_dep.artifact("raylib")); - exe.root_module.addImport("raylib", raylib_dep.module("raylib")); - - const external_compiler_support_dir = try std.process.getEnvVarOwned(b.allocator, "NIEXTCCOMPILERSUPP"); - exe.addSystemIncludePath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, "include" }) }); - - const lib_folder_name = if (target.result.ptrBitWidth() == 64) "lib64" else "lib32"; - exe.addLibraryPath(.{ .cwd_relative = try std.fs.path.join(b.allocator, &.{ external_compiler_support_dir, lib_folder_name, "msvc" }) }); - - exe.linkSystemLibrary("nidaqmx"); - - if (target.result.os.tag == .windows) { - exe.subsystem = if (optimize == .Debug) .Console else .Windows; - } - - b.installArtifact(exe); - const run_cmd = b.addRunArtifact(exe); - run_cmd.step.dependOn(b.getInstallStep()); - if (b.args) |args| { - run_cmd.addArgs(args); - } - - const run_step = b.step("run", "Run the program"); - run_step.dependOn(&run_cmd.step); + b.installArtifact(exe); + const run_cmd = b.addRunArtifact(exe); + run_cmd.step.dependOn(b.getInstallStep()); + if (b.args) |args| { + run_cmd.addArgs(args); } + + const run_step = b.step("run", "Run the program"); + run_step.dependOn(&run_cmd.step); + + + const unit_tests = b.addTest(.{ + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + }); + + const run_unit_tests = b.addRunArtifact(unit_tests); + const test_step = b.step("test", "Run unit tests"); + test_step.dependOn(&run_unit_tests.step); } diff --git a/src/main.zig b/src/main.zig index c9b961f..bbcce98 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,11 +1,8 @@ const std = @import("std"); const builtin = @import("builtin"); const rl = @import("raylib"); -const c = @cImport({ - @cInclude("stdint.h"); - @cDefine("__int64", "long long"); - @cInclude("NIDAQmx.h"); -}); +const NIDaq = @import("ni-daq.zig"); +const TaskPool = @import("./task-pool.zig"); const windows = @cImport({ @cDefine("_WIN32_WINNT", "0x0500"); @cInclude("windows.h"); @@ -27,87 +24,6 @@ fn toTraceLogLevel(log_level: std.log.Level) rl.TraceLogLevel { }; } -fn logDAQmxError(error_code: i32) void { - if (error_code == 0) { - return; - } - - var error_msg: [512:0]u8 = .{ 0 } ** 512; - if (c.DAQmxGetErrorString(error_code, &error_msg, error_msg.len) == 0) { - log.err("DAQmx ({}): {s}\n", .{error_code, error_msg}); - } else { - log.err("DAQmx ({}): Unknown (Buffer too small for error message)\n", .{error_code}); - } -} - -fn checkDAQmxError(error_code: i32, err: anyerror) !void { - logDAQmxError(error_code); - - if (error_code < 0) { - return err; - } -} - -fn splitCommaDelimitedList(allocator: Allocator, list: []u8) ![][:0]u8 { - const name_count = std.mem.count(u8, list, ",") + 1; - var names = try std.ArrayList([:0]u8).initCapacity(allocator, name_count); - errdefer names.deinit(); - - var name_iter = std.mem.tokenizeSequence(u8, list, ", "); - while (name_iter.next()) |name| { - names.appendAssumeCapacity(try allocator.dupeZ(u8, name)); - } - - return try names.toOwnedSlice(); -} - -fn listDeviceNames(allocator: Allocator) ![][:0]u8 { - const required_size = c.DAQmxGetSysDevNames(null, 0); - if (required_size == 0) { - return try allocator.alloc([:0]u8, 0); - } - - const device_names_list = try allocator.alloc(u8, @intCast(required_size)); - defer allocator.free(device_names_list); - - try checkDAQmxError( - c.DAQmxGetSysDevNames(device_names_list.ptr, @intCast(device_names_list.len)), - error.getDeviceNames - ); - - const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable; - assert(nullbyte == required_size - 1); - - return try splitCommaDelimitedList(allocator, device_names_list[0..nullbyte]); -} - -fn listDeviceAIPhysicalChannels(allocator: Allocator, device: [:0]u8) ![][:0]u8 { - const required_size = c.DAQmxGetDevAIPhysicalChans(device, null, 0); - if (required_size == 0) { - return try allocator.alloc([:0]u8, 0); - } - - const device_names_list = try allocator.alloc(u8, @intCast(required_size)); - defer allocator.free(device_names_list); - - try checkDAQmxError( - c.DAQmxGetDevAIPhysicalChans(device, device_names_list.ptr, @intCast(device_names_list.len)), - error.getAIChannels - ); - - const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable; - assert(nullbyte == required_size - 1); - - return try splitCommaDelimitedList(allocator, device_names_list[0..nullbyte]); -} - -fn freeNameList(allocator: Allocator, names: [][:0]u8) void { - for (names) |name| { - allocator.free(name); - } - allocator.free(names); -} - fn remap(from_min: f32, from_max: f32, to_min: f32, to_max: f32, value: f32) f32 { const t = (value - from_min) / (from_max - from_min); return std.math.lerp(to_min, to_max, t); @@ -117,111 +33,40 @@ const Channel = struct { color: rl.Color, min_sample: f64, max_sample: f64, - samples: std.ArrayList(f64), - fn init(allocator: Allocator) Channel { + fn init() Channel { return Channel{ .color = rl.Color.red, .min_sample = 0, - .max_sample = 0, - .samples = std.ArrayList(f64).init(allocator) + .max_sample = 0 }; } - - fn deinit(self: Channel) void { - self.samples.deinit(); - } }; const Application = struct { allocator: Allocator, channels: std.ArrayList(Channel), + channel_samples: ?*TaskPool.ChannelSamples = null, - task_handle: c.TaskHandle = null, - read_thread: ?std.Thread = null, - read_timeout_s: f64 = 0.01, - last_read_size: u32 = 0, - last_read_at_ns: i128 = 0, - samples_mutex: std.Thread.Mutex = .{}, - dropped_samples: u32 = 0, + task_pool: TaskPool, - fn init(allocator: Allocator) Application { + fn init(allocator: Allocator, task_pool_options: TaskPool.Options) !Application { return Application{ .allocator = allocator, + .task_pool = try TaskPool.init(allocator, task_pool_options), .channels = std.ArrayList(Channel).init(allocator) }; } fn deinit(self: *Application) void { - self.stopReadingThread(); - - for (self.channels.items) |channel| { - channel.deinit(); - } self.channels.deinit(); + self.task_pool.deinit(self.allocator); } fn appendChannel(self: *Application) !*Channel { - try self.channels.append(Channel.init(self.allocator)); + try self.channels.append(Channel.init()); return &self.channels.items[self.channels.items.len-1]; } - - fn startReadingThread(self: *Application) !void { - assert(self.read_thread == null); - self.read_thread = try std.Thread.spawn(.{ .allocator = self.allocator }, readThreadMain, .{ self }); - } - - fn stopReadingThread(self: *Application) void { - if (self.read_thread) |thread| { - self.read_thread = null; - thread.join(); - } - } - - fn readChannels(self: *Application) !void { - var read_buffer: [1024]f64 = undefined; - var read: i32 = 0; - const err = c.DAQmxReadAnalogF64(self.task_handle, -1, self.read_timeout_s, c.DAQmx_Val_GroupByChannel, &read_buffer, @intCast(read_buffer.len), &read, null); - if (err == 0 and read > 0) { - const read_u32 = @as(u32, @intCast(read)); - - self.samples_mutex.lock(); - defer self.samples_mutex.unlock(); - for (0.., self.channels.items) |i, *channel| { - const channel_samples = read_buffer[(i*read_u32)..((i+1)*read_u32)]; - try channel.samples.appendSlice(channel_samples); - } - - self.last_read_size = read_u32; - self.last_read_at_ns = std.time.nanoTimestamp(); - } else if (err != 0 and err != c.DAQmxErrorSamplesNotYetAvailable) { - - if (err == c.DAQmxErrorSamplesNoLongerAvailable) { - self.dropped_samples += 1; - log.err("Dropped samples!", .{}); - } else { - try checkDAQmxError(err, error.readAnalog); - } - } - } - - fn readThreadMain(app: *Application) void { - while (app.read_thread != null) { - var is_done: c_uint = 0; - _ = c.DAQmxIsTaskDone(app.task_handle, &is_done); - if (is_done != 0) { - log.info("Detected that task is done, stopped reading", .{}); - break; - } - - app.readChannels() catch |e| { - log.err("readChannels() {}\n", .{e}); - if (@errorReturnTrace()) |trace| { - std.debug.dumpStackTrace(trace.*); - } - }; - } - } }; pub fn nanoToSeconds(ns: i128) f32 { @@ -238,59 +83,108 @@ pub fn main() !void { // _ = windows.ShowWindow(hWnd, windows.SW_SHOW); // } + // TODO: SetTraceLogCallback rl.setTraceLogLevel(toTraceLogLevel(std.log.default_level)); var gpa = std.heap.GeneralPurposeAllocator(.{}){}; const allocator = gpa.allocator(); defer _ = gpa.deinit(); - var app = Application.init(allocator); - defer app.deinit(); + var ni_daq = try NIDaq.init(allocator, .{ + .max_devices = 4, + .max_analog_inputs = 32, + .max_analog_outputs = 8, + .max_counter_outputs = 8, + .max_counter_inputs = 8, + .max_analog_input_voltage_ranges = 4, + .max_analog_output_voltage_ranges = 4 + }); + defer ni_daq.deinit(allocator); - const devices = try listDeviceNames(allocator); - defer freeNameList(allocator, devices); + const devices = try ni_daq.listDeviceNames(); + + std.debug.print("NI-DAQ version: {}\n", .{try NIDaq.version()}); std.debug.print("Devices ({}):\n", .{devices.len}); for (devices) |device| { - std.debug.print(" * '{s}'\n", .{device}); + std.debug.print(" * '{s}' ({})\n", .{device, device.len}); - const channel_names = try listDeviceAIPhysicalChannels(allocator, device); - defer freeNameList(allocator, channel_names); - for (channel_names) |channel_name| { - std.debug.print(" * '{s}'\n", .{channel_name}); + const analog_inputs = try ni_daq.listDeviceAIPhysicalChannels(device); + for (analog_inputs) |channel_name| { + std.debug.print(" * '{s}' (Analog input)\n", .{channel_name}); } + + for (try ni_daq.listDeviceAOPhysicalChannels(device)) |channel_name| { + std.debug.print(" * '{s}' (Analog output)\n", .{channel_name}); + } + + // for (try ni_daq.listDeviceCOPhysicalChannels(device)) |channel_name| { + // std.debug.print(" * '{s}' (Counter output)\n", .{channel_name}); + // } + + // for (try ni_daq.listDeviceCIPhysicalChannels(device)) |channel_name| { + // std.debug.print(" * '{s}' (Counter input)\n", .{channel_name}); + // } + } + + var app = try Application.init(allocator, .{ + .max_tasks = devices.len * 2, + .max_channels = 64 + }); + defer app.deinit(); + + for (devices) |device| { + if (try ni_daq.checkDeviceAIMeasurementType(device, .Voltage)) { + const voltage_ranges = try ni_daq.listDeviceAIVoltageRanges(device); + assert(voltage_ranges.len > 0); + + const min_sample = voltage_ranges[0].low; + const max_sample = voltage_ranges[0].high; + + for (try ni_daq.listDeviceAIPhysicalChannels(device)) |channel_name| { + var channel = try app.appendChannel(); + channel.min_sample = min_sample; + channel.max_sample = max_sample; + try app.task_pool.createAIVoltageChannel(ni_daq, .{ + .channel = channel_name, + .min_value = min_sample, + .max_value = max_sample, + }); + } + } + + // if (try ni_daq.checkDeviceAOOutputType(device, .Voltage)) { + // const voltage_ranges = try ni_daq.listDeviceAOVoltageRanges(device); + // assert(voltage_ranges.len > 0); + + // const min_sample = voltage_ranges[0].low; + // const max_sample = voltage_ranges[0].high; + + // for (try ni_daq.listDeviceAOPhysicalChannels(device)) |channel_name| { + // var channel = try app.appendChannel(); + // channel.min_sample = min_sample; + // channel.max_sample = max_sample; + // try app.task_pool.createAOVoltageChannel(ni_daq, .{ + // .channel = channel_name, + // .min_value = min_sample, + // .max_value = max_sample, + // }); + // } + // } + } + + for (0.., app.channels.items) |i, *channel| { + channel.color = rl.Color.fromHSV(@as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(app.channels.items.len)) * 360, 0.75, 0.8); } const sample_rate: f64 = 5000; + try app.task_pool.setContinousSampleRate(sample_rate); - try checkDAQmxError(c.DAQmxCreateTask("", &app.task_handle), error.createTask); - defer checkDAQmxError(c.DAQmxClearTask(app.task_handle), error.clearTask) catch unreachable; + var channel_samples = try app.task_pool.start(0.01, allocator); + defer channel_samples.deinit(); + defer app.task_pool.stop() catch @panic("stop task failed"); - for (devices) |device| { - const channel_names = try listDeviceAIPhysicalChannels(allocator, device); - defer freeNameList(allocator, channel_names); - - for (0.., channel_names) |i, channel_name| { - var channel = try app.appendChannel(); - channel.color = rl.Color.fromHSV(@as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(channel_names.len)) * 360, 0.75, 0.8); - channel.min_sample = -10.0; - channel.max_sample = 10.0; - try checkDAQmxError( - c.DAQmxCreateAIVoltageChan(app.task_handle, channel_name, "", c.DAQmx_Val_Cfg_Default, channel.min_sample, channel.max_sample, c.DAQmx_Val_Volts, null), - error.createAIVoltageChannel - ); - } - } - - try checkDAQmxError( - c.DAQmxCfgSampClkTiming(app.task_handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0), - error.configureSampleRate - ); - - try checkDAQmxError(c.DAQmxStartTask(app.task_handle), error.startTask); - defer checkDAQmxError(c.DAQmxStopTask(app.task_handle), error.stopTask) catch unreachable; - - try app.startReadingThread(); + app.channel_samples = channel_samples; rl.initWindow(800, 450, "DAQ view"); defer rl.closeWindow(); @@ -319,9 +213,9 @@ pub fn main() !void { rl.Color.gray ); - app.samples_mutex.lock(); - for (app.channels.items) |channel| { - const samples = channel.samples; + channel_samples.mutex.lock(); + for (0.., channel_samples.samples) |channel_index, samples| { + const channel = app.channels.items[channel_index]; const min_sample: f32 = @floatCast(channel.min_sample); const max_sample: f32 = @floatCast(channel.max_sample); @@ -363,7 +257,7 @@ pub fn main() !void { } } } - app.samples_mutex.unlock(); + channel_samples.mutex.unlock(); if (rl.isKeyPressedRepeat(rl.KeyboardKey.key_e) or rl.isKeyPressed(rl.KeyboardKey.key_e)) { zoom *= 1.1; @@ -374,6 +268,7 @@ pub fn main() !void { const now_ns = std.time.nanoTimestamp(); const now_since_start = nanoToSeconds(now_ns - start_time); + const now_since_samping_start = nanoToSeconds(now_ns - channel_samples.started_sampling_ns.?); { var y: f32 = 10; @@ -383,32 +278,20 @@ pub fn main() !void { try font_face.drawTextAlloc(allocator, "Zoom: {d:.03}", .{zoom}, Vec2.init(10, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Dropped samples: {d:.03}", .{app.dropped_samples}, Vec2.init(10, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Dropped samples: {d:.03}", .{app.task_pool.droppedSamples()}, Vec2.init(10, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Last read size (bytes): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len)) * @sizeOf(f64)}, Vec2.init(20, y), rl.Color.black); - y += 10; - - try font_face.drawTextAlloc(allocator, "Last read count (doubles): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len))}, Vec2.init(20, y), rl.Color.black); - y += 10; - - try font_face.drawTextAlloc(allocator, "Last read count per channel: {d}", .{app.last_read_size}, Vec2.init(20, y), rl.Color.black); - y += 10; - - try font_face.drawTextAlloc(allocator, "Time since last read: {d:.05}", .{nanoToSeconds(now_ns - app.last_read_at_ns)}, Vec2.init(20, y), rl.Color.black); - y += 10; - - for (1.., app.channels.items) |i, channel| { - const sample_count = channel.samples.items.len; + for (0..app.channels.items.len) |i| { + const sample_count = channel_samples.samples[i].items.len; y += 10; - try font_face.drawTextAlloc(allocator, "Channel {}:", .{i}, Vec2.init(10, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Channel {}:", .{i + 1}, Vec2.init(10, y), rl.Color.black); y += 10; try font_face.drawTextAlloc(allocator, "Sample count: {}", .{sample_count}, Vec2.init(20, y), rl.Color.black); y += 10; - try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now_since_start}, Vec2.init(20, y), rl.Color.black); + try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now_since_samping_start}, Vec2.init(20, y), rl.Color.black); y += 10; } } @@ -416,3 +299,7 @@ pub fn main() !void { rl.drawFPS(@as(i32, @intFromFloat(window_width)) - 100, 10); } } + +test { + _ = NIDaq; +} \ No newline at end of file diff --git a/src/ni-daq.zig b/src/ni-daq.zig new file mode 100644 index 0000000..2e19d34 --- /dev/null +++ b/src/ni-daq.zig @@ -0,0 +1,823 @@ +const std = @import("std"); +pub const c = @cImport({ + @cInclude("stdint.h"); + @cDefine("__int64", "long long"); + @cInclude("NIDAQmx.h"); +}); + +const assert = std.debug.assert; +const log = std.log.scoped(.ni_daq); + +const max_device_name_size = 255; +const max_task_name_size = 255; + +pub const BoundedDeviceName = std.BoundedArray(u8, max_device_name_size); +const StringArrayListUnmanaged = std.ArrayListUnmanaged([:0]const u8); + +const NIDaq = @This(); + +pub const TaskHandle = c.TaskHandle; + +pub const Options = struct { + max_devices: u32, + max_analog_inputs: u32, + max_analog_outputs: u32, + max_counter_outputs: u32, + max_counter_inputs: u32, + max_analog_input_voltage_ranges: u32, + max_analog_output_voltage_ranges: u32 +}; + +pub const Task = struct { + handle: TaskHandle, + + name_buffer: [max_task_name_size]u8 = undefined, + + dropped_samples: u32 = 0, + + pub fn clear(self: Task) !void { + try checkDAQmxError( + c.DAQmxClearTask(self.handle), + error.DAQmxClearTask + ); + } + + pub fn name(self: *Task) ![]const u8 { + const required_size = c.DAQmxGetTaskName(self.handle, null, 0); + assert(required_size >= 0); + + if (required_size > self.name_buffer.len) { + return error.BufferTooSmall; + } + + try checkDAQmxError( + c.DAQmxGetTaskName( + self.handle, + &self.name_buffer, + self.name_buffer.len + ), + error.DAQmxGetTaskName + ); + + return self.name_buffer[0..@intCast(required_size)]; + } + + pub fn start(self: Task) !void { + try checkDAQmxError( + c.DAQmxStartTask(self.handle), + error.DAQmxStartTask + ); + } + + pub fn stop(self: Task) !void { + try checkDAQmxError( + c.DAQmxStopTask(self.handle), + error.DAQmxStopTask + ); + } + + pub fn setContinousSampleRate(self: Task, sample_rate: f64) !void { + try checkDAQmxError( + c.DAQmxCfgSampClkTiming(self.handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0), + error.DAQmxCfgSampClkTiming + ); + } + + pub fn setFiniteSampleRate(self: Task, sample_rate: f64, samples_per_channel: u64) !void { + try checkDAQmxError( + c.DAQmxCfgSampClkTiming(self.handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_FiniteSamps, samples_per_channel), + error.DAQmxCfgSampClkTiming + ); + } + + pub const AIVoltageChannelOptions = struct { + channel: [:0]const u8, + assigned_name: [*c]const u8 = null, + terminal_config: i32 = c.DAQmx_Val_Cfg_Default, + min_value: f64, + max_value: f64, + units: i32 = c.DAQmx_Val_Volts, + custom_scale_name: [*c]const u8 = null, + }; + + pub fn createAIVoltageChannel(self: Task, options: AIVoltageChannelOptions) !void { + try checkDAQmxError( + c.DAQmxCreateAIVoltageChan( + self.handle, + options.channel, + options.assigned_name, + options.terminal_config, + options.min_value, + options.max_value, + options.units, + options.custom_scale_name + ), + error.DAQmxCreateAIVoltageChan + ); + } + + pub const AOVoltageChannelOptions = struct { + channel: [:0]const u8, + assigned_name: [*c]const u8 = null, + min_value: f64, + max_value: f64, + units: i32 = c.DAQmx_Val_Volts, + custom_scale_name: [*c]const u8 = null, + }; + + pub fn createAOVoltageChannel(self: Task, options: AOVoltageChannelOptions) !void { + try checkDAQmxError( + c.DAQmxCreateAOVoltageChan( + self.handle, + options.channel, + options.assigned_name, + options.min_value, + options.max_value, + options.units, + options.custom_scale_name + ), + error.DAQmxCreateAOVoltageChan + ); + } + + pub const ReadAnalogOptions = struct { + read_array: []f64, + timeout: f64, + + samples_per_channel: i32 = c.DAQmx_Val_Auto, + fill_mode: u32 = c.DAQmx_Val_GroupByChannel, + }; + + pub fn readAnalog(self: *Task, options: ReadAnalogOptions) !u32 { + var samples_per_channel: i32 = 0; + const err = c.DAQmxReadAnalogF64( + self.handle, + options.samples_per_channel, + options.timeout, + options.fill_mode, + options.read_array.ptr, + @intCast(options.read_array.len), + &samples_per_channel, + null + ); + + if (err == c.DAQmxErrorSamplesNoLongerAvailable) { + self.dropped_samples += 1; + log.err("Dropped samples, not reading samples fast enough.", .{}); + } else if (err < 0) { + try checkDAQmxError(err, error.DAQmxReadAnalogF64); + } + + return @intCast(samples_per_channel); + } + + pub fn isDone(self: Task) !bool { + var result: c.bool32 = 0; + try checkDAQmxError( + c.DAQmxIsTaskDone(self.handle, &result), + error.DAQmxIsTaskDone + ); + return result != 0; + } +}; + +pub const AIMeasurementType = enum(i32) { + Voltage = c.DAQmx_Val_Voltage, + VoltageRMS = c.DAQmx_Val_VoltageRMS, + Current = c.DAQmx_Val_Current, + CurrentRMS = c.DAQmx_Val_CurrentRMS, + Voltage_CustomWithExcitation = c.DAQmx_Val_Voltage_CustomWithExcitation, + Bridge = c.DAQmx_Val_Bridge, + Freq_Voltage = c.DAQmx_Val_Freq_Voltage, + Resistance = c.DAQmx_Val_Resistance, + Temp_TC = c.DAQmx_Val_Temp_TC, + Temp_Thrmstr = c.DAQmx_Val_Temp_Thrmstr, + Temp_RTD = c.DAQmx_Val_Temp_RTD, + Temp_BuiltInSensor = c.DAQmx_Val_Temp_BuiltInSensor, + Strain_Gage = c.DAQmx_Val_Strain_Gage, + Rosette_Strain_Gage = c.DAQmx_Val_Rosette_Strain_Gage, + Position_LVDT = c.DAQmx_Val_Position_LVDT, + Position_RVDT = c.DAQmx_Val_Position_RVDT, + Position_EddyCurrentProximityProbe = c.DAQmx_Val_Position_EddyCurrentProximityProbe, + Accelerometer = c.DAQmx_Val_Accelerometer, + Acceleration_Charge = c.DAQmx_Val_Acceleration_Charge, + Acceleration_4WireDCVoltage = c.DAQmx_Val_Acceleration_4WireDCVoltage, + Velocity_IEPESensor = c.DAQmx_Val_Velocity_IEPESensor, + Force_Bridge = c.DAQmx_Val_Force_Bridge, + Force_IEPESensor = c.DAQmx_Val_Force_IEPESensor, + Pressure_Bridge = c.DAQmx_Val_Pressure_Bridge, + SoundPressure_Microphone = c.DAQmx_Val_SoundPressure_Microphone, + Torque_Bridge = c.DAQmx_Val_Torque_Bridge, + TEDS_Sensor = c.DAQmx_Val_TEDS_Sensor, + Charge = c.DAQmx_Val_Charge, + Power = c.DAQmx_Val_Power, + _ +}; +const AIMeasurementTypeList = std.BoundedArray(AIMeasurementType, @typeInfo(AIMeasurementType).Enum.fields.len); + +pub const AOOutputType = enum(i32) { + Voltage = c.DAQmx_Val_Voltage, + Current = c.DAQmx_Val_Current, + FuncGen = c.DAQmx_Val_FuncGen, + _ +}; +const AOOutputTypeList = std.BoundedArray(AOOutputType, @typeInfo(AOOutputType).Enum.fields.len); + +pub const Range = packed struct { + low: f64, + high: f64 +}; + +pub const ProductCategory = enum(i32) { + MSeriesDAQ = c.DAQmx_Val_MSeriesDAQ, + XSeriesDAQ = c.DAQmx_Val_XSeriesDAQ, + ESeriesDAQ = c.DAQmx_Val_ESeriesDAQ, + SSeriesDAQ = c.DAQmx_Val_SSeriesDAQ, + BSeriesDAQ = c.DAQmx_Val_BSeriesDAQ, + SCSeriesDAQ = c.DAQmx_Val_SCSeriesDAQ, + USBDAQ = c.DAQmx_Val_USBDAQ, + AOSeries = c.DAQmx_Val_AOSeries, + DigitalIO = c.DAQmx_Val_DigitalIO, + TIOSeries = c.DAQmx_Val_TIOSeries, + DynamicSignalAcquisition = c.DAQmx_Val_DynamicSignalAcquisition, + Switches = c.DAQmx_Val_Switches, + CompactDAQChassis = c.DAQmx_Val_CompactDAQChassis, + CompactRIOChassis = c.DAQmx_Val_CompactRIOChassis, + CSeriesModule = c.DAQmx_Val_CSeriesModule, + SCXIModule = c.DAQmx_Val_SCXIModule, + SCCConnectorBlock = c.DAQmx_Val_SCCConnectorBlock, + SCCModule = c.DAQmx_Val_SCCModule, + NIELVIS = c.DAQmx_Val_NIELVIS, + NetworkDAQ = c.DAQmx_Val_NetworkDAQ, + SCExpress = c.DAQmx_Val_SCExpress, + FieldDAQ = c.DAQmx_Val_FieldDAQ, + TestScaleChassis = c.DAQmx_Val_TestScaleChassis, + TestScaleModule = c.DAQmx_Val_TestScaleModule, + mioDAQ = c.DAQmx_Val_mioDAQ, + Unknown = c.DAQmx_Val_Unknown, + _, + + pub fn hasMultideviceTaskSupport(self: ProductCategory) bool { + return switch (self) { + .CSeriesModule, + .FieldDAQ, + .SSeriesDAQ, + .DynamicSignalAcquisition, + .SCExpress, + .XSeriesDAQ => true, + else => false + }; + } +}; + +const DeviceBuffers = struct { + const ChannelNames = struct { + buffer: []u8, + array_list: StringArrayListUnmanaged, + + fn init(allocator: std.mem.Allocator, capacity: u32) !ChannelNames { + const max_channel_name_size = count: { + var count: u32 = 0; + count += max_device_name_size; + count += 1; // '/' + count += 2; // 'ai' or 'ao' or 'co' or 'ci' + count += std.math.log10_int(capacity) + 1; + + break :count count; + }; + + const buffer_size = capacity * (max_channel_name_size + 2); + + const buffer = try allocator.alloc(u8, buffer_size); + errdefer allocator.free(buffer); + + var array_list = try StringArrayListUnmanaged.initCapacity(allocator, capacity); + errdefer array_list.deinit(allocator); + + return ChannelNames{ + .buffer = buffer, + .array_list = array_list + }; + } + + fn deinit(self: *ChannelNames, allocator: std.mem.Allocator) void { + allocator.free(self.buffer); + self.array_list.deinit(allocator); + } + + fn clear(self: *ChannelNames) void { + self.array_list.clearRetainingCapacity(); + } + }; + + analog_input_names: ChannelNames, + analog_output_names: ChannelNames, + counter_output_names: ChannelNames, + counter_input_names: ChannelNames, + // TODO: Digital input + // TODO: Digital output + + analog_input_voltage_ranges: std.ArrayListUnmanaged(Range), + analog_output_voltage_ranges: std.ArrayListUnmanaged(Range), + + fn init(allocator: std.mem.Allocator, options: Options) !DeviceBuffers { + var analog_input_names = try ChannelNames.init(allocator, options.max_analog_inputs); + errdefer analog_input_names.deinit(allocator); + + var analog_output_names = try ChannelNames.init(allocator, options.max_analog_outputs); + errdefer analog_output_names.deinit(allocator); + + var counter_output_names = try ChannelNames.init(allocator, options.max_counter_outputs); + errdefer counter_output_names.deinit(allocator); + + var counter_input_names = try ChannelNames.init(allocator, options.max_counter_inputs); + errdefer counter_input_names.deinit(allocator); + + var analog_input_voltage_ranges = try std.ArrayListUnmanaged(Range).initCapacity(allocator, options.max_analog_input_voltage_ranges); + errdefer analog_input_voltage_ranges.deinit(allocator); + + var analog_output_voltage_ranges = try std.ArrayListUnmanaged(Range).initCapacity(allocator, options.max_analog_output_voltage_ranges); + errdefer analog_output_voltage_ranges.deinit(allocator); + + return DeviceBuffers{ + .analog_input_names = analog_input_names, + .analog_output_names = analog_output_names, + .counter_output_names = counter_output_names, + .counter_input_names = counter_input_names, + .analog_input_voltage_ranges = analog_input_voltage_ranges, + .analog_output_voltage_ranges = analog_output_voltage_ranges + }; + } + + fn deinit(self: *DeviceBuffers, allocator: std.mem.Allocator) void { + self.analog_input_names.deinit(allocator); + self.analog_output_names.deinit(allocator); + self.counter_input_names.deinit(allocator); + self.counter_output_names.deinit(allocator); + self.analog_input_voltage_ranges.deinit(allocator); + self.analog_output_voltage_ranges.deinit(allocator); + } + + fn clear(self: *DeviceBuffers) void { + self.analog_input_names.clear(); + self.analog_output_names.clear(); + self.counter_input_names.clear(); + self.counter_output_names.clear(); + self.analog_input_voltage_ranges.clearRetainingCapacity(); + self.analog_output_voltage_ranges.clearRetainingCapacity(); + } +}; + +device_names_buffer: []u8, +device_names: StringArrayListUnmanaged, + +device_buffers: []DeviceBuffers, + +pub fn init(allocator: std.mem.Allocator, options: Options) !NIDaq { + const device_names_buffer_size = options.max_devices * (max_device_name_size + 2); + const device_names_buffer = try allocator.alloc(u8, device_names_buffer_size); + errdefer allocator.free(device_names_buffer); + + var device_names = try StringArrayListUnmanaged.initCapacity(allocator, options.max_devices); + errdefer device_names.deinit(allocator); + + const device_buffers = try allocator.alloc(DeviceBuffers, options.max_devices); + errdefer allocator.free(device_buffers); + + for (device_buffers) |*buffers| { + // TODO: Handle clean up + buffers.* = try DeviceBuffers.init(allocator, options); + } + + return NIDaq{ + .device_names_buffer = device_names_buffer, + .device_names = device_names, + .device_buffers = device_buffers + }; +} + +pub fn deinit(self: *NIDaq, allocator: std.mem.Allocator) void { + self.device_names.deinit(allocator); + allocator.free(self.device_names_buffer); + + for (self.device_buffers) |*buffers| { + buffers.deinit(allocator); + } + allocator.free(self.device_buffers); +} + +fn maxIndexStringLength(count: u32) u32 { + if (count == 0) { + return 0; + } else if (count == 1) { + return 1; + } else { + return std.math.log10_int(count - 1) + 1; + } +} + +test { + try std.testing.expectEqual(0, maxIndexStringLength(0)); + try std.testing.expectEqual(1, maxIndexStringLength(1)); + try std.testing.expectEqual(1, maxIndexStringLength(10)); + try std.testing.expectEqual(2, maxIndexStringLength(11)); + try std.testing.expectEqual(2, maxIndexStringLength(100)); + try std.testing.expectEqual(3, maxIndexStringLength(101)); +} + +pub fn logDAQmxError(error_code: i32) void { + if (error_code == 0) { + return; + } + + var msg: [512:0]u8 = .{ 0 } ** 512; + if (c.DAQmxGetErrorString(error_code, &msg, msg.len) == 0) { + if (error_code < 0) { + log.err("DAQmx ({}): {s}", .{error_code, msg}); + } else if (!std.mem.startsWith(u8, &msg, "Error code could not be found.")) { + // Ignore positive error codes if it could not be found. + // This commonly happens when trying to preallocate bytes for buffer and it returns a positive number. + log.warn("DAQmx ({}): {s}", .{error_code, msg}); + } + } else { + log.err("DAQmx ({}): Unknown (Buffer too small for message)", .{error_code}); + } +} + +pub fn checkDAQmxError(error_code: i32, err: anyerror) !void { + logDAQmxError(error_code); + + if (error_code < 0) { + return err; + } +} + +fn splitCommaDelimitedList(array_list: *std.ArrayListUnmanaged([:0]const u8), buffer: []u8) !void { + const count = std.mem.count(u8, buffer, ",") + 1; + if (count > array_list.capacity) { + return error.TooManyItems; + } + + array_list.clearRetainingCapacity(); + + var name_iter = std.mem.tokenizeAny(u8, buffer, &.{ ',', ' ', 0 }); + while (name_iter.next()) |name| { + buffer[name_iter.index] = 0; + + const name_z: [:0]const u8 = @ptrCast(name); + assert(name_z[name.len] == 0); + + array_list.appendAssumeCapacity(name_z); + } +} + +pub fn version() !std.SemanticVersion { + var major: u32 = 0; + try checkDAQmxError( + c.DAQmxGetSysNIDAQMajorVersion(&major), + error.GetMajorVersion + ); + + var minor: u32 = 0; + try checkDAQmxError( + c.DAQmxGetSysNIDAQMinorVersion(&minor), + error.GetMinorVersion + ); + + var update: u32 = 0; + try checkDAQmxError( + c.DAQmxGetSysNIDAQUpdateVersion(&update), + error.GetUpdateVersion + ); + + return std.SemanticVersion{ + .major = major, + .minor = minor, + .patch = update + }; +} + +pub fn listDeviceNames(self: *NIDaq) ![]const [:0]const u8 { + self.device_names.clearRetainingCapacity(); + self.clearAllDeviceBuffers(); + + const required_size = c.DAQmxGetSysDevNames(null, 0); + if (required_size == 0) { + return self.device_names.items; + } + + if (required_size > self.device_names_buffer.len) { + return error.BufferTooSmall; + } + + try checkDAQmxError( + c.DAQmxGetSysDevNames(self.device_names_buffer.ptr, @intCast(self.device_names_buffer.len)), + error.GetDeviceNames + ); + + const nullbyte = std.mem.indexOfScalar(u8, self.device_names_buffer, 0) orelse unreachable; + assert(nullbyte == required_size - 1); + + try splitCommaDelimitedList(&self.device_names, self.device_names_buffer[0..@intCast(required_size)]); + + return self.device_names.items; +} + +fn getDeviceBuffers(self: *NIDaq, device: [:0]const u8) !*DeviceBuffers { + if (self.device_names.capacity == 0) { + return error.DeviceNotFound; + } + + if (self.device_names.items.len == 0) { + _ = try self.listDeviceNames(); + } + + for (0.., self.device_names.items) |i, device_name| { + if (std.mem.eql(u8, device, device_name)) { + return &self.device_buffers[i]; + } + } + + return error.DeviceNotFound; +} + +fn clearAllDeviceBuffers(self: *NIDaq) void { + for (self.device_buffers) |*buffers| { + buffers.clear(); + } +} + +fn listDevicePhysicalChannels( + getPhysicalChannels: anytype, + device: [:0]const u8, + channel_names: *DeviceBuffers.ChannelNames, +) ![]const [:0]const u8 { + const buffer = channel_names.buffer; + const array_list = &channel_names.array_list; + + array_list.clearRetainingCapacity(); + + const required_size = getPhysicalChannels(device, null, 0); + try checkDAQmxError(required_size, error.GetPhysicalChannels); + if (required_size == 0) { + return array_list.items; + } + + if (required_size > buffer.len) { + return error.BufferTooSmall; + } + + try checkDAQmxError( + getPhysicalChannels(device, buffer.ptr, @intCast(buffer.len)), + error.GetPhysicalChannels + ); + + const nullbyte = std.mem.indexOfScalar(u8, buffer, 0) orelse unreachable; + assert(nullbyte == required_size - 1); + + try splitCommaDelimitedList(array_list, buffer[0..@intCast(required_size)]); + + return array_list.items; +} + +pub fn listDeviceAIPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 { + var device_buffers = try self.getDeviceBuffers(device); + + return listDevicePhysicalChannels( + c.DAQmxGetDevAIPhysicalChans, + device, + &device_buffers.analog_input_names + ); +} + +pub fn listDeviceAOPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 { + var device_buffers = try self.getDeviceBuffers(device); + + return listDevicePhysicalChannels( + c.DAQmxGetDevAOPhysicalChans, + device, + &device_buffers.analog_output_names + ); +} + +pub fn listDeviceCOPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 { + var device_buffers = try self.getDeviceBuffers(device); + + return listDevicePhysicalChannels( + c.DAQmxGetDevCOPhysicalChans, + device, + &device_buffers.counter_output_names + ); +} + +pub fn listDeviceCIPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 { + var device_buffers = try self.getDeviceBuffers(device); + + return listDevicePhysicalChannels( + c.DAQmxGetDevCIPhysicalChans, + device, + &device_buffers.counter_input_names + ); +} + +const ChannelType = enum { + analog_input, + analog_output, + counter_input, + counter_output, +}; + +fn getChannelType(device: [:0]const u8) ?ChannelType { + const slash = std.mem.indexOfScalar(u8, device, '/') orelse return null; + + const afterSlash = device[(slash+1)..]; + if (std.mem.startsWith(u8, afterSlash, "ai")) { + return ChannelType.analog_input; + } else if (std.mem.startsWith(u8, afterSlash, "ao")) { + return ChannelType.analog_output; + } else if (std.mem.startsWith(u8, afterSlash, "ci")) { + return ChannelType.counter_input; + } else if (std.mem.startsWith(u8, afterSlash, "co")) { + return ChannelType.counter_output; + } else { + return null; + } +} + +pub fn getMinSampleRate(self: NIDaq, channel: [:0]const u8) !f64 { + _ = self; + + var result: f64 = 0; + + const channel_type = getChannelType(channel) orelse return error.UnknownChannelType; + switch (channel_type) { + .analog_input => { + try checkDAQmxError( + c.DAQmxGetDevAIMinRate(channel, &result), + error.DAQmxGetDevAIMinRate + ); + }, + .analog_output => { + try checkDAQmxError( + c.DAQmxGetDevAOMinRate(channel, &result), + error.DAQmxGetDevAOMinRate + ); + }, + .counter_output, + .counter_input => { + return error.UnsupportedChannelType; + }, + } + + return result; +} + +pub fn getMaxSampleRate(self: NIDaq, channel: [:0]const u8) !f64 { + _ = self; + + var result: f64 = 0; + + const channel_type = getChannelType(channel) orelse return error.UnknownChannelType; + switch (channel_type) { + .analog_input => { + try checkDAQmxError( + c.DAQmxGetDevAIMaxSingleChanRate(channel, &result), + error.DAQmxGetDevAIMaxSingleChanRate + ); + }, + .analog_output => { + try checkDAQmxError( + c.DAQmxGetDevAOMaxRate(channel, &result), + error.DAQmxGetDevAOMaxRate + ); + }, + .counter_output, + .counter_input => { + return error.UnsupportedChannelType; + }, + } + + return result; +} + +fn listDeviceVoltageRanges( + getVoltageRanges: anytype, + device: [:0]const u8, + voltage_ranges: *std.ArrayListUnmanaged(Range) +) ![]Range { + voltage_ranges.clearRetainingCapacity(); + + const count = getVoltageRanges(device, null, 0); + try checkDAQmxError(count, error.GetVoltageRanges); + if (count == 0) { + return voltage_ranges.items; + } + + const buffer: [*]f64 = @ptrCast(voltage_ranges.allocatedSlice().ptr); + const buffer_len = voltage_ranges.capacity * 2; + + assert(@as(c_uint, @intCast(count)) % 2 == 0); + if (count > buffer_len) { + return error.BufferTooSmall; + } + + try checkDAQmxError( + getVoltageRanges(device, buffer, @intCast(buffer_len)), + error.GetVoltageRanges + ); + + voltage_ranges.items.len = @intCast(@divExact(count, 2)); + + return voltage_ranges.items; +} + +pub fn listDeviceAIVoltageRanges(self: *NIDaq, device: [:0]const u8) ![]Range { + var device_buffers = try self.getDeviceBuffers(device); + + return listDeviceVoltageRanges( + c.DAQmxGetDevAIVoltageRngs, + device, + &device_buffers.analog_input_voltage_ranges + ); +} + +pub fn listDeviceAOVoltageRanges(self: *NIDaq, device: [:0]const u8) ![]Range { + var device_buffers = try self.getDeviceBuffers(device); + + return listDeviceVoltageRanges( + c.DAQmxGetDevAOVoltageRngs, + device, + &device_buffers.analog_output_voltage_ranges + ); +} + +pub fn createTask(self: NIDaq, name: ?[:0]const u8) !Task { + _ = self; + + var handle: TaskHandle = undefined; + + try checkDAQmxError( + c.DAQmxCreateTask(name orelse "", &handle), + error.DAQmxCreateTask + ); + + return Task{ .handle = handle }; +} + +pub fn listDeviceAIMeasurementTypes(self: NIDaq, device: [:0]const u8) !AIMeasurementTypeList { + var result = AIMeasurementTypeList.init(0) catch unreachable; + _ = self; + + const count = c.DAQmxGetDevAISupportedMeasTypes(device, null, 0); + try checkDAQmxError(count, error.DAQmxGetDevAISupportedMeasTypes); + assert(count <= result.buffer.len); + + try checkDAQmxError( + c.DAQmxGetDevAISupportedMeasTypes(device, @as([*c]c_int, @ptrCast(&result.buffer)), result.buffer.len), + error.DAQmxGetDevAISupportedMeasTypes + ); + + result.len = @intCast(count); + + return result; +} + +pub fn checkDeviceAIMeasurementType(self: NIDaq, device: [:0]const u8, measurement_type: AIMeasurementType) !bool { + var measurement_types = try self.listDeviceAIMeasurementTypes(device); + return std.mem.indexOfScalar(AIMeasurementType, measurement_types.slice(), measurement_type) != null; +} + +pub fn listDeviceAOOutputTypes(self: NIDaq, device: [:0]const u8) !AOOutputTypeList { + var result = AOOutputTypeList.init(0) catch unreachable; + _ = self; + + const count = c.DAQmxGetDevAOSupportedOutputTypes(device, null, 0); + try checkDAQmxError(count, error.DAQmxGetDevAOSupportedOutputTypes); + assert(count <= result.buffer.len); + + try checkDAQmxError( + c.DAQmxGetDevAOSupportedOutputTypes(device, @as([*c]c_int, @ptrCast(&result.buffer)), result.buffer.len), + error.DAQmxGetDevAOSupportedOutputTypes + ); + + result.len = @intCast(count); + + return result; +} + +pub fn checkDeviceAOOutputType(self: NIDaq, device: [:0]const u8, output_type: AOOutputType) !bool { + var output_types = try self.listDeviceAOOutputTypes(device); + return std.mem.indexOfScalar(AOOutputType, output_types.slice(), output_type) != null; +} + +pub fn getDeviceProductCategory(self: NIDaq, device: [:0]const u8) !ProductCategory { + _ = self; + + var product_category = ProductCategory.Unknown; + try checkDAQmxError( + c.DAQmxGetDevProductCategory(device, @ptrCast(&product_category)), + error.DAQmxGetDevProductCategory + ); + + return product_category; +} \ No newline at end of file diff --git a/src/root.zig b/src/root.zig deleted file mode 100644 index ecfeade..0000000 --- a/src/root.zig +++ /dev/null @@ -1,10 +0,0 @@ -const std = @import("std"); -const testing = std.testing; - -export fn add(a: i32, b: i32) i32 { - return a + b; -} - -test "basic add functionality" { - try testing.expect(add(3, 7) == 10); -} diff --git a/src/task-pool.zig b/src/task-pool.zig new file mode 100644 index 0000000..fbbc9cd --- /dev/null +++ b/src/task-pool.zig @@ -0,0 +1,351 @@ +const std = @import("std"); +const NIDaq = @import("./ni-daq.zig"); + +const TaskPool = @This(); + +const assert = std.debug.assert; +const log = std.log.scoped(.task_pool); + +const ChannelType = enum { analog_input, analog_output }; + +const Entry = struct { + device: NIDaq.BoundedDeviceName, + channel_type: ChannelType, + task: NIDaq.Task, + channel_order: std.ArrayListUnmanaged(usize) +}; + +channel_count: usize = 0, +max_channel_count: usize, +entries: std.ArrayListUnmanaged(Entry), + +read_thread: ?std.Thread = null, +thread_running: bool = false, +sampling: ?union(enum) { + finite: struct { + sample_rate: f64, + samples_per_channel: u64 + }, + continous: struct { + sample_rate: f64 + } +} = null, + +pub const ChannelSamples = struct { + allocator: std.mem.Allocator, + mutex: std.Thread.Mutex = .{}, + read_arrays_by_task: [][]f64, + samples: []std.ArrayList(f64), + started_sampling_ns: ?i128 = null, + stopped_sampling_ns: ?i128 = null, + + pub fn deinit(self: *ChannelSamples) void { + for (self.read_arrays_by_task) |read_arrays| { + self.allocator.free(read_arrays); + } + self.allocator.free(self.read_arrays_by_task); + + for (self.samples) |samples_per_channel| { + samples_per_channel.deinit(); + } + self.allocator.free(self.samples); + + self.allocator.destroy(self); + } +}; + +pub const Options = struct { + max_tasks: usize, + max_channels: usize +}; + +pub fn init(allocator: std.mem.Allocator, options: Options) !TaskPool { + var entries = try std.ArrayListUnmanaged(Entry).initCapacity(allocator, options.max_tasks); + errdefer entries.deinit(allocator); + + for (entries.allocatedSlice()) |*entry| { + // TODO: .deinit() on failure + entry.channel_order = try std.ArrayListUnmanaged(usize).initCapacity(allocator, options.max_channels); + } + + return TaskPool{ + .entries = entries, + .max_channel_count = options.max_channels + }; +} + +pub fn deinit(self: *TaskPool, allocator: std.mem.Allocator) void { + if (self.read_thread != null) { + self.stop() catch @panic("Failed to stop task"); + } + + for (self.entries.items) |e| { + e.task.clear() catch @panic("Failed to clear task"); + } + for (self.entries.allocatedSlice()) |*e| { + e.channel_order.deinit(allocator); + } + self.entries.deinit(allocator); +} + +pub fn setContinousSampleRate(self: *TaskPool, sample_rate: f64) !void { + assert(self.read_thread == null); + + for (self.entries.items) |e| { + try e.task.setContinousSampleRate(sample_rate); + } + + self.sampling = .{ + .continous = .{ + .sample_rate = sample_rate + } + }; +} + +pub fn setFiniteSampleRate(self: *TaskPool, sample_rate: f64, samples_per_channel: u64) !void { + assert(self.read_thread == null); + + for (self.entries.items) |e| { + try e.task.setFiniteSampleRate(sample_rate, samples_per_channel); + } + + self.sampling = .{ + .finite = .{ + .sample_rate = sample_rate, + .samples_per_channel = samples_per_channel + } + }; +} + +pub fn start(self: *TaskPool, read_timeout: f64, allocator: std.mem.Allocator) !*ChannelSamples { + assert(self.read_thread == null); + + var channel_samples = try self.createChannelSamples(allocator); + errdefer channel_samples.deinit(); + + for (self.entries.items) |e| { + try e.task.start(); + } + + self.thread_running = true; + var read_thread = try std.Thread.spawn( + .{ .allocator = allocator }, + readThreadCallback, + .{ self, read_timeout, channel_samples } + ); + errdefer read_thread.join(); + + self.read_thread = read_thread; + + return channel_samples; +} + +pub fn stop(self: *TaskPool) !void { + assert(self.read_thread != null); + + for (self.entries.items) |e| { + try e.task.stop(); + } + + self.thread_running = false; + self.read_thread.?.join(); + self.read_thread = null; +} + +fn getDeviceFromChannel(channel: [:0]const u8) ?[]const u8 { + const slash = std.mem.indexOfScalar(u8, channel, '/') orelse return null; + return channel[0..slash]; +} + +fn getOrPutTask(self: *TaskPool, ni_daq: NIDaq, device: []const u8, channel_type: ChannelType) !*Entry { + for (self.entries.items) |*entry| { + const entry_device = entry.device.slice(); + if (entry.channel_type == channel_type and std.mem.eql(u8, entry_device, device)) { + return entry; + } + } + + if (self.entries.items.len == self.entries.capacity) { + return error.TaskLimitReached; + } + + const task = try ni_daq.createTask(null); + errdefer task.clear() catch {}; + + var entry = self.entries.addOneAssumeCapacity(); + entry.channel_type = channel_type; + entry.device = try NIDaq.BoundedDeviceName.fromSlice(device); + entry.task = task; + + return entry; +} + +pub fn createAIVoltageChannel(self: *TaskPool, ni_daq: NIDaq, options: NIDaq.Task.AIVoltageChannelOptions) !void { + assert(self.read_thread == null); + + const device = getDeviceFromChannel(options.channel) orelse return error.UnknownDevice; + + var entry = try self.getOrPutTask(ni_daq, device, .analog_input); + if (entry.channel_order.items.len == entry.channel_order.capacity) { + return error.MaxChannelsLimitReached; + } + + try entry.task.createAIVoltageChannel(options); + entry.channel_order.appendAssumeCapacity(self.channel_count); + self.channel_count += 1; +} + +pub fn createAOVoltageChannel(self: *TaskPool, ni_daq: NIDaq, options: NIDaq.Task.AOVoltageChannelOptions) !void { + assert(self.read_thread == null); + + const device = getDeviceFromChannel(options.channel) orelse return error.UnknownDevice; + + var entry = try self.getOrPutTask(ni_daq, device, .analog_output); + if (entry.channel_order.items.len == entry.channel_order.capacity) { + return error.MaxChannelsLimitReached; + } + + try entry.task.createAOVoltageChannel(options); + entry.channel_order.appendAssumeCapacity(self.channel_count); + self.channel_count += 1; +} + +pub fn createChannelSamples(self: TaskPool, allocator: std.mem.Allocator) !*ChannelSamples { + assert(self.channel_count > 0); + assert(self.sampling != null); + + var read_arrays_by_task = try allocator.alloc([]f64, self.entries.items.len); + errdefer allocator.free(read_arrays_by_task); + + const sampling = self.sampling.?; + const array_size_per_channel: usize = switch (sampling) { + // TODO: For now reserve 1s worth of samples per channel, maybe this should be configurable? + // Maybe it should be proportional to timeout? + .continous => |args| @intFromFloat(@ceil(args.sample_rate)), + .finite => |args| args.samples_per_channel, + }; + + for (0.., self.entries.items) |i, entry| { + const channel_count = entry.channel_order.items.len; + // TODO: Add allocator.free on failure + read_arrays_by_task[i] = try allocator.alloc(f64, array_size_per_channel * channel_count); + } + + const samples = try allocator.alloc(std.ArrayList(f64), self.channel_count); + errdefer allocator.free(samples); + + if (sampling == .finite) { + for (samples) |*samples_per_channel| { + // TODO: Add .deinit() on failure + samples_per_channel.* = try std.ArrayList(f64).initCapacity(allocator, sampling.finite.samples_per_channel); + } + } else { + for (samples) |*samples_per_channel| { + // TODO: Maybe it would be good to reserve a large amount of space for samples? + // Even if it is continous. Maybe use ringbuffer? + + // TODO: Add .deinit() on failure + samples_per_channel.* = std.ArrayList(f64).init(allocator); + } + } + + const channel_samples = try allocator.create(ChannelSamples); + errdefer allocator.destroy(channel_samples); + + channel_samples.* = ChannelSamples{ + .allocator = allocator, + .read_arrays_by_task = read_arrays_by_task, + .samples = samples + }; + + return channel_samples; +} + +pub fn readAnalog(self: *TaskPool, timeout: f64, samples: *ChannelSamples) !void { + assert(self.read_thread != null); + assert(self.channel_count > 0); + + samples.mutex.lock(); + defer samples.mutex.unlock(); + + for (0.., self.entries.items) |i, *entry| { + const read_array = samples.read_arrays_by_task[i]; + const samples_per_channel = try entry.task.readAnalog(.{ + .read_array = read_array, + .timeout = timeout + }); + + if (samples_per_channel == 0) continue; + + const channel_count = entry.channel_order.items.len; + const read_array_used = samples_per_channel * channel_count; + + var channel_index_of_task: usize = 0; + var samples_window = std.mem.window(f64, read_array[0..read_array_used], samples_per_channel, samples_per_channel); + while (samples_window.next()) |channel_samples| : (channel_index_of_task += 1) { + const channel_index: usize = entry.channel_order.items[channel_index_of_task]; + + // TODO: Maybe use .appendSliceAssumeCapacity(), when doing finite sampling? + try samples.samples[channel_index].appendSlice(channel_samples); + } + } +} + +pub fn isDone(self: TaskPool) !bool { + for (self.entries.items) |entry| { + const is_done = try entry.task.isDone(); + if (!is_done) { + return false; + } + } + + return true; +} + +pub fn droppedSamples(self: TaskPool) u32 { + var sum: u32 = 0; + for (self.entries.items) |entry| { + sum += entry.task.dropped_samples; + } + return sum; +} + +fn readThreadCallback(task_pool: *TaskPool, timeout: f64, channel_samples: *ChannelSamples) void { + defer task_pool.thread_running = false; + + channel_samples.started_sampling_ns = std.time.nanoTimestamp(); + defer channel_samples.stopped_sampling_ns = std.time.nanoTimestamp(); + + var error_count: u32 = 0; + const max_error_count = 3; + while (error_count < max_error_count and task_pool.thread_running) { + const is_done = task_pool.isDone() catch |e| { + error_count += 1; + + log.err(".isDone() failed in thread: {}", .{e}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + + continue; + }; + if (is_done) { + break; + } + + task_pool.readAnalog(timeout, channel_samples) catch |e| { + error_count += 1; + + log.err(".readAnalog() failed in thread: {}", .{e}); + if (@errorReturnTrace()) |trace| { + std.debug.dumpStackTrace(trace.*); + } + + continue; + }; + } + + if (max_error_count == error_count) { + log.err("Stopping read thread, too many errors occured", .{}); + } +} \ No newline at end of file