chore: code format

This commit is contained in:
zawz 2026-05-02 13:54:57 +02:00
parent 25bb724961
commit a9d09e30f8
6 changed files with 318 additions and 263 deletions

View file

@ -1,8 +1,8 @@
use std::time::Duration;
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};
pub type RetData = (usize,Option<usize>);
pub type RetData = (usize, Option<usize>);
pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
if dur.as_nanos() < 10000 {
@ -17,8 +17,8 @@ pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
}
pub fn dur_to_str(dur: Duration) -> String {
let (n,s) = dur_to_num(dur);
n.to_string() +" "+ s
let (n, s) = dur_to_num(dur);
n.to_string() + " " + s
}
pub fn process_data(dat: TestData) -> RetData {
@ -31,12 +31,7 @@ pub fn process_data_ref(dat: &TestData) -> RetData {
pub const DATASIZE: usize = 1000000;
const TEST_STRINGS: [&str; 4] = [
"toto",
"tata",
"titi",
"tutu"
];
const TEST_STRINGS: [&str; 4] = ["toto", "tata", "titi", "tutu"];
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum TestEnum {
@ -50,11 +45,10 @@ pub struct TestData {
vec: Vec<(bool, TestEnum)>,
}
use crate::transport::{ClientTransporter,ServerTransporter};
use crate::transport::{ClientTransporter, ServerTransporter};
#[rspc::service]
impl TestData {
pub fn len(&self) -> usize {
self.vec.len()
}
@ -77,7 +71,7 @@ impl TestData {
if n <= 1 {
n
} else {
Self::internal_fib(n-1) + Self::internal_fib(n-2)
Self::internal_fib(n - 1) + Self::internal_fib(n - 2)
}
}
@ -86,11 +80,11 @@ impl TestData {
}
pub fn add(&self, a: usize, b: usize) -> usize {
return a+b;
return a + b;
}
pub fn calc_add(&self, data: TestData) -> Option<usize> {
return self.calc().and_then(|a| data.calc().map(|b| a+b));
return self.calc().and_then(|a| data.calc().map(|b| a + b));
}
pub fn push(&mut self, u: (bool, TestEnum)) {
@ -102,16 +96,16 @@ pub fn make_test_data(n: usize) -> TestData {
let mut v = Vec::with_capacity(n);
let mut b = true;
for i in 0..n {
v.push(( b ,
match i%3 {
v.push((
b,
match i % 3 {
0 => TestEnum::NoValue,
1 => TestEnum::Num(i),
2 => TestEnum::Str(TEST_STRINGS[i%4].to_string()),
2 => TestEnum::Str(TEST_STRINGS[i % 4].to_string()),
_ => panic!("unexpected error"),
}));
},
));
b = !b;
}
TestData {
vec: v,
}
TestData { vec: v }
}

View file

@ -1,7 +1,8 @@
pub mod data;
use data::{make_test_data,dur_to_str,TestData,TestEnum,TestDataClient,TestDataServer,DATASIZE};
use data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport::serde::TcpClient;
@ -10,19 +11,18 @@ use rspc::transport;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// let t = TcpClient::connect("127.0.0.1:3306").await.unwrap();
// let client = t.spawn().await;
// let client = TestDataClient::new(client);
let (c,s) = transport::channel::new_async();
let (c, s) = transport::channel::new_async();
let data: TestData = make_test_data(DATASIZE);
let srv_thread = tokio::spawn(async move {
let mut server = TestDataServer::from(data);
server.listen(s).await
} );
});
let client = TestDataClient::new(c);
let clientref = &client;
@ -43,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0) );
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
@ -60,9 +60,8 @@ async fn main() -> anyhow::Result<()> {
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE+1, client.len().await.unwrap());
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);

View file

@ -1,4 +1,3 @@
extern crate proc_macro;
extern crate quote;
@ -9,8 +8,8 @@ use syn::{
parse::{Parse, ParseStream},
parse_macro_input,
spanned::Spanned,
FnArg, Ident,
Pat, PatType, ReturnType, Visibility, Receiver, ItemImpl, ImplItemFn, ImplItemType, Signature, Attribute,
Attribute, FnArg, Ident, ImplItemFn, ImplItemType, ItemImpl, Pat, PatType, Receiver,
ReturnType, Signature, Visibility,
};
use quote::{format_ident, quote};
@ -63,25 +62,29 @@ struct Service {
}
impl TryFrom<ImplItemFn> for RpcMethod {
type Error = syn::Error;
fn try_from(value: ImplItemFn) -> Result<Self, Self::Error> {
let mut reciever = None;
let mut args = vec!();
let mut args = vec![];
for arg in &value.sig.inputs {
match arg {
FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
args.push(captured.clone());
},
}
FnArg::Typed(captured) => {
return Err(syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args"));
},
return Err(syn::Error::new(
captured.pat.span(),
"patterns aren't allowed in RPC args",
));
}
FnArg::Receiver(v) => {
if matches!(v.reference, None) {
return Err(syn::Error::new(v.span(), "self cannot be consumed in RPC method"));
return Err(syn::Error::new(
v.span(),
"self cannot be consumed in RPC method",
));
} else {
reciever = Some(v.clone());
}
@ -105,37 +108,48 @@ impl Parse for Service {
let mut errors = Ok(());
let mut typeitems: Vec<ImplItemType> = vec!();
let fns: Vec<&ImplItemFn> = item.items.iter().filter(|x| {
match x {
let mut typeitems: Vec<ImplItemType> = vec![];
let fns: Vec<&ImplItemFn> = item
.items
.iter()
.filter(|x| match x {
syn::ImplItem::Fn(fnit) => {
matches!(fnit.vis, Visibility::Public(_))
},
}
syn::ImplItem::Type(x) => {
typeitems.push(x.clone());
false
}
_ => false,
}
}).map(|x|
if let syn::ImplItem::Fn(fnit) = x {
fnit
} else {
unreachable!()
}
).collect();
})
.map(|x| {
if let syn::ImplItem::Fn(fnit) = x {
fnit
} else {
unreachable!()
}
})
.collect();
let ident = match item.self_ty.as_ref() {
syn::Type::Path(x) => {
match x.path.get_ident() {
Some(i) => i.clone(),
None => return Err(syn::Error::new(x.span(), "generics and paths are not supported")),
syn::Type::Path(x) => match x.path.get_ident() {
Some(i) => i.clone(),
None => {
return Err(syn::Error::new(
x.span(),
"generics and paths are not supported",
))
}
},
_ => return Err(syn::Error::new(item.self_ty.span(), "unsupported self type")),
_ => {
return Err(syn::Error::new(
item.self_ty.span(),
"unsupported self type",
))
}
};
let mut rpcs = vec!();
let mut rpcs = vec![];
for one_fn in fns {
match RpcMethod::try_from(one_fn.clone()) {
Ok(v) => rpcs.push(v),
@ -157,7 +171,6 @@ impl Parse for Service {
)
)
}
}
errors?;
@ -171,7 +184,6 @@ impl Parse for Service {
}
}
/// Macro used to generate the RSPC Server and Client objects.
#[proc_macro_attribute]
pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
@ -188,62 +200,74 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let server = &format_ident!("{}Server", ident);
let client = &format_ident!("{}Client", ident);
let fn_names: Vec<_> = rpcs.iter().map(|x| {
&x.sig.ident
}).collect();
let fn_names: Vec<_> = rpcs.iter().map(|x| &x.sig.ident).collect();
let fn_names_camel: Vec<_> = fn_names.iter().map(|x| {
Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span())
}).collect();
let fn_names_camel: Vec<_> = fn_names
.iter()
.map(|x| Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span()))
.collect();
let fn_locks: Vec<_> = rpcs.iter().map(|x| {
match x.reciever.as_ref() {
let fn_locks: Vec<_> = rpcs
.iter()
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability {
Some(_) => quote!(write()),
None => quote!(read()),
},
None => quote!(read()),
}
}).collect();
})
.collect();
let fn_mut: Vec<_> = rpcs.iter().map(|x| {
match x.reciever.as_ref() {
let fn_mut: Vec<_> = rpcs
.iter()
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability {
Some(_) => quote!(mut),
None => quote!(),
},
None => quote!(),
}
}).collect();
})
.collect();
let fn_await: Vec<_> = rpcs.iter().map(|x| {
match &x.sig.asyncness {
let fn_await: Vec<_> = rpcs
.iter()
.map(|x| match &x.sig.asyncness {
Some(_) => quote!(.await),
None => quote!(),
}
}).collect();
})
.collect();
let args: Vec<_> = rpcs.iter().map(|x| {
let args = &x.args;
quote!{ #(#args),* }
}).collect();
let args: Vec<_> = rpcs
.iter()
.map(|x| {
let args = &x.args;
quote! { #(#args),* }
})
.collect();
let arg_types: Vec<_> = rpcs.iter().map(|x| {
let args = x.args.iter().map(|a| &a.ty);
quote!{ #(#args),* }
}).collect();
let arg_types: Vec<_> = rpcs
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.ty);
quote! { #(#args),* }
})
.collect();
let arg_idents: Vec<_> = rpcs.iter().map(|x| {
let args = x.args.iter().map(|a| &a.pat);
quote!{ #(#args),* }
}).collect();
let arg_idents: Vec<_> = rpcs
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.pat);
quote! { #(#args),* }
})
.collect();
let outs: Vec<_> = rpcs.iter().map(|x| {
match &x.sig.output {
ReturnType::Default => quote!{()},
ReturnType::Type(_, t) => quote!{#t},
}
}).collect();
let outs: Vec<_> = rpcs
.iter()
.map(|x| match &x.sig.output {
ReturnType::Default => quote! {()},
ReturnType::Type(_, t) => quote! {#t},
})
.collect();
let t = quote! {

View file

@ -4,14 +4,14 @@ use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use async_trait::async_trait;
use super::{ClientTransporter,ServerTransporter};
use super::{ClientTransporter, ServerTransporter};
#[derive(Error,Debug)]
#[derive(Error, Debug)]
pub enum Error {
#[error("channel recv error")]
ChannelRecvError,
@ -23,16 +23,15 @@ pub enum Error {
OneshotRecvError(#[from] oneshot::error::RecvError),
}
#[derive(Clone)]
/// Client endpoint of any channel.
pub struct ChannelClient<T,R> {
channel: UnboundedSender<(T,oneshot::Sender<R>)>,
pub struct ChannelClient<T, R> {
channel: UnboundedSender<(T, oneshot::Sender<R>)>,
}
/// Server endpoint of a synchronous channel
pub struct SyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
pub struct SyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new synchronous channel client/server instance.
@ -65,35 +64,35 @@ pub struct SyncChannelServer<T,R> {
///
/// let client = MyStructClient::new(client_channel);
/// ```
pub fn new_sync<T,R>() -> (ChannelClient<T,R>,SyncChannelServer<T,R>) {
let (c,s) = mpsc::unbounded_channel();
pub fn new_sync<T, R>() -> (ChannelClient<T, R>, SyncChannelServer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
SyncChannelServer { channel: s },
)
}
impl<T,R> ChannelClient<T,R>
impl<T, R> ChannelClient<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
pub async fn internal_request(&self, data: T) -> Result<R, Error> {
let (t,r) = oneshot::channel();
self.channel.send( (data, t) ).map_err(|_| Error::ChannelSendError)?;
let (t, r) = oneshot::channel();
self.channel
.send((data, t))
.map_err(|_| Error::ChannelSendError)?;
let output = r.await?;
Ok(output)
}
}
#[async_trait]
impl<T,R> ClientTransporter<T,R> for ChannelClient<T,R>
impl<T, R> ClientTransporter<T, R> for ChannelClient<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn request(&self, data: T) -> Result<R, Self::Error> {
@ -102,19 +101,23 @@ where
}
#[async_trait]
impl<T,R> ServerTransporter<T,R> for SyncChannelServer<T,R>
impl<T, R> ServerTransporter<T, R> for SyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send,
F: Fn(T, &D) -> FR + Send + Sync,
D: Send+Sync,
D: Send + Sync,
{
while let Some(msg) = self.channel.recv().await {
match handler(msg.0, &userdata).await {
@ -124,15 +127,15 @@ where
msg.1.send(v).map_err(|_| Error::ChannelRespError)?;
}
break;
},
}
};
}
Ok(())
}
}
pub struct AsyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
pub struct AsyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance.
@ -164,20 +167,25 @@ pub struct AsyncChannelServer<T,R> {
///
/// let client = MyStructClient::new(client_channel);
/// ```
pub fn new_async<T,R>() -> (ChannelClient<T,R>,AsyncChannelServer<T,R>) {
let (c,s) = mpsc::unbounded_channel();
pub fn new_async<T, R>() -> (ChannelClient<T, R>, AsyncChannelServer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
AsyncChannelServer { channel: s },
)
}
impl<T,R> AsyncChannelServer<T,R>
impl<T, R> AsyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
{
async fn internal_listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Error>
async fn internal_listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
@ -185,7 +193,7 @@ where
{
let mut pending = FuturesUnordered::new();
loop {
tokio::select!{
tokio::select! {
Some(rcv) = self.channel.recv() => {
pending.push(
async {
@ -212,27 +220,34 @@ where
}
let results: Vec<_> = pending.collect().await;
results.into_iter().map(|r| -> Result<(), Error> {
match r {
(Some(r),sender) => sender.send(r).map_err(|_| Error::ChannelRespError),
_ => Ok(()),
}
}).collect::<Result<Vec<_>, Error>>()?;
results
.into_iter()
.map(|r| -> Result<(), Error> {
match r {
(Some(r), sender) => sender.send(r).map_err(|_| Error::ChannelRespError),
_ => Ok(()),
}
})
.collect::<Result<Vec<_>, Error>>()?;
Ok(())
}
}
#[async_trait]
impl<T,R> ServerTransporter<T,R> for AsyncChannelServer<T,R>
impl<T, R> ServerTransporter<T, R> for AsyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
@ -245,19 +260,16 @@ where
/// Create a channel multiplexer.
///
/// This is intended to be used with self-mutable clients to provide immutable clients to it
pub fn new_multiplexer<T,R>() -> (ChannelClient<T,R>,Multiplexer<T,R>) {
let (c,s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
Multiplexer { channel: s },
)
pub fn new_multiplexer<T, R>() -> (ChannelClient<T, R>, Multiplexer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(ChannelClient { channel: c }, Multiplexer { channel: s })
}
pub struct Multiplexer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
pub struct Multiplexer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
impl<T,R> Multiplexer<T,R>
impl<T, R> Multiplexer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
@ -270,17 +282,23 @@ where
/// - The 3rd party must be handling (usize,T) as input and (usize,R) as output
/// - sender_send(sender) is the function used to send data to the 3rd party
/// - listener_recv(listener) is the function used to recieve data from the 3rd party
pub async fn start<S, SF, L, LF>(mut self, mut sender: S, sender_send: SF, mut listener: L, listener_recv: LF) -> Result<(), Error>
pub async fn start<S, SF, L, LF>(
mut self,
mut sender: S,
sender_send: SF,
mut listener: L,
listener_recv: LF,
) -> Result<(), Error>
where
SF: Fn(&mut S, (usize,T)) -> BoxFuture<bool> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Option<(usize,R)>> + 'static,
SF: Fn(&mut S, (usize, T)) -> BoxFuture<bool> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Option<(usize, R)>> + 'static,
{
let mut pending: BTreeMap<usize, oneshot::Sender<R>> = BTreeMap::new();
let mut id_counter: usize = 0;
loop {
tokio::select!{
tokio::select! {
q = self.channel.recv() => {
match q {
Some(rcv) => {

View file

@ -1,5 +1,5 @@
use async_trait::async_trait;
use futures::{Future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt};
pub mod channel;
pub mod serde;
@ -11,7 +11,7 @@ pub mod serde;
/// - ClientTransporter must be immutable. If a client implementation requires self mutability,
/// use Mutex, RwLock, or similar tools to mutate values without requiring self mutability
#[async_trait]
pub trait ClientTransporter<T,R> {
pub trait ClientTransporter<T, R> {
type Error: std::fmt::Debug;
async fn request(&self, data: T) -> Result<R, Self::Error>;
}
@ -23,22 +23,29 @@ pub trait ClientTransporter<T,R> {
/// - Upon recieving a stop request (handler function return None), server must respond with stop_response if specified to only this request and none other.
/// Finishing and responding to pending jobs is optional.
#[async_trait]
pub trait ServerTransporter<T,R>
{
pub trait ServerTransporter<T, R> {
type Error: std::fmt::Debug;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync + Copy + 'static,
D: Send + Sync + 'static,
;
D: Send + Sync + 'static;
}
pub
async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener: &mut L, listener_recv: LF,
sender: &mut S, sender_send: SF,
handler: F, stop_response: Option<R>, userdata: &D) -> Result<(), E>
pub async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener: &mut L,
listener_recv: LF,
sender: &mut S,
sender_send: SF,
handler: F,
stop_response: Option<R>,
userdata: &D,
) -> Result<(), E>
where
T: Send + Sync,
R: Send + Sync + 'static,
@ -46,12 +53,12 @@ where
F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync + 'static,
C: Send + Sync + 'static,
SF: Fn(&mut S, (C,R)) -> BoxFuture<Result<(), E>> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Result<Option<(C,T)>, E>> + 'static,
SF: Fn(&mut S, (C, R)) -> BoxFuture<Result<(), E>> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Result<Option<(C, T)>, E>> + 'static,
{
let mut pending = FuturesUnordered::new();
loop {
tokio::select!{
tokio::select! {
rcv = listener_recv(listener) => {
match rcv? {
Some((id, data)) => {
@ -87,9 +94,9 @@ where
let results: Vec<_> = pending.collect().await;
for it in results {
match it {
(id,Some(r)) => {
sender_send(sender, (id,r)).await?;
},
(id, Some(r)) => {
sender_send(sender, (id, r)).await?;
}
_ => (),
}
}

View file

@ -1,19 +1,19 @@
use async_trait::async_trait;
use futures::future::BoxFuture;
use thiserror::Error;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicUsize;
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
//use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::io::{ReadHalf,WriteHalf};
use tokio_serde::SymmetricallyFramed;
use tokio::io::{ReadHalf, WriteHalf};
use tokio_serde::formats::*;
use tokio_serde::SymmetricallyFramed;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use super::channel::{self, ChannelClient, Multiplexer};
use super::{ClientTransporter,ServerTransporter};
use super::{ClientTransporter, ServerTransporter};
#[derive(Debug, Error)]
pub enum Error {
@ -26,19 +26,22 @@ pub enum Error {
type SymmetricalReader<T> = SymmetricallyFramed<
FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec>,
T,
SymmetricalBincode<T>>;
SymmetricalBincode<T>,
>;
type SymmetricalWriter<T> = SymmetricallyFramed<
FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec>,
T,
SymmetricalBincode<T>>;
SymmetricalBincode<T>,
>;
pub struct Receiver<T> {
pub reader: SymmetricalReader<T>,
}
impl<T> Receiver<T> where
SymmetricalReader<T> : TryStream<Ok=T> + Unpin,
impl<T> Receiver<T>
where
SymmetricalReader<T>: TryStream<Ok = T> + Unpin,
{
pub async fn recv(&mut self) -> Result<Option<T>, Error> {
if let Ok(msg) = self.reader.try_next().await {
@ -53,101 +56,106 @@ pub struct Sender<T> {
pub writer: SymmetricalWriter<T>,
}
impl<T> Sender<T> where
T: for<'a> Deserialize<'a> + Serialize + Unpin
impl<T> Sender<T>
where
T: for<'a> Deserialize<'a> + Serialize + Unpin,
{
pub async fn send(&mut self, item: T) -> Result<(), Error> {
self.writer.send(item).await.map_err(Error::IO)
}
}
async fn split<T,R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
async fn split<T, R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
let (reader, writer) = tokio::io::split(socket);
let reader: FramedRead<
ReadHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> = SymmetricallyFramed::new(
reader, SymmetricalBincode::default());
let reader: FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec> =
FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> =
SymmetricallyFramed::new(reader, SymmetricalBincode::default());
let writer: FramedWrite<
WriteHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec> =
FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: SymmetricalWriter<(usize, T)> = SymmetricallyFramed::new(
writer, SymmetricalBincode::default());
let writer: SymmetricalWriter<(usize, T)> =
SymmetricallyFramed::new(writer, SymmetricalBincode::default());
(Sender{ writer }, Receiver{ reader })
(Sender { writer }, Receiver { reader })
}
pub struct TcpClient<T,R> {
pub struct TcpClient<T, R> {
sender: Sender<(usize, T)>,
receiver: Receiver<(usize, R)>,
multiplexer: Option<Multiplexer<T,R>>,
multiplexer: Option<Multiplexer<T, R>>,
req_id: AtomicUsize,
ghost: std::marker::PhantomData<(T, R)>,
}
pub struct TcpServer<T,R> {
pub struct TcpServer<T, R> {
listener: TcpListener,
ghost: std::marker::PhantomData<(T, R)>,
}
impl<T,R> TcpClient<T,R>
impl<T, R> TcpClient<T, R>
where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
{
pub async fn connect<A>(address: &A) ->
Result<TcpClient<T, R>, Error>
pub async fn connect<A>(address: &A) -> Result<TcpClient<T, R>, Error>
where
A: tokio::net::ToSocketAddrs + ?Sized,
{
let socket = TcpStream::connect(&address).await.map_err(Error::IO)?;
let (sender,receiver) = split(socket).await;
let (sender, receiver) = split(socket).await;
Ok(TcpClient{ sender, receiver, multiplexer: None, req_id: AtomicUsize::new(0), ghost: Default::default() })
Ok(TcpClient {
sender,
receiver,
multiplexer: None,
req_id: AtomicUsize::new(0),
ghost: Default::default(),
})
}
pub async fn multiplex(self) -> (ChannelClient<T,R>, BoxFuture<'static, Result<(), channel::Error>>) {
let (client,multiplexer) = channel::new_multiplexer::<T,R>();
pub async fn multiplex(
self,
) -> (
ChannelClient<T, R>,
BoxFuture<'static, Result<(), channel::Error>>,
) {
let (client, multiplexer) = channel::new_multiplexer::<T, R>();
let fut = multiplexer.start(
self.sender,
|sender, data| { Box::pin(async {sender.send(data).await.is_ok()}) },
|sender, data| Box::pin(async { sender.send(data).await.is_ok() }),
self.receiver,
|receiver| { Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }) },
|receiver| Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }),
);
(client, Box::pin(fut))
}
pub async fn spawn(self) -> ChannelClient<T,R> {
pub async fn spawn(self) -> ChannelClient<T, R> {
let (client, job) = self.multiplex().await;
tokio::spawn(job);
client
}
}
impl<T,R> TcpServer<T,R> where
impl<T, R> TcpServer<T, R>
where
T: for<'a> Deserialize<'a> + Serialize,
R: for<'a> Deserialize<'a> + Serialize,
{
pub async fn new(address: &Ipv4Addr, port: u16) ->
Result<TcpServer<T,R>, Error>
{
pub async fn new(address: &Ipv4Addr, port: u16) -> Result<TcpServer<T, R>, Error> {
let address = format!("{}:{}", address, port);
let listener = TcpListener::bind(&address).await.map_err(Error::IO)?;
Ok(TcpServer{ listener, ghost: Default::default() })
Ok(TcpServer {
listener,
ghost: Default::default(),
})
}
async fn accept(&mut self) -> Result<TcpStream, Error>
{
async fn accept(&mut self) -> Result<TcpStream, Error> {
let (socket, address) = self.listener.accept().await.map_err(Error::IO)?;
println!("connection accepted: {:?}", address);
Ok(socket)
@ -155,20 +163,25 @@ impl<T,R> TcpServer<T,R> where
}
#[async_trait]
impl<T,R> ServerTransporter<T,R> for TcpServer<T,R>
impl<T, R> ServerTransporter<T, R> for TcpServer<T, R>
where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync,
{
let (client,fut) = channel::new_multiplexer::<R,T>();
let (client, fut) = channel::new_multiplexer::<R, T>();
// super::async_listener(
// &mut receiver, |_self| { Box::pin(async {
@ -180,7 +193,7 @@ where
// handler, stop_response, &userdata);
while let Ok(mut stream) = self.accept().await {
let (sender,receiver) = split::<R,T>(stream).await;
let (sender, receiver) = split::<R, T>(stream).await;
// tokio::spawn(async move {
// super::async_listener(
// &mut receiver, |_self| { Box::pin(async {