add in-memory gif cache and rate limiter to optimize api requests
This commit is contained in:
parent
d4b558b35b
commit
5c8f57e07c
199
src/main.rs
199
src/main.rs
|
|
@ -25,8 +25,9 @@
|
|||
// t3.large (8GB): 20,000 max connections
|
||||
// c6i.xlarge: 50,000 max connections (with kernel tuning)
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::StreamExt;
|
||||
use libp2p::{
|
||||
|
|
@ -91,6 +92,116 @@ pub struct GifResult {
|
|||
pub dims: [u32; 2],
|
||||
}
|
||||
|
||||
// ---- gif cache ----
|
||||
// caches klipy responses in memory so repeated queries dont hit the api.
|
||||
// trending results refresh every 10 minutes, search results live for 30 minutes.
|
||||
|
||||
const TRENDING_CACHE_TTL: Duration = Duration::from_secs(600);
|
||||
const SEARCH_CACHE_TTL: Duration = Duration::from_secs(1800);
|
||||
|
||||
struct GifCache {
|
||||
// key: normalized cache key (kind:query:limit), value: (results, inserted_at)
|
||||
entries: HashMap<String, (Vec<GifResult>, Instant)>,
|
||||
}
|
||||
|
||||
impl GifCache {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
entries: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// build a normalized cache key from the request
|
||||
fn cache_key(request: &GifRequest) -> String {
|
||||
let query = request.query.trim().to_lowercase();
|
||||
format!("{}:{}:{}", request.kind, query, request.limit)
|
||||
}
|
||||
|
||||
fn get(&self, request: &GifRequest) -> Option<&Vec<GifResult>> {
|
||||
let key = Self::cache_key(request);
|
||||
let (results, inserted_at) = self.entries.get(&key)?;
|
||||
|
||||
let ttl = if request.kind == "trending" {
|
||||
TRENDING_CACHE_TTL
|
||||
} else {
|
||||
SEARCH_CACHE_TTL
|
||||
};
|
||||
|
||||
if inserted_at.elapsed() > ttl {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(results)
|
||||
}
|
||||
|
||||
fn insert(&mut self, request: &GifRequest, results: Vec<GifResult>) {
|
||||
let key = Self::cache_key(request);
|
||||
self.entries.insert(key, (results, Instant::now()));
|
||||
}
|
||||
|
||||
// drop expired entries to avoid unbounded memory growth.
|
||||
// called periodically, not on every access.
|
||||
fn evict_expired(&mut self) {
|
||||
// use the longer ttl as the eviction threshold so nothing gets
|
||||
// removed before its actual ttl expires
|
||||
let max_ttl = SEARCH_CACHE_TTL;
|
||||
self.entries
|
||||
.retain(|_, (_, inserted_at)| inserted_at.elapsed() <= max_ttl);
|
||||
}
|
||||
}
|
||||
// ---- end gif cache ----
|
||||
|
||||
// ---- gif rate limiter ----
// Sliding-window rate limiter for test api keys. Keeps the timestamp of every
// request made inside the window and refuses new ones once the budget is spent.

const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(60);
const TEST_KEY_MAX_REQUESTS: usize = 100;

struct GifRateLimiter {
    // timestamps of requests within the current window
    timestamps: VecDeque<Instant>,
    max_requests: usize,
}

impl GifRateLimiter {
    /// Creates a limiter that admits at most `max_requests` per window.
    fn new(max_requests: usize) -> Self {
        Self {
            timestamps: VecDeque::new(),
            max_requests,
        }
    }

    /// Tries to admit one request now. Returns `true` and records the request
    /// if the window still has budget, `false` if it is rate limited.
    fn allow(&mut self) -> bool {
        let now = Instant::now();

        // Timestamps are pushed in order, so anything stale sits at the
        // front; pop until the front is inside the window (or the deque is empty).
        loop {
            match self.timestamps.front() {
                Some(stamp) if now.duration_since(*stamp) > RATE_LIMIT_WINDOW => {
                    self.timestamps.pop_front();
                }
                _ => break,
            }
        }

        if self.timestamps.len() < self.max_requests {
            self.timestamps.push_back(now);
            true
        } else {
            false
        }
    }

    /// Number of requests still available in the current window.
    /// Read-only: stale timestamps are counted out but not removed here.
    fn remaining(&self) -> usize {
        let now = Instant::now();
        let in_window = self
            .timestamps
            .iter()
            .filter(|stamp| now.duration_since(**stamp) <= RATE_LIMIT_WINDOW)
            .count();
        self.max_requests.saturating_sub(in_window)
    }
}
// ---- end gif rate limiter ----
|
||||
|
||||
// fetch from klipy and normalize into our GifResult format
|
||||
async fn fetch_klipy(
|
||||
http: &reqwest::Client,
|
||||
|
|
@ -220,6 +331,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
log::warn!("KLIPY_API_KEY not set, gif service will return empty results");
|
||||
}
|
||||
|
||||
// when running with a test api key, enforce rate limiting to stay within
|
||||
// klipy's test tier. set KLIPY_TEST_KEY=true in .env to enable.
|
||||
let is_test_key = std::env::var("KLIPY_TEST_KEY")
|
||||
.ok()
|
||||
.map(|v| v == "true" || v == "1")
|
||||
.unwrap_or(false);
|
||||
|
||||
let mut gif_rate_limiter: Option<GifRateLimiter> = if is_test_key {
|
||||
log::info!(
|
||||
"klipy test key mode: rate limited to {} requests/min",
|
||||
TEST_KEY_MAX_REQUESTS
|
||||
);
|
||||
Some(GifRateLimiter::new(TEST_KEY_MAX_REQUESTS))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// http client for klipy api calls (shared across requests)
|
||||
let http_client = reqwest::Client::new();
|
||||
|
||||
|
|
@ -361,6 +489,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
log::warn!("this relay will operate in standalone mode");
|
||||
}
|
||||
|
||||
// in-memory gif response cache to avoid redundant klipy api calls
|
||||
let mut gif_cache = GifCache::new();
|
||||
// evict stale cache entries every 5 minutes
|
||||
let mut cache_eviction_interval = tokio::time::interval(Duration::from_secs(300));
|
||||
cache_eviction_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
// track active reservations for logging
|
||||
let mut reservation_count: usize = 0;
|
||||
let mut connection_count: usize = 0;
|
||||
|
|
@ -369,7 +503,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
let mut connected_peer_relays: Vec<PeerId> = Vec::new();
|
||||
|
||||
loop {
|
||||
match swarm.select_next_some().await {
|
||||
let event = tokio::select! {
|
||||
// periodic cache eviction
|
||||
_ = cache_eviction_interval.tick() => {
|
||||
let before = gif_cache.entries.len();
|
||||
gif_cache.evict_expired();
|
||||
let evicted = before - gif_cache.entries.len();
|
||||
if evicted > 0 {
|
||||
log::debug!("gif cache: evicted {} expired entries, {} remaining", evicted, gif_cache.entries.len());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
event = swarm.select_next_some() => event,
|
||||
};
|
||||
|
||||
#[allow(clippy::single_match)]
|
||||
match event {
|
||||
// relay events
|
||||
SwarmEvent::Behaviour(RelayBehaviourEvent::Relay(
|
||||
relay::Event::ReservationReqAccepted { src_peer_id, .. },
|
||||
|
|
@ -512,16 +661,44 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
..
|
||||
},
|
||||
)) => {
|
||||
log::info!("gif {} request from peer {}", request.kind, peer);
|
||||
|
||||
let api_key = klipy_api_key.clone();
|
||||
let http = http_client.clone();
|
||||
|
||||
// run the klipy fetch in a separate task so we dont block the event loop
|
||||
let results = if let Some(ref key) = api_key {
|
||||
fetch_klipy(&http, key, &request).await
|
||||
} else {
|
||||
// check cache first, then fall back to klipy api
|
||||
let cached = gif_cache.get(&request).cloned();
|
||||
let results = if let Some(hits) = cached {
|
||||
log::info!(
|
||||
"gif {} request from peer {} (cache hit)",
|
||||
request.kind,
|
||||
peer
|
||||
);
|
||||
hits
|
||||
} else if gif_rate_limiter.as_mut().is_some_and(|rl| !rl.allow()) {
|
||||
// rate limited - return empty results instead of hitting klipy
|
||||
let remaining = gif_rate_limiter
|
||||
.as_ref()
|
||||
.map(|rl| rl.remaining())
|
||||
.unwrap_or(0);
|
||||
log::warn!(
|
||||
"gif {} request from peer {} rate limited ({} remaining in window)",
|
||||
request.kind,
|
||||
peer,
|
||||
remaining
|
||||
);
|
||||
vec![]
|
||||
} else {
|
||||
log::info!(
|
||||
"gif {} request from peer {} (cache miss)",
|
||||
request.kind,
|
||||
peer
|
||||
);
|
||||
let fetched = if let Some(ref key) = klipy_api_key {
|
||||
fetch_klipy(&http_client, key, &request).await
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
// only cache non-empty responses
|
||||
if !fetched.is_empty() {
|
||||
gif_cache.insert(&request, fetched.clone());
|
||||
}
|
||||
fetched
|
||||
};
|
||||
|
||||
let response = GifResponse { results };
|
||||
|
|
|
|||
Loading…
Reference in New Issue