Compare commits

..

2 commits

Author SHA1 Message Date
b619f79095 chore: code cleanup and folder restructuring 2026-05-02 15:37:01 +02:00
a9d09e30f8 chore: code format 2026-05-02 13:54:57 +02:00
19 changed files with 743 additions and 1860 deletions

202
Cargo.lock generated
View file

@ -1,21 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
version = 4
[[package]]
name = "aho-corasick"
@ -127,17 +112,6 @@ version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1"
[[package]]
name = "async-trait"
version = "0.1.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -150,21 +124,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "bincode"
version = "1.3.3"
@ -263,7 +222,7 @@ dependencies = [
"num-traits",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -273,7 +232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -368,7 +327,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -414,12 +373,6 @@ dependencies = [
"windows",
]
[[package]]
name = "gimli"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "gloo-timers"
version = "0.2.6"
@ -455,7 +408,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -490,9 +443,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.148"
version = "0.2.186"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
[[package]]
name = "linux-raw-sys"
@ -548,24 +501,15 @@ version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.8"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1"
dependencies = [
"libc",
"wasi",
"windows-sys",
"windows-sys 0.61.2",
]
[[package]]
@ -608,25 +552,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.18.0"
@ -694,7 +619,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -733,23 +658,23 @@ dependencies = [
"libc",
"log",
"pin-project-lite",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.67"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
@ -811,35 +736,35 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
name = "rspc"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"futures",
"oneshot",
"pin-project",
"rspc_dev_utilities",
"rspc_macros",
"serde",
"serde_json",
"syn 2.0.38",
"thiserror",
"tokio",
"tokio-serde",
"tokio-util",
]
[[package]]
name = "rspc_dev_utilities"
version = "0.1.0"
dependencies = [
"async-std",
"rspc",
"serde",
"tokio",
]
[[package]]
name = "rspc_macros"
version = "0.1.0"
dependencies = [
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.37.25"
@ -851,7 +776,7 @@ dependencies = [
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -880,22 +805,32 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.189"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.189"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -954,12 +889,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.4"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.61.2",
]
[[package]]
@ -975,9 +910,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.38"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
@ -1001,7 +936,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -1016,32 +951,30 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.32.0"
version = "1.52.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.4",
"socket2 0.6.3",
"tokio-macros",
"windows-sys",
"windows-sys 0.61.2",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -1093,7 +1026,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -1186,7 +1119,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
"wasm-bindgen-shared",
]
@ -1220,7 +1153,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -1272,6 +1205,12 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-sys"
version = "0.48.0"
@ -1281,6 +1220,15 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-targets"
version = "0.48.5"

View file

@ -1,22 +1,44 @@
[workspace]
resolver = "3"
members = ["utilities"]
[dev-dependencies]
rspc_dev_utilities = { path = "utilities" }
[features]
default = ["full"]
full = ["serde","channel"]
channel = ["dep:oneshot"]
serde = ["dep:serde", "rspc_macros/serde"]
[package]
name = "rspc"
version = "0.1.0"
edition = "2021"
include = ["/src"]
[dependencies]
serde = { version = "1.0", features = ["derive"] }
futures = "0.3"
tokio = { version = "1.0", features = ["full"] }
syn = { version = "2.0", features = ["full"] }
rspc_macros = { path = "macros", version = "0.1" }
# necessary dependencies
futures = "0.3"
thiserror = "1.0.49"
async-trait = "0.1.73"
async-std = "1.12.0"
tokio = { version = "1.0", features = ["full"] }
tokio-serde = { version = "0.8.0", features=["json","bincode"] }
pin-project = "1.1.3"
tokio-util = { version = "0.7.10", features=["codec"] }
serde_json = "1.0.108"
oneshot = { version = "0.1.6", features=["std"] }
oneshot = { version = "0.1.6", features=["std"], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
[lib]
[[example]]
name = "channel_sync"
required-features = ["channel"]
[[example]]
name = "channel_async"
required-features = ["channel"]
[[example]]
name = "tcp"
required-features = ["serde"]

View file

@ -3,12 +3,14 @@
Proof of concept RPC framework focused on ease of use.
It works by calling the macro `#[rspc::service]` on an impl block,
and code is generated for the resulting Server and Client objects.
You can then instantiate a Server/Client on the desired transporter.
The Server objects own the original struct data and implements listen functions.<br>
The Client objects must be provided a connection, and replicates all the functions of the impl block where the macro was called.
The Server implements read-write locking, there can be many reads at once, but only one write.
Requests are not errored upon write-lock, but instead waited for.
The Server implements read-write locking, there can be many reads at once, but only one write. Requests are not errored upon write-lock, but instead waited for.
> If you wish to implement parallel writes, you must implement it with internal parallelism as you would in normal rust, for instance with immutable functions and internal read-write locks
The Client object cannot be cloned. Instead all function calls are immutable, so a reference can be shared to all.
@ -17,7 +19,6 @@ Currently only implements local thread messaging. Serialized TCP transport is un
Example:
```rs
use rspc::transport::{channel, ClientTransporter,ServerTransporter};
use rspc::service;
use serde::{Deserialize, Serialize};
@ -26,7 +27,8 @@ pub struct MyStruct {
my_vec: Vec<String>,
}
#[service]
// Functions to instanciate as RPC
#[rspc::service]
impl MyStruct
{
pub fn len(&self) -> usize {
@ -44,9 +46,11 @@ impl MyStruct
#[tokio::test]
async fn test() {
// Create the server data structure
let my_data = MyStruct {
my_vec: Vec::new(),
};
// Instanciate a client and server
let (c,s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
@ -65,3 +69,10 @@ async fn test() {
```
See [example](example) for an more detailed example usage
### Internal logic and determinism
RSPC is built on rust logic and works with type determinism rather than serializing.
Serializing is used for transports that require it (such as TCP with serde), but is otherwise unused in code logic.
Determinism is achieved through enum autogeneration by the macro `rscp::service`, which is used for internal transport logic

1333
example/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,28 +1,19 @@
pub mod data;
use data::{make_test_data,dur_to_str,TestData,TestEnum,TestDataClient,TestDataServer,DATASIZE};
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport::serde::TcpClient;
use rspc::transport;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// let t = TcpClient::connect("127.0.0.1:3306").await.unwrap();
// let client = t.spawn().await;
// let client = TestDataClient::new(client);
let (c,s) = transport::channel::new_async();
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (c, s) = transport::channel::new_async();
let data: TestData = make_test_data(DATASIZE);
let srv_thread = tokio::spawn(async move {
let srv_thread = tokio::spawn(async move {
let mut server = TestDataServer::from(data);
server.listen(s).await
} );
});
let client = TestDataClient::new(c);
let clientref = &client;
@ -41,11 +32,11 @@ async fn main() -> anyhow::Result<()> {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0) );
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
@ -60,11 +51,10 @@ async fn main() -> anyhow::Result<()> {
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE+1, client.len().await.unwrap());
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();

63
examples/channel_sync.rs Normal file
View file

@ -0,0 +1,63 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (c, s) = transport::channel::new_sync();
let data: TestData = make_test_data(DATASIZE);
let srv_thread = tokio::spawn(async move {
let mut server = TestDataServer::from(data);
server.listen(s).await
});
let client = TestDataClient::new(c);
let clientref = &client;
let job1 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, clientref.fib(42).await.unwrap());
println!("fib1: {}", dur_to_str(now.elapsed()));
};
let job2 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
println!("calc_add: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, client.fib(42).await.unwrap());
println!("fib2: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
clientref.push((false, TestEnum::NoValue)).await.unwrap();
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
Ok(())
}

62
examples/tcp.rs Normal file
View file

@ -0,0 +1,62 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport::serde::TcpClient;
use rspc::transport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let t = TcpClient::connect("127.0.0.1:6543").await.unwrap();
let client = t.spawn().await;
// todo : server
let client = TestDataClient::new(client);
let clientref = &client;
let job1 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, clientref.fib(42).await.unwrap());
println!("fib1: {}", dur_to_str(now.elapsed()));
};
let job2 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
println!("calc_add: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, client.fib(42).await.unwrap());
println!("fib2: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
clientref.push((false, TestEnum::NoValue)).await.unwrap();
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
Ok(())
}

46
macros/Cargo.lock generated Normal file
View file

@ -0,0 +1,46 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "proc-macro2"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rspc_macros"
version = "0.1.0"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "syn"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"

View file

@ -5,6 +5,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
full = ["serde"]
serde = []
[dependencies]
#anyhow = "1.0"
#futures = "0.3"

View file

@ -1,4 +1,3 @@
extern crate proc_macro;
extern crate quote;
@ -9,8 +8,8 @@ use syn::{
parse::{Parse, ParseStream},
parse_macro_input,
spanned::Spanned,
FnArg, Ident,
Pat, PatType, ReturnType, Visibility, Receiver, ItemImpl, ImplItemFn, ImplItemType, Signature, Attribute,
Attribute, FnArg, Ident, ImplItemFn, ImplItemType, ItemImpl, Pat, PatType, Receiver,
ReturnType, Signature, Visibility,
};
use quote::{format_ident, quote};
@ -49,7 +48,7 @@ fn snake_to_upper_camel(ident_str: &str) -> String {
}
struct RpcMethod {
pub attrs: Vec<Attribute>,
pub _attrs: Vec<Attribute>,
pub sig: Signature,
pub reciever: Option<Receiver>,
pub args: Vec<PatType>,
@ -58,30 +57,34 @@ struct RpcMethod {
struct Service {
pub ident: Ident,
pub rpcs: Vec<RpcMethod>,
pub item: ItemImpl,
pub types: Vec<ImplItemType>,
pub _item: ItemImpl,
pub _types: Vec<ImplItemType>,
}
impl TryFrom<ImplItemFn> for RpcMethod {
type Error = syn::Error;
fn try_from(value: ImplItemFn) -> Result<Self, Self::Error> {
let mut reciever = None;
let mut args = vec!();
let mut args = vec![];
for arg in &value.sig.inputs {
match arg {
FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
args.push(captured.clone());
},
}
FnArg::Typed(captured) => {
return Err(syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args"));
},
return Err(syn::Error::new(
captured.pat.span(),
"patterns aren't allowed in RPC args",
));
}
FnArg::Receiver(v) => {
if matches!(v.reference, None) {
return Err(syn::Error::new(v.span(), "self cannot be consumed in RPC method"));
return Err(syn::Error::new(
v.span(),
"self cannot be consumed in RPC method",
));
} else {
reciever = Some(v.clone());
}
@ -90,7 +93,7 @@ impl TryFrom<ImplItemFn> for RpcMethod {
}
Ok(RpcMethod {
attrs: value.attrs,
_attrs: value.attrs,
sig: value.sig,
reciever,
args,
@ -104,38 +107,49 @@ impl Parse for Service {
let item: ItemImpl = input.parse()?;
let mut errors = Ok(());
let mut typeitems: Vec<ImplItemType> = vec!();
let fns: Vec<&ImplItemFn> = item.items.iter().filter(|x| {
match x {
let mut typeitems: Vec<ImplItemType> = vec![];
let fns: Vec<&ImplItemFn> = item
.items
.iter()
.filter(|x| match x {
syn::ImplItem::Fn(fnit) => {
matches!(fnit.vis, Visibility::Public(_))
},
}
syn::ImplItem::Type(x) => {
typeitems.push(x.clone());
false
}
_ => false,
}
}).map(|x|
if let syn::ImplItem::Fn(fnit) = x {
fnit
} else {
unreachable!()
}
).collect();
})
.map(|x| {
if let syn::ImplItem::Fn(fnit) = x {
fnit
} else {
unreachable!()
}
})
.collect();
let ident = match item.self_ty.as_ref() {
syn::Type::Path(x) => {
match x.path.get_ident() {
Some(i) => i.clone(),
None => return Err(syn::Error::new(x.span(), "generics and paths are not supported")),
syn::Type::Path(x) => match x.path.get_ident() {
Some(i) => i.clone(),
None => {
return Err(syn::Error::new(
x.span(),
"generics and paths are not supported",
))
}
},
_ => return Err(syn::Error::new(item.self_ty.span(), "unsupported self type")),
_ => {
return Err(syn::Error::new(
item.self_ty.span(),
"unsupported self type",
))
}
};
let mut rpcs = vec!();
let mut rpcs = vec![];
for one_fn in fns {
match RpcMethod::try_from(one_fn.clone()) {
Ok(v) => rpcs.push(v),
@ -157,7 +171,6 @@ impl Parse for Service {
)
)
}
}
errors?;
@ -165,13 +178,12 @@ impl Parse for Service {
Ok(Self {
ident,
rpcs,
item,
types: typeitems,
_item: item,
_types: typeitems,
})
}
}
/// Macro used to generate the RSPC Server and Client objects.
#[proc_macro_attribute]
pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
@ -179,86 +191,104 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let Service {
ident,
rpcs,
item: _,
types: _,
_item: _,
_types: _,
} = parse_macro_input!(inputclone as Service);
let transport_request = &format_ident!("{}TransportRequest", ident);
let transport_response = &format_ident!("{}TransportResponse", ident);
let server = &format_ident!("{}Server", ident);
let client = &format_ident!("{}Client", ident);
let fn_names: Vec<_> = rpcs.iter().map(|x| {
&x.sig.ident
}).collect();
let fn_names: Vec<_> = rpcs.iter().map(|x| &x.sig.ident).collect();
let fn_names_camel: Vec<_> = fn_names.iter().map(|x| {
Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span())
}).collect();
let fn_names_camel: Vec<_> = fn_names
.iter()
.map(|x| Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span()))
.collect();
let fn_locks: Vec<_> = rpcs.iter().map(|x| {
match x.reciever.as_ref() {
let fn_locks: Vec<_> = rpcs
.iter()
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability {
Some(_) => quote!(write()),
None => quote!(read()),
},
None => quote!(read()),
}
}).collect();
})
.collect();
let fn_mut: Vec<_> = rpcs.iter().map(|x| {
match x.reciever.as_ref() {
let fn_mut: Vec<_> = rpcs
.iter()
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability {
Some(_) => quote!(mut),
None => quote!(),
},
None => quote!(),
}
}).collect();
})
.collect();
let fn_await: Vec<_> = rpcs.iter().map(|x| {
match &x.sig.asyncness {
let fn_await: Vec<_> = rpcs
.iter()
.map(|x| match &x.sig.asyncness {
Some(_) => quote!(.await),
None => quote!(),
}
}).collect();
})
.collect();
let args: Vec<_> = rpcs.iter().map(|x| {
let args = &x.args;
quote!{ #(#args),* }
}).collect();
let args: Vec<_> = rpcs
.iter()
.map(|x| {
let args = &x.args;
quote! { #(#args),* }
})
.collect();
let arg_types: Vec<_> = rpcs.iter().map(|x| {
let args = x.args.iter().map(|a| &a.ty);
quote!{ #(#args),* }
}).collect();
let arg_types: Vec<_> = rpcs
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.ty);
quote! { #(#args),* }
})
.collect();
let arg_idents: Vec<_> = rpcs.iter().map(|x| {
let args = x.args.iter().map(|a| &a.pat);
quote!{ #(#args),* }
}).collect();
let arg_idents: Vec<_> = rpcs
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.pat);
quote! { #(#args),* }
})
.collect();
let outs: Vec<_> = rpcs.iter().map(|x| {
match &x.sig.output {
ReturnType::Default => quote!{()},
ReturnType::Type(_, t) => quote!{#t},
}
}).collect();
let outs: Vec<_> = rpcs
.iter()
.map(|x| match &x.sig.output {
ReturnType::Default => quote! {()},
ReturnType::Type(_, t) => quote! {#t},
})
.collect();
let serde_derives = if cfg!(feature = "serde") {
quote! { serde::Serialize, serde::Deserialize }
} else {
quote! {}
};
let t = quote! {
#[derive(PartialEq,Debug,Serialize,Deserialize)]
#[derive(PartialEq, Debug, #serde_derives)]
pub enum #transport_request {
#( #fn_names_camel(#arg_types) ),* ,
Stop,
}
#[derive(PartialEq,Debug,Serialize,Deserialize)]
#[derive(PartialEq, Debug, #serde_derives)]
pub enum #transport_response {
#( #fn_names_camel(#outs) ),* ,
Stop,
}
impl #transport_response {
#(
pub fn #fn_names(self) -> #outs {
@ -272,24 +302,24 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
}
pub struct #server {
obj: std::sync::Arc<async_std::sync::RwLock<#ident>>,
obj: std::sync::Arc<tokio::sync::RwLock<#ident>>,
}
impl From<#ident> for #server {
fn from(obj: #ident) -> Self {
Self {
obj: std::sync::Arc::new(async_std::sync::RwLock::new(obj)),
obj: std::sync::Arc::new(tokio::sync::RwLock::new(obj)),
}
}
}
impl #server {
impl #server {
pub async fn listen<Tr>(&mut self, mut transport: Tr) -> Result<(), Tr::Error>
where
Tr: ServerTransporter<#transport_request,#transport_response> + Send
{
{
transport.listen( |v,obj| {
transport.listen( |v,obj| {
let obj = obj.clone();
async move {
match v {
@ -306,30 +336,30 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
}
}
}
pub struct #client<Tr>
where
Tr: ClientTransporter<#transport_request,#transport_response>,
{
transporter: Tr,
}
impl<Tr> #client<Tr>
where
Tr: ClientTransporter<#transport_request,#transport_response>
{
{
pub fn new(transporter: Tr) -> Self {
#client {
transporter,
}
}
#(
pub async fn #fn_names(&self, #args) -> Result<#outs, Tr::Error> {
Ok(self.transporter.request(#transport_request::#fn_names_camel(#arg_idents)).await?.#fn_names())
}
)*
// TODO: graceful stop response
pub async fn stop(&self) -> Result<(), Tr::Error> {
self.transporter.request(#transport_request::Stop).await.map(|_| ())

View file

@ -1,55 +1,2 @@
pub mod transport;
pub use rspc_macros::service;
#[cfg(test)]
mod tests {
use super::transport::{channel, ClientTransporter,ServerTransporter};
use super::service;
use serde::{Deserialize, Serialize};
// use rspc::transport::{ClientTransporter,ServerTransporter};
#[derive(Serialize,Deserialize)]
pub struct MyStruct {
my_vec: Vec<String>,
}
#[service]
impl MyStruct
{
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct {
my_vec: Vec::new(),
};
let (c,s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
} );
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(client.pop().await.unwrap(), Some("Hello world!".to_string()));
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -4,14 +4,12 @@ use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use async_trait::async_trait;
use super::{ClientTransporter, ServerTransporter};
use super::{ClientTransporter,ServerTransporter};
#[derive(Error,Debug)]
#[derive(Error, Debug)]
pub enum Error {
#[error("channel recv error")]
ChannelRecvError,
@ -23,98 +21,88 @@ pub enum Error {
OneshotRecvError(#[from] oneshot::error::RecvError),
}
#[derive(Clone)]
/// Client endpoint of any channel.
pub struct ChannelClient<T,R> {
channel: UnboundedSender<(T,oneshot::Sender<R>)>,
pub struct ChannelClient<T, R> {
channel: UnboundedSender<(T, oneshot::Sender<R>)>,
}
/// Server endpoint of a synchronous channel
pub struct SyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
}
/// Create a new synchronous channel client/server instance.
///
/// Synchronous channels can only process one job at a time.
/// If you want job concurrency refer to new_async() and Asynchronous channels
///
/// Server endpoint of a synchronous channel.
///
/// Synchronous channels can only process one job at a time. If you want job concurrency refer to [new_async](new_async) and [AsyncChannelServer](AsyncChannelServer)
///
/// Unpon recieving a stop signal, jobs currently pending recv() will not be processed.
/// Said jobs will continue waiting until either the server listens again, or the server is dropped.
///
pub struct SyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new synchronous channel client/server instance. See [SyncChannelServer](SyncChannelServer)
///
/// Synchronous channels can only process one job at a time. If you want job concurrency refer to [new_async](new_async) and [AsyncChannelServer](AsyncChannelServer)
///
/// Example:
/// ```no_run
/// use serde::{Serialize,Deserialize};
/// use rspc::transport::{channel, ClientTransporter,ServerTransporter};
///
/// #[derive(Serialize,Deserialize)]
///
/// pub struct MyStruct;
///
///
/// #[rspc::service]
/// impl MyStruct {
/// pub fn dummy(&self) -> () { () }
/// }
///
///
/// let (client_channel,server_channel) = channel::new_sync();
///
/// let server_thread = tokio::spawn(async move {
///
/// let server_thread = tokio::spawn(async move {
/// let mut server = MyStructServer::from( MyStruct );
/// server.listen(server_channel).await
/// });
///
///
/// let client = MyStructClient::new(client_channel);
/// ```
pub fn new_sync<T,R>() -> (ChannelClient<T,R>,SyncChannelServer<T,R>) {
let (c,s) = mpsc::unbounded_channel();
pub fn new_sync<T, R>() -> (ChannelClient<T, R>, SyncChannelServer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
SyncChannelServer { channel: s },
)
}
impl<T,R> ChannelClient<T,R>
impl<T, R> ClientTransporter<T, R> for ChannelClient<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
pub async fn internal_request(&self, data: T) -> Result<R, Error> {
let (t,r) = oneshot::channel();
self.channel.send( (data, t) ).map_err(|_| Error::ChannelSendError)?;
async fn request(&self, data: T) -> Result<R, Self::Error> {
let (t, r) = oneshot::channel();
self.channel
.send((data, t))
.map_err(|_| Error::ChannelSendError)?;
let output = r.await?;
Ok(output)
}
}
#[async_trait]
impl<T,R> ClientTransporter<T,R> for ChannelClient<T,R>
impl<T, R> ServerTransporter<T, R> for SyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn request(&self, data: T) -> Result<R, Self::Error> {
self.internal_request(data).await
}
}
#[async_trait]
impl<T,R> ServerTransporter<T,R> for SyncChannelServer<T,R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send,
F: Fn(T, &D) -> FR + Send + Sync,
D: Send+Sync,
D: Send + Sync,
{
while let Some(msg) = self.channel.recv().await {
match handler(msg.0, &userdata).await {
@ -124,60 +112,63 @@ where
msg.1.send(v).map_err(|_| Error::ChannelRespError)?;
}
break;
},
}
};
}
Ok(())
}
}
pub struct AsyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance.
///
/// Server endpoint of an asynchronous channel.
/// Can process any number of jobs in parallel.
///
///
/// Unpon recieving a stop signal, pending jobs are finished and reponded to, but new jobs are not processed.
/// Said new jobs will continue waiting until either the server listens again, or the server is dropped.
///
pub struct AsyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance. See AsyncChannelServer
///
/// Example:
/// ```no_run
/// use serde::{Serialize,Deserialize};
/// use rspc::transport::{channel, ClientTransporter,ServerTransporter};
///
/// #[derive(Serialize,Deserialize)]
///
/// pub struct MyStruct;
///
///
/// #[rspc::service]
/// impl MyStruct {
/// pub fn dummy(&self) -> () { () }
/// }
///
///
/// let (client_channel,server_channel) = channel::new_async();
///
/// let server_thread = tokio::spawn(async move {
///
/// let server_thread = tokio::spawn(async move {
/// let mut server = MyStructServer::from( MyStruct );
/// server.listen(server_channel).await
/// });
///
///
/// let client = MyStructClient::new(client_channel);
/// ```
pub fn new_async<T,R>() -> (ChannelClient<T,R>,AsyncChannelServer<T,R>) {
let (c,s) = mpsc::unbounded_channel();
pub fn new_async<T, R>() -> (ChannelClient<T, R>, AsyncChannelServer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
AsyncChannelServer { channel: s },
)
}
impl<T,R> AsyncChannelServer<T,R>
impl<T, R> AsyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
{
async fn internal_listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Error>
async fn internal_listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
@ -185,7 +176,7 @@ where
{
let mut pending = FuturesUnordered::new();
loop {
tokio::select!{
tokio::select! {
Some(rcv) = self.channel.recv() => {
pending.push(
async {
@ -212,27 +203,33 @@ where
}
let results: Vec<_> = pending.collect().await;
results.into_iter().map(|r| -> Result<(), Error> {
match r {
(Some(r),sender) => sender.send(r).map_err(|_| Error::ChannelRespError),
_ => Ok(()),
}
}).collect::<Result<Vec<_>, Error>>()?;
results
.into_iter()
.map(|r| -> Result<(), Error> {
match r {
(Some(r), sender) => sender.send(r).map_err(|_| Error::ChannelRespError),
_ => Ok(()),
}
})
.collect::<Result<Vec<_>, Error>>()?;
Ok(())
}
}
#[async_trait]
impl<T,R> ServerTransporter<T,R> for AsyncChannelServer<T,R>
impl<T, R> ServerTransporter<T, R> for AsyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
@ -243,44 +240,47 @@ where
}
/// Create a channel multiplexer.
///
///
/// This is intended to be used with self-mutable clients to provide immutable clients to it
pub fn new_multiplexer<T,R>() -> (ChannelClient<T,R>,Multiplexer<T,R>) {
let (c,s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
Multiplexer { channel: s },
)
pub fn new_multiplexer<T, R>() -> (ChannelClient<T, R>, Multiplexer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(ChannelClient { channel: c }, Multiplexer { channel: s })
}
pub struct Multiplexer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
pub struct Multiplexer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
impl<T,R> Multiplexer<T,R>
impl<T, R> Multiplexer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
{
/// Start multiplexing with a 3rd party.
///
///
/// While this is running, the associated ChannelClient can then call request() to send data, and recieve the final response from the 3rd party
///
///
/// Requirements:
/// - The 3rd party must be handling (usize,T) as input and (usize,R) as output
/// - sender_send(sender) is the function used to send data to the 3rd party
/// - listener_recv(listener) is the function used to recieve data from the 3rd party
pub async fn start<S, SF, L, LF>(mut self, mut sender: S, sender_send: SF, mut listener: L, listener_recv: LF) -> Result<(), Error>
pub async fn start<S, SF, L, LF>(
mut self,
mut sender: S,
sender_send: SF,
mut listener: L,
listener_recv: LF,
) -> Result<(), Error>
where
SF: Fn(&mut S, (usize,T)) -> BoxFuture<bool> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Option<(usize,R)>> + 'static,
SF: Fn(&mut S, (usize, T)) -> BoxFuture<bool> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Option<(usize, R)>> + 'static,
{
let mut pending: BTreeMap<usize, oneshot::Sender<R>> = BTreeMap::new();
let mut id_counter: usize = 0;
loop {
tokio::select!{
tokio::select! {
q = self.channel.recv() => {
match q {
Some(rcv) => {
@ -309,4 +309,4 @@ where
Ok(())
}
}
}

View file

@ -1,44 +1,50 @@
use async_trait::async_trait;
use futures::{Future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt};
#[cfg(feature = "channel")]
pub mod channel;
#[cfg(feature = "serde")]
pub mod serde;
/// Definition of a client transporter for RSPC.
///
///
/// ### Implementation specifications
/// - ClientTransporter is a producer object, can be single-producer or multi-producer.
/// - ClientTransporter must be immutable. If a client implementation requires self mutability,
/// use Mutex, RwLock, or similar tools to mutate values without requiring self mutability
#[async_trait]
pub trait ClientTransporter<T,R> {
pub trait ClientTransporter<T, R> {
type Error: std::fmt::Debug;
async fn request(&self, data: T) -> Result<R, Self::Error>;
fn request(&self, data: T) -> impl std::future::Future<Output = Result<R, Self::Error>> + Send;
}
/// Definition of a server transporter for RSPC.
///
///
/// ### Implementation specifications
/// - ServerTransporter is a single-consumer object
/// - Upon recieving a stop request (handler function return None), server must respond with stop_response if specified to only this request and none other.
/// Finishing and responding to pending jobs is optional.
#[async_trait]
pub trait ServerTransporter<T,R>
{
pub trait ServerTransporter<T, R> {
type Error: std::fmt::Debug;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync + Copy + 'static,
D: Send + Sync + 'static,
;
D: Send + Sync + 'static;
}
pub
async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener: &mut L, listener_recv: LF,
sender: &mut S, sender_send: SF,
handler: F, stop_response: Option<R>, userdata: &D) -> Result<(), E>
pub async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener: &mut L,
listener_recv: LF,
sender: &mut S,
sender_send: SF,
handler: F,
stop_response: Option<R>,
userdata: &D,
) -> Result<(), E>
where
T: Send + Sync,
R: Send + Sync + 'static,
@ -46,12 +52,12 @@ where
F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync + 'static,
C: Send + Sync + 'static,
SF: Fn(&mut S, (C,R)) -> BoxFuture<Result<(), E>> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Result<Option<(C,T)>, E>> + 'static,
SF: Fn(&mut S, (C, R)) -> BoxFuture<Result<(), E>> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Result<Option<(C, T)>, E>> + 'static,
{
let mut pending = FuturesUnordered::new();
loop {
tokio::select!{
tokio::select! {
rcv = listener_recv(listener) => {
match rcv? {
Some((id, data)) => {
@ -87,12 +93,12 @@ where
let results: Vec<_> = pending.collect().await;
for it in results {
match it {
(id,Some(r)) => {
sender_send(sender, (id,r)).await?;
},
(id, Some(r)) => {
sender_send(sender, (id, r)).await?;
}
_ => (),
}
}
Ok(())
}
}

View file

@ -1,19 +1,18 @@
use async_trait::async_trait;
use futures::future::BoxFuture;
use thiserror::Error;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicUsize;
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
//use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::io::{ReadHalf,WriteHalf};
use tokio_serde::SymmetricallyFramed;
use tokio::io::{ReadHalf, WriteHalf};
use tokio_serde::formats::*;
use tokio_serde::SymmetricallyFramed;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use super::channel::{self, ChannelClient, Multiplexer};
use super::{ClientTransporter,ServerTransporter};
use super::{ClientTransporter, ServerTransporter};
#[derive(Debug, Error)]
pub enum Error {
@ -26,19 +25,22 @@ pub enum Error {
type SymmetricalReader<T> = SymmetricallyFramed<
FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec>,
T,
SymmetricalBincode<T>>;
SymmetricalBincode<T>,
>;
type SymmetricalWriter<T> = SymmetricallyFramed<
FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec>,
T,
SymmetricalBincode<T>>;
SymmetricalBincode<T>,
>;
pub struct Receiver<T> {
pub reader: SymmetricalReader<T>,
}
impl<T> Receiver<T> where
SymmetricalReader<T> : TryStream<Ok=T> + Unpin,
impl<T> Receiver<T>
where
SymmetricalReader<T>: TryStream<Ok = T> + Unpin,
{
pub async fn recv(&mut self) -> Result<Option<T>, Error> {
if let Ok(msg) = self.reader.try_next().await {
@ -53,140 +55,149 @@ pub struct Sender<T> {
pub writer: SymmetricalWriter<T>,
}
impl<T> Sender<T> where
T: for<'a> Deserialize<'a> + Serialize + Unpin
impl<T> Sender<T>
where
T: for<'a> Deserialize<'a> + Serialize + Unpin,
{
pub async fn send(&mut self, item: T) -> Result<(), Error> {
self.writer.send(item).await.map_err(Error::IO)
}
}
async fn split<T,R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
async fn split<T, R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
let (reader, writer) = tokio::io::split(socket);
let reader: FramedRead<
ReadHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> = SymmetricallyFramed::new(
reader, SymmetricalBincode::default());
let reader: FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec> =
FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> =
SymmetricallyFramed::new(reader, SymmetricalBincode::default());
let writer: FramedWrite<
WriteHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec> =
FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: SymmetricalWriter<(usize, T)> = SymmetricallyFramed::new(
writer, SymmetricalBincode::default());
let writer: SymmetricalWriter<(usize, T)> =
SymmetricallyFramed::new(writer, SymmetricalBincode::default());
(Sender{ writer }, Receiver{ reader })
(Sender { writer }, Receiver { reader })
}
pub struct TcpClient<T,R> {
pub struct TcpClient<T, R> {
sender: Sender<(usize, T)>,
receiver: Receiver<(usize, R)>,
multiplexer: Option<Multiplexer<T,R>>,
multiplexer: Option<Multiplexer<T, R>>,
req_id: AtomicUsize,
ghost: std::marker::PhantomData<(T, R)>,
}
pub struct TcpServer<T,R> {
pub struct TcpServer<T, R> {
listener: TcpListener,
ghost: std::marker::PhantomData<(T, R)>,
}
impl<T,R> TcpClient<T,R>
impl<T, R> TcpClient<T, R>
where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
{
pub async fn connect<A>(address: &A) ->
Result<TcpClient<T, R>, Error>
pub async fn connect<A>(address: &A) -> Result<TcpClient<T, R>, Error>
where
A: tokio::net::ToSocketAddrs + ?Sized,
{
let socket = TcpStream::connect(&address).await.map_err(Error::IO)?;
let (sender,receiver) = split(socket).await;
let (sender, receiver) = split(socket).await;
Ok(TcpClient{ sender, receiver, multiplexer: None, req_id: AtomicUsize::new(0), ghost: Default::default() })
Ok(TcpClient {
sender,
receiver,
multiplexer: None,
req_id: AtomicUsize::new(0),
ghost: Default::default(),
})
}
pub async fn multiplex(self) -> (ChannelClient<T,R>, BoxFuture<'static, Result<(), channel::Error>>) {
let (client,multiplexer) = channel::new_multiplexer::<T,R>();
pub async fn multiplex(
self,
) -> (
ChannelClient<T, R>,
BoxFuture<'static, Result<(), channel::Error>>,
) {
let (client, multiplexer) = channel::new_multiplexer::<T, R>();
let fut = multiplexer.start(
self.sender,
|sender, data| { Box::pin(async {sender.send(data).await.is_ok()}) },
|sender, data| Box::pin(async { sender.send(data).await.is_ok() }),
self.receiver,
|receiver| { Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }) },
|receiver| Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }),
);
(client, Box::pin(fut))
}
pub async fn spawn(self) -> ChannelClient<T,R> {
pub async fn spawn(self) -> ChannelClient<T, R> {
let (client, job) = self.multiplex().await;
tokio::spawn(job);
client
}
}
impl<T,R> TcpServer<T,R> where
impl<T, R> TcpServer<T, R>
where
T: for<'a> Deserialize<'a> + Serialize,
R: for<'a> Deserialize<'a> + Serialize,
{
pub async fn new(address: &Ipv4Addr, port: u16) ->
Result<TcpServer<T,R>, Error>
{
pub async fn new(address: &Ipv4Addr, port: u16) -> Result<TcpServer<T, R>, Error> {
let address = format!("{}:{}", address, port);
let listener = TcpListener::bind(&address).await.map_err(Error::IO)?;
Ok(TcpServer{ listener, ghost: Default::default() })
Ok(TcpServer {
listener,
ghost: Default::default(),
})
}
async fn accept(&mut self) -> Result<TcpStream, Error>
{
async fn accept(&mut self) -> Result<TcpStream, Error> {
let (socket, address) = self.listener.accept().await.map_err(Error::IO)?;
println!("connection accepted: {:?}", address);
Ok(socket)
}
}
#[async_trait]
impl<T,R> ServerTransporter<T,R> for TcpServer<T,R>
impl<T, R> ServerTransporter<T, R> for TcpServer<T, R>
where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync,
{
let (client,fut) = channel::new_multiplexer::<R,T>();
let (client, fut) = channel::new_multiplexer::<R, T>();
// super::async_listener(
// &mut receiver, |_self| { Box::pin(async {
// &mut receiver, |_self| { Box::pin(async {
// _self.recv().await
// }) },
// &mut sender, |_self, data| { Box::pin(async {
// &mut sender, |_self, data| { Box::pin(async {
// _self.send(data).await
// }) },
// handler, stop_response, &userdata);
while let Ok(mut stream) = self.accept().await {
let (sender,receiver) = split::<R,T>(stream).await;
let (sender, receiver) = split::<R, T>(stream).await;
// tokio::spawn(async move {
// super::async_listener(
// &mut receiver, |_self| { Box::pin(async {
// &mut receiver, |_self| { Box::pin(async {
// _self.recv().await
// }) },
// &mut sender, |_self, data| { Box::pin(async {
// &mut sender, |_self, data| { Box::pin(async {
// _self.send(data).await
// }) },
// handler, stop_response, &userdata)
@ -195,4 +206,4 @@ where
todo!()
}
}
}

46
tests/channel_async.rs Normal file
View file

@ -0,0 +1,46 @@
#[cfg(test)]
#[cfg(feature = "channel")]
mod tests {
use rspc::transport::{channel, ClientTransporter, ServerTransporter};
pub struct MyStruct {
my_vec: Vec<String>,
}
#[rspc::service]
impl MyStruct {
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct { my_vec: Vec::new() };
let (c, s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
});
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(
client.pop().await.unwrap(),
Some("Hello world!".to_string())
);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

46
tests/channel_sync.rs Normal file
View file

@ -0,0 +1,46 @@
#[cfg(test)]
#[cfg(feature = "channel")]
mod tests {
use rspc::transport::{channel, ClientTransporter, ServerTransporter};
pub struct MyStruct {
my_vec: Vec<String>,
}
#[rspc::service]
impl MyStruct {
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct { my_vec: Vec::new() };
let (c, s) = channel::new_sync();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
});
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(
client.pop().await.unwrap(),
Some("Hello world!".to_string())
);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -1,13 +1,12 @@
[package]
name = "rspc_example"
name = "rspc_dev_utilities"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.86"
async-std = "1.12.0"
rspc = { path = ".." }
serde = "1.0.203"
tokio = "1.38.0"
serde = { version = "1.0", features = ["derive"] }
async-std = "1.12.0"
tokio = { version = "1.0", features = ["full"] }

1
utilities/src/lib.rs Normal file
View file

@ -0,0 +1 @@
pub mod test_data;

View file

@ -1,8 +1,8 @@
use std::time::Duration;
use serde::{Serialize, Deserialize};
use rspc::transport::{ClientTransporter, ServerTransporter};
pub type RetData = (usize,Option<usize>);
use serde::{Deserialize, Serialize};
pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
if dur.as_nanos() < 10000 {
@ -17,26 +17,13 @@ pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
}
pub fn dur_to_str(dur: Duration) -> String {
let (n,s) = dur_to_num(dur);
n.to_string() +" "+ s
}
pub fn process_data(dat: TestData) -> RetData {
(dat.len(), dat.calc())
}
pub fn process_data_ref(dat: &TestData) -> RetData {
(dat.len(), dat.calc())
let (n, s) = dur_to_num(dur);
n.to_string() + " " + s
}
pub const DATASIZE: usize = 1000000;
const TEST_STRINGS: [&str; 4] = [
"toto",
"tata",
"titi",
"tutu"
];
const TEST_STRINGS: [&str; 4] = ["toto", "tata", "titi", "tutu"];
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum TestEnum {
@ -50,11 +37,8 @@ pub struct TestData {
vec: Vec<(bool, TestEnum)>,
}
use crate::transport::{ClientTransporter,ServerTransporter};
#[rspc::service]
impl TestData {
pub fn len(&self) -> usize {
self.vec.len()
}
@ -77,7 +61,7 @@ impl TestData {
if n <= 1 {
n
} else {
Self::internal_fib(n-1) + Self::internal_fib(n-2)
Self::internal_fib(n - 1) + Self::internal_fib(n - 2)
}
}
@ -86,11 +70,11 @@ impl TestData {
}
pub fn add(&self, a: usize, b: usize) -> usize {
return a+b;
return a + b;
}
pub fn calc_add(&self, data: TestData) -> Option<usize> {
return self.calc().and_then(|a| data.calc().map(|b| a+b));
return self.calc().and_then(|a| data.calc().map(|b| a + b));
}
pub fn push(&mut self, u: (bool, TestEnum)) {
@ -102,16 +86,16 @@ pub fn make_test_data(n: usize) -> TestData {
let mut v = Vec::with_capacity(n);
let mut b = true;
for i in 0..n {
v.push(( b ,
match i%3 {
v.push((
b,
match i % 3 {
0 => TestEnum::NoValue,
1 => TestEnum::Num(i),
2 => TestEnum::Str(TEST_STRINGS[i%4].to_string()),
2 => TestEnum::Str(TEST_STRINGS[i % 4].to_string()),
_ => panic!("unexpected error"),
}));
},
));
b = !b;
}
TestData {
vec: v,
}
TestData { vec: v }
}