From a9d09e30f8a3911387e0fd184fb355db084a9f16 Mon Sep 17 00:00:00 2001 From: zawz Date: Sat, 2 May 2026 13:54:57 +0200 Subject: [PATCH] chore: code format --- example/src/data.rs | 38 ++++---- example/src/main.rs | 25 +++--- macros/src/lib.rs | 182 ++++++++++++++++++++++----------------- src/transport/channel.rs | 162 ++++++++++++++++++---------------- src/transport/mod.rs | 49 ++++++----- src/transport/serde.rs | 125 +++++++++++++++------------ 6 files changed, 318 insertions(+), 263 deletions(-) diff --git a/example/src/data.rs b/example/src/data.rs index 94a65b4..0163335 100644 --- a/example/src/data.rs +++ b/example/src/data.rs @@ -1,8 +1,8 @@ use std::time::Duration; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; -pub type RetData = (usize,Option); +pub type RetData = (usize, Option); pub fn dur_to_num(dur: Duration) -> (u128, &'static str) { if dur.as_nanos() < 10000 { @@ -17,8 +17,8 @@ pub fn dur_to_num(dur: Duration) -> (u128, &'static str) { } pub fn dur_to_str(dur: Duration) -> String { - let (n,s) = dur_to_num(dur); - n.to_string() +" "+ s + let (n, s) = dur_to_num(dur); + n.to_string() + " " + s } pub fn process_data(dat: TestData) -> RetData { @@ -31,12 +31,7 @@ pub fn process_data_ref(dat: &TestData) -> RetData { pub const DATASIZE: usize = 1000000; -const TEST_STRINGS: [&str; 4] = [ - "toto", - "tata", - "titi", - "tutu" -]; +const TEST_STRINGS: [&str; 4] = ["toto", "tata", "titi", "tutu"]; #[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum TestEnum { @@ -50,11 +45,10 @@ pub struct TestData { vec: Vec<(bool, TestEnum)>, } -use crate::transport::{ClientTransporter,ServerTransporter}; +use crate::transport::{ClientTransporter, ServerTransporter}; #[rspc::service] impl TestData { - pub fn len(&self) -> usize { self.vec.len() } @@ -77,7 +71,7 @@ impl TestData { if n <= 1 { n } else { - Self::internal_fib(n-1) + Self::internal_fib(n-2) + Self::internal_fib(n - 1) + Self::internal_fib(n - 2) } } @@ -86,11 +80,11 @@ impl TestData { } pub fn add(&self, a: usize, b: usize) -> usize { - return a+b; + return a + b; } pub fn calc_add(&self, data: TestData) -> Option { - return self.calc().and_then(|a| data.calc().map(|b| a+b)); + return self.calc().and_then(|a| data.calc().map(|b| a + b)); } pub fn push(&mut self, u: (bool, TestEnum)) { @@ -102,16 +96,16 @@ pub fn make_test_data(n: usize) -> TestData { let mut v = Vec::with_capacity(n); let mut b = true; for i in 0..n { - v.push(( b , - match i%3 { + v.push(( + b, + match i % 3 { 0 => TestEnum::NoValue, 1 => TestEnum::Num(i), - 2 => TestEnum::Str(TEST_STRINGS[i%4].to_string()), + 2 => TestEnum::Str(TEST_STRINGS[i % 4].to_string()), _ => panic!("unexpected error"), - })); + }, + )); b = !b; } - TestData { - vec: v, - } + TestData { vec: v } } diff --git a/example/src/main.rs b/example/src/main.rs index e7ad5d4..9630487 100644 --- a/example/src/main.rs +++ b/example/src/main.rs @@ -1,7 +1,8 @@ - pub mod data; -use data::{make_test_data,dur_to_str,TestData,TestEnum,TestDataClient,TestDataServer,DATASIZE}; +use data::{ + dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE, +}; use tokio::join; use rspc::transport::serde::TcpClient; @@ -10,19 +11,18 @@ use rspc::transport; #[tokio::main] async fn main() -> anyhow::Result<()> { - // let t = TcpClient::connect("127.0.0.1:3306").await.unwrap(); // let client = t.spawn().await; - + // let client = TestDataClient::new(client); - let (c,s) = transport::channel::new_async(); + let (c, s) = transport::channel::new_async(); let data: TestData = make_test_data(DATASIZE); - let srv_thread = tokio::spawn(async move { + let srv_thread = tokio::spawn(async move { let mut server = TestDataServer::from(data); server.listen(s).await - } ); + }); let client = TestDataClient::new(c); let clientref = &client; @@ -41,11 +41,11 @@ async fn main() -> anyhow::Result<()> { let now = std::time::Instant::now(); assert_eq!(DATASIZE, client.len().await.unwrap()); println!("len: {}", dur_to_str(now.elapsed())); - + let now = std::time::Instant::now(); - assert_eq!(4, client.calc().await.unwrap().unwrap_or(0) ); + assert_eq!(4, client.calc().await.unwrap().unwrap_or(0)); println!("calc: {}", dur_to_str(now.elapsed())); - + let cdat = make_test_data(DATASIZE); let now = std::time::Instant::now(); assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0)); @@ -60,11 +60,10 @@ async fn main() -> anyhow::Result<()> { println!("push: {}", dur_to_str(now.elapsed())); let now = std::time::Instant::now(); - assert_eq!(DATASIZE+1, client.len().await.unwrap()); + assert_eq!(DATASIZE + 1, client.len().await.unwrap()); println!("len: {}", dur_to_str(now.elapsed())); - }; - + join!(job1, job2); client.stop().await.unwrap(); srv_thread.await.unwrap().unwrap(); diff --git a/macros/src/lib.rs b/macros/src/lib.rs index 57176d4..33ebb28 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -1,4 +1,3 @@ - extern crate proc_macro; extern crate quote; @@ -9,8 +8,8 @@ use syn::{ parse::{Parse, ParseStream}, parse_macro_input, spanned::Spanned, - FnArg, Ident, - Pat, PatType, ReturnType, Visibility, Receiver, ItemImpl, ImplItemFn, ImplItemType, Signature, Attribute, + Attribute, FnArg, Ident, ImplItemFn, ImplItemType, ItemImpl, Pat, PatType, Receiver, + ReturnType, Signature, Visibility, }; use quote::{format_ident, quote}; @@ -63,25 +62,29 @@ struct Service { } impl TryFrom for RpcMethod { - type Error = syn::Error; fn try_from(value: ImplItemFn) -> Result { - let mut reciever = None; - let mut args = vec!(); + let mut args = vec![]; for arg in &value.sig.inputs { match arg { FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => { args.push(captured.clone()); - }, + } FnArg::Typed(captured) => { - return Err(syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args")); - }, + return Err(syn::Error::new( + captured.pat.span(), + "patterns aren't allowed in RPC args", + )); + } FnArg::Receiver(v) => { if matches!(v.reference, None) { - return Err(syn::Error::new(v.span(), "self cannot be consumed in RPC method")); + return Err(syn::Error::new( + v.span(), + "self cannot be consumed in RPC method", + )); } else { reciever = Some(v.clone()); } @@ -104,38 +107,49 @@ impl Parse for Service { let item: ItemImpl = input.parse()?; let mut errors = Ok(()); - - let mut typeitems: Vec = vec!(); - let fns: Vec<&ImplItemFn> = item.items.iter().filter(|x| { - match x { + + let mut typeitems: Vec = vec![]; + let fns: Vec<&ImplItemFn> = item + .items + .iter() + .filter(|x| match x { syn::ImplItem::Fn(fnit) => { matches!(fnit.vis, Visibility::Public(_)) - }, + } syn::ImplItem::Type(x) => { typeitems.push(x.clone()); false } _ => false, - } - }).map(|x| - if let syn::ImplItem::Fn(fnit) = x { - fnit - } else { - unreachable!() - } - ).collect(); + }) + .map(|x| { + if let syn::ImplItem::Fn(fnit) = x { + fnit + } else { + unreachable!() + } + }) + .collect(); let ident = match item.self_ty.as_ref() { - syn::Type::Path(x) => { - match x.path.get_ident() { - Some(i) => i.clone(), - None => return Err(syn::Error::new(x.span(), "generics and paths are not supported")), + syn::Type::Path(x) => match x.path.get_ident() { + Some(i) => i.clone(), + None => { + return Err(syn::Error::new( + x.span(), + "generics and paths are not supported", + )) } }, - _ => return Err(syn::Error::new(item.self_ty.span(), "unsupported self type")), + _ => { + return Err(syn::Error::new( + item.self_ty.span(), + "unsupported self type", + )) + } }; - - let mut rpcs = vec!(); + + let mut rpcs = vec![]; for one_fn in fns { match RpcMethod::try_from(one_fn.clone()) { Ok(v) => rpcs.push(v), @@ -157,7 +171,6 @@ impl Parse for Service { ) ) } - } errors?; @@ -171,7 +184,6 @@ impl Parse for Service { } } - /// Macro used to generate the RSPC Server and Client objects. #[proc_macro_attribute] pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { @@ -182,68 +194,80 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { item: _, types: _, } = parse_macro_input!(inputclone as Service); - + let transport_request = &format_ident!("{}TransportRequest", ident); let transport_response = &format_ident!("{}TransportResponse", ident); let server = &format_ident!("{}Server", ident); let client = &format_ident!("{}Client", ident); - let fn_names: Vec<_> = rpcs.iter().map(|x| { - &x.sig.ident - }).collect(); + let fn_names: Vec<_> = rpcs.iter().map(|x| &x.sig.ident).collect(); - let fn_names_camel: Vec<_> = fn_names.iter().map(|x| { - Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span()) - }).collect(); + let fn_names_camel: Vec<_> = fn_names + .iter() + .map(|x| Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span())) + .collect(); - let fn_locks: Vec<_> = rpcs.iter().map(|x| { - match x.reciever.as_ref() { + let fn_locks: Vec<_> = rpcs + .iter() + .map(|x| match x.reciever.as_ref() { Some(rcv) => match &rcv.mutability { Some(_) => quote!(write()), None => quote!(read()), }, None => quote!(read()), - } - }).collect(); + }) + .collect(); - let fn_mut: Vec<_> = rpcs.iter().map(|x| { - match x.reciever.as_ref() { + let fn_mut: Vec<_> = rpcs + .iter() + .map(|x| match x.reciever.as_ref() { Some(rcv) => match &rcv.mutability { Some(_) => quote!(mut), None => quote!(), }, None => quote!(), - } - }).collect(); + }) + .collect(); - let fn_await: Vec<_> = rpcs.iter().map(|x| { - match &x.sig.asyncness { + let fn_await: Vec<_> = rpcs + .iter() + .map(|x| match &x.sig.asyncness { Some(_) => quote!(.await), None => quote!(), - } - }).collect(); + }) + .collect(); - let args: Vec<_> = rpcs.iter().map(|x| { - let args = &x.args; - quote!{ #(#args),* } - }).collect(); + let args: Vec<_> = rpcs + .iter() + .map(|x| { + let args = &x.args; + quote! { #(#args),* } + }) + .collect(); - let arg_types: Vec<_> = rpcs.iter().map(|x| { - let args = x.args.iter().map(|a| &a.ty); - quote!{ #(#args),* } - }).collect(); + let arg_types: Vec<_> = rpcs + .iter() + .map(|x| { + let args = x.args.iter().map(|a| &a.ty); + quote! { #(#args),* } + }) + .collect(); - let arg_idents: Vec<_> = rpcs.iter().map(|x| { - let args = x.args.iter().map(|a| &a.pat); - quote!{ #(#args),* } - }).collect(); + let arg_idents: Vec<_> = rpcs + .iter() + .map(|x| { + let args = x.args.iter().map(|a| &a.pat); + quote! { #(#args),* } + }) + .collect(); - let outs: Vec<_> = rpcs.iter().map(|x| { - match &x.sig.output { - ReturnType::Default => quote!{()}, - ReturnType::Type(_, t) => quote!{#t}, - } - }).collect(); + let outs: Vec<_> = rpcs + .iter() + .map(|x| match &x.sig.output { + ReturnType::Default => quote! {()}, + ReturnType::Type(_, t) => quote! {#t}, + }) + .collect(); let t = quote! { @@ -258,7 +282,7 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { #( #fn_names_camel(#outs) ),* , Stop, } - + impl #transport_response { #( pub fn #fn_names(self) -> #outs { @@ -274,7 +298,7 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { pub struct #server { obj: std::sync::Arc>, } - + impl From<#ident> for #server { fn from(obj: #ident) -> Self { Self { @@ -282,14 +306,14 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { } } } - - impl #server { + + impl #server { pub async fn listen(&mut self, mut transport: Tr) -> Result<(), Tr::Error> where Tr: ServerTransporter<#transport_request,#transport_response> + Send { { - transport.listen( |v,obj| { + transport.listen( |v,obj| { let obj = obj.clone(); async move { match v { @@ -306,30 +330,30 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { } } } - + pub struct #client where Tr: ClientTransporter<#transport_request,#transport_response>, { transporter: Tr, } - + impl #client where Tr: ClientTransporter<#transport_request,#transport_response> - { + { pub fn new(transporter: Tr) -> Self { #client { transporter, } } - + #( pub async fn #fn_names(&self, #args) -> Result<#outs, Tr::Error> { Ok(self.transporter.request(#transport_request::#fn_names_camel(#arg_idents)).await?.#fn_names()) } )* - + // TODO: graceful stop response pub async fn stop(&self) -> Result<(), Tr::Error> { self.transporter.request(#transport_request::Stop).await.map(|_| ()) diff --git a/src/transport/channel.rs b/src/transport/channel.rs index 6373fde..7e67090 100644 --- a/src/transport/channel.rs +++ b/src/transport/channel.rs @@ -4,14 +4,14 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; use thiserror::Error; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; -use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver}; use async_trait::async_trait; -use super::{ClientTransporter,ServerTransporter}; +use super::{ClientTransporter, ServerTransporter}; -#[derive(Error,Debug)] +#[derive(Error, Debug)] pub enum Error { #[error("channel recv error")] ChannelRecvError, @@ -23,77 +23,76 @@ pub enum Error { OneshotRecvError(#[from] oneshot::error::RecvError), } - #[derive(Clone)] /// Client endpoint of any channel. -pub struct ChannelClient { - channel: UnboundedSender<(T,oneshot::Sender)>, +pub struct ChannelClient { + channel: UnboundedSender<(T, oneshot::Sender)>, } /// Server endpoint of a synchronous channel -pub struct SyncChannelServer { - channel: UnboundedReceiver<(T,oneshot::Sender)>, +pub struct SyncChannelServer { + channel: UnboundedReceiver<(T, oneshot::Sender)>, } /// Create a new synchronous channel client/server instance. -/// +/// /// Synchronous channels can only process one job at a time. /// If you want job concurrency refer to new_async() and Asynchronous channels -/// +/// /// Unpon recieving a stop signal, jobs currently pending recv() will not be processed. /// Said jobs will continue waiting until either the server listens again, or the server is dropped. -/// +/// /// Example: /// ```no_run /// use serde::{Serialize,Deserialize}; /// use rspc::transport::{channel, ClientTransporter,ServerTransporter}; -/// +/// /// #[derive(Serialize,Deserialize)] /// pub struct MyStruct; -/// +/// /// #[rspc::service] /// impl MyStruct { /// pub fn dummy(&self) -> () { () } /// } -/// +/// /// let (client_channel,server_channel) = channel::new_sync(); -/// -/// let server_thread = tokio::spawn(async move { +/// +/// let server_thread = tokio::spawn(async move { /// let mut server = MyStructServer::from( MyStruct ); /// server.listen(server_channel).await /// }); -/// +/// /// let client = MyStructClient::new(client_channel); /// ``` -pub fn new_sync() -> (ChannelClient,SyncChannelServer) { - let (c,s) = mpsc::unbounded_channel(); +pub fn new_sync() -> (ChannelClient, SyncChannelServer) { + let (c, s) = mpsc::unbounded_channel(); ( ChannelClient { channel: c }, SyncChannelServer { channel: s }, ) } -impl ChannelClient +impl ChannelClient where T: Send + Sync, R: Send + Sync, { - pub async fn internal_request(&self, data: T) -> Result { - let (t,r) = oneshot::channel(); - self.channel.send( (data, t) ).map_err(|_| Error::ChannelSendError)?; + let (t, r) = oneshot::channel(); + self.channel + .send((data, t)) + .map_err(|_| Error::ChannelSendError)?; let output = r.await?; Ok(output) } } #[async_trait] -impl ClientTransporter for ChannelClient +impl ClientTransporter for ChannelClient where T: Send + Sync, R: Send + Sync, { - type Error = Error; async fn request(&self, data: T) -> Result { @@ -102,19 +101,23 @@ where } #[async_trait] -impl ServerTransporter for SyncChannelServer +impl ServerTransporter for SyncChannelServer where T: Send + Sync, R: Send + Sync, { - type Error = Error; - async fn listen(&mut self, handler: F, stop_response: Option, userdata: D) -> Result<(), Self::Error> + async fn listen( + &mut self, + handler: F, + stop_response: Option, + userdata: D, + ) -> Result<(), Self::Error> where FR: Future> + Send, F: Fn(T, &D) -> FR + Send + Sync, - D: Send+Sync, + D: Send + Sync, { while let Some(msg) = self.channel.recv().await { match handler(msg.0, &userdata).await { @@ -124,60 +127,65 @@ where msg.1.send(v).map_err(|_| Error::ChannelRespError)?; } break; - }, + } }; } Ok(()) } } -pub struct AsyncChannelServer { - channel: UnboundedReceiver<(T,oneshot::Sender)>, +pub struct AsyncChannelServer { + channel: UnboundedReceiver<(T, oneshot::Sender)>, } /// Create a new asynchronous channel client/server instance. /// /// Can process any number of jobs in parallel. -/// +/// /// Unpon recieving a stop signal, pending jobs are finished and reponded to, but new jobs are not processed. /// Said new jobs will continue waiting until either the server listens again, or the server is dropped. -/// +/// /// Example: /// ```no_run /// use serde::{Serialize,Deserialize}; /// use rspc::transport::{channel, ClientTransporter,ServerTransporter}; -/// +/// /// #[derive(Serialize,Deserialize)] /// pub struct MyStruct; -/// +/// /// #[rspc::service] /// impl MyStruct { /// pub fn dummy(&self) -> () { () } /// } -/// +/// /// let (client_channel,server_channel) = channel::new_async(); -/// -/// let server_thread = tokio::spawn(async move { +/// +/// let server_thread = tokio::spawn(async move { /// let mut server = MyStructServer::from( MyStruct ); /// server.listen(server_channel).await /// }); -/// +/// /// let client = MyStructClient::new(client_channel); /// ``` -pub fn new_async() -> (ChannelClient,AsyncChannelServer) { - let (c,s) = mpsc::unbounded_channel(); +pub fn new_async() -> (ChannelClient, AsyncChannelServer) { + let (c, s) = mpsc::unbounded_channel(); ( ChannelClient { channel: c }, AsyncChannelServer { channel: s }, ) } -impl AsyncChannelServer +impl AsyncChannelServer where T: Send + Sync, R: Send + Sync + 'static, { - async fn internal_listen(&mut self, handler: F, stop_response: Option, userdata: D) -> Result<(), Error> + async fn internal_listen( + &mut self, + handler: F, + stop_response: Option, + userdata: D, + ) -> Result<(), Error> where FR: Future> + Send + 'static, F: Fn(T, &D) -> FR + Send + Sync, @@ -185,7 +193,7 @@ where { let mut pending = FuturesUnordered::new(); loop { - tokio::select!{ + tokio::select! { Some(rcv) = self.channel.recv() => { pending.push( async { @@ -212,27 +220,34 @@ where } let results: Vec<_> = pending.collect().await; - results.into_iter().map(|r| -> Result<(), Error> { - match r { - (Some(r),sender) => sender.send(r).map_err(|_| Error::ChannelRespError), - _ => Ok(()), - } - }).collect::, Error>>()?; + results + .into_iter() + .map(|r| -> Result<(), Error> { + match r { + (Some(r), sender) => sender.send(r).map_err(|_| Error::ChannelRespError), + _ => Ok(()), + } + }) + .collect::, Error>>()?; Ok(()) } } #[async_trait] -impl ServerTransporter for AsyncChannelServer +impl ServerTransporter for AsyncChannelServer where T: Send + Sync, R: Send + Sync + 'static, { - type Error = Error; - async fn listen(&mut self, handler: F, stop_response: Option, userdata: D) -> Result<(), Self::Error> + async fn listen( + &mut self, + handler: F, + stop_response: Option, + userdata: D, + ) -> Result<(), Self::Error> where FR: Future> + Send + 'static, F: Fn(T, &D) -> FR + Send + Sync, @@ -243,44 +258,47 @@ where } /// Create a channel multiplexer. -/// +/// /// This is intended to be used with self-mutable clients to provide immutable clients to it -pub fn new_multiplexer() -> (ChannelClient,Multiplexer) { - let (c,s) = mpsc::unbounded_channel(); - ( - ChannelClient { channel: c }, - Multiplexer { channel: s }, - ) +pub fn new_multiplexer() -> (ChannelClient, Multiplexer) { + let (c, s) = mpsc::unbounded_channel(); + (ChannelClient { channel: c }, Multiplexer { channel: s }) } -pub struct Multiplexer { - channel: UnboundedReceiver<(T,oneshot::Sender)>, +pub struct Multiplexer { + channel: UnboundedReceiver<(T, oneshot::Sender)>, } -impl Multiplexer +impl Multiplexer where T: Send + Sync, R: Send + Sync + 'static, { /// Start multiplexing with a 3rd party. - /// + /// /// While this is running, the associated ChannelClient can then call request() to send data, and recieve the final response from the 3rd party - /// + /// /// Requirements: /// - The 3rd party must be handling (usize,T) as input and (usize,R) as output /// - sender_send(sender) is the function used to send data to the 3rd party /// - listener_recv(listener) is the function used to recieve data from the 3rd party - pub async fn start(mut self, mut sender: S, sender_send: SF, mut listener: L, listener_recv: LF) -> Result<(), Error> + pub async fn start( + mut self, + mut sender: S, + sender_send: SF, + mut listener: L, + listener_recv: LF, + ) -> Result<(), Error> where - SF: Fn(&mut S, (usize,T)) -> BoxFuture + Send + Sync + 'static, - LF: Fn(&mut L) -> BoxFuture> + 'static, + SF: Fn(&mut S, (usize, T)) -> BoxFuture + Send + Sync + 'static, + LF: Fn(&mut L) -> BoxFuture> + 'static, { let mut pending: BTreeMap> = BTreeMap::new(); let mut id_counter: usize = 0; - + loop { - tokio::select!{ + tokio::select! { q = self.channel.recv() => { match q { Some(rcv) => { @@ -309,4 +327,4 @@ where Ok(()) } -} \ No newline at end of file +} diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 281f6b3..e851ab0 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,44 +1,51 @@ use async_trait::async_trait; -use futures::{Future, future::BoxFuture, stream::FuturesUnordered, StreamExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt}; pub mod channel; pub mod serde; /// Definition of a client transporter for RSPC. -/// +/// /// ### Implementation specifications /// - ClientTransporter is a producer object, can be single-producer or multi-producer. /// - ClientTransporter must be immutable. If a client implementation requires self mutability, /// use Mutex, RwLock, or similar tools to mutate values without requiring self mutability #[async_trait] -pub trait ClientTransporter { +pub trait ClientTransporter { type Error: std::fmt::Debug; async fn request(&self, data: T) -> Result; } /// Definition of a server transporter for RSPC. -/// +/// /// ### Implementation specifications /// - ServerTransporter is a single-consumer object /// - Upon recieving a stop request (handler function return None), server must respond with stop_response if specified to only this request and none other. /// Finishing and responding to pending jobs is optional. #[async_trait] -pub trait ServerTransporter -{ +pub trait ServerTransporter { type Error: std::fmt::Debug; - async fn listen(&mut self, handler: F, stop_response: Option, userdata: D) -> Result<(), Self::Error> + async fn listen( + &mut self, + handler: F, + stop_response: Option, + userdata: D, + ) -> Result<(), Self::Error> where FR: Future> + Send + 'static, F: Fn(T, &D) -> FR + Send + Sync + Copy + 'static, - D: Send + Sync + 'static, - ; + D: Send + Sync + 'static; } -pub -async fn async_listener( - listener: &mut L, listener_recv: LF, - sender: &mut S, sender_send: SF, - handler: F, stop_response: Option, userdata: &D) -> Result<(), E> +pub async fn async_listener( + listener: &mut L, + listener_recv: LF, + sender: &mut S, + sender_send: SF, + handler: F, + stop_response: Option, + userdata: &D, +) -> Result<(), E> where T: Send + Sync, R: Send + Sync + 'static, @@ -46,12 +53,12 @@ where F: Fn(T, &D) -> FR + Send + Sync, D: Send + Sync + 'static, C: Send + Sync + 'static, - SF: Fn(&mut S, (C,R)) -> BoxFuture> + Send + Sync + 'static, - LF: Fn(&mut L) -> BoxFuture, E>> + 'static, + SF: Fn(&mut S, (C, R)) -> BoxFuture> + Send + Sync + 'static, + LF: Fn(&mut L) -> BoxFuture, E>> + 'static, { let mut pending = FuturesUnordered::new(); loop { - tokio::select!{ + tokio::select! { rcv = listener_recv(listener) => { match rcv? { Some((id, data)) => { @@ -87,12 +94,12 @@ where let results: Vec<_> = pending.collect().await; for it in results { match it { - (id,Some(r)) => { - sender_send(sender, (id,r)).await?; - }, + (id, Some(r)) => { + sender_send(sender, (id, r)).await?; + } _ => (), } } Ok(()) -} \ No newline at end of file +} diff --git a/src/transport/serde.rs b/src/transport/serde.rs index 8a0dd37..89ffc64 100644 --- a/src/transport/serde.rs +++ b/src/transport/serde.rs @@ -1,19 +1,19 @@ use async_trait::async_trait; use futures::future::BoxFuture; -use thiserror::Error; use futures::prelude::*; use serde::{Deserialize, Serialize}; use std::net::Ipv4Addr; use std::sync::atomic::AtomicUsize; +use thiserror::Error; use tokio::net::{TcpListener, TcpStream}; //use tokio::net::tcp::{ReadHalf, WriteHalf}; -use tokio::io::{ReadHalf,WriteHalf}; -use tokio_serde::SymmetricallyFramed; +use tokio::io::{ReadHalf, WriteHalf}; use tokio_serde::formats::*; +use tokio_serde::SymmetricallyFramed; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use super::channel::{self, ChannelClient, Multiplexer}; -use super::{ClientTransporter,ServerTransporter}; +use super::{ClientTransporter, ServerTransporter}; #[derive(Debug, Error)] pub enum Error { @@ -26,19 +26,22 @@ pub enum Error { type SymmetricalReader = SymmetricallyFramed< FramedRead, LengthDelimitedCodec>, T, - SymmetricalBincode>; + SymmetricalBincode, +>; type SymmetricalWriter = SymmetricallyFramed< FramedWrite, LengthDelimitedCodec>, T, - SymmetricalBincode>; + SymmetricalBincode, +>; pub struct Receiver { pub reader: SymmetricalReader, } -impl Receiver where - SymmetricalReader : TryStream + Unpin, +impl Receiver +where + SymmetricalReader: TryStream + Unpin, { pub async fn recv(&mut self) -> Result, Error> { if let Ok(msg) = self.reader.try_next().await { @@ -53,101 +56,106 @@ pub struct Sender { pub writer: SymmetricalWriter, } -impl Sender where - T: for<'a> Deserialize<'a> + Serialize + Unpin +impl Sender +where + T: for<'a> Deserialize<'a> + Serialize + Unpin, { pub async fn send(&mut self, item: T) -> Result<(), Error> { self.writer.send(item).await.map_err(Error::IO) } } - -async fn split(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) { +async fn split(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) { let (reader, writer) = tokio::io::split(socket); - let reader: FramedRead< - ReadHalf, - LengthDelimitedCodec, - > = FramedRead::new(reader, LengthDelimitedCodec::new()); - let reader: SymmetricalReader<(usize, R)> = SymmetricallyFramed::new( - reader, SymmetricalBincode::default()); + let reader: FramedRead, LengthDelimitedCodec> = + FramedRead::new(reader, LengthDelimitedCodec::new()); + let reader: SymmetricalReader<(usize, R)> = + SymmetricallyFramed::new(reader, SymmetricalBincode::default()); - let writer: FramedWrite< - WriteHalf, - LengthDelimitedCodec, - > = FramedWrite::new(writer, LengthDelimitedCodec::new()); + let writer: FramedWrite, LengthDelimitedCodec> = + FramedWrite::new(writer, LengthDelimitedCodec::new()); - let writer: SymmetricalWriter<(usize, T)> = SymmetricallyFramed::new( - writer, SymmetricalBincode::default()); + let writer: SymmetricalWriter<(usize, T)> = + SymmetricallyFramed::new(writer, SymmetricalBincode::default()); - (Sender{ writer }, Receiver{ reader }) + (Sender { writer }, Receiver { reader }) } -pub struct TcpClient { +pub struct TcpClient { sender: Sender<(usize, T)>, receiver: Receiver<(usize, R)>, - multiplexer: Option>, + multiplexer: Option>, req_id: AtomicUsize, ghost: std::marker::PhantomData<(T, R)>, } -pub struct TcpServer { +pub struct TcpServer { listener: TcpListener, ghost: std::marker::PhantomData<(T, R)>, } - -impl TcpClient +impl TcpClient where T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static, R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static, { - pub async fn connect(address: &A) -> - Result, Error> + pub async fn connect(address: &A) -> Result, Error> where A: tokio::net::ToSocketAddrs + ?Sized, { let socket = TcpStream::connect(&address).await.map_err(Error::IO)?; - let (sender,receiver) = split(socket).await; + let (sender, receiver) = split(socket).await; - Ok(TcpClient{ sender, receiver, multiplexer: None, req_id: AtomicUsize::new(0), ghost: Default::default() }) + Ok(TcpClient { + sender, + receiver, + multiplexer: None, + req_id: AtomicUsize::new(0), + ghost: Default::default(), + }) } - pub async fn multiplex(self) -> (ChannelClient, BoxFuture<'static, Result<(), channel::Error>>) { - let (client,multiplexer) = channel::new_multiplexer::(); + pub async fn multiplex( + self, + ) -> ( + ChannelClient, + BoxFuture<'static, Result<(), channel::Error>>, + ) { + let (client, multiplexer) = channel::new_multiplexer::(); let fut = multiplexer.start( self.sender, - |sender, data| { Box::pin(async {sender.send(data).await.is_ok()}) }, + |sender, data| Box::pin(async { sender.send(data).await.is_ok() }), self.receiver, - |receiver| { Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }) }, + |receiver| Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }), ); (client, Box::pin(fut)) } - pub async fn spawn(self) -> ChannelClient { + pub async fn spawn(self) -> ChannelClient { let (client, job) = self.multiplex().await; tokio::spawn(job); client } } -impl TcpServer where +impl TcpServer +where T: for<'a> Deserialize<'a> + Serialize, R: for<'a> Deserialize<'a> + Serialize, { - - pub async fn new(address: &Ipv4Addr, port: u16) -> - Result, Error> - { + pub async fn new(address: &Ipv4Addr, port: u16) -> Result, Error> { let address = format!("{}:{}", address, port); let listener = TcpListener::bind(&address).await.map_err(Error::IO)?; - Ok(TcpServer{ listener, ghost: Default::default() }) + Ok(TcpServer { + listener, + ghost: Default::default(), + }) } - async fn accept(&mut self) -> Result - { + async fn accept(&mut self) -> Result { let (socket, address) = self.listener.accept().await.map_err(Error::IO)?; println!("connection accepted: {:?}", address); Ok(socket) @@ -155,38 +163,43 @@ impl TcpServer where } #[async_trait] -impl ServerTransporter for TcpServer +impl ServerTransporter for TcpServer where T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin, R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin, { type Error = Error; - async fn listen(&mut self, handler: F, stop_response: Option, userdata: D) -> Result<(), Self::Error> + async fn listen( + &mut self, + handler: F, + stop_response: Option, + userdata: D, + ) -> Result<(), Self::Error> where FR: Future> + Send + 'static, F: Fn(T, &D) -> FR + Send + Sync, D: Send + Sync, { - let (client,fut) = channel::new_multiplexer::(); + let (client, fut) = channel::new_multiplexer::(); // super::async_listener( - // &mut receiver, |_self| { Box::pin(async { + // &mut receiver, |_self| { Box::pin(async { // _self.recv().await // }) }, - // &mut sender, |_self, data| { Box::pin(async { + // &mut sender, |_self, data| { Box::pin(async { // _self.send(data).await // }) }, // handler, stop_response, &userdata); while let Ok(mut stream) = self.accept().await { - let (sender,receiver) = split::(stream).await; + let (sender, receiver) = split::(stream).await; // tokio::spawn(async move { // super::async_listener( - // &mut receiver, |_self| { Box::pin(async { + // &mut receiver, |_self| { Box::pin(async { // _self.recv().await // }) }, - // &mut sender, |_self, data| { Box::pin(async { + // &mut sender, |_self, data| { Box::pin(async { // _self.send(data).await // }) }, // handler, stop_response, &userdata) @@ -195,4 +208,4 @@ where todo!() } -} \ No newline at end of file +}