add gif search service using request-response protocol and update dependencies

This commit is contained in:
cloudwithax 2026-02-15 16:45:52 -05:00
parent 47af6658a2
commit 5073351fb9
3 changed files with 469 additions and 9 deletions

288
Cargo.lock generated
View File

@ -195,13 +195,19 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "atomic-waker"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]] [[package]]
name = "attohttpc" name = "attohttpc"
version = "0.24.1" version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2"
dependencies = [ dependencies = [
"http", "http 0.2.12",
"log", "log",
"url", "url",
] ]
@ -303,6 +309,15 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "cbor4ii"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.2.55" version = "1.2.55"
@ -576,6 +591,12 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "dotenvy"
version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]] [[package]]
name = "dtoa" name = "dtoa"
version = "1.0.11" version = "1.0.11"
@ -587,13 +608,16 @@ name = "dusk-relay"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"directories", "directories",
"dotenvy",
"env_logger", "env_logger",
"futures", "futures",
"libp2p", "libp2p",
"log", "log",
"reqwest",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"urlencoding",
] ]
[[package]] [[package]]
@ -906,7 +930,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"http", "http 0.2.12",
"indexmap", "indexmap",
"slab", "slab",
"tokio", "tokio",
@ -1024,6 +1048,16 @@ dependencies = [
"itoa", "itoa",
] ]
[[package]]
name = "http"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a"
dependencies = [
"bytes",
"itoa",
]
[[package]] [[package]]
name = "http-body" name = "http-body"
version = "0.4.6" version = "0.4.6"
@ -1031,7 +1065,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [ dependencies = [
"bytes", "bytes",
"http", "http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http 1.4.0",
]
[[package]]
name = "http-body-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http 1.4.0",
"http-body 1.0.1",
"pin-project-lite", "pin-project-lite",
] ]
@ -1058,8 +1115,8 @@ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2", "h2",
"http", "http 0.2.12",
"http-body", "http-body 0.4.6",
"httparse", "httparse",
"httpdate", "httpdate",
"itoa", "itoa",
@ -1071,6 +1128,67 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
dependencies = [
"atomic-waker",
"bytes",
"futures-channel",
"futures-core",
"http 1.4.0",
"http-body 1.0.1",
"httparse",
"itoa",
"pin-project-lite",
"pin-utils",
"smallvec",
"tokio",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.27.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http 1.4.0",
"hyper 1.8.1",
"hyper-util",
"rustls",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots",
]
[[package]]
name = "hyper-util"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
dependencies = [
"base64",
"bytes",
"futures-channel",
"futures-util",
"http 1.4.0",
"http-body 1.0.1",
"hyper 1.8.1",
"ipnet",
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.2",
"tokio",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "icu_collections" name = "icu_collections"
version = "2.1.1" version = "2.1.1"
@ -1216,8 +1334,8 @@ dependencies = [
"attohttpc", "attohttpc",
"bytes", "bytes",
"futures", "futures",
"http", "http 0.2.12",
"hyper", "hyper 0.14.32",
"log", "log",
"rand 0.8.5", "rand 0.8.5",
"tokio", "tokio",
@ -1271,6 +1389,16 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "iri-string"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a"
dependencies = [
"memchr",
"serde",
]
[[package]] [[package]]
name = "is_terminal_polyfill" name = "is_terminal_polyfill"
version = "1.70.2" version = "1.70.2"
@ -1354,6 +1482,7 @@ dependencies = [
"libp2p-quic", "libp2p-quic",
"libp2p-relay", "libp2p-relay",
"libp2p-rendezvous", "libp2p-rendezvous",
"libp2p-request-response",
"libp2p-swarm", "libp2p-swarm",
"libp2p-tcp", "libp2p-tcp",
"libp2p-upnp", "libp2p-upnp",
@ -1668,6 +1797,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6" checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"cbor4ii",
"futures", "futures",
"futures-bounded", "futures-bounded",
"futures-timer", "futures-timer",
@ -1675,6 +1805,7 @@ dependencies = [
"libp2p-identity", "libp2p-identity",
"libp2p-swarm", "libp2p-swarm",
"rand 0.8.5", "rand 0.8.5",
"serde",
"smallvec", "smallvec",
"tracing", "tracing",
"void", "void",
@ -2522,6 +2653,44 @@ version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c"
[[package]]
name = "reqwest"
version = "0.12.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147"
dependencies = [
"base64",
"bytes",
"futures-core",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.8.1",
"hyper-rustls",
"hyper-util",
"js-sys",
"log",
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls-pki-types",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tokio-rustls",
"tower",
"tower-http",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots",
]
[[package]] [[package]]
name = "resolv-conf" name = "resolv-conf"
version = "0.7.6" version = "0.7.6"
@ -2674,6 +2843,12 @@ dependencies = [
"static_assertions", "static_assertions",
] ]
[[package]]
name = "ryu"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.2.0" version = "1.2.0"
@ -2729,6 +2904,18 @@ dependencies = [
"zmij", "zmij",
] ]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.9" version = "0.10.9"
@ -2859,6 +3046,15 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sync_wrapper"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
dependencies = [
"futures-core",
]
[[package]] [[package]]
name = "synstructure" name = "synstructure"
version = "0.13.2" version = "0.13.2"
@ -3015,6 +3211,16 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-rustls"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61"
dependencies = [
"rustls",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.18" version = "0.7.18"
@ -3028,6 +3234,45 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tower"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-http"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8"
dependencies = [
"bitflags 2.10.0",
"bytes",
"futures-util",
"http 1.4.0",
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]] [[package]]
name = "tower-service" name = "tower-service"
version = "0.3.3" version = "0.3.3"
@ -3129,6 +3374,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "urlencoding"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
[[package]] [[package]]
name = "utf8_iter" name = "utf8_iter"
version = "1.0.4" version = "1.0.4"
@ -3190,6 +3441,20 @@ dependencies = [
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f"
dependencies = [
"cfg-if",
"futures-util",
"js-sys",
"once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.108" version = "0.2.108"
@ -3242,6 +3507,15 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "webpki-roots"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed"
dependencies = [
"rustls-pki-types",
]
[[package]] [[package]]
name = "widestring" name = "widestring"
version = "1.2.1" version = "1.2.1"

View File

@ -16,6 +16,8 @@ libp2p = { version = "0.54", features = [
"tcp", "tcp",
"tokio", "tokio",
"macros", "macros",
"request-response",
"cbor",
] } ] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
futures = "0.3" futures = "0.3"
@ -24,3 +26,6 @@ env_logger = "0.11"
directories = "5" directories = "5"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
dotenvy = "0.15"
urlencoding = "2"

View File

@ -7,6 +7,7 @@
// - circuit relay v2: peers connect through this node, never seeing each other's IPs // - circuit relay v2: peers connect through this node, never seeing each other's IPs
// - rendezvous: peers register under community namespaces, discover each other by peer ID // - rendezvous: peers register under community namespaces, discover each other by peer ID
// - relay federation: gossips peer registrations to other relays for global discovery // - relay federation: gossips peer registrations to other relays for global discovery
// - gif service: responds to gif search requests from clients via request-response protocol
// - no data storage, no message routing, just connection brokering // - no data storage, no message routing, just connection brokering
// //
// usage: // usage:
@ -29,8 +30,9 @@ use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
use libp2p::{ use libp2p::{
connection_limits, gossipsub, identify, noise, ping, relay, rendezvous, swarm::SwarmEvent, connection_limits, gossipsub, identify, noise, ping, relay, rendezvous,
tcp, yamux, Multiaddr, PeerId, request_response::{self, cbor, ProtocolSupport},
swarm::SwarmEvent, tcp, yamux, Multiaddr, PeerId, StreamProtocol,
}; };
// gossip message for relay-to-relay federation // gossip message for relay-to-relay federation
@ -53,8 +55,121 @@ struct RelayBehaviour {
identify: identify::Behaviour, identify: identify::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
limits: connection_limits::Behaviour, limits: connection_limits::Behaviour,
// gif search service - clients send GifRequest, relay responds with GifResponse
gif_service: cbor::Behaviour<GifRequest, GifResponse>,
} }
// ---- gif protocol ----
// clients send a GifRequest over request-response and the relay responds
// with a GifResponse after fetching from klipy. the api key stays on the
// relay so clients never need credentials.
const GIF_PROTOCOL: StreamProtocol = StreamProtocol::new("/dusk/gif/1.0.0");
const KLIPY_API_BASE: &str = "https://api.klipy.com/v2";
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GifRequest {
// "search" or "trending"
pub kind: String,
// search query (only used when kind == "search")
pub query: String,
pub limit: u32,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GifResponse {
pub results: Vec<GifResult>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct GifResult {
pub id: String,
pub title: String,
pub url: String,
pub preview: String,
pub dims: [u32; 2],
}
// fetch from klipy and normalize into our GifResult format
async fn fetch_klipy(
http: &reqwest::Client,
api_key: &str,
request: &GifRequest,
) -> Vec<GifResult> {
let limit = request.limit.min(50);
let url = if request.kind == "search" && !request.query.trim().is_empty() {
format!(
"{}/search?q={}&key={}&limit={}&media_filter=tinygif,gif",
KLIPY_API_BASE,
urlencoding::encode(&request.query),
api_key,
limit,
)
} else {
format!(
"{}/featured?key={}&limit={}&media_filter=tinygif,gif",
KLIPY_API_BASE,
api_key,
limit,
)
};
let resp = match http.get(&url).send().await {
Ok(r) => r,
Err(e) => {
log::warn!("klipy request failed: {}", e);
return vec![];
}
};
if !resp.status().is_success() {
log::warn!("klipy returned status {}", resp.status());
return vec![];
}
let body: serde_json::Value = match resp.json().await {
Ok(v) => v,
Err(e) => {
log::warn!("klipy json parse error: {}", e);
return vec![];
}
};
body["results"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|r| {
let gif_url = r["media_formats"]["gif"]["url"].as_str()?;
let preview_url = r["media_formats"]["tinygif"]["url"].as_str()?;
let dims = r["media_formats"]["tinygif"]["dims"]
.as_array()
.and_then(|d| {
Some([
d.first()?.as_u64()? as u32,
d.get(1)?.as_u64()? as u32,
])
})
.unwrap_or([220, 165]);
Some(GifResult {
id: r["id"].as_str().unwrap_or_default().to_string(),
title: r["content_description"]
.as_str()
.or_else(|| r["title"].as_str())
.unwrap_or_default()
.to_string(),
url: gif_url.to_string(),
preview: preview_url.to_string(),
dims,
})
})
.collect()
})
.unwrap_or_default()
}
// ---- end gif protocol ----
// resolve the path where we persist the relay's keypair so the peer id is stable // resolve the path where we persist the relay's keypair so the peer id is stable
fn keypair_path() -> PathBuf { fn keypair_path() -> PathBuf {
if let Some(proj_dirs) = directories::ProjectDirs::from("", "", "dusk-relay") { if let Some(proj_dirs) = directories::ProjectDirs::from("", "", "dusk-relay") {
@ -95,6 +210,23 @@ fn load_or_generate_keypair() -> libp2p::identity::Keypair {
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// load .env file if present (for KLIPY_API_KEY etc)
dotenvy::dotenv().ok();
// klipy api key for gif service (stays on the relay, never sent to clients)
let klipy_api_key = std::env::var("KLIPY_API_KEY")
.ok()
.filter(|k| !k.is_empty());
if klipy_api_key.is_some() {
log::info!("klipy api key found, gif service enabled");
} else {
log::warn!("KLIPY_API_KEY not set, gif service will return empty results");
}
// http client for klipy api calls (shared across requests)
let http_client = reqwest::Client::new();
let keypair = load_or_generate_keypair(); let keypair = load_or_generate_keypair();
let local_peer_id = keypair.public().to_peer_id(); let local_peer_id = keypair.public().to_peer_id();
@ -164,6 +296,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
connection_limits::ConnectionLimits::default() connection_limits::ConnectionLimits::default()
.with_max_established(Some(max_connections)) .with_max_established(Some(max_connections))
), ),
// gif search service over request-response protocol
gif_service: cbor::Behaviour::new(
[(GIF_PROTOCOL, ProtocolSupport::Inbound)],
request_response::Config::default()
.with_request_timeout(Duration::from_secs(15)),
),
} }
})? })?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300))) .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300)))
@ -345,6 +483,49 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
} }
// gif service - incoming search/trending requests from clients
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::Message {
peer,
message: request_response::Message::Request { request, channel, .. },
..
}
)) => {
log::info!("gif {} request from peer {}", request.kind, peer);
let api_key = klipy_api_key.clone();
let http = http_client.clone();
// run the klipy fetch in a separate task so we dont block the event loop
let results = if let Some(ref key) = api_key {
fetch_klipy(&http, key, &request).await
} else {
vec![]
};
let response = GifResponse { results };
if swarm.behaviour_mut().gif_service.send_response(channel, response).is_err() {
log::warn!("failed to send gif response to {}", peer);
}
}
// ignore outbound response sent confirmation
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::Message {
message: request_response::Message::Response { .. }, ..
}
)) => {}
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::OutboundFailure { peer, error, .. }
)) => {
log::warn!("gif outbound failure to {}: {:?}", peer, error);
}
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::InboundFailure { peer, error, .. }
)) => {
log::debug!("gif inbound failure from {}: {:?}", peer, error);
}
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(_)) => {}
// connection tracking // connection tracking
SwarmEvent::ConnectionEstablished { peer_id, .. } => { SwarmEvent::ConnectionEstablished { peer_id, .. } => {
connection_count += 1; connection_count += 1;