fieldkit · module

fieldkit.rag

Composable ingest → retrieve → rerank → fuse RAG pipeline backed by pgvector + a NIM embedder + the strict-context grounded prompt from `naive-rag-on-spark`.

What it is

The query-time RAG path the project’s six retrieval articles converged on, lifted into one importable Pipeline class. Replaces ~250 lines of hand-rolled embed + pgvector + chat-completions glue per article.

The strict-context grounded prompt is reproduced verbatim from the article. Embed and rerank inherit NIMClient.chat’s retry policy, so transient failures caused by co-resident memory pressure on the Spark’s unified memory pool are retried instead of failing the pipeline.

Public API

from fieldkit.rag import (
    Pipeline,
    Document,
    Chunk,
    DEFAULT_EMBED_MODEL,           # "nvidia/llama-nemotron-embed-1b-v2"
    DEFAULT_EMBED_DIM,             # 1024
    DEFAULT_EMBED_BATCH,           # 32
    DEFAULT_CHUNK_TOKENS,          # 900
    CHUNKS_PER_DOC_MAX,            # 10000
    DEFAULT_RERANK_URL,
    DEFAULT_RERANK_MODEL,          # "nvidia/llama-3.2-nv-rerankqa-1b-v2"
    DEFAULT_SYSTEM_PROMPT,
    RAGError,
)

Tunable constants

These module-level defaults are exported so callers can read them, log them, or override them per-call without re-deriving the numbers from the article history.

| Constant | Default | What it controls |
| --- | --- | --- |
| DEFAULT_EMBED_MODEL | "nvidia/llama-nemotron-embed-1b-v2" | The 1024-d Matryoshka embedder used across the project’s RAG articles. |
| DEFAULT_EMBED_DIM | 1024 | Output dimensionality of the embedder. 1024-d gives a ~50% storage cut vs native 2048-d at ~4 recall points (the sweet spot from nemo-retriever-embeddings-local). |
| DEFAULT_EMBED_BATCH | 32 | Passages embedded per /v1/embeddings call during .ingest(). Matches the article-#4 sweet spot of ~28 docs/s on Spark; raise it on bigger boxes, lower it if the embedder NIM is OOM-pressured. |
| DEFAULT_CHUNK_TOKENS | 900 | Per-chunk token budget for .ingest(). Sized so a top-5 retrieval at this chunk size stays under fieldkit.nim.NIM_CONTEXT_WINDOW = 8192 with room for system + query + answer (see project_spark_nim_context_window). |
| CHUNKS_PER_DOC_MAX | 10000 | Hard ceiling on chunks per document. Encoded into the chunk id formula chunk.id = doc.id * CHUNKS_PER_DOC_MAX + chunk_idx, so raising this changes the id arithmetic; only bump it if you also re-ingest the corpus from scratch. .ingest() raises RAGError if a doc would exceed the cap. |
| DEFAULT_RERANK_URL | NGC hosted reranker | Where rerank requests go when rerank_url is set. Switch to a local URL when GB10-native TRT plans for the reranker land. |
| DEFAULT_RERANK_MODEL | "nvidia/llama-3.2-nv-rerankqa-1b-v2" | Model id sent in the rerank request body. Override on Pipeline(..., rerank_model=...) when pointing at a different reranker. |
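The DEFAULT_CHUNK_TOKENS sizing can be sanity-checked with a little arithmetic. The sketch below copies the two values from the table; the helper name remaining_context is hypothetical and not part of fieldkit, and it assumes every retrieved chunk uses its full token budget while ignoring any per-chunk prompt formatting overhead.

```python
# Values copied from the constants table; in real code, import them from
# fieldkit.rag / fieldkit.nim instead of redefining them here.
DEFAULT_CHUNK_TOKENS = 900
NIM_CONTEXT_WINDOW = 8192


def remaining_context(top_k: int,
                      chunk_tokens: int = DEFAULT_CHUNK_TOKENS,
                      window: int = NIM_CONTEXT_WINDOW) -> int:
    """Tokens left for system prompt + query + answer (hypothetical helper).

    Assumes each of the top_k chunks fills its whole token budget.
    """
    return window - top_k * chunk_tokens


print(remaining_context(top_k=5))  # 8192 - 5 * 900 = 3692
print(remaining_context(top_k=9))  # 8192 - 9 * 900 = 92
```

Under these assumptions a top-5 retrieval leaves roughly 3,700 tokens of headroom, while pushing to nine chunks of this size would leave almost nothing for the answer.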

Pipeline(embed_url, pgvector_dsn, generator: NIMClient, rerank_url=None, ...)

Composable, context-manager friendly. One persistent httpx.Client for embed and (optionally) one for rerank. pgvector connections are short-lived per call so callers don’t have to manage them.

from fieldkit.nim import NIMClient
from fieldkit.rag import Document, Pipeline

with NIMClient(base_url="http://localhost:8000/v1",
               model="meta/llama-3.1-8b-instruct") as gen, \
     Pipeline(
         embed_url="http://localhost:8001/v1",
         pgvector_dsn="postgresql://spark:spark@localhost:5432/vectors",
         generator=gen,
     ) as pipe:
    pipe.ensure_schema()
    pipe.ingest([Document(id=1, text="...", label="spark")])
    result = pipe.ask("How much memory does the Spark have?",
                      retrieve_k=5, rerank_k=3, max_tokens=120)
    print(result["answer"])

Pipeline methods

| Method | Returns | Notes |
| --- | --- | --- |
| ensure_schema() | None | CREATE EXTENSION IF NOT EXISTS vector + CREATE TABLE IF NOT EXISTS at configured embed dim. Idempotent. |
| ingest(docs, chunk_tokens=900) | int | Chunks via fieldkit.nim.chunk_text, embeds in batches of 32, upserts in one transaction. Returns chunk count. |
| retrieve(query, top_k=5) | list[Chunk] | pgvector cosine <=>. Each chunk carries distance. |
| rerank(query, chunks, top_k=3) | list[Chunk] | Pass-through when rerank_url=None so the simplest pipeline works without NGC creds. |
| fuse(query, chunks, **gen_kwargs) | dict | Builds the strict-context prompt and calls the generator. |
| ask(query, retrieve_k=5, rerank_k=3, ...) | dict | Full chain. Returns {"answer", "chunks", "raw"}. |
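Because the stages are exposed individually, they can also be chained by hand, for example to log or filter candidates between steps. The sketch below is duck-typed and only relies on the three method signatures listed in the table; the helper name ask_via_stages is hypothetical, and whether its result matches what ask() returns field for field is an assumption, not something the table guarantees.

```python
from typing import Any


def ask_via_stages(pipe: Any, query: str, retrieve_k: int = 5,
                   rerank_k: int = 3, **gen_kwargs: Any) -> dict:
    """Chain retrieve -> rerank -> fuse manually (hypothetical helper).

    `pipe` is any object exposing the three documented stage methods.
    """
    # Wide first pass: nearest chunks by pgvector cosine distance.
    candidates = pipe.retrieve(query, top_k=retrieve_k)
    # Narrow pass: documented as a pass-through when rerank_url is None.
    best = pipe.rerank(query, candidates, top_k=rerank_k)
    # Build the strict-context prompt and call the generator.
    return pipe.fuse(query, best, **gen_kwargs)
```

With a live pipeline, ask_via_stages(pipe, "How much memory does the Spark have?", max_tokens=120) would presumably behave much like pipe.ask(...) with the same arguments, while leaving room to inspect candidates or best in between.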

Chunk id encoding

Single-chunk docs keep their original id. Multi-chunk docs get id = doc.id * 10000 + idx so the doc → chunk relationship survives without an extra column.
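The encoding above can be sketched in a few lines. This is an illustration of the arithmetic, not the library's code: the function names are hypothetical, a zero-based chunk index is assumed, and a plain ValueError stands in for the RAGError that .ingest() raises.

```python
CHUNKS_PER_DOC_MAX = 10000  # value from the constants table


def encode_chunk_id(doc_id: int, chunk_idx: int, n_chunks: int) -> int:
    """Return the chunk id for one chunk of a document (hypothetical helper)."""
    if n_chunks == 1:
        return doc_id  # single-chunk docs keep their original id
    if not 0 <= chunk_idx < CHUNKS_PER_DOC_MAX:
        # The library raises RAGError here; ValueError is a stand-in.
        raise ValueError("chunk index exceeds CHUNKS_PER_DOC_MAX")
    return doc_id * CHUNKS_PER_DOC_MAX + chunk_idx


def decode_multi_chunk_id(chunk_id: int) -> tuple[int, int]:
    """Recover (doc_id, chunk_idx) from an id produced by the multi-chunk branch."""
    return divmod(chunk_id, CHUNKS_PER_DOC_MAX)


print(encode_chunk_id(42, 3, n_chunks=5))  # 420003
print(decode_multi_chunk_id(420003))       # (42, 3)
```

Decoding only makes sense for ids known to come from multi-chunk documents, since a single-chunk document's id is stored unchanged and cannot be told apart from an encoded id by its value alone.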

Chunk.score

Single “higher is better” score: rerank logit if available, else 1 - distance. Fallback 0.0 if neither is set.
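The fallback order described above can be sketched with a small stand-in class. ScoredChunk and its rerank_logit field name are hypothetical and are not the actual fieldkit.rag.Chunk definition; only the precedence (rerank logit, then 1 - distance, then 0.0) comes from the description.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScoredChunk:  # hypothetical stand-in, not fieldkit.rag.Chunk
    text: str
    distance: Optional[float] = None      # cosine distance set by retrieval
    rerank_logit: Optional[float] = None  # assumed name for the reranker output

    @property
    def score(self) -> float:
        """Single 'higher is better' score with the documented fallbacks."""
        if self.rerank_logit is not None:
            return self.rerank_logit
        if self.distance is not None:
            return 1.0 - self.distance
        return 0.0


print(ScoredChunk("a", distance=0.25).score)                    # 0.75
print(ScoredChunk("b", distance=0.25, rerank_logit=2.5).score)  # 2.5
print(ScoredChunk("c").score)                                   # 0.0
```

Rerank logits and 1 - distance values live on different scales, so it is probably safest to compare scores only among chunks that went through the same stage.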

Sample

samples/naive-rag.py reproduces the naive-rag-on-spark flow end-to-end in ~30 lines: ensure schema → ingest 3 docs → ask one question.

CLI

fieldkit bench rag --table fieldkit_cli_bench_rag --out /tmp/bench.json

Drives Pipeline.ask through fieldkit.eval.Bench against a 3-doc in-memory corpus and prints a markdown latency report. Requires the chat NIM, embed NIM, and pgvector to be reachable.

Articles that use fieldkit.rag