A production-ready, full-stack Retrieval-Augmented Generation (RAG) system that runs entirely on free-tier cloud services with zero operational costs.

Document Management RAG is a modern, open-source RAG platform enabling users to upload documents (PDF, DOCX, TXT), perform semantic search, and interact with an LLM using context from their own files. The backend is built with async FastAPI, integrates Hugging Face Inference API for LLM and embeddings, uses Qdrant for vector search, and MongoDB (with Motor async driver) for chat session persistence. The frontend is a React + TypeScript SPA with a ChatGPT-like interface, real-time SSE streaming, document upload with progress tracking, and chat history management. The entire stack is designed for zero-cost deployment using free-tier services: Render.com, Qdrant Cloud, MongoDB Atlas, and Hugging Face Inference API.
Staying within free-tier API quotas for Hugging Face, Qdrant Cloud, and MongoDB Atlas
Implemented embedding caching with SHA-256 hash keys to reduce duplicate API calls by ~60%
SSE Streaming with Context Retrieval (FastAPI)
async def generate_chat_stream(
request: ChatRequest,
user_id: str,
embedding_service: EmbeddingService,
vectordb_service: VectorDBService,
llm_service: LLMService,
settings: Settings,
) -> AsyncGenerator[str, None]:
"""Generate chat response as SSE stream with RAG context."""
sources = []
if request.use_context:
# Embed query asynchronously
query_embedding = await embedding_service.embed_text(request.query)
# Build filters for document-scoped search
search_filters = {}
if request.document_filter:
search_filters["filename"] = request.document_filter
# Search for relevant documents
search_results = await run_in_threadpool(
vectordb_service.search,
query_embedding,
user_id,
settings.TOP_K_RESULTS,
filters=search_filters,
)
context_docs = [result.text for result in search_results]
sources = list(set(
result.metadata.get("filename", "unknown")
for result in search_results
))
# Stream LLM response with context
async for chunk in llm_service.generate_response(
query=request.query,
context=context_docs if context_docs else None,
stream=True,
):
yield chunk
await asyncio.sleep(0.01) # Prevent overwhelming client
# Final SSE event with sources
sources_json = json.dumps(sources)
yield f'data: {{"done": true, "sources": {sources_json}}}\n\n'Efficiently chunking documents while preserving semantic boundaries for accurate retrieval
Used overlapping chunking strategy with sentence-boundary detection for coherent context retrieval
Frontend SSE Streaming Hook (React)
export const useSSE = () => {
  const [isStreaming, setIsStreaming] = useState(false);

  /**
   * POST the chat request and consume the SSE response stream.
   *
   * Fixes over the previous version:
   * - `onError` was accepted but never invoked; network, HTTP, and JSON-parse
   *   failures now reach the caller instead of rejecting silently.
   * - `response.ok` and the presence of a readable body are checked before use
   *   (`reader` could be undefined, making `reader.read()` throw).
   * - `setIsStreaming(false)` runs in `finally`, so a failed stream no longer
   *   leaves the UI stuck in the streaming state.
   */
  const streamChat = useCallback(async (
    request: ChatRequest,
    onChunk: (chunk: string) => void,
    onComplete: () => void,
    onError: (error: string) => void
  ) => {
    setIsStreaming(true);
    try {
      const response = await fetch(`${config.apiUrl}/api/chat/`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('auth_token')}`,
        },
        body: JSON.stringify(request),
      });
      if (!response.ok) {
        onError(`Request failed with status ${response.status}`);
        return;
      }
      const reader = response.body?.getReader();
      if (!reader) {
        onError('Response body is not a readable stream');
        return;
      }
      const decoder = new TextDecoder();
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // Keep any trailing partial line in the buffer for the next read.
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = JSON.parse(line.slice(6)) as SSEChunk;
            if (data.chunk) onChunk(data.chunk);
            if (data.done) onComplete();
          }
        }
      }
    } catch (err) {
      onError(err instanceof Error ? err.message : String(err));
    } finally {
      setIsStreaming(false);
    }
  }, []);

  return { streamChat, isStreaming };
};
Implementing real-time SSE streaming from Python async generators to React frontend
Built custom SSE endpoint with FastAPI StreamingResponse and React fetch API with ReadableStream for real-time streaming
Qdrant Vector Search with Retry Pattern
class VectorDBService:
"""Service for Qdrant vector database operations."""
def _retry_operation(self, operation, *args, **kwargs):
"""Retry with exponential backoff."""
for attempt in range(self.max_retries):
try:
return operation(*args, **kwargs)
except UnexpectedResponse as e:
if attempt == self.max_retries - 1:
raise
wait_time = 2 ** attempt # Exponential backoff
time.sleep(wait_time)
def search(
self,
query_embedding: List[float],
user_id: str,
top_k: int = 5,
filters: Optional[Dict[str, Any]] = None,
) -> List[SearchResult]:
"""Search for similar documents with filtering."""
# Build filter conditions
must_conditions = [
models.FieldCondition(
key="user_id",
match=models.MatchValue(value=user_id),
)
]
if filters and "filename" in filters:
must_conditions.append(
models.FieldCondition(
key="filename",
match=models.MatchValue(value=filters["filename"]),
)
)
results = self._retry_operation(
self.client.search,
collection_name=self.collection_name,
query_vector=query_embedding,
query_filter=models.Filter(must=must_conditions),
limit=top_k,
)
return [SearchResult(
text=hit.payload.get("text", ""),
score=hit.score,
metadata=hit.payload,
) for hit in results]Handling Qdrant API failures gracefully with exponential backoff retry logic
Implemented retry mechanism with exponential backoff (2^attempt seconds) for all Qdrant operations
Embedding Service with Caching
class EmbeddingService:
"""Service for generating text embeddings with caching."""
def __init__(self, settings: Settings):
self.client = InferenceClient(
token=settings.HF_API_TOKEN,
timeout=settings.HF_API_TIMEOUT,
)
self._cache: dict[str, List[float]] = {}
self._cache_max_size = settings.EMBEDDING_CACHE_SIZE
def _get_cache_key(self, text: str) -> str:
"""Generate SHA-256 cache key for text."""
return hashlib.sha256(
f"{self.model_name}:{text}".encode()
).hexdigest()
async def embed_text(self, text: str) -> List[float]:
"""Embed text with caching to reduce API calls."""
cache_key = self._get_cache_key(text)
# Check cache first
if cache_key in self._cache:
return self._cache[cache_key]
# Generate embedding via HuggingFace API
embedding = await anyio.to_thread.run_sync(
self._blocking_embed, text
)
# Update cache with LRU eviction
if len(self._cache) >= self._cache_max_size:
oldest_key = next(iter(self._cache))
del self._cache[oldest_key]
self._cache[cache_key] = embedding
return embedding