Tags: Vector Databases, Pinecone, Weaviate, pgvector, RAG, AI, Python, Production

Vector Databases in Production — Pinecone, Weaviate, and pgvector for RAG at Scale

A practical guide to choosing and operating vector databases in production RAG systems: pgvector with HNSW index design, Weaviate hybrid search with BM25 and vectorizer modules, Pinecone serverless vs pod-based architectures, embedding pipeline design with chunking strategies and batch upserts, ANN index tuning for recall/latency trade-offs, metadata filtering strategies, multi-tenancy patterns, monitoring embedding drift and recall@k, backup and disaster recovery for vector stores, and a decision framework for selecting the right vector database for your use case.

2026-05-22

Why Vector Search is Different from Traditional Search

Traditional search engines — Elasticsearch, PostgreSQL full-text search, Solr — operate on exact or fuzzy keyword matching. A query for “database performance tuning” returns documents containing those tokens, ranked by BM25 or TF-IDF. This works well for known-item retrieval, but it breaks down the moment users express intent in language that does not share lexical overlap with the documents you have indexed. A user asking “how do I make my queries faster” will miss every document that uses the word “optimization” but not “faster.”

Vector search solves this by moving from token space to semantic space. An embedding model (a neural network trained on large text corpora) transforms any piece of text into a dense floating-point vector, typically 768 to 3072 dimensions depending on the model. Semantically similar texts map to geometrically nearby points in that high-dimensional space. Retrieval becomes a nearest-neighbor search: given a query vector, find the N document vectors with the smallest cosine distance (equivalently, the highest cosine similarity) or the smallest Euclidean distance.
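To make the geometry concrete, here is a minimal sketch of exact nearest-neighbor retrieval by cosine similarity. The three-dimensional vectors and document names are made up for illustration; real embeddings have hundreds or thousands of dimensions and come from an embedding model.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # a zero vector has no direction
    return dot / (norm_a * norm_b)


def exact_top_k(
    query: list[float],
    docs: dict[str, list[float]],
    k: int = 3,
) -> list[tuple[str, float]]:
    """Brute-force search: score every document, keep the k most similar."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in docs.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy "embeddings" (hypothetical values, not produced by a real model)
docs = {
    "tuning-guide": [0.9, 0.1, 0.0],
    "optimization-tips": [0.8, 0.3, 0.1],
    "cooking-recipes": [0.0, 0.2, 0.9],
}
query = [1.0, 0.2, 0.0]

# tuning-guide ranks first and cooking-recipes last
ranking = [doc_id for doc_id, _ in exact_top_k(query, docs, k=3)]
```

This full scan is the baseline that an ANN index approximates, and it is also a simple way to produce ground-truth neighbors for a small evaluation set.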

Exact nearest-neighbor search compares the query against every stored vector, so it costs O(n) per query and becomes prohibitively expensive over millions of vectors. Production systems use Approximate Nearest Neighbor (ANN) algorithms that trade a small amount of recall for orders-of-magnitude faster queries. The dominant algorithm family in production is HNSW (Hierarchical Navigable Small World), a graph-based index that can serve queries in low single-digit milliseconds at high recall on datasets with tens of millions of vectors, depending on dimensionality, hardware, and index parameters.

Note

Recall@k is the core metric for vector search quality: the fraction of true nearest neighbors that appear in the top-k results returned by the ANN index. A recall@10 of 0.95 means 95% of the true top-10 results are in the returned set. In RAG systems, low recall means the correct context chunks are not retrieved, causing the LLM to hallucinate or give incomplete answers. Always measure recall@k against a ground-truth dataset before and after index configuration changes.
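As a small worked example of the definition, the sketch below computes recall@k for a single query by comparing the IDs an ANN index returned against the exact top-k. The document IDs are hypothetical.

```python
def recall_at_k(true_neighbors: list[str], returned: list[str], k: int) -> float:
    """Fraction of the true top-k neighbors that appear in the returned top-k."""
    if k <= 0:
        raise ValueError("k must be positive")
    truth = set(true_neighbors[:k])
    if not truth:
        return 0.0
    hits = truth & set(returned[:k])
    return len(hits) / len(truth)


# Exact top-10 from a brute-force scan (hypothetical IDs)
exact = [f"d{i}" for i in range(1, 11)]
# The ANN index found nine of them and returned one wrong neighbor
approx = [f"d{i}" for i in range(1, 10)] + ["d42"]

single_query_recall = recall_at_k(exact, approx, k=10)  # 9 of 10 found
```

Averaging this value over a representative set of queries gives the recall@k figure to track across index configuration changes.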

The Three Production Contenders

Three options dominate production RAG deployments today, each with a distinct operational model, cost structure, and capability set.

| Feature | pgvector | Weaviate | Pinecone |
| --- | --- | --- | --- |
| Deployment | Self-hosted (PostgreSQL extension) | Self-hosted or Weaviate Cloud | Fully managed SaaS |
| Index type | HNSW, IVFFlat | HNSW (built-in) | Proprietary (managed) |
| Hybrid search | Manual (combine with full-text search or a BM25 extension) | Native BM25 + vector | Sparse + dense vectors |
| Multi-tenancy | Via schema/row security | Native tenant isolation | Namespaces |
| Vectorizer built-in | No | Yes (text2vec-openai, etc.) | No (bring your own) |
| Best for | Existing Postgres shops | Hybrid search, multi-tenant RAG | Serverless scale, zero ops |

pgvector in Depth — Schema Design, Index Types, and Python with psycopg3

pgvector is a PostgreSQL extension that adds a native vector column type and ANN index support. Its primary advantage is operational simplicity for teams already running PostgreSQL: no new infrastructure, no separate vector store to monitor, and full transactional consistency between your application data and embeddings. Joins between vectors and relational data are first-class SQL operations.

pgvector supports two index types. HNSW builds a navigable graph structure as rows are indexed. Queries are fast and recall is high, but the index requires more memory and takes longer to build: it holds the vectors themselves (4 bytes per dimension) plus the graph links, whose size grows with m rather than with the vector dimension. IVFFlat partitions vectors into lists (clusters around k-means centroids, that is, Voronoi cells); a query probes a configurable number of lists (probes) and scans each one. IVFFlat builds faster and uses less memory than HNSW, but it needs representative data in the table before the index is created, and it gives lower recall at the same query latency unless you increase the number of probes.
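For rough capacity planning, a sketch like the following can help. It assumes 4-byte floats per dimension and ignores per-row and graph-link overhead, so treat the result as a lower bound. The IVFFlat starting points follow the heuristics in the pgvector README (rows / 1000 lists up to 1M rows, sqrt(rows) above that, and probes near sqrt(lists)); the function names are illustrative.

```python
import math

FLOAT32_BYTES = 4  # pgvector stores each dimension as a 4-byte float


def raw_vector_bytes(num_vectors: int, dimensions: int) -> int:
    """Lower bound on storage for the vectors alone (no headers, no index links)."""
    return num_vectors * dimensions * FLOAT32_BYTES


def suggested_ivfflat_lists(row_count: int) -> int:
    """Starting point for the IVFFlat `lists` parameter."""
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return max(1, round(math.sqrt(row_count)))


def suggested_ivfflat_probes(lists: int) -> int:
    """Starting point for the `ivfflat.probes` setting at query time."""
    return max(1, round(math.sqrt(lists)))


# 1M chunks of 1536-dim embeddings need about 6.1 GB for the vectors alone
one_million_1536 = raw_vector_bytes(1_000_000, 1536)
```

These are starting values only; the recall and latency measurements on your own evaluation set decide the final settings.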

-- schema_pgvector.sql
-- Enable the extension (HNSW indexes require pgvector 0.5.0 or later;
-- see the pgvector README for supported PostgreSQL versions)
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table with embedding column
CREATE TABLE documents (
    id          BIGSERIAL PRIMARY KEY,
    tenant_id   UUID NOT NULL,
    source_uri  TEXT NOT NULL,
    chunk_index INT NOT NULL,
    content     TEXT NOT NULL,
    -- text-embedding-3-small produces 1536-dim vectors
    embedding   vector(1536),
    metadata    JSONB DEFAULT '{}',
    created_at  TIMESTAMPTZ DEFAULT now(),
    UNIQUE (tenant_id, source_uri, chunk_index)
);

-- HNSW index — best for low-latency recall in production
-- m: max connections per layer (higher = better recall, more memory)
-- ef_construction: search depth during build (higher = better recall, slower build)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

-- For IVFFlat: create the index after the table holds representative data
-- pgvector README starting point: lists = rows / 1000 up to 1M rows, sqrt(rows) above that
-- CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
--     WITH (lists = 100);

-- Metadata index for filtered search
CREATE INDEX ON documents (tenant_id);
CREATE INDEX ON documents USING gin (metadata);

-- Row-level security for multi-tenancy
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON documents
    USING (tenant_id = current_setting('app.current_tenant')::UUID);

# pgvector_client.py
from __future__ import annotations

import os
from typing import Any

import psycopg
from pgvector.psycopg import register_vector
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb

DB_DSN = os.environ["DATABASE_URL"]


def get_conn() -> psycopg.Connection:
    # psycopg connections are not autocommit by default:
    # call conn.commit() after writes.
    conn = psycopg.connect(DB_DSN, row_factory=dict_row)
    register_vector(conn)
    return conn


def upsert_chunk(
    conn: psycopg.Connection,
    tenant_id: str,
    source_uri: str,
    chunk_index: int,
    content: str,
    embedding: list[float],
    metadata: dict[str, Any] | None = None,
) -> None:
    conn.execute(
        """
        INSERT INTO documents (tenant_id, source_uri, chunk_index, content, embedding, metadata)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (tenant_id, source_uri, chunk_index)
        DO UPDATE SET
            content   = EXCLUDED.content,
            embedding = EXCLUDED.embedding,
            metadata  = EXCLUDED.metadata
        """,
        # psycopg 3 cannot adapt a plain dict; wrap it in Jsonb for the JSONB column
        (tenant_id, source_uri, chunk_index, content, embedding, Jsonb(metadata or {})),
    )


def similarity_search(
    conn: psycopg.Connection,
    tenant_id: str,
    query_embedding: list[float],
    top_k: int = 10,
    metadata_filter: dict[str, Any] | None = None,
) -> list[dict]:
    """
    ANN search with an optional metadata filter.
    Uses cosine distance (<=>); lower = more similar.
    Sets ef_search at session level for recall/latency tuning.
    With an HNSW index the WHERE clause is applied to the candidates the
    index returns, so a selective filter can yield fewer than top_k rows.
    """
    # ef_search: higher = better recall, higher latency (default 40)
    conn.execute("SET hnsw.ef_search = 80")

    # Parameters must follow the order of the placeholders in the SQL below
    params: list[Any] = [query_embedding, tenant_id]
    filter_clause = ""
    if metadata_filter:
        filter_clause = "AND metadata @> %s"
        params.append(Jsonb(metadata_filter))
    params.extend([query_embedding, top_k])

    rows = conn.execute(
        f"""
        SELECT
            id,
            source_uri,
            chunk_index,
            content,
            metadata,
            1 - (embedding <=> %s::vector) AS score
        FROM documents
        WHERE tenant_id = %s
        {filter_clause}
        ORDER BY embedding <=> %s::vector
        LIMIT %s
        """,
        params,
    ).fetchall()
    return rows
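
The schema enables row-level security keyed on the app.current_tenant setting, but nothing in the client sets it. One way to do that, sketched here as an assumption rather than the only option, is to scope the setting to a transaction with PostgreSQL's set_config function. The helper name tenant_transaction is hypothetical; conn is expected to be a psycopg 3 connection.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def tenant_transaction(conn: Any, tenant_id: str) -> Iterator[Any]:
    """
    Run a block of statements with app.current_tenant set for one transaction.
    The third argument to set_config (true) makes the setting transaction-local,
    so it is cleared automatically on commit or rollback.
    """
    with conn.transaction():
        conn.execute(
            "SELECT set_config('app.current_tenant', %s, true)",
            (tenant_id,),
        )
        yield conn
```

Usage would look like `with tenant_transaction(conn, tenant_id) as c: similarity_search(c, tenant_id, vec)`. Keep in mind that PostgreSQL does not apply row-level security to superusers or, unless FORCE ROW LEVEL SECURITY is set, to the table owner, so the application should connect as a separate role.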

Note

Set hnsw.ef_search at the session level before your ANN queries. The default is 40. Increasing it improves recall at the cost of latency. For interactive RAG queries where p95 latency matters, start at 80 and measure recall@10 on your evaluation set. For batch re-ranking pipelines where latency is not critical, set it to 200 or higher. Parallelism helps elsewhere: max_parallel_maintenance_workers speeds up HNSW index builds, while max_parallel_workers_per_gather speeds up exact (sequential-scan) searches rather than HNSW index scans.
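A simple way to pick a value is to sweep several ef_search settings against your evaluation set and keep the smallest one that meets the recall target. The sketch below is database-agnostic: set_ef_search and measure_recall are callables you supply (both names are assumptions for this example), so the same loop works in tests with fakes.

```python
import time
from typing import Callable, Optional, Sequence


def sweep_ef_search(
    ef_values: Sequence[int],
    set_ef_search: Callable[[int], None],
    measure_recall: Callable[[], float],
) -> list[dict]:
    """Apply each ef_search value, then record recall and wall-clock time."""
    results = []
    for ef in ef_values:
        set_ef_search(ef)
        started = time.perf_counter()
        recall = measure_recall()
        elapsed = time.perf_counter() - started
        results.append({"ef_search": ef, "recall": recall, "seconds": elapsed})
    return results


def smallest_ef_meeting(results: list[dict], target_recall: float) -> Optional[int]:
    """Return the lowest ef_search whose measured recall reaches the target."""
    for row in sorted(results, key=lambda r: r["ef_search"]):
        if row["recall"] >= target_recall:
            return row["ef_search"]
    return None
```

With psycopg, set_ef_search could be `lambda ef: conn.execute(f"SET hnsw.ef_search = {int(ef)}")`; the SET statement does not take bind parameters, hence the integer cast before formatting.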

Weaviate in Depth — Hybrid Search, Vectorizer Modules, and Multi-Tenancy

Weaviate is a purpose-built vector database that combines an HNSW vector index with a BM25 inverted index in a single engine. Its defining feature for RAG is native hybrid search: a single query can blend dense vector similarity and sparse keyword relevance using a configurable alpha parameter (0 = pure BM25, 1 = pure vector, 0.75 = 75% vector weight). This eliminates the need to manage separate keyword and vector indexes and merge results in application code.
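To build intuition for what alpha does, here is a simplified illustration of score fusion: normalize each result list to the 0..1 range, then take a weighted sum. This is a sketch of the idea behind relative score fusion, not Weaviate's actual implementation, and the scores are made up.

```python
def min_max_normalize(scores: dict[str, float]) -> dict[str, float]:
    """Rescale scores to 0..1 so keyword and vector scores are comparable."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}


def fuse(
    keyword_scores: dict[str, float],
    vector_scores: dict[str, float],
    alpha: float = 0.75,
) -> list[tuple[str, float]]:
    """alpha = 0 ranks by keyword score only, alpha = 1 by vector score only."""
    kw = min_max_normalize(keyword_scores)
    vec = min_max_normalize(vector_scores)
    fused = {
        doc_id: alpha * vec.get(doc_id, 0.0) + (1 - alpha) * kw.get(doc_id, 0.0)
        for doc_id in set(kw) | set(vec)
    }
    return sorted(fused.items(), key=lambda pair: pair[1], reverse=True)


# Hypothetical scores: "a" wins on keywords, "b" wins on vector similarity
bm25 = {"a": 10.0, "b": 5.0, "c": 0.0}
dense = {"a": 0.2, "b": 0.9, "c": 0.5}
order_at_075 = [doc_id for doc_id, _ in fuse(bm25, dense, alpha=0.75)]
```

At alpha = 0.75 the vector-favored document "b" leads, while alpha = 0 puts the keyword-favored document "a" first. Tuning alpha on your evaluation set is more reliable than picking it by intuition.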

Weaviate's vectorizer module system lets you configure automatic embedding generation at the class level — objects are vectorized on insert without a separate embedding pipeline step. Supported modules include text2vec-openai, text2vec-cohere, and text2vec-transformers (self-hosted). For production, we recommend managing embeddings outside Weaviate and using the none vectorizer, which gives you full control over embedding model versioning and batching.

# weaviate_client.py — Python client v4
from __future__ import annotations

import os
import weaviate
import weaviate.classes as wvc
from weaviate.classes.config import Configure, VectorDistances, Property, DataType
from weaviate.classes.query import MetadataQuery, HybridFusion

client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.environ["WEAVIATE_URL"],
    auth_credentials=wvc.init.Auth.api_key(os.environ["WEAVIATE_API_KEY"]),
)


def create_collection(collection_name: str) -> None:
    """
    Create a Weaviate collection with:
    - none vectorizer (we provide embeddings ourselves)
    - HNSW index with cosine distance
    - multi-tenancy enabled
    """
    client.collections.create(
        name=collection_name,
        vectorizer_config=Configure.Vectorizer.none(),
        vector_index_config=Configure.VectorIndex.hnsw(
            distance_metric=VectorDistances.COSINE,
            ef_construction=128,
            max_connections=16,
        ),
        multi_tenancy_config=Configure.multi_tenancy(enabled=True),
        properties=[
            Property(name="source_uri", data_type=DataType.TEXT),
            Property(name="chunk_index", data_type=DataType.INT),
            Property(name="content", data_type=DataType.TEXT),
            Property(name="doc_type", data_type=DataType.TEXT),
        ],
    )


def add_tenant(collection_name: str, tenant_id: str) -> None:
    collection = client.collections.get(collection_name)
    collection.tenants.create(wvc.tenants.Tenant(name=tenant_id))


def batch_upsert(
    collection_name: str,
    tenant_id: str,
    objects: list[dict],
) -> None:
    """
    Batch upsert objects with pre-computed embeddings.
    objects: list of {"uuid": ..., "properties": {...}, "vector": [...]}
    """
    collection = client.collections.get(collection_name).with_tenant(tenant_id)
    with collection.batch.dynamic() as batch:
        for obj in objects:
            batch.add_object(
                uuid=obj["uuid"],
                properties=obj["properties"],
                vector=obj["vector"],
            )
    if collection.batch.failed_objects:
        failed = len(collection.batch.failed_objects)
        raise RuntimeError(f"Weaviate batch upsert: {failed} objects failed")


def hybrid_search(
    collection_name: str,
    tenant_id: str,
    query_text: str,
    query_vector: list[float],
    top_k: int = 10,
    alpha: float = 0.75,
    doc_type_filter: str | None = None,
) -> list[dict]:
    """
    Hybrid search blending BM25 (alpha=0) and vector (alpha=1).
    alpha=0.75 is a good starting point for most RAG use cases.
    """
    collection = client.collections.get(collection_name).with_tenant(tenant_id)

    filters = None
    if doc_type_filter:
        filters = wvc.query.Filter.by_property("doc_type").equal(doc_type_filter)

    response = collection.query.hybrid(
        query=query_text,
        vector=query_vector,
        alpha=alpha,
        limit=top_k,
        fusion_type=HybridFusion.RELATIVE_SCORE,
        filters=filters,
        return_metadata=MetadataQuery(score=True, explain_score=True),
    )

    return [
        {
            "uuid": str(obj.uuid),
            "properties": obj.properties,
            "score": obj.metadata.score,
        }
        for obj in response.objects
    ]

Note

Weaviate's multi-tenancy feature isolates each tenant's data into a separate shard with independent HNSW indexes. This is preferable to a single shared collection with a tenant filter for two reasons: query latency scales with the tenant's data volume rather than the total collection size, and a tenant can be deactivated to free resources, reactivated later, or deleted without affecting other tenants. Enable multi-tenancy at collection creation; it cannot be added retroactively.

Pinecone in Depth — Serverless vs Pod-Based, Namespaces, and Metadata Filtering

Pinecone is a fully managed vector database with no infrastructure to operate. It offers two deployment modes: Serverless indexes scale automatically from zero and charge per query and storage unit — ideal for variable workloads and products with unpredictable traffic. Pod-based indexes provision dedicated compute with predictable latency and throughput — appropriate for high-QPS production systems with SLA requirements.

Pinecone indexes are partitioned by namespaces, which provide logical data isolation within a single index. All upsert, query, and delete operations target a specific namespace. This is the primary multi-tenancy mechanism for Pinecone — one namespace per tenant for smaller workloads, or separate indexes per tenant for strict isolation and independent scaling.

# pinecone_client.py
from __future__ import annotations

import os
import time
from itertools import islice
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])


def create_index(
    index_name: str,
    dimension: int = 1536,
    metric: str = "cosine",
    cloud: str = "aws",
    region: str = "us-east-1",
) -> None:
    """Create a serverless Pinecone index."""
    if index_name not in [idx.name for idx in pc.list_indexes()]:
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )
        # Wait for the index to be ready
        while not pc.describe_index(index_name).status["ready"]:
            time.sleep(1)


def _batched(iterable, n: int):
    """Yield successive n-sized chunks from iterable."""
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk


def upsert_vectors(
    index_name: str,
    namespace: str,
    vectors: list[dict],
    batch_size: int = 100,
) -> None:
    """
    Upsert vectors in batches.
    vectors: list of {"id": str, "values": list[float], "metadata": dict}
    Pinecone recommends batches of 100 vectors, max 2MB per batch.
    """
    index = pc.Index(index_name)
    for batch in _batched(vectors, batch_size):
        index.upsert(vectors=batch, namespace=namespace)


def query_vectors(
    index_name: str,
    namespace: str,
    query_vector: list[float],
    top_k: int = 10,
    metadata_filter: dict | None = None,
    include_metadata: bool = True,
) -> list[dict]:
    """
    Query with optional metadata pre-filter.
    Metadata filtering runs before ANN search — cardinality matters.
    High-cardinality filters (e.g. doc_id == X) can degrade ANN quality;
    prefer lower-cardinality filters (e.g. doc_type IN [...], tenant_id == X).
    """
    index = pc.Index(index_name)
    response = index.query(
        vector=query_vector,
        top_k=top_k,
        namespace=namespace,
        filter=metadata_filter,
        include_metadata=include_metadata,
        include_values=False,
    )
    return [
        {
            "id": match.id,
            "score": match.score,
            "metadata": match.metadata,
        }
        for match in response.matches
    ]


def delete_namespace(index_name: str, namespace: str) -> None:
    """Delete all vectors in a namespace (e.g. when removing a tenant)."""
    index = pc.Index(index_name)
    index.delete(delete_all=True, namespace=namespace)
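
The fixed batch size above ignores the request-size cap mentioned in the docstring. A size-aware batcher, sketched below, closes a batch when either the record count or an approximate byte budget is reached. The JSON-encoded length is only an estimate of the wire size, and the 2 MB default simply mirrors the figure quoted above; the function name is illustrative.

```python
import json
from typing import Iterator


def batch_by_size(
    vectors: list[dict],
    max_records: int = 100,
    max_bytes: int = 2_000_000,
) -> Iterator[list[dict]]:
    """Yield batches bounded by record count and by approximate payload size."""
    batch: list[dict] = []
    batch_bytes = 0
    for record in vectors:
        record_bytes = len(json.dumps(record).encode("utf-8"))
        if record_bytes > max_bytes:
            raise ValueError(f"record {record.get('id')!r} exceeds the byte budget")
        # Close the current batch if adding this record would break a limit
        if batch and (len(batch) >= max_records or batch_bytes + record_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += record_bytes
    if batch:
        yield batch
```

Large metadata payloads are the usual reason a 100-record batch exceeds the cap, so this matters most when chunk text is stored in metadata.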

Embedding Pipeline Design — Chunking, Model Selection, Batching, and Caching

The quality of your vector search is determined more by your embedding pipeline than by your choice of vector database. A well-tuned chunking strategy and the right embedding model will outperform any index configuration change.

Chunking Strategies

The goal of chunking is to produce text segments that are semantically self-contained at retrieval time. Chunks that are too short lose context; chunks that are too long dilute the embedding signal and exceed context windows.

# chunking.py
from __future__ import annotations

import re
from dataclasses import dataclass


@dataclass
class Chunk:
    content: str
    chunk_index: int
    start_char: int
    end_char: int
    metadata: dict


def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 64,
    tokenizer=None,
) -> list[Chunk]:
    """
    Token-aware sliding window chunking.
    chunk_size: target tokens per chunk (not characters)
    chunk_overlap: overlap in tokens between adjacent chunks
    tokenizer: tiktoken or HuggingFace tokenizer; falls back to word split
    """
    # An overlap >= chunk_size would never advance the window
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")

    chunks: list[Chunk] = []
    if tokenizer is None:
        # Approximate: 1 word ≈ 1.3 tokens for English text
        approx_tokens_per_word = 1.3
        words_per_chunk = max(1, int(chunk_size / approx_tokens_per_word))
        overlap_words = min(int(chunk_overlap / approx_tokens_per_word), words_per_chunk - 1)
        # Keep each word's character span so offsets point into the original text
        spans = [m.span() for m in re.finditer(r"\S+", text)]
        i = 0
        while i < len(spans):
            window = spans[i : i + words_per_chunk]
            chunks.append(
                Chunk(
                    content=" ".join(text[s:e] for s, e in window),
                    chunk_index=len(chunks),
                    start_char=window[0][0],
                    end_char=window[-1][1],
                    metadata={},
                )
            )
            if i + words_per_chunk >= len(spans):
                break  # this window already reached the end of the text
            i += words_per_chunk - overlap_words
        return chunks

    # Token-precise chunking with tiktoken.
    # In this branch start_char/end_char hold token offsets, not character offsets.
    tokens = tokenizer.encode(text)
    i = 0
    while i < len(tokens):
        chunk_tokens = tokens[i : i + chunk_size]
        chunks.append(
            Chunk(
                content=tokenizer.decode(chunk_tokens),
                chunk_index=len(chunks),
                start_char=i,
                end_char=i + len(chunk_tokens),
                metadata={},
            )
        )
        if i + chunk_size >= len(tokens):
            break  # avoid a trailing chunk made only of overlap
        i += chunk_size - chunk_overlap
    return chunks


def chunk_by_markdown_sections(text: str, max_tokens: int = 512) -> list[Chunk]:
    """
    Split on markdown headings first, then sub-chunk oversized sections.
    Preserves heading context in each chunk (prepended to the chunk content).
    """
    sections = re.split(r"(?m)^(#{1,3} .+)$", text)
    chunks: list[Chunk] = []
    current_heading = ""
    for part in sections:
        if re.match(r"^#{1,3} ", part):
            current_heading = part.strip()
        else:
            content = f"{current_heading}\n\n{part.strip()}" if current_heading else part.strip()
            if not content.strip():
                continue
            # Sub-chunk if oversized (approximate)
            if len(content) > max_tokens * 4:
                sub = chunk_by_tokens(content, chunk_size=max_tokens, chunk_overlap=64)
                for s in sub:
                    s.chunk_index = len(chunks)
                    chunks.append(s)
            else:
                chunks.append(
                    Chunk(content=content, chunk_index=len(chunks), start_char=0, end_char=len(content), metadata={})
                )
    return chunks

Batch Embedding with Caching

# embed_pipeline.py
from __future__ import annotations

import hashlib
import json
import os
from typing import Any
import openai
import redis

openai_client = openai.OpenAI()
cache = redis.Redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379"))
EMBED_MODEL = "text-embedding-3-small"
CACHE_TTL = 86400 * 7  # 7 days


def _cache_key(text: str, model: str) -> str:
    h = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
    return f"embed:{h}"


def embed_batch(texts: list[str], model: str = EMBED_MODEL) -> list[list[float]]:
    """
    Embed a batch of texts with Redis caching.
    - Cache hit: return cached vector, skip API call
    - Cache miss: call OpenAI, store result
    OpenAI text-embedding-3-small: up to 2048 inputs per request.
    """
    if not texts:
        return []  # Redis MGET rejects an empty key list

    keys = [_cache_key(t, model) for t in texts]
    cached = cache.mget(keys)

    result: list[list[float] | None] = [None] * len(texts)
    miss_indices: list[int] = []

    for i, val in enumerate(cached):
        if val is not None:
            result[i] = json.loads(val)
        else:
            miss_indices.append(i)

    if miss_indices:
        miss_texts = [texts[i] for i in miss_indices]
        # Batch in groups of 2048 (OpenAI limit)
        embeddings: list[list[float]] = []
        for batch_start in range(0, len(miss_texts), 2048):
            batch = miss_texts[batch_start : batch_start + 2048]
            response = openai_client.embeddings.create(input=batch, model=model)
            embeddings.extend([item.embedding for item in response.data])

        pipe = cache.pipeline()
        for idx, embedding in zip(miss_indices, embeddings):
            result[idx] = embedding
            pipe.setex(_cache_key(texts[idx], model), CACHE_TTL, json.dumps(embedding))
        pipe.execute()

    return result  # type: ignore[return-value]
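
Chunking, embedding, and upserting can be tied together in a small ingestion loop. The sketch below is store-agnostic: embed_fn and upsert_fn are injected callables (assumed names), and record IDs are derived deterministically from tenant, source, and chunk index so that re-ingesting a document overwrites its previous records instead of duplicating them.

```python
import uuid
from typing import Callable


def chunk_id(tenant_id: str, source_uri: str, chunk_index: int) -> str:
    """Stable UUIDv5 so repeated ingestion of the same chunk hits the same record."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{tenant_id}|{source_uri}|{chunk_index}"))


def ingest_document(
    tenant_id: str,
    source_uri: str,
    chunks: list[str],
    embed_fn: Callable[[list[str]], list[list[float]]],
    upsert_fn: Callable[[list[dict]], None],
    batch_size: int = 100,
) -> int:
    """Embed and upsert chunks batch by batch; returns the number of records written."""
    written = 0
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start : start + batch_size]
        vectors = embed_fn(batch)
        if len(vectors) != len(batch):
            raise ValueError("embed_fn must return one vector per input text")
        records = [
            {
                "id": chunk_id(tenant_id, source_uri, start + offset),
                "values": vector,
                "metadata": {
                    "source_uri": source_uri,
                    "chunk_index": start + offset,
                    "content": text,
                },
            }
            for offset, (text, vector) in enumerate(zip(batch, vectors))
        ]
        upsert_fn(records)
        written += len(records)
    return written
```

The record shape here follows the Pinecone example; for pgvector or Weaviate the upsert_fn adapter would map the same fields onto columns or properties.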

Production Patterns — Filtering, Recall Monitoring, Stale Embeddings, and Backups

Pre-filter Before Re-ranking

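Retrieve top-k * 3 candidates from the vector index, apply metadata filters and business rules in application code, then re-rank the filtered set with a cross-encoder or LLM scoring call. This separates recall (vector index) from precision (re-ranker) and sidesteps the recall loss that restrictive filters can cause in some ANN indexes. The approach works when most candidates pass the filter; for highly selective conditions such as tenant or document ID, filter in the database and over-fetch more aggressively, because application-side filtering can otherwise leave fewer than k results.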
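The over-fetch, filter, re-rank flow can be sketched in a few lines. All three callables are supplied by the caller and their names are assumptions for this example: search_fn wraps the vector store, keep_fn encodes the business rules, and rerank_fn stands in for a cross-encoder or LLM scoring call.

```python
from typing import Callable


def retrieve_filter_rerank(
    query: str,
    search_fn: Callable[[str, int], list[dict]],
    keep_fn: Callable[[dict], bool],
    rerank_fn: Callable[[str, dict], float],
    top_k: int = 10,
    overfetch: int = 3,
) -> list[dict]:
    """Fetch top_k * overfetch candidates, drop disallowed ones, re-rank the rest."""
    candidates = search_fn(query, top_k * overfetch)
    kept = [c for c in candidates if keep_fn(c)]
    # Sort on the score only, so candidate dicts are never compared to each other
    scored = [(rerank_fn(query, c), c) for c in kept]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [c for _, c in scored[:top_k]]
```

If the function often returns fewer than top_k results, that is a sign the filter is too selective for application-side filtering and should move into the database query.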

Monitor recall@k with Ground Truth Pairs

Maintain a golden evaluation set of (query, expected_document_ids) pairs. Run recall@k computation as a scheduled job after every embedding model upgrade, index reconfiguration, or schema change. Alert if recall@10 drops more than 2 percentage points from the baseline. Store results in a time-series table so you can correlate recall changes with deployment events.

Handle Stale Embeddings on Model Upgrade

Embeddings from different models, or from different versions of the same model, live in different vector spaces: a vector produced by text-embedding-3-small cannot be compared to one produced by text-embedding-3-large. When upgrading, write new embeddings to a shadow column or shadow namespace, verify recall on your evaluation set, then atomically cut over traffic. Never mix embeddings from different models in the same index; the nearest-neighbor distances will be meaningless.
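One way to structure the cutover, sketched here under assumed names (EmbeddingRouter, namespace_for), is a small router that dual-writes to the active and candidate models, keeps reads on the active model, and promotes the candidate only when its measured recall holds up against the baseline.

```python
from dataclasses import dataclass
from typing import Optional


def namespace_for(tenant_id: str, model: str) -> str:
    """Keep each model's vectors in their own namespace (naming is illustrative)."""
    return f"{tenant_id}__{model}"


@dataclass
class EmbeddingRouter:
    active_model: str
    candidate_model: Optional[str] = None

    def write_models(self) -> list[str]:
        """During a migration, new documents are embedded with both models."""
        models = [self.active_model]
        if self.candidate_model:
            models.append(self.candidate_model)
        return models

    def read_model(self) -> str:
        """Queries always use exactly one model, never a mix."""
        return self.active_model

    def promote(
        self,
        candidate_recall: float,
        baseline_recall: float,
        tolerance: float = 0.02,
    ) -> bool:
        """Switch reads to the candidate only if recall did not regress."""
        if self.candidate_model is None:
            raise RuntimeError("no candidate model to promote")
        if candidate_recall < baseline_recall - tolerance:
            return False
        self.active_model, self.candidate_model = self.candidate_model, None
        return True
```

In a multi-process deployment the router state would live in shared configuration so that every worker switches at once; the old namespace can be deleted once the new one has served traffic without regressions.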

Backup and Disaster Recovery

For pgvector, your embeddings are in PostgreSQL, so use standard pg_dump or continuous WAL archiving. For Weaviate, use the built-in backup API to S3/GCS; schedule daily backups and test restores monthly. For Pinecone, plan to export your source documents and re-embed on disaster recovery; Pinecone's managed backups (such as collections for pod-based indexes) restore within Pinecone but are not a portable export. In all cases, treat the source text corpus as the source of truth and the index as a derived artifact that can be rebuilt.

# recall_monitor.py
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable


@dataclass
class RecallResult:
    timestamp: str
    model: str
    index_config: dict
    k: int
    recall_at_k: float
    num_queries: int

    def to_dict(self) -> dict:
        return {
            "timestamp": self.timestamp,
            "model": self.model,
            "index_config": self.index_config,
            "k": self.k,
            "recall_at_k": self.recall_at_k,
            "num_queries": self.num_queries,
        }


def compute_recall_at_k(
    eval_pairs: list[dict],   # [{"query_vector": [...], "relevant_ids": [...]}]
    search_fn: Callable,      # fn(query_vector, top_k) -> list of {"id": ...}
    k: int = 10,
) -> float:
    """
    Compute recall@k over an evaluation set.
    recall@k = fraction of relevant docs found in top-k results, averaged over queries.
    """
    total_recall = 0.0
    scored_queries = 0
    for pair in eval_pairs:
        relevant_ids = set(pair["relevant_ids"])
        if not relevant_ids:
            continue  # skip unlabeled queries so they do not drag the average down
        results = search_fn(pair["query_vector"], top_k=k)
        retrieved_ids = {r["id"] for r in results}
        total_recall += len(retrieved_ids & relevant_ids) / len(relevant_ids)
        scored_queries += 1
    return total_recall / scored_queries if scored_queries else 0.0


def assert_recall_baseline(
    recall: float,
    baseline: float = 0.95,
    tolerance: float = 0.02,
) -> None:
    """Raise if recall drops below baseline minus tolerance."""
    threshold = baseline - tolerance
    if recall < threshold:
        raise AssertionError(
            f"recall@k dropped to {recall:.3f}, below threshold {threshold:.3f}. "
            "Check index config or embedding model changes."
        )

Decision Framework — When to Use Which

Choosing a vector database is an operational and architectural decision as much as a technical one. The wrong choice creates migration costs and performance problems that compound as your data volume grows. Use the following framework to narrow the decision.

Choose pgvector when

You already run PostgreSQL and want to avoid adding operational complexity. Your vector corpus is under 10 million rows and you do not need sub-10 ms query latency. You need transactional consistency between application records and embeddings; for example, deleting a user account must atomically remove their documents. Your team is comfortable tuning PostgreSQL but has no experience operating dedicated vector infrastructure.

Choose Weaviate when

You need hybrid keyword + semantic search as a first-class feature — for example, a document search product where users expect both full-text matching and semantic relevance. You are building a multi-tenant SaaS RAG product and need strong tenant isolation with per-tenant index performance. You want built-in vectorizer module support to simplify your embedding pipeline at the cost of tight coupling to a specific embedding provider.

Choose Pinecone when

You want zero infrastructure to operate and are willing to pay the managed service premium. Your team cannot staff a dedicated infrastructure engineer for vector database operations. You need to scale from prototype to production rapidly without tuning index parameters. Your workload is bursty and serverless billing aligns with your cost model. You have a strict requirement that vector infrastructure lives inside a specific cloud provider and region.

Note

None of these options is a permanent commitment. Teams frequently start with pgvector for its simplicity, migrate to Weaviate or Pinecone when recall quality or throughput requirements exceed what PostgreSQL can deliver at acceptable cost, and then standardize on a dedicated vector store. Build your embedding pipeline and retrieval interface as an abstraction layer from day one — the database-specific code should be isolated behind a repository interface so that swapping the underlying store requires changing one module, not rewriting your entire RAG stack.
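A minimal sketch of such an abstraction layer, using a typing.Protocol and an in-memory implementation for tests, might look like this. The interface and its method names are assumptions made for this example; a real project would add pgvector, Weaviate, or Pinecone adapters behind the same three methods.

```python
import math
from typing import Protocol


class VectorStore(Protocol):
    """The only surface the rest of the RAG stack is allowed to depend on."""

    def upsert(self, tenant_id: str, records: list[dict]) -> None: ...
    def search(self, tenant_id: str, query_vector: list[float], top_k: int = 10) -> list[dict]: ...
    def delete_tenant(self, tenant_id: str) -> None: ...


def _cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


class InMemoryVectorStore:
    """Exact-search stand-in for unit tests; satisfies VectorStore structurally."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, dict]] = {}

    def upsert(self, tenant_id: str, records: list[dict]) -> None:
        bucket = self._data.setdefault(tenant_id, {})
        for record in records:
            bucket[record["id"]] = record  # same id overwrites, like an upsert

    def search(self, tenant_id: str, query_vector: list[float], top_k: int = 10) -> list[dict]:
        bucket = self._data.get(tenant_id, {})
        scored = [
            {"id": r["id"], "score": _cosine(query_vector, r["values"]), "metadata": r.get("metadata", {})}
            for r in bucket.values()
        ]
        scored.sort(key=lambda hit: hit["score"], reverse=True)
        return scored[:top_k]

    def delete_tenant(self, tenant_id: str) -> None:
        self._data.pop(tenant_id, None)


def retrieve_ids(store: VectorStore, tenant_id: str, query_vector: list[float], top_k: int = 3) -> list[str]:
    """Application code sees only the protocol, never a concrete database client."""
    return [hit["id"] for hit in store.search(tenant_id, query_vector, top_k=top_k)]
```

Because the protocol is structural, adapters do not need to inherit from anything; swapping stores means constructing a different object at startup and passing it in.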
