This commit is contained in:
cloudwithax 2026-02-15 16:46:05 -05:00
parent 5073351fb9
commit d4b558b35b
2 changed files with 74 additions and 40 deletions

View File

@ -26,6 +26,9 @@ env_logger = "0.11"
directories = "5"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
"json",
] }
dotenvy = "0.15"
urlencoding = "2"

View File

@ -32,7 +32,8 @@ use futures::StreamExt;
use libp2p::{
connection_limits, gossipsub, identify, noise, ping, relay, rendezvous,
request_response::{self, cbor, ProtocolSupport},
swarm::SwarmEvent, tcp, yamux, Multiaddr, PeerId, StreamProtocol,
swarm::SwarmEvent,
tcp, yamux, Multiaddr, PeerId, StreamProtocol,
};
// gossip message for relay-to-relay federation
@ -108,9 +109,7 @@ async fn fetch_klipy(
} else {
format!(
"{}/featured?key={}&limit={}&media_filter=tinygif,gif",
KLIPY_API_BASE,
api_key,
limit,
KLIPY_API_BASE, api_key, limit,
)
};
@ -145,10 +144,7 @@ async fn fetch_klipy(
let dims = r["media_formats"]["tinygif"]["dims"]
.as_array()
.and_then(|d| {
Some([
d.first()?.as_u64()? as u32,
d.get(1)?.as_u64()? as u32,
])
Some([d.first()?.as_u64()? as u32, d.get(1)?.as_u64()? as u32])
})
.unwrap_or([220, 165]);
@ -290,11 +286,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
key.public(),
)),
// ping every 30s to keep peer connections alive
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(30))),
ping: ping::Behaviour::new(
ping::Config::new().with_interval(Duration::from_secs(30)),
),
// limit total concurrent connections (default 10k for ~t3.medium)
limits: connection_limits::Behaviour::new(
connection_limits::ConnectionLimits::default()
.with_max_established(Some(max_connections))
.with_max_established(Some(max_connections)),
),
// gif search service over request-response protocol
gif_service: cbor::Behaviour::new(
@ -319,7 +317,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// subscribe to the relay federation gossip topic
let federation_topic = gossipsub::IdentTopic::new("dusk/relay/federation");
swarm.behaviour_mut().gossipsub.subscribe(&federation_topic)?;
swarm
.behaviour_mut()
.gossipsub
.subscribe(&federation_topic)?;
log::info!("subscribed to relay federation topic");
// connect to peer relays for federation (from env var, comma-separated multiaddrs)
@ -345,7 +346,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.collect();
if !peer_relays.is_empty() {
log::info!("connecting to {} peer relays for federation", peer_relays.len());
log::info!(
"connecting to {} peer relays for federation",
peer_relays.len()
);
for addr in &peer_relays {
log::info!(" dialing peer relay: {}", addr);
if let Err(e) = swarm.dial(addr.clone()) {
@ -388,7 +392,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
}
SwarmEvent::Behaviour(RelayBehaviourEvent::Relay(
relay::Event::CircuitReqAccepted { src_peer_id, dst_peer_id, .. },
relay::Event::CircuitReqAccepted {
src_peer_id,
dst_peer_id,
..
},
)) => {
log::info!(
"circuit opened: {} -> {} (through relay)",
@ -396,14 +404,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
dst_peer_id
);
}
SwarmEvent::Behaviour(RelayBehaviourEvent::Relay(
relay::Event::CircuitClosed { src_peer_id, dst_peer_id, .. },
)) => {
log::debug!(
"circuit closed: {} -> {}",
SwarmEvent::Behaviour(RelayBehaviourEvent::Relay(relay::Event::CircuitClosed {
src_peer_id,
dst_peer_id
);
dst_peer_id,
..
})) => {
log::debug!("circuit closed: {} -> {}", src_peer_id, dst_peer_id);
}
// rendezvous events
@ -427,7 +433,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
if let Ok(data) = serde_json::to_vec(&gossip) {
if let Err(e) = swarm.behaviour_mut().gossipsub.publish(federation_topic.clone(), data) {
if let Err(e) = swarm
.behaviour_mut()
.gossipsub
.publish(federation_topic.clone(), data)
{
log::warn!("failed to gossip registration to peer relays: {}", e);
} else {
log::debug!(
@ -440,7 +450,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
SwarmEvent::Behaviour(RelayBehaviourEvent::Rendezvous(
rendezvous::server::Event::DiscoverServed { enquirer, registrations, .. },
rendezvous::server::Event::DiscoverServed {
enquirer,
registrations,
..
},
)) => {
log::info!(
"served {} registrations to peer {}",
@ -450,10 +464,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// gossipsub messages - receive peer registrations from other relays
SwarmEvent::Behaviour(RelayBehaviourEvent::Gossipsub(
gossipsub::Event::Message { message, .. }
)) => {
if let Ok(gossip) = serde_json::from_slice::<RelayRegistrationGossip>(&message.data) {
SwarmEvent::Behaviour(RelayBehaviourEvent::Gossipsub(gossipsub::Event::Message {
message,
..
})) => {
if let Ok(gossip) = serde_json::from_slice::<RelayRegistrationGossip>(&message.data)
{
// skip our own messages (loop prevention)
if gossip.source_relay != local_peer_id.to_string() {
log::info!(
@ -466,7 +482,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
SwarmEvent::Behaviour(RelayBehaviourEvent::Rendezvous(
rendezvous::server::Event::PeerNotRegistered { peer, namespace, .. },
rendezvous::server::Event::PeerNotRegistered {
peer, namespace, ..
},
)) => {
log::debug!(
"peer {} tried to register under '{}' but was rejected",
@ -487,9 +505,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::Message {
peer,
message: request_response::Message::Request { request, channel, .. },
message:
request_response::Message::Request {
request, channel, ..
},
..
}
},
)) => {
log::info!("gif {} request from peer {}", request.kind, peer);
@ -504,23 +525,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let response = GifResponse { results };
if swarm.behaviour_mut().gif_service.send_response(channel, response).is_err() {
if swarm
.behaviour_mut()
.gif_service
.send_response(channel, response)
.is_err()
{
log::warn!("failed to send gif response to {}", peer);
}
}
// ignore outbound response sent confirmation
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::Message {
message: request_response::Message::Response { .. }, ..
}
message: request_response::Message::Response { .. },
..
},
)) => {}
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::OutboundFailure { peer, error, .. }
request_response::Event::OutboundFailure { peer, error, .. },
)) => {
log::warn!("gif outbound failure to {}: {:?}", peer, error);
}
SwarmEvent::Behaviour(RelayBehaviourEvent::GifService(
request_response::Event::InboundFailure { peer, error, .. }
request_response::Event::InboundFailure { peer, error, .. },
)) => {
log::debug!("gif inbound failure from {}: {:?}", peer, error);
}
@ -531,7 +558,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
connection_count += 1;
// check if this is a peer relay connection
if expected_peer_relay_ids.contains(&peer_id) && !connected_peer_relays.contains(&peer_id) {
if expected_peer_relay_ids.contains(&peer_id)
&& !connected_peer_relays.contains(&peer_id)
{
connected_peer_relays.push(peer_id);
log::info!(
"peer relay connected: {} ({}/{} peer relays online)",
@ -581,9 +610,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// identify events - log protocol info from connecting peers
SwarmEvent::Behaviour(RelayBehaviourEvent::Identify(
identify::Event::Received { peer_id, info, .. },
)) => {
SwarmEvent::Behaviour(RelayBehaviourEvent::Identify(identify::Event::Received {
peer_id,
info,
..
})) => {
log::debug!(
"identified peer {}: protocol={}, agent={}",
peer_id,