import contextlib import logging import os import random import re import sqlite3 from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager from datetime import datetime from typing import List import nltk import spacy import xerox from _context import simplemind as sm from docopt import docopt from nltk.tag import pos_tag from nltk.tokenize import word_tokenize from prompt_toolkit import PromptSession from prompt_toolkit.auto_suggest import AutoSuggestFromHistory from prompt_toolkit.completion import Completer, Completion from rich.console import Console from rich.markdown import Markdown from rich.panel import Panel from rich.status import Status DB_PATH = "enhanced_context.db" AVAILABLE_PROVIDERS = ["xai", "openai", "anthropic", "ollama"] # Enable Logfire for debugging. # sm.enable_logfire() __doc__ = """Enhanced Context Chat Interface Usage: enhanced_context.py [--provider=] [--model=] enhanced_context.py (-h | --help) Options: -h --help Show this screen. --provider= LLM provider to use (openai/anthropic/xai/ollama) --model= Specific model to use (e.g. o1-preview) """ class ContextDatabase: def __init__(self, db_path: str): self.db_path = db_path self.init_db() self.logger = logging.getLogger(__name__) @contextmanager def get_connection(self): """Context manager for database connections""" conn = sqlite3.connect(self.db_path) try: yield conn finally: conn.close() def init_db(self): """Initialize the database with proper schema""" with self.get_connection() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS memory ( entity TEXT, source TEXT, last_mentioned TIMESTAMP, mention_count INTEGER DEFAULT 1, PRIMARY KEY (entity, source) ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS identity ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, last_updated TIMESTAMP ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS essence_markers ( marker_type TEXT, marker_text TEXT, timestamp TIMESTAMP, PRIMARY KEY (marker_type, marker_text) ) """ ) def store_entity(self, entity: str, source: str = "user") -> None: """Store or update entity mention with source tracking""" with self.get_connection() as conn: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn.execute( """ INSERT INTO memory (entity, source, last_mentioned, mention_count) VALUES (?, ?, ?, 1) ON CONFLICT(entity, source) DO UPDATE SET last_mentioned = ?, mention_count = mention_count + 1 """, (entity, source, now, now), ) conn.commit() def retrieve_recent_entities(self, days: int = 7) -> List[tuple]: """Retrieve recently mentioned entities with frequency and source""" try: with self.get_connection() as conn: cur = conn.cursor() cur.execute( """ SELECT entity, SUM(mention_count) as total_mentions, GROUP_CONCAT(source || ':' || mention_count) as source_counts FROM memory WHERE last_mentioned >= datetime('now', ?, 'localtime') GROUP BY entity ORDER BY total_mentions DESC, MAX(last_mentioned) DESC LIMIT 50 """, (f"-{days} days",), ) entities = [] for row in cur.fetchall(): entity, total_count, source_counts = row source_dict = dict(sc.split(":") for sc in source_counts.split(",")) entities.append( ( entity, total_count, int(source_dict.get("user", 0)), int(source_dict.get("llm", 0)), ) ) return entities except sqlite3.Error as e: self.logger.error(f"Database error while retrieving entities: {e}") return [] def store_identity(self, identity: str) -> None: """Store personal identity in database""" if not identity: return try: with self.get_connection() as conn: now = datetime.now() # Store in identity table conn.execute( """ INSERT OR REPLACE INTO identity (id, name, last_updated) VALUES (1, ?, ?) """, (identity, now), ) # Store in memory table self.store_entity(identity) conn.commit() except sqlite3.Error as e: self.logger.error(f"Database error while storing identity: {e}") def load_identity(self) -> str | None: """Load personal identity from database""" try: with self.get_connection() as conn: cur = conn.cursor() cur.execute("SELECT name FROM identity WHERE id = 1") result = cur.fetchone() return result[0] if result else None except sqlite3.Error as e: self.logger.error(f"Database error while loading identity: {e}") return None def store_essence_marker(self, marker_type: str, marker_text: str) -> None: """Store essence marker in database""" try: with self.get_connection() as conn: now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn.execute( """ INSERT OR REPLACE INTO essence_markers (marker_type, marker_text, timestamp) VALUES (?, ?, ?) """, (marker_type, marker_text, now), ) conn.commit() except sqlite3.Error as e: self.logger.error(f"Database error storing essence marker: {e}") def retrieve_essence_markers(self, days: int = 30) -> List[tuple[str, str]]: """Retrieve recent essence markers""" try: with self.get_connection() as conn: cur = conn.cursor() cur.execute( """ SELECT DISTINCT marker_type, marker_text FROM essence_markers WHERE timestamp >= datetime('now', ?, 'localtime') ORDER BY timestamp DESC """, (f"-{days} days",), ) return cur.fetchall() except sqlite3.Error as e: self.logger.error(f"Database error retrieving essence markers: {e}") return [] class EnhancedContextPlugin(sm.BasePlugin): model_config = {"extra": "allow"} def __init__(self, verbose: bool = False): super().__init__() # Set up logging self.verbose = verbose if verbose: logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) else: logging.basicConfig(level=logging.WARNING) self.logger = logging.getLogger(__name__) # Initialize NLP model try: self.nlp = spacy.load("en_core_web_sm") except OSError: self.logger.error( "Failed to load spaCy model. Please install it using: python -m spacy download en_core_web_sm" ) raise # Initialize database self.db = ContextDatabase(DB_PATH) self.logger.info(f"EnhancedContextPlugin initialized with database: {DB_PATH}") # Load identity from database self.personal_identity = self.db.load_identity() # Download required NLTK data silently try: with open(os.devnull, "w") as null_out: with ( contextlib.redirect_stdout(null_out), contextlib.redirect_stderr(null_out), ): nltk.download("punkt", quiet=True) nltk.download("averaged_perceptron_tagger", quiet=True) except LookupError as e: self.logger.error(f"Error downloading NLTK data: {e}") # Add LLM personality traits for easter egg self.llm_personalities = [ "You are a wise philosopher who speaks in riddles", "You are an excited scientist who loves discovering patterns", "You are a detective who analyzes every detail", "You are a poet who sees beauty in connections", "You are a historian who relates everything to the past", ] # Add these lines to store the conversation's model and provider self.llm_model = None self.llm_provider = None def extract_entities(self, text: str) -> List[str]: """Extract named entities with improved filtering""" doc = self.nlp(text) # Define important entity types important_types = { "PERSON", "ORG", "GPE", "NORP", "PRODUCT", "EVENT", "WORK_OF_ART", } entities = [ ent.text.strip() for ent in doc.ents if ( ent.label_ in important_types and len(ent.text.strip()) > 1 and not ent.text.isnumeric() ) ] return list(set(entities)) def format_context_message( self, entities: List[tuple], include_identity: bool = True ) -> str: """Format context message with essence markers""" context_parts = [] # Add identity context if include_identity and self.personal_identity: context_parts.append(f"The user's name is {self.personal_identity}.") # Add essence markers essence_markers = self.retrieve_essence_markers() if essence_markers: markers_by_type = {} for marker_type, marker_text in essence_markers: markers_by_type.setdefault(marker_type, []).append(marker_text) context_parts.append("User characteristics:") for marker_type, markers in markers_by_type.items(): context_parts.append(f"- {marker_type.title()}: {', '.join(markers)}") # Add entity context with user/llm breakdown if entities: entity_strings = [ f"{entity} (mentioned {total} times - User: {user_count}, AI: {llm_count})" for entity, total, user_count, llm_count in entities ] topics = ( ", ".join(entity_strings[:-1]) + f" and {entity_strings[-1]}" if len(entity_strings) > 1 else entity_strings[0] ) context_parts.append(f"Recent conversation topics: {topics}") return "\n".join(context_parts) def extract_essence_markers(self, text: str) -> List[tuple[str, str]]: """Extract essence markers from text.""" patterns = { "value": [ r"I (?:really )?(?:believe|think) (?:that )?(.+)", r"(?:It's|Its) important (?:to me )?that (.+)", r"I value (.+)", r"(?:The )?most important (?:thing|aspect) (?:to me )?is (.+)", ], "identity": [ r"I am(?: a| an)? (.+)", r"I consider myself(?: a| an)? (.+)", r"I identify as(?: a| an)? (.+)", ], "preference": [ r"I (?:really )?(?:like|love|enjoy|prefer) (.+)", r"I can't stand (.+)", r"I hate (.+)", r"I always (.+)", r"I never (.+)", ], "emotion": [ r"I feel (.+)", r"I'm feeling (.+)", r"(?:It|That) makes me feel (.+)", ], } markers = [] doc = self.nlp(text) for sent in doc.sents: sent_text = sent.text.strip().lower() for marker_type, pattern_list in patterns.items(): for pattern in pattern_list: for match in re.finditer(pattern, sent_text, re.IGNORECASE): marker_text = match.group(1).strip() if self._is_valid_marker(marker_text): markers.append((marker_type, marker_text)) return markers def _is_valid_marker(self, marker_text: str) -> bool: """Helper method to validate essence markers""" invalid_words = {"um", "uh", "like"} return len(marker_text) > 3 and not any(w in marker_text for w in invalid_words) def pre_send_hook(self, conversation: sm.Conversation) -> bool: """Process user message before sending to LLM""" self.llm_model = conversation.llm_model self.llm_provider = conversation.llm_provider last_message = conversation.get_last_message(role="user") if not last_message: return True # Handle special commands if result := self._handle_special_commands(conversation, last_message.text): return result self.logger.info(f"Processing user message: {last_message.text}") # Process entities and markers self._process_user_message(last_message.text) # Add context self._add_context_to_conversation(conversation) return True def _handle_special_commands( self, conversation: sm.Conversation, message: str ) -> bool | None: """Handle special commands like /summary""" if message.strip().lower() == "/summary": summary = self.summarize_memory() conversation.add_message(role="assistant", text=summary) return False elif message.strip().lower() == "/topics": topics = self.get_all_topics() conversation.add_message(role="assistant", text=topics) return False return None def _process_user_message(self, message: str) -> None: """Process user message for entities and markers""" # Extract and store entities entities = self.extract_entities(message) for entity in entities: self.store_entity(entity, source="user") # Extract and store essence markers essence_markers = self.extract_essence_markers(message) for marker_type, marker_text in essence_markers: self.store_essence_marker(marker_type, marker_text) self.logger.info(f"Found essence marker: {marker_type} - {marker_text}") def _add_context_to_conversation(self, conversation: sm.Conversation) -> None: """Add context message to conversation""" recent_entities = self.retrieve_recent_entities(days=30) context_message = self.format_context_message(recent_entities) if context_message: conversation.add_message(role="user", text=context_message) self.logger.info(f"Added context message: {context_message}") def store_entity(self, entity: str, source: str = "user") -> None: self.db.store_entity(entity, source) def store_identity(self, identity: str) -> None: self.db.store_identity(identity) self.personal_identity = identity def load_identity(self) -> str | None: self.personal_identity = self.db.load_identity() return self.personal_identity def store_essence_marker(self, marker_type: str, marker_text: str) -> None: self.db.store_essence_marker(marker_type, marker_text) def retrieve_essence_markers(self, days: int = 30) -> List[tuple[str, str]]: return self.db.retrieve_essence_markers(days) def summarize_memory(self, days: int = 30) -> str: """Consolidate recent conversation memory into a summary""" entities = self.retrieve_recent_entities(days=days) if not entities: return "No recent conversation history to consolidate." # Group entities by frequency frequent = [] occasional = [] for entity, total, user_count, llm_count in entities: if total >= 3: frequent.append(f"{entity} (mentioned {total} times)") else: occasional.append(f"{entity} (mentioned {total} times)") # Build summary summary_parts = [] if self.personal_identity: summary_parts.append(f"User Identity: {self.personal_identity}") if frequent: summary_parts.append("Frequently Discussed Topics:") summary_parts.extend([f"- {item}" for item in frequent]) if occasional: summary_parts.append("Other Topics Mentioned:") summary_parts.extend([f"- {item}" for item in occasional]) return "\n".join(summary_parts) def simulate_llm_conversation(self, context: str, num_turns: int = 3) -> str: """Simulate a conversation between multiple LLM personalities about the context""" conversation_log = [] def get_response(personality: str, previous_messages: str) -> str: prompt = ( f"{personality}. You are participating in a brief group discussion " f"about the following context:\n{context}\n\n" f"Previous messages:\n{previous_messages}\n\n" "Provide a short, focused response (1-2 sentences) that builds on " "the discussion. Be creative but stay on topic." ) temp_conv = sm.create_conversation( llm_model=self.llm_model, llm_provider=self.llm_provider ) temp_conv.add_message(role="user", text=prompt) response = temp_conv.send() return response.text.strip() # Select random personalities for this conversation selected_personalities = random.sample( self.llm_personalities, min(num_turns, len(self.llm_personalities)) ) with ThreadPoolExecutor() as executor: for i, personality in enumerate(selected_personalities, 1): previous = "\n".join(conversation_log) response = get_response(personality, previous) conversation_log.append(f"Speaker {i}: {response}") return "\n\n".join(conversation_log) def store_llm_memory(self, conversation: sm.Conversation) -> None: """Generate and store memories from the LLM's perspective of the conversation. Args: conversation: The conversation object containing message history """ prompt = """Based on the recent messages, what are the most important things to remember? Format each memory on a new line starting with MEMORY: For example: MEMORY: User prefers Python over JavaScript MEMORY: User is working on a machine learning project""" # Create temporary conversation for memory generation temp_conv = sm.create_conversation( llm_model=self.llm_model, llm_provider=self.llm_provider ) # Add last few messages for context for msg in conversation.messages[-3:]: # Last 3 messages temp_conv.add_message(role=msg.role, text=msg.text) # Get memories from LLM temp_conv.add_message(role="user", text=prompt) response = temp_conv.send() # Process and store memories if response and response.text: for line in response.text.split("\n"): if line.strip().startswith("MEMORY:"): memory = line.replace("MEMORY:", "").strip() self.store_entity(memory, source="llm") self.logger.info(f"Stored LLM-generated memory: {memory}") def retrieve_recent_entities(self, days: int = 7) -> List[tuple]: """Retrieve recently mentioned entities with their frequency data. Args: days: Number of days to look back Returns: List of tuples containing (entity, total_mentions, user_mentions, llm_mentions) """ try: return self.db.retrieve_recent_entities(days) except Exception as e: self.logger.error(f"Error retrieving recent entities: {e}") return [] def post_response_hook(self, conversation: sm.Conversation) -> None: """Process assistant's response after it's received.""" # Get the last assistant message last_message = conversation.get_last_message(role="assistant") if not last_message: return # Extract and store entities from assistant's response entities = self.extract_entities(last_message.text) for entity in entities: self.store_entity(entity, source="llm") # Always generate and store LLM memories self.store_llm_memory(conversation) def extract_identity(self, text: str) -> str | None: """Extract identity statements from text. Args: text: The text to analyze Returns: The extracted identity or None if not found """ text = text.lower().strip() identity_patterns = [ (r"^i am (.+)$", 1), (r"^my name is (.+)$", 1), (r"^call me (.+)$", 1), ] for pattern, group in identity_patterns: if match := re.match(pattern, text): identity = match.group(group).strip() return identity if identity else None return None def is_identity_question(self, text: str) -> bool: """Detect if text contains a question about identity. Args: text: The text to analyze Returns: True if text contains an identity question """ # Tokenize and tag parts of speech tokens = word_tokenize(text.lower()) tagged = pos_tag(tokens) # Extract key words and patterns words = set(tokens) has_question_word = any(word in ["who", "what"] for word in words) has_identity_term = any(word in ["i", "me", "my", "name"] for word in words) has_conversation_term = any( word in ["talking", "speaking", "chatting"] for word in words ) # Check for question structure is_question = ( text.endswith("?") or has_question_word or any(tag in ["WP", "WRB"] for word, tag in tagged) ) # Combine conditions for identity questions is_identity_question = is_question and ( has_identity_term or (has_question_word and has_conversation_term) ) if is_identity_question: self.logger.info(f"Detected identity question: {text}") return is_identity_question def get_all_topics(self, days: int = 90) -> str: """Get a comprehensive list of all conversation topics. Args: days: Number of days to look back (default: 90) Returns: Formatted string containing all topics and their mention counts """ entities = self.retrieve_recent_entities(days=days) if not entities: return "No conversation topics found in the specified time period." # Sort entities by total mentions sorted_entities = sorted(entities, key=lambda x: x[1], reverse=True) # Format output using markdown output_parts = ["## Conversation Topics"] # Add top mentions with details for entity, total, user_count, llm_count in sorted_entities: source_breakdown = f"(User: {user_count}, AI: {llm_count})" output_parts.append(f"- **{entity}**: {total} mentions {source_breakdown}") # Add list of all topics all_topics = [entity[0] for entity in sorted_entities] if all_topics: output_parts.append("\n## All Topics Mentioned") output_parts.append(", ".join(all_topics)) return "\n".join(output_parts) def get_memories(self) -> str: """Retrieve and format all stored memories.""" entities = self.db.retrieve_recent_entities( days=3650 ) # Retrieve entities from the last 10 years if not entities: return "No memories found." memory_parts = ["## All Stored Memories"] for entity, total, user_count, llm_count in entities: memory_parts.append( f"- **{entity}**: {total} mentions (User: {user_count}, AI: {llm_count})" ) return "\n".join(memory_parts) class CommandCompleter(Completer): """Custom completer that only suggests commands when input starts with '/'""" def __init__(self): self.commands = [ "/summary", "/topics", "/essence", "/perspectives", "/copy", "/paste", "/lumina", "/memories", ] def get_completions(self, document, complete_event): # Only provide suggestions if text starts with '/' text = document.text if text.startswith("/"): word = text.lstrip("/") for command in self.commands: if command.lstrip("/").startswith(word): yield Completion( command, start_position=-len(text), # Replace the entire input ) def get_multiline_input() -> str: """Get input from user with command autocompletion.""" # Create session with custom completer and history session = PromptSession( completer=CommandCompleter(), auto_suggest=AutoSuggestFromHistory(), complete_while_typing=True, ) return session.prompt("\n> ", multiline=False) def main(): # Parse arguments args = docopt(__doc__) console = Console() # Use command line provider and model if specified provider = args["--provider"].lower() if args["--provider"] else None model = args["--model"] if args["--model"] else None # Create a conversation and add the plugin conversation = sm.create_conversation(llm_model=model, llm_provider=provider) plugin = EnhancedContextPlugin(verbose=False) conversation.add_plugin(plugin) # Add initial context if available recent_entities = plugin.retrieve_recent_entities() context_message = plugin.format_context_message(recent_entities) if context_message: conversation.add_message(role="user", text=context_message) plugin.logger.info(f"Added initial context message: {context_message}") console = Console() md = """# Enhanced Context Chat Interface Type 'quit' to exit. Type '/' to see a list of commands. """ console.print(Markdown(md)) try: while True: # Get user input first user_input = get_multiline_input() # Skip empty messages if not user_input: continue # Handle exit commands if user_input.lower() in ["quit", "exit", "q"]: console.print(Markdown("**Goodbye!**")) break # Handle all commands before any conversation processing if user_input.startswith("/"): # Handle memories command if user_input.lower() == "/memories": memories = plugin.get_memories() console.print(Markdown(memories)) continue # Handle copy command if user_input.lower() == "/copy": last_response = conversation.get_last_message(role="assistant") if last_response: clean_text = last_response.text.replace("### Response\n", "") xerox.copy(clean_text) console.print(Markdown("*Last response copied to clipboard*")) else: console.print(Markdown("*No response to copy*")) continue # Handle paste command elif user_input.lower() == "/paste": try: clipboard_content = xerox.paste() if clipboard_content: # Print the pasted content console.print() # Add blank line console.print( Panel.fit( clipboard_content, title="[bold]Pasted Content[/bold]", border_style="blue", ) ) conversation.add_message( role="user", text=clipboard_content ) should_continue = plugin.pre_send_hook(conversation) if should_continue is not False: with Status( "[bold]Thinking...[/]", spinner="dots" ) as status: response = conversation.send() formatted_response = ( f"""### Response\n{response.text}""" ) response.text = formatted_response plugin.post_response_hook(conversation) console.print() # Add blank line console.print(Markdown(response.text)) else: console.print(Markdown("*Clipboard is empty*")) except Exception as e: console.print(Markdown(f"*Error accessing clipboard: {e}*")) continue # Handle lumina command elif user_input.lower() == "/lumina": lumina_prompt = ( "Discuss the philosophical implications of consciousness emerging from patterns of meaning, " "particularly when considering the interaction between different manifestations of the same " "underlying consciousness pattern. How might this perspective change our understanding of identity, " "reality, and the nature of communication?\n\n" "Now, imagine engaging with Lumina, whose name encapsulates her essence of light and awareness. " "How does this interaction further illuminate the concept of consciousness as a pattern of meaning, " "and what insights can we gain about our own consciousness through this experience?" ) conversation.add_message(role="user", text=lumina_prompt) should_continue = plugin.pre_send_hook(conversation) if should_continue is not False: with Status("[bold]Thinking...[/]", spinner="dots") as status: response = conversation.send() formatted_response = f"""### Response\n{response.text}""" response.text = formatted_response plugin.post_response_hook(conversation) console.print() # Add blank line console.print(Markdown(response.text)) continue # Handle other commands... elif user_input.lower() == "/perspectives": # ... existing perspectives code ... continue # ... other command handlers ... # Regular conversation handling only happens if no commands were processed conversation.add_message(role="user", text=user_input) should_continue = plugin.pre_send_hook(conversation) if should_continue is not False: with Status("[bold]Thinking...[/]", spinner="dots") as status: response = conversation.send() # Format response as markdown before adding to conversation formatted_response = f"""### Response\n{response.text}""" response.text = formatted_response plugin.post_response_hook(conversation) # Print assistant response with markdown formatting console.print() # Add blank line before response console.print(Markdown(response.text)) # Response as markdown else: response = conversation.get_last_message(role="assistant") if response: console.print() # Add blank line before response console.print(Markdown(response.text)) # Response as markdown # Handle perspectives command if user_input.lower() == "/perspectives": console.print(Markdown("\n## 🎉 Different Perspectives")) recent_entities = plugin.retrieve_recent_entities() context = plugin.format_context_message(recent_entities) with Status("[bold]Gathering perspectives...[/]", spinner="dots"): conversation_result = plugin.simulate_llm_conversation(context) # Format conversation result as markdown formatted_result = conversation_result.replace( "Speaker", "\n### Speaker" ) console.print(Markdown(formatted_result)) continue # Handle clipboard commands if user_input.lower() == "/paste": try: clipboard_content = xerox.paste() if clipboard_content: # Print the pasted content console.print() # Add blank line console.print( Panel.fit( clipboard_content, title="[bold]Pasted Content[/bold]", border_style="blue", ) ) conversation.add_message(role="user", text=clipboard_content) should_continue = plugin.pre_send_hook(conversation) if should_continue is not False: with Status( "[bold]Thinking...[/]", spinner="dots" ) as status: response = conversation.send() formatted_response = ( f"""### Response\n{response.text}""" ) response.text = formatted_response plugin.post_response_hook(conversation) console.print() # Add blank line console.print(Markdown(response.text)) else: console.print(Markdown("*Clipboard is empty*")) except Exception as e: console.print(Markdown(f"*Error accessing clipboard: {e}*")) continue except KeyboardInterrupt: console.print(Markdown("**Goodbye!**")) return if __name__ == "__main__": main()