What it is
The query-time RAG path the project’s six retrieval articles converged on, lifted into one importable Pipeline class. Replaces ~250 lines of hand-rolled embed + pgvector + chat-completions glue per article.
The strict-context grounded prompt is reproduced verbatim from the article. Embed and rerank calls inherit NIMClient.chat’s retry policy, so transient failures caused by co-resident memory pressure on the Spark’s unified memory pool are retried instead of failing the pipeline.
Public API
from fieldkit.rag import (
    Pipeline,
    Document,
    Chunk,
    DEFAULT_EMBED_MODEL,   # "nvidia/llama-nemotron-embed-1b-v2"
    DEFAULT_EMBED_DIM,     # 1024
    DEFAULT_EMBED_BATCH,   # 32
    DEFAULT_CHUNK_TOKENS,  # 900
    CHUNKS_PER_DOC_MAX,    # 10000
    DEFAULT_RERANK_URL,
    DEFAULT_RERANK_MODEL,  # "nvidia/llama-3.2-nv-rerankqa-1b-v2"
    DEFAULT_SYSTEM_PROMPT,
    RAGError,
)
Tunable constants
These module-level defaults are exported so callers can read them, log them, or override them per-call without re-deriving the numbers from the article history.
| Constant | Default | What it controls |
|---|---|---|
| DEFAULT_EMBED_MODEL | "nvidia/llama-nemotron-embed-1b-v2" | The 1024-d Matryoshka embedder used across the project’s RAG articles. |
| DEFAULT_EMBED_DIM | 1024 | Output dimensionality of the embedder. 1024-d gives a ~50% storage cut vs native 2048-d at a cost of ~4 recall points (the sweet spot from nemo-retriever-embeddings-local). |
| DEFAULT_EMBED_BATCH | 32 | Passages embedded per /v1/embeddings call during .ingest(). Matches the article-#4 sweet spot of ~28 docs/s on Spark; raise it on bigger boxes, lower it if the embedder NIM is OOM-pressured. |
| DEFAULT_CHUNK_TOKENS | 900 | Per-chunk token budget for .ingest(). Sized so a top-5 retrieval at this chunk size stays under fieldkit.nim.NIM_CONTEXT_WINDOW = 8192 with room for system + query + answer (see project_spark_nim_context_window). |
| CHUNKS_PER_DOC_MAX | 10000 | Hard ceiling on chunks per document. Encoded into the chunk id formula chunk.id = doc.id * CHUNKS_PER_DOC_MAX + chunk_idx, so raising this changes the id arithmetic; only bump it if you also re-ingest the corpus from scratch. .ingest() raises RAGError if a doc would exceed the cap. |
| DEFAULT_RERANK_URL | NGC hosted reranker | Where rerank requests go when rerank_url is set. Switch to a local URL when GB10-native TRT plans for the reranker land. |
| DEFAULT_RERANK_MODEL | "nvidia/llama-3.2-nv-rerankqa-1b-v2" | Model id sent in the rerank request body. Override on Pipeline(..., rerank_model=...) when pointing at a different reranker. |
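The numbers in the table can be sanity-checked with a little arithmetic. The sketch below copies the documented values into local variables instead of importing fieldkit, so it runs anywhere. The helper names (context_headroom, embed_calls, vector_bytes) are hypothetical and not part of the fieldkit API, and the 4-bytes-per-dimension figure assumes float32 vectors and ignores per-row storage overhead.

```python
import math

# Values copied from the table above; in real code, import them from fieldkit.
DEFAULT_EMBED_DIM = 1024
DEFAULT_EMBED_BATCH = 32
DEFAULT_CHUNK_TOKENS = 900
NIM_CONTEXT_WINDOW = 8192  # fieldkit.nim.NIM_CONTEXT_WINDOW, as quoted above


def context_headroom(top_k: int,
                     chunk_tokens: int = DEFAULT_CHUNK_TOKENS,
                     window: int = NIM_CONTEXT_WINDOW) -> int:
    """Tokens left for system prompt + query + answer after top_k full chunks."""
    return window - top_k * chunk_tokens


def embed_calls(n_chunks: int, batch: int = DEFAULT_EMBED_BATCH) -> int:
    """Number of /v1/embeddings requests needed to embed n_chunks passages."""
    return math.ceil(n_chunks / batch)


def vector_bytes(dim: int, bytes_per_float: int = 4) -> int:
    """Raw size of one vector, assuming float32 and no storage overhead."""
    return dim * bytes_per_float


headroom = context_headroom(top_k=5)   # 8192 - 5 * 900
calls = embed_calls(100)               # ceil(100 / 32)
ratio = vector_bytes(DEFAULT_EMBED_DIM) / vector_bytes(2048)
print(headroom, calls, ratio)
```

With the defaults, a top-5 retrieval leaves 3692 tokens of headroom, while a top-10 retrieval at the same chunk size would overshoot the window, which is why retrieve_k and chunk_tokens should be tuned together.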
Pipeline(embed_url, pgvector_dsn, generator: NIMClient, rerank_url=None, ...)
Composable, context-manager friendly. One persistent httpx.Client for embed and (optionally) one for rerank. pgvector connections are short-lived per call so callers don’t have to manage them.
from fieldkit.nim import NIMClient
from fieldkit.rag import Document, Pipeline
with NIMClient(base_url="http://localhost:8000/v1",
               model="meta/llama-3.1-8b-instruct") as gen, \
     Pipeline(
         embed_url="http://localhost:8001/v1",
         pgvector_dsn="postgresql://spark:spark@localhost:5432/vectors",
         generator=gen,
     ) as pipe:
    pipe.ensure_schema()
    pipe.ingest([Document(id=1, text="...", label="spark")])
    result = pipe.ask("How much memory does the Spark have?",
                      retrieve_k=5, rerank_k=3, max_tokens=120)
    print(result["answer"])
Pipeline methods
| Method | Returns | Notes |
|---|---|---|
| ensure_schema() | None | CREATE EXTENSION IF NOT EXISTS vector + CREATE TABLE IF NOT EXISTS at configured embed dim. Idempotent. |
| ingest(docs, chunk_tokens=900) | int | Chunks via fieldkit.nim.chunk_text, embeds in batches of 32, upserts in one transaction. Returns chunk count. |
| retrieve(query, top_k=5) | list[Chunk] | pgvector cosine <=>. Each chunk carries distance. |
| rerank(query, chunks, top_k=3) | list[Chunk] | Pass-through when rerank_url=None so the simplest pipeline works without NGC creds. |
| fuse(query, chunks, **gen_kwargs) | dict | Builds the strict-context prompt and calls the generator. |
| ask(query, retrieve_k=5, rerank_k=3, ...) | dict | Full chain. Returns {"answer", "chunks", "raw"}. |
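Because the stages are exposed individually, the chain behind ask can also be run step by step when the intermediate candidate lists are needed, for example to log what the reranker dropped. The sketch below uses only the method names and keyword arguments listed in the table. ask_by_hand and StubPipeline are hypothetical names introduced for illustration, and the stub exists only so the example runs without NIMs or pgvector; it does not claim to mirror the library's internals.

```python
from typing import Any


def ask_by_hand(pipe: Any, query: str, retrieve_k: int = 5, rerank_k: int = 3,
                **gen_kwargs: Any) -> dict:
    """Run retrieve -> rerank -> fuse explicitly, keeping the intermediates."""
    candidates = pipe.retrieve(query, top_k=retrieve_k)
    kept = pipe.rerank(query, candidates, top_k=rerank_k)
    result = pipe.fuse(query, kept, **gen_kwargs)
    return {"candidates": candidates, "kept": kept, "result": result}


class StubPipeline:
    """Hypothetical stand-in exposing the same three method names."""

    def retrieve(self, query, top_k=5):
        # Pretend the vector store returned top_k chunks.
        return [f"chunk-{i}" for i in range(top_k)]

    def rerank(self, query, chunks, top_k=3):
        # Stub behaviour only: keep the first top_k candidates.
        return chunks[:top_k]

    def fuse(self, query, chunks, **gen_kwargs):
        # Echo the generation kwargs so the caller can see what was forwarded.
        return {"answer": f"stub answer from {len(chunks)} chunks",
                "kwargs": gen_kwargs}


out = ask_by_hand(StubPipeline(), "How much memory does the Spark have?",
                  max_tokens=120)
print(len(out["candidates"]), len(out["kept"]), out["result"]["answer"])
```

Swapping StubPipeline() for a real Pipeline instance should work as long as the method signatures match the table.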
Chunk id encoding
Single-chunk docs keep their original id. Multi-chunk docs get id = doc.id * CHUNKS_PER_DOC_MAX + idx (10000 by default), so the doc → chunk relationship survives without an extra column.
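The encoding can be illustrated with a short sketch. The function names chunk_id and split_multi_chunk_id are hypothetical, and the ValueError stands in for the RAGError that .ingest() is documented to raise; this is a reading of the rule as described here, not the library's actual code.

```python
from typing import Tuple

CHUNKS_PER_DOC_MAX = 10000  # value from the constants table


def chunk_id(doc_id: int, chunk_idx: int, n_chunks: int) -> int:
    """Encode a chunk id following the rule described above."""
    if n_chunks > CHUNKS_PER_DOC_MAX:
        # The library is documented to raise RAGError in this case.
        raise ValueError("document exceeds CHUNKS_PER_DOC_MAX chunks")
    if not 0 <= chunk_idx < n_chunks:
        raise ValueError("chunk_idx out of range")
    if n_chunks == 1:
        return doc_id  # single-chunk docs keep their original id
    return doc_id * CHUNKS_PER_DOC_MAX + chunk_idx


def split_multi_chunk_id(cid: int) -> Tuple[int, int]:
    """Recover (doc_id, chunk_idx) from an id produced by a multi-chunk doc."""
    return divmod(cid, CHUNKS_PER_DOC_MAX)


print(chunk_id(7, 0, 1))            # single-chunk doc
print(chunk_id(7, 3, 5))            # fourth chunk of a five-chunk doc
print(split_multi_chunk_id(70003))
```

Decoding assumes the id came from a multi-chunk document; the integer alone does not say which of the two cases produced it.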
Chunk.score
A single “higher is better” score: the rerank logit if available, otherwise 1 - distance. Falls back to 0.0 if neither is set.
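The fallback order can be sketched as a property on a small dataclass. ScoredChunk and its field names (distance, rerank_logit) are assumptions made for illustration; the real Chunk class may name or store these differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScoredChunk:
    """Hypothetical stand-in for fieldkit.rag.Chunk; field names are assumed."""
    text: str
    distance: Optional[float] = None      # pgvector cosine distance, if retrieved
    rerank_logit: Optional[float] = None  # reranker output, if reranked

    @property
    def score(self) -> float:
        # Prefer the reranker's judgement when it exists.
        if self.rerank_logit is not None:
            return self.rerank_logit
        # Otherwise turn distance (lower is better) into higher-is-better.
        if self.distance is not None:
            return 1.0 - self.distance
        # Neither signal is set.
        return 0.0


retrieved = ScoredChunk("a", distance=0.25)
reranked = ScoredChunk("b", distance=0.25, rerank_logit=2.0)
bare = ScoredChunk("c")
print(retrieved.score, reranked.score, bare.score)
```

Since pgvector's cosine distance is 1 minus cosine similarity, the fallback value is the cosine similarity itself. Rerank logits sit on a different scale, so scores are best compared only within a list produced by the same stage.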
Sample
samples/naive-rag.py reproduces the naive-rag-on-spark flow end-to-end in ~30 lines: ensure schema → ingest 3 docs → ask one question.
CLI
fieldkit bench rag --table fieldkit_cli_bench_rag --out /tmp/bench.json
Drives Pipeline.ask through fieldkit.eval.Bench against a 3-doc in-memory corpus and prints a markdown latency report. Requires the chat NIM, embed NIM, and pgvector to be reachable.