feat: add signal handling

This commit is contained in:
zawz 2023-08-24 09:29:40 +02:00
parent 45f5f447c0
commit 11509109be
5 changed files with 123 additions and 47 deletions

View file

@ -17,6 +17,7 @@ thiserror = "1.0"
enum-display-derive = "0.1"
queues = "1.1.0"
duration-str = { version = "0.5.1", features = ["serde"] }
signal-hook = "0.3.17"
[target.'cfg(target_os = "linux")'.dependencies]
alsa = "0.7.0"

View file

@ -21,9 +21,11 @@ use cli::Cli;
fn main() {
let c = Cli::parse();
match run_file(&c.map_file) {
Ok(_) => (),
Err(err) => println!("Error: {}", err)
loop {
match run_file(&c.map_file) {
Ok(_) => (),
Err(err) => println!("Error: {}", err)
}
}
}

View file

@ -198,8 +198,10 @@ impl MidiInputAlsa {
Ok(())
}
fn handle_input_internal<F, D>(&mut self, callback: F, userdata: D) -> Result<(), Error>
where F: Fn(Option<SystemTime>, &[u8], &mut D) + Send {
fn binary_input_handler<F, D>(&mut self, callback: F, userdata: D) -> Result<(), Error>
where
F: Fn(&Self, Option<SystemTime>, &[u8], &mut D) -> Result<(),Error> + Send
{
let decoder = MidiEvent::new(0)?;
decoder.enable_running_status(false);
@ -212,7 +214,7 @@ impl MidiInputAlsa {
_ => SystemTime::now(),
};
self.alsa_input_handler(|_, mut ev, (message, buffer, continue_sysex, userdata)| {
self.alsa_input_handler(|s, mut ev, (message, buffer, continue_sysex, userdata)| {
if !*continue_sysex { message.clear() }
let do_decode = match ev.get_type() {
@ -241,12 +243,57 @@ impl MidiInputAlsa {
if message.is_empty() || *continue_sysex { return Ok(false); }
let ts: Option<SystemTime> = ev.get_time().map(|v| ts+v);
(callback)(ts, message, userdata);
(callback)(s, ts, message, userdata)?;
Ok(false)
}
, (message, buffer, continue_sysex, userdata))?;
Ok(())
}
fn threaded_alsa_input<F, D>(&mut self, callback: F, (ts, rs): (mpsc::Sender<bool>, mpsc::Receiver<bool>), userdata: D) -> Result<(),Error>
where
F: Fn(&Self, alsa::seq::Event, &mut D) -> Result<bool, Error> + Send,
D: Send,
{
thread::scope( |sc| -> Result<(), Error> {
let stop_trigger = self.stop_trigger[1];
let t = sc.spawn(move || -> Result<(), Error> {
let userdata = userdata;
self.alsa_input_handler(callback, userdata)?;
ts.send(false).expect("unexpected send() error");
Ok(())
});
match rs.recv()? {
true => Self::signal_stop_input_internal(stop_trigger)?,
false => ()
};
t.join().expect("unexpected thread error")?;
Ok(())
})
}
fn threaded_handler<H, F, T, RF: ?Sized, D, R>(&mut self, handler: H, callback: F, (ts, rs): (mpsc::Sender<bool>, mpsc::Receiver<bool>), userdata: D) -> Result<(),Error>
where
H: Fn(&mut Self, F,D) -> Result<(), Error> + Send,
F: Fn(&Self, T, &RF, &mut D) -> Result<R,Error> + Send,
D: Send,
{
thread::scope( |sc| -> Result<(), Error> {
let stop_trigger = self.stop_trigger[1];
let t = sc.spawn(move || -> Result<(), Error> {
let userdata = userdata;
handler(self, callback, userdata)?;
ts.send(false).expect("unexpected send() error");
Ok(())
});
match rs.recv()? {
true => Self::signal_stop_input_internal(stop_trigger)?,
false => ()
};
t.join().expect("unexpected thread error")?;
Ok(())
})
}
}
impl MidiInput<Addr> for MidiInputAlsa {
@ -301,6 +348,7 @@ impl MidiInput<Addr> for MidiInputAlsa {
PortFilter::Name(s) => p.name.contains(s),
PortFilter::Regex(s) => s.is_match(&p.name),
PortFilter::Addr(MidiAddrHandler::ALSA(s)) => p.addr == *s,
_ => panic!("unexpected error"),
}
}
);
@ -325,27 +373,26 @@ impl MidiInput<Addr> for MidiInputAlsa {
Ok(())
}
fn device_events(&mut self, ts: mpsc::Sender<MidiPortHandler>) -> Result<(), Error> {
fn device_events(&mut self, ts: mpsc::Sender<Option<MidiPortHandler>>, (tss, rss): (mpsc::Sender<bool>, mpsc::Receiver<bool>)) -> Result<(), Error> {
let ports = self.ports()?;
let port = self.filter_ports(ports, PortFilter::Name("System:Announce".to_string()));
self.connect(&port[0].addr, "rmidimap-alsa-announce")?;
self.alsa_input_handler(|s, ev, _|{
self.threaded_alsa_input(move |s: &Self, ev: alsa::seq::Event, _| -> Result<bool, Error> {
// handle disconnect event on watched port
match ev.get_type() {
// EventType::PortStart | EventType::ClientStart | EventType::PortExit | EventType::ClientExit => {
EventType::PortStart => {
if let Some(a) = ev.get_data::<alsa::seq::Addr>() {
let p = s.ports()?;
let pp = s.filter_ports(p, PortFilter::Addr( MidiAddrHandler::ALSA(a) ));
if !pp.is_empty() {
ts.send(MidiPortHandler::ALSA(pp[0].clone())).expect("unexpected send() error");
ts.send(Some(MidiPortHandler::ALSA(pp[0].clone()))).expect("unexpected send() error");
}
};
Ok(false)
}
_ => Ok(false),
}
}, ())?;
}, (tss, rss), ())?;
self.close_internal();
Ok(())
}
@ -357,26 +404,16 @@ impl MidiInputHandler for MidiInputAlsa
Self::signal_stop_input_internal(self.stop_trigger[1])
}
fn handle_input<F, D>(&mut self, callback: F, (rs, ts): (mpsc::Receiver<bool>, mpsc::Sender<bool>), userdata: D) -> Result<(), Error>
fn handle_input<F, D>(&mut self, callback: F, (ts, rs): (mpsc::Sender<bool>, mpsc::Receiver<bool>), userdata: D) -> Result<(), Error>
where
F: Fn(Option<SystemTime>, &[u8], &mut D) + Send,
F: Fn(&Self, &[u8], Option<SystemTime>, &mut D) + Send + Sync,
D: Send,
{
thread::scope( |sc| -> Result<(), Error> {
let stop_trigger = self.stop_trigger[1];
let t = sc.spawn(move || -> Result<(), Error> {
let userdata = userdata;
self.handle_input_internal(callback, userdata)?;
ts.send(false).expect("unexpected send() error");
self.threaded_handler(Self::binary_input_handler,
|s, t, m, d| -> Result<(),Error> {
callback(s,m,t,d);
Ok(())
});
match rs.recv()? {
true => Self::signal_stop_input_internal(stop_trigger)?,
false => ()
};
t.join().expect("unexpected thread error")?;
Ok(())
})
}, (ts, rs), userdata)
}
}

View file

@ -3,7 +3,7 @@ pub mod alsa;
use queues::{CircularBuffer, IsQueue};
use crate::config::device::Identifier;
use crate::midi::alsa::MidiInputAlsa;
use super::midi::alsa::MidiInputAlsa;
use crate::Error;
extern crate libc;
@ -53,6 +53,7 @@ impl From<MidiPortHandler> for MidiAddrHandler {
fn from(a: MidiPortHandler) -> Self {
match a {
MidiPortHandler::ALSA(p) => MidiAddrHandler::ALSA(p.addr),
_ => todo!(),
}
}
}
@ -81,15 +82,15 @@ pub trait MidiInput<T> {
fn connect(&mut self, port_addr: &T, port_name: &str) -> Result<(), Error>;
fn device_events(&mut self, ts: mpsc::Sender<MidiPortHandler>) -> Result<(), Error>;
fn device_events(&mut self, ts: mpsc::Sender<Option<MidiPortHandler>>, ss: (mpsc::Sender<bool>, mpsc::Receiver<bool>)) -> Result<(), Error>;
}
pub trait MidiInputHandler {
fn signal_stop_input(&self) -> Result<(), Error>;
fn handle_input<F, D>(&mut self, callback: F, rts: (mpsc::Receiver<bool>, mpsc::Sender<bool>), userdata: D) -> Result<(), Error>
fn handle_input<F, D>(&mut self, callback: F, rts: (mpsc::Sender<bool>, mpsc::Receiver<bool>), userdata: D) -> Result<(), Error>
where
F: Fn(Option<SystemTime>, &[u8], &mut D) + Send,
F: Fn(&Self, &[u8], Option<SystemTime>, &mut D) + Send + Sync,
D: Send,
;
}
@ -146,6 +147,7 @@ impl MidiHandler {
pub fn new_with_driver(name: &str, driver: MidiHandlerDriver) -> Result<Self, Error> {
match driver {
MidiHandlerDriver::ALSA => Ok(MidiHandler::ALSA(MidiInputAlsa::new(name)?)),
_ => todo!(),
}
}
@ -164,9 +166,9 @@ impl MidiHandler {
r
}
pub fn run(&mut self, conf: &DeviceConfig, eventmap: &EventMap, (rs,ts): (mpsc::Receiver<bool>, mpsc::Sender<bool>)) -> Result<(), Error> {
pub fn run(&mut self, conf: &DeviceConfig, eventmap: &EventMap, trs: (mpsc::Sender<bool>, mpsc::Receiver<bool>)) -> Result<(), Error> {
handler_fcall!{
self, handle_inputport, (conf, eventmap,(rs,ts)),
self, handle_inputport, (conf, eventmap,trs),
ALSA
}
}
@ -178,9 +180,9 @@ impl MidiHandler {
}
}
pub fn device_events(&mut self, ts: mpsc::Sender<MidiPortHandler>) -> Result<(), Error> {
pub fn device_events(&mut self, ts: mpsc::Sender<Option<MidiPortHandler>>, (tss, rss): (mpsc::Sender<bool>,mpsc::Receiver<bool>)) -> Result<(), Error> {
handler_fcall!{
self, device_events, ts,
self, device_events, (ts,tss,rss),
ALSA
}
}
@ -192,7 +194,7 @@ where T: MidiInput<A>
input.ports_handle()
}
fn handle_inputport<T>(input: &mut T, (conf, eventmap, (rs, ts)): (&DeviceConfig, &EventMap, (mpsc::Receiver<bool>, mpsc::Sender<bool>))) -> Result<(), Error>
fn handle_inputport<T>(input: &mut T, (conf, eventmap, (ts, rs)): (&DeviceConfig, &EventMap, (mpsc::Sender<bool>, mpsc::Receiver<bool>))) -> Result<(), Error>
where T: MidiInputHandler + Send
{
thread::scope(|s| -> Result<(), Error> {
@ -229,13 +231,13 @@ where T: MidiInputHandler + Send
Ok(())
});
input.handle_input(|t,m,(evq,pts)| {
input.handle_input(|_,m,t,(evq,pts)| {
let mut event: EventBuf = Event::from(m).into();
event.timestamp = t;
let mut evq = evq.lock().unwrap();
evq.add(event).unwrap();
pts.send(false).expect("unexpected write error");
}, (rs,ts), (evq,pts.clone()))?;
}, (ts,rs), (evq,pts.clone()))?;
pts.send(true).expect("unexpected write error");
let _ = exec_thread.join();
@ -252,9 +254,9 @@ where T: MidiInputHandler
input.signal_stop_input()
}
fn device_events<T, A>(input: &mut T, ts: mpsc::Sender<MidiPortHandler>) -> Result<(), Error>
fn device_events<T, A>(input: &mut T, (ts,tss,rss): (mpsc::Sender<Option<MidiPortHandler>>, mpsc::Sender<bool>, mpsc::Receiver<bool>)) -> Result<(), Error>
where T: MidiInput<A>
{
input.device_events(ts)
input.device_events(ts, (tss, rss))
}

View file

@ -2,8 +2,10 @@ use std::sync::mpsc;
use std::thread;
use std::sync::{Mutex,Arc};
use crate::Error;
use libc::SIGUSR1;
use signal_hook::iterator::Signals;
use crate::Error;
use crate::midi::{PortFilter,MidiHandler,MidiPortHandler};
use crate::config::{Config,DeviceConfig};
use crate::eventmap::EventMap;
@ -31,21 +33,53 @@ pub fn run_config(conf: &Config) -> Result<(), Error> {
let input = MidiHandler::new("rmidimap")?;
let (tdev,rdev) = mpsc::channel::<Option<MidiPortHandler>>();
let (tsd,rsd) = mpsc::channel::<bool>();
let ntsd = tsd.clone();
let ntdev = tdev.clone();
let mut signals = Signals::new(&[SIGUSR1])?;
let _signal_thread = thread::spawn(move || {
for sig in signals.forever() {
match sig {
10 => {
println!("Recieved SIGUSR1, reloading config file");
ntsd.send(true).unwrap();
ntdev.send(None).unwrap();
break;
}
_ => (),
}
}
});
thread::scope(|s| -> Result<(), Error> {
let (tdev,rdev) = mpsc::channel::<MidiPortHandler>();
let mut threads: Vec<(thread::ScopedJoinHandle<'_, ()>, mpsc::Sender<bool>)> = Vec::new();
let ports = input.ports()?;
for p in ports {
if let Some(v) = try_connect_process(&input, s, &p, &cfevmap)? { threads.push(v) }
}
let _event_thread = s.spawn(|| {
let event_thread = s.spawn(move || {
let mut input = MidiHandler::new("rmidimap-event-watcher").unwrap();
input.device_events(tdev).unwrap();
let r = input.device_events(tdev.clone(), (tsd,rsd));
tdev.send(None).unwrap();
r
});
loop {
let p = rdev.recv()?;
if let Some(v) = try_connect_process(&input, s, &p, &cfevmap)? { threads.push(v) }
if p.is_none() {
break;
}
if let Some(v) = try_connect_process(&input, s, &p.unwrap(), &cfevmap)? { threads.push(v) }
};
event_thread.join().unwrap()?;
for (thread,ss) in threads {
let _ = ss.send(true);
thread.join().unwrap();
}
Ok(())
})?;
Ok(())
}
@ -74,7 +108,7 @@ fn try_connect_process<'a>(
let mm = m.as_ref().map(Arc::clone);
let t = s.spawn( move || {
dev.run_connect().unwrap();
c.run(dev, eventmap, (srs,nsts)).unwrap();
c.run(dev, eventmap, (nsts,srs)).unwrap();
if let Some(m) = mm {
let mut m = m.lock().unwrap();
m.0 -= 1;