Compare commits

..

No commits in common. "b619f79095773d53d23c996b5652ae8da22e5376" and "25bb724961de3993c74f827f25d454b4cb3bb247" have entirely different histories.

19 changed files with 1861 additions and 744 deletions

204
Cargo.lock generated
View file

@ -1,6 +1,21 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
@ -112,6 +127,17 @@ version = "4.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1" checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1"
[[package]]
name = "async-trait"
version = "0.1.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
]
[[package]] [[package]]
name = "atomic-waker" name = "atomic-waker"
version = "1.1.2" version = "1.1.2"
@ -124,6 +150,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "bincode" name = "bincode"
version = "1.3.3" version = "1.3.3"
@ -222,7 +263,7 @@ dependencies = [
"num-traits", "num-traits",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -232,7 +273,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -327,7 +368,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -373,6 +414,12 @@ dependencies = [
"windows", "windows",
] ]
[[package]]
name = "gimli"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]] [[package]]
name = "gloo-timers" name = "gloo-timers"
version = "0.2.6" version = "0.2.6"
@ -408,7 +455,7 @@ checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi",
"libc", "libc",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -443,9 +490,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.186" version = "0.2.148"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
@ -502,14 +549,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]] [[package]]
name = "mio" name = "miniz_oxide"
version = "1.2.0" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [ dependencies = [
"libc", "libc",
"wasi", "wasi",
"windows-sys 0.61.2", "windows-sys",
] ]
[[package]] [[package]]
@ -552,6 +608,25 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "object"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.18.0" version = "1.18.0"
@ -619,7 +694,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -658,23 +733,23 @@ dependencies = [
"libc", "libc",
"log", "log",
"pin-project-lite", "pin-project-lite",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.106" version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.45" version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -736,35 +811,35 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
name = "rspc" name = "rspc"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-std",
"async-trait",
"futures", "futures",
"oneshot", "oneshot",
"rspc_dev_utilities", "pin-project",
"rspc_macros", "rspc_macros",
"serde", "serde",
"serde_json",
"syn 2.0.38",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-serde", "tokio-serde",
"tokio-util", "tokio-util",
] ]
[[package]]
name = "rspc_dev_utilities"
version = "0.1.0"
dependencies = [
"async-std",
"rspc",
"serde",
"tokio",
]
[[package]] [[package]]
name = "rspc_macros" name = "rspc_macros"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.37.25" version = "0.37.25"
@ -776,7 +851,7 @@ dependencies = [
"io-lifetimes", "io-lifetimes",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -805,32 +880,22 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.228" version = "1.0.189"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.228" version = "1.0.189"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -889,12 +954,12 @@ dependencies = [
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.6.3" version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.61.2", "windows-sys",
] ]
[[package]] [[package]]
@ -910,9 +975,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.117" version = "2.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -936,7 +1001,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -951,30 +1016,32 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.52.1" version = "1.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
dependencies = [ dependencies = [
"backtrace",
"bytes", "bytes",
"libc", "libc",
"mio", "mio",
"num_cpus",
"parking_lot", "parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2 0.6.3", "socket2 0.5.4",
"tokio-macros", "tokio-macros",
"windows-sys 0.61.2", "windows-sys",
] ]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.7.0" version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -1026,7 +1093,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
] ]
[[package]] [[package]]
@ -1119,7 +1186,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1153,7 +1220,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.117", "syn 2.0.38",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1205,12 +1272,6 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"
@ -1220,15 +1281,6 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "windows-sys"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link",
]
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.48.5" version = "0.48.5"

View file

@ -1,44 +1,22 @@
[workspace]
resolver = "3"
members = ["utilities"]
[dev-dependencies]
rspc_dev_utilities = { path = "utilities" }
[features]
default = ["full"]
full = ["serde","channel"]
channel = ["dep:oneshot"]
serde = ["dep:serde", "rspc_macros/serde"]
[package] [package]
name = "rspc" name = "rspc"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
include = ["/src"]
[dependencies] [dependencies]
rspc_macros = { path = "macros", version = "0.1" } serde = { version = "1.0", features = ["derive"] }
# necessary dependencies
futures = "0.3" futures = "0.3"
thiserror = "1.0.49"
tokio = { version = "1.0", features = ["full"] } tokio = { version = "1.0", features = ["full"] }
syn = { version = "2.0", features = ["full"] }
rspc_macros = { path = "macros", version = "0.1" }
thiserror = "1.0.49"
async-trait = "0.1.73"
async-std = "1.12.0"
tokio-serde = { version = "0.8.0", features=["json","bincode"] } tokio-serde = { version = "0.8.0", features=["json","bincode"] }
pin-project = "1.1.3"
tokio-util = { version = "0.7.10", features=["codec"] } tokio-util = { version = "0.7.10", features=["codec"] }
oneshot = { version = "0.1.6", features=["std"], optional = true } serde_json = "1.0.108"
serde = { version = "1.0", features = ["derive"], optional = true } oneshot = { version = "0.1.6", features=["std"] }
[lib] [lib]
[[example]]
name = "channel_sync"
required-features = ["channel"]
[[example]]
name = "channel_async"
required-features = ["channel"]
[[example]]
name = "tcp"
required-features = ["serde"]

View file

@ -3,14 +3,12 @@
Proof of concept RPC framework focused on ease of use. Proof of concept RPC framework focused on ease of use.
It works by calling the macro `#[rspc::service]` on an impl block, It works by calling the macro `#[rspc::service]` on an impl block,
and code is generated for the resulting Server and Client objects. and code is generated for the resulting Server and Client objects.
You can then instantiate a Server/Client on the desired transporter.
The Server objects own the original struct data and implements listen functions.<br> The Server objects own the original struct data and implements listen functions.<br>
The Client objects must be provided a connection, and replicates all the functions of the impl block where the macro was called. The Client objects must be provided a connection, and replicates all the functions of the impl block where the macro was called.
The Server implements read-write locking, there can be many reads at once, but only one write. Requests are not errored upon write-lock, but instead waited for. The Server implements read-write locking, there can be many reads at once, but only one write.
Requests are not errored upon write-lock, but instead waited for.
> If you wish to implement parallel writes, you must implement it with internal parallelism as you would in normal rust, for instance with immutable functions and internal read-write locks
The Client object cannot be cloned. Instead all function calls are immutable, so a reference can be shared to all. The Client object cannot be cloned. Instead all function calls are immutable, so a reference can be shared to all.
@ -19,6 +17,7 @@ Currently only implements local thread messaging. Serialized TCP transport is un
Example: Example:
```rs ```rs
use rspc::transport::{channel, ClientTransporter,ServerTransporter}; use rspc::transport::{channel, ClientTransporter,ServerTransporter};
use rspc::service;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -27,8 +26,7 @@ pub struct MyStruct {
my_vec: Vec<String>, my_vec: Vec<String>,
} }
// Functions to instanciate as RPC #[service]
#[rspc::service]
impl MyStruct impl MyStruct
{ {
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
@ -46,11 +44,9 @@ impl MyStruct
#[tokio::test] #[tokio::test]
async fn test() { async fn test() {
// Create the server data structure
let my_data = MyStruct { let my_data = MyStruct {
my_vec: Vec::new(), my_vec: Vec::new(),
}; };
// Instanciate a client and server
let (c,s) = channel::new_async(); let (c,s) = channel::new_async();
let srv_thread = tokio::spawn(async move { let srv_thread = tokio::spawn(async move {
@ -69,10 +65,3 @@ async fn test() {
``` ```
See [example](example) for an more detailed example usage See [example](example) for an more detailed example usage
### Internal logic and determinism
RSPC is built on rust logic and works with type determinism rather than serializing.
Serializing is used for transports that require it (such as TCP with serde), but is otherwise unused in code logic.
Determinism is achieved through enum autogeneration by the macro `rscp::service`, which is used for internal transport logic

1333
example/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,12 +1,13 @@
[package] [package]
name = "rspc_dev_utilities" name = "rspc_example"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
rspc = { path = ".." } anyhow = "1.0.86"
serde = { version = "1.0", features = ["derive"] }
async-std = "1.12.0" async-std = "1.12.0"
tokio = { version = "1.0", features = ["full"] } rspc = { path = ".." }
serde = "1.0.203"
tokio = "1.38.0"

View file

@ -1,8 +1,8 @@
use std::time::Duration; use std::time::Duration;
use rspc::transport::{ClientTransporter, ServerTransporter}; use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize}; pub type RetData = (usize,Option<usize>);
pub fn dur_to_num(dur: Duration) -> (u128, &'static str) { pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
if dur.as_nanos() < 10000 { if dur.as_nanos() < 10000 {
@ -17,13 +17,26 @@ pub fn dur_to_num(dur: Duration) -> (u128, &'static str) {
} }
pub fn dur_to_str(dur: Duration) -> String { pub fn dur_to_str(dur: Duration) -> String {
let (n, s) = dur_to_num(dur); let (n,s) = dur_to_num(dur);
n.to_string() + " " + s n.to_string() +" "+ s
}
pub fn process_data(dat: TestData) -> RetData {
(dat.len(), dat.calc())
}
pub fn process_data_ref(dat: &TestData) -> RetData {
(dat.len(), dat.calc())
} }
pub const DATASIZE: usize = 1000000; pub const DATASIZE: usize = 1000000;
const TEST_STRINGS: [&str; 4] = ["toto", "tata", "titi", "tutu"]; const TEST_STRINGS: [&str; 4] = [
"toto",
"tata",
"titi",
"tutu"
];
#[derive(Debug, PartialEq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum TestEnum { pub enum TestEnum {
@ -37,8 +50,11 @@ pub struct TestData {
vec: Vec<(bool, TestEnum)>, vec: Vec<(bool, TestEnum)>,
} }
use crate::transport::{ClientTransporter,ServerTransporter};
#[rspc::service] #[rspc::service]
impl TestData { impl TestData {
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.vec.len() self.vec.len()
} }
@ -61,7 +77,7 @@ impl TestData {
if n <= 1 { if n <= 1 {
n n
} else { } else {
Self::internal_fib(n - 1) + Self::internal_fib(n - 2) Self::internal_fib(n-1) + Self::internal_fib(n-2)
} }
} }
@ -70,11 +86,11 @@ impl TestData {
} }
pub fn add(&self, a: usize, b: usize) -> usize { pub fn add(&self, a: usize, b: usize) -> usize {
return a + b; return a+b;
} }
pub fn calc_add(&self, data: TestData) -> Option<usize> { pub fn calc_add(&self, data: TestData) -> Option<usize> {
return self.calc().and_then(|a| data.calc().map(|b| a + b)); return self.calc().and_then(|a| data.calc().map(|b| a+b));
} }
pub fn push(&mut self, u: (bool, TestEnum)) { pub fn push(&mut self, u: (bool, TestEnum)) {
@ -86,16 +102,16 @@ pub fn make_test_data(n: usize) -> TestData {
let mut v = Vec::with_capacity(n); let mut v = Vec::with_capacity(n);
let mut b = true; let mut b = true;
for i in 0..n { for i in 0..n {
v.push(( v.push(( b ,
b, match i%3 {
match i % 3 {
0 => TestEnum::NoValue, 0 => TestEnum::NoValue,
1 => TestEnum::Num(i), 1 => TestEnum::Num(i),
2 => TestEnum::Str(TEST_STRINGS[i % 4].to_string()), 2 => TestEnum::Str(TEST_STRINGS[i%4].to_string()),
_ => panic!("unexpected error"), _ => panic!("unexpected error"),
}, }));
));
b = !b; b = !b;
} }
TestData { vec: v } TestData {
vec: v,
}
} }

View file

@ -1,19 +1,28 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE, pub mod data;
};
use data::{make_test_data,dur_to_str,TestData,TestEnum,TestDataClient,TestDataServer,DATASIZE};
use tokio::join; use tokio::join;
use rspc::transport::serde::TcpClient;
use rspc::transport; use rspc::transport;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> anyhow::Result<()> {
let (c, s) = transport::channel::new_async();
// let t = TcpClient::connect("127.0.0.1:3306").await.unwrap();
// let client = t.spawn().await;
// let client = TestDataClient::new(client);
let (c,s) = transport::channel::new_async();
let data: TestData = make_test_data(DATASIZE); let data: TestData = make_test_data(DATASIZE);
let srv_thread = tokio::spawn(async move { let srv_thread = tokio::spawn(async move {
let mut server = TestDataServer::from(data); let mut server = TestDataServer::from(data);
server.listen(s).await server.listen(s).await
}); } );
let client = TestDataClient::new(c); let client = TestDataClient::new(c);
let clientref = &client; let clientref = &client;
@ -34,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("len: {}", dur_to_str(now.elapsed())); println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now(); let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0)); assert_eq!(4, client.calc().await.unwrap().unwrap_or(0) );
println!("calc: {}", dur_to_str(now.elapsed())); println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE); let cdat = make_test_data(DATASIZE);
@ -51,8 +60,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("push: {}", dur_to_str(now.elapsed())); println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now(); let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap()); assert_eq!(DATASIZE+1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed())); println!("len: {}", dur_to_str(now.elapsed()));
}; };
join!(job1, job2); join!(job1, job2);

View file

@ -1,63 +0,0 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (c, s) = transport::channel::new_sync();
let data: TestData = make_test_data(DATASIZE);
let srv_thread = tokio::spawn(async move {
let mut server = TestDataServer::from(data);
server.listen(s).await
});
let client = TestDataClient::new(c);
let clientref = &client;
let job1 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, clientref.fib(42).await.unwrap());
println!("fib1: {}", dur_to_str(now.elapsed()));
};
let job2 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
println!("calc_add: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, client.fib(42).await.unwrap());
println!("fib2: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
clientref.push((false, TestEnum::NoValue)).await.unwrap();
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
Ok(())
}

View file

@ -1,62 +0,0 @@
use rspc_dev_utilities::test_data::{
dur_to_str, make_test_data, TestData, TestDataClient, TestDataServer, TestEnum, DATASIZE,
};
use tokio::join;
use rspc::transport::serde::TcpClient;
use rspc::transport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let t = TcpClient::connect("127.0.0.1:6543").await.unwrap();
let client = t.spawn().await;
// todo : server
let client = TestDataClient::new(client);
let clientref = &client;
let job1 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, clientref.fib(42).await.unwrap());
println!("fib1: {}", dur_to_str(now.elapsed()));
};
let job2 = async {
let now = std::time::Instant::now();
assert_eq!(DATASIZE, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(4, client.calc().await.unwrap().unwrap_or(0));
println!("calc: {}", dur_to_str(now.elapsed()));
let cdat = make_test_data(DATASIZE);
let now = std::time::Instant::now();
assert_eq!(8, client.calc_add(cdat).await.unwrap().unwrap_or(0));
println!("calc_add: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(267914296, client.fib(42).await.unwrap());
println!("fib2: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
clientref.push((false, TestEnum::NoValue)).await.unwrap();
println!("push: {}", dur_to_str(now.elapsed()));
let now = std::time::Instant::now();
assert_eq!(DATASIZE + 1, client.len().await.unwrap());
println!("len: {}", dur_to_str(now.elapsed()));
};
join!(job1, job2);
client.stop().await.unwrap();
Ok(())
}

46
macros/Cargo.lock generated
View file

@ -1,46 +0,0 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "proc-macro2"
version = "1.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rspc_macros"
version = "0.1.0"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "syn"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "unicode-ident"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"

View file

@ -5,10 +5,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
full = ["serde"]
serde = []
[dependencies] [dependencies]
#anyhow = "1.0" #anyhow = "1.0"
#futures = "0.3" #futures = "0.3"

View file

@ -1,3 +1,4 @@
extern crate proc_macro; extern crate proc_macro;
extern crate quote; extern crate quote;
@ -8,8 +9,8 @@ use syn::{
parse::{Parse, ParseStream}, parse::{Parse, ParseStream},
parse_macro_input, parse_macro_input,
spanned::Spanned, spanned::Spanned,
Attribute, FnArg, Ident, ImplItemFn, ImplItemType, ItemImpl, Pat, PatType, Receiver, FnArg, Ident,
ReturnType, Signature, Visibility, Pat, PatType, ReturnType, Visibility, Receiver, ItemImpl, ImplItemFn, ImplItemType, Signature, Attribute,
}; };
use quote::{format_ident, quote}; use quote::{format_ident, quote};
@ -48,7 +49,7 @@ fn snake_to_upper_camel(ident_str: &str) -> String {
} }
struct RpcMethod { struct RpcMethod {
pub _attrs: Vec<Attribute>, pub attrs: Vec<Attribute>,
pub sig: Signature, pub sig: Signature,
pub reciever: Option<Receiver>, pub reciever: Option<Receiver>,
pub args: Vec<PatType>, pub args: Vec<PatType>,
@ -57,34 +58,30 @@ struct RpcMethod {
struct Service { struct Service {
pub ident: Ident, pub ident: Ident,
pub rpcs: Vec<RpcMethod>, pub rpcs: Vec<RpcMethod>,
pub _item: ItemImpl, pub item: ItemImpl,
pub _types: Vec<ImplItemType>, pub types: Vec<ImplItemType>,
} }
impl TryFrom<ImplItemFn> for RpcMethod { impl TryFrom<ImplItemFn> for RpcMethod {
type Error = syn::Error; type Error = syn::Error;
fn try_from(value: ImplItemFn) -> Result<Self, Self::Error> { fn try_from(value: ImplItemFn) -> Result<Self, Self::Error> {
let mut reciever = None; let mut reciever = None;
let mut args = vec![]; let mut args = vec!();
for arg in &value.sig.inputs { for arg in &value.sig.inputs {
match arg { match arg {
FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => { FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
args.push(captured.clone()); args.push(captured.clone());
} },
FnArg::Typed(captured) => { FnArg::Typed(captured) => {
return Err(syn::Error::new( return Err(syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args"));
captured.pat.span(), },
"patterns aren't allowed in RPC args",
));
}
FnArg::Receiver(v) => { FnArg::Receiver(v) => {
if matches!(v.reference, None) { if matches!(v.reference, None) {
return Err(syn::Error::new( return Err(syn::Error::new(v.span(), "self cannot be consumed in RPC method"));
v.span(),
"self cannot be consumed in RPC method",
));
} else { } else {
reciever = Some(v.clone()); reciever = Some(v.clone());
} }
@ -93,7 +90,7 @@ impl TryFrom<ImplItemFn> for RpcMethod {
} }
Ok(RpcMethod { Ok(RpcMethod {
_attrs: value.attrs, attrs: value.attrs,
sig: value.sig, sig: value.sig,
reciever, reciever,
args, args,
@ -108,48 +105,37 @@ impl Parse for Service {
let mut errors = Ok(()); let mut errors = Ok(());
let mut typeitems: Vec<ImplItemType> = vec![]; let mut typeitems: Vec<ImplItemType> = vec!();
let fns: Vec<&ImplItemFn> = item let fns: Vec<&ImplItemFn> = item.items.iter().filter(|x| {
.items match x {
.iter()
.filter(|x| match x {
syn::ImplItem::Fn(fnit) => { syn::ImplItem::Fn(fnit) => {
matches!(fnit.vis, Visibility::Public(_)) matches!(fnit.vis, Visibility::Public(_))
} },
syn::ImplItem::Type(x) => { syn::ImplItem::Type(x) => {
typeitems.push(x.clone()); typeitems.push(x.clone());
false false
} }
_ => false, _ => false,
}) }
.map(|x| { }).map(|x|
if let syn::ImplItem::Fn(fnit) = x { if let syn::ImplItem::Fn(fnit) = x {
fnit fnit
} else { } else {
unreachable!() unreachable!()
} }
}) ).collect();
.collect();
let ident = match item.self_ty.as_ref() { let ident = match item.self_ty.as_ref() {
syn::Type::Path(x) => match x.path.get_ident() { syn::Type::Path(x) => {
match x.path.get_ident() {
Some(i) => i.clone(), Some(i) => i.clone(),
None => { None => return Err(syn::Error::new(x.span(), "generics and paths are not supported")),
return Err(syn::Error::new(
x.span(),
"generics and paths are not supported",
))
} }
}, },
_ => { _ => return Err(syn::Error::new(item.self_ty.span(), "unsupported self type")),
return Err(syn::Error::new(
item.self_ty.span(),
"unsupported self type",
))
}
}; };
let mut rpcs = vec![]; let mut rpcs = vec!();
for one_fn in fns { for one_fn in fns {
match RpcMethod::try_from(one_fn.clone()) { match RpcMethod::try_from(one_fn.clone()) {
Ok(v) => rpcs.push(v), Ok(v) => rpcs.push(v),
@ -171,6 +157,7 @@ impl Parse for Service {
) )
) )
} }
} }
errors?; errors?;
@ -178,12 +165,13 @@ impl Parse for Service {
Ok(Self { Ok(Self {
ident, ident,
rpcs, rpcs,
_item: item, item,
_types: typeitems, types: typeitems,
}) })
} }
} }
/// Macro used to generate the RSPC Server and Client objects. /// Macro used to generate the RSPC Server and Client objects.
#[proc_macro_attribute] #[proc_macro_attribute]
pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream { pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
@ -191,8 +179,8 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let Service { let Service {
ident, ident,
rpcs, rpcs,
_item: _, item: _,
_types: _, types: _,
} = parse_macro_input!(inputclone as Service); } = parse_macro_input!(inputclone as Service);
let transport_request = &format_ident!("{}TransportRequest", ident); let transport_request = &format_ident!("{}TransportRequest", ident);
@ -200,90 +188,72 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
let server = &format_ident!("{}Server", ident); let server = &format_ident!("{}Server", ident);
let client = &format_ident!("{}Client", ident); let client = &format_ident!("{}Client", ident);
let fn_names: Vec<_> = rpcs.iter().map(|x| &x.sig.ident).collect(); let fn_names: Vec<_> = rpcs.iter().map(|x| {
&x.sig.ident
}).collect();
let fn_names_camel: Vec<_> = fn_names let fn_names_camel: Vec<_> = fn_names.iter().map(|x| {
.iter() Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span())
.map(|x| Ident::new(&snake_to_upper_camel(&x.unraw().to_string()), x.span())) }).collect();
.collect();
let fn_locks: Vec<_> = rpcs let fn_locks: Vec<_> = rpcs.iter().map(|x| {
.iter() match x.reciever.as_ref() {
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability { Some(rcv) => match &rcv.mutability {
Some(_) => quote!(write()), Some(_) => quote!(write()),
None => quote!(read()), None => quote!(read()),
}, },
None => quote!(read()), None => quote!(read()),
}) }
.collect(); }).collect();
let fn_mut: Vec<_> = rpcs let fn_mut: Vec<_> = rpcs.iter().map(|x| {
.iter() match x.reciever.as_ref() {
.map(|x| match x.reciever.as_ref() {
Some(rcv) => match &rcv.mutability { Some(rcv) => match &rcv.mutability {
Some(_) => quote!(mut), Some(_) => quote!(mut),
None => quote!(), None => quote!(),
}, },
None => quote!(), None => quote!(),
}) }
.collect(); }).collect();
let fn_await: Vec<_> = rpcs let fn_await: Vec<_> = rpcs.iter().map(|x| {
.iter() match &x.sig.asyncness {
.map(|x| match &x.sig.asyncness {
Some(_) => quote!(.await), Some(_) => quote!(.await),
None => quote!(), None => quote!(),
}) }
.collect(); }).collect();
let args: Vec<_> = rpcs let args: Vec<_> = rpcs.iter().map(|x| {
.iter()
.map(|x| {
let args = &x.args; let args = &x.args;
quote! { #(#args),* } quote!{ #(#args),* }
}) }).collect();
.collect();
let arg_types: Vec<_> = rpcs let arg_types: Vec<_> = rpcs.iter().map(|x| {
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.ty); let args = x.args.iter().map(|a| &a.ty);
quote! { #(#args),* } quote!{ #(#args),* }
}) }).collect();
.collect();
let arg_idents: Vec<_> = rpcs let arg_idents: Vec<_> = rpcs.iter().map(|x| {
.iter()
.map(|x| {
let args = x.args.iter().map(|a| &a.pat); let args = x.args.iter().map(|a| &a.pat);
quote! { #(#args),* } quote!{ #(#args),* }
}) }).collect();
.collect();
let outs: Vec<_> = rpcs let outs: Vec<_> = rpcs.iter().map(|x| {
.iter() match &x.sig.output {
.map(|x| match &x.sig.output { ReturnType::Default => quote!{()},
ReturnType::Default => quote! {()}, ReturnType::Type(_, t) => quote!{#t},
ReturnType::Type(_, t) => quote! {#t}, }
}) }).collect();
.collect();
let serde_derives = if cfg!(feature = "serde") {
quote! { serde::Serialize, serde::Deserialize }
} else {
quote! {}
};
let t = quote! { let t = quote! {
#[derive(PartialEq, Debug, #serde_derives)] #[derive(PartialEq,Debug,Serialize,Deserialize)]
pub enum #transport_request { pub enum #transport_request {
#( #fn_names_camel(#arg_types) ),* , #( #fn_names_camel(#arg_types) ),* ,
Stop, Stop,
} }
#[derive(PartialEq, Debug, #serde_derives)] #[derive(PartialEq,Debug,Serialize,Deserialize)]
pub enum #transport_response { pub enum #transport_response {
#( #fn_names_camel(#outs) ),* , #( #fn_names_camel(#outs) ),* ,
Stop, Stop,
@ -302,13 +272,13 @@ pub fn service(_attr: TokenStream, mut input: TokenStream) -> TokenStream {
} }
pub struct #server { pub struct #server {
obj: std::sync::Arc<tokio::sync::RwLock<#ident>>, obj: std::sync::Arc<async_std::sync::RwLock<#ident>>,
} }
impl From<#ident> for #server { impl From<#ident> for #server {
fn from(obj: #ident) -> Self { fn from(obj: #ident) -> Self {
Self { Self {
obj: std::sync::Arc::new(tokio::sync::RwLock::new(obj)), obj: std::sync::Arc::new(async_std::sync::RwLock::new(obj)),
} }
} }
} }

View file

@ -1,2 +1,55 @@
pub mod transport; pub mod transport;
pub use rspc_macros::service; pub use rspc_macros::service;
#[cfg(test)]
mod tests {
use super::transport::{channel, ClientTransporter,ServerTransporter};
use super::service;
use serde::{Deserialize, Serialize};
// use rspc::transport::{ClientTransporter,ServerTransporter};
#[derive(Serialize,Deserialize)]
pub struct MyStruct {
my_vec: Vec<String>,
}
#[service]
impl MyStruct
{
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct {
my_vec: Vec::new(),
};
let (c,s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
} );
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(client.pop().await.unwrap(), Some("Hello world!".to_string()));
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -4,12 +4,14 @@ use futures::future::BoxFuture;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use super::{ClientTransporter, ServerTransporter}; use async_trait::async_trait;
#[derive(Error, Debug)] use super::{ClientTransporter,ServerTransporter};
#[derive(Error,Debug)]
pub enum Error { pub enum Error {
#[error("channel recv error")] #[error("channel recv error")]
ChannelRecvError, ChannelRecvError,
@ -21,30 +23,32 @@ pub enum Error {
OneshotRecvError(#[from] oneshot::error::RecvError), OneshotRecvError(#[from] oneshot::error::RecvError),
} }
#[derive(Clone)] #[derive(Clone)]
/// Client endpoint of any channel. /// Client endpoint of any channel.
pub struct ChannelClient<T, R> { pub struct ChannelClient<T,R> {
channel: UnboundedSender<(T, oneshot::Sender<R>)>, channel: UnboundedSender<(T,oneshot::Sender<R>)>,
} }
/// Server endpoint of a synchronous channel. /// Server endpoint of a synchronous channel
pub struct SyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
}
/// Create a new synchronous channel client/server instance.
/// ///
/// Synchronous channels can only process one job at a time. If you want job concurrency refer to [new_async](new_async) and [AsyncChannelServer](AsyncChannelServer) /// Synchronous channels can only process one job at a time.
/// If you want job concurrency refer to new_async() and Asynchronous channels
/// ///
/// Unpon recieving a stop signal, jobs currently pending recv() will not be processed. /// Unpon recieving a stop signal, jobs currently pending recv() will not be processed.
/// Said jobs will continue waiting until either the server listens again, or the server is dropped. /// Said jobs will continue waiting until either the server listens again, or the server is dropped.
pub struct SyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new synchronous channel client/server instance. See [SyncChannelServer](SyncChannelServer)
///
/// Synchronous channels can only process one job at a time. If you want job concurrency refer to [new_async](new_async) and [AsyncChannelServer](AsyncChannelServer)
/// ///
/// Example: /// Example:
/// ```no_run /// ```no_run
/// use serde::{Serialize,Deserialize};
/// use rspc::transport::{channel, ClientTransporter,ServerTransporter}; /// use rspc::transport::{channel, ClientTransporter,ServerTransporter};
/// ///
/// #[derive(Serialize,Deserialize)]
/// pub struct MyStruct; /// pub struct MyStruct;
/// ///
/// #[rspc::service] /// #[rspc::service]
@ -61,48 +65,56 @@ pub struct SyncChannelServer<T, R> {
/// ///
/// let client = MyStructClient::new(client_channel); /// let client = MyStructClient::new(client_channel);
/// ``` /// ```
pub fn new_sync<T, R>() -> (ChannelClient<T, R>, SyncChannelServer<T, R>) { pub fn new_sync<T,R>() -> (ChannelClient<T,R>,SyncChannelServer<T,R>) {
let (c, s) = mpsc::unbounded_channel(); let (c,s) = mpsc::unbounded_channel();
( (
ChannelClient { channel: c }, ChannelClient { channel: c },
SyncChannelServer { channel: s }, SyncChannelServer { channel: s },
) )
} }
impl<T, R> ClientTransporter<T, R> for ChannelClient<T, R> impl<T,R> ChannelClient<T,R>
where where
T: Send + Sync, T: Send + Sync,
R: Send + Sync, R: Send + Sync,
{ {
type Error = Error;
async fn request(&self, data: T) -> Result<R, Self::Error> { pub async fn internal_request(&self, data: T) -> Result<R, Error> {
let (t, r) = oneshot::channel(); let (t,r) = oneshot::channel();
self.channel self.channel.send( (data, t) ).map_err(|_| Error::ChannelSendError)?;
.send((data, t))
.map_err(|_| Error::ChannelSendError)?;
let output = r.await?; let output = r.await?;
Ok(output) Ok(output)
} }
} }
impl<T, R> ServerTransporter<T, R> for SyncChannelServer<T, R> #[async_trait]
impl<T,R> ClientTransporter<T,R> for ChannelClient<T,R>
where where
T: Send + Sync, T: Send + Sync,
R: Send + Sync, R: Send + Sync,
{ {
type Error = Error; type Error = Error;
async fn listen<F, FR, D>( async fn request(&self, data: T) -> Result<R, Self::Error> {
&mut self, self.internal_request(data).await
handler: F, }
stop_response: Option<R>, }
userdata: D,
) -> Result<(), Self::Error> #[async_trait]
impl<T,R> ServerTransporter<T,R> for SyncChannelServer<T,R>
where
T: Send + Sync,
R: Send + Sync,
{
type Error = Error;
async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
where where
FR: Future<Output = Option<R>> + Send, FR: Future<Output = Option<R>> + Send,
F: Fn(T, &D) -> FR + Send + Sync, F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync, D: Send+Sync,
{ {
while let Some(msg) = self.channel.recv().await { while let Some(msg) = self.channel.recv().await {
match handler(msg.0, &userdata).await { match handler(msg.0, &userdata).await {
@ -112,28 +124,30 @@ where
msg.1.send(v).map_err(|_| Error::ChannelRespError)?; msg.1.send(v).map_err(|_| Error::ChannelRespError)?;
} }
break; break;
} },
}; };
} }
Ok(()) Ok(())
} }
} }
/// Server endpoint of an asynchronous channel. pub struct AsyncChannelServer<T,R> {
channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance.
///
/// Can process any number of jobs in parallel. /// Can process any number of jobs in parallel.
/// ///
/// Unpon recieving a stop signal, pending jobs are finished and reponded to, but new jobs are not processed. /// Unpon recieving a stop signal, pending jobs are finished and reponded to, but new jobs are not processed.
/// Said new jobs will continue waiting until either the server listens again, or the server is dropped. /// Said new jobs will continue waiting until either the server listens again, or the server is dropped.
pub struct AsyncChannelServer<T, R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>,
}
/// Create a new asynchronous channel client/server instance. See AsyncChannelServer
/// ///
/// Example: /// Example:
/// ```no_run /// ```no_run
/// use serde::{Serialize,Deserialize};
/// use rspc::transport::{channel, ClientTransporter,ServerTransporter}; /// use rspc::transport::{channel, ClientTransporter,ServerTransporter};
/// ///
/// #[derive(Serialize,Deserialize)]
/// pub struct MyStruct; /// pub struct MyStruct;
/// ///
/// #[rspc::service] /// #[rspc::service]
@ -150,25 +164,20 @@ pub struct AsyncChannelServer<T, R> {
/// ///
/// let client = MyStructClient::new(client_channel); /// let client = MyStructClient::new(client_channel);
/// ``` /// ```
pub fn new_async<T, R>() -> (ChannelClient<T, R>, AsyncChannelServer<T, R>) { pub fn new_async<T,R>() -> (ChannelClient<T,R>,AsyncChannelServer<T,R>) {
let (c, s) = mpsc::unbounded_channel(); let (c,s) = mpsc::unbounded_channel();
( (
ChannelClient { channel: c }, ChannelClient { channel: c },
AsyncChannelServer { channel: s }, AsyncChannelServer { channel: s },
) )
} }
impl<T, R> AsyncChannelServer<T, R> impl<T,R> AsyncChannelServer<T,R>
where where
T: Send + Sync, T: Send + Sync,
R: Send + Sync + 'static, R: Send + Sync + 'static,
{ {
async fn internal_listen<F, FR, D>( async fn internal_listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Error>
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Error>
where where
FR: Future<Output = Option<R>> + Send + 'static, FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync, F: Fn(T, &D) -> FR + Send + Sync,
@ -176,7 +185,7 @@ where
{ {
let mut pending = FuturesUnordered::new(); let mut pending = FuturesUnordered::new();
loop { loop {
tokio::select! { tokio::select!{
Some(rcv) = self.channel.recv() => { Some(rcv) = self.channel.recv() => {
pending.push( pending.push(
async { async {
@ -203,33 +212,27 @@ where
} }
let results: Vec<_> = pending.collect().await; let results: Vec<_> = pending.collect().await;
results results.into_iter().map(|r| -> Result<(), Error> {
.into_iter()
.map(|r| -> Result<(), Error> {
match r { match r {
(Some(r), sender) => sender.send(r).map_err(|_| Error::ChannelRespError), (Some(r),sender) => sender.send(r).map_err(|_| Error::ChannelRespError),
_ => Ok(()), _ => Ok(()),
} }
}) }).collect::<Result<Vec<_>, Error>>()?;
.collect::<Result<Vec<_>, Error>>()?;
Ok(()) Ok(())
} }
} }
impl<T, R> ServerTransporter<T, R> for AsyncChannelServer<T, R> #[async_trait]
impl<T,R> ServerTransporter<T,R> for AsyncChannelServer<T,R>
where where
T: Send + Sync, T: Send + Sync,
R: Send + Sync + 'static, R: Send + Sync + 'static,
{ {
type Error = Error; type Error = Error;
async fn listen<F, FR, D>( async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where where
FR: Future<Output = Option<R>> + Send + 'static, FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync, F: Fn(T, &D) -> FR + Send + Sync,
@ -242,16 +245,19 @@ where
/// Create a channel multiplexer. /// Create a channel multiplexer.
/// ///
/// This is intended to be used with self-mutable clients to provide immutable clients to it /// This is intended to be used with self-mutable clients to provide immutable clients to it
pub fn new_multiplexer<T, R>() -> (ChannelClient<T, R>, Multiplexer<T, R>) { pub fn new_multiplexer<T,R>() -> (ChannelClient<T,R>,Multiplexer<T,R>) {
let (c, s) = mpsc::unbounded_channel(); let (c,s) = mpsc::unbounded_channel();
(ChannelClient { channel: c }, Multiplexer { channel: s }) (
ChannelClient { channel: c },
Multiplexer { channel: s },
)
} }
pub struct Multiplexer<T, R> { pub struct Multiplexer<T,R> {
channel: UnboundedReceiver<(T, oneshot::Sender<R>)>, channel: UnboundedReceiver<(T,oneshot::Sender<R>)>,
} }
impl<T, R> Multiplexer<T, R> impl<T,R> Multiplexer<T,R>
where where
T: Send + Sync, T: Send + Sync,
R: Send + Sync + 'static, R: Send + Sync + 'static,
@ -264,23 +270,17 @@ where
/// - The 3rd party must be handling (usize,T) as input and (usize,R) as output /// - The 3rd party must be handling (usize,T) as input and (usize,R) as output
/// - sender_send(sender) is the function used to send data to the 3rd party /// - sender_send(sender) is the function used to send data to the 3rd party
/// - listener_recv(listener) is the function used to recieve data from the 3rd party /// - listener_recv(listener) is the function used to recieve data from the 3rd party
pub async fn start<S, SF, L, LF>( pub async fn start<S, SF, L, LF>(mut self, mut sender: S, sender_send: SF, mut listener: L, listener_recv: LF) -> Result<(), Error>
mut self,
mut sender: S,
sender_send: SF,
mut listener: L,
listener_recv: LF,
) -> Result<(), Error>
where where
SF: Fn(&mut S, (usize, T)) -> BoxFuture<bool> + Send + Sync + 'static, SF: Fn(&mut S, (usize,T)) -> BoxFuture<bool> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Option<(usize, R)>> + 'static, LF: Fn(&mut L) -> BoxFuture<Option<(usize,R)>> + 'static,
{ {
let mut pending: BTreeMap<usize, oneshot::Sender<R>> = BTreeMap::new(); let mut pending: BTreeMap<usize, oneshot::Sender<R>> = BTreeMap::new();
let mut id_counter: usize = 0; let mut id_counter: usize = 0;
loop { loop {
tokio::select! { tokio::select!{
q = self.channel.recv() => { q = self.channel.recv() => {
match q { match q {
Some(rcv) => { Some(rcv) => {

View file

@ -1,8 +1,7 @@
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt}; use async_trait::async_trait;
use futures::{Future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
#[cfg(feature = "channel")]
pub mod channel; pub mod channel;
#[cfg(feature = "serde")]
pub mod serde; pub mod serde;
/// Definition of a client transporter for RSPC. /// Definition of a client transporter for RSPC.
@ -11,9 +10,10 @@ pub mod serde;
/// - ClientTransporter is a producer object, can be single-producer or multi-producer. /// - ClientTransporter is a producer object, can be single-producer or multi-producer.
/// - ClientTransporter must be immutable. If a client implementation requires self mutability, /// - ClientTransporter must be immutable. If a client implementation requires self mutability,
/// use Mutex, RwLock, or similar tools to mutate values without requiring self mutability /// use Mutex, RwLock, or similar tools to mutate values without requiring self mutability
pub trait ClientTransporter<T, R> { #[async_trait]
pub trait ClientTransporter<T,R> {
type Error: std::fmt::Debug; type Error: std::fmt::Debug;
fn request(&self, data: T) -> impl std::future::Future<Output = Result<R, Self::Error>> + Send; async fn request(&self, data: T) -> Result<R, Self::Error>;
} }
/// Definition of a server transporter for RSPC. /// Definition of a server transporter for RSPC.
@ -22,29 +22,23 @@ pub trait ClientTransporter<T, R> {
/// - ServerTransporter is a single-consumer object /// - ServerTransporter is a single-consumer object
/// - Upon recieving a stop request (handler function return None), server must respond with stop_response if specified to only this request and none other. /// - Upon recieving a stop request (handler function return None), server must respond with stop_response if specified to only this request and none other.
/// Finishing and responding to pending jobs is optional. /// Finishing and responding to pending jobs is optional.
pub trait ServerTransporter<T, R> { #[async_trait]
pub trait ServerTransporter<T,R>
{
type Error: std::fmt::Debug; type Error: std::fmt::Debug;
fn listen<F, FR, D>( async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send
where where
FR: Future<Output = Option<R>> + Send + 'static, FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync + Copy + 'static, F: Fn(T, &D) -> FR + Send + Sync + Copy + 'static,
D: Send + Sync + 'static; D: Send + Sync + 'static,
;
} }
pub async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>( pub
listener: &mut L, async fn async_listener<T, R, C, L, LF, S, SF, F, FR, D, E>(
listener_recv: LF, listener: &mut L, listener_recv: LF,
sender: &mut S, sender: &mut S, sender_send: SF,
sender_send: SF, handler: F, stop_response: Option<R>, userdata: &D) -> Result<(), E>
handler: F,
stop_response: Option<R>,
userdata: &D,
) -> Result<(), E>
where where
T: Send + Sync, T: Send + Sync,
R: Send + Sync + 'static, R: Send + Sync + 'static,
@ -52,12 +46,12 @@ where
F: Fn(T, &D) -> FR + Send + Sync, F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync + 'static, D: Send + Sync + 'static,
C: Send + Sync + 'static, C: Send + Sync + 'static,
SF: Fn(&mut S, (C, R)) -> BoxFuture<Result<(), E>> + Send + Sync + 'static, SF: Fn(&mut S, (C,R)) -> BoxFuture<Result<(), E>> + Send + Sync + 'static,
LF: Fn(&mut L) -> BoxFuture<Result<Option<(C, T)>, E>> + 'static, LF: Fn(&mut L) -> BoxFuture<Result<Option<(C,T)>, E>> + 'static,
{ {
let mut pending = FuturesUnordered::new(); let mut pending = FuturesUnordered::new();
loop { loop {
tokio::select! { tokio::select!{
rcv = listener_recv(listener) => { rcv = listener_recv(listener) => {
match rcv? { match rcv? {
Some((id, data)) => { Some((id, data)) => {
@ -93,9 +87,9 @@ where
let results: Vec<_> = pending.collect().await; let results: Vec<_> = pending.collect().await;
for it in results { for it in results {
match it { match it {
(id, Some(r)) => { (id,Some(r)) => {
sender_send(sender, (id, r)).await?; sender_send(sender, (id,r)).await?;
} },
_ => (), _ => (),
} }
} }

View file

@ -1,18 +1,19 @@
use async_trait::async_trait;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use thiserror::Error;
use futures::prelude::*; use futures::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
//use tokio::net::tcp::{ReadHalf, WriteHalf}; //use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::io::{ReadHalf, WriteHalf}; use tokio::io::{ReadHalf,WriteHalf};
use tokio_serde::formats::*;
use tokio_serde::SymmetricallyFramed; use tokio_serde::SymmetricallyFramed;
use tokio_serde::formats::*;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use super::channel::{self, ChannelClient, Multiplexer}; use super::channel::{self, ChannelClient, Multiplexer};
use super::{ClientTransporter, ServerTransporter}; use super::{ClientTransporter,ServerTransporter};
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
@ -25,22 +26,19 @@ pub enum Error {
type SymmetricalReader<T> = SymmetricallyFramed< type SymmetricalReader<T> = SymmetricallyFramed<
FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec>, FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec>,
T, T,
SymmetricalBincode<T>, SymmetricalBincode<T>>;
>;
type SymmetricalWriter<T> = SymmetricallyFramed< type SymmetricalWriter<T> = SymmetricallyFramed<
FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec>, FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec>,
T, T,
SymmetricalBincode<T>, SymmetricalBincode<T>>;
>;
pub struct Receiver<T> { pub struct Receiver<T> {
pub reader: SymmetricalReader<T>, pub reader: SymmetricalReader<T>,
} }
impl<T> Receiver<T> impl<T> Receiver<T> where
where SymmetricalReader<T> : TryStream<Ok=T> + Unpin,
SymmetricalReader<T>: TryStream<Ok = T> + Unpin,
{ {
pub async fn recv(&mut self) -> Result<Option<T>, Error> { pub async fn recv(&mut self) -> Result<Option<T>, Error> {
if let Ok(msg) = self.reader.try_next().await { if let Ok(msg) = self.reader.try_next().await {
@ -55,131 +53,122 @@ pub struct Sender<T> {
pub writer: SymmetricalWriter<T>, pub writer: SymmetricalWriter<T>,
} }
impl<T> Sender<T> impl<T> Sender<T> where
where T: for<'a> Deserialize<'a> + Serialize + Unpin
T: for<'a> Deserialize<'a> + Serialize + Unpin,
{ {
pub async fn send(&mut self, item: T) -> Result<(), Error> { pub async fn send(&mut self, item: T) -> Result<(), Error> {
self.writer.send(item).await.map_err(Error::IO) self.writer.send(item).await.map_err(Error::IO)
} }
} }
async fn split<T, R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
async fn split<T,R>(socket: TcpStream) -> (Sender<(usize, T)>, Receiver<(usize, R)>) {
let (reader, writer) = tokio::io::split(socket); let (reader, writer) = tokio::io::split(socket);
let reader: FramedRead<ReadHalf<TcpStream>, LengthDelimitedCodec> = let reader: FramedRead<
FramedRead::new(reader, LengthDelimitedCodec::new()); ReadHalf<TcpStream>,
let reader: SymmetricalReader<(usize, R)> = LengthDelimitedCodec,
SymmetricallyFramed::new(reader, SymmetricalBincode::default()); > = FramedRead::new(reader, LengthDelimitedCodec::new());
let reader: SymmetricalReader<(usize, R)> = SymmetricallyFramed::new(
reader, SymmetricalBincode::default());
let writer: FramedWrite<WriteHalf<TcpStream>, LengthDelimitedCodec> = let writer: FramedWrite<
FramedWrite::new(writer, LengthDelimitedCodec::new()); WriteHalf<TcpStream>,
LengthDelimitedCodec,
> = FramedWrite::new(writer, LengthDelimitedCodec::new());
let writer: SymmetricalWriter<(usize, T)> = let writer: SymmetricalWriter<(usize, T)> = SymmetricallyFramed::new(
SymmetricallyFramed::new(writer, SymmetricalBincode::default()); writer, SymmetricalBincode::default());
(Sender { writer }, Receiver { reader }) (Sender{ writer }, Receiver{ reader })
} }
pub struct TcpClient<T, R> { pub struct TcpClient<T,R> {
sender: Sender<(usize, T)>, sender: Sender<(usize, T)>,
receiver: Receiver<(usize, R)>, receiver: Receiver<(usize, R)>,
multiplexer: Option<Multiplexer<T, R>>, multiplexer: Option<Multiplexer<T,R>>,
req_id: AtomicUsize, req_id: AtomicUsize,
ghost: std::marker::PhantomData<(T, R)>, ghost: std::marker::PhantomData<(T, R)>,
} }
pub struct TcpServer<T, R> { pub struct TcpServer<T,R> {
listener: TcpListener, listener: TcpListener,
ghost: std::marker::PhantomData<(T, R)>, ghost: std::marker::PhantomData<(T, R)>,
} }
impl<T, R> TcpClient<T, R>
impl<T,R> TcpClient<T,R>
where where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static, T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static, R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin + 'static,
{ {
pub async fn connect<A>(address: &A) -> Result<TcpClient<T, R>, Error> pub async fn connect<A>(address: &A) ->
Result<TcpClient<T, R>, Error>
where where
A: tokio::net::ToSocketAddrs + ?Sized, A: tokio::net::ToSocketAddrs + ?Sized,
{ {
let socket = TcpStream::connect(&address).await.map_err(Error::IO)?; let socket = TcpStream::connect(&address).await.map_err(Error::IO)?;
let (sender, receiver) = split(socket).await; let (sender,receiver) = split(socket).await;
Ok(TcpClient { Ok(TcpClient{ sender, receiver, multiplexer: None, req_id: AtomicUsize::new(0), ghost: Default::default() })
sender,
receiver,
multiplexer: None,
req_id: AtomicUsize::new(0),
ghost: Default::default(),
})
} }
pub async fn multiplex( pub async fn multiplex(self) -> (ChannelClient<T,R>, BoxFuture<'static, Result<(), channel::Error>>) {
self, let (client,multiplexer) = channel::new_multiplexer::<T,R>();
) -> (
ChannelClient<T, R>,
BoxFuture<'static, Result<(), channel::Error>>,
) {
let (client, multiplexer) = channel::new_multiplexer::<T, R>();
let fut = multiplexer.start( let fut = multiplexer.start(
self.sender, self.sender,
|sender, data| Box::pin(async { sender.send(data).await.is_ok() }), |sender, data| { Box::pin(async {sender.send(data).await.is_ok()}) },
self.receiver, self.receiver,
|receiver| Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }), |receiver| { Box::pin(async { receiver.recv().await.map_or_else(|_| None, |x| x) }) },
); );
(client, Box::pin(fut)) (client, Box::pin(fut))
} }
pub async fn spawn(self) -> ChannelClient<T, R> { pub async fn spawn(self) -> ChannelClient<T,R> {
let (client, job) = self.multiplex().await; let (client, job) = self.multiplex().await;
tokio::spawn(job); tokio::spawn(job);
client client
} }
} }
impl<T, R> TcpServer<T, R> impl<T,R> TcpServer<T,R> where
where
T: for<'a> Deserialize<'a> + Serialize, T: for<'a> Deserialize<'a> + Serialize,
R: for<'a> Deserialize<'a> + Serialize, R: for<'a> Deserialize<'a> + Serialize,
{ {
pub async fn new(address: &Ipv4Addr, port: u16) -> Result<TcpServer<T, R>, Error> {
pub async fn new(address: &Ipv4Addr, port: u16) ->
Result<TcpServer<T,R>, Error>
{
let address = format!("{}:{}", address, port); let address = format!("{}:{}", address, port);
let listener = TcpListener::bind(&address).await.map_err(Error::IO)?; let listener = TcpListener::bind(&address).await.map_err(Error::IO)?;
Ok(TcpServer { Ok(TcpServer{ listener, ghost: Default::default() })
listener,
ghost: Default::default(),
})
} }
async fn accept(&mut self) -> Result<TcpStream, Error> { async fn accept(&mut self) -> Result<TcpStream, Error>
{
let (socket, address) = self.listener.accept().await.map_err(Error::IO)?; let (socket, address) = self.listener.accept().await.map_err(Error::IO)?;
println!("connection accepted: {:?}", address); println!("connection accepted: {:?}", address);
Ok(socket) Ok(socket)
} }
} }
impl<T, R> ServerTransporter<T, R> for TcpServer<T, R> #[async_trait]
impl<T,R> ServerTransporter<T,R> for TcpServer<T,R>
where where
T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin, T: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin, R: for<'a> Deserialize<'a> + Serialize + Send + Sync + Unpin,
{ {
type Error = Error; type Error = Error;
async fn listen<F, FR, D>( async fn listen<F, FR, D>(&mut self, handler: F, stop_response: Option<R>, userdata: D) -> Result<(), Self::Error>
&mut self,
handler: F,
stop_response: Option<R>,
userdata: D,
) -> Result<(), Self::Error>
where where
FR: Future<Output = Option<R>> + Send + 'static, FR: Future<Output = Option<R>> + Send + 'static,
F: Fn(T, &D) -> FR + Send + Sync, F: Fn(T, &D) -> FR + Send + Sync,
D: Send + Sync, D: Send + Sync,
{ {
let (client, fut) = channel::new_multiplexer::<R, T>(); let (client,fut) = channel::new_multiplexer::<R,T>();
// super::async_listener( // super::async_listener(
// &mut receiver, |_self| { Box::pin(async { // &mut receiver, |_self| { Box::pin(async {
@ -191,7 +180,7 @@ where
// handler, stop_response, &userdata); // handler, stop_response, &userdata);
while let Ok(mut stream) = self.accept().await { while let Ok(mut stream) = self.accept().await {
let (sender, receiver) = split::<R, T>(stream).await; let (sender,receiver) = split::<R,T>(stream).await;
// tokio::spawn(async move { // tokio::spawn(async move {
// super::async_listener( // super::async_listener(
// &mut receiver, |_self| { Box::pin(async { // &mut receiver, |_self| { Box::pin(async {

View file

@ -1,46 +0,0 @@
#[cfg(test)]
#[cfg(feature = "channel")]
mod tests {
use rspc::transport::{channel, ClientTransporter, ServerTransporter};
pub struct MyStruct {
my_vec: Vec<String>,
}
#[rspc::service]
impl MyStruct {
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct { my_vec: Vec::new() };
let (c, s) = channel::new_async();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
});
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(
client.pop().await.unwrap(),
Some("Hello world!".to_string())
);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -1,46 +0,0 @@
#[cfg(test)]
#[cfg(feature = "channel")]
mod tests {
use rspc::transport::{channel, ClientTransporter, ServerTransporter};
pub struct MyStruct {
my_vec: Vec<String>,
}
#[rspc::service]
impl MyStruct {
pub fn len(&self) -> usize {
self.my_vec.len()
}
pub fn push(&mut self, val: String) {
self.my_vec.push(val)
}
pub fn pop(&mut self) -> Option<String> {
self.my_vec.pop()
}
}
#[tokio::test]
async fn test() {
let my_data = MyStruct { my_vec: Vec::new() };
let (c, s) = channel::new_sync();
let srv_thread = tokio::spawn(async move {
let mut server = MyStructServer::from(my_data);
server.listen(s).await
});
let client = MyStructClient::new(c);
assert_eq!(client.len().await.unwrap(), 0);
client.push("Hello world!".to_string()).await.unwrap();
assert_eq!(client.len().await.unwrap(), 1);
assert_eq!(
client.pop().await.unwrap(),
Some("Hello world!".to_string())
);
client.stop().await.unwrap();
srv_thread.await.unwrap().unwrap();
}
}

View file

@ -1 +0,0 @@
pub mod test_data;