feat: add queue and interval

This commit is contained in:
zawz 2023-08-18 17:14:41 +02:00 committed by mateoferon
parent 73e824ee5c
commit 45f5f447c0
7 changed files with 126 additions and 18 deletions

View file

@ -15,6 +15,8 @@ lazy_static = "1.4"
clap = { version = "4.1", features = ["derive"] }
thiserror = "1.0"
enum-display-derive = "0.1"
queues = "1.1.0"
duration-str = { version = "0.5.1", features = ["serde"] }
[target.'cfg(target_os = "linux")'.dependencies]
alsa = "0.7.0"

View file

@ -1,3 +1,5 @@
use std::time::Duration;
use crate::event::Event;
use crate::util;
use crate::Error;
@ -19,6 +21,8 @@ pub struct DeviceConfig {
pub connect: Option<Vec<RunConfig>>,
pub disconnect: Option<Vec<RunConfig>>,
pub events: Option<Vec<EventConfig>>,
pub queue_length: usize,
pub interval: Duration,
}
impl DeviceConfig {
@ -66,6 +70,8 @@ impl TryFrom<DeviceConfigSerializer> for DeviceConfig {
connect: util::map_opt_tryfrom(v.connect)?,
disconnect: util::map_opt_tryfrom(v.disconnect)?,
events: util::map_opt_tryfrom(v.events)?,
queue_length: v.queue_length.unwrap_or(256),
interval: v.interval.map(|x| x.unwrap()).unwrap_or_else(|| Duration::new(0, 0)),
})
}
}

View file

@ -1,7 +1,26 @@
use std::time::Duration;
use super::{RunConfigSerializer,EventConfigSerializer};
use duration_str::deserialize_duration;
use serde::Deserialize;
#[derive(Debug,Clone,Deserialize)]
#[serde(untagged)]
pub enum DurationWrapper {
#[serde(deserialize_with = "deserialize_duration")]
Some(Duration),
}
impl DurationWrapper {
pub fn unwrap(self) -> Duration
{
match self {
DurationWrapper::Some(v) => v,
}
}
}
#[derive(Deserialize,Debug,Clone)]
#[serde(deny_unknown_fields)]
pub struct DeviceConfigSerializer {
@ -12,4 +31,6 @@ pub struct DeviceConfigSerializer {
pub disconnect: Option<Vec<RunConfigSerializer>>,
pub events: Option<Vec<EventConfigSerializer>>,
pub max_connections: Option<u32>,
pub queue_length: Option<usize>,
pub interval: Option<DurationWrapper>,
}

View file

@ -87,6 +87,16 @@ pub struct Event<'a> {
pub timestamp: Option<SystemTime>,
}
#[derive(Debug,Clone,Default)]
pub struct EventBuf {
pub r#type: EventType,
pub channel: u8,
pub id: u8,
pub value: u16,
pub raw: Vec<u8>,
pub timestamp: Option<SystemTime>,
}
pub struct EventEnv {
pub channel: String,
pub id: String,
@ -106,6 +116,32 @@ struct EventEnvRef<'a> {
pub value: &'a str,
}
impl EventBuf {
pub fn as_event(&self) -> Event {
Event {
r#type: self.r#type,
channel: self.channel,
id: self.id,
value: self.value,
raw: &self.raw[..],
timestamp: self.timestamp,
}
}
}
impl Into<EventBuf> for Event<'_> {
fn into(self) -> EventBuf {
EventBuf {
r#type: self.r#type,
channel: self.channel,
id: self.id,
value: self.value,
raw: Vec::from(self.raw),
timestamp: self.timestamp,
}
}
}
impl From<u8> for EventType {
fn from(v: u8) -> Self {
if ! (0b1000..=0b1111).contains(&v) {

View file

@ -1,5 +1,7 @@
pub mod alsa;
use queues::{CircularBuffer, IsQueue};
use crate::config::device::Identifier;
use crate::midi::alsa::MidiInputAlsa;
use crate::Error;
@ -8,10 +10,11 @@ extern crate libc;
use crate::config::DeviceConfig;
use crate::eventmap::EventMap;
use crate::event::Event;
use crate::event::{Event, EventBuf};
use std::time::SystemTime;
use std::sync::mpsc;
use std::thread;
use std::time::{SystemTime, Instant};
use std::sync::{mpsc, Mutex, Arc};
#[derive(Eq,PartialEq,Debug,Clone)]
pub struct MidiPort<T>{
@ -161,9 +164,9 @@ impl MidiHandler {
r
}
pub fn run(&mut self, eventmap: &EventMap, (rs,ts): (mpsc::Receiver<bool>, mpsc::Sender<bool>)) -> Result<(), Error> {
pub fn run(&mut self, conf: &DeviceConfig, eventmap: &EventMap, (rs,ts): (mpsc::Receiver<bool>, mpsc::Sender<bool>)) -> Result<(), Error> {
handler_fcall!{
self, handle_inputport ,(eventmap,(rs,ts)),
self, handle_inputport, (conf, eventmap,(rs,ts)),
ALSA
}
}
@ -189,19 +192,57 @@ where T: MidiInput<A>
input.ports_handle()
}
fn handle_inputport<T>(input: &mut T, (eventmap, (rs, ts)): (&EventMap, (mpsc::Receiver<bool>, mpsc::Sender<bool>))) -> Result<(), Error>
where T: MidiInputHandler
fn handle_inputport<T>(input: &mut T, (conf, eventmap, (rs, ts)): (&DeviceConfig, &EventMap, (mpsc::Receiver<bool>, mpsc::Sender<bool>))) -> Result<(), Error>
where T: MidiInputHandler + Send
{
input.handle_input(|t,m,_| {
let mut event = Event::from(m);
event.timestamp = t;
match eventmap.run_event(&event) {
Ok(_) => (),
Err(e) => {
eprintln!("ERROR: error on run: {}", e)
},
}
}, (rs,ts), ())?;
thread::scope(|s| -> Result<(), Error> {
// parking signal for runner, true = stop
let (pts,prs) = mpsc::channel::<bool>();
let evq = Arc::new(Mutex::new(CircularBuffer::<EventBuf>::new(conf.queue_length)));
// background execution loop
let rq = evq.clone();
let exec_thread = s.spawn(move || -> Result<(),Error> {
loop {
if prs.recv()? {
break;
}
loop {
// nest the lock into a scope to release it before run
let (ev,start): (EventBuf,Instant) = {
let mut evq = rq.lock().unwrap();
if evq.size() > 0 {
(evq.remove().unwrap(), Instant::now())
} else {
break;
}
};
eventmap.run_event(&ev.as_event()).unwrap_or_else(|e| eprintln!("ERROR: error on run: {}", e) );
let elapsed_time = start.elapsed();
if elapsed_time < conf.interval {
thread::sleep(conf.interval - elapsed_time);
}
}
}
Ok(())
});
input.handle_input(|t,m,(evq,pts)| {
let mut event: EventBuf = Event::from(m).into();
event.timestamp = t;
let mut evq = evq.lock().unwrap();
evq.add(event).unwrap();
pts.send(false).expect("unexpected write error");
}, (rs,ts), (evq,pts.clone()))?;
pts.send(true).expect("unexpected write error");
let _ = exec_thread.join();
Ok(())
})?;
Ok(())
}

View file

@ -74,7 +74,7 @@ fn try_connect_process<'a>(
let mm = m.as_ref().map(Arc::clone);
let t = s.spawn( move || {
dev.run_connect().unwrap();
c.run(eventmap, (srs,nsts)).unwrap();
c.run(dev, eventmap, (srs,nsts)).unwrap();
if let Some(m) = mm {
let mut m = m.lock().unwrap();
m.0 -= 1;

View file

@ -1,6 +1,8 @@
devices:
- name: 'VMPK'
max_connections: 1
queue_length: 3
interval: 100ms
connect:
- args: [ "sh", "-c", "echo Hello world!" ]
disconnect: