add 'monitor'

This commit is contained in:
Rokas Puzonas 2023-03-26 17:33:25 +03:00
parent 4905dde4c3
commit 3888e95a71
3 changed files with 220 additions and 52 deletions

65
src/async_line_reader.rs Normal file
View File

@ -0,0 +1,65 @@
use std::net::TcpStream;
use async_ssh2_lite::AsyncStream;
use smol::{Async, io::AsyncReadExt};
use anyhow::Result;
const INITIAL_CAPACITY: usize = 1024;
pub struct AsyncLineReader {
stream: AsyncStream<Async<TcpStream>>,
line_buffer: Vec<u8>,
buffer_size: usize
}
impl AsyncLineReader {
pub fn new(stream: AsyncStream<Async<TcpStream>>) -> Self {
Self {
stream,
line_buffer: vec![0u8; INITIAL_CAPACITY],
buffer_size: 0
}
}
pub async fn read_line(&mut self) -> Result<Vec<u8>> {
let delim_pos = self.line_buffer[0..self.buffer_size]
.iter()
.position(|c| *c == b'\n');
if let Some(pos) = delim_pos {
let line = self.line_buffer[0..pos].to_vec();
self.line_buffer.copy_within((pos+1)..self.buffer_size, 0);
self.buffer_size -= pos;
self.buffer_size -= 1;
return Ok(line);
}
loop {
// Double line buffer size if at capacity
if self.buffer_size == self.line_buffer.len() {
for _ in 0..=self.line_buffer.len() {
self.line_buffer.push(0u8);
}
}
let n = self.stream.read(&mut self.line_buffer[self.buffer_size..]).await?;
let delim_pos = self.line_buffer[self.buffer_size..(self.buffer_size+n)]
.iter()
.position(|c| *c == b'\n')
.map(|pos| pos + self.buffer_size);
self.buffer_size += n;
if let Some(pos) = delim_pos {
let line = self.line_buffer[0..pos].to_vec();
self.line_buffer.copy_within((pos+1)..self.buffer_size, 0);
self.buffer_size -= pos;
self.buffer_size -= 1;
return Ok(line);
}
}
}
}

View File

@ -5,11 +5,13 @@ use anyhow::Result;
use smol::{channel, block_on};
use async_ssh2_lite::{AsyncSession};
use app::App;
use ubus::MonitorEvent;
use crate::ubus::{Ubus, UbusEvent};
use crate::ubus::{Ubus, ListenEvent, MonitorDir, MonitorEventType};
mod app;
mod ubus;
mod async_line_reader;
#[tokio::main]
async fn main() -> Result<()> {
@ -26,12 +28,12 @@ async fn main() -> Result<()> {
session.userauth_password(username, password).await?;
let ubus = Ubus::new(session);
let (tx, rx) = channel::unbounded::<UbusEvent>();
let (tx, rx) = channel::unbounded::<MonitorEvent>();
let listener = {
let tx = tx.clone();
tokio::spawn(async move {
println!("before listen");
if let Err(err) = ubus.listen(&[], tx).await {
if let Err(err) = ubus.monitor(None, &[], tx).await {
dbg!(err);
};
println!("after listen");

View File

@ -1,4 +1,4 @@
use std::{io::Read, borrow::Cow, net::TcpStream};
use std::{io::Read, borrow::Cow, net::TcpStream, str::FromStr, any};
use async_ssh2_lite::AsyncSession;
use lazy_regex::regex_captures;
@ -9,6 +9,8 @@ use anyhow::{Result, bail, anyhow};
use smol::{Async, io::AsyncReadExt, channel::Sender};
use thiserror::Error;
use crate::async_line_reader::AsyncLineReader;
pub struct Ubus {
session: AsyncSession<Async<TcpStream>>
}
@ -62,11 +64,80 @@ pub struct UbusObject {
}
#[derive(Debug)]
pub struct UbusEvent {
pub struct ListenEvent {
pub path: String,
pub value: Value
}
#[derive(Debug, Clone, Copy)]
pub enum MonitorEventType {
Hello,
Status,
Data,
Ping,
Lookup,
Invoke,
AddObject,
RemoveObject,
Subscribe,
Unsubscribe,
Notify,
}
impl ToString for MonitorEventType {
fn to_string(&self) -> String {
match self {
MonitorEventType::Hello => "hello",
MonitorEventType::Status => "status",
MonitorEventType::Data => "data",
MonitorEventType::Ping => "ping",
MonitorEventType::Lookup => "lookup",
MonitorEventType::Invoke => "invoke",
MonitorEventType::AddObject => "add_object",
MonitorEventType::RemoveObject => "remove_object",
MonitorEventType::Subscribe => "subscribe",
MonitorEventType::Unsubscribe => "unsubscribe",
MonitorEventType::Notify => "notify",
}.into()
}
}
impl FromStr for MonitorEventType {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"hello" => Ok(MonitorEventType::Hello),
"status" => Ok(MonitorEventType::Status),
"data" => Ok(MonitorEventType::Data),
"ping" => Ok(MonitorEventType::Ping),
"lookup" => Ok(MonitorEventType::Lookup),
"invoke" => Ok(MonitorEventType::Invoke),
"add_object" => Ok(MonitorEventType::AddObject),
"remove_object" => Ok(MonitorEventType::RemoveObject),
"subscribe" => Ok(MonitorEventType::Subscribe),
"unsubscribe" => Ok(MonitorEventType::Unsubscribe),
"notify" => Ok(MonitorEventType::Notify),
_ => Err(anyhow!("Unknown event type '{}'", s))
}
}
}
#[derive(Debug)]
pub struct MonitorEvent {
direction: MonitorDir,
client: u32,
peer: u32,
kind: MonitorEventType,
data: Value // TODO: Figure out the possible values for every `MonitorEventType`, to make this more safe.
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MonitorDir {
Rx,
Tx
}
fn parse_parameter_type(param: &str) -> Option<UbusParamType> {
use UbusParamType::*;
match param {
@ -95,7 +166,7 @@ fn parse_error_code(code: i32) -> Option<UbusError> {
7 => Some(Timeout),
8 => Some(NotSupported),
9 => Some(UnknownError),
10 => Some(ConnectionFailed),
10 | -1 => Some(ConnectionFailed),
11 => Some(NoMemory),
12 => Some(ParseError),
13 => Some(SystemError),
@ -137,15 +208,15 @@ impl Ubus {
Ok(output)
}
pub async fn list(self, path: Option<&str>) -> Result<Vec<String>> {
pub async fn list(&self, path: Option<&str>) -> Result<Vec<String>> {
let output = match path {
Some(path) => self.exec_cmd(&format!("ubus list {}", path)).await?,
None => self.exec_cmd("ubus list").await?,
Some(path) => self.exec_cmd(&format!("ubus -S list {}", path)).await?,
None => self.exec_cmd("ubus -S list").await?,
};
Ok(output.lines().map(ToOwned::to_owned).collect::<Vec<_>>())
}
pub async fn list_verbose(self, path: Option<&str>) -> Result<Vec<UbusObject>> {
pub async fn list_verbose(&self, path: Option<&str>) -> Result<Vec<UbusObject>> {
let output = match path {
Some(path) => self.exec_cmd(&format!("ubus -v list {}", path)).await?,
None => self.exec_cmd("ubus -v list").await?,
@ -157,7 +228,7 @@ impl Ubus {
let mut objects = vec![];
for line in output.lines() {
if let Some((_, name, id)) = regex_captures!(r"^'([\w.-]+)' @([0-9a-zA-Z]+)$", line) {
if let Some((_, name, id)) = regex_captures!(r"^'([\w.-]+)' @([0-9a-zA-Z]{8})$", line) {
if cur_name.is_some() && cur_id.is_some() {
objects.push(UbusObject {
id: cur_id.unwrap(),
@ -190,10 +261,10 @@ impl Ubus {
Ok(objects)
}
pub async fn call(self, path: &str, method: &str, message: Option<&Value>) -> Result<Value> {
pub async fn call(&self, path: &str, method: &str, message: Option<&Value>) -> Result<Value> {
let cmd = match message {
Some(msg) => format!("ubus call {} {} {}", path, method, escape_json(msg)),
None => format!("ubus call {} {}", path, method),
Some(msg) => format!("ubus -S call {} {} {}", path, method, escape_json(msg)),
None => format!("ubus -S call {} {}", path, method),
};
let output = self.exec_cmd(&cmd).await?;
@ -201,10 +272,10 @@ impl Ubus {
Ok(value)
}
pub async fn send(self, event_type: &str, message: Option<&Value>) -> Result<()> {
pub async fn send(&self, event_type: &str, message: Option<&Value>) -> Result<()> {
let cmd = match message {
Some(msg) => format!("ubus send {} {}", event_type, escape_json(msg)),
None => format!("ubus send {}", event_type),
Some(msg) => format!("ubus -S send {} {}", event_type, escape_json(msg)),
None => format!("ubus -S send {}", event_type),
};
self.exec_cmd(&cmd).await?;
@ -212,11 +283,11 @@ impl Ubus {
Ok(())
}
pub async fn wait_for(self, objects: &[&str]) -> Result<()> {
pub async fn wait_for(&self, objects: &[&str]) -> Result<()> {
if objects.len() < 1 {
bail!("At least 1 object is required")
}
let cmd = format!("ubus wait_for {}", objects.join(" "));
let cmd = format!("ubus -S wait_for {}", objects.join(" "));
let mut channel = self.session.channel_session().await?;
channel.exec(&cmd).await?;
channel.close().await?;
@ -224,7 +295,7 @@ impl Ubus {
Ok(())
}
fn parse_event(bytes: &[u8]) -> Result<UbusEvent> {
fn parse_listen_event(bytes: &[u8]) -> Result<ListenEvent> {
let event_value: Value = serde_json::from_slice(bytes)?;
let event_map = event_value.as_object()
.ok_or(anyhow!("Expected event to be an object"))?;
@ -236,54 +307,84 @@ impl Ubus {
let path = event_map.keys().next().unwrap().clone();
let value = event_map.get(&path).unwrap().clone();
Ok(UbusEvent { path, value })
Ok(ListenEvent { path, value })
}
pub async fn listen(self, paths: &[&str], sender: Sender<UbusEvent>) -> Result<()> {
let cmd = format!("ubus listen {}", paths.join(" "));
pub async fn listen(&self, paths: &[&str], sender: Sender<ListenEvent>) -> Result<()> {
let cmd = format!("ubus -S listen {}", paths.join(" "));
let mut channel = self.session.channel_session().await?;
channel.exec(&cmd).await?;
// TODO: Handle error? 'channel.exit_status()', idk if neededdi
// TODO: Handle error? 'channel.exit_status()', idk if needed
let mut line_buffer = vec![0u8; 1024];
let mut buffer_size = 0usize;
let mut line_reader = AsyncLineReader::new(channel.stream(0));
loop {
let n = channel.read(&mut line_buffer[buffer_size..]).await?;
let delim_pos = line_buffer[buffer_size..(buffer_size+n)]
.iter()
.position(|c| *c == b'\n')
.map(|pos| pos + buffer_size);
buffer_size += n;
if let Some(pos) = delim_pos {
let event = Ubus::parse_event(&line_buffer[0..pos])?;
sender.send(event).await?;
line_buffer.copy_within((pos+1)..buffer_size, 0);
buffer_size -= pos;
buffer_size -= 1;
}
// Double line buffer size if at capacity
if buffer_size == line_buffer.len() {
for _ in 0..=line_buffer.len() {
line_buffer.push(0u8);
}
}
let line = line_reader.read_line().await?;
let event = Ubus::parse_listen_event(&line)?;
sender.send(event).await?;
}
}
pub async fn subscribe(self, paths: &[&str]) -> Result<()> {
pub async fn subscribe(&self, paths: &[&str]) -> Result<()> {
if paths.len() < 1 {
bail!("At least 1 object is required")
}
let cmd = format!("ubus subscribe {}", paths.join(" "));
let cmd = format!("ubus -S subscribe {}", paths.join(" "));
let mut channel = self.session.channel_session().await?;
channel.exec(&cmd).await?;
// TODO: Haven't figured out how to test subscribe event using default objects on ubus.
todo!();
}
fn parse_monitor_event(bytes: &[u8]) -> Result<MonitorEvent> {
let line = bytes.iter()
.map(|c| char::from_u32(*c as u32).unwrap())
.collect::<String>();
let (_,
direction_str,
client_str,
peer_str,
kind_str,
data_str
) = regex_captures!(r"^([<\->]{2}) ([0-9a-zA-Z]{8}) #([0-9a-zA-Z]{8})\s+([a-z_]+): (\{.*\})$", &line)
.ok_or(anyhow!("Unknown pattern of monitor message '{}'", line))?;
let direction = match direction_str {
"->" => MonitorDir::Tx,
"<-" => MonitorDir::Rx,
_ => bail!("Unknown monitor message direction '{}'", direction_str)
};
let client = parse_hex_id(client_str)?;
let peer = parse_hex_id(peer_str)?;
let kind = MonitorEventType::from_str(kind_str)?;
let data = serde_json::from_str(data_str)?;
Ok(MonitorEvent { direction, client, peer, kind, data })
}
pub async fn monitor(&self, dir: Option<MonitorDir>, filter: &[MonitorEventType], sender: Sender<MonitorEvent>) -> Result<()> {
let mut cmd = vec!["ubus -S".into()];
if let Some(dir) = dir {
if dir == MonitorDir::Rx {
cmd.push("-M r".into());
} else if dir == MonitorDir::Tx {
cmd.push("-M t".into());
}
}
cmd.extend(filter.iter().map(|e| format!("-m {}", e.to_string())));
cmd.push("monitor".into());
let mut channel = self.session.channel_session().await?;
println!("{}", cmd.join(" "));
channel.exec(&cmd.join(" ")).await?;
// TODO: Handle error? 'channel.exit_status()', idk if needed
let mut line_reader = AsyncLineReader::new(channel.stream(0));
loop {
let line = line_reader.read_line().await?;
let event = Ubus::parse_monitor_event(&line)?;
sender.send(event).await?;
}
}
}