diff --git a/Cargo.lock b/Cargo.lock index 0213396..3e3dbb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,13 +195,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "attohttpc" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ - "http", + "http 0.2.12", "log", "url", ] @@ -303,6 +309,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.55" @@ -576,6 +591,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dtoa" version = "1.0.11" @@ -587,13 +608,16 @@ name = "dusk-relay" version = "0.1.0" dependencies = [ "directories", + "dotenvy", "env_logger", "futures", "libp2p", "log", + "reqwest", "serde", "serde_json", "tokio", + "urlencoding", ] [[package]] @@ -906,7 +930,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap", "slab", "tokio", @@ -1024,6 +1048,16 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1031,7 +1065,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.4.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1058,8 +1115,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1071,6 +1128,67 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", + "httparse", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http 1.4.0", + "hyper 1.8.1", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", + "ipnet", + "libc", + "percent-encoding", + "pin-project-lite", + "socket2 0.6.2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "icu_collections" version = "2.1.1" @@ -1216,8 +1334,8 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http", - "hyper", + "http 0.2.12", + "hyper 0.14.32", "log", "rand 0.8.5", "tokio", @@ -1271,6 +1389,16 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "iri-string" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1354,6 +1482,7 @@ dependencies = [ "libp2p-quic", "libp2p-relay", "libp2p-rendezvous", + "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -1668,6 +1797,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6" dependencies = [ "async-trait", + "cbor4ii", "futures", "futures-bounded", "futures-timer", @@ -1675,6 +1805,7 @@ dependencies = [ "libp2p-identity", "libp2p-swarm", "rand 0.8.5", + "serde", "smallvec", "tracing", "void", @@ -2522,6 +2653,44 @@ version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", +] + [[package]] name = "resolv-conf" version = "0.7.6" @@ -2674,6 +2843,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2729,6 +2904,18 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2859,6 +3046,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -3015,6 +3211,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -3028,6 +3234,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "bitflags 2.10.0", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "iri-string", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3129,6 +3374,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -3190,6 +3441,20 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f" +dependencies = [ + "cfg-if", + "futures-util", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.108" @@ -3242,6 +3507,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "widestring" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 698e7c7..204a678 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ libp2p = { version = "0.54", features = [ "tcp", "tokio", "macros", + "request-response", + "cbor", ] } tokio = { version = "1", features = ["full"] } futures = "0.3" @@ -24,3 +26,6 @@ env_logger = "0.11" directories = "5" serde = { version = "1", features = ["derive"] } serde_json = "1" +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] } +dotenvy = "0.15" +urlencoding = "2" diff --git a/src/main.rs b/src/main.rs index 2341d5b..071b22f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ // - circuit relay v2: peers connect through this node, never seeing each other's IPs // - rendezvous: peers register under community namespaces, discover each other by peer ID // - relay federation: gossips peer registrations to other relays for global discovery +// - gif service: responds to gif search requests from clients via request-response protocol // - no data storage, no message routing, just connection brokering // // usage: @@ -29,8 +30,9 @@ use std::time::Duration; use futures::StreamExt; use libp2p::{ - connection_limits, gossipsub, identify, noise, ping, relay, rendezvous, swarm::SwarmEvent, - tcp, yamux, Multiaddr, PeerId, + connection_limits, gossipsub, identify, noise, ping, relay, rendezvous, + request_response::{self, cbor, ProtocolSupport}, + swarm::SwarmEvent, tcp, yamux, Multiaddr, PeerId, StreamProtocol, }; // gossip message for relay-to-relay federation @@ -53,8 +55,121 @@ struct RelayBehaviour { identify: identify::Behaviour, ping: ping::Behaviour, limits: connection_limits::Behaviour, + // gif search service - clients send GifRequest, relay responds with GifResponse + gif_service: cbor::Behaviour, } +// ---- gif protocol ---- +// clients send a GifRequest over request-response and the relay responds +// with a GifResponse after fetching from klipy. the api key stays on the +// relay so clients never need credentials. + +const GIF_PROTOCOL: StreamProtocol = StreamProtocol::new("/dusk/gif/1.0.0"); +const KLIPY_API_BASE: &str = "https://api.klipy.com/v2"; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GifRequest { + // "search" or "trending" + pub kind: String, + // search query (only used when kind == "search") + pub query: String, + pub limit: u32, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GifResponse { + pub results: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GifResult { + pub id: String, + pub title: String, + pub url: String, + pub preview: String, + pub dims: [u32; 2], +} + +// fetch from klipy and normalize into our GifResult format +async fn fetch_klipy( + http: &reqwest::Client, + api_key: &str, + request: &GifRequest, +) -> Vec { + let limit = request.limit.min(50); + let url = if request.kind == "search" && !request.query.trim().is_empty() { + format!( + "{}/search?q={}&key={}&limit={}&media_filter=tinygif,gif", + KLIPY_API_BASE, + urlencoding::encode(&request.query), + api_key, + limit, + ) + } else { + format!( + "{}/featured?key={}&limit={}&media_filter=tinygif,gif", + KLIPY_API_BASE, + api_key, + limit, + ) + }; + + let resp = match http.get(&url).send().await { + Ok(r) => r, + Err(e) => { + log::warn!("klipy request failed: {}", e); + return vec![]; + } + }; + + if !resp.status().is_success() { + log::warn!("klipy returned status {}", resp.status()); + return vec![]; + } + + let body: serde_json::Value = match resp.json().await { + Ok(v) => v, + Err(e) => { + log::warn!("klipy json parse error: {}", e); + return vec![]; + } + }; + + body["results"] + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|r| { + let gif_url = r["media_formats"]["gif"]["url"].as_str()?; + let preview_url = r["media_formats"]["tinygif"]["url"].as_str()?; + let dims = r["media_formats"]["tinygif"]["dims"] + .as_array() + .and_then(|d| { + Some([ + d.first()?.as_u64()? as u32, + d.get(1)?.as_u64()? as u32, + ]) + }) + .unwrap_or([220, 165]); + + Some(GifResult { + id: r["id"].as_str().unwrap_or_default().to_string(), + title: r["content_description"] + .as_str() + .or_else(|| r["title"].as_str()) + .unwrap_or_default() + .to_string(), + url: gif_url.to_string(), + preview: preview_url.to_string(), + dims, + }) + }) + .collect() + }) + .unwrap_or_default() +} +// ---- end gif protocol ---- + // resolve the path where we persist the relay's keypair so the peer id is stable fn keypair_path() -> PathBuf { if let Some(proj_dirs) = directories::ProjectDirs::from("", "", "dusk-relay") { @@ -95,6 +210,23 @@ fn load_or_generate_keypair() -> libp2p::identity::Keypair { async fn main() -> Result<(), Box> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + // load .env file if present (for KLIPY_API_KEY etc) + dotenvy::dotenv().ok(); + + // klipy api key for gif service (stays on the relay, never sent to clients) + let klipy_api_key = std::env::var("KLIPY_API_KEY") + .ok() + .filter(|k| !k.is_empty()); + + if klipy_api_key.is_some() { + log::info!("klipy api key found, gif service enabled"); + } else { + log::warn!("KLIPY_API_KEY not set, gif service will return empty results"); + } + + // http client for klipy api calls (shared across requests) + let http_client = reqwest::Client::new(); + let keypair = load_or_generate_keypair(); let local_peer_id = keypair.public().to_peer_id(); @@ -164,6 +296,12 @@ async fn main() -> Result<(), Box> { connection_limits::ConnectionLimits::default() .with_max_established(Some(max_connections)) ), + // gif search service over request-response protocol + gif_service: cbor::Behaviour::new( + [(GIF_PROTOCOL, ProtocolSupport::Inbound)], + request_response::Config::default() + .with_request_timeout(Duration::from_secs(15)), + ), } })? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300))) @@ -345,6 +483,49 @@ async fn main() -> Result<(), Box> { ); } + // gif service - incoming search/trending requests from clients + SwarmEvent::Behaviour(RelayBehaviourEvent::GifService( + request_response::Event::Message { + peer, + message: request_response::Message::Request { request, channel, .. }, + .. + } + )) => { + log::info!("gif {} request from peer {}", request.kind, peer); + + let api_key = klipy_api_key.clone(); + let http = http_client.clone(); + + // run the klipy fetch in a separate task so we dont block the event loop + let results = if let Some(ref key) = api_key { + fetch_klipy(&http, key, &request).await + } else { + vec![] + }; + + let response = GifResponse { results }; + if swarm.behaviour_mut().gif_service.send_response(channel, response).is_err() { + log::warn!("failed to send gif response to {}", peer); + } + } + // ignore outbound response sent confirmation + SwarmEvent::Behaviour(RelayBehaviourEvent::GifService( + request_response::Event::Message { + message: request_response::Message::Response { .. }, .. + } + )) => {} + SwarmEvent::Behaviour(RelayBehaviourEvent::GifService( + request_response::Event::OutboundFailure { peer, error, .. } + )) => { + log::warn!("gif outbound failure to {}: {:?}", peer, error); + } + SwarmEvent::Behaviour(RelayBehaviourEvent::GifService( + request_response::Event::InboundFailure { peer, error, .. } + )) => { + log::debug!("gif inbound failure from {}: {:?}", peer, error); + } + SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(_)) => {} + // connection tracking SwarmEvent::ConnectionEstablished { peer_id, .. } => { connection_count += 1;