# Vector Search Implementation
**Purpose:** Provide high-performance semantic search capabilities within the Noosphere layer using state-of-the-art embedding models and vector indexing techniques.
## Architecture Overview

### Core Pipeline
```typescript
interface VectorSearchPipeline {
  // Ingestion Pipeline
  document_processor: DocumentProcessor;
  chunking_engine: ChunkingEngine;
  embedding_generator: EmbeddingGenerator;
  vector_indexer: VectorIndexer;

  // Query Pipeline
  query_processor: QueryProcessor;
  similarity_engine: SimilarityEngine;
  result_ranker: ResultRanker;
  metadata_enricher: MetadataEnricher;
}
```
### Data Flow
**Ingestion:** Document → Chunks → Embeddings → Vector Index → Metadata Storage

**Query:** Query → Embedding → Similarity Search → Reranking → Results + Context
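To make the flow concrete, the sketch below wires the ingestion stages onto the pipeline interface above. The component method names (`process`, `chunk`, `indexDocument`) and the `RawDocument` type are illustrative assumptions; only the component types are pinned down by the interface.

```typescript
// Hypothetical wiring of the ingestion flow; method names on the
// pipeline components (process, chunk, indexDocument) are assumed.
async function ingest(raw: RawDocument, p: VectorSearchPipeline): Promise<void> {
  const document = await p.document_processor.process(raw);
  const chunks = await p.chunking_engine.chunk(document);
  for (const chunk of chunks) {
    const embedding = await p.embedding_generator.generateEmbeddings(
      chunk.text,
      chunk.content_type
    );
    await p.vector_indexer.indexDocument(chunk, embedding);
  }
}
```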
## Embedding Strategy

### 1. Multi-Model Approach
```typescript
interface EmbeddingStrategy {
  primary_model: "text-embedding-3-large";   // OpenAI latest (3072 dims)
  fallback_model: "all-MiniLM-L6-v2";        // Sentence-BERT (384 dims)
  specialized_models: {
    code: "microsoft/codebert-base";         // Code-specific embeddings
    documents: "sentence-transformers/all-mpnet-base-v2";
    domain_specific: CustomEmbeddingModel;   // Fine-tuned on domain data
  };
}

class EmbeddingGenerator {
  async generateEmbeddings(
    text: string,
    content_type: ContentType,
    context?: EmbeddingContext
  ): Promise<EmbeddingResult> {
    const model = this.selectModel(content_type, text.length);
    const preprocessed_text = await this.preprocessText(text, content_type);

    // Generate base embedding
    const base_embedding = await model.embed(preprocessed_text);

    // Apply contextual enhancements
    const enhanced_embedding = await this.applyContextualEnhancements(
      base_embedding,
      context,
      content_type
    );

    return {
      embedding: enhanced_embedding,
      model_used: model.name,
      dimensions: enhanced_embedding.length,
      preprocessing_applied: this.getPreprocessingSteps(content_type),
      context_signals: context ? this.extractContextSignals(context) : null
    };
  }

  private selectModel(content_type: ContentType, text_length: number): EmbeddingModel {
    // Performance-optimized model selection
    if (content_type === "code" && text_length < 2000) {
      return this.models.code;
    }
    if (text_length > 8000) {
      return this.models.primary_model; // Best quality for long content
    }
    if (this.isHighTrafficPeriod()) {
      return this.models.fallback_model; // Faster, local model
    }
    return this.models.primary_model;
  }
}
```
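A usage sketch for the generator. How the model registry is injected is not specified above, so the constructor argument is an assumption:

```typescript
// Hypothetical usage; the constructor argument (a model registry) is assumed.
const generator = new EmbeddingGenerator(/* model registry */);

const result = await generator.generateEmbeddings(
  "How does HNSW trade recall for latency?",
  "documents"
);
console.log(result.model_used, result.dimensions);
// e.g. "text-embedding-3-large", 3072
```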
### 2. Contextual Enhancement
```typescript
class ContextualEmbeddingEnhancer {
  async enhanceEmbedding(
    base_embedding: number[],
    context: EmbeddingContext
  ): Promise<number[]> {
    const enhancements = [];

    // User context signals
    if (context.user_profile) {
      const user_bias = await this.generateUserBias(context.user_profile);
      enhancements.push(user_bias);
    }

    // Domain context
    if (context.domain_signals) {
      const domain_bias = await this.generateDomainBias(context.domain_signals);
      enhancements.push(domain_bias);
    }

    // Temporal context
    if (context.temporal_signals) {
      const temporal_bias = await this.generateTemporalBias(context.temporal_signals);
      enhancements.push(temporal_bias);
    }

    // Combine base embedding with context signals
    return this.combineEmbeddings(base_embedding, enhancements);
  }

  private combineEmbeddings(base: number[], enhancements: number[][]): number[] {
    // 80% base embedding, 20% split evenly across context signals
    const weights = [0.8, ...enhancements.map(() => 0.2 / enhancements.length)];
    const all_embeddings = [base, ...enhancements];
    return this.weightedAverage(all_embeddings, weights);
  }
}
```
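`weightedAverage` is referenced but not defined above. A minimal sketch, assuming element-wise averaging followed by L2 renormalization (the renormalization step is an assumption; cosine-based search generally expects unit-length vectors):

```typescript
// Hypothetical helper: element-wise weighted average of equal-length
// vectors, followed by L2 normalization (assumed, so biased vectors
// stay on the unit sphere for cosine comparison).
function weightedAverage(vectors: number[][], weights: number[]): number[] {
  const dims = vectors[0].length;
  const out = new Array<number>(dims).fill(0);
  vectors.forEach((vec, i) => {
    for (let d = 0; d < dims; d++) out[d] += weights[i] * vec[d];
  });
  const norm = Math.sqrt(out.reduce((s, x) => s + x * x, 0)) || 1;
  return out.map(x => x / norm);
}
```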
## Vector Indexing

### 1. Multi-Index Architecture
```typescript
interface VectorIndexingStrategy {
  primary_index: HNSWIndex;        // Hierarchical NSW for general search
  fast_index: FlatIndex;           // Exact search for small datasets
  specialized_indexes: {
    code: CodeOptimizedIndex;      // Optimized for code similarity
    temporal: TemporalAwareIndex;  // Time-weighted similarities
    hybrid: HybridIndex;           // Combined vector + keyword
  };
}

class VectorIndexManager {
  private indexes: Map<string, VectorIndex> = new Map();

  async indexDocument(
    document: ProcessedDocument,
    embedding: EmbeddingResult
  ): Promise<IndexingResult> {
    const start_time = performance.now(); // needed for the latency report below
    const index_strategy = this.selectIndexStrategy(document);
    const target_indexes = this.getTargetIndexes(index_strategy);

    const indexing_results = await Promise.all(
      target_indexes.map(async index => {
        return await index.addVector({
          id: document.id,
          vector: embedding.embedding,
          metadata: {
            content_type: document.content_type,
            source: document.source,
            timestamp: document.timestamp,
            entities: document.entities,
            embedding_model: embedding.model_used
          }
        });
      })
    );

    return {
      document_id: document.id,
      indexed_in: target_indexes.map(idx => idx.name),
      indexing_latency_ms: performance.now() - start_time,
      storage_size_bytes: this.calculateStorageSize(embedding)
    };
  }
}
```
### 2. HNSW Optimization
```typescript
class HNSWOptimizedIndex implements VectorIndex {
  private config: HNSWConfig = {
    M: 16,               // Max connections per layer
    efConstruction: 200, // Search width during construction
    mL: 1 / Math.log(2), // Level generation factor
    maxM: 16,            // Max connections on upper layers
    maxM0: 32,           // Max connections at layer 0 (typically 2 * M)
  };

  async search(
    query_embedding: number[],
    options: SearchOptions
  ): Promise<SearchResult[]> {
    const search_params = this.optimizeSearchParams(options);

    // Dynamic ef parameter based on desired recall
    const ef = this.calculateOptimalEf(
      options.target_recall || 0.95,
      options.max_results || 10
    );

    const candidates = await this.hnswIndex.search(
      query_embedding,
      options.max_results,
      { ef, ...search_params }
    );

    return candidates.map(candidate => ({
      id: candidate.id,
      score: candidate.distance,
      metadata: candidate.metadata,
      similarity: 1 - candidate.distance, // Convert distance to similarity
    }));
  }

  private calculateOptimalEf(target_recall: number, k: number): number {
    // Empirically derived formula for ef parameter
    const base_ef = Math.max(k, 100);
    const recall_multiplier = Math.pow(target_recall, -2);
    return Math.min(Math.ceil(base_ef * recall_multiplier), 1000);
  }
}
```
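Worked example: with the defaults `target_recall = 0.95` and `k = 10`, `base_ef = max(10, 100) = 100` and `recall_multiplier = 0.95^-2 ≈ 1.108`, so `ef = ⌈110.8⌉ = 111`, well under the cap of 1000.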
## Query Processing

### 1. Query Enhancement
```typescript
class QueryProcessor {
  async processQuery(
    query: string,
    context: QueryContext
  ): Promise<ProcessedQuery> {
    // Extract and enhance query components
    const entities = await this.extractEntities(query);
    const intent = await this.classifyIntent(query);
    const expanded_terms = await this.expandQuery(query, context);

    // Generate multiple query variations
    const query_variations = [
      query,                            // Original
      ...expanded_terms.synonyms,       // Synonym expansion
      ...expanded_terms.domain_terms,   // Domain-specific expansion
      await this.paraphraseQuery(query) // Paraphrase variation
    ];

    return {
      original_query: query,
      processed_queries: query_variations,
      extracted_entities: entities,
      intent_classification: intent,
      expansion_metadata: {
        synonym_count: expanded_terms.synonyms.length,
        domain_terms_added: expanded_terms.domain_terms.length,
        confidence_score: expanded_terms.confidence
      }
    };
  }

  private async expandQuery(query: string, context: QueryContext): Promise<QueryExpansion> {
    const expansions = await Promise.all([
      this.synonymExpansion(query),
      this.domainExpansion(query, context.domain),
      this.contextualExpansion(query, context.user_history),
      this.entityExpansion(query, context.knowledge_graph)
    ]);
    return this.mergeExpansions(expansions);
  }
}
```
### 2. Multi-Vector Search
```typescript
class MultiVectorSearchEngine {
  async search(
    processed_query: ProcessedQuery,
    options: SearchOptions
  ): Promise<SearchResult[]> {
    // Generate embeddings for all query variations
    const query_embeddings = await Promise.all(
      processed_query.processed_queries.map(q =>
        this.embedding_generator.generateEmbeddings(q, "query", options.context)
      )
    );

    // Search with multiple embeddings
    const search_results = await Promise.all(
      query_embeddings.map(async (embedding, index) => {
        const results = await this.vector_index.search(embedding.embedding, {
          ...options,
          max_results: options.max_results * 2 // Get more candidates
        });
        return results.map(result => ({
          ...result,
          query_variation_index: index,
          embedding_model: embedding.model_used
        }));
      })
    );

    // Merge and deduplicate results
    const merged_results = this.mergeSearchResults(search_results);

    // Apply reranking
    const reranked_results = await this.rerank(
      merged_results,
      processed_query,
      options
    );

    return reranked_results.slice(0, options.max_results);
  }
}
```
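`mergeSearchResults` is not defined above. A minimal sketch that flattens the per-variation lists, deduplicates by document id, and keeps each document's best similarity (the keep-the-maximum policy is an assumption):

```typescript
// Hypothetical merge: flatten per-variation result lists, dedupe by id,
// and keep the highest-similarity hit for each document.
function mergeSearchResults(per_variation: SearchResult[][]): SearchResult[] {
  const best = new Map<string, SearchResult>();
  for (const results of per_variation) {
    for (const result of results) {
      const current = best.get(result.id);
      if (!current || result.similarity > current.similarity) {
        best.set(result.id, result);
      }
    }
  }
  return [...best.values()].sort((a, b) => b.similarity - a.similarity);
}
```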
## Advanced Features

### 1. Hybrid Search (Vector + BM25)
```typescript
class HybridSearchEngine {
  async hybridSearch(
    query: string,
    options: HybridSearchOptions
  ): Promise<SearchResult[]> {
    // Parallel vector and keyword search
    const [vector_results, keyword_results] = await Promise.all([
      this.vector_search.search(query, { ...options, max_results: 100 }),
      this.keyword_search.search(query, { ...options, max_results: 100 })
    ]);

    // Reciprocal Rank Fusion (RRF)
    const fused_results = this.reciprocalRankFusion(
      vector_results,
      keyword_results,
      options.fusion_params || { k: 60 }
    );

    return fused_results.slice(0, options.max_results);
  }

  private reciprocalRankFusion(
    vector_results: SearchResult[],
    keyword_results: SearchResult[],
    params: { k: number }
  ): SearchResult[] {
    const result_scores: Map<string, number> = new Map();

    // Score vector results
    vector_results.forEach((result, rank) => {
      const score = 1 / (params.k + rank + 1);
      result_scores.set(result.id, (result_scores.get(result.id) || 0) + score);
    });

    // Score keyword results
    keyword_results.forEach((result, rank) => {
      const score = 1 / (params.k + rank + 1);
      result_scores.set(result.id, (result_scores.get(result.id) || 0) + score);
    });

    // Combine and sort by fused score
    const all_results = [...vector_results, ...keyword_results];
    const unique_results = this.deduplicateById(all_results);

    return unique_results
      .map(result => ({
        ...result,
        fused_score: result_scores.get(result.id) || 0
      }))
      .sort((a, b) => b.fused_score - a.fused_score);
  }
}
```
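Each list contributes `1 / (k + rank + 1)` per document, so a document ranked near the top of both lists accumulates the highest fused score. The default `k = 60` follows the value used in the original RRF paper; larger `k` flattens the advantage of top-ranked items.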
### 2. Adaptive Reranking
```typescript
class AdaptiveReranker {
  async rerank(
    initial_results: SearchResult[],
    query_context: QueryContext,
    user_feedback: UserFeedbackHistory
  ): Promise<SearchResult[]> {
    // Feature extraction for each result
    const features = await Promise.all(
      initial_results.map(async result => ({
        result,
        features: await this.extractRerankingFeatures(result, query_context)
      }))
    );

    // Apply learned reranking model
    const reranking_scores = await this.reranking_model.predict(
      features.map(f => f.features),
      { user_context: user_feedback }
    );

    // Combine original scores with reranking scores
    const reranked = features.map((item, index) => ({
      ...item.result,
      original_score: item.result.similarity,
      reranking_score: reranking_scores[index],
      final_score: this.combineScores(
        item.result.similarity,
        reranking_scores[index],
        query_context.reranking_weight || 0.3
      )
    }));

    return reranked
      .sort((a, b) => b.final_score - a.final_score)
      .map(({ reranking_score, final_score, ...result }) => result); // strip internal scores
  }

  private async extractRerankingFeatures(
    result: SearchResult,
    context: QueryContext
  ): Promise<RerankingFeatures> {
    return {
      // Content features
      content_length: result.content?.length || 0,
      entity_overlap: this.calculateEntityOverlap(result.entities, context.query_entities),
      content_type_match: result.content_type === context.preferred_content_type ? 1 : 0,

      // Temporal features
      recency_score: this.calculateRecencyScore(result.timestamp),
      update_frequency: await this.getUpdateFrequency(result.source),

      // User features
      personal_relevance: await this.calculatePersonalRelevance(result, context.user_profile),
      historical_interaction: await this.getHistoricalInteraction(result.id, context.user_id),

      // Quality signals
      source_authority: await this.getSourceAuthority(result.source),
      content_quality: await this.assessContentQuality(result.content),
      citation_count: await this.getCitationCount(result.id)
    };
  }
}
```
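`combineScores` is left undefined above. The usual choice is a convex blend, sketched here as an assumption; with `reranking_weight = 0.3`, the final score is 70% vector similarity and 30% model score:

```typescript
// Hypothetical score blend: convex combination of the original similarity
// and the learned reranking score, weighted by reranking_weight w.
function combineScores(similarity: number, rerank_score: number, w: number): number {
  return (1 - w) * similarity + w * rerank_score;
}
```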
## Performance Optimization

### 1. Caching Strategy
```typescript
class VectorSearchCache {
  private embedding_cache: LRUCache<string, EmbeddingResult>;
  private result_cache: LRUCache<string, SearchResult[]>;
  private similarity_cache: LRUCache<string, number>;

  constructor() {
    this.embedding_cache = new LRUCache({ max: 10000, ttl: 1000 * 60 * 60 });  // 1 hour
    this.result_cache = new LRUCache({ max: 5000, ttl: 1000 * 60 * 15 });      // 15 minutes
    this.similarity_cache = new LRUCache({ max: 50000, ttl: 1000 * 60 * 60 }); // 1 hour
  }

  async getCachedEmbedding(text: string, model: string): Promise<EmbeddingResult | null> {
    const cache_key = this.createEmbeddingCacheKey(text, model);
    return this.embedding_cache.get(cache_key) || null;
  }

  async cacheEmbedding(text: string, model: string, embedding: EmbeddingResult): Promise<void> {
    const cache_key = this.createEmbeddingCacheKey(text, model);
    this.embedding_cache.set(cache_key, embedding);
  }

  async getCachedSearchResults(query_hash: string): Promise<SearchResult[] | null> {
    return this.result_cache.get(query_hash) || null;
  }

  private createEmbeddingCacheKey(text: string, model: string): string {
    return `${model}:${this.hashString(text)}`;
  }
}
```
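`hashString` is not shown. A minimal Node.js sketch using a SHA-256 digest so cache keys stay fixed-length regardless of input size (the specific hash is an assumption):

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper: stable, fixed-length key component for arbitrary text.
function hashString(text: string): string {
  return createHash("sha256").update(text).digest("hex");
}

// e.g. cache key: "text-embedding-3-large:9f86d081..."
```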
### 2. Performance Monitoring
```typescript
class VectorSearchMetrics {
  private metrics: PerformanceMetrics = {
    embedding_latency: new HistogramMetric("embedding_generation_ms"),
    search_latency: new HistogramMetric("vector_search_ms"),
    index_size: new GaugeMetric("vector_index_size_mb"),
    cache_hit_rate: new GaugeMetric("cache_hit_rate_percent"),
    throughput: new CounterMetric("searches_per_second")
  };

  recordSearchOperation(operation: SearchOperation): void {
    this.metrics.search_latency.observe(operation.latency_ms);
    this.metrics.throughput.increment();

    if (operation.cache_hit) {
      this.updateCacheHitRate(true);
    }
    if (operation.embedding_generated) {
      this.metrics.embedding_latency.observe(operation.embedding_latency_ms);
    }
  }

  async getPerformanceReport(): Promise<PerformanceReport> {
    return {
      avg_search_latency_ms: this.metrics.search_latency.average(),
      p95_search_latency_ms: this.metrics.search_latency.percentile(95),
      avg_embedding_latency_ms: this.metrics.embedding_latency.average(),
      current_throughput: this.metrics.throughput.rate(),
      cache_hit_rate: this.metrics.cache_hit_rate.value(),
      index_memory_usage_mb: this.metrics.index_size.value()
    };
  }
}
```
## Configuration & Deployment

### 1. Environment Configuration
```typescript
interface VectorSearchConfig {
  embedding: {
    primary_model: string;
    fallback_model: string;
    api_key?: string;
    batch_size: number;
    max_tokens_per_request: number;
  };
  indexing: {
    index_type: "hnsw" | "flat" | "ivf";
    hnsw_params: HNSWParams;
    update_strategy: "realtime" | "batch" | "scheduled";
    reindex_threshold: number;
  };
  search: {
    default_max_results: number;
    similarity_threshold: number;
    enable_reranking: boolean;
    cache_ttl_seconds: number;
  };
  performance: {
    max_concurrent_searches: number;
    embedding_timeout_ms: number;
    search_timeout_ms: number;
    memory_limit_gb: number;
  };
}

// Example production configuration
const PRODUCTION_CONFIG: VectorSearchConfig = {
  embedding: {
    primary_model: "text-embedding-3-large",
    fallback_model: "all-MiniLM-L6-v2",
    batch_size: 100,
    max_tokens_per_request: 8192
  },
  indexing: {
    index_type: "hnsw",
    hnsw_params: { M: 16, efConstruction: 200 },
    update_strategy: "realtime",
    reindex_threshold: 0.1
  },
  search: {
    default_max_results: 10,
    similarity_threshold: 0.7,
    enable_reranking: true,
    cache_ttl_seconds: 900
  },
  performance: {
    max_concurrent_searches: 100,
    embedding_timeout_ms: 5000,
    search_timeout_ms: 2000,
    memory_limit_gb: 32
  }
};
```
## Integration Points

### 1. Unified Search Interface
```typescript
// Integration with search abstraction layer
class VectorSearchProvider implements SearchProvider {
  async search(request: UnifiedSearchRequest): Promise<UnifiedSearchResponse> {
    const start_time = performance.now(); // needed for the latency metadata below

    const processed_query = await this.query_processor.processQuery(
      request.query,
      request.context
    );

    const search_results = await this.multi_vector_engine.search(
      processed_query,
      {
        max_results: request.context?.max_results || 10,
        similarity_threshold: 0.7,
        enable_reranking: (request.context?.budget?.max_latency_ms ?? 0) > 1000,
        context: request.context
      }
    );

    return {
      results: search_results.map(this.formatUnifiedResult),
      metadata: {
        method_used: "vector",
        confidence: this.calculateOverallConfidence(search_results),
        latency_ms: performance.now() - start_time,
        routing_reason: "Semantic similarity search requested"
      }
    };
  }
}
```
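`formatUnifiedResult` is referenced but not defined here, and the unified result shape belongs to the search abstraction layer. A sketch of the likely mapping, with all field names below treated as assumptions:

```typescript
// Hypothetical mapping from the internal SearchResult to the unified
// result shape; field names are assumed from the metadata used earlier.
function formatUnifiedResult(result: SearchResult): UnifiedSearchResult {
  return {
    id: result.id,
    score: result.similarity,
    content: result.content ?? null,
    source: result.metadata?.source,
    content_type: result.metadata?.content_type
  };
}
```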
### 2. MCP Integration
```typescript
// MCP tool for vector search
const VECTOR_SEARCH_TOOL = {
  name: "vector_search",
  description: "Semantic search using vector embeddings",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string", description: "Search query" },
      options: {
        type: "object",
        properties: {
          max_results: { type: "number", default: 10 },
          similarity_threshold: { type: "number", default: 0.7 },
          content_types: {
            type: "array",
            items: { type: "string" },
            description: "Filter by content types"
          },
          rerank: { type: "boolean", default: true }
        }
      }
    },
    required: ["query"]
  }
};
```
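A sketch of the corresponding tool handler. Handler registration differs across MCP SDK versions, so the function shape is an assumption; the `content` array of text blocks follows the usual MCP tool-result convention:

```typescript
// Hypothetical MCP tool handler wiring the schema above to the provider.
async function handleVectorSearchTool(
  args: { query: string; options?: { max_results?: number; rerank?: boolean } },
  provider: VectorSearchProvider
) {
  const response = await provider.search({
    query: args.query,
    context: { max_results: args.options?.max_results ?? 10 }
  });
  // MCP tools conventionally return an array of content blocks
  return {
    content: [{ type: "text", text: JSON.stringify(response.results, null, 2) }]
  };
}
```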
## Performance Benchmarks

### Target Performance (Production)
- Embedding Generation: < 50ms (p95) for queries under 2000 tokens
- Vector Search: < 100ms (p95) for datasets up to 1M vectors
- End-to-End Search: < 200ms (p95) including reranking
- Throughput: > 1000 searches/second (with caching)
- Memory Efficiency: < 2GB RAM per 100k vectors (HNSW)
### Scaling Characteristics
- Index Size: Linear growth ~2-4MB per 1000 documents
- Search Latency: Logarithmic growth with dataset size
- Memory Usage: ~1.5x index size during search operations
- CPU Utilization: < 50% at target throughput
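These targets are consistent with back-of-the-envelope sizing. A sketch of the arithmetic (raw float32 storage only; HNSW neighbor links add overhead on top):

```typescript
// Rough raw-storage estimate for float32 vectors (4 bytes per dimension),
// ignoring HNSW link overhead (roughly 2 * M neighbor ids per vector
// at the base layer).
function rawVectorGiB(num_vectors: number, dims: number): number {
  return (num_vectors * dims * 4) / 2 ** 30;
}

rawVectorGiB(100_000, 3072); // ≈ 1.14 GiB: within the < 2 GB per 100k target
rawVectorGiB(100_000, 384);  // ≈ 0.14 GiB with the 384-dim fallback model
```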
## Vector Store Provider Abstraction

### Vector Database Interface
```python
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import numpy as np


@dataclass
class VectorDocument:
    """Universal vector document representation"""
    id: str
    vector: np.ndarray
    metadata: Dict[str, Any]
    content: Optional[str] = None


@dataclass
class VectorQuery:
    """Universal vector query representation"""
    vector: np.ndarray
    top_k: int = 10
    filters: Optional[Dict[str, Any]] = None
    similarity_threshold: Optional[float] = None


@dataclass
class VectorSearchResult:
    """Universal search result representation"""
    id: str
    score: float
    metadata: Dict[str, Any]
    content: Optional[str] = None
class VectorStoreProvider(ABC):
    """Abstract interface for vector database providers"""

    @abstractmethod
    def connect(self, connection_config: Dict[str, Any]) -> bool:
        """Establish connection to vector database"""

    @abstractmethod
    def create_index(self,
                     index_name: str,
                     dimension: int,
                     metric: str = "cosine",
                     index_config: Optional[Dict[str, Any]] = None) -> bool:
        """Create vector index"""

    @abstractmethod
    def insert_vectors(self,
                       index_name: str,
                       documents: List[VectorDocument]) -> bool:
        """Insert vectors into index"""

    @abstractmethod
    def search_vectors(self,
                       index_name: str,
                       query: VectorQuery) -> List[VectorSearchResult]:
        """Search for similar vectors"""

    @abstractmethod
    def update_vector(self,
                      index_name: str,
                      document: VectorDocument) -> bool:
        """Update existing vector"""

    @abstractmethod
    def delete_vectors(self,
                       index_name: str,
                       vector_ids: List[str]) -> bool:
        """Delete vectors by IDs"""

    @abstractmethod
    def get_index_stats(self, index_name: str) -> Dict[str, Any]:
        """Get index statistics"""

    @abstractmethod
    def list_indexes(self) -> List[str]:
        """List all available indexes"""
class PineconeProvider(VectorStoreProvider):
    """Pinecone implementation of vector store provider"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.pinecone_client = None

    def connect(self, connection_config: Dict[str, Any]) -> bool:
        """Connect to Pinecone"""
        try:
            import pinecone  # v2-style client API (pinecone.init)
            pinecone.init(
                api_key=connection_config['api_key'],
                environment=connection_config['environment']
            )
            self.pinecone_client = pinecone
            return True
        except Exception as e:
            print(f"Pinecone connection failed: {e}")
            return False

    def create_index(self,
                     index_name: str,
                     dimension: int,
                     metric: str = "cosine",
                     index_config: Optional[Dict[str, Any]] = None) -> bool:
        """Create Pinecone index"""
        try:
            config = index_config or {}
            self.pinecone_client.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                pods=config.get('pods', 1),
                replicas=config.get('replicas', 1),
                pod_type=config.get('pod_type', 'p1.x1')
            )
            return True
        except Exception as e:
            print(f"Index creation failed: {e}")
            return False

    def insert_vectors(self,
                       index_name: str,
                       documents: List[VectorDocument]) -> bool:
        """Insert vectors into Pinecone"""
        try:
            index = self.pinecone_client.Index(index_name)
            vectors = [
                (doc.id, doc.vector.tolist(), doc.metadata)
                for doc in documents
            ]
            index.upsert(vectors=vectors)
            return True
        except Exception as e:
            print(f"Vector insertion failed: {e}")
            return False

    def search_vectors(self,
                       index_name: str,
                       query: VectorQuery) -> List[VectorSearchResult]:
        """Search Pinecone index"""
        try:
            index = self.pinecone_client.Index(index_name)
            results = index.query(
                vector=query.vector.tolist(),
                top_k=query.top_k,
                filter=query.filters,
                include_metadata=True
            )
            search_results = []
            for match in results.matches:
                search_results.append(VectorSearchResult(
                    id=match.id,
                    score=float(match.score),
                    metadata=match.metadata or {},
                    content=match.metadata.get('content') if match.metadata else None
                ))
            return search_results
        except Exception as e:
            print(f"Vector search failed: {e}")
            return []
class WeaviateProvider(VectorStoreProvider):
    """Weaviate implementation of vector store provider"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = None

    def connect(self, connection_config: Dict[str, Any]) -> bool:
        """Connect to Weaviate"""
        try:
            import weaviate
            auth_config = None
            if 'api_key' in connection_config:
                auth_config = weaviate.AuthApiKey(api_key=connection_config['api_key'])
            self.client = weaviate.Client(
                url=connection_config['url'],
                auth_client_secret=auth_config,
                additional_headers=connection_config.get('headers', {})
            )
            # Test connection
            return self.client.is_ready()
        except Exception as e:
            print(f"Weaviate connection failed: {e}")
            return False

    def create_index(self,
                     index_name: str,
                     dimension: int,
                     metric: str = "cosine",
                     index_config: Optional[Dict[str, Any]] = None) -> bool:
        """Create Weaviate class (index)"""
        try:
            cfg = index_config or {}
            class_schema = {
                "class": index_name,
                "vectorizer": "none",  # We provide our own vectors
                "properties": [
                    {"name": "content", "dataType": ["text"]},
                    {"name": "metadata", "dataType": ["object"]}
                ],
                "vectorIndexConfig": {
                    "distance": metric,
                    "ef": cfg.get('ef', 64),
                    "efConstruction": cfg.get('efConstruction', 128),
                    "maxConnections": cfg.get('maxConnections', 64)
                }
            }
            self.client.schema.create_class(class_schema)
            return True
        except Exception as e:
            print(f"Class creation failed: {e}")
            return False

    def insert_vectors(self,
                       index_name: str,
                       documents: List[VectorDocument]) -> bool:
        """Insert vectors into Weaviate"""
        try:
            with self.client.batch as batch:
                for doc in documents:
                    data_object = {
                        "content": doc.content,
                        "metadata": doc.metadata
                    }
                    batch.add_data_object(
                        data_object=data_object,
                        class_name=index_name,
                        uuid=doc.id,
                        vector=doc.vector.tolist()
                    )
            return True
        except Exception as e:
            print(f"Vector insertion failed: {e}")
            return False
class ChromaProvider(VectorStoreProvider):
    """ChromaDB implementation of vector store provider"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = None

    def connect(self, connection_config: Dict[str, Any]) -> bool:
        """Connect to ChromaDB"""
        try:
            import chromadb
            if 'path' in connection_config:
                # Persistent client
                self.client = chromadb.PersistentClient(
                    path=connection_config['path']
                )
            elif 'host' in connection_config:
                # HTTP client
                self.client = chromadb.HttpClient(
                    host=connection_config['host'],
                    port=connection_config.get('port', 8000)
                )
            else:
                # In-memory client
                self.client = chromadb.Client()
            return True
        except Exception as e:
            print(f"ChromaDB connection failed: {e}")
            return False

    def create_index(self,
                     index_name: str,
                     dimension: int,
                     metric: str = "cosine",
                     index_config: Optional[Dict[str, Any]] = None) -> bool:
        """Create ChromaDB collection"""
        try:
            metadata = {"hnsw:space": metric}
            if index_config:
                metadata.update(index_config)
            self.client.create_collection(
                name=index_name,
                metadata=metadata
            )
            return True
        except Exception as e:
            print(f"Collection creation failed: {e}")
            return False
class FAISSProvider(VectorStoreProvider):
    """FAISS implementation of vector store provider"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.indexes = {}
        self.metadata_store = {}

    def connect(self, connection_config: Dict[str, Any]) -> bool:
        """Connect to FAISS (local setup)"""
        try:
            import faiss
            self.faiss = faiss
            return True
        except ImportError:
            print("FAISS not installed")
            return False

    def create_index(self,
                     index_name: str,
                     dimension: int,
                     metric: str = "cosine",
                     index_config: Optional[Dict[str, Any]] = None) -> bool:
        """Create FAISS index"""
        try:
            config = index_config or {}
            index_type = config.get('index_type', 'flat')
            if index_type == 'flat':
                if metric == "cosine":
                    index = self.faiss.IndexFlatIP(dimension)
                else:
                    index = self.faiss.IndexFlatL2(dimension)
            elif index_type == 'hnsw':
                M = config.get('M', 16)
                if metric == "cosine":
                    index = self.faiss.IndexHNSWFlat(dimension, M, self.faiss.METRIC_INNER_PRODUCT)
                else:
                    index = self.faiss.IndexHNSWFlat(dimension, M, self.faiss.METRIC_L2)
            else:
                raise ValueError(f"Unsupported index type: {index_type}")
            self.indexes[index_name] = index
            self.metadata_store[index_name] = {}
            return True
        except Exception as e:
            print(f"FAISS index creation failed: {e}")
            return False

    def insert_vectors(self,
                       index_name: str,
                       documents: List[VectorDocument]) -> bool:
        """Insert vectors into FAISS index"""
        try:
            if index_name not in self.indexes:
                return False
            index = self.indexes[index_name]
            vectors = np.array([doc.vector for doc in documents]).astype('float32')
            # Normalize for cosine similarity if needed
            if hasattr(index, 'metric_type') and index.metric_type == self.faiss.METRIC_INNER_PRODUCT:
                self.faiss.normalize_L2(vectors)
            # Store vectors in the index
            start_id = index.ntotal
            index.add(vectors)
            # Store metadata separately, keyed by internal FAISS id
            for i, doc in enumerate(documents):
                self.metadata_store[index_name][start_id + i] = {
                    'id': doc.id,
                    'metadata': doc.metadata,
                    'content': doc.content
                }
            return True
        except Exception as e:
            print(f"FAISS vector insertion failed: {e}")
            return False
class VectorStoreFactory:
    """Factory for creating vector store providers"""

    _providers = {
        'pinecone': PineconeProvider,
        'weaviate': WeaviateProvider,
        'chroma': ChromaProvider,
        'faiss': FAISSProvider,
        'milvus': None,         # Could add Milvus support
        'qdrant': None,         # Could add Qdrant support
        'elasticsearch': None,  # Could add Elasticsearch support
    }

    @classmethod
    def create_provider(cls, provider_type: str, config: Dict[str, Any]) -> VectorStoreProvider:
        """Create a vector store provider"""
        if provider_type not in cls._providers:
            raise ValueError(f"Unknown vector store provider: {provider_type}")
        provider_class = cls._providers[provider_type]
        if provider_class is None:
            raise NotImplementedError(f"Provider {provider_type} not yet implemented")
        return provider_class(config)

    @classmethod
    def get_available_providers(cls) -> List[str]:
        """Get list of available providers"""
        return [k for k, v in cls._providers.items() if v is not None]
```
### Vector Store Configuration
```python
@dataclass
class VectorStoreConfig:
    """Configuration for vector store providers"""
    provider_type: str
    connection_config: Dict[str, Any]
    index_config: Dict[str, Any]

    # Performance settings
    batch_size: int = 100
    max_retries: int = 3
    timeout_seconds: int = 30

    # Index settings
    dimension: int = 1536
    metric: str = "cosine"

    # Provider-specific settings
    provider_settings: Optional[Dict[str, Any]] = None
class VectorStoreManager:
    """High-level manager for vector store operations"""

    def __init__(self, config: VectorStoreConfig):
        self.config = config
        self.provider = VectorStoreFactory.create_provider(
            config.provider_type,
            config.provider_settings or {}
        )
        self.connected = False

    def initialize(self) -> bool:
        """Initialize vector store connection"""
        try:
            self.connected = self.provider.connect(self.config.connection_config)
            return self.connected
        except Exception as e:
            print(f"Vector store initialization failed: {e}")
            return False

    def create_search_index(self, index_name: str) -> bool:
        """Create optimized search index"""
        return self.provider.create_index(
            index_name=index_name,
            dimension=self.config.dimension,
            metric=self.config.metric,
            index_config=self.config.index_config
        )

    def batch_insert(self,
                     index_name: str,
                     documents: List[VectorDocument]) -> bool:
        """Insert documents in batches"""
        try:
            batch_size = self.config.batch_size
            for i in range(0, len(documents), batch_size):
                batch = documents[i:i + batch_size]
                success = self.provider.insert_vectors(index_name, batch)
                if not success:
                    print(f"Failed to insert batch starting at index {i}")
                    return False
            return True
        except Exception as e:
            print(f"Batch insertion failed: {e}")
            return False
# Usage example
vector_configs = {
    'production': VectorStoreConfig(
        provider_type='pinecone',
        connection_config={
            'api_key': 'pinecone-api-key',
            'environment': 'us-west1-gcp'
        },
        index_config={
            'pods': 2,
            'replicas': 1,
            'pod_type': 'p1.x2'
        },
        dimension=1536,
        metric='cosine'
    ),
    'development': VectorStoreConfig(
        provider_type='chroma',
        connection_config={
            'path': './dev_vector_db'
        },
        index_config={},
        dimension=1536,
        metric='cosine'
    ),
    'local': VectorStoreConfig(
        provider_type='faiss',
        connection_config={},
        index_config={
            'index_type': 'hnsw',
            'M': 16
        },
        dimension=1536,
        metric='cosine'
    )
}
```
**Research:** benchmarking embedding quality for scientific texts

**See also:** Search Abstraction, Hyperbolic Space
## Related Links
Explore related documentation:
- Noosphere Layer - README
- AI Staff
- Architecture
- Hyperbolic Space
- Knowledge Graph