diff --git a/src/async_line_reader.rs b/src/async_line_reader.rs new file mode 100644 index 0000000..f662134 --- /dev/null +++ b/src/async_line_reader.rs @@ -0,0 +1,65 @@ +use std::net::TcpStream; +use async_ssh2_lite::AsyncStream; +use smol::{Async, io::AsyncReadExt}; +use anyhow::Result; + +const INITIAL_CAPACITY: usize = 1024; + +pub struct AsyncLineReader { + stream: AsyncStream>, + line_buffer: Vec, + buffer_size: usize +} + +impl AsyncLineReader { + pub fn new(stream: AsyncStream>) -> Self { + Self { + stream, + line_buffer: vec![0u8; INITIAL_CAPACITY], + buffer_size: 0 + } + } + + pub async fn read_line(&mut self) -> Result> { + let delim_pos = self.line_buffer[0..self.buffer_size] + .iter() + .position(|c| *c == b'\n'); + + if let Some(pos) = delim_pos { + let line = self.line_buffer[0..pos].to_vec(); + + self.line_buffer.copy_within((pos+1)..self.buffer_size, 0); + self.buffer_size -= pos; + self.buffer_size -= 1; + + return Ok(line); + } + + loop { + // Double line buffer size if at capacity + if self.buffer_size == self.line_buffer.len() { + for _ in 0..=self.line_buffer.len() { + self.line_buffer.push(0u8); + } + } + + let n = self.stream.read(&mut self.line_buffer[self.buffer_size..]).await?; + + let delim_pos = self.line_buffer[self.buffer_size..(self.buffer_size+n)] + .iter() + .position(|c| *c == b'\n') + .map(|pos| pos + self.buffer_size); + + self.buffer_size += n; + if let Some(pos) = delim_pos { + let line = self.line_buffer[0..pos].to_vec(); + + self.line_buffer.copy_within((pos+1)..self.buffer_size, 0); + self.buffer_size -= pos; + self.buffer_size -= 1; + + return Ok(line); + } + } + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 870c167..f2916a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,11 +5,13 @@ use anyhow::Result; use smol::{channel, block_on}; use async_ssh2_lite::{AsyncSession}; use app::App; +use ubus::MonitorEvent; -use crate::ubus::{Ubus, UbusEvent}; +use crate::ubus::{Ubus, ListenEvent, MonitorDir, MonitorEventType}; mod app; mod ubus; +mod async_line_reader; #[tokio::main] async fn main() -> Result<()> { @@ -26,12 +28,12 @@ async fn main() -> Result<()> { session.userauth_password(username, password).await?; let ubus = Ubus::new(session); - let (tx, rx) = channel::unbounded::(); + let (tx, rx) = channel::unbounded::(); let listener = { let tx = tx.clone(); tokio::spawn(async move { println!("before listen"); - if let Err(err) = ubus.listen(&[], tx).await { + if let Err(err) = ubus.monitor(None, &[], tx).await { dbg!(err); }; println!("after listen"); diff --git a/src/ubus.rs b/src/ubus.rs index 1e74524..5013ff5 100644 --- a/src/ubus.rs +++ b/src/ubus.rs @@ -1,4 +1,4 @@ -use std::{io::Read, borrow::Cow, net::TcpStream}; +use std::{io::Read, borrow::Cow, net::TcpStream, str::FromStr, any}; use async_ssh2_lite::AsyncSession; use lazy_regex::regex_captures; @@ -9,6 +9,8 @@ use anyhow::{Result, bail, anyhow}; use smol::{Async, io::AsyncReadExt, channel::Sender}; use thiserror::Error; +use crate::async_line_reader::AsyncLineReader; + pub struct Ubus { session: AsyncSession> } @@ -62,11 +64,80 @@ pub struct UbusObject { } #[derive(Debug)] -pub struct UbusEvent { +pub struct ListenEvent { pub path: String, pub value: Value } +#[derive(Debug, Clone, Copy)] +pub enum MonitorEventType { + Hello, + Status, + Data, + Ping, + Lookup, + Invoke, + AddObject, + RemoveObject, + Subscribe, + Unsubscribe, + Notify, +} + +impl ToString for MonitorEventType { + fn to_string(&self) -> String { + match self { + MonitorEventType::Hello => "hello", + MonitorEventType::Status => "status", + MonitorEventType::Data => "data", + MonitorEventType::Ping => "ping", + MonitorEventType::Lookup => "lookup", + MonitorEventType::Invoke => "invoke", + MonitorEventType::AddObject => "add_object", + MonitorEventType::RemoveObject => "remove_object", + MonitorEventType::Subscribe => "subscribe", + MonitorEventType::Unsubscribe => "unsubscribe", + MonitorEventType::Notify => "notify", + }.into() + } +} + +impl FromStr for MonitorEventType { + type Err = anyhow::Error; + + fn from_str(s: &str) -> std::result::Result { + match s { + "hello" => Ok(MonitorEventType::Hello), + "status" => Ok(MonitorEventType::Status), + "data" => Ok(MonitorEventType::Data), + "ping" => Ok(MonitorEventType::Ping), + "lookup" => Ok(MonitorEventType::Lookup), + "invoke" => Ok(MonitorEventType::Invoke), + "add_object" => Ok(MonitorEventType::AddObject), + "remove_object" => Ok(MonitorEventType::RemoveObject), + "subscribe" => Ok(MonitorEventType::Subscribe), + "unsubscribe" => Ok(MonitorEventType::Unsubscribe), + "notify" => Ok(MonitorEventType::Notify), + _ => Err(anyhow!("Unknown event type '{}'", s)) + } + } +} + +#[derive(Debug)] +pub struct MonitorEvent { + direction: MonitorDir, + client: u32, + peer: u32, + kind: MonitorEventType, + data: Value // TODO: Figure out the possible values for every `MonitorEventType`, to make this more safe. +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum MonitorDir { + Rx, + Tx +} + fn parse_parameter_type(param: &str) -> Option { use UbusParamType::*; match param { @@ -95,7 +166,7 @@ fn parse_error_code(code: i32) -> Option { 7 => Some(Timeout), 8 => Some(NotSupported), 9 => Some(UnknownError), - 10 => Some(ConnectionFailed), + 10 | -1 => Some(ConnectionFailed), 11 => Some(NoMemory), 12 => Some(ParseError), 13 => Some(SystemError), @@ -137,15 +208,15 @@ impl Ubus { Ok(output) } - pub async fn list(self, path: Option<&str>) -> Result> { + pub async fn list(&self, path: Option<&str>) -> Result> { let output = match path { - Some(path) => self.exec_cmd(&format!("ubus list {}", path)).await?, - None => self.exec_cmd("ubus list").await?, + Some(path) => self.exec_cmd(&format!("ubus -S list {}", path)).await?, + None => self.exec_cmd("ubus -S list").await?, }; Ok(output.lines().map(ToOwned::to_owned).collect::>()) } - pub async fn list_verbose(self, path: Option<&str>) -> Result> { + pub async fn list_verbose(&self, path: Option<&str>) -> Result> { let output = match path { Some(path) => self.exec_cmd(&format!("ubus -v list {}", path)).await?, None => self.exec_cmd("ubus -v list").await?, @@ -157,7 +228,7 @@ impl Ubus { let mut objects = vec![]; for line in output.lines() { - if let Some((_, name, id)) = regex_captures!(r"^'([\w.-]+)' @([0-9a-zA-Z]+)$", line) { + if let Some((_, name, id)) = regex_captures!(r"^'([\w.-]+)' @([0-9a-zA-Z]{8})$", line) { if cur_name.is_some() && cur_id.is_some() { objects.push(UbusObject { id: cur_id.unwrap(), @@ -190,10 +261,10 @@ impl Ubus { Ok(objects) } - pub async fn call(self, path: &str, method: &str, message: Option<&Value>) -> Result { + pub async fn call(&self, path: &str, method: &str, message: Option<&Value>) -> Result { let cmd = match message { - Some(msg) => format!("ubus call {} {} {}", path, method, escape_json(msg)), - None => format!("ubus call {} {}", path, method), + Some(msg) => format!("ubus -S call {} {} {}", path, method, escape_json(msg)), + None => format!("ubus -S call {} {}", path, method), }; let output = self.exec_cmd(&cmd).await?; @@ -201,10 +272,10 @@ impl Ubus { Ok(value) } - pub async fn send(self, event_type: &str, message: Option<&Value>) -> Result<()> { + pub async fn send(&self, event_type: &str, message: Option<&Value>) -> Result<()> { let cmd = match message { - Some(msg) => format!("ubus send {} {}", event_type, escape_json(msg)), - None => format!("ubus send {}", event_type), + Some(msg) => format!("ubus -S send {} {}", event_type, escape_json(msg)), + None => format!("ubus -S send {}", event_type), }; self.exec_cmd(&cmd).await?; @@ -212,11 +283,11 @@ impl Ubus { Ok(()) } - pub async fn wait_for(self, objects: &[&str]) -> Result<()> { + pub async fn wait_for(&self, objects: &[&str]) -> Result<()> { if objects.len() < 1 { bail!("At least 1 object is required") } - let cmd = format!("ubus wait_for {}", objects.join(" ")); + let cmd = format!("ubus -S wait_for {}", objects.join(" ")); let mut channel = self.session.channel_session().await?; channel.exec(&cmd).await?; channel.close().await?; @@ -224,7 +295,7 @@ impl Ubus { Ok(()) } - fn parse_event(bytes: &[u8]) -> Result { + fn parse_listen_event(bytes: &[u8]) -> Result { let event_value: Value = serde_json::from_slice(bytes)?; let event_map = event_value.as_object() .ok_or(anyhow!("Expected event to be an object"))?; @@ -236,54 +307,84 @@ impl Ubus { let path = event_map.keys().next().unwrap().clone(); let value = event_map.get(&path).unwrap().clone(); - Ok(UbusEvent { path, value }) + Ok(ListenEvent { path, value }) } - pub async fn listen(self, paths: &[&str], sender: Sender) -> Result<()> { - let cmd = format!("ubus listen {}", paths.join(" ")); + pub async fn listen(&self, paths: &[&str], sender: Sender) -> Result<()> { + let cmd = format!("ubus -S listen {}", paths.join(" ")); let mut channel = self.session.channel_session().await?; channel.exec(&cmd).await?; - // TODO: Handle error? 'channel.exit_status()', idk if neededdi + // TODO: Handle error? 'channel.exit_status()', idk if needed - let mut line_buffer = vec![0u8; 1024]; - let mut buffer_size = 0usize; + let mut line_reader = AsyncLineReader::new(channel.stream(0)); loop { - let n = channel.read(&mut line_buffer[buffer_size..]).await?; - - let delim_pos = line_buffer[buffer_size..(buffer_size+n)] - .iter() - .position(|c| *c == b'\n') - .map(|pos| pos + buffer_size); - - buffer_size += n; - if let Some(pos) = delim_pos { - let event = Ubus::parse_event(&line_buffer[0..pos])?; - sender.send(event).await?; - - line_buffer.copy_within((pos+1)..buffer_size, 0); - buffer_size -= pos; - buffer_size -= 1; - } - - - // Double line buffer size if at capacity - if buffer_size == line_buffer.len() { - for _ in 0..=line_buffer.len() { - line_buffer.push(0u8); - } - } + let line = line_reader.read_line().await?; + let event = Ubus::parse_listen_event(&line)?; + sender.send(event).await?; } } - pub async fn subscribe(self, paths: &[&str]) -> Result<()> { + pub async fn subscribe(&self, paths: &[&str]) -> Result<()> { if paths.len() < 1 { bail!("At least 1 object is required") } - let cmd = format!("ubus subscribe {}", paths.join(" ")); + let cmd = format!("ubus -S subscribe {}", paths.join(" ")); let mut channel = self.session.channel_session().await?; channel.exec(&cmd).await?; // TODO: Haven't figured out how to test subscribe event using default objects on ubus. todo!(); } + + fn parse_monitor_event(bytes: &[u8]) -> Result { + let line = bytes.iter() + .map(|c| char::from_u32(*c as u32).unwrap()) + .collect::(); + + let (_, + direction_str, + client_str, + peer_str, + kind_str, + data_str + ) = regex_captures!(r"^([<\->]{2}) ([0-9a-zA-Z]{8}) #([0-9a-zA-Z]{8})\s+([a-z_]+): (\{.*\})$", &line) + .ok_or(anyhow!("Unknown pattern of monitor message '{}'", line))?; + + let direction = match direction_str { + "->" => MonitorDir::Tx, + "<-" => MonitorDir::Rx, + _ => bail!("Unknown monitor message direction '{}'", direction_str) + }; + let client = parse_hex_id(client_str)?; + let peer = parse_hex_id(peer_str)?; + let kind = MonitorEventType::from_str(kind_str)?; + let data = serde_json::from_str(data_str)?; + + Ok(MonitorEvent { direction, client, peer, kind, data }) + } + + pub async fn monitor(&self, dir: Option, filter: &[MonitorEventType], sender: Sender) -> Result<()> { + let mut cmd = vec!["ubus -S".into()]; + if let Some(dir) = dir { + if dir == MonitorDir::Rx { + cmd.push("-M r".into()); + } else if dir == MonitorDir::Tx { + cmd.push("-M t".into()); + } + } + cmd.extend(filter.iter().map(|e| format!("-m {}", e.to_string()))); + cmd.push("monitor".into()); + + let mut channel = self.session.channel_session().await?; + println!("{}", cmd.join(" ")); + channel.exec(&cmd.join(" ")).await?; + // TODO: Handle error? 'channel.exit_status()', idk if needed + + let mut line_reader = AsyncLineReader::new(channel.stream(0)); + loop { + let line = line_reader.read_line().await?; + let event = Ubus::parse_monitor_event(&line)?; + sender.send(event).await?; + } + } }