diff --git a/Cargo.toml b/Cargo.toml index 3ff5b7b..8d40c1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,8 @@ lazy_static = "1.4" clap = { version = "4.1", features = ["derive"] } thiserror = "1.0" enum-display-derive = "0.1" +queues = "1.1.0" +duration-str = { version = "0.5.1", features = ["serde"] } [target.'cfg(target_os = "linux")'.dependencies] alsa = "0.7.0" diff --git a/src/config/device.rs b/src/config/device.rs index eac0452..4ed4c3a 100644 --- a/src/config/device.rs +++ b/src/config/device.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::event::Event; use crate::util; use crate::Error; @@ -19,6 +21,8 @@ pub struct DeviceConfig { pub connect: Option>, pub disconnect: Option>, pub events: Option>, + pub queue_length: usize, + pub interval: Duration, } impl DeviceConfig { @@ -66,6 +70,8 @@ impl TryFrom for DeviceConfig { connect: util::map_opt_tryfrom(v.connect)?, disconnect: util::map_opt_tryfrom(v.disconnect)?, events: util::map_opt_tryfrom(v.events)?, + queue_length: v.queue_length.unwrap_or(256), + interval: v.interval.map(|x| x.unwrap()).unwrap_or_else(|| Duration::new(0, 0)), }) } } diff --git a/src/config/serializer/device.rs b/src/config/serializer/device.rs index e8812ae..c4de8c4 100644 --- a/src/config/serializer/device.rs +++ b/src/config/serializer/device.rs @@ -1,7 +1,26 @@ +use std::time::Duration; + use super::{RunConfigSerializer,EventConfigSerializer}; +use duration_str::deserialize_duration; use serde::Deserialize; +#[derive(Debug,Clone,Deserialize)] +#[serde(untagged)] +pub enum DurationWrapper { + #[serde(deserialize_with = "deserialize_duration")] + Some(Duration), +} + +impl DurationWrapper { + pub fn unwrap(self) -> Duration + { + match self { + DurationWrapper::Some(v) => v, + } + } +} + #[derive(Deserialize,Debug,Clone)] #[serde(deny_unknown_fields)] pub struct DeviceConfigSerializer { @@ -12,4 +31,6 @@ pub struct DeviceConfigSerializer { pub disconnect: Option>, pub events: Option>, pub max_connections: Option, + pub queue_length: Option, + pub interval: Option, } diff --git a/src/event.rs b/src/event.rs index 712d86d..c51229b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -87,6 +87,16 @@ pub struct Event<'a> { pub timestamp: Option, } +#[derive(Debug,Clone,Default)] +pub struct EventBuf { + pub r#type: EventType, + pub channel: u8, + pub id: u8, + pub value: u16, + pub raw: Vec, + pub timestamp: Option, +} + pub struct EventEnv { pub channel: String, pub id: String, @@ -106,6 +116,32 @@ struct EventEnvRef<'a> { pub value: &'a str, } +impl EventBuf { + pub fn as_event(&self) -> Event { + Event { + r#type: self.r#type, + channel: self.channel, + id: self.id, + value: self.value, + raw: &self.raw[..], + timestamp: self.timestamp, + } + } +} + +impl Into for Event<'_> { + fn into(self) -> EventBuf { + EventBuf { + r#type: self.r#type, + channel: self.channel, + id: self.id, + value: self.value, + raw: Vec::from(self.raw), + timestamp: self.timestamp, + } + } +} + impl From for EventType { fn from(v: u8) -> Self { if ! (0b1000..=0b1111).contains(&v) { diff --git a/src/midi/mod.rs b/src/midi/mod.rs index 6f906b4..e3338ef 100644 --- a/src/midi/mod.rs +++ b/src/midi/mod.rs @@ -1,5 +1,7 @@ pub mod alsa; +use queues::{CircularBuffer, IsQueue}; + use crate::config::device::Identifier; use crate::midi::alsa::MidiInputAlsa; use crate::Error; @@ -8,10 +10,11 @@ extern crate libc; use crate::config::DeviceConfig; use crate::eventmap::EventMap; -use crate::event::Event; +use crate::event::{Event, EventBuf}; -use std::time::SystemTime; -use std::sync::mpsc; +use std::thread; +use std::time::{SystemTime, Instant}; +use std::sync::{mpsc, Mutex, Arc}; #[derive(Eq,PartialEq,Debug,Clone)] pub struct MidiPort{ @@ -161,9 +164,9 @@ impl MidiHandler { r } - pub fn run(&mut self, eventmap: &EventMap, (rs,ts): (mpsc::Receiver, mpsc::Sender)) -> Result<(), Error> { + pub fn run(&mut self, conf: &DeviceConfig, eventmap: &EventMap, (rs,ts): (mpsc::Receiver, mpsc::Sender)) -> Result<(), Error> { handler_fcall!{ - self, handle_inputport ,(eventmap,(rs,ts)), + self, handle_inputport, (conf, eventmap,(rs,ts)), ALSA } } @@ -189,19 +192,57 @@ where T: MidiInput input.ports_handle() } -fn handle_inputport(input: &mut T, (eventmap, (rs, ts)): (&EventMap, (mpsc::Receiver, mpsc::Sender))) -> Result<(), Error> -where T: MidiInputHandler +fn handle_inputport(input: &mut T, (conf, eventmap, (rs, ts)): (&DeviceConfig, &EventMap, (mpsc::Receiver, mpsc::Sender))) -> Result<(), Error> +where T: MidiInputHandler + Send { - input.handle_input(|t,m,_| { - let mut event = Event::from(m); - event.timestamp = t; - match eventmap.run_event(&event) { - Ok(_) => (), - Err(e) => { - eprintln!("ERROR: error on run: {}", e) - }, - } - }, (rs,ts), ())?; + thread::scope(|s| -> Result<(), Error> { + + // parking signal for runner, true = stop + let (pts,prs) = mpsc::channel::(); + + let evq = Arc::new(Mutex::new(CircularBuffer::::new(conf.queue_length))); + + // background execution loop + let rq = evq.clone(); + let exec_thread = s.spawn(move || -> Result<(),Error> { + loop { + if prs.recv()? { + break; + } + loop { + // nest the lock into a scope to release it before run + let (ev,start): (EventBuf,Instant) = { + let mut evq = rq.lock().unwrap(); + if evq.size() > 0 { + (evq.remove().unwrap(), Instant::now()) + } else { + break; + } + }; + eventmap.run_event(&ev.as_event()).unwrap_or_else(|e| eprintln!("ERROR: error on run: {}", e) ); + let elapsed_time = start.elapsed(); + if elapsed_time < conf.interval { + thread::sleep(conf.interval - elapsed_time); + } + } + } + Ok(()) + }); + + input.handle_input(|t,m,(evq,pts)| { + let mut event: EventBuf = Event::from(m).into(); + event.timestamp = t; + let mut evq = evq.lock().unwrap(); + evq.add(event).unwrap(); + pts.send(false).expect("unexpected write error"); + }, (rs,ts), (evq,pts.clone()))?; + + pts.send(true).expect("unexpected write error"); + let _ = exec_thread.join(); + + Ok(()) + + })?; Ok(()) } diff --git a/src/run.rs b/src/run.rs index cd04ff6..31a4481 100644 --- a/src/run.rs +++ b/src/run.rs @@ -74,7 +74,7 @@ fn try_connect_process<'a>( let mm = m.as_ref().map(Arc::clone); let t = s.spawn( move || { dev.run_connect().unwrap(); - c.run(eventmap, (srs,nsts)).unwrap(); + c.run(dev, eventmap, (srs,nsts)).unwrap(); if let Some(m) = mm { let mut m = m.lock().unwrap(); m.0 -= 1; diff --git a/test/vmpk.yml b/test/vmpk.yml index a36852e..d5057d1 100644 --- a/test/vmpk.yml +++ b/test/vmpk.yml @@ -1,6 +1,8 @@ devices: - name: 'VMPK' max_connections: 1 + queue_length: 3 + interval: 100ms connect: - args: [ "sh", "-c", "echo Hello world!" ] disconnect: