add 'listen' command

This commit is contained in:
Rokas Puzonas 2023-03-26 15:32:47 +03:00
parent a652a7ac07
commit e022477ef2
2 changed files with 77 additions and 14 deletions

View File

@ -1,13 +1,12 @@
use std::net::ToSocketAddrs;
#[cfg_attr(not(debug_assertions), windows_subsystem = "windows")] // hide console window on Windows in release
use std::{net::ToSocketAddrs, thread};
use anyhow::Result;
use smol::Async;
use async_ssh2_lite::{AsyncSession, AsyncSessionStream};
use smol::{channel, block_on};
use async_ssh2_lite::{AsyncSession};
use app::App;
use crate::ubus::Ubus;
use crate::ubus::{Ubus, UbusEvent};
mod app;
mod ubus;
@ -27,14 +26,22 @@ async fn main() -> Result<()> {
session.userauth_password(username, password).await?;
let ubus = Ubus::new(session);
// dbg!(ubus.call(
// "container",
// "get_features",
// None
// ).await?
// );
let (tx, rx) = channel::unbounded::<UbusEvent>();
let listener = {
let tx = tx.clone();
tokio::spawn(async move {
println!("before listen");
if let Err(err) = ubus.listen(&[], tx).await {
dbg!(err);
};
println!("after listen");
})
};
dbg!(ubus.wait_for(&vec!["network"]).await?);
loop {
let e = rx.recv().await?;
dbg!(e);
}
/*
let mut native_options = eframe::NativeOptions::default();

View File

@ -2,11 +2,11 @@ use std::{io::Read, borrow::Cow, net::TcpStream};
use async_ssh2_lite::AsyncSession;
use lazy_regex::regex_captures;
use serde_json::Value;
use serde_json::{Value, json};
use shell_escape::unix::escape;
use hex::FromHex;
use anyhow::{Result, bail, anyhow};
use smol::{Async, io::AsyncReadExt};
use smol::{Async, io::AsyncReadExt, channel::Sender};
use thiserror::Error;
pub struct Ubus {
@ -61,6 +61,12 @@ pub struct UbusObject {
pub methods: Vec<(String, Vec<(String, UbusParamType)>)>
}
#[derive(Debug)]
pub struct UbusEvent {
pub path: String,
pub value: Value
}
fn parse_parameter_type(param: &str) -> Option<UbusParamType> {
use UbusParamType::*;
match param {
@ -217,4 +223,54 @@ impl Ubus {
Ok(())
}
fn parse_event(bytes: &[u8]) -> Result<UbusEvent> {
let event_value: Value = serde_json::from_slice(bytes)?;
let event_map = event_value.as_object()
.ok_or(anyhow!("Expected event to be an object"))?;
if event_map.keys().len() != 1 {
bail!("Expected event object to only contain one key");
}
let path = event_map.keys().next().unwrap().clone();
let value = event_map.get(&path).unwrap().clone();
Ok(UbusEvent { path, value })
}
pub async fn listen(self, paths: &[&str], sender: Sender<UbusEvent>) -> Result<()> {
let cmd = format!("ubus listen {}", paths.join(" "));
let mut channel = self.session.channel_session().await?;
channel.exec(&cmd).await?;
let mut line_buffer = vec![0u8; 1024];
let mut buffer_size = 0usize;
loop {
let n = channel.read(&mut line_buffer[buffer_size..]).await?;
let delim_pos = line_buffer[buffer_size..(buffer_size+n)]
.iter()
.position(|c| *c == b'\n')
.map(|pos| pos + buffer_size);
buffer_size += n;
if let Some(pos) = delim_pos {
let event = Ubus::parse_event(&line_buffer[0..pos])?;
sender.send(event).await?;
line_buffer.copy_within((pos+1)..buffer_size, 0);
buffer_size -= pos;
buffer_size -= 1;
}
// Double line buffer size if at capacity
if buffer_size == line_buffer.len() {
for _ in 0..=line_buffer.len() {
line_buffer.push(0u8);
}
}
}
}
}