# Knowledge Graph Engine (L1 Component)
The structural relationship management system within the Noosphere architecture. It provides graph-based knowledge representation, complex traversal capabilities, and automated relationship discovery.
## Architecture Overview

```text
┌─────────────────────────────────────────────────┐
│                   Graph Agent                   │
│  ┌───────────────────────────────────────────┐  │
│  │                Neo4j Core                 │  │
│  │  • Cypher Query Engine                    │  │
│  │  • Relationship Indexing                  │  │
│  │  • Graph Analytics (APOC)                 │  │
│  │  • Schema Management                      │  │
│  └───────────────────────────────────────────┘  │
└──────────────────┬──────────────────────────────┘
                   │
     ┌─────────────┼─────────────────┐
     │             │                 │
     ▼             ▼                 ▼
┌──────────┐  ┌──────────┐  ┌──────────────────┐
│ Entity   │  │ Relation │  │ Quality          │
│ Extract  │  │ Detect   │  │ Assessment       │
│          │  │          │  │                  │
│ • NER    │  │ • NLP    │  │ • Edge Score     │
│ • Linking│  │ • Rules  │  │ • Validation     │
│ • Merge  │  │ • ML     │  │ • Feedback Loop  │
└──────────┘  └──────────┘  └──────────────────┘
```

## Graph Schema Design

### Core Node Types

```cypher
// Document nodes
(:Document {
id: "uuid",
title: "string",
type: "paper|code|documentation|article",
url: "string",
content_hash: "string",
created_at: datetime,
indexed_at: datetime,
metadata: {
authors: ["author_ids"],
publication_date: date,
venue: "string",
doi: "string",
language: "string"
}
})
// Concept nodes
(:Concept {
id: "uuid",
name: "string",
type: "technology|methodology|theory|domain",
description: "string",
canonical_form: "string",
aliases: ["string"],
confidence_score: float,
usage_frequency: int,
last_updated: datetime
})
// Author nodes
(:Author {
id: "uuid",
name: "string",
canonical_name: "string",
aliases: ["string"],
orcid: "string",
affiliations: ["institution_ids"],
expertise_areas: ["concept_ids"],
h_index: int,
publication_count: int
})
// Institution nodes
(:Institution {
id: "uuid",
name: "string",
type: "university|company|research_lab",
country: "string",
domain: "string",
founded: date,
research_areas: ["concept_ids"]
})
// Code Repository nodes
(:Repository {
id: "uuid",
name: "string",
url: "string",
language: "string",
stars: int,
forks: int,
topics: ["string"],
concepts: ["concept_ids"],
last_commit: datetime
})
```
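
Note that Neo4j stores only primitive values and arrays of primitives as node properties, so the nested `metadata` map above is a logical view; in practice it is flattened into prefixed keys or modelled as separate nodes. A minimal sketch of how a `Document` node maps onto the `GraphNode` dataclass defined in the Provider Abstraction Layer below; all property values are hypothetical placeholders:

```python
# A minimal sketch, assuming the GraphNode dataclass from the Provider
# Abstraction Layer below. Property values are hypothetical placeholders;
# the nested metadata map is flattened into prefixed keys because Neo4j
# properties must be primitives or arrays of primitives.
doc_node = GraphNode(
    id="",  # assigned by the database on creation
    labels=["Document"],
    properties={
        "title": "Example Paper Title",
        "type": "paper",
        "url": "https://example.org/paper",
        "content_hash": "example-hash",
        "metadata_venue": "Example Venue",
        "metadata_language": "en",
    },
)
```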

### Relationship Types and Properties

```cypher
// Document relationships
(:Document)-[:AUTHORED_BY {role: "primary|secondary", order: int}]->(:Author)
(:Document)-[:PUBLISHED_IN {venue: "string", year: int}]->(:Venue)
(:Document)-[:CITES {context: "string", frequency: int, sentiment: "positive|neutral|negative"}]->(:Document)
(:Document)-[:CONTAINS {tf_idf: float, prominence: float, context: "string"}]->(:Concept)
(:Document)-[:IMPLEMENTS {description: "string", completeness: float}]->(:Repository)
// Concept relationships
(:Concept)-[:RELATED_TO {
strength: float, // 0.0-1.0 relationship strength
type: "semantic|hierarchical|causal|temporal",
evidence_count: int, // supporting documents
confidence: float, // ML confidence score
created_by: "auto|manual|hybrid",
last_validated: datetime
}]->(:Concept)
(:Concept)-[:IS_SUBCONCEPT_OF {specificity: float}]->(:Concept)
(:Concept)-[:EVOLVES_INTO {timespan: "string", evidence: ["doc_ids"]}]->(:Concept)
(:Concept)-[:REQUIRES {dependency_type: "prerequisite|component|tool"}]->(:Concept)
// Author relationships
(:Author)-[:COLLABORATES_WITH {
frequency: int,
papers_count: int,
relationship_strength: float,
domains: ["concept_ids"]
}]->(:Author)
(:Author)-[:AFFILIATED_WITH {
start_date: date,
end_date: date,
role: "string",
current: boolean
}]->(:Institution)
// Dynamic relationships (time-aware)
(:Concept)-[:POPULARITY_OVER_TIME {
year: int,
paper_count: int,
citation_count: int,
trend: "rising|stable|declining"
}]->(:TimeSlice)
```
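
As a sketch of how one of these edges is materialized through the provider layer defined later in this document (the `GraphRelationship` dataclass); the node IDs and property values here are hypothetical:

```python
# A minimal sketch, assuming the GraphRelationship dataclass and a
# connected GraphDatabaseProvider from the Provider Abstraction Layer
# below. IDs and property values are hypothetical placeholders.
rel = GraphRelationship(
    id="",  # assigned by the database on creation
    type="RELATED_TO",
    start_node_id="101",
    end_node_id="202",
    properties={
        "strength": 0.82,
        "type": "semantic",
        "evidence_count": 14,
        "confidence": 0.77,
        "created_by": "auto",
    },
)
rel_id = provider.create_relationship(rel)
```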

## Query Patterns and Optimization

### Core Query Types

#### 1. Concept Exploration

```cypher
// Find related concepts with evidence
MATCH (c1:Concept {name: "transformer architecture"})
-[r:RELATED_TO]-(c2:Concept)
MATCH (doc:Document)-[:CONTAINS]->(c1)
MATCH (doc)-[:CONTAINS]->(c2)
RETURN c2.name,
r.strength,
r.confidence,
count(doc) as supporting_docs
ORDER BY r.strength DESC, supporting_docs DESC
LIMIT 20;
// Multi-hop concept traversal
MATCH path = (start:Concept {name: "neural networks"})
-[:RELATED_TO*2..4]-(end:Concept)
WHERE end.name CONTAINS "memory"
RETURN path,
reduce(score = 1.0, rel in relationships(path) | score * rel.strength) as path_score
ORDER BY path_score DESC
LIMIT 10;
```

#### 2. Research Trend Analysis

```cypher
// Concept evolution over time
MATCH (c:Concept {name: "attention mechanism"})
-[:POPULARITY_OVER_TIME]->(ts:TimeSlice)
MATCH (docs:Document)-[:CONTAINS]->(c)
WHERE docs.metadata.publication_date.year = ts.year
RETURN ts.year,
ts.paper_count,
ts.citation_count,
ts.trend,
avg(docs.citation_count) as avg_impact
ORDER BY ts.year;
// Emerging research areas
MATCH (c:Concept)-[pt:POPULARITY_OVER_TIME]->(ts:TimeSlice)
WHERE ts.year >= 2022 AND pt.trend = "rising"
WITH c, sum(pt.paper_count) as total_papers,
sum(pt.citation_count) as total_citations
WHERE total_papers >= 10
RETURN c.name,
c.description,
total_papers,
total_citations,
(total_citations * 1.0 / total_papers) as avg_impact
ORDER BY avg_impact DESC;
```

#### 3. Author and Collaboration Networks

```cypher
// Find expert authors for a concept
MATCH (c:Concept {name: "graph neural networks"})
<-[:CONTAINS]-(doc:Document)
      -[:AUTHORED_BY]->(a:Author)
WITH a, count(doc) as paper_count,
sum(doc.citation_count) as total_citations
WHERE paper_count >= 3
RETURN a.name,
a.canonical_name,
paper_count,
total_citations,
(total_citations * 1.0 / paper_count) as avg_impact,
a.h_index
ORDER BY avg_impact DESC, paper_count DESC;
// Collaboration network analysis
MATCH (a1:Author)-[collab:COLLABORATES_WITH]-(a2:Author)
WHERE collab.frequency >= 3
MATCH (a1)<-[:AUTHORED_BY]-(doc:Document)-[:AUTHORED_BY]->(a2)
MATCH (doc)-[:CONTAINS]->(c:Concept {name: "machine learning"})
RETURN a1.name, a2.name,
collab.frequency,
collab.relationship_strength,
collect(DISTINCT c.name) as shared_concepts
ORDER BY collab.relationship_strength DESC;
```

#### 4. Research Gap Identification

```cypher
// Find under-connected concepts (potential research gaps)
MATCH (c:Concept)
WHERE c.usage_frequency >= 10
WITH c, size((c)-[:RELATED_TO]-()) as connection_count
WHERE connection_count < 5
MATCH (c)<-[:CONTAINS]-(doc:Document)
RETURN c.name,
c.description,
connection_count,
count(doc) as document_count,
avg(doc.citation_count) as avg_citations
ORDER BY document_count DESC, avg_citations DESC;
// Cross-domain connection opportunities
MATCH (c1:Concept)-[:RELATED_TO {type: "semantic"}]-(bridge:Concept)-[:RELATED_TO {type: "semantic"}]-(c2:Concept)
WHERE c1.type <> c2.type
AND NOT (c1)-[:RELATED_TO]-(c2)
WITH c1, c2, bridge,
size((c1)<-[:CONTAINS]-()) as c1_usage,
size((c2)<-[:CONTAINS]-()) as c2_usage
WHERE c1_usage >= 5 AND c2_usage >= 5
RETURN c1.name, c1.type,
c2.name, c2.type,
bridge.name as connecting_concept,
c1_usage, c2_usage
ORDER BY (c1_usage + c2_usage) DESC;
```

## Automated Relationship Discovery

### Entity Extraction Pipeline

```python
class GraphEntityExtractor:
def extract_entities(self, document: Document) -> List[Entity]:
"""Multi-stage entity extraction and linking"""
# Stage 1: Named Entity Recognition
raw_entities = self.ner_pipeline(document.content)
# Stage 2: Concept extraction using domain models
concepts = self.concept_extractor(document.content, domain=document.domain)
# Stage 3: Entity linking and canonicalization
linked_entities = self.entity_linker(raw_entities + concepts)
# Stage 4: Disambiguation and merging
canonical_entities = self.disambiguate(linked_entities)
return canonical_entities
def extract_relationships(self,
document: Document,
entities: List[Entity]) -> List[Relationship]:
"""Automated relationship extraction"""
relationships = []
# Syntactic relationships (dependency parsing)
syntactic_rels = self.dependency_parser(document.content, entities)
relationships.extend(syntactic_rels)
# Semantic relationships (ML model)
semantic_rels = self.semantic_relation_model(document.content, entities)
relationships.extend(semantic_rels)
# Pattern-based relationships (rule engine)
pattern_rels = self.pattern_engine(document.content, entities)
relationships.extend(pattern_rels)
# Cross-document relationships (citation analysis)
if document.citations:
citation_rels = self.citation_analyzer(document, entities)
relationships.extend(citation_rels)
        return self.deduplicate_relationships(relationships)
```
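
The final deduplication step is referenced but not shown above; a minimal sketch, assuming each `Relationship` exposes `source_id`, `target_id`, `type`, and `confidence` fields (hypothetical names):

```python
# A minimal sketch of deduplicate_relationships; the field names
# (source_id, target_id, type, confidence) are assumptions.
def deduplicate_relationships(relationships):
    """Keep the highest-confidence instance of each (source, type, target)."""
    best = {}
    for rel in relationships:
        key = (rel.source_id, rel.type, rel.target_id)
        if key not in best or rel.confidence > best[key].confidence:
            best[key] = rel
    return list(best.values())
```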

### Relationship Quality Assessment

```python
class RelationshipQualityAssessor:
def assess_edge_quality(self,
relationship: Relationship,
evidence: Evidence) -> QualityScore:
"""Multi-dimensional relationship quality assessment"""
scores = {}
# Evidence strength (number of supporting documents)
scores['evidence_strength'] = self.calculate_evidence_strength(evidence)
# Source credibility (quality of supporting documents)
scores['source_credibility'] = self.assess_source_quality(evidence.sources)
# Consistency (agreement across sources)
scores['consistency'] = self.measure_consistency(evidence)
# Semantic coherence (NLP-based validation)
scores['semantic_coherence'] = self.validate_semantic_coherence(relationship)
# Temporal stability (relationship persistence over time)
scores['temporal_stability'] = self.assess_temporal_stability(relationship)
# Expert validation (human feedback when available)
if evidence.expert_feedback:
scores['expert_validation'] = evidence.expert_feedback.score
# Composite score calculation
weights = {
'evidence_strength': 0.25,
'source_credibility': 0.20,
'consistency': 0.20,
'semantic_coherence': 0.15,
'temporal_stability': 0.15,
'expert_validation': 0.05
}
composite_score = sum(
scores.get(dimension, 0.0) * weight
for dimension, weight in weights.items()
)
return QualityScore(
composite=composite_score,
dimensions=scores,
confidence=self.calculate_confidence(scores),
explanation=self.generate_explanation(scores)
)
def quality_feedback_loop(self, user_feedback: UserFeedback):
"""Incorporate user feedback to improve quality assessment"""
# Update quality models based on user corrections
if user_feedback.type == "relationship_correction":
self.update_relationship_model(user_feedback)
# Retrain quality assessment weights
if user_feedback.type == "quality_rating":
self.retrain_quality_weights(user_feedback)
# Update entity linking confidence
if user_feedback.type == "entity_correction":
            self.update_entity_linker(user_feedback)
```
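
For intuition, here is the composite calculation with the weights above applied to hypothetical dimension scores; with no expert feedback, the `expert_validation` dimension contributes zero:

```python
# Worked example of the composite score; dimension values are hypothetical.
scores = {
    'evidence_strength': 0.9,
    'source_credibility': 0.8,
    'consistency': 0.7,
    'semantic_coherence': 0.6,
    'temporal_stability': 0.5,
    # no expert feedback, so 'expert_validation' falls back to 0.0
}
weights = {
    'evidence_strength': 0.25, 'source_credibility': 0.20,
    'consistency': 0.20, 'semantic_coherence': 0.15,
    'temporal_stability': 0.15, 'expert_validation': 0.05,
}
composite = sum(scores.get(dim, 0.0) * w for dim, w in weights.items())
print(round(composite, 2))  # 0.69
```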

## Graph Analytics and Insights

### Community Detection

```cypher
// Research community identification using Louvain algorithm
CALL gds.louvain.stream('research_graph', {
relationshipTypes: ['COLLABORATES_WITH', 'CITES', 'RELATED_TO'],
relationshipWeightProperty: 'strength'
})
YIELD nodeId, communityId
MATCH (n) WHERE id(n) = nodeId
WITH communityId, collect(n) as community_members
WHERE size(community_members) >= 5
RETURN communityId,
[member IN community_members WHERE 'Author' IN labels(member) | member.name] as authors,
[member IN community_members WHERE 'Concept' IN labels(member) | member.name] as concepts,
size(community_members) as community_size
ORDER BY community_size DESC;
```

### Centrality Analysis

```cypher
// Identify key concepts using PageRank
CALL gds.pageRank.stream('concept_graph', {
relationshipTypes: ['RELATED_TO'],
relationshipWeightProperty: 'strength',
dampingFactor: 0.85
})
YIELD nodeId, score
MATCH (c:Concept) WHERE id(c) = nodeId
RETURN c.name,
c.description,
score as importance_score,
c.usage_frequency,
size((c)-[:RELATED_TO]-()) as connection_count
ORDER BY score DESC
LIMIT 20;
// Identify bridge concepts (high betweenness centrality)
CALL gds.betweenness.stream('concept_graph', {
relationshipTypes: ['RELATED_TO']
})
YIELD nodeId, score
MATCH (c:Concept) WHERE id(c) = nodeId AND score > 0
RETURN c.name,
c.type,
score as bridge_importance,
c.usage_frequency
ORDER BY score DESC
LIMIT 15;
```

### Temporal Evolution Tracking

```cypher
// Track concept evolution patterns
MATCH (old:Concept)-[evolves:EVOLVES_INTO]->(new:Concept)
MATCH (old)<-[:CONTAINS]-(old_docs:Document),
(new)<-[:CONTAINS]-(new_docs:Document)
WITH old, new, evolves,
avg(old_docs.metadata.publication_date.year) as old_period,
avg(new_docs.metadata.publication_date.year) as new_period,
     count(DISTINCT old_docs) as old_usage,
     count(DISTINCT new_docs) as new_usage
RETURN old.name as original_concept,
new.name as evolved_concept,
(new_period - old_period) as evolution_years,
old_usage, new_usage,
(new_usage * 1.0 / old_usage) as adoption_ratio,
evolves.evidence as supporting_evidence
ORDER BY adoption_ratio DESC;
```

## Integration with Vector Search

### Hybrid Query Processing

```python
class HybridGraphVectorSearch:
def search(self, query: str, context: SearchContext) -> SearchResults:
"""Combine graph traversal with vector similarity"""
# Stage 1: Vector similarity for initial candidates
vector_candidates = self.vector_search.search(query, top_k=100)
# Stage 2: Graph expansion for related concepts
graph_expansion = []
for candidate in vector_candidates[:20]:
related = self.graph_agent.find_related(
candidate.entity_id,
max_hops=2,
min_relationship_strength=0.3
)
graph_expansion.extend(related)
# Stage 3: Re-ranking with graph signals
combined_candidates = vector_candidates + graph_expansion
reranked = self.rerank_with_graph_features(
combined_candidates,
query,
context
)
return SearchResults(
results=reranked,
explanation=self.generate_explanation(query, reranked)
)
def rerank_with_graph_features(self,
candidates: List[Candidate],
query: str,
context: SearchContext) -> List[Candidate]:
"""Enhance ranking with graph-based features"""
for candidate in candidates:
# Graph centrality features
candidate.features['pagerank'] = self.get_pagerank_score(candidate.entity_id)
candidate.features['betweenness'] = self.get_betweenness_score(candidate.entity_id)
# Relationship density features
candidate.features['connection_count'] = self.get_connection_count(candidate.entity_id)
candidate.features['relationship_strength'] = self.get_avg_relationship_strength(candidate.entity_id)
# Context relevance features
if context.user_history:
candidate.features['user_context_similarity'] = self.calculate_user_context_similarity(
candidate.entity_id, context.user_history
)
# Re-ranking with combined vector + graph features
        return self.ml_reranker.rerank(candidates, query, context)
```
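
When no trained `ml_reranker` is available, a simple linear combination of the vector and graph signals can serve as a fallback. A sketch with illustrative, untuned weights; the `vector_similarity` feature name is an assumption:

```python
# A minimal fallback scorer; the weights are illustrative, not tuned, and
# the 'vector_similarity' feature name is an assumption.
def linear_rerank(candidates):
    for c in candidates:
        c.score = (
            0.6 * c.features.get('vector_similarity', 0.0)
            + 0.2 * c.features.get('pagerank', 0.0)
            + 0.1 * c.features.get('relationship_strength', 0.0)
            + 0.1 * c.features.get('user_context_similarity', 0.0)
        )
    return sorted(candidates, key=lambda c: c.score, reverse=True)
```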

## Performance and Scalability

### Indexing Strategy

```cypher
// Core indexes for performance
CREATE INDEX concept_name FOR (c:Concept) ON (c.name);
CREATE INDEX concept_canonical FOR (c:Concept) ON (c.canonical_form);
CREATE INDEX document_hash FOR (d:Document) ON (d.content_hash);
CREATE INDEX author_canonical FOR (a:Author) ON (a.canonical_name);
CREATE FULLTEXT INDEX document_content FOR (d:Document) ON EACH [d.title, d.abstract];
// Relationship indexes
CREATE INDEX relationship_strength FOR ()-[r:RELATED_TO]-() ON (r.strength);
CREATE INDEX relationship_type FOR ()-[r:RELATED_TO]-() ON (r.type);
CREATE INDEX collaboration_frequency FOR ()-[r:COLLABORATES_WITH]-() ON (r.frequency);
```

### Partitioning and Sharding

```python
class GraphPartitionStrategy:
def partition_by_domain(self, graph: Neo4jGraph) -> List[Partition]:
"""Partition graph by research domains for scalability"""
domains = self.identify_domains(graph)
partitions = []
for domain in domains:
partition = Partition(
domain=domain,
nodes=self.extract_domain_nodes(graph, domain),
relationships=self.extract_domain_relationships(graph, domain),
cross_partition_edges=self.identify_cross_domain_edges(graph, domain)
)
partitions.append(partition)
return partitions
def optimize_query_routing(self, query: CypherQuery, partitions: List[Partition]) -> QueryPlan:
"""Route queries to minimize cross-partition operations"""
relevant_partitions = self.identify_relevant_partitions(query, partitions)
if len(relevant_partitions) == 1:
return QueryPlan(type="single_partition", target=relevant_partitions[0])
else:
return QueryPlan(
type="multi_partition",
targets=relevant_partitions,
coordination_strategy="scatter_gather"
            )
```
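
A sketch of executing the resulting `QueryPlan`: fan the query out to each relevant partition in parallel and concatenate the partial results (the scatter-gather strategy above). Keying managers by partition domain is an assumption, not part of the API above:

```python
# A minimal sketch; assumes one GraphDatabaseManager per partition,
# keyed by partition domain (an assumption).
from concurrent.futures import ThreadPoolExecutor

def execute_plan(plan, query, managers):
    if plan.type == "single_partition":
        return managers[plan.target.domain].execute_with_retry(query)
    # scatter_gather: query all relevant partitions in parallel, then
    # merge; ordering and deduplication are left to the caller
    with ThreadPoolExecutor() as pool:
        futures = [
            pool.submit(managers[p.domain].execute_with_retry, query)
            for p in plan.targets
        ]
    return [row for f in futures for row in f.result()]
```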
## Provider Abstraction Layer
### Graph Database Interface
```python
from abc import abstractmethod
from typing import Protocol, List, Dict, Any, Optional
from dataclasses import dataclass
@dataclass
class GraphNode:
"""Universal graph node representation"""
id: str
labels: List[str]
properties: Dict[str, Any]
@dataclass
class GraphRelationship:
"""Universal graph relationship representation"""
id: str
type: str
start_node_id: str
end_node_id: str
properties: Dict[str, Any]
@dataclass
class GraphQuery:
"""Universal graph query representation"""
    query_type: str  # "cypher", "gremlin", "aql", "sql", "graphql"
query_text: str
parameters: Dict[str, Any]
timeout_ms: Optional[int] = None
class GraphDatabaseProvider(Protocol):
"""Abstract interface for graph database providers"""
@abstractmethod
def connect(self, connection_string: str, credentials: Dict[str, str]) -> bool:
"""Establish connection to graph database"""
pass
@abstractmethod
def execute_query(self, query: GraphQuery) -> List[Dict[str, Any]]:
"""Execute a graph query and return results"""
pass
@abstractmethod
def create_node(self, node: GraphNode) -> str:
"""Create a node and return its ID"""
pass
@abstractmethod
def create_relationship(self, relationship: GraphRelationship) -> str:
"""Create a relationship and return its ID"""
pass
@abstractmethod
def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
"""Update node properties"""
pass
@abstractmethod
def delete_node(self, node_id: str) -> bool:
"""Delete a node and its relationships"""
pass
@abstractmethod
    def find_neighbors(self,
                       node_id: str,
                       relationship_types: Optional[List[str]] = None,
                       max_depth: int = 1) -> List[GraphNode]:
"""Find neighboring nodes"""
pass
@abstractmethod
def get_schema(self) -> Dict[str, Any]:
"""Get database schema information"""
pass
@abstractmethod
def create_index(self, label: str, property_name: str) -> bool:
"""Create an index for performance"""
pass
@abstractmethod
def get_statistics(self) -> Dict[str, Any]:
"""Get database statistics"""
pass
class Neo4jProvider(GraphDatabaseProvider):
"""Neo4j implementation of graph database provider"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.driver = None
def connect(self, connection_string: str, credentials: Dict[str, str]) -> bool:
"""Connect to Neo4j database"""
try:
from neo4j import GraphDatabase
self.driver = GraphDatabase.driver(
connection_string,
auth=(credentials['username'], credentials['password']),
**self.config.get('driver_config', {})
)
# Test connection
with self.driver.session() as session:
result = session.run("RETURN 1 as test")
return result.single()['test'] == 1
except Exception as e:
print(f"Neo4j connection failed: `{e}`")
return False
def execute_query(self, query: GraphQuery) -> List[Dict[str, Any]]:
"""Execute Cypher query"""
if query.query_type != "cypher":
raise ValueError("Neo4j only supports Cypher queries")
        # Per-query timeouts go through a neo4j.Query wrapper; passing
        # timeout= to session.run() would be treated as a query parameter
        from neo4j import Query
        timeout_s = query.timeout_ms / 1000 if query.timeout_ms else None
        with self.driver.session() as session:
            result = session.run(
                Query(query.query_text, timeout=timeout_s),
                query.parameters
            )
            return [record.data() for record in result]
def create_node(self, node: GraphNode) -> str:
"""Create node in Neo4j"""
        # Backtick-quote each label separately; quoting the joined string
        # would produce a single label containing ':'
        labels_str = ":".join(f"`{label}`" for label in node.labels)
        props_str = ", ".join([f"{k}: ${k}" for k in node.properties.keys()])
        cypher = f"""
        CREATE (n:{labels_str} {{{props_str}}})
        RETURN id(n) as node_id
        """
result = self.execute_query(GraphQuery(
query_type="cypher",
query_text=cypher,
parameters=node.properties
))
return str(result[0]['node_id'])
def create_relationship(self, relationship: GraphRelationship) -> str:
"""Create relationship in Neo4j"""
props_str = ", ".join([f"{k}: ${k}" for k in relationship.properties.keys()])
cypher = f"""
MATCH (a) WHERE id(a) = $start_id
MATCH (b) WHERE id(b) = $end_id
CREATE (a)-[r:`{relationship.type}` {{{props_str}}}]->(b)
RETURN id(r) as rel_id
"""
params = relationship.properties.copy()
params['start_id'] = int(relationship.start_node_id)
params['end_id'] = int(relationship.end_node_id)
result = self.execute_query(GraphQuery(
query_type="cypher",
query_text=cypher,
parameters=params
))
return str(result[0]['rel_id'])
    def find_neighbors(self,
                       node_id: str,
                       relationship_types: Optional[List[str]] = None,
                       max_depth: int = 1) -> List[GraphNode]:
        """Find neighbors in Neo4j"""
        rel_filter = ""
        if relationship_types:
            # Quote each type separately; quoting the joined string would
            # match one relationship type literally containing '|'
            rel_types = "|".join(f"`{t}`" for t in relationship_types)
            rel_filter = f":{rel_types}"
        cypher = f"""
        MATCH (start)-[r{rel_filter}*1..{max_depth}]-(neighbor)
        WHERE id(start) = $node_id
        RETURN DISTINCT id(neighbor) as id, labels(neighbor) as labels, properties(neighbor) as props
        """
result = self.execute_query(GraphQuery(
query_type="cypher",
query_text=cypher,
parameters={"node_id": int(node_id)}
))
neighbors = []
for record in result:
neighbors.append(GraphNode(
id=str(record['id']),
labels=record['labels'],
properties=record['props']
))
return neighbors
def get_statistics(self) -> Dict[str, Any]:
"""Get Neo4j database statistics"""
cypher = """
MATCH (n)
OPTIONAL MATCH ()-[r]->()
RETURN count(DISTINCT n) as node_count, count(r) as relationship_count
"""
result = self.execute_query(GraphQuery(
query_type="cypher",
query_text=cypher,
parameters={}
))
return {
'nodes': result[0]['node_count'],
'relationships': result[0]['relationship_count'],
'provider': 'neo4j'
}
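    # update_node, delete_node, get_schema, and create_index follow the
    # same session-based pattern and are omitted here for brevity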
class ArangoDBProvider(GraphDatabaseProvider):
"""ArangoDB implementation of graph database provider"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.client = None
self.database = None
def connect(self, connection_string: str, credentials: Dict[str, str]) -> bool:
"""Connect to ArangoDB"""
try:
from arango import ArangoClient
self.client = ArangoClient(hosts=connection_string)
self.database = self.client.db(
credentials.get('database', '_system'),
username=credentials['username'],
password=credentials['password']
)
# Test connection
return self.database.version() is not None
except Exception as e:
print(f"ArangoDB connection failed: `{e}`")
return False
def execute_query(self, query: GraphQuery) -> List[Dict[str, Any]]:
"""Execute AQL query"""
if query.query_type not in ["aql", "graphql"]:
raise ValueError("ArangoDB supports AQL and GraphQL queries")
cursor = self.database.aql.execute(
query.query_text,
bind_vars=query.parameters,
ttl=query.timeout_ms / 1000 if query.timeout_ms else 60
)
return list(cursor)
# Implement other abstract methods for ArangoDB...
class PostgreSQLGraphProvider(GraphDatabaseProvider):
"""PostgreSQL with graph extensions (AGE) implementation"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.connection = None
def connect(self, connection_string: str, credentials: Dict[str, str]) -> bool:
"""Connect to PostgreSQL with graph extensions"""
try:
import psycopg2
from psycopg2.extras import RealDictCursor
self.connection = psycopg2.connect(
connection_string,
cursor_factory=RealDictCursor
)
# Load AGE extension if available
            with self.connection.cursor() as cursor:
                cursor.execute("CREATE EXTENSION IF NOT EXISTS age;")
                cursor.execute("LOAD 'age';")
                # AGE requires its catalog on the search path
                cursor.execute('SET search_path = ag_catalog, "$user", public;')
            self.connection.commit()
return True
except Exception as e:
print(f"PostgreSQL graph connection failed: `{e}`")
return False
def execute_query(self, query: GraphQuery) -> List[Dict[str, Any]]:
"""Execute SQL/Cypher hybrid query for PostgreSQL AGE"""
if query.query_type not in ["sql", "cypher"]:
raise ValueError("PostgreSQL graph supports SQL and Cypher-like queries")
with self.connection.cursor() as cursor:
cursor.execute(query.query_text, query.parameters)
return cursor.fetchall()
# Implement other abstract methods for PostgreSQL...
class GraphProviderFactory:
"""Factory for creating graph database providers"""
_providers = {
'neo4j': Neo4jProvider,
'arangodb': ArangoDBProvider,
'postgresql': PostgreSQLGraphProvider,
'memgraph': None, # Could add MemGraph support
'janusgraph': None, # Could add JanusGraph support
'dgraph': None, # Could add DGraph support
}
@classmethod
def create_provider(cls, provider_type: str, config: Dict[str, Any]) -> GraphDatabaseProvider:
"""Create a graph database provider"""
        if provider_type not in cls._providers:
            raise ValueError(f"Unknown graph provider: {provider_type}")
        provider_class = cls._providers[provider_type]
        if provider_class is None:
            raise NotImplementedError(f"Provider {provider_type} not yet implemented")
return provider_class(config)
@classmethod
def get_available_providers(cls) -> List[str]:
"""Get list of available providers"""
return [k for k, v in cls._providers.items() if v is not None]
```

### Graph Provider Configuration

```python
import time

@dataclass
class GraphDatabaseConfig:
"""Configuration for graph database providers"""
provider_type: str
connection_string: str
credentials: Dict[str, str]
provider_config: Dict[str, Any]
# Performance settings
connection_pool_size: int = 10
query_timeout_ms: int = 30000
retry_attempts: int = 3
# Feature flags
enable_transactions: bool = True
enable_schema_validation: bool = True
enable_query_caching: bool = True
class GraphDatabaseManager:
"""High-level manager for graph database operations"""
def __init__(self, config: GraphDatabaseConfig):
self.config = config
self.provider = GraphProviderFactory.create_provider(
config.provider_type,
config.provider_config
)
self.connected = False
def initialize(self) -> bool:
"""Initialize database connection and setup"""
try:
# Connect to database
self.connected = self.provider.connect(
self.config.connection_string,
self.config.credentials
)
if not self.connected:
return False
# Create indexes for performance
self._create_performance_indexes()
# Set up schema if validation is enabled
if self.config.enable_schema_validation:
self._setup_schema_validation()
return True
except Exception as e:
print(f"Database initialization failed: {e}")
return False
def _create_performance_indexes(self):
"""Create standard indexes for performance"""
try:
# Core concept indexes
self.provider.create_index("Concept", "name")
self.provider.create_index("Concept", "canonical_form")
# Document indexes
self.provider.create_index("Document", "content_hash")
self.provider.create_index("Document", "title")
# Author indexes
self.provider.create_index("Author", "canonical_name")
self.provider.create_index("Author", "orcid")
except Exception as e:
print(f"Index creation failed: {e}")
def execute_with_retry(self, query: GraphQuery) -> List[Dict[str, Any]]:
"""Execute query with retry logic"""
last_error = None
for attempt in range(self.config.retry_attempts):
try:
return self.provider.execute_query(query)
except Exception as e:
last_error = e
if attempt < self.config.retry_attempts - 1:
time.sleep(2 ** attempt) # Exponential backoff
raise last_error
```

## Usage Examples

### Multi-Provider Setup

```python
# Configure different providers for different use cases
configs = {
'production': GraphDatabaseConfig(
provider_type='neo4j',
connection_string='neo4j://production-cluster:7687',
credentials={'username': 'neo4j', 'password': 'secure-password'},
provider_config={
'driver_config': {
'max_connection_pool_size': 50,
'connection_acquisition_timeout': 60
}
}
),
'development': GraphDatabaseConfig(
provider_type='postgresql',
connection_string='postgresql://localhost:5432/dev_graph',
credentials={'username': 'dev_user', 'password': 'dev_pass'},
provider_config={}
),
'research': GraphDatabaseConfig(
provider_type='arangodb',
connection_string='http://research-cluster:8529',
credentials={'username': 'researcher', 'password': 'research-key'},
provider_config={}
)
}
# Initialize managers
managers = {}
for env, config in configs.items():
manager = GraphDatabaseManager(config)
if manager.initialize():
managers[env] = manager
print(f"Initialized {env} graph database with {config.provider_type}")
# Use appropriate manager based on context
production_manager = managers['production']
research_manager = managers['research']
# Execute queries against different backends transparently
concept_query = GraphQuery(
query_type="cypher",
query_text="""
MATCH (c:Concept)-[:RELATED_TO]-(related:Concept)
WHERE c.name = $concept_name
RETURN related.name, related.description
LIMIT 10
""",
parameters={"concept_name": "machine learning"}
)
# Same query works across all providers
prod_results = production_manager.execute_with_retry(concept_query)
research_results = research_manager.execute_with_retry(concept_query)
```

## Related Links
Explore related documentation:
- Noosphere Layer - README
- AI Staff
- Architecture
- Hyperbolic Space
- MCP Integration