Sankalp Rai Gambhir · Fullstack Software & AI Engineer

Document Management RAG

A production-ready, full-stack Retrieval-Augmented Generation (RAG) system that runs entirely on free-tier cloud services with zero operational costs

FastAPI · Python · Qdrant · MongoDB · Hugging Face · React · TypeScript · Vite · Tailwind CSS · Docker
GitHub · Live Demo
  • 5 Services
  • 2 Databases
  • 15+ API Endpoints
  • 80%+ Test Coverage
Overview

Document Management RAG is a modern, open-source RAG platform enabling users to upload documents (PDF, DOCX, TXT), perform semantic search, and interact with an LLM using context from their own files. The backend is built with async FastAPI, integrates Hugging Face Inference API for LLM and embeddings, uses Qdrant for vector search, and MongoDB (with Motor async driver) for chat session persistence. The frontend is a React + TypeScript SPA with a ChatGPT-like interface, real-time SSE streaming, document upload with progress tracking, and chat history management. The entire stack is designed for zero-cost deployment using free-tier services: Render.com, Qdrant Cloud, MongoDB Atlas, and Hugging Face Inference API.

Timeline: 1 month
Role: Fullstack AI Engineer
Implementation Overview
  • ✓ Async FastAPI backend with SSE streaming for real-time LLM responses
  • ✓ Hugging Face Inference API integration for LLM (Qwen2.5-72B-Instruct) and embeddings (all-MiniLM-L6-v2)
  • ✓ Qdrant vector database with payload indexing for semantic search and document filtering
  • ✓ MongoDB Atlas with async Motor driver for user authentication and chat session persistence
  • ✓ JWT authentication with secure password hashing (bcrypt)
  • ✓ Document ingestion pipeline with smart text chunking (PDF, DOCX, TXT support)
  • ✓ Embedding caching with LRU strategy to minimize API calls
  • ✓ Rate limiting with in-memory token bucket pattern
  • ✓ Modern React + TypeScript frontend with ChatGPT-like UI
  • ✓ Real-time streaming chat responses with source attribution
  • ✓ Document-scoped chat sessions with context filtering
  • ✓ Dockerized deployment with docker-compose orchestration
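
The token-bucket rate limiter from the list above can be sketched in a few lines; this is a minimal single-process illustration (the class and parameter names are mine, not the project's):

```python
import time


class TokenBucket:
    """Minimal in-memory token bucket: refills `rate` tokens/sec up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def allow(self) -> bool:
        """Refill proportionally to elapsed time, then try to spend one token."""
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(rate=5.0, capacity=10)
allowed = [bucket.allow() for _ in range(12)]  # 12 back-to-back calls
# the first 10 pass (the bucket starts full), the remainder are throttled
```

Keeping one bucket per user (keyed by user_id in a dict) gives per-user limits; because the state is in-memory, limits reset on redeploy, an accepted trade-off on free-tier hosting.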

Technical Deep Dive

Problem 1

Staying within free-tier API quotas for Hugging Face, Qdrant Cloud, and MongoDB Atlas

Solution

Implemented embedding caching with SHA-256 hash keys to reduce duplicate API calls by ~60%

Implementation

SSE Streaming with Context Retrieval (FastAPI)

import asyncio
import json
from typing import AsyncGenerator

from fastapi.concurrency import run_in_threadpool


async def generate_chat_stream(
    request: ChatRequest,
    user_id: str,
    embedding_service: EmbeddingService,
    vectordb_service: VectorDBService,
    llm_service: LLMService,
    settings: Settings,
) -> AsyncGenerator[str, None]:
    """Generate chat response as SSE stream with RAG context."""
    sources = []
    context_docs = []  # stays empty when context retrieval is skipped
    
    if request.use_context:
        # Embed query asynchronously
        query_embedding = await embedding_service.embed_text(request.query)
        
        # Build filters for document-scoped search
        search_filters = {}
        if request.document_filter:
            search_filters["filename"] = request.document_filter
        
        # Search for relevant documents
        search_results = await run_in_threadpool(
            vectordb_service.search,
            query_embedding,
            user_id,
            settings.TOP_K_RESULTS,
            filters=search_filters,
        )
        
        context_docs = [result.text for result in search_results]
        sources = list(set(
            result.metadata.get("filename", "unknown")
            for result in search_results
        ))
    
    # Stream LLM response with context
    async for chunk in llm_service.generate_response(
        query=request.query,
        context=context_docs if context_docs else None,
        stream=True,
    ):
        yield chunk
        await asyncio.sleep(0.01)  # Prevent overwhelming client
    
    # Final SSE event with sources
    sources_json = json.dumps(sources)
    yield f'data: {{"done": true, "sources": {sources_json}}}\n\n'
Key Insight: This generator combines vector search, context retrieval, and LLM streaming into a single SSE endpoint, enabling real-time responses with source attribution.
Problem 2

Efficiently chunking documents while preserving semantic boundaries for accurate retrieval

Solution

Used overlapping chunking strategy with sentence-boundary detection for coherent context retrieval
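A rough sketch of that strategy, greedy sentence packing with a one-sentence overlap (the regex splitter and size limits here are illustrative, not the project's exact parameters):

```python
import re
from typing import List


def chunk_text(text: str, max_chars: int = 500, overlap_sentences: int = 1) -> List[str]:
    """Pack whole sentences into ~max_chars chunks, starting each new chunk
    with the last `overlap_sentences` sentences of the previous one."""
    # Naive sentence boundary: split after ., !, or ? followed by whitespace
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks: List[str] = []
    current: List[str] = []
    length = 0
    for sentence in sentences:
        if current and length + len(sentence) > max_chars:
            chunks.append(" ".join(current))
            current = current[-overlap_sentences:]  # carry context forward
            length = sum(len(s) for s in current)
        current.append(sentence)
        length += len(sentence)
    if current:
        chunks.append(" ".join(current))
    return chunks


doc = " ".join(f"Sentence number {i}." for i in range(20))
chunks = chunk_text(doc, max_chars=90)
# each chunk begins with the final sentence of the previous chunk
```

The overlap means a fact that straddles a chunk boundary is still retrievable from at least one chunk, at the cost of slightly more storage and a few extra embedding calls.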

Implementation

Frontend SSE Streaming Hook (React)

export const useSSE = () => {
  const [isStreaming, setIsStreaming] = useState(false);

  const streamChat = useCallback(async (
    request: ChatRequest,
    onChunk: (chunk: string) => void,
    onComplete: () => void,
    onError: (error: string) => void
  ) => {
    setIsStreaming(true);

    try {
      const response = await fetch(`${config.apiUrl}/api/chat/`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${localStorage.getItem('auth_token')}`,
        },
        body: JSON.stringify(request),
      });

      if (!response.ok || !response.body) {
        onError(`Request failed with status ${response.status}`);
        return;
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Accumulate partial lines; a chunk may end mid-event
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = JSON.parse(line.slice(6)) as SSEChunk;
            if (data.chunk) onChunk(data.chunk);
            if (data.done) onComplete();
          }
        }
      }
    } catch (err) {
      onError(err instanceof Error ? err.message : 'Streaming failed');
    } finally {
      setIsStreaming(false);
    }
  }, []);

  return { streamChat, isStreaming };
};
Key Insight: Custom React hook consuming SSE streams using the Fetch API with ReadableStream, handling chunked responses and maintaining streaming state.
Problem 3

Implementing real-time SSE streaming from Python async generators to React frontend

Solution

Built a custom SSE endpoint with FastAPI's StreamingResponse, consumed in React via the Fetch API and ReadableStream

Implementation

Qdrant Vector Search with Retry Pattern

import time
from typing import Any, Dict, List, Optional

from qdrant_client import models
from qdrant_client.http.exceptions import UnexpectedResponse


class VectorDBService:
    """Service for Qdrant vector database operations."""
    
    def _retry_operation(self, operation, *args, **kwargs):
        """Retry with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return operation(*args, **kwargs)
            except UnexpectedResponse as e:
                if attempt == self.max_retries - 1:
                    raise
                wait_time = 2 ** attempt  # Exponential backoff
                time.sleep(wait_time)
    
    def search(
        self,
        query_embedding: List[float],
        user_id: str,
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[SearchResult]:
        """Search for similar documents with filtering."""
        # Build filter conditions
        must_conditions = [
            models.FieldCondition(
                key="user_id",
                match=models.MatchValue(value=user_id),
            )
        ]
        
        if filters and "filename" in filters:
            must_conditions.append(
                models.FieldCondition(
                    key="filename",
                    match=models.MatchValue(value=filters["filename"]),
                )
            )
        
        results = self._retry_operation(
            self.client.search,
            collection_name=self.collection_name,
            query_vector=query_embedding,
            query_filter=models.Filter(must=must_conditions),
            limit=top_k,
        )
        
        return [SearchResult(
            text=hit.payload.get("text", ""),
            score=hit.score,
            metadata=hit.payload,
        ) for hit in results]
Key Insight: Qdrant search with user-scoped filtering, document-specific queries, and exponential backoff retry pattern for resilient cloud API calls.
Problem 4

Handling Qdrant API failures gracefully with exponential backoff retry logic

Solution

Implemented retry mechanism with exponential backoff (2^attempt seconds) for all Qdrant operations

Implementation

Embedding Service with Caching

import hashlib
from typing import List

import anyio
from huggingface_hub import InferenceClient


class EmbeddingService:
    """Service for generating text embeddings with caching."""
    
    def __init__(self, settings: Settings):
        self.client = InferenceClient(
            token=settings.HF_API_TOKEN,
            timeout=settings.HF_API_TIMEOUT,
        )
        self.model_name = settings.EMBEDDING_MODEL  # included in cache keys below
        self._cache: dict[str, List[float]] = {}
        self._cache_max_size = settings.EMBEDDING_CACHE_SIZE
    
    def _get_cache_key(self, text: str) -> str:
        """Generate SHA-256 cache key for text."""
        return hashlib.sha256(
            f"{self.model_name}:{text}".encode()
        ).hexdigest()
    
    async def embed_text(self, text: str) -> List[float]:
        """Embed text with caching to reduce API calls."""
        cache_key = self._get_cache_key(text)
        
        # Check cache first; re-inserting on a hit keeps insertion order
        # equal to recency, so the eviction below is true LRU
        if cache_key in self._cache:
            self._cache[cache_key] = self._cache.pop(cache_key)
            return self._cache[cache_key]
        
        # Generate embedding via HuggingFace API
        embedding = await anyio.to_thread.run_sync(
            self._blocking_embed, text
        )
        
        # Evict the least-recently-used entry (front of the dict)
        if len(self._cache) >= self._cache_max_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]
        
        self._cache[cache_key] = embedding
        return embedding
Key Insight: Embedding service with SHA-256 hash-based caching reduces duplicate Hugging Face API calls, critical for staying within free-tier quotas.
View Full Repository