Compare commits

...

2 commits

Author SHA1 Message Date
b619f79095 chore: code cleanup and folder restructuring 2026-05-02 15:37:01 +02:00
a9d09e30f8 chore: code format 2026-05-02 13:54:57 +02:00
19 changed files with 743 additions and 1860 deletions

202
Cargo.lock generated
View file

@ -1,21 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
version = 4
[[package]]
name = "aho-corasick"
@ -127,17 +112,6 @@ version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1"
[[package]]
name = "async-trait"
version = "0.1.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -150,21 +124,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "bincode"
version = "1.3.3"
@ -263,7 +222,7 @@ dependencies = [
"num-traits",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -273,7 +232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -368,7 +327,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -414,12 +373,6 @@ dependencies = [
"windows",
]
[[package]]
name = "gimli"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]]
name = "gloo-timers"
version = "0.2.6"
@ -455,7 +408,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -490,9 +443,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.148"
version = "0.2.186"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
[[package]]
name = "linux-raw-sys"
@ -548,24 +501,15 @@ version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.8"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1"
dependencies = [
"libc",
"wasi",
"windows-sys",
"windows-sys 0.61.2",
]
[[package]]
@ -608,25 +552,6 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.18.0"
@ -694,7 +619,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -733,23 +658,23 @@ dependencies = [
"libc",
"log",
"pin-project-lite",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.67"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
@ -811,35 +736,35 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
name = "rspc"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"futures",
"oneshot",
"pin-project",
"rspc_dev_utilities",
"rspc_macros",
"serde",
"serde_json",
"syn 2.0.38",
"thiserror",
"tokio",
"tokio-serde",
"tokio-util",
]
[[package]]
name = "rspc_dev_utilities"
version = "0.1.0"
dependencies = [
"async-std",
"rspc",
"serde",
"tokio",
]
[[package]]
name = "rspc_macros"
version = "0.1.0"
dependencies = [
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.37.25"
@ -851,7 +776,7 @@ dependencies = [
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys",
"windows-sys 0.48.0",
]
[[package]]
@ -880,22 +805,32 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.189"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.189"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -954,12 +889,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.4"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.61.2",
]
[[package]]
@ -975,9 +910,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.38"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
@ -1001,7 +936,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -1016,32 +951,30 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.32.0"
version = "1.52.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.4",
"socket2 0.6.3",
"tokio-macros",
"windows-sys",
"windows-sys 0.61.2",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -1093,7 +1026,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
]
[[package]]
@ -1186,7 +1119,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
"wasm-bindgen-shared",
]
@ -1220,7 +1153,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
"syn 2.0.117",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -1272,6 +1205,12 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-sys"
version = "0.48.0"
@ -1281,6 +1220,15 @@ dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-targets"
version = "0.48.5"

View file

@ -1,22 +1,44 @@
[workspace]
resolver = "3"
members = ["utilities"]
[dev-dependencies]
rspc_dev_utilities = { path = "utilities" }
[features]
default = ["full"]
full = ["serde","channel"]
channel = ["dep:oneshot"]
serde = ["dep:serde", "rspc_macros/serde"]
[package]
name = "rspc"
version = "0.1.0"
edition = "2021"
include = ["/src"]
[dependencies]
serde = { version = "1.0", features = ["derive"] }
futures = "0.3"
tokio = { version = "1.0", features = ["full"] }
syn = { version = "2.0", features = ["full"] }
rspc_macros = { path = "macros", version = "0.1" }
# necessary dependencies
futures = "0.3"
thiserror = "1.0.49"
async-trait = "0.1.73"
async-std = "1.12.0"
tokio = { version = "1.0", features = ["full"] }
tokio-serde = { version = "0.8.0", features=["json","bincode"] }
pin-project = "1.1.3"
tokio-util = { version = "0.7.10", features=["codec"] }
serde_json = "1.0.108"
oneshot = { version = "0.1.6", features=["std"] }
oneshot = { version = "0.1.6", features=["std"], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
[lib]
[[example]]
name = "channel_sync"
required-features = ["channel"]
[[example]]
name = "channel_async"
required-features = ["channel"]
[[example]]
name = "tcp"
required-features = ["serde"]

View file

@ -3,12 +3,14 @@
Proof of concept RPC framework focused on ease of use.
It works by calling the macro `#[rspc::service]` on an impl block,
and code is generated for the resulting Server and Client objects.
You can then instantiate a Server/Client on the desired transporter.
The Server objects own the original struct data and implements listen functions.<br>
The Client objects must be provided a connection, and replicates all the functions of the impl block where the macro was called.
The Server implements read-write locking, there can be many reads at once, but only one write.
Requests are not errored upon write-lock, but instead waited for.
The Server implements read-write locking, there can be many reads at once, but only one write. Requests are not errored upon write-lock, but instead waited for.
> If you wish to implement parallel writes, you must implement it with internal parallelism as you would in normal rust, for instance with immutable functions and internal read-write locks
The Client object cannot be cloned. Instead all function calls are immutable, so a reference can be shared to all.
@ -17,7 +19,6 @@ Currently only implements local thread messaging. Serialized TCP transport is un
Example:
```rs
use rspc::transport::{channel, ClientTransporter,ServerTransporter};
use rspc::service;
use serde::{Deserialize, Serialize};
@ -26,7 +27,8 @@ pub struct MyStruct {
my_vec: Vec<String>,
}
#[service]
// Functions to instanciate as RPC
#[rspc::service]
impl MyStruct
{
pub fn len(&self) -> usize {
@ -44,9 +46,11 @@ impl MyStruct
#[tokio::test]
async fn test() {
// Create the server data structure
let my_data = MyStruct {
my_vec: Vec::new(),
};
// Instanciate a client and server
let (c,s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
@ -65,3 +69,10 @@ async fn test() {
```
See [example](example) for an more detailed example usage
### Internal logic and determinism
RSPC is built on rust logic and works with type determinism rather than serializing.
Serializing is used for transports that require it (such as TCP with serde), but is otherwise unused in code logic.
Determinism is achieved through enum autogeneration by the macro `rscp::service`, which is used for internal transport logic

1333
example/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,21 +1,12 @@
pub mod data;
use data::{make_test_data,dur_to_str,TestData,TestEnum,TestDataClient,TestDataServer,DATASIZE};
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport::serde::TcpClient;
use rspc::transport;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// let t = TcpClient::connect("127.0.0.1:3306").await.unwrap();
// let client = t.spawn().await;
// let client = TestDataClient::new(client);
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (c, s) = transport::channel::new_async();
let data: TestData = make_test_data(DATASIZE);
@ -62,7 +53,6 @@ async fn main() -> anyhow::Result<()> {
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);

63
examples/channel_sync.rs Normal file
View file

@ -0,0 +1,63 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (c, s) = transport::channel::new_sync();
let data: TestData = make_test_data(DATASIZE);
let srv_thread = tokio::spawn(async move {
let mut server = TestDataServer::from(data);
server.listen(s).await
});
let client = TestDataClient::new(c);
let clientref = &client;
let job1 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, clientref.fib(42).await.unwrap());
println!("fib1: {}", dur_to_str(now.elapsed()));
};
let job2 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
println!("calc_add: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, client.fib(42).await.unwrap());
println!("fib2: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
clientref.push((false, TestEnum::NoValue)).await.unwrap();
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
Ok(())
}

62
examples/tcp.rs Normal file
View file

@ -0,0 +1,62 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport::serde::TcpClient;
use rspc::transport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let t = TcpClient::connect("127.0.0.1:6543").await.unwrap();
let client = t.spawn().await;
// todo : server
let client = TestDataClient::new(client);
let clientref = &client;
let job1 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, clientref.fib(42).await.unwrap());
println!("fib1: {}", dur_to_str(now.elapsed()));
};
let job2 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
println!("calc_add: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, client.fib(42).await.unwrap());
println!("fib2: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
clientref.push((false, TestEnum::NoValue)).await.unwrap();
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
Ok(())
}

46
macros/Cargo.lock generated Normal file
View file

@ -0,0 +1,46 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "proc-macro2"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rspc_macros"
version = "0.1.0"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "syn"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"

View file

@ -5,6 +5,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
full = ["serde"]
serde = []
[dependencies]
#anyhow = "1.0"
#futures = "0.3"

View file

@ -1,4 +1,3 @@
extern crate proc_macro;
extern crate quote;
@ -9,8 +8,8 @@ use syn::{
parse::{Parse, ParseStream},
parse_macro_input,
spanned::Spanned,
FnArg, Ident,
Pat, PatType, ReturnType, Visibility, Receiver, ItemImpl, ImplItemFn, ImplItemType, Signature, Attribute,
Attribute, FnArg, Ident, ImplItemFn, ImplItemType, ItemImpl, Pat, PatType, Receiver,
ReturnType, Signature, Visibility,
};
use quote::{format_ident, quote};
@ -49,7 +48,7 @@ fn snake_to_upper_camel(ident_str: &str) -> String {
}
struct RpcMethod {
pub attrs: Vec<Attribute>,
pub _attrs: Vec<Attribute>,
pub sig: Signature,
pub reciever: Option<Receiver>,
pub args: Vec<PatType>,
@ -58,30 +57,34 @@ struct RpcMethod {
struct Service {
pub ident: Ident,
pub rpcs: Vec<RpcMethod>,
pub item: ItemImpl,
pub types: Vec<ImplItemType>,
pub _item: ItemImpl,
pub _types: Vec<ImplItemType>,
}
impl TryFrom<ImplItemFn> for RpcMethod {
type Error = syn::Error;
fn try_from(value: ImplItemFn) -> Result<Self, Self::Error> {
let mut reciever = None;
let mut args = vec!();
let mut args = vec![];
for arg in &value.sig.inputs {
match arg {
FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
args.push(captured.clone());
},
}
FnArg::Typed(captured) => {
return Err(syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args"));
},
return Err(syn::Error::new(
captured.pat.span(),
"patterns aren't allowed in RPC args",
));
}
FnArg::Receiver(v) => {
if matches!(v.reference, None) {
return Err(syn::Error::new(v.span(), "self cannot be consumed in RPC method"));
return Err(syn::Error::new(
v.span(),
"self cannot be consumed in RPC method",
));
} else {
reciever = Some(v.clone());
}
@ -90,7 +93,7 @@ impl TryFrom<ImplItemFn> for RpcMethod {
}
Ok(RpcMethod {
attrs: value.attrs,
_attrs: value.attrs,
sig: value.sig,
reciever,
args,
@ -105,37 +108,48 @@ impl Parse for Service {
let mut errors = Ok(());
let mut typeitems: Vec<ImplItemType> = vec!();
let fns: Vec<&ImplItemFn> = item.items.iter().filter(|x| {
match x {
let mut typeitems: Vec<ImplItemType> = vec![];
let fns: Vec<&ImplItemFn> = item
.items
.iter()
.filter(|x| match x {
syn::ImplItem::Fn(fnit) => {
matches!(fnit.vis, Visibility::Public(_))
},
}
syn::ImplItem::Type(x) => {
typeitems.push(x.clone());
false
}
_ => false,
}
}).map(|x|
})
.map(|x| {
if let syn::ImplItem::Fn(fnit) = x {
fnit
} else {
unreachable!()
}
).collect();
})
.collect();
let ident = match item.self_ty.as_ref() {
syn::Type::Path(x) => {
match x.path.get_ident() {
syn::Type::Path(x) => match x.path.get_ident() {
Some(i) => i.clone(),
None => return Err(syn::Error::new(x.span(), "generics and paths are not supported")),
None => {
return Err(syn::Error::new(
x.span(),
"generics and paths are not supported",
))
}
},
_ => return Err(syn::Error::new(item.self_ty.span(), "unsupported self type")),
_ => {
return Err(syn::Error::new(
item.self_ty.span(),
"unsupported self type",
))
}
};
let mut rpcs = vec!();
let mut rpcs = vec![];
for one_fn in fns {
match RpcMethod::try_from(one_fn.clone()) {
Ok(v) => rpcs.push(v),
@ -157,7 +171,6 @@ impl Parse for Service {
)
)
}
}
errors?;
@ -165,13 +178,12 @@ impl Parse for Service {
Ok(Self {
ident,
rpcs,
item,
types: typeitems,
_item: item,
_types: typeitems,
})
}
}
/// Macro used to generate the RSPC Server and Client objects.
#[proc_macro_attribute]
pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
@ -179,8 +191,8 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let Service {
ident,
rpcs,
item: _,
types: _,
_item: _,
_types: _,
} = parse_macro_input!(inputclone as Service);
let transport_request = &format_ident!("{}TransportRequest", ident);
@ -188,72 +200,90 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let server = &format_ident!("{}Server", ident);
let client = &format_ident!("{}Client", ident);
let fn_names: Vec<_> = rpcs.iter().map(|x| {
&x.sig.ident
}).collect();
let fn_names: Vec<_> = rpcs.iter().map(|x| &x.sig.ident).collect();
let fn_names_camel: Vec<_> = fn_names.iter().map(|x| {
Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span())
}).collect();
let fn_names_camel: Vec<_> = fn_names
.iter()
.map(|x| Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span()))
.collect();
let fn_locks: Vec<_> = rpcs.iter().map(|x| {
match x.reciever.as_ref() {
let fn_locks: Vec<_> = rpcs
.iter()
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability {
Some(_) => quote!(write()),
None => quote!(read()),
},
None => quote!(read()),
}
}).collect();
})
.collect();
let fn_mut: Vec<_> = rpcs.iter().map(|x| {
match x.reciever.as_ref() {
let fn_mut: Vec<_> = rpcs
.iter()
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability {
Some(_) => quote!(mut),
None => quote!(),
},
None => quote!(),
}
}).collect();
})
.collect();
let fn_await: Vec<_> = rpcs.iter().map(|x| {
match &x.sig.asyncness {
let fn_await: Vec<_> = rpcs
.iter()
.map(|x| match &x.sig.asyncness {
Some(_) => quote!(.await),
None => quote!(),
}
}).collect();
})
.collect();
let args: Vec<_> = rpcs.iter().map(|x| {
let args: Vec<_> = rpcs
.iter()
.map(|x| {
let args = &x.args;
quote! { #(#args),* }
}).collect();
})
.collect();
let arg_types: Vec<_> = rpcs.iter().map(|x| {
let arg_types: Vec<_> = rpcs
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.ty);
quote! { #(#args),* }
}).collect();
})
.collect();
let arg_idents: Vec<_> = rpcs.iter().map(|x| {
let arg_idents: Vec<_> = rpcs
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.pat);
quote! { #(#args),* }
}).collect();
})
.collect();
let outs: Vec<_> = rpcs.iter().map(|x| {
match &x.sig.output {
let outs: Vec<_> = rpcs
.iter()
.map(|x| match &x.sig.output {
ReturnType::Default => quote! {()},
ReturnType::Type(_, t) => quote! {#t},
}
}).collect();
})
.collect();
let serde_derives = if cfg!(feature = "serde") {
quote! { serde::Serialize, serde::Deserialize }
} else {
quote! {}
};
let t = quote! {
#[derive(PartialEq,Debug,Serialize,Deserialize)]
#[derive(PartialEq, Debug, #serde_derives)]
pub enum #transport_request {
#( #fn_names_camel(#arg_types) ),* ,
Stop,
}
#[derive(PartialEq,Debug,Serialize,Deserialize)]
#[derive(PartialEq, Debug, #serde_derives)]
pub enum #transport_response {
#( #fn_names_camel(#outs) ),* ,
Stop,
@ -272,13 +302,13 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
}
pub struct #server {
obj: std::sync::Arc<async_std::sync::RwLock<#ident>>,
obj: std::sync::Arc<tokio::sync::RwLock<#ident>>,
}
impl From<#ident> for #server {
fn from(obj: #ident) -> Self {
Self {
obj: std::sync::Arc::new(async_std::sync::RwLock::new(obj)),
obj: std::sync::Arc::new(tokio::sync::RwLock::new(obj)),
}
}
}

View file

@ -1,55 +1,2 @@
pub mod transport;
pub use rspc_macros::service;
#[cfg(test)]
mod tests {
use super::transport::{channel, ClientTransporter,ServerTransporter};
use super::service;
use serde::{Deserialize, Serialize};
// use rspc::transport::{ClientTransporter,ServerTransporter};
#[derive(Serialize,Deserialize)]
pub struct MyStruct {
my_vec: Vec<String>,
}
#[service]
impl MyStruct
{
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct {
my_vec: Vec::new(),
};
let (c,s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
} );
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(client.pop().await.unwrap(), Some("Hello world!".to_string()));
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -4,10 +4,8 @@ use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use async_trait::async_trait;
use super::{ClientTransporter, ServerTransporter};
@ -23,32 +21,30 @@ pub enum Error {
OneshotRecvError(#[from] oneshot::error::RecvError),
}
#[derive(Clone)]
/// Client endpoint of any channel.
pub struct ChannelClient<T, R> {
channel: UnboundedSender<(T, oneshot::Sender<R>)>,
}
/// Server endpoint of a synchronous channel
/// Server endpoint of a synchronous channel.
///
/// Synchronous channels can only process one job at a time. If you want job concurrency refer to [new_async](new_async) and [AsyncChannelServer](AsyncChannelServer)
///
/// Unpon recieving a stop signal, jobs currently pending recv() will not be processed.
/// Said jobs will continue waiting until either the server listens again, or the server is dropped.
pub struct SyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new synchronous channel client/server instance.
/// Create a new synchronous channel client/server instance. See [SyncChannelServer](SyncChannelServer)
///
/// Synchronous channels can only process one job at a time.
/// If you want job concurrency refer to new_async() and Asynchronous channels
///
/// Unpon recieving a stop signal, jobs currently pending recv() will not be processed.
/// Said jobs will continue waiting until either the server listens again, or the server is dropped.
/// Synchronous channels can only process one job at a time. If you want job concurrency refer to [new_async](new_async) and [AsyncChannelServer](AsyncChannelServer)
///
/// Example:
/// ```no_run
/// use serde::{Serialize,Deserialize};
/// use rspc::transport::{channel, ClientTransporter,ServerTransporter};
///
/// #[derive(Serialize,Deserialize)]
/// pub struct MyStruct;
///
/// #[rspc::service]
@ -73,44 +69,36 @@ pub fn new_sync<T,R>() -> (ChannelClient<T,R>,SyncChannelServer<T,R>) {
)
}
impl<T,R> ChannelClient<T,R>
where
T: Send + Sync,
R: Send + Sync,
{
pub async fn internal_request(&self, data: T) -> Result<R, Error> {
let (t,r) = oneshot::channel();
self.channel.send( (data, t) ).map_err(|_| Error::ChannelSendError)?;
let output = r.await?;
Ok(output)
}
}
#[async_trait]
impl<T, R> ClientTransporter<T, R> for ChannelClient<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn request(&self, data: T) -> Result<R, Self::Error> {
self.internal_request(data).await
let (t, r) = oneshot::channel();
self.channel
.send((data, t))
.map_err(|_| Error::ChannelSendError)?;
let output = r.await?;
Ok(output)
}
}
#[async_trait]
impl<T, R> ServerTransporter<T, R> for SyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send,
F: Fn(T, &D) -> FR + Send + Sync,
@ -124,30 +112,28 @@ where
msg.1.send(v).map_err(|_| Error::ChannelRespError)?;
}
break;
},
}
};
}
Ok(())
}
}
pub struct AsyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance.
///
/// Server endpoint of an asynchronous channel.
/// Can process any number of jobs in parallel.
///
/// Unpon recieving a stop signal, pending jobs are finished and reponded to, but new jobs are not processed.
/// Said new jobs will continue waiting until either the server listens again, or the server is dropped.
pub struct AsyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance. See AsyncChannelServer
///
/// Example:
/// ```no_run
/// use serde::{Serialize,Deserialize};
/// use rspc::transport::{channel, ClientTransporter,ServerTransporter};
///
/// #[derive(Serialize,Deserialize)]
/// pub struct MyStruct;
///
/// #[rspc::service]
@ -177,7 +163,12 @@ where
T: Send + Sync,
R: Send + Sync + 'static,
{
async fn internal_listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Error>
async fn internal_listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
@ -212,27 +203,33 @@ where
}
let results: Vec<_> = pending.collect().await;
results.into_iter().map(|r| -> Result<(), Error> {
results
.into_iter()
.map(|r| -> Result<(), Error> {
match r {
(Some(r), sender) => sender.send(r).map_err(|_| Error::ChannelRespError),
_ => Ok(()),
}
}).collect::<Result<Vec<_>, Error>>()?;
})
.collect::<Result<Vec<_>, Error>>()?;
Ok(())
}
}
#[async_trait]
impl<T, R> ServerTransporter<T, R> for AsyncChannelServer<T, R>
where
T: Send + Sync,
R: Send + Sync + 'static,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,
@ -247,10 +244,7 @@ where
/// This is intended to be used with self-mutable clients to provide immutable clients to it
pub fn new_multiplexer<T, R>() -> (ChannelClient<T, R>, Multiplexer<T, R>) {
let (c, s) = mpsc::unbounded_channel();
(
ChannelClient { channel: c },
Multiplexer { channel: s },
)
(ChannelClient { channel: c }, Multiplexer { channel: s })
}
pub struct Multiplexer<T, R> {
@ -270,7 +264,13 @@ where
/// - The 3rd party must be handling (usize,T) as input and (usize,R) as output
/// - sender_send(sender) is the function used to send data to the 3rd party
/// - listener_recv(listener) is the function used to recieve data from the 3rd party
pub async fn start<S, SF, L, LF>(mut self, mut sender: S, sender_send: SF, mut listener: L, listener_recv: LF) -> Result<(), Error>
pub async fn start<S, SF, L, LF>(
mut self,
mut sender: S,
sender_send: SF,
mut listener: L,
listener_recv: LF,
) -> Result<(), Error>
where
SF: Fn(&mut S, (usize, T)) -> BoxFuture<bool> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Option<(usize, R)>> + 'static,

View file

@ -1,7 +1,8 @@
use async_trait::async_trait;
use futures::{Future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt};
#[cfg(feature = "channel")]
pub mod channel;
#[cfg(feature = "serde")]
pub mod serde;
/// Definition of a client transporter for RSPC.
@ -10,10 +11,9 @@ pub mod serde;
/// - ClientTransporter is a producer object, can be single-producer or multi-producer.
/// - ClientTransporter must be immutable. If a client implementation requires self mutability,
/// use Mutex, RwLock, or similar tools to mutate values without requiring self mutability
#[async_trait]
pub trait ClientTransporter<T, R> {
type Error: std::fmt::Debug;
async fn request(&self, data: T) -> Result<R, Self::Error>;
fn request(&self, data: T) -> impl std::future::Future<Output = Result<R, Self::Error>> + Send;
}
/// Definition of a server transporter for RSPC.
@ -22,23 +22,29 @@ pub trait ClientTransporter<T,R> {
/// - ServerTransporter is a single-consumer object
/// - Upon recieving a stop request (handler function return None), server must respond with stop_response if specified to only this request and none other.
/// Finishing and responding to pending jobs is optional.
#[async_trait]
pub trait ServerTransporter<T,R>
{
pub trait ServerTransporter<T, R> {
type Error: std::fmt::Debug;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync + Copy + 'static,
D: Send + Sync + 'static,
;
D: Send + Sync + 'static;
}
pub
async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener: &mut L, listener_recv: LF,
sender: &mut S, sender_send: SF,
handler: F, stop_response: Option<R>, userdata: &D) -> Result<(), E>
pub async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener: &mut L,
listener_recv: LF,
sender: &mut S,
sender_send: SF,
handler: F,
stop_response: Option<R>,
userdata: &D,
) -> Result<(), E>
where
T: Send + Sync,
R: Send + Sync + 'static,
@ -89,7 +95,7 @@ where
match it {
(id, Some(r)) => {
sender_send(sender, (id, r)).await?;
},
}
_ => (),
}
}

View file

@ -1,15 +1,14 @@
use async_trait::async_trait;
use futures::future::BoxFuture;
use thiserror::Error;
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::net::Ipv4Addr;
use std::sync::atomic::AtomicUsize;
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
//use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::io::{ReadHalf, WriteHalf};
use tokio_serde::SymmetricallyFramed;
use tokio_serde::formats::*;
use tokio_serde::SymmetricallyFramed;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use super::channel::{self, ChannelClient, Multiplexer};
@ -26,18 +25,21 @@ pub enum Error {
type SymmetricalReader<T> = SymmetricallyFramed<
FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec>,
T,
SymmetricalBincode<T>>;
SymmetricalBincode<T>,
>;
type SymmetricalWriter<T> = SymmetricallyFramed<
FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec>,
T,
SymmetricalBincode<T>>;
SymmetricalBincode<T>,
>;
pub struct Receiver<T> {
pub reader: SymmetricalReader<T>,
}
impl<T> Receiver<T> where
impl<T> Receiver<T>
where
SymmetricalReader<T>: TryStream<Ok = T> + Unpin,
{
pub async fn recv(&mut self) -> Result<Option<T>, Error> {
@ -53,32 +55,28 @@ pub struct Sender<T> {
pub writer: SymmetricalWriter<T>,
}
impl<T> Sender<T> where
T: for<'a> Deserialize<'a> + Serialize + Unpin
impl<T> Sender<T>
where
T: for<'a> Deserialize<'a> + Serialize + Unpin,
{
pub async fn send(&mut self, item: T) -> Result<(), Error> {
self.writer.send(item).await.map_err(Error::IO)
}
}
async fn split<T, R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
let (reader, writer) = tokio::io::split(socket);
let reader: FramedRead<
ReadHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> = SymmetricallyFramed::new(
reader, SymmetricalBincode::default());
let reader: FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec> =
FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> =
SymmetricallyFramed::new(reader, SymmetricalBincode::default());
let writer: FramedWrite<
WriteHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec> =
FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: SymmetricalWriter<(usize, T)> = SymmetricallyFramed::new(
writer, SymmetricalBincode::default());
let writer: SymmetricalWriter<(usize, T)> =
SymmetricallyFramed::new(writer, SymmetricalBincode::default());
(Sender { writer }, Receiver { reader })
}
@ -95,14 +93,12 @@ pub struct TcpServer<T,R> {
ghost: std::marker::PhantomData<(T, R)>,
}
impl<T, R> TcpClient<T, R>
where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
{
pub async fn connect<A>(address: &A) ->
Result<TcpClient<T, R>, Error>
pub async fn connect<A>(address: &A) -> Result<TcpClient<T, R>, Error>
where
A: tokio::net::ToSocketAddrs + ?Sized,
{
@ -110,17 +106,28 @@ where
let (sender, receiver) = split(socket).await;
Ok(TcpClient{ sender, receiver, multiplexer: None, req_id: AtomicUsize::new(0), ghost: Default::default() })
Ok(TcpClient {
sender,
receiver,
multiplexer: None,
req_id: AtomicUsize::new(0),
ghost: Default::default(),
})
}
pub async fn multiplex(self) -> (ChannelClient<T,R>, BoxFuture<'static, Result<(), channel::Error>>) {
pub async fn multiplex(
self,
) -> (
ChannelClient<T, R>,
BoxFuture<'static, Result<(), channel::Error>>,
) {
let (client, multiplexer) = channel::new_multiplexer::<T, R>();
let fut = multiplexer.start(
self.sender,
|sender, data| { Box::pin(async {sender.send(data).await.is_ok()}) },
|sender, data| Box::pin(async { sender.send(data).await.is_ok() }),
self.receiver,
|receiver| { Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }) },
|receiver| Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }),
);
(client, Box::pin(fut))
@ -133,28 +140,27 @@ where
}
}
impl<T,R> TcpServer<T,R> where
impl<T, R> TcpServer<T, R>
where
T: for<'a> Deserialize<'a> + Serialize,
R: for<'a> Deserialize<'a> + Serialize,
{
pub async fn new(address: &Ipv4Addr, port: u16) ->
Result<TcpServer<T,R>, Error>
{
pub async fn new(address: &Ipv4Addr, port: u16) -> Result<TcpServer<T, R>, Error> {
let address = format!("{}:{}", address, port);
let listener = TcpListener::bind(&address).await.map_err(Error::IO)?;
Ok(TcpServer{ listener, ghost: Default::default() })
Ok(TcpServer {
listener,
ghost: Default::default(),
})
}
async fn accept(&mut self) -> Result<TcpStream, Error>
{
async fn accept(&mut self) -> Result<TcpStream, Error> {
let (socket, address) = self.listener.accept().await.map_err(Error::IO)?;
println!("connection accepted: {:?}", address);
Ok(socket)
}
}
#[async_trait]
impl<T, R> ServerTransporter<T, R> for TcpServer<T, R>
where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
@ -162,7 +168,12 @@ where
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
async fn listen<F, FR, D>(
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where
FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync,

46
tests/channel_async.rs Normal file
View file

@ -0,0 +1,46 @@
#[cfg(test)]
#[cfg(feature = "channel")]
mod tests {
use rspc::transport::{channel, ClientTransporter, ServerTransporter};
pub struct MyStruct {
my_vec: Vec<String>,
}
#[rspc::service]
impl MyStruct {
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct { my_vec: Vec::new() };
let (c, s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
});
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(
client.pop().await.unwrap(),
Some("Hello world!".to_string())
);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

46
tests/channel_sync.rs Normal file
View file

@ -0,0 +1,46 @@
#[cfg(test)]
#[cfg(feature = "channel")]
mod tests {
use rspc::transport::{channel, ClientTransporter, ServerTransporter};
pub struct MyStruct {
my_vec: Vec<String>,
}
#[rspc::service]
impl MyStruct {
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct { my_vec: Vec::new() };
let (c, s) = channel::new_sync();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
});
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(
client.pop().await.unwrap(),
Some("Hello world!".to_string())
);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -1,13 +1,12 @@
[package]
name = "rspc_example"
name = "rspc_dev_utilities"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.86"
async-std = "1.12.0"
rspc = { path = ".." }
serde = "1.0.203"
tokio = "1.38.0"
serde = { version = "1.0", features = ["derive"] }
async-std = "1.12.0"
tokio = { version = "1.0", features = ["full"] }

1
utilities/src/lib.rs Normal file
View file

@ -0,0 +1 @@
pub mod test_data;

View file

@ -1,8 +1,8 @@
use std::time::Duration;
use serde::{Serialize, Deserialize};
use rspc::transport::{ClientTransporter, ServerTransporter};
pub type RetData = (usize,Option<usize>);
use serde::{Deserialize, Serialize};
pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
if dur.as_nanos() < 10000 {
@ -21,22 +21,9 @@ pub fn dur_to_str(dur: Duration) -> String {
n.to_string() + " " + s
}
pub fn process_data(dat: TestData) -> RetData {
(dat.len(), dat.calc())
}
pub fn process_data_ref(dat: &TestData) -> RetData {
(dat.len(), dat.calc())
}
pub const DATASIZE: usize = 1000000;
const TEST_STRINGS: [&str; 4] = [
"toto",
"tata",
"titi",
"tutu"
];
const TEST_STRINGS: [&str; 4] = ["toto", "tata", "titi", "tutu"];
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum TestEnum {
@ -50,11 +37,8 @@ pub struct TestData {
vec: Vec<(bool, TestEnum)>,
}
use crate::transport::{ClientTransporter,ServerTransporter};
#[rspc::service]
impl TestData {
pub fn len(&self) -> usize {
self.vec.len()
}
@ -102,16 +86,16 @@ pub fn make_test_data(n: usize) -> TestData {
let mut v = Vec::with_capacity(n);
let mut b = true;
for i in 0..n {
v.push(( b ,
v.push((
b,
match i % 3 {
0 => TestEnum::NoValue,
1 => TestEnum::Num(i),
2 => TestEnum::Str(TEST_STRINGS[i % 4].to_string()),
_ => panic!("unexpected error"),
}));
},
));
b = !b;
}
TestData {
vec: v,
}
TestData { vec: v }
}