add task-pool.zig

This commit is contained in:
Rokas Puzonas 2024-11-24 14:38:43 +02:00
parent 9a83bcfbb5
commit e6877ef2e4
6 changed files with 1327 additions and 285 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
.zig-cache
zig-out
.vscode

View File

@ -5,26 +5,6 @@ pub fn build(b: *std.Build) !void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});
var lib: *Module = undefined;
{
lib = b.createModule(.{
.root_source_file = b.path("src/root.zig"),
.target = target,
.optimize = optimize,
});
const unit_tests = b.addTest(.{
.root_source_file = b.path("src/root.zig"),
.target = target,
.optimize = optimize,
});
const run_unit_tests = b.addRunArtifact(unit_tests);
const test_step = b.step("test", "Run unit tests");
test_step.dependOn(&run_unit_tests.step);
}
{
const raylib_dep = b.dependency("raylib-zig", .{
.target = target,
.optimize = optimize,
@ -61,5 +41,15 @@ pub fn build(b: *std.Build) !void {
const run_step = b.step("run", "Run the program");
run_step.dependOn(&run_cmd.step);
}
const unit_tests = b.addTest(.{
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
});
const run_unit_tests = b.addRunArtifact(unit_tests);
const test_step = b.step("test", "Run unit tests");
test_step.dependOn(&run_unit_tests.step);
}

View File

@ -1,11 +1,8 @@
const std = @import("std");
const builtin = @import("builtin");
const rl = @import("raylib");
const c = @cImport({
@cInclude("stdint.h");
@cDefine("__int64", "long long");
@cInclude("NIDAQmx.h");
});
const NIDaq = @import("ni-daq.zig");
const TaskPool = @import("./task-pool.zig");
const windows = @cImport({
@cDefine("_WIN32_WINNT", "0x0500");
@cInclude("windows.h");
@ -27,87 +24,6 @@ fn toTraceLogLevel(log_level: std.log.Level) rl.TraceLogLevel {
};
}
fn logDAQmxError(error_code: i32) void {
if (error_code == 0) {
return;
}
var error_msg: [512:0]u8 = .{ 0 } ** 512;
if (c.DAQmxGetErrorString(error_code, &error_msg, error_msg.len) == 0) {
log.err("DAQmx ({}): {s}\n", .{error_code, error_msg});
} else {
log.err("DAQmx ({}): Unknown (Buffer too small for error message)\n", .{error_code});
}
}
fn checkDAQmxError(error_code: i32, err: anyerror) !void {
logDAQmxError(error_code);
if (error_code < 0) {
return err;
}
}
fn splitCommaDelimitedList(allocator: Allocator, list: []u8) ![][:0]u8 {
const name_count = std.mem.count(u8, list, ",") + 1;
var names = try std.ArrayList([:0]u8).initCapacity(allocator, name_count);
errdefer names.deinit();
var name_iter = std.mem.tokenizeSequence(u8, list, ", ");
while (name_iter.next()) |name| {
names.appendAssumeCapacity(try allocator.dupeZ(u8, name));
}
return try names.toOwnedSlice();
}
fn listDeviceNames(allocator: Allocator) ![][:0]u8 {
const required_size = c.DAQmxGetSysDevNames(null, 0);
if (required_size == 0) {
return try allocator.alloc([:0]u8, 0);
}
const device_names_list = try allocator.alloc(u8, @intCast(required_size));
defer allocator.free(device_names_list);
try checkDAQmxError(
c.DAQmxGetSysDevNames(device_names_list.ptr, @intCast(device_names_list.len)),
error.getDeviceNames
);
const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable;
assert(nullbyte == required_size - 1);
return try splitCommaDelimitedList(allocator, device_names_list[0..nullbyte]);
}
fn listDeviceAIPhysicalChannels(allocator: Allocator, device: [:0]u8) ![][:0]u8 {
const required_size = c.DAQmxGetDevAIPhysicalChans(device, null, 0);
if (required_size == 0) {
return try allocator.alloc([:0]u8, 0);
}
const device_names_list = try allocator.alloc(u8, @intCast(required_size));
defer allocator.free(device_names_list);
try checkDAQmxError(
c.DAQmxGetDevAIPhysicalChans(device, device_names_list.ptr, @intCast(device_names_list.len)),
error.getAIChannels
);
const nullbyte = std.mem.indexOfScalar(u8, device_names_list, 0) orelse unreachable;
assert(nullbyte == required_size - 1);
return try splitCommaDelimitedList(allocator, device_names_list[0..nullbyte]);
}
fn freeNameList(allocator: Allocator, names: [][:0]u8) void {
for (names) |name| {
allocator.free(name);
}
allocator.free(names);
}
fn remap(from_min: f32, from_max: f32, to_min: f32, to_max: f32, value: f32) f32 {
const t = (value - from_min) / (from_max - from_min);
return std.math.lerp(to_min, to_max, t);
@ -117,111 +33,40 @@ const Channel = struct {
color: rl.Color,
min_sample: f64,
max_sample: f64,
samples: std.ArrayList(f64),
fn init(allocator: Allocator) Channel {
fn init() Channel {
return Channel{
.color = rl.Color.red,
.min_sample = 0,
.max_sample = 0,
.samples = std.ArrayList(f64).init(allocator)
.max_sample = 0
};
}
fn deinit(self: Channel) void {
self.samples.deinit();
}
};
const Application = struct {
allocator: Allocator,
channels: std.ArrayList(Channel),
channel_samples: ?*TaskPool.ChannelSamples = null,
task_handle: c.TaskHandle = null,
read_thread: ?std.Thread = null,
read_timeout_s: f64 = 0.01,
last_read_size: u32 = 0,
last_read_at_ns: i128 = 0,
samples_mutex: std.Thread.Mutex = .{},
dropped_samples: u32 = 0,
task_pool: TaskPool,
fn init(allocator: Allocator) Application {
fn init(allocator: Allocator, task_pool_options: TaskPool.Options) !Application {
return Application{
.allocator = allocator,
.task_pool = try TaskPool.init(allocator, task_pool_options),
.channels = std.ArrayList(Channel).init(allocator)
};
}
fn deinit(self: *Application) void {
self.stopReadingThread();
for (self.channels.items) |channel| {
channel.deinit();
}
self.channels.deinit();
self.task_pool.deinit(self.allocator);
}
fn appendChannel(self: *Application) !*Channel {
try self.channels.append(Channel.init(self.allocator));
try self.channels.append(Channel.init());
return &self.channels.items[self.channels.items.len-1];
}
fn startReadingThread(self: *Application) !void {
assert(self.read_thread == null);
self.read_thread = try std.Thread.spawn(.{ .allocator = self.allocator }, readThreadMain, .{ self });
}
fn stopReadingThread(self: *Application) void {
if (self.read_thread) |thread| {
self.read_thread = null;
thread.join();
}
}
fn readChannels(self: *Application) !void {
var read_buffer: [1024]f64 = undefined;
var read: i32 = 0;
const err = c.DAQmxReadAnalogF64(self.task_handle, -1, self.read_timeout_s, c.DAQmx_Val_GroupByChannel, &read_buffer, @intCast(read_buffer.len), &read, null);
if (err == 0 and read > 0) {
const read_u32 = @as(u32, @intCast(read));
self.samples_mutex.lock();
defer self.samples_mutex.unlock();
for (0.., self.channels.items) |i, *channel| {
const channel_samples = read_buffer[(i*read_u32)..((i+1)*read_u32)];
try channel.samples.appendSlice(channel_samples);
}
self.last_read_size = read_u32;
self.last_read_at_ns = std.time.nanoTimestamp();
} else if (err != 0 and err != c.DAQmxErrorSamplesNotYetAvailable) {
if (err == c.DAQmxErrorSamplesNoLongerAvailable) {
self.dropped_samples += 1;
log.err("Dropped samples!", .{});
} else {
try checkDAQmxError(err, error.readAnalog);
}
}
}
fn readThreadMain(app: *Application) void {
while (app.read_thread != null) {
var is_done: c_uint = 0;
_ = c.DAQmxIsTaskDone(app.task_handle, &is_done);
if (is_done != 0) {
log.info("Detected that task is done, stopped reading", .{});
break;
}
app.readChannels() catch |e| {
log.err("readChannels() {}\n", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
}
};
}
}
};
pub fn nanoToSeconds(ns: i128) f32 {
@ -238,59 +83,108 @@ pub fn main() !void {
// _ = windows.ShowWindow(hWnd, windows.SW_SHOW);
// }
// TODO: SetTraceLogCallback
rl.setTraceLogLevel(toTraceLogLevel(std.log.default_level));
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
defer _ = gpa.deinit();
var app = Application.init(allocator);
defer app.deinit();
var ni_daq = try NIDaq.init(allocator, .{
.max_devices = 4,
.max_analog_inputs = 32,
.max_analog_outputs = 8,
.max_counter_outputs = 8,
.max_counter_inputs = 8,
.max_analog_input_voltage_ranges = 4,
.max_analog_output_voltage_ranges = 4
});
defer ni_daq.deinit(allocator);
const devices = try listDeviceNames(allocator);
defer freeNameList(allocator, devices);
const devices = try ni_daq.listDeviceNames();
std.debug.print("NI-DAQ version: {}\n", .{try NIDaq.version()});
std.debug.print("Devices ({}):\n", .{devices.len});
for (devices) |device| {
std.debug.print(" * '{s}'\n", .{device});
std.debug.print(" * '{s}' ({})\n", .{device, device.len});
const channel_names = try listDeviceAIPhysicalChannels(allocator, device);
defer freeNameList(allocator, channel_names);
for (channel_names) |channel_name| {
std.debug.print(" * '{s}'\n", .{channel_name});
const analog_inputs = try ni_daq.listDeviceAIPhysicalChannels(device);
for (analog_inputs) |channel_name| {
std.debug.print(" * '{s}' (Analog input)\n", .{channel_name});
}
for (try ni_daq.listDeviceAOPhysicalChannels(device)) |channel_name| {
std.debug.print(" * '{s}' (Analog output)\n", .{channel_name});
}
// for (try ni_daq.listDeviceCOPhysicalChannels(device)) |channel_name| {
// std.debug.print(" * '{s}' (Counter output)\n", .{channel_name});
// }
// for (try ni_daq.listDeviceCIPhysicalChannels(device)) |channel_name| {
// std.debug.print(" * '{s}' (Counter input)\n", .{channel_name});
// }
}
var app = try Application.init(allocator, .{
.max_tasks = devices.len * 2,
.max_channels = 64
});
defer app.deinit();
for (devices) |device| {
if (try ni_daq.checkDeviceAIMeasurementType(device, .Voltage)) {
const voltage_ranges = try ni_daq.listDeviceAIVoltageRanges(device);
assert(voltage_ranges.len > 0);
const min_sample = voltage_ranges[0].low;
const max_sample = voltage_ranges[0].high;
for (try ni_daq.listDeviceAIPhysicalChannels(device)) |channel_name| {
var channel = try app.appendChannel();
channel.min_sample = min_sample;
channel.max_sample = max_sample;
try app.task_pool.createAIVoltageChannel(ni_daq, .{
.channel = channel_name,
.min_value = min_sample,
.max_value = max_sample,
});
}
}
// if (try ni_daq.checkDeviceAOOutputType(device, .Voltage)) {
// const voltage_ranges = try ni_daq.listDeviceAOVoltageRanges(device);
// assert(voltage_ranges.len > 0);
// const min_sample = voltage_ranges[0].low;
// const max_sample = voltage_ranges[0].high;
// for (try ni_daq.listDeviceAOPhysicalChannels(device)) |channel_name| {
// var channel = try app.appendChannel();
// channel.min_sample = min_sample;
// channel.max_sample = max_sample;
// try app.task_pool.createAOVoltageChannel(ni_daq, .{
// .channel = channel_name,
// .min_value = min_sample,
// .max_value = max_sample,
// });
// }
// }
}
for (0.., app.channels.items) |i, *channel| {
channel.color = rl.Color.fromHSV(@as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(app.channels.items.len)) * 360, 0.75, 0.8);
}
const sample_rate: f64 = 5000;
try app.task_pool.setContinousSampleRate(sample_rate);
try checkDAQmxError(c.DAQmxCreateTask("", &app.task_handle), error.createTask);
defer checkDAQmxError(c.DAQmxClearTask(app.task_handle), error.clearTask) catch unreachable;
var channel_samples = try app.task_pool.start(0.01, allocator);
defer channel_samples.deinit();
defer app.task_pool.stop() catch @panic("stop task failed");
for (devices) |device| {
const channel_names = try listDeviceAIPhysicalChannels(allocator, device);
defer freeNameList(allocator, channel_names);
for (0.., channel_names) |i, channel_name| {
var channel = try app.appendChannel();
channel.color = rl.Color.fromHSV(@as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(channel_names.len)) * 360, 0.75, 0.8);
channel.min_sample = -10.0;
channel.max_sample = 10.0;
try checkDAQmxError(
c.DAQmxCreateAIVoltageChan(app.task_handle, channel_name, "", c.DAQmx_Val_Cfg_Default, channel.min_sample, channel.max_sample, c.DAQmx_Val_Volts, null),
error.createAIVoltageChannel
);
}
}
try checkDAQmxError(
c.DAQmxCfgSampClkTiming(app.task_handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0),
error.configureSampleRate
);
try checkDAQmxError(c.DAQmxStartTask(app.task_handle), error.startTask);
defer checkDAQmxError(c.DAQmxStopTask(app.task_handle), error.stopTask) catch unreachable;
try app.startReadingThread();
app.channel_samples = channel_samples;
rl.initWindow(800, 450, "DAQ view");
defer rl.closeWindow();
@ -319,9 +213,9 @@ pub fn main() !void {
rl.Color.gray
);
app.samples_mutex.lock();
for (app.channels.items) |channel| {
const samples = channel.samples;
channel_samples.mutex.lock();
for (0.., channel_samples.samples) |channel_index, samples| {
const channel = app.channels.items[channel_index];
const min_sample: f32 = @floatCast(channel.min_sample);
const max_sample: f32 = @floatCast(channel.max_sample);
@ -363,7 +257,7 @@ pub fn main() !void {
}
}
}
app.samples_mutex.unlock();
channel_samples.mutex.unlock();
if (rl.isKeyPressedRepeat(rl.KeyboardKey.key_e) or rl.isKeyPressed(rl.KeyboardKey.key_e)) {
zoom *= 1.1;
@ -374,6 +268,7 @@ pub fn main() !void {
const now_ns = std.time.nanoTimestamp();
const now_since_start = nanoToSeconds(now_ns - start_time);
const now_since_samping_start = nanoToSeconds(now_ns - channel_samples.started_sampling_ns.?);
{
var y: f32 = 10;
@ -383,32 +278,20 @@ pub fn main() !void {
try font_face.drawTextAlloc(allocator, "Zoom: {d:.03}", .{zoom}, Vec2.init(10, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Dropped samples: {d:.03}", .{app.dropped_samples}, Vec2.init(10, y), rl.Color.black);
try font_face.drawTextAlloc(allocator, "Dropped samples: {d:.03}", .{app.task_pool.droppedSamples()}, Vec2.init(10, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Last read size (bytes): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len)) * @sizeOf(f64)}, Vec2.init(20, y), rl.Color.black);
for (0..app.channels.items.len) |i| {
const sample_count = channel_samples.samples[i].items.len;
y += 10;
try font_face.drawTextAlloc(allocator, "Last read count (doubles): {d}", .{app.last_read_size * @as(u32, @intCast(app.channels.items.len))}, Vec2.init(20, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Last read count per channel: {d}", .{app.last_read_size}, Vec2.init(20, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Time since last read: {d:.05}", .{nanoToSeconds(now_ns - app.last_read_at_ns)}, Vec2.init(20, y), rl.Color.black);
y += 10;
for (1.., app.channels.items) |i, channel| {
const sample_count = channel.samples.items.len;
y += 10;
try font_face.drawTextAlloc(allocator, "Channel {}:", .{i}, Vec2.init(10, y), rl.Color.black);
try font_face.drawTextAlloc(allocator, "Channel {}:", .{i + 1}, Vec2.init(10, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Sample count: {}", .{sample_count}, Vec2.init(20, y), rl.Color.black);
y += 10;
try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now_since_start}, Vec2.init(20, y), rl.Color.black);
try font_face.drawTextAlloc(allocator, "Sample rate: {d:.03}", .{@as(f64, @floatFromInt(sample_count)) / now_since_samping_start}, Vec2.init(20, y), rl.Color.black);
y += 10;
}
}
@ -416,3 +299,7 @@ pub fn main() !void {
rl.drawFPS(@as(i32, @intFromFloat(window_width)) - 100, 10);
}
}
test {
_ = NIDaq;
}

823
src/ni-daq.zig Normal file
View File

@ -0,0 +1,823 @@
const std = @import("std");
pub const c = @cImport({
@cInclude("stdint.h");
@cDefine("__int64", "long long");
@cInclude("NIDAQmx.h");
});
const assert = std.debug.assert;
const log = std.log.scoped(.ni_daq);
const max_device_name_size = 255;
const max_task_name_size = 255;
pub const BoundedDeviceName = std.BoundedArray(u8, max_device_name_size);
const StringArrayListUnmanaged = std.ArrayListUnmanaged([:0]const u8);
const NIDaq = @This();
pub const TaskHandle = c.TaskHandle;
/// Fixed capacities used by `NIDaq.init` to preallocate every internal
/// buffer up front; queries that would exceed them fail with an error
/// instead of growing.
pub const Options = struct {
    max_devices: u32,
    // The remaining limits are per device (each device gets its own
    // DeviceBuffers sized from these).
    max_analog_inputs: u32,
    max_analog_outputs: u32,
    max_counter_outputs: u32,
    max_counter_inputs: u32,
    max_analog_input_voltage_ranges: u32,
    max_analog_output_voltage_ranges: u32
};
/// Thin wrapper around a DAQmx task handle.
/// Typical lifecycle: create the task through the driver, add channels
/// (`createAIVoltageChannel` / `createAOVoltageChannel`), configure timing
/// (`setContinousSampleRate` / `setFiniteSampleRate`), `start()`, read with
/// `readAnalog`, then `stop()` and finally `clear()` to release the driver
/// resources.
pub const Task = struct {
    handle: TaskHandle,
    // Backing storage for `name()`; the slice it returns points in here.
    name_buffer: [max_task_name_size]u8 = undefined,
    // Incremented by `readAnalog` whenever the driver reports that samples
    // were overwritten before they could be read.
    dropped_samples: u32 = 0,

    /// Releases the task in the driver; the handle is invalid afterwards.
    pub fn clear(self: Task) !void {
        try checkDAQmxError(
            c.DAQmxClearTask(self.handle),
            error.DAQmxClearTask
        );
    }

    /// Returns the driver-assigned task name.
    /// The returned slice points into `self.name_buffer` and is invalidated
    /// by the next call to `name()`.
    pub fn name(self: *Task) ![]const u8 {
        // Passing a null buffer makes DAQmx report the required size,
        // which includes the trailing null terminator.
        const required_size = c.DAQmxGetTaskName(self.handle, null, 0);
        assert(required_size >= 0);
        if (required_size == 0) {
            return self.name_buffer[0..0];
        }
        if (required_size > self.name_buffer.len) {
            return error.BufferTooSmall;
        }
        try checkDAQmxError(
            c.DAQmxGetTaskName(
                self.handle,
                &self.name_buffer,
                self.name_buffer.len
            ),
            error.DAQmxGetTaskName
        );
        // `required_size` counts the null terminator (see the matching
        // `nullbyte == required_size - 1` assert in `listDeviceNames`);
        // exclude it so callers get only the name's bytes.
        return self.name_buffer[0..@intCast(required_size - 1)];
    }

    /// Starts the measurement/generation.
    pub fn start(self: Task) !void {
        try checkDAQmxError(
            c.DAQmxStartTask(self.handle),
            error.DAQmxStartTask
        );
    }

    /// Stops the task; it can be started again later.
    pub fn stop(self: Task) !void {
        try checkDAQmxError(
            c.DAQmxStopTask(self.handle),
            error.DAQmxStopTask
        );
    }

    /// Configures free-running (continuous) sampling at `sample_rate` Hz
    /// using the onboard clock.
    /// (The "Continous" misspelling is kept for API compatibility.)
    pub fn setContinousSampleRate(self: Task, sample_rate: f64) !void {
        try checkDAQmxError(
            c.DAQmxCfgSampClkTiming(self.handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, 0),
            error.DAQmxCfgSampClkTiming
        );
    }

    /// Configures acquisition of exactly `samples_per_channel` samples per
    /// channel at `sample_rate` Hz using the onboard clock.
    pub fn setFiniteSampleRate(self: Task, sample_rate: f64, samples_per_channel: u64) !void {
        try checkDAQmxError(
            c.DAQmxCfgSampClkTiming(self.handle, null, sample_rate, c.DAQmx_Val_Rising, c.DAQmx_Val_FiniteSamps, samples_per_channel),
            error.DAQmxCfgSampClkTiming
        );
    }

    pub const AIVoltageChannelOptions = struct {
        // Physical channel path, e.g. "Dev1/ai0".
        channel: [:0]const u8,
        assigned_name: [*c]const u8 = null,
        terminal_config: i32 = c.DAQmx_Val_Cfg_Default,
        min_value: f64,
        max_value: f64,
        units: i32 = c.DAQmx_Val_Volts,
        custom_scale_name: [*c]const u8 = null,
    };

    /// Adds an analog-input voltage channel to the task.
    pub fn createAIVoltageChannel(self: Task, options: AIVoltageChannelOptions) !void {
        try checkDAQmxError(
            c.DAQmxCreateAIVoltageChan(
                self.handle,
                options.channel,
                options.assigned_name,
                options.terminal_config,
                options.min_value,
                options.max_value,
                options.units,
                options.custom_scale_name
            ),
            error.DAQmxCreateAIVoltageChan
        );
    }

    pub const AOVoltageChannelOptions = struct {
        // Physical channel path, e.g. "Dev1/ao0".
        channel: [:0]const u8,
        assigned_name: [*c]const u8 = null,
        min_value: f64,
        max_value: f64,
        units: i32 = c.DAQmx_Val_Volts,
        custom_scale_name: [*c]const u8 = null,
    };

    /// Adds an analog-output voltage channel to the task.
    pub fn createAOVoltageChannel(self: Task, options: AOVoltageChannelOptions) !void {
        try checkDAQmxError(
            c.DAQmxCreateAOVoltageChan(
                self.handle,
                options.channel,
                options.assigned_name,
                options.min_value,
                options.max_value,
                options.units,
                options.custom_scale_name
            ),
            error.DAQmxCreateAOVoltageChan
        );
    }

    pub const ReadAnalogOptions = struct {
        // Destination buffer; holds samples for all channels of the task.
        read_array: []f64,
        // Seconds to wait for the requested samples.
        timeout: f64,
        samples_per_channel: i32 = c.DAQmx_Val_Auto,
        fill_mode: u32 = c.DAQmx_Val_GroupByChannel,
    };

    /// Reads available analog samples into `options.read_array`.
    /// Returns the number of samples read per channel. A driver-reported
    /// sample overwrite is counted in `dropped_samples` and logged instead
    /// of being returned as an error.
    pub fn readAnalog(self: *Task, options: ReadAnalogOptions) !u32 {
        var samples_per_channel: i32 = 0;
        const err = c.DAQmxReadAnalogF64(
            self.handle,
            options.samples_per_channel,
            options.timeout,
            options.fill_mode,
            options.read_array.ptr,
            @intCast(options.read_array.len),
            &samples_per_channel,
            null
        );
        if (err == c.DAQmxErrorSamplesNoLongerAvailable) {
            self.dropped_samples += 1;
            log.err("Dropped samples, not reading samples fast enough.", .{});
        } else if (err < 0) {
            try checkDAQmxError(err, error.DAQmxReadAnalogF64);
        }
        return @intCast(samples_per_channel);
    }

    /// Whether the task has finished (relevant for finite acquisitions).
    pub fn isDone(self: Task) !bool {
        var result: c.bool32 = 0;
        try checkDAQmxError(
            c.DAQmxIsTaskDone(self.handle, &result),
            error.DAQmxIsTaskDone
        );
        return result != 0;
    }
};
/// Analog-input measurement types: a 1:1 mapping of the DAQmx_Val_*
/// constants. Non-exhaustive (`_`) so values reported by newer drivers
/// remain representable.
pub const AIMeasurementType = enum(i32) {
    Voltage = c.DAQmx_Val_Voltage,
    VoltageRMS = c.DAQmx_Val_VoltageRMS,
    Current = c.DAQmx_Val_Current,
    CurrentRMS = c.DAQmx_Val_CurrentRMS,
    Voltage_CustomWithExcitation = c.DAQmx_Val_Voltage_CustomWithExcitation,
    Bridge = c.DAQmx_Val_Bridge,
    Freq_Voltage = c.DAQmx_Val_Freq_Voltage,
    Resistance = c.DAQmx_Val_Resistance,
    Temp_TC = c.DAQmx_Val_Temp_TC,
    Temp_Thrmstr = c.DAQmx_Val_Temp_Thrmstr,
    Temp_RTD = c.DAQmx_Val_Temp_RTD,
    Temp_BuiltInSensor = c.DAQmx_Val_Temp_BuiltInSensor,
    Strain_Gage = c.DAQmx_Val_Strain_Gage,
    Rosette_Strain_Gage = c.DAQmx_Val_Rosette_Strain_Gage,
    Position_LVDT = c.DAQmx_Val_Position_LVDT,
    Position_RVDT = c.DAQmx_Val_Position_RVDT,
    Position_EddyCurrentProximityProbe = c.DAQmx_Val_Position_EddyCurrentProximityProbe,
    Accelerometer = c.DAQmx_Val_Accelerometer,
    Acceleration_Charge = c.DAQmx_Val_Acceleration_Charge,
    Acceleration_4WireDCVoltage = c.DAQmx_Val_Acceleration_4WireDCVoltage,
    Velocity_IEPESensor = c.DAQmx_Val_Velocity_IEPESensor,
    Force_Bridge = c.DAQmx_Val_Force_Bridge,
    Force_IEPESensor = c.DAQmx_Val_Force_IEPESensor,
    Pressure_Bridge = c.DAQmx_Val_Pressure_Bridge,
    SoundPressure_Microphone = c.DAQmx_Val_SoundPressure_Microphone,
    Torque_Bridge = c.DAQmx_Val_Torque_Bridge,
    TEDS_Sensor = c.DAQmx_Val_TEDS_Sensor,
    Charge = c.DAQmx_Val_Charge,
    Power = c.DAQmx_Val_Power,
    _
};
// Sized to hold at most one entry per declared measurement type.
const AIMeasurementTypeList = std.BoundedArray(AIMeasurementType, @typeInfo(AIMeasurementType).Enum.fields.len);
/// Analog-output types: a 1:1 mapping of the DAQmx_Val_* constants.
/// Non-exhaustive (`_`) so unknown driver values remain representable.
pub const AOOutputType = enum(i32) {
    Voltage = c.DAQmx_Val_Voltage,
    Current = c.DAQmx_Val_Current,
    FuncGen = c.DAQmx_Val_FuncGen,
    _
};
// Sized to hold at most one entry per declared output type.
const AOOutputTypeList = std.BoundedArray(AOOutputType, @typeInfo(AOOutputType).Enum.fields.len);
/// A low/high value pair, used for the voltage-range queries.
/// NOTE(review): `packed` fixes the in-memory bit layout; if this struct
/// is ever read directly from a C array of doubles, `extern struct` is the
/// C-ABI-guaranteed layout — confirm against the code that fills these.
pub const Range = packed struct {
    low: f64,
    high: f64
};
/// Device product categories: a 1:1 mapping of the DAQmx_Val_* constants.
/// Non-exhaustive (`_`) so values from newer drivers stay representable.
pub const ProductCategory = enum(i32) {
    MSeriesDAQ = c.DAQmx_Val_MSeriesDAQ,
    XSeriesDAQ = c.DAQmx_Val_XSeriesDAQ,
    ESeriesDAQ = c.DAQmx_Val_ESeriesDAQ,
    SSeriesDAQ = c.DAQmx_Val_SSeriesDAQ,
    BSeriesDAQ = c.DAQmx_Val_BSeriesDAQ,
    SCSeriesDAQ = c.DAQmx_Val_SCSeriesDAQ,
    USBDAQ = c.DAQmx_Val_USBDAQ,
    AOSeries = c.DAQmx_Val_AOSeries,
    DigitalIO = c.DAQmx_Val_DigitalIO,
    TIOSeries = c.DAQmx_Val_TIOSeries,
    DynamicSignalAcquisition = c.DAQmx_Val_DynamicSignalAcquisition,
    Switches = c.DAQmx_Val_Switches,
    CompactDAQChassis = c.DAQmx_Val_CompactDAQChassis,
    CompactRIOChassis = c.DAQmx_Val_CompactRIOChassis,
    CSeriesModule = c.DAQmx_Val_CSeriesModule,
    SCXIModule = c.DAQmx_Val_SCXIModule,
    SCCConnectorBlock = c.DAQmx_Val_SCCConnectorBlock,
    SCCModule = c.DAQmx_Val_SCCModule,
    NIELVIS = c.DAQmx_Val_NIELVIS,
    NetworkDAQ = c.DAQmx_Val_NetworkDAQ,
    SCExpress = c.DAQmx_Val_SCExpress,
    FieldDAQ = c.DAQmx_Val_FieldDAQ,
    TestScaleChassis = c.DAQmx_Val_TestScaleChassis,
    TestScaleModule = c.DAQmx_Val_TestScaleModule,
    mioDAQ = c.DAQmx_Val_mioDAQ,
    Unknown = c.DAQmx_Val_Unknown,
    _,

    /// Whether a single DAQmx task can span multiple devices of this
    /// category.
    /// NOTE(review): whitelist as written here — confirm the category set
    /// against NI's multidevice-task documentation.
    pub fn hasMultideviceTaskSupport(self: ProductCategory) bool {
        return switch (self) {
            .CSeriesModule,
            .FieldDAQ,
            .SSeriesDAQ,
            .DynamicSignalAcquisition,
            .SCExpress,
            .XSeriesDAQ => true,
            else => false
        };
    }
};
/// Per-device preallocated storage for channel-name and voltage-range
/// query results. All capacities come from `Options`; nothing grows after
/// `init`.
const DeviceBuffers = struct {
    /// Fixed-capacity list of physical channel names ("Dev1/ai0", ...).
    /// One backing byte buffer holds the characters; `array_list` holds
    /// null-terminated slices into it.
    const ChannelNames = struct {
        buffer: []u8,
        array_list: StringArrayListUnmanaged,

        fn init(allocator: std.mem.Allocator, capacity: u32) !ChannelNames {
            // Worst-case length of one channel name:
            // "<device>/<ai|ao|ci|co><index>".
            const max_channel_name_size = count: {
                var count: u32 = 0;
                count += max_device_name_size;
                count += 1; // '/'
                count += 2; // 'ai' or 'ao' or 'co' or 'ci'
                // Widest index string for channels numbered 0..capacity-1.
                // Using the helper also avoids std.math.log10_int(0),
                // which panics when capacity == 0.
                count += maxIndexStringLength(capacity);
                break :count count;
            };
            // +2 per name: room for the ", " separator in the driver's
            // comma-delimited list.
            const buffer_size = capacity * (max_channel_name_size + 2);
            const buffer = try allocator.alloc(u8, buffer_size);
            errdefer allocator.free(buffer);
            var array_list = try StringArrayListUnmanaged.initCapacity(allocator, capacity);
            errdefer array_list.deinit(allocator);
            return ChannelNames{
                .buffer = buffer,
                .array_list = array_list
            };
        }

        fn deinit(self: *ChannelNames, allocator: std.mem.Allocator) void {
            allocator.free(self.buffer);
            self.array_list.deinit(allocator);
        }

        // Drops the stored names; the byte buffer is reused on the next
        // query.
        fn clear(self: *ChannelNames) void {
            self.array_list.clearRetainingCapacity();
        }
    };

    analog_input_names: ChannelNames,
    analog_output_names: ChannelNames,
    counter_output_names: ChannelNames,
    counter_input_names: ChannelNames,
    // TODO: Digital input
    // TODO: Digital output
    analog_input_voltage_ranges: std.ArrayListUnmanaged(Range),
    analog_output_voltage_ranges: std.ArrayListUnmanaged(Range),

    fn init(allocator: std.mem.Allocator, options: Options) !DeviceBuffers {
        var analog_input_names = try ChannelNames.init(allocator, options.max_analog_inputs);
        errdefer analog_input_names.deinit(allocator);
        var analog_output_names = try ChannelNames.init(allocator, options.max_analog_outputs);
        errdefer analog_output_names.deinit(allocator);
        var counter_output_names = try ChannelNames.init(allocator, options.max_counter_outputs);
        errdefer counter_output_names.deinit(allocator);
        var counter_input_names = try ChannelNames.init(allocator, options.max_counter_inputs);
        errdefer counter_input_names.deinit(allocator);
        var analog_input_voltage_ranges = try std.ArrayListUnmanaged(Range).initCapacity(allocator, options.max_analog_input_voltage_ranges);
        errdefer analog_input_voltage_ranges.deinit(allocator);
        var analog_output_voltage_ranges = try std.ArrayListUnmanaged(Range).initCapacity(allocator, options.max_analog_output_voltage_ranges);
        errdefer analog_output_voltage_ranges.deinit(allocator);
        return DeviceBuffers{
            .analog_input_names = analog_input_names,
            .analog_output_names = analog_output_names,
            .counter_output_names = counter_output_names,
            .counter_input_names = counter_input_names,
            .analog_input_voltage_ranges = analog_input_voltage_ranges,
            .analog_output_voltage_ranges = analog_output_voltage_ranges
        };
    }

    fn deinit(self: *DeviceBuffers, allocator: std.mem.Allocator) void {
        self.analog_input_names.deinit(allocator);
        self.analog_output_names.deinit(allocator);
        self.counter_input_names.deinit(allocator);
        self.counter_output_names.deinit(allocator);
        self.analog_input_voltage_ranges.deinit(allocator);
        self.analog_output_voltage_ranges.deinit(allocator);
    }

    // Resets all stored query results; capacities are retained.
    fn clear(self: *DeviceBuffers) void {
        self.analog_input_names.clear();
        self.analog_output_names.clear();
        self.counter_input_names.clear();
        self.counter_output_names.clear();
        self.analog_input_voltage_ranges.clearRetainingCapacity();
        self.analog_output_voltage_ranges.clearRetainingCapacity();
    }
};
device_names_buffer: []u8,
device_names: StringArrayListUnmanaged,
device_buffers: []DeviceBuffers,
/// Preallocates all buffers for up to `options.max_devices` devices.
/// Every allocation is released by `deinit`; on failure, everything
/// allocated so far is freed before the error is returned.
pub fn init(allocator: std.mem.Allocator, options: Options) !NIDaq {
    // +2 per name: room for the ", " separator in the driver's list.
    const device_names_buffer_size = options.max_devices * (max_device_name_size + 2);
    const device_names_buffer = try allocator.alloc(u8, device_names_buffer_size);
    errdefer allocator.free(device_names_buffer);

    var device_names = try StringArrayListUnmanaged.initCapacity(allocator, options.max_devices);
    errdefer device_names.deinit(allocator);

    const device_buffers = try allocator.alloc(DeviceBuffers, options.max_devices);
    errdefer allocator.free(device_buffers);

    // Track how many entries were initialized so a failure mid-loop
    // releases exactly those, instead of leaking them.
    var initialized: usize = 0;
    errdefer for (device_buffers[0..initialized]) |*buffers| {
        buffers.deinit(allocator);
    };
    for (device_buffers) |*buffers| {
        buffers.* = try DeviceBuffers.init(allocator, options);
        initialized += 1;
    }

    return NIDaq{
        .device_names_buffer = device_names_buffer,
        .device_names = device_names,
        .device_buffers = device_buffers
    };
}
/// Frees everything allocated by `init`. Must not be called twice.
pub fn deinit(self: *NIDaq, allocator: std.mem.Allocator) void {
    // Release the per-device buffers first, then the shared name storage.
    for (self.device_buffers) |*per_device| {
        per_device.deinit(allocator);
    }
    allocator.free(self.device_buffers);
    self.device_names.deinit(allocator);
    allocator.free(self.device_names_buffer);
}
/// Number of decimal digits needed to print the largest index of a list
/// with `count` items (i.e. the width of `count - 1`); 0 for an empty list.
fn maxIndexStringLength(count: u32) u32 {
    return switch (count) {
        0 => 0,
        1 => 1,
        else => std.math.log10_int(count - 1) + 1,
    };
}
// Spot-checks for maxIndexStringLength: the expected value is the digit
// count of count-1 (the largest index), with 0 and 1 as edge cases.
test {
    try std.testing.expectEqual(0, maxIndexStringLength(0));
    try std.testing.expectEqual(1, maxIndexStringLength(1));
    try std.testing.expectEqual(1, maxIndexStringLength(10));
    try std.testing.expectEqual(2, maxIndexStringLength(11));
    try std.testing.expectEqual(2, maxIndexStringLength(100));
    try std.testing.expectEqual(3, maxIndexStringLength(101));
}
/// Logs a DAQmx status code: errors (negative) at `err` level, warnings
/// (positive) at `warn` level, and 0 (success) not at all.
pub fn logDAQmxError(error_code: i32) void {
    if (error_code == 0) return;

    var msg: [512:0]u8 = .{ 0 } ** 512;
    const lookup_failed = c.DAQmxGetErrorString(error_code, &msg, msg.len) != 0;
    if (lookup_failed) {
        log.err("DAQmx ({}): Unknown (Buffer too small for message)", .{error_code});
        return;
    }

    if (error_code < 0) {
        log.err("DAQmx ({}): {s}", .{error_code, msg});
        return;
    }

    // Ignore positive error codes if it could not be found.
    // This commonly happens when trying to preallocate bytes for buffer and it returns a positive number.
    if (std.mem.startsWith(u8, &msg, "Error code could not be found.")) return;
    log.warn("DAQmx ({}): {s}", .{error_code, msg});
}
/// Logs the DAQmx status code, then returns `err` when it signals failure
/// (negative). Success (0) and warnings (positive) pass through.
pub fn checkDAQmxError(error_code: i32, err: anyerror) !void {
    logDAQmxError(error_code);
    if (error_code >= 0) return;
    return err;
}
/// Splits a comma-separated driver list in place, appending a
/// null-terminated slice per entry to `array_list` (capacity must
/// suffice; it is never grown). The delimiter byte after each entry is
/// overwritten with 0 to form the sentinel, so `buffer` must end in a
/// delimiter (',', ' ' or 0) — DAQmx lists end in a null byte. Returns
/// `error.MissingTerminator` otherwise, instead of writing past the end
/// of `buffer` as the previous version did.
fn splitCommaDelimitedList(array_list: *std.ArrayListUnmanaged([:0]const u8), buffer: []u8) !void {
    const count = std.mem.count(u8, buffer, ",") + 1;
    if (count > array_list.capacity) {
        return error.TooManyItems;
    }
    array_list.clearRetainingCapacity();
    var name_iter = std.mem.tokenizeAny(u8, buffer, &.{ ',', ' ', 0 });
    while (name_iter.next()) |name| {
        // After next(), `index` sits on the delimiter that ended the
        // token — or at buffer.len when the token ran to the end.
        if (name_iter.index >= buffer.len) {
            return error.MissingTerminator;
        }
        buffer[name_iter.index] = 0;
        const name_z: [:0]const u8 = @ptrCast(name);
        std.debug.assert(name_z[name.len] == 0);
        array_list.appendAssumeCapacity(name_z);
    }
}
/// Queries the installed NI-DAQmx driver version
/// (major.minor.update mapped onto major.minor.patch).
pub fn version() !std.SemanticVersion {
    var major: u32 = 0;
    var minor: u32 = 0;
    var update: u32 = 0;
    try checkDAQmxError(c.DAQmxGetSysNIDAQMajorVersion(&major), error.GetMajorVersion);
    try checkDAQmxError(c.DAQmxGetSysNIDAQMinorVersion(&minor), error.GetMinorVersion);
    try checkDAQmxError(c.DAQmxGetSysNIDAQUpdateVersion(&update), error.GetUpdateVersion);
    return .{
        .major = major,
        .minor = minor,
        .patch = update,
    };
}
/// Lists the connected device names ("Dev1", ...).
/// Invalidates all previously returned name and channel slices: the
/// device list and every per-device buffer are reset.
pub fn listDeviceNames(self: *NIDaq) ![]const [:0]const u8 {
    self.device_names.clearRetainingCapacity();
    // Device indices may shift, so all per-device caches must be dropped.
    self.clearAllDeviceBuffers();

    // A null buffer asks the driver for the required size (including the
    // null terminator). A negative value is an error code — check it
    // before using it as a size (matches listDevicePhysicalChannels; the
    // old code let negative codes fall through to @intCast).
    const required_size = c.DAQmxGetSysDevNames(null, 0);
    try checkDAQmxError(required_size, error.GetDeviceNames);
    if (required_size == 0) {
        return self.device_names.items;
    }
    if (required_size > self.device_names_buffer.len) {
        return error.BufferTooSmall;
    }
    try checkDAQmxError(
        c.DAQmxGetSysDevNames(self.device_names_buffer.ptr, @intCast(self.device_names_buffer.len)),
        error.GetDeviceNames
    );
    // The driver null-terminates the comma-separated list.
    const nullbyte = std.mem.indexOfScalar(u8, self.device_names_buffer, 0) orelse unreachable;
    assert(nullbyte == required_size - 1);
    try splitCommaDelimitedList(&self.device_names, self.device_names_buffer[0..@intCast(required_size)]);
    return self.device_names.items;
}
/// Finds the preallocated buffers for `device` by name.
/// Lazily populates the device list on first use.
fn getDeviceBuffers(self: *NIDaq, device: [:0]const u8) !*DeviceBuffers {
    // A zero-capacity list means init() was given no room for devices.
    if (self.device_names.capacity == 0) {
        return error.DeviceNotFound;
    }
    if (self.device_names.items.len == 0) {
        _ = try self.listDeviceNames();
    }
    for (self.device_names.items, 0..) |device_name, index| {
        if (!std.mem.eql(u8, device, device_name)) continue;
        return &self.device_buffers[index];
    }
    return error.DeviceNotFound;
}
// Resets the cached query results of every device; capacities are kept.
fn clearAllDeviceBuffers(self: *NIDaq) void {
    for (self.device_buffers) |*per_device| per_device.clear();
}
/// Shared implementation behind the listDevice*PhysicalChannels wrappers.
/// `getPhysicalChannels` is one of the c.DAQmxGetDev*PhysicalChans
/// functions; they share a calling convention: a null buffer returns the
/// required size (including the null terminator), negative means error.
/// The returned slices point into `channel_names` and are invalidated by
/// the next listing call for the same device.
fn listDevicePhysicalChannels(
    getPhysicalChannels: anytype,
    device: [:0]const u8,
    channel_names: *DeviceBuffers.ChannelNames,
) ![]const [:0]const u8 {
    const buffer = channel_names.buffer;
    const array_list = &channel_names.array_list;
    array_list.clearRetainingCapacity();
    // Size query; check for a negative error code before using it.
    const required_size = getPhysicalChannels(device, null, 0);
    try checkDAQmxError(required_size, error.GetPhysicalChannels);
    if (required_size == 0) {
        return array_list.items;
    }
    if (required_size > buffer.len) {
        return error.BufferTooSmall;
    }
    try checkDAQmxError(
        getPhysicalChannels(device, buffer.ptr, @intCast(buffer.len)),
        error.GetPhysicalChannels
    );
    // The driver null-terminates the comma-separated list.
    const nullbyte = std.mem.indexOfScalar(u8, buffer, 0) orelse unreachable;
    assert(nullbyte == required_size - 1);
    try splitCommaDelimitedList(array_list, buffer[0..@intCast(required_size)]);
    return array_list.items;
}
/// Lists the analog-input channels of `device` (e.g. "Dev1/ai0").
/// Returned slices are invalidated by the next listing for this device.
pub fn listDeviceAIPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 {
    var device_buffers = try self.getDeviceBuffers(device);
    return listDevicePhysicalChannels(
        c.DAQmxGetDevAIPhysicalChans,
        device,
        &device_buffers.analog_input_names
    );
}
/// Lists the analog-output channels of `device` (e.g. "Dev1/ao0").
pub fn listDeviceAOPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 {
    var device_buffers = try self.getDeviceBuffers(device);
    return listDevicePhysicalChannels(
        c.DAQmxGetDevAOPhysicalChans,
        device,
        &device_buffers.analog_output_names
    );
}
/// Lists the counter-output channels of `device` (e.g. "Dev1/ctr0").
pub fn listDeviceCOPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 {
    var device_buffers = try self.getDeviceBuffers(device);
    return listDevicePhysicalChannels(
        c.DAQmxGetDevCOPhysicalChans,
        device,
        &device_buffers.counter_output_names
    );
}
/// Lists the counter-input channels of `device`.
pub fn listDeviceCIPhysicalChannels(self: *NIDaq, device: [:0]const u8) ![]const [:0]const u8 {
    var device_buffers = try self.getDeviceBuffers(device);
    return listDevicePhysicalChannels(
        c.DAQmxGetDevCIPhysicalChans,
        device,
        &device_buffers.counter_input_names
    );
}
/// Kind of NI-DAQ physical channel, derived from the name prefix that
/// follows the device separator ("ai"/"ao"/"ci"/"co"), see getChannelType().
const ChannelType = enum {
    analog_input,
    analog_output,
    counter_input,
    counter_output,
};
/// Derive the channel kind from a physical channel path such as "Dev1/ai0".
/// Inspects the prefix right after the first '/'. Returns null when the name
/// contains no '/' or the prefix is not recognized.
/// NOTE: despite the parameter name, this receives a full channel path,
/// not just a device name.
fn getChannelType(device: [:0]const u8) ?ChannelType {
    const slash = std.mem.indexOfScalar(u8, device, '/') orelse return null;
    const suffix = device[(slash + 1)..];
    const mappings = [_]struct { prefix: []const u8, kind: ChannelType }{
        .{ .prefix = "ai", .kind = .analog_input },
        .{ .prefix = "ao", .kind = .analog_output },
        .{ .prefix = "ci", .kind = .counter_input },
        .{ .prefix = "co", .kind = .counter_output },
    };
    for (mappings) |mapping| {
        if (std.mem.startsWith(u8, suffix, mapping.prefix)) {
            return mapping.kind;
        }
    }
    return null;
}
/// Minimum supported sample rate (Hz) for the given channel.
/// Only analog channels are supported; counter channels return an error.
pub fn getMinSampleRate(self: NIDaq, channel: [:0]const u8) !f64 {
    _ = self;
    const channel_type = getChannelType(channel) orelse return error.UnknownChannelType;
    var result: f64 = 0;
    // NOTE(review): the DAQmxGetDev*MinRate properties are documented as
    // device-based; here the full channel path is passed through — confirm
    // the driver accepts it.
    switch (channel_type) {
        .analog_input => try checkDAQmxError(
            c.DAQmxGetDevAIMinRate(channel, &result),
            error.DAQmxGetDevAIMinRate,
        ),
        .analog_output => try checkDAQmxError(
            c.DAQmxGetDevAOMinRate(channel, &result),
            error.DAQmxGetDevAOMinRate,
        ),
        .counter_input, .counter_output => return error.UnsupportedChannelType,
    }
    return result;
}
/// Maximum supported sample rate (Hz) for the given channel.
/// For analog input this is the single-channel maximum rate.
/// Only analog channels are supported; counter channels return an error.
pub fn getMaxSampleRate(self: NIDaq, channel: [:0]const u8) !f64 {
    _ = self;
    const channel_type = getChannelType(channel) orelse return error.UnknownChannelType;
    var result: f64 = 0;
    switch (channel_type) {
        .analog_input => try checkDAQmxError(
            c.DAQmxGetDevAIMaxSingleChanRate(channel, &result),
            error.DAQmxGetDevAIMaxSingleChanRate,
        ),
        .analog_output => try checkDAQmxError(
            c.DAQmxGetDevAOMaxRate(channel, &result),
            error.DAQmxGetDevAOMaxRate,
        ),
        .counter_input, .counter_output => return error.UnsupportedChannelType,
    }
    return result;
}
/// Query voltage ranges via a `DAQmxGetDev*VoltageRngs` C function and store
/// them as `Range` values in `voltage_ranges` (existing capacity is reused,
/// the list is never grown here). DAQmx writes a flat array of f64 min/max
/// pairs, so the list's backing memory is reinterpreted as f64s.
fn listDeviceVoltageRanges(
    getVoltageRanges: anytype,
    device: [:0]const u8,
    voltage_ranges: *std.ArrayListUnmanaged(Range)
) ![]Range {
    voltage_ranges.clearRetainingCapacity();
    // A null buffer makes DAQmx report how many f64 values it would write.
    const count = getVoltageRanges(device, null, 0);
    try checkDAQmxError(count, error.GetVoltageRanges);
    if (count == 0) {
        return voltage_ranges.items;
    }
    // Reinterpret the Range capacity as a flat f64 buffer (one Range = 2 f64s).
    const buffer: [*]f64 = @ptrCast(voltage_ranges.allocatedSlice().ptr);
    const buffer_len = voltage_ranges.capacity * 2;
    // Values come in (min, max) pairs, so the reported count must be even.
    assert(@as(c_uint, @intCast(count)) % 2 == 0);
    if (count > buffer_len) {
        return error.BufferTooSmall;
    }
    try checkDAQmxError(
        getVoltageRanges(device, buffer, @intCast(buffer_len)),
        error.GetVoltageRanges
    );
    // Every pair of written doubles is one Range element.
    voltage_ranges.items.len = @intCast(@divExact(count, 2));
    return voltage_ranges.items;
}
/// List the supported analog-input voltage ranges of `device`.
/// The returned slice is owned by the per-device buffer cache.
pub fn listDeviceAIVoltageRanges(self: *NIDaq, device: [:0]const u8) ![]Range {
    var buffers = try self.getDeviceBuffers(device);
    return listDeviceVoltageRanges(c.DAQmxGetDevAIVoltageRngs, device, &buffers.analog_input_voltage_ranges);
}
/// List the supported analog-output voltage ranges of `device`.
/// The returned slice is owned by the per-device buffer cache.
pub fn listDeviceAOVoltageRanges(self: *NIDaq, device: [:0]const u8) ![]Range {
    var buffers = try self.getDeviceBuffers(device);
    return listDeviceVoltageRanges(c.DAQmxGetDevAOVoltageRngs, device, &buffers.analog_output_voltage_ranges);
}
/// Create a new DAQmx task. Pass null as `name` to let DAQmx name it.
pub fn createTask(self: NIDaq, name: ?[:0]const u8) !Task {
    _ = self;
    const task_name = name orelse "";
    var handle: TaskHandle = undefined;
    try checkDAQmxError(c.DAQmxCreateTask(task_name, &handle), error.DAQmxCreateTask);
    return .{ .handle = handle };
}
/// Query which analog-input measurement types `device` supports.
pub fn listDeviceAIMeasurementTypes(self: NIDaq, device: [:0]const u8) !AIMeasurementTypeList {
    _ = self;
    var list = AIMeasurementTypeList.init(0) catch unreachable;
    // A null buffer makes DAQmx report how many values it would write.
    const type_count = c.DAQmxGetDevAISupportedMeasTypes(device, null, 0);
    try checkDAQmxError(type_count, error.DAQmxGetDevAISupportedMeasTypes);
    assert(type_count <= list.buffer.len);
    try checkDAQmxError(
        c.DAQmxGetDevAISupportedMeasTypes(device, @as([*c]c_int, @ptrCast(&list.buffer)), list.buffer.len),
        error.DAQmxGetDevAISupportedMeasTypes,
    );
    list.len = @intCast(type_count);
    return list;
}
/// Whether `device` supports the given analog-input measurement type.
pub fn checkDeviceAIMeasurementType(self: NIDaq, device: [:0]const u8, measurement_type: AIMeasurementType) !bool {
    var supported = try self.listDeviceAIMeasurementTypes(device);
    for (supported.slice()) |t| {
        if (t == measurement_type) return true;
    }
    return false;
}
/// Query which analog-output types `device` supports.
pub fn listDeviceAOOutputTypes(self: NIDaq, device: [:0]const u8) !AOOutputTypeList {
    _ = self;
    var list = AOOutputTypeList.init(0) catch unreachable;
    // A null buffer makes DAQmx report how many values it would write.
    const type_count = c.DAQmxGetDevAOSupportedOutputTypes(device, null, 0);
    try checkDAQmxError(type_count, error.DAQmxGetDevAOSupportedOutputTypes);
    assert(type_count <= list.buffer.len);
    try checkDAQmxError(
        c.DAQmxGetDevAOSupportedOutputTypes(device, @as([*c]c_int, @ptrCast(&list.buffer)), list.buffer.len),
        error.DAQmxGetDevAOSupportedOutputTypes,
    );
    list.len = @intCast(type_count);
    return list;
}
/// Whether `device` supports the given analog-output type.
pub fn checkDeviceAOOutputType(self: NIDaq, device: [:0]const u8, output_type: AOOutputType) !bool {
    var supported = try self.listDeviceAOOutputTypes(device);
    for (supported.slice()) |t| {
        if (t == output_type) return true;
    }
    return false;
}
/// Look up the product category of `device` via DAQmx.
pub fn getDeviceProductCategory(self: NIDaq, device: [:0]const u8) !ProductCategory {
    _ = self;
    var category: ProductCategory = .Unknown;
    try checkDAQmxError(
        c.DAQmxGetDevProductCategory(device, @ptrCast(&category)),
        error.DAQmxGetDevProductCategory,
    );
    return category;
}

View File

@ -1,10 +0,0 @@
const std = @import("std");
const testing = std.testing;
export fn add(a: i32, b: i32) i32 {
return a + b;
}
test "basic add functionality" {
try testing.expect(add(3, 7) == 10);
}

351
src/task-pool.zig Normal file
View File

@ -0,0 +1,351 @@
const std = @import("std");
const NIDaq = @import("./ni-daq.zig");
const TaskPool = @This();
const assert = std.debug.assert;
const log = std.log.scoped(.task_pool);
const ChannelType = enum { analog_input, analog_output };
// One pooled task: all channels of a single (device, channel type) pair.
const Entry = struct {
    device: NIDaq.BoundedDeviceName,
    channel_type: ChannelType,
    task: NIDaq.Task,
    // For each channel of this task (in creation order), the pool-wide
    // channel index it maps to.
    channel_order: std.ArrayListUnmanaged(usize)
};
// Total number of channels created across all pooled tasks.
channel_count: usize = 0,
// Per-task channel capacity (mirrors Options.max_channels).
max_channel_count: usize,
// One entry per (device, channel type) pair; capacity is fixed at init().
entries: std.ArrayListUnmanaged(Entry),
// Background thread started by start(), joined by stop().
read_thread: ?std.Thread = null,
// Loop flag for the read thread.
// NOTE(review): read by the read thread and written by start()/stop() on
// another thread without atomics — consider std.atomic.Value(bool); verify.
thread_running: bool = false,
// Sampling mode shared by all tasks; null until a set*SampleRate() call.
sampling: ?union(enum) {
    finite: struct {
        sample_rate: f64,
        samples_per_channel: u64
    },
    continous: struct {
        sample_rate: f64
    }
} = null,
/// Sample sink shared between the read thread (producer) and the caller
/// (consumer). Lock `mutex` before touching `samples`.
pub const ChannelSamples = struct {
    allocator: std.mem.Allocator,
    mutex: std.Thread.Mutex = .{},
    // Scratch buffers for Task.readAnalog(), one per pooled task.
    read_arrays_by_task: [][]f64,
    // Collected samples, indexed by pool-wide channel index.
    samples: []std.ArrayList(f64),
    // Timestamps (std.time.nanoTimestamp) set by the read thread at its
    // start and end; null until then.
    started_sampling_ns: ?i128 = null,
    stopped_sampling_ns: ?i128 = null,

    /// Frees all buffers and the ChannelSamples allocation itself.
    /// Must not be called while the read thread is still running.
    pub fn deinit(self: *ChannelSamples) void {
        for (self.read_arrays_by_task) |read_arrays| {
            self.allocator.free(read_arrays);
        }
        self.allocator.free(self.read_arrays_by_task);
        for (self.samples) |samples_per_channel| {
            samples_per_channel.deinit();
        }
        self.allocator.free(self.samples);
        self.allocator.destroy(self);
    }
};
/// Capacity limits for a TaskPool; all backing memory is reserved in init().
pub const Options = struct {
    // Maximum number of pooled tasks (one per device + channel-type pair).
    max_tasks: usize,
    // Maximum number of channels per task.
    max_channels: usize
};
/// Allocate a TaskPool with fixed capacity for `options.max_tasks` tasks,
/// each holding up to `options.max_channels` channels.
/// All per-entry `channel_order` lists are pre-allocated here so later
/// task creation cannot fail on allocation.
/// Caller owns the result and must call deinit() with the same allocator.
pub fn init(allocator: std.mem.Allocator, options: Options) !TaskPool {
    var entries = try std.ArrayListUnmanaged(Entry).initCapacity(allocator, options.max_tasks);
    errdefer entries.deinit(allocator);
    // Track how many channel_order lists were initialized so a failure
    // part-way through frees exactly those (fixes the leak the old TODO
    // pointed at). errdefers run LIFO, so this runs before entries.deinit.
    var initialized: usize = 0;
    errdefer {
        for (entries.allocatedSlice()[0..initialized]) |*entry| {
            entry.channel_order.deinit(allocator);
        }
    }
    for (entries.allocatedSlice()) |*entry| {
        entry.channel_order = try std.ArrayListUnmanaged(usize).initCapacity(allocator, options.max_channels);
        initialized += 1;
    }
    return TaskPool{
        .entries = entries,
        .max_channel_count = options.max_channels,
    };
}
/// Stop sampling if still running, clear all DAQmx tasks, and free all
/// memory owned by the pool. `allocator` must match the one given to init().
pub fn deinit(self: *TaskPool, allocator: std.mem.Allocator) void {
    if (self.read_thread) |_| {
        self.stop() catch @panic("Failed to stop task");
    }
    for (self.entries.items) |entry| {
        entry.task.clear() catch @panic("Failed to clear task");
    }
    // channel_order lists exist for the full capacity (see init), not just
    // for the entries in use.
    for (self.entries.allocatedSlice()) |*entry| {
        entry.channel_order.deinit(allocator);
    }
    self.entries.deinit(allocator);
}
/// Configure every pooled task for continuous sampling at `sample_rate` Hz
/// and record the mode. Must be called before start().
pub fn setContinousSampleRate(self: *TaskPool, sample_rate: f64) !void {
    assert(self.read_thread == null);
    for (self.entries.items) |entry| {
        try entry.task.setContinousSampleRate(sample_rate);
    }
    self.sampling = .{ .continous = .{ .sample_rate = sample_rate } };
}
/// Configure every pooled task for finite sampling: `samples_per_channel`
/// samples at `sample_rate` Hz. Must be called before start().
pub fn setFiniteSampleRate(self: *TaskPool, sample_rate: f64, samples_per_channel: u64) !void {
    assert(self.read_thread == null);
    for (self.entries.items) |entry| {
        try entry.task.setFiniteSampleRate(sample_rate, samples_per_channel);
    }
    self.sampling = .{ .finite = .{
        .sample_rate = sample_rate,
        .samples_per_channel = samples_per_channel,
    } };
}
/// Start all pooled tasks and spawn the background read thread.
/// Returns the ChannelSamples the thread fills; caller owns it and must
/// deinit() it after stop(). Requires a prior set*SampleRate() call.
pub fn start(self: *TaskPool, read_timeout: f64, allocator: std.mem.Allocator) !*ChannelSamples {
    assert(self.read_thread == null);
    var channel_samples = try self.createChannelSamples(allocator);
    errdefer channel_samples.deinit();
    // Stop any tasks we already started if a later start (or the thread
    // spawn) fails, so the pool is left in a consistent stopped state.
    var started: usize = 0;
    errdefer {
        for (self.entries.items[0..started]) |entry| {
            entry.task.stop() catch |e| log.err("Failed to stop task during cleanup: {}", .{e});
        }
    }
    for (self.entries.items) |entry| {
        try entry.task.start();
        started += 1;
    }
    self.thread_running = true;
    // Previously a failed spawn left thread_running stuck at true.
    errdefer self.thread_running = false;
    self.read_thread = try std.Thread.spawn(
        .{ .allocator = allocator },
        readThreadCallback,
        .{ self, read_timeout, channel_samples },
    );
    return channel_samples;
}
/// Stop all tasks, signal the read thread to exit, and wait for it to
/// finish. start() must have been called.
pub fn stop(self: *TaskPool) !void {
    assert(self.read_thread != null);
    // Stop the hardware tasks first so pending reads in the thread
    // complete or fail quickly.
    for (self.entries.items) |entry| {
        try entry.task.stop();
    }
    self.thread_running = false;
    const thread = self.read_thread.?;
    thread.join();
    self.read_thread = null;
}
/// Extract the device part of a physical channel path ("Dev1/ai0" -> "Dev1").
/// Returns null when the path contains no '/'.
fn getDeviceFromChannel(channel: [:0]const u8) ?[]const u8 {
    const separator = std.mem.indexOfScalar(u8, channel, '/') orelse return null;
    return channel[0..separator];
}
/// Find the pool entry for (device, channel_type), creating a new DAQmx
/// task for it when none exists yet.
/// Errors: TaskLimitReached when all `max_tasks` slots are used; also
/// propagates device-name and task-creation failures.
fn getOrPutTask(self: *TaskPool, ni_daq: NIDaq, device: []const u8, channel_type: ChannelType) !*Entry {
    for (self.entries.items) |*entry| {
        const entry_device = entry.device.slice();
        if (entry.channel_type == channel_type and std.mem.eql(u8, entry_device, device)) {
            return entry;
        }
    }
    if (self.entries.items.len == self.entries.capacity) {
        return error.TaskLimitReached;
    }
    // Validate the device name BEFORE adding the entry: previously a
    // too-long name failed after addOneAssumeCapacity(), leaving a
    // half-initialized Entry in the list.
    const bounded_device = try NIDaq.BoundedDeviceName.fromSlice(device);
    const task = try ni_daq.createTask(null);
    errdefer task.clear() catch |e| log.err("Failed to clear task during cleanup: {}", .{e});
    var entry = self.entries.addOneAssumeCapacity();
    entry.channel_type = channel_type;
    entry.device = bounded_device;
    entry.task = task;
    // Note: entry.channel_order was pre-initialized (empty) by init().
    return entry;
}
/// Add an analog-input voltage channel to the pool, creating the
/// per-device task on first use. Must be called before start().
pub fn createAIVoltageChannel(self: *TaskPool, ni_daq: NIDaq, options: NIDaq.Task.AIVoltageChannelOptions) !void {
    assert(self.read_thread == null);
    const device = getDeviceFromChannel(options.channel) orelse return error.UnknownDevice;
    const entry = try self.getOrPutTask(ni_daq, device, .analog_input);
    if (entry.channel_order.items.len >= entry.channel_order.capacity) {
        return error.MaxChannelsLimitReached;
    }
    try entry.task.createAIVoltageChannel(options);
    // Record which pool-wide channel index this task channel maps to.
    entry.channel_order.appendAssumeCapacity(self.channel_count);
    self.channel_count += 1;
}
/// Add an analog-output voltage channel to the pool, creating the
/// per-device task on first use. Must be called before start().
pub fn createAOVoltageChannel(self: *TaskPool, ni_daq: NIDaq, options: NIDaq.Task.AOVoltageChannelOptions) !void {
    assert(self.read_thread == null);
    const device = getDeviceFromChannel(options.channel) orelse return error.UnknownDevice;
    const entry = try self.getOrPutTask(ni_daq, device, .analog_output);
    if (entry.channel_order.items.len >= entry.channel_order.capacity) {
        return error.MaxChannelsLimitReached;
    }
    try entry.task.createAOVoltageChannel(options);
    // Record which pool-wide channel index this task channel maps to.
    entry.channel_order.appendAssumeCapacity(self.channel_count);
    self.channel_count += 1;
}
/// Allocate the ChannelSamples container the read thread fills: one scratch
/// read buffer per task and one sample list per pool channel.
/// Requires at least one channel and a configured sampling mode.
/// Caller owns the result (free with ChannelSamples.deinit()).
pub fn createChannelSamples(self: TaskPool, allocator: std.mem.Allocator) !*ChannelSamples {
    assert(self.channel_count > 0);
    assert(self.sampling != null);
    const sampling = self.sampling.?;
    const array_size_per_channel: usize = switch (sampling) {
        // For continuous sampling reserve 1s worth of samples per channel.
        // TODO: maybe this should be configurable or proportional to timeout?
        .continous => |args| @intFromFloat(@ceil(args.sample_rate)),
        .finite => |args| args.samples_per_channel,
    };

    var read_arrays_by_task = try allocator.alloc([]f64, self.entries.items.len);
    errdefer allocator.free(read_arrays_by_task);
    // Free exactly the read arrays that were allocated if a later
    // allocation fails (fixes the leak the old TODO pointed at).
    var allocated_arrays: usize = 0;
    errdefer {
        for (read_arrays_by_task[0..allocated_arrays]) |read_array| {
            allocator.free(read_array);
        }
    }
    for (0.., self.entries.items) |i, entry| {
        const channel_count = entry.channel_order.items.len;
        read_arrays_by_task[i] = try allocator.alloc(f64, array_size_per_channel * channel_count);
        allocated_arrays += 1;
    }

    const samples = try allocator.alloc(std.ArrayList(f64), self.channel_count);
    errdefer allocator.free(samples);
    // Same partial-cleanup pattern for the per-channel sample lists.
    var initialized_lists: usize = 0;
    errdefer {
        for (samples[0..initialized_lists]) |*samples_per_channel| {
            samples_per_channel.deinit();
        }
    }
    for (samples) |*samples_per_channel| {
        samples_per_channel.* = if (sampling == .finite)
            // Finite: the total sample count is known, reserve it up front.
            try std.ArrayList(f64).initCapacity(allocator, sampling.finite.samples_per_channel)
        else
            // TODO: Maybe reserve a large amount of space (or use a ring
            // buffer) for continuous sampling too?
            std.ArrayList(f64).init(allocator);
        initialized_lists += 1;
    }

    const channel_samples = try allocator.create(ChannelSamples);
    errdefer allocator.destroy(channel_samples);
    channel_samples.* = ChannelSamples{
        .allocator = allocator,
        .read_arrays_by_task = read_arrays_by_task,
        .samples = samples,
    };
    return channel_samples;
}
/// Drain pending samples from every pooled task into `samples`.
/// Called from the read thread; holds `samples.mutex` for the whole pass.
pub fn readAnalog(self: *TaskPool, timeout: f64, samples: *ChannelSamples) !void {
    assert(self.read_thread != null);
    assert(self.channel_count > 0);
    samples.mutex.lock();
    defer samples.mutex.unlock();
    for (0.., self.entries.items) |i, *entry| {
        // Each task has its own pre-sized scratch buffer.
        const read_array = samples.read_arrays_by_task[i];
        const samples_per_channel = try entry.task.readAnalog(.{
            .read_array = read_array,
            .timeout = timeout
        });
        if (samples_per_channel == 0) continue;
        const channel_count = entry.channel_order.items.len;
        const read_array_used = samples_per_channel * channel_count;
        // Split the flat read buffer into one non-overlapping window per
        // channel (window size == advance == samples_per_channel).
        // NOTE(review): this assumes Task.readAnalog fills the buffer
        // grouped by channel (all of ch0, then ch1, ...) — confirm the
        // DAQmx fill mode it uses.
        var channel_index_of_task: usize = 0;
        var samples_window = std.mem.window(f64, read_array[0..read_array_used], samples_per_channel, samples_per_channel);
        while (samples_window.next()) |channel_samples| : (channel_index_of_task += 1) {
            // Map the task-local channel position to the pool-wide index.
            const channel_index: usize = entry.channel_order.items[channel_index_of_task];
            // TODO: Maybe use .appendSliceAssumeCapacity(), when doing finite sampling?
            try samples.samples[channel_index].appendSlice(channel_samples);
        }
    }
}
/// True when every pooled task reports that it has finished sampling.
pub fn isDone(self: TaskPool) !bool {
    for (self.entries.items) |entry| {
        if (!try entry.task.isDone()) return false;
    }
    return true;
}
/// Total number of dropped samples reported across all pooled tasks.
pub fn droppedSamples(self: TaskPool) u32 {
    var total: u32 = 0;
    for (self.entries.items) |entry| {
        total += entry.task.dropped_samples;
    }
    return total;
}
/// Body of the background read thread spawned by start().
/// Polls samples until sampling finishes, stop() clears `thread_running`,
/// or `max_error_count` failures occur.
fn readThreadCallback(task_pool: *TaskPool, timeout: f64, channel_samples: *ChannelSamples) void {
    defer task_pool.thread_running = false;
    // Record wall-clock bounds of the sampling run for the consumer.
    channel_samples.started_sampling_ns = std.time.nanoTimestamp();
    defer channel_samples.stopped_sampling_ns = std.time.nanoTimestamp();
    var error_count: u32 = 0;
    const max_error_count = 3;
    // NOTE(review): `thread_running` is read here and written by
    // start()/stop() on another thread without atomics — consider
    // std.atomic.Value(bool); verify.
    while (error_count < max_error_count and task_pool.thread_running) {
        const is_done = task_pool.isDone() catch |e| {
            error_count += 1;
            log.err(".isDone() failed in thread: {}", .{e});
            if (@errorReturnTrace()) |trace| {
                std.debug.dumpStackTrace(trace.*);
            }
            continue;
        };
        if (is_done) {
            break;
        }
        task_pool.readAnalog(timeout, channel_samples) catch |e| {
            error_count += 1;
            log.err(".readAnalog() failed in thread: {}", .{e});
            if (@errorReturnTrace()) |trace| {
                std.debug.dumpStackTrace(trace.*);
            }
            continue;
        };
        // NOTE(review): error_count is never reset after a successful
        // iteration, so 3 errors over the whole run stop the thread —
        // confirm that is intended (vs. 3 consecutive errors).
    }
    if (max_error_count == error_count) {
        log.err("Stopping read thread, too many errors occured", .{});
    }
}