From e022477ef20eaa85ed9680d9bcb893248ca8968f Mon Sep 17 00:00:00 2001 From: Rokas Puzonas Date: Sun, 26 Mar 2023 15:32:47 +0300 Subject: [PATCH] add 'listen' command --- src/main.rs | 31 ++++++++++++++++----------- src/ubus.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index 86e7e2b..870c167 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,12 @@ -use std::net::ToSocketAddrs; - #[cfg_attr(not(debug_assertions), windows_subsystem = "windows")] // hide console window on Windows in release +use std::{net::ToSocketAddrs, thread}; use anyhow::Result; -use smol::Async; -use async_ssh2_lite::{AsyncSession, AsyncSessionStream}; +use smol::{channel, block_on}; +use async_ssh2_lite::{AsyncSession}; use app::App; -use crate::ubus::Ubus; +use crate::ubus::{Ubus, UbusEvent}; mod app; mod ubus; @@ -27,14 +26,22 @@ async fn main() -> Result<()> { session.userauth_password(username, password).await?; let ubus = Ubus::new(session); - // dbg!(ubus.call( - // "container", - // "get_features", - // None - // ).await? - // ); + let (tx, rx) = channel::unbounded::(); + let listener = { + let tx = tx.clone(); + tokio::spawn(async move { + println!("before listen"); + if let Err(err) = ubus.listen(&[], tx).await { + dbg!(err); + }; + println!("after listen"); + }) + }; - dbg!(ubus.wait_for(&vec!["network"]).await?); + loop { + let e = rx.recv().await?; + dbg!(e); + } /* let mut native_options = eframe::NativeOptions::default(); diff --git a/src/ubus.rs b/src/ubus.rs index 19a8e03..b15d6d0 100644 --- a/src/ubus.rs +++ b/src/ubus.rs @@ -2,11 +2,11 @@ use std::{io::Read, borrow::Cow, net::TcpStream}; use async_ssh2_lite::AsyncSession; use lazy_regex::regex_captures; -use serde_json::Value; +use serde_json::{Value, json}; use shell_escape::unix::escape; use hex::FromHex; use anyhow::{Result, bail, anyhow}; -use smol::{Async, io::AsyncReadExt}; +use smol::{Async, io::AsyncReadExt, channel::Sender}; use thiserror::Error; pub struct Ubus { @@ -61,6 +61,12 @@ pub struct UbusObject { pub methods: Vec<(String, Vec<(String, UbusParamType)>)> } +#[derive(Debug)] +pub struct UbusEvent { + pub path: String, + pub value: Value +} + fn parse_parameter_type(param: &str) -> Option { use UbusParamType::*; match param { @@ -217,4 +223,54 @@ impl Ubus { Ok(()) } + + fn parse_event(bytes: &[u8]) -> Result { + let event_value: Value = serde_json::from_slice(bytes)?; + let event_map = event_value.as_object() + .ok_or(anyhow!("Expected event to be an object"))?; + + if event_map.keys().len() != 1 { + bail!("Expected event object to only contain one key"); + } + + let path = event_map.keys().next().unwrap().clone(); + let value = event_map.get(&path).unwrap().clone(); + + Ok(UbusEvent { path, value }) + } + + pub async fn listen(self, paths: &[&str], sender: Sender) -> Result<()> { + let cmd = format!("ubus listen {}", paths.join(" ")); + let mut channel = self.session.channel_session().await?; + channel.exec(&cmd).await?; + + let mut line_buffer = vec![0u8; 1024]; + let mut buffer_size = 0usize; + loop { + let n = channel.read(&mut line_buffer[buffer_size..]).await?; + + let delim_pos = line_buffer[buffer_size..(buffer_size+n)] + .iter() + .position(|c| *c == b'\n') + .map(|pos| pos + buffer_size); + + buffer_size += n; + if let Some(pos) = delim_pos { + let event = Ubus::parse_event(&line_buffer[0..pos])?; + sender.send(event).await?; + + line_buffer.copy_within((pos+1)..buffer_size, 0); + buffer_size -= pos; + buffer_size -= 1; + } + + + // Double line buffer size if at capacity + if buffer_size == line_buffer.len() { + for _ in 0..=line_buffer.len() { + line_buffer.push(0u8); + } + } + } + } }