From 11509109be909b454fcc45885497e3ceeadebd94 Mon Sep 17 00:00:00 2001 From: zawz Date: Thu, 24 Aug 2023 09:29:40 +0200 Subject: [PATCH] feat: add signal handling --- Cargo.toml | 1 + src/main.rs | 8 +++-- src/midi/alsa.rs | 87 ++++++++++++++++++++++++++++++++++-------------- src/midi/mod.rs | 28 ++++++++-------- src/run.rs | 46 +++++++++++++++++++++---- 5 files changed, 123 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d40c1a..4b9b684 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ thiserror = "1.0" enum-display-derive = "0.1" queues = "1.1.0" duration-str = { version = "0.5.1", features = ["serde"] } +signal-hook = "0.3.17" [target.'cfg(target_os = "linux")'.dependencies] alsa = "0.7.0" diff --git a/src/main.rs b/src/main.rs index a32cb20..c49fe3c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,9 +21,11 @@ use cli::Cli; fn main() { let c = Cli::parse(); - match run_file(&c.map_file) { - Ok(_) => (), - Err(err) => println!("Error: {}", err) + loop { + match run_file(&c.map_file) { + Ok(_) => (), + Err(err) => println!("Error: {}", err) + } } } diff --git a/src/midi/alsa.rs b/src/midi/alsa.rs index aec0497..86114da 100644 --- a/src/midi/alsa.rs +++ b/src/midi/alsa.rs @@ -198,8 +198,10 @@ impl MidiInputAlsa { Ok(()) } - fn handle_input_internal(&mut self, callback: F, userdata: D) -> Result<(), Error> - where F: Fn(Option, &[u8], &mut D) + Send { + fn binary_input_handler(&mut self, callback: F, userdata: D) -> Result<(), Error> + where + F: Fn(&Self, Option, &[u8], &mut D) -> Result<(),Error> + Send + { let decoder = MidiEvent::new(0)?; decoder.enable_running_status(false); @@ -212,7 +214,7 @@ impl MidiInputAlsa { _ => SystemTime::now(), }; - self.alsa_input_handler(|_, mut ev, (message, buffer, continue_sysex, userdata)| { + self.alsa_input_handler(|s, mut ev, (message, buffer, continue_sysex, userdata)| { if !*continue_sysex { message.clear() } let do_decode = match ev.get_type() { @@ -241,12 +243,57 @@ impl MidiInputAlsa { if message.is_empty() || *continue_sysex { return Ok(false); } let ts: Option = ev.get_time().map(|v| ts+v); - (callback)(ts, message, userdata); + (callback)(s, ts, message, userdata)?; Ok(false) } , (message, buffer, continue_sysex, userdata))?; Ok(()) } + + fn threaded_alsa_input(&mut self, callback: F, (ts, rs): (mpsc::Sender, mpsc::Receiver), userdata: D) -> Result<(),Error> + where + F: Fn(&Self, alsa::seq::Event, &mut D) -> Result + Send, + D: Send, + { + thread::scope( |sc| -> Result<(), Error> { + let stop_trigger = self.stop_trigger[1]; + let t = sc.spawn(move || -> Result<(), Error> { + let userdata = userdata; + self.alsa_input_handler(callback, userdata)?; + ts.send(false).expect("unexpected send() error"); + Ok(()) + }); + match rs.recv()? { + true => Self::signal_stop_input_internal(stop_trigger)?, + false => () + }; + t.join().expect("unexpected thread error")?; + Ok(()) + }) + } + + fn threaded_handler(&mut self, handler: H, callback: F, (ts, rs): (mpsc::Sender, mpsc::Receiver), userdata: D) -> Result<(),Error> + where + H: Fn(&mut Self, F,D) -> Result<(), Error> + Send, + F: Fn(&Self, T, &RF, &mut D) -> Result + Send, + D: Send, + { + thread::scope( |sc| -> Result<(), Error> { + let stop_trigger = self.stop_trigger[1]; + let t = sc.spawn(move || -> Result<(), Error> { + let userdata = userdata; + handler(self, callback, userdata)?; + ts.send(false).expect("unexpected send() error"); + Ok(()) + }); + match rs.recv()? { + true => Self::signal_stop_input_internal(stop_trigger)?, + false => () + }; + t.join().expect("unexpected thread error")?; + Ok(()) + }) + } } impl MidiInput for MidiInputAlsa { @@ -301,6 +348,7 @@ impl MidiInput for MidiInputAlsa { PortFilter::Name(s) => p.name.contains(s), PortFilter::Regex(s) => s.is_match(&p.name), PortFilter::Addr(MidiAddrHandler::ALSA(s)) => p.addr == *s, + _ => panic!("unexpected error"), } } ); @@ -325,27 +373,26 @@ impl MidiInput for MidiInputAlsa { Ok(()) } - fn device_events(&mut self, ts: mpsc::Sender) -> Result<(), Error> { + fn device_events(&mut self, ts: mpsc::Sender>, (tss, rss): (mpsc::Sender, mpsc::Receiver)) -> Result<(), Error> { let ports = self.ports()?; let port = self.filter_ports(ports, PortFilter::Name("System:Announce".to_string())); self.connect(&port[0].addr, "rmidimap-alsa-announce")?; - self.alsa_input_handler(|s, ev, _|{ + self.threaded_alsa_input(move |s: &Self, ev: alsa::seq::Event, _| -> Result { // handle disconnect event on watched port match ev.get_type() { - // EventType::PortStart | EventType::ClientStart | EventType::PortExit | EventType::ClientExit => { EventType::PortStart => { if let Some(a) = ev.get_data::() { let p = s.ports()?; let pp = s.filter_ports(p, PortFilter::Addr( MidiAddrHandler::ALSA(a) )); if !pp.is_empty() { - ts.send(MidiPortHandler::ALSA(pp[0].clone())).expect("unexpected send() error"); + ts.send(Some(MidiPortHandler::ALSA(pp[0].clone()))).expect("unexpected send() error"); } }; Ok(false) } _ => Ok(false), } - }, ())?; + }, (tss, rss), ())?; self.close_internal(); Ok(()) } @@ -357,26 +404,16 @@ impl MidiInputHandler for MidiInputAlsa Self::signal_stop_input_internal(self.stop_trigger[1]) } - fn handle_input(&mut self, callback: F, (rs, ts): (mpsc::Receiver, mpsc::Sender), userdata: D) -> Result<(), Error> + fn handle_input(&mut self, callback: F, (ts, rs): (mpsc::Sender, mpsc::Receiver), userdata: D) -> Result<(), Error> where - F: Fn(Option, &[u8], &mut D) + Send, + F: Fn(&Self, &[u8], Option, &mut D) + Send + Sync, D: Send, { - thread::scope( |sc| -> Result<(), Error> { - let stop_trigger = self.stop_trigger[1]; - let t = sc.spawn(move || -> Result<(), Error> { - let userdata = userdata; - self.handle_input_internal(callback, userdata)?; - ts.send(false).expect("unexpected send() error"); + self.threaded_handler(Self::binary_input_handler, + |s, t, m, d| -> Result<(),Error> { + callback(s,m,t,d); Ok(()) - }); - match rs.recv()? { - true => Self::signal_stop_input_internal(stop_trigger)?, - false => () - }; - t.join().expect("unexpected thread error")?; - Ok(()) - }) + }, (ts, rs), userdata) } } diff --git a/src/midi/mod.rs b/src/midi/mod.rs index e3338ef..411b8b7 100644 --- a/src/midi/mod.rs +++ b/src/midi/mod.rs @@ -3,7 +3,7 @@ pub mod alsa; use queues::{CircularBuffer, IsQueue}; use crate::config::device::Identifier; -use crate::midi::alsa::MidiInputAlsa; +use super::midi::alsa::MidiInputAlsa; use crate::Error; extern crate libc; @@ -53,6 +53,7 @@ impl From for MidiAddrHandler { fn from(a: MidiPortHandler) -> Self { match a { MidiPortHandler::ALSA(p) => MidiAddrHandler::ALSA(p.addr), + _ => todo!(), } } } @@ -81,15 +82,15 @@ pub trait MidiInput { fn connect(&mut self, port_addr: &T, port_name: &str) -> Result<(), Error>; - fn device_events(&mut self, ts: mpsc::Sender) -> Result<(), Error>; + fn device_events(&mut self, ts: mpsc::Sender>, ss: (mpsc::Sender, mpsc::Receiver)) -> Result<(), Error>; } pub trait MidiInputHandler { fn signal_stop_input(&self) -> Result<(), Error>; - fn handle_input(&mut self, callback: F, rts: (mpsc::Receiver, mpsc::Sender), userdata: D) -> Result<(), Error> + fn handle_input(&mut self, callback: F, rts: (mpsc::Sender, mpsc::Receiver), userdata: D) -> Result<(), Error> where - F: Fn(Option, &[u8], &mut D) + Send, + F: Fn(&Self, &[u8], Option, &mut D) + Send + Sync, D: Send, ; } @@ -146,6 +147,7 @@ impl MidiHandler { pub fn new_with_driver(name: &str, driver: MidiHandlerDriver) -> Result { match driver { MidiHandlerDriver::ALSA => Ok(MidiHandler::ALSA(MidiInputAlsa::new(name)?)), + _ => todo!(), } } @@ -164,9 +166,9 @@ impl MidiHandler { r } - pub fn run(&mut self, conf: &DeviceConfig, eventmap: &EventMap, (rs,ts): (mpsc::Receiver, mpsc::Sender)) -> Result<(), Error> { + pub fn run(&mut self, conf: &DeviceConfig, eventmap: &EventMap, trs: (mpsc::Sender, mpsc::Receiver)) -> Result<(), Error> { handler_fcall!{ - self, handle_inputport, (conf, eventmap,(rs,ts)), + self, handle_inputport, (conf, eventmap,trs), ALSA } } @@ -178,9 +180,9 @@ impl MidiHandler { } } - pub fn device_events(&mut self, ts: mpsc::Sender) -> Result<(), Error> { + pub fn device_events(&mut self, ts: mpsc::Sender>, (tss, rss): (mpsc::Sender,mpsc::Receiver)) -> Result<(), Error> { handler_fcall!{ - self, device_events, ts, + self, device_events, (ts,tss,rss), ALSA } } @@ -192,7 +194,7 @@ where T: MidiInput input.ports_handle() } -fn handle_inputport(input: &mut T, (conf, eventmap, (rs, ts)): (&DeviceConfig, &EventMap, (mpsc::Receiver, mpsc::Sender))) -> Result<(), Error> +fn handle_inputport(input: &mut T, (conf, eventmap, (ts, rs)): (&DeviceConfig, &EventMap, (mpsc::Sender, mpsc::Receiver))) -> Result<(), Error> where T: MidiInputHandler + Send { thread::scope(|s| -> Result<(), Error> { @@ -229,13 +231,13 @@ where T: MidiInputHandler + Send Ok(()) }); - input.handle_input(|t,m,(evq,pts)| { + input.handle_input(|_,m,t,(evq,pts)| { let mut event: EventBuf = Event::from(m).into(); event.timestamp = t; let mut evq = evq.lock().unwrap(); evq.add(event).unwrap(); pts.send(false).expect("unexpected write error"); - }, (rs,ts), (evq,pts.clone()))?; + }, (ts,rs), (evq,pts.clone()))?; pts.send(true).expect("unexpected write error"); let _ = exec_thread.join(); @@ -252,9 +254,9 @@ where T: MidiInputHandler input.signal_stop_input() } -fn device_events(input: &mut T, ts: mpsc::Sender) -> Result<(), Error> +fn device_events(input: &mut T, (ts,tss,rss): (mpsc::Sender>, mpsc::Sender, mpsc::Receiver)) -> Result<(), Error> where T: MidiInput { - input.device_events(ts) + input.device_events(ts, (tss, rss)) } diff --git a/src/run.rs b/src/run.rs index 31a4481..12dabda 100644 --- a/src/run.rs +++ b/src/run.rs @@ -2,8 +2,10 @@ use std::sync::mpsc; use std::thread; use std::sync::{Mutex,Arc}; -use crate::Error; +use libc::SIGUSR1; +use signal_hook::iterator::Signals; +use crate::Error; use crate::midi::{PortFilter,MidiHandler,MidiPortHandler}; use crate::config::{Config,DeviceConfig}; use crate::eventmap::EventMap; @@ -31,21 +33,53 @@ pub fn run_config(conf: &Config) -> Result<(), Error> { let input = MidiHandler::new("rmidimap")?; + let (tdev,rdev) = mpsc::channel::>(); + let (tsd,rsd) = mpsc::channel::(); + + let ntsd = tsd.clone(); + let ntdev = tdev.clone(); + let mut signals = Signals::new(&[SIGUSR1])?; + let _signal_thread = thread::spawn(move || { + for sig in signals.forever() { + match sig { + 10 => { + println!("Recieved SIGUSR1, reloading config file"); + ntsd.send(true).unwrap(); + ntdev.send(None).unwrap(); + break; + } + _ => (), + } + } + }); + thread::scope(|s| -> Result<(), Error> { - let (tdev,rdev) = mpsc::channel::(); let mut threads: Vec<(thread::ScopedJoinHandle<'_, ()>, mpsc::Sender)> = Vec::new(); let ports = input.ports()?; for p in ports { if let Some(v) = try_connect_process(&input, s, &p, &cfevmap)? { threads.push(v) } } - let _event_thread = s.spawn(|| { + + let event_thread = s.spawn(move || { let mut input = MidiHandler::new("rmidimap-event-watcher").unwrap(); - input.device_events(tdev).unwrap(); + let r = input.device_events(tdev.clone(), (tsd,rsd)); + tdev.send(None).unwrap(); + r }); + loop { let p = rdev.recv()?; - if let Some(v) = try_connect_process(&input, s, &p, &cfevmap)? { threads.push(v) } + if p.is_none() { + break; + } + if let Some(v) = try_connect_process(&input, s, &p.unwrap(), &cfevmap)? { threads.push(v) } + }; + event_thread.join().unwrap()?; + for (thread,ss) in threads { + let _ = ss.send(true); + thread.join().unwrap(); } + Ok(()) })?; Ok(()) } @@ -74,7 +108,7 @@ fn try_connect_process<'a>( let mm = m.as_ref().map(Arc::clone); let t = s.spawn( move || { dev.run_connect().unwrap(); - c.run(dev, eventmap, (srs,nsts)).unwrap(); + c.run(dev, eventmap, (nsts,srs)).unwrap(); if let Some(m) = mm { let mut m = m.lock().unwrap(); m.0 -= 1;