daq-view/src/ni-daq/task-pool.zig

171 lines
4.0 KiB
Zig

const std = @import("std");
const NIDaq = @import("./root.zig");
const assert = std.debug.assert;
const log = std.log.scoped(.task_pool);
const TaskPool = @This();
const max_tasks = 32;
pub const Sampling = union(enum) {
finite: struct {
sample_rate: f64,
sample_count: u64
},
continous: struct {
sample_rate: f64
}
};
pub const Entry = struct {
task: NIDaq.Task,
in_use: bool = false,
running: bool = false,
started_sampling_ns: i128,
stopped_sampling_ns: ?i128 = null,
dropped_samples: u32 = 0,
sampling: Sampling,
mutex: *std.Thread.Mutex,
samples: *std.ArrayList(f64),
pub fn stop(self: *Entry) !void {
self.running = false;
if (self.in_use) {
try self.task.stop();
self.task.clear();
}
self.in_use = false;
}
};
running: bool = false,
read_thread: std.Thread,
entries: [max_tasks]Entry = undefined,
pub fn init(self: *TaskPool, allocator: std.mem.Allocator) !void {
self.* = TaskPool{
.read_thread = undefined
};
self.running = true;
self.read_thread = try std.Thread.spawn(
.{ .allocator = allocator },
readThreadCallback,
.{ self }
);
for (&self.entries) |*entry| {
entry.in_use = false;
}
}
pub fn deinit(self: *TaskPool) void {
for (&self.entries) |*entry| {
entry.stop() catch log.err("Failed to stop entry", .{});
}
self.running = false;
self.read_thread.join();
}
fn readAnalog(entry: *Entry, timeout: f64) !void {
if (!entry.in_use) return;
if (!entry.running) return;
entry.mutex.lock();
defer entry.mutex.unlock();
switch (entry.sampling) {
.finite => |args| {
try entry.samples.ensureTotalCapacity(args.sample_count);
},
.continous => |args| {
try entry.samples.ensureUnusedCapacity(@intFromFloat(@ceil(args.sample_rate)));
}
}
const unused_capacity = entry.samples.unusedCapacitySlice();
if (unused_capacity.len == 0) return;
const read_amount = try entry.task.readAnalog(.{
.timeout = timeout,
.read_array = unused_capacity,
});
if (read_amount == 0) return;
entry.samples.items.len += read_amount;
}
fn readThreadCallback(task_pool: *TaskPool) void {
const timeout = 0.05;
while (task_pool.running) {
for (&task_pool.entries) |*entry| {
readAnalog(entry, timeout) catch |e| {
log.err("readAnalog() failed in thread: {}", .{e});
if (@errorReturnTrace()) |trace| {
std.debug.dumpStackTrace(trace.*);
}
entry.stop() catch log.err("failed to stop collecting", .{});
};
}
std.time.sleep(0.05 * std.time.ns_per_s);
}
}
fn findFreeEntry(self: *TaskPool) ?*Entry {
for (&self.entries) |*entry| {
if (!entry.in_use) {
return entry;
}
}
return null;
}
pub fn launchAIVoltageChannel(
self: *TaskPool,
ni_daq: *NIDaq,
mutex: *std.Thread.Mutex,
samples: *std.ArrayList(f64),
sampling: Sampling,
options: NIDaq.Task.AIVoltageChannelOptions
) !*Entry {
const task = try ni_daq.createTask(null);
errdefer task.clear();
const entry = self.findFreeEntry() orelse return error.NotEnoughSpace;
errdefer entry.in_use = false;
try task.createAIVoltageChannel(options);
switch (sampling) {
.continous => |args| {
try task.setContinousSampleRate(args.sample_rate);
},
.finite => |args| {
try task.setFiniteSampleRate(args.sample_rate, args.sample_count);
}
}
samples.clearRetainingCapacity();
try task.start();
const started_at = std.time.nanoTimestamp();
entry.* = Entry{
.task = task,
.started_sampling_ns = started_at,
.in_use = true,
.running = true,
.mutex = mutex,
.samples = samples,
.sampling = sampling,
};
return entry;
}