NOTE(review): this patch was rebuilt from a copy whose line breaks, indentation and
generic type arguments had been stripped. Indentation follows rustfmt defaults and may
not match the target file byte-for-byte (apply with `git apply --ignore-whitespace`).
The context line `let mut connected_peer_relays: Vec = Vec::new();` and the hunk-header
text `Result<(), Box>` are still missing their type arguments; restore them from the
target file. The `index` blob ids predate the review changes to the gif cache.

diff --git a/src/main.rs b/src/main.rs
index 0c87429..7ade09a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -25,8 +25,9 @@
 // t3.large (8GB): 20,000 max connections
 // c6i.xlarge: 50,000 max connections (with kernel tuning)
 
+use std::collections::{HashMap, VecDeque};
 use std::path::PathBuf;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use futures::StreamExt;
 use libp2p::{
@@ -91,6 +92,133 @@ pub struct GifResult {
     pub dims: [u32; 2],
 }
 
+// ---- gif cache ----
+// caches klipy responses in memory so repeated queries don't hit the api.
+// trending results refresh every 10 minutes, search results live for 30 minutes.
+
+const TRENDING_CACHE_TTL: Duration = Duration::from_secs(600);
+const SEARCH_CACHE_TTL: Duration = Duration::from_secs(1800);
+// upper bound on cached entries. queries come from remote peers, so without a
+// cap a peer could grow the cache without limit between eviction sweeps.
+const MAX_CACHE_ENTRIES: usize = 1024;
+
+struct GifCache {
+    // key: normalized cache key (kind:query:limit), value: (results, expires_at)
+    entries: HashMap<String, (Vec<GifResult>, Instant)>,
+}
+
+impl GifCache {
+    fn new() -> Self {
+        Self {
+            entries: HashMap::new(),
+        }
+    }
+
+    // build a normalized cache key from the request
+    fn cache_key(request: &GifRequest) -> String {
+        let query = request.query.trim().to_lowercase();
+        format!("{}:{}:{}", request.kind, query, request.limit)
+    }
+
+    // how long a response to this request stays fresh
+    fn ttl_for(request: &GifRequest) -> Duration {
+        if request.kind == "trending" {
+            TRENDING_CACHE_TTL
+        } else {
+            SEARCH_CACHE_TTL
+        }
+    }
+
+    // returns the cached results for this request, or None when there is no
+    // entry or the entry has passed its expiry
+    fn get(&self, request: &GifRequest) -> Option<&Vec<GifResult>> {
+        let key = Self::cache_key(request);
+        let (results, expires_at) = self.entries.get(&key)?;
+
+        if Instant::now() > *expires_at {
+            return None;
+        }
+
+        Some(results)
+    }
+
+    // stores results under the request's key, stamped with the expiry for its
+    // kind. when the cache is full, expired entries are swept first; if it is
+    // still full the new entry is dropped rather than growing past the cap.
+    fn insert(&mut self, request: &GifRequest, results: Vec<GifResult>) {
+        let key = Self::cache_key(request);
+        if self.entries.len() >= MAX_CACHE_ENTRIES && !self.entries.contains_key(&key) {
+            self.evict_expired();
+            if self.entries.len() >= MAX_CACHE_ENTRIES {
+                return;
+            }
+        }
+        let expires_at = Instant::now() + Self::ttl_for(request);
+        self.entries.insert(key, (results, expires_at));
+    }
+
+    // drop expired entries to avoid unbounded memory growth.
+    // called periodically, not on every access.
+    fn evict_expired(&mut self) {
+        // every entry carries its own expiry, so trending entries are dropped
+        // after their shorter ttl instead of lingering for the search ttl
+        let now = Instant::now();
+        self.entries.retain(|_, (_, expires_at)| now <= *expires_at);
+    }
+}
+// ---- end gif cache ----
+
+// ---- gif rate limiter ----
+// sliding window rate limiter for test api keys. tracks request timestamps
+// and rejects requests once the window budget is exhausted.
+
+const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(60);
+const TEST_KEY_MAX_REQUESTS: usize = 100;
+
+struct GifRateLimiter {
+    // timestamps of requests within the current window
+    timestamps: VecDeque<Instant>,
+    max_requests: usize,
+}
+
+impl GifRateLimiter {
+    fn new(max_requests: usize) -> Self {
+        Self {
+            timestamps: VecDeque::new(),
+            max_requests,
+        }
+    }
+
+    // returns true if the request is allowed, false if rate limited
+    fn allow(&mut self) -> bool {
+        let now = Instant::now();
+        // drop timestamps outside the sliding window
+        while self
+            .timestamps
+            .front()
+            .is_some_and(|t| now.duration_since(*t) > RATE_LIMIT_WINDOW)
+        {
+            self.timestamps.pop_front();
+        }
+        if self.timestamps.len() >= self.max_requests {
+            return false;
+        }
+        self.timestamps.push_back(now);
+        true
+    }
+
+    fn remaining(&self) -> usize {
+        let now = Instant::now();
+        let active = self
+            .timestamps
+            .iter()
+            .filter(|t| now.duration_since(**t) <= RATE_LIMIT_WINDOW)
+            .count();
+        self.max_requests.saturating_sub(active)
+    }
+}
+// ---- end gif rate limiter ----
+
 // fetch from klipy and normalize into our GifResult format
 async fn fetch_klipy(
     http: &reqwest::Client,
@@ -220,6 +348,23 @@ async fn main() -> Result<(), Box> {
         log::warn!("KLIPY_API_KEY not set, gif service will return empty results");
     }
 
+    // when running with a test api key, enforce rate limiting to stay within
+    // klipy's test tier. set KLIPY_TEST_KEY=true in .env to enable.
+    let is_test_key = std::env::var("KLIPY_TEST_KEY")
+        .ok()
+        .map(|v| v == "true" || v == "1")
+        .unwrap_or(false);
+
+    let mut gif_rate_limiter: Option<GifRateLimiter> = if is_test_key {
+        log::info!(
+            "klipy test key mode: rate limited to {} requests/min",
+            TEST_KEY_MAX_REQUESTS
+        );
+        Some(GifRateLimiter::new(TEST_KEY_MAX_REQUESTS))
+    } else {
+        None
+    };
+
     // http client for klipy api calls (shared across requests)
     let http_client = reqwest::Client::new();
 
@@ -361,6 +506,12 @@ async fn main() -> Result<(), Box> {
         log::warn!("this relay will operate in standalone mode");
     }
 
+    // in-memory gif response cache to avoid redundant klipy api calls
+    let mut gif_cache = GifCache::new();
+    // evict stale cache entries every 5 minutes
+    let mut cache_eviction_interval = tokio::time::interval(Duration::from_secs(300));
+    cache_eviction_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
     // track active reservations for logging
     let mut reservation_count: usize = 0;
     let mut connection_count: usize = 0;
@@ -369,7 +520,22 @@ async fn main() -> Result<(), Box> {
     let mut connected_peer_relays: Vec = Vec::new();
 
     loop {
-        match swarm.select_next_some().await {
+        let event = tokio::select! {
+            // periodic cache eviction
+            _ = cache_eviction_interval.tick() => {
+                let before = gif_cache.entries.len();
+                gif_cache.evict_expired();
+                let evicted = before - gif_cache.entries.len();
+                if evicted > 0 {
+                    log::debug!("gif cache: evicted {} expired entries, {} remaining", evicted, gif_cache.entries.len());
+                }
+                continue;
+            }
+            event = swarm.select_next_some() => event,
+        };
+
+        #[allow(clippy::single_match)]
+        match event {
             // relay events
             SwarmEvent::Behaviour(RelayBehaviourEvent::Relay(
                 relay::Event::ReservationReqAccepted { src_peer_id, .. },
@@ -512,16 +678,44 @@ async fn main() -> Result<(), Box> {
                     ..
                 },
             )) => {
-                log::info!("gif {} request from peer {}", request.kind, peer);
-
-                let api_key = klipy_api_key.clone();
-                let http = http_client.clone();
-
-                // run the klipy fetch in a separate task so we dont block the event loop
-                let results = if let Some(ref key) = api_key {
-                    fetch_klipy(&http, key, &request).await
-                } else {
+                // check cache first, then fall back to klipy api
+                let cached = gif_cache.get(&request).cloned();
+                let results = if let Some(hits) = cached {
+                    log::info!(
+                        "gif {} request from peer {} (cache hit)",
+                        request.kind,
+                        peer
+                    );
+                    hits
+                } else if gif_rate_limiter.as_mut().is_some_and(|rl| !rl.allow()) {
+                    // rate limited - return empty results instead of hitting klipy
+                    let remaining = gif_rate_limiter
+                        .as_ref()
+                        .map(|rl| rl.remaining())
+                        .unwrap_or(0);
+                    log::warn!(
+                        "gif {} request from peer {} rate limited ({} remaining in window)",
+                        request.kind,
+                        peer,
+                        remaining
+                    );
                     vec![]
+                } else {
+                    log::info!(
+                        "gif {} request from peer {} (cache miss)",
+                        request.kind,
+                        peer
+                    );
+                    let fetched = if let Some(ref key) = klipy_api_key {
+                        fetch_klipy(&http_client, key, &request).await
+                    } else {
+                        vec![]
+                    };
+                    // only cache non-empty responses
+                    if !fetched.is_empty() {
+                        gif_cache.insert(&request, fetched.clone());
+                    }
+                    fetched
                 };
 
                 let response = GifResponse { results };