Coverage for src/mcp_internal/document_api.py: 92%
287 statements
coverage.py v7.10.7, created at 2025-10-08 05:40 +0000
1"""
2Document API Module
4Handles all document structure, sections, metadata, and search operations.
5"""
7from typing import Dict, List, Any, Optional
8from datetime import datetime
10try:
11 from src.document_parser import Section
12 from src.content_editor import ContentEditor
13 from src.diff_engine import DiffEngine
14except ImportError:
15 # Fallback for when run as script without src module in path
16 from document_parser import Section
17 from content_editor import ContentEditor
18 from diff_engine import DiffEngine
19import os
20import re


class DocumentAPI:
    """Manages document structure, sections, and metadata operations"""

    def __init__(self, server: 'MCPDocumentationServer'):
        """
        Initialize DocumentAPI with a reference to the server instance

        Args:
            server: MCPDocumentationServer instance for accessing shared state
        """
        self.server = server

    def _paginate(self, items: list, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
        """
        Apply pagination to a list of items

        Args:
            items: List of items to paginate
            limit: Maximum number of items to return (None or 0 = all)
            offset: Number of items to skip

        Returns:
            Dict with 'results' and 'pagination' metadata
        """
        total = len(items)

        # If limit is None or 0, return all items (backward compatible)
        if limit is None or limit == 0:
            return {
                'results': items,
                'pagination': {
                    'total': total,
                    'limit': 0,
                    'offset': 0,
                    'has_next': False,
                    'has_previous': False
                }
            }

        # Apply pagination
        start = offset
        end = offset + limit
        paginated_items = items[start:end]

        return {
            'results': paginated_items,
            'pagination': {
                'total': total,
                'limit': limit,
                'offset': offset,
                'has_next': end < total,
                'has_previous': offset > 0
            }
        }
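
    # Illustrative example (derived from the logic above, not from a real call):
    # _paginate(['a', 'b', 'c'], limit=2, offset=1) would return
    #   {'results': ['b', 'c'],
    #    'pagination': {'total': 3, 'limit': 2, 'offset': 1,
    #                   'has_next': False, 'has_previous': True}}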

    def get_structure(self, start_level: int = 1, parent_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
        """
        Get sections at a specific level (always depth=1 to avoid token limits)

        Args:
            start_level: Which hierarchy level to return (default: 1)
            parent_id: Optional filter - only return children of this section
            limit: Maximum results to return (None/0 = all)
            offset: Number of results to skip

        Returns:
            Dict with section data for the requested level, or paginated response if limit specified
        """
        # Collect all sections at the requested level
        filtered_sections = [
            (section_id, section)
            for section_id, section in self.server.sections.items()
            if section.level == start_level
        ]

        # If parent_id is specified, filter to only children of that parent
        if parent_id:
            filtered_sections = [
                (section_id, section)
                for section_id, section in filtered_sections
                if section_id.startswith(parent_id + '.')
            ]

        # Sort by chapter number (if present), then document_position, then title
        def get_sort_key(item):
            section_id, section = item
            chapter_match = re.match(r'^(\d+)\.', section.title)
            chapter_num = int(chapter_match.group(1)) if chapter_match else 999
            return (chapter_num, section.document_position, section.title)

        sorted_sections = sorted(filtered_sections, key=get_sort_key)

        # Build section data list
        section_list = []
        for section_id, section in sorted_sections:
            children_count = len(section.children) if hasattr(section, 'children') and section.children else 0

            section_list.append({
                'title': section.title,
                'level': section.level,
                'id': section_id,
                'children_count': children_count,
                'line_start': section.line_start,
                'line_end': section.line_end,
                'source_file': section.source_file
            })

        # Return paginated if limit specified, else return dict for backward compatibility
        if limit is not None:
            return self._paginate(section_list, limit, offset)
        else:
            # Convert list back to dict for backward compatibility
            return {item['id']: item for item in section_list}
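
    # Return-shape note (illustrative section ids, not from a real project):
    # get_structure(start_level=1) returns a dict keyed by section id, e.g.
    #   {'arc42.introduction': {'title': ..., 'level': 1, 'children_count': ..., ...}}
    # get_structure(start_level=1, limit=5) instead returns the _paginate envelope
    # with 'results' and 'pagination' keys.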

    def get_main_chapters(self) -> Dict[str, Any]:
        """Get main chapters for web interface - handles arc42 structure correctly"""
        # Build structure from all levels since we can't use max_depth anymore
        # We'll get level 1 and level 2 separately
        level_1_structure = self.get_structure(start_level=1)
        level_2_structure = self.get_structure(start_level=2)

        # Combine them for backward compatibility
        full_structure = {**level_1_structure, **level_2_structure}

        # Find all numbered chapters (level 2 in arc42 structure) AND other top-level documents
        main_chapters = {}

        for section_id, section in self.server.sections.items():
            # Look for numbered chapters at level 2 (arc42 structure)
            if section.level == 2:
                # Check for numbered chapters like "1. Introduction", "2. Architecture", etc.
                chapter_match = re.match(r'^(\d+)\.', section.title)
                if chapter_match:
                    chapter_num = int(chapter_match.group(1))

                    # Get the full section data from the hierarchical structure
                    if section_id in full_structure:
                        chapter_data = full_structure[section_id].copy()
                        chapter_data['chapter_number'] = chapter_num
                        main_chapters[f"chapter_{chapter_num:02d}"] = chapter_data

                # Also check for "Introduction and Goals" which is chapter 1
                elif 'introduction' in section.title.lower() and 'goals' in section.title.lower():
                    if section_id in full_structure:
                        chapter_data = full_structure[section_id].copy()
                        chapter_data['chapter_number'] = 1
                        main_chapters["chapter_01"] = chapter_data

            # Also include all level 1 sections (other documents)
            elif section.level == 1:
                if section_id in full_structure:
                    section_data = full_structure[section_id].copy()
                    section_data['chapter_number'] = 999  # Sort after numbered chapters
                    main_chapters[section_id] = section_data

        # Sort chapters by chapter_number to ensure correct order
        sorted_chapters = {}

        # First add numbered chapters (1-12) in order
        for i in range(1, 100):  # Support up to 99 chapters
            key = f"chapter_{i:02d}"
            if key in main_chapters:
                sorted_chapters[key] = main_chapters[key]

        # Then add other documents sorted by their title
        other_docs = [(k, v) for k, v in main_chapters.items() if not k.startswith('chapter_')]
        other_docs.sort(key=lambda x: x[1].get('title', '').lower())

        for key, value in other_docs:
            sorted_chapters[key] = value

        return sorted_chapters

    def get_root_files_structure(self) -> Dict[str, Any]:
        """Get structure grouped by root files - shows files as top level with their sections"""
        from pathlib import Path

        structure = {}

        # Helper function to determine if a file is an aggregator file
        def is_aggregator_file(file_path: Path) -> bool:
            """Check if a file contains only includes and no content sections."""
            try:
                content = file_path.read_text(encoding='utf-8')
                lines = [line.strip() for line in content.split('\n') if line.strip()]

                # Filter out metadata lines (starting with :) and comments
                content_lines = [line for line in lines
                                 if not line.startswith(':') and not line.startswith('//')]

                # Check if most lines are includes
                include_lines = [line for line in content_lines if line.startswith('include::')]

                # It's an aggregator if it has includes and very little other content
                return len(include_lines) > 0 and len(content_lines) - len(include_lines) <= 3
            except Exception:
                return False
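
        # Illustrative example (hypothetical file, not from this repository): an
        # arc42.adoc that holds only ':toc:'-style attribute lines plus a list of
        # 'include::chapters/01_intro.adoc[]' lines would count as an aggregator,
        # because it has include lines and at most 3 other content lines.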

        # Helper function to collect sections for a root file (including from includes)
        def collect_sections_for_root_file(root_file: Path) -> list:
            """Collect all sections belonging to a root file (including from includes)."""
            file_sections = []
            root_file_str = str(root_file)
            root_file_rel = str(root_file.relative_to(self.server.project_root))

            # Check if this is an aggregator file
            if is_aggregator_file(root_file):
                # For aggregator files, collect sections from included files
                # (included_files is a single set shared across root files)
                # plus direct sections from the root file itself
                for section_id, section in self.server.sections.items():
                    section_file_path = Path(section.source_file)

                    # Check if this section comes from a file that's included
                    if section_file_path in self.server.included_files:
                        file_sections.append((section_id, section))
                    elif section.source_file == root_file_str or section.source_file == root_file_rel:
                        # Also include direct sections from the root file itself
                        file_sections.append((section_id, section))
            else:
                # For non-aggregator files, only collect direct sections
                for section_id, section in self.server.sections.items():
                    if section.source_file == root_file_str or section.source_file == root_file_rel:
                        file_sections.append((section_id, section))

            return file_sections

        # Iterate over each root file (skip included files)
        # Sort root files alphabetically by filename for consistent navigation order
        for root_file in sorted(self.server.root_files, key=lambda f: f.name):
            # Skip files that are included by other files
            if root_file in self.server.included_files:
                continue

            # Get relative path for display
            try:
                rel_path = str(root_file.relative_to(self.server.project_root))
            except ValueError:
                rel_path = str(root_file)

            # Collect sections for this root file
            file_sections = collect_sections_for_root_file(root_file)

            if not file_sections:
                continue

            # Build hierarchical structure for sections within this file
            section_map = {}
            root_sections = []

            for section_id, section in file_sections:
                children_count = len(section.children) if hasattr(section, 'children') and section.children else 0

                section_data = {
                    'title': section.title,
                    'level': section.level,
                    'id': section_id,
                    'children_count': children_count,
                    'line_start': section.line_start,
                    'line_end': section.line_end,
                    'source_file': section.source_file,
                    'children': []
                }
                section_map[section_id] = section_data

                # Determine parent using dot notation hierarchy
                if '.' in section_id:
                    parent_id = '.'.join(section_id.split('.')[:-1])
                    if parent_id in section_map:
                        section_map[parent_id]['children'].append(section_data)
                    else:
                        root_sections.append(section_data)
                else:
                    root_sections.append(section_data)

            # Sort root sections by line_start to maintain document order
            root_sections.sort(key=lambda x: self.server.sections[x['id']].line_start)

            # Recursively sort all children by document order
            def sort_children_recursively(sections):
                """Sort children sections by line_start recursively"""
                for section in sections:
                    if section['children']:
                        section['children'].sort(key=lambda x: self.server.sections[x['id']].line_start)
                        sort_children_recursively(section['children'])

            sort_children_recursively(root_sections)

            # Create file entry in the expected format
            structure[rel_path] = {
                'filename': root_file.name,
                'path': rel_path,
                'section_count': len(file_sections),
                'sections': root_sections
            }

        return structure

    def _parse_section_path(self, path: str) -> str:
        """Parse section path, converting hash syntax to dot notation

        Args:
            path: Section path in either format:
                - Hash syntax: "file.adoc#section-id"
                - Dot notation: "file.section-id"

        Returns:
            Normalized dot notation path
        """
        if '#' not in path:
            return path  # Return as-is for backward compatibility

        # Split on first hash only
        file_path, section_id = path.split('#', 1)

        # Remove file extension to get base name
        from pathlib import Path
        file_base = Path(file_path).stem

        # Convert to dot notation
        return f'{file_base}.{section_id}'
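
    # Illustrative conversion (hypothetical file name, derived from the logic above):
    # _parse_section_path("architecture.adoc#building-blocks") -> "architecture.building-blocks"
    # _parse_section_path("architecture.building-blocks") is returned unchanged.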

    def get_section(self, path: str) -> Optional[Dict[str, Any]]:
        """Get specific section content

        Supports both formats:
            - Hash syntax: "file.adoc#section-id"
            - Dot notation: "file.section-id" (backward compatible)
        """
        # Parse hash syntax to dot notation if needed
        parsed_path = self._parse_section_path(path)

        if parsed_path in self.server.sections:
            section = self.server.sections[parsed_path]
            return {
                'id': section.id,
                'title': section.title,
                'level': section.level,
                'content': section.content,
                'children': [child.id if hasattr(child, 'id') else str(child) for child in section.children]
            }
        return None

    def get_sections(self, level: int, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
        """
        Get all sections at a specific level (content truncated to a 200-character preview)

        Args:
            level: Section level to filter by
            limit: Maximum results to return (None/0 = all)
            offset: Number of results to skip

        Returns:
            Dict with 'results' list and 'pagination' metadata, or just a list if limit is not specified
        """
        result = []
        for section in self.server.sections.values():
            if section.level == level:
                result.append({
                    'id': section.id,
                    'title': section.title,
                    'content': section.content[:200] + '...' if len(section.content) > 200 else section.content
                })

        # Return paginated if limit specified, else return list for backward compatibility
        if limit is not None:
            return self._paginate(result, limit, offset)
        else:
            return result

    def search_content(self, query: str, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
        """
        Search for content in sections

        Args:
            query: Search query string
            limit: Maximum results to return (None/0 = all, for backward compatibility)
            offset: Number of results to skip

        Returns:
            Dict with 'results' list and 'pagination' metadata, or just list if limit not specified
        """
        results = []
        query_lower = query.lower()

        for section in self.server.sections.values():
            if query_lower in section.title.lower() or query_lower in section.content.lower():
                results.append({
                    'id': section.id,
                    'title': section.title,
                    'relevance': self._calculate_relevance(section, query_lower),
                    'snippet': self._extract_snippet(section.content, query_lower)
                })

        sorted_results = sorted(results, key=lambda x: x['relevance'], reverse=True)

        # Return paginated if limit specified, else return list for backward compatibility
        if limit is not None:
            return self._paginate(sorted_results, limit, offset)
        else:
            return sorted_results

    def get_sections_by_level(self, level: int) -> List[Dict[str, Any]]:
        """Get all sections at a specific level (full content, no pagination)"""
        result = []
        for section in self.server.sections.values():
            if section.level == level:
                result.append({
                    'id': section.id,
                    'title': section.title,
                    'content': section.content
                })
        return result

    def get_metadata(self, path: Optional[str] = None) -> Dict[str, Any]:
        """Get metadata for section or entire project"""
        if path:
            # Metadata for specific section
            if path not in self.server.sections:
                return {'error': f'Section not found: {path}'}

            section = self.server.sections[path]
            word_count = len(section.content.split()) if section.content else 0

            return {
                'path': path,
                'title': section.title,
                'level': section.level,
                'word_count': word_count,
                'children_count': len(section.children),
                'has_content': bool(section.content)
            }
        else:
            # Project metadata
            total_sections = len(self.server.sections)
            total_words = sum(len(s.content.split()) if s.content else 0 for s in self.server.sections.values())

            file_info = []
            for root_file in self.server.root_files:
                stat = os.stat(root_file)
                file_info.append({
                    'file': str(root_file.relative_to(self.server.project_root)),
                    'size': stat.st_size,
                    'last_modified': datetime.fromtimestamp(stat.st_mtime).isoformat()
                })

            return {
                'project_root': str(self.server.project_root),
                'total_sections': total_sections,
                'total_words': total_words,
                'root_files': file_info
            }

    def get_dependencies(self) -> Dict[str, Any]:
        """Get include tree and cross-references"""
        dependencies = {
            'includes': {},
            'cross_references': [],
            'orphaned_sections': []
        }

        # Analyze includes
        for root_file in self.server.root_files:
            try:
                content = root_file.read_text(encoding='utf-8')
                includes = []
                for line in content.split('\n'):
                    if 'include::' in line:
                        start = line.find('include::') + 9  # 9 == len('include::')
                        end = line.find('[', start)
                        if end > start:
                            include_path = line[start:end]
                            includes.append(include_path)

                dependencies['includes'][str(root_file.relative_to(self.server.project_root))] = includes
            except Exception:
                pass

        # Track cross-references
        cross_refs = []

        # Regex patterns for different reference types
        # Pattern 1: <<section-id>> or <<section-id,link text>>
        simple_ref_pattern = r'<<([^>,\]]+)(?:,[^>]*)?>>'
        # Pattern 2: xref:section-id[] or xref:section-id[link text]
        xref_pattern = r'xref:([^\[\]]+)\['
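
        # Illustrative matches (hypothetical section ids, derived from the patterns above):
        #   '<<building-blocks,Building Block View>>' -> group(1) == 'building-blocks'
        #   'xref:runtime-view[Runtime]'              -> group(1) == 'runtime-view'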

        for section_id, section in self.server.sections.items():
            content = section.content
            if not content:
                continue

            # Find all <<>> style references
            for match in re.finditer(simple_ref_pattern, content):
                target = match.group(1).strip()

                # Normalize target to match section IDs (convert to lowercase, replace spaces with dashes)
                normalized_target = re.sub(r'[^\w\s-]', '', target.lower()).replace(' ', '-')

                # Check if target exists in our sections
                target_exists = any(normalized_target in sid or sid.endswith(normalized_target) for sid in self.server.sections.keys())

                cross_refs.append({
                    'from_section': section_id,
                    'to_section': normalized_target,
                    'reference_type': '<<>>',
                    'valid': target_exists
                })

            # Find all xref: style references
            for match in re.finditer(xref_pattern, content):
                target = match.group(1).strip()
                # Remove file path if present (e.g., "file.adoc#section" -> "section")
                if '#' in target:
                    target = target.split('#')[1]

                normalized_target = re.sub(r'[^\w\s-]', '', target.lower()).replace(' ', '-')
                target_exists = any(normalized_target in sid or sid.endswith(normalized_target) for sid in self.server.sections.keys())

                cross_refs.append({
                    'from_section': section_id,
                    'to_section': normalized_target,
                    'reference_type': 'xref',
                    'valid': target_exists
                })

        dependencies['cross_references'] = cross_refs

        # Find orphaned sections (sections without proper parent references)
        # Note: Top-level sections (level 1) are NOT considered orphaned even if they have no parent
        all_section_ids = set(self.server.sections.keys())
        referenced_sections = set()

        # Collect all sections that are referenced as children
        for section in self.server.sections.values():
            for child in section.children:
                if isinstance(child, str):
                    referenced_sections.add(child)
                else:
                    referenced_sections.add(child.id)

        # A section is orphaned if:
        # 1. It is NOT referenced as anyone's child, AND
        # 2. It is NOT a top-level section (level 1)
        orphaned = []
        for section_id in (all_section_ids - referenced_sections):
            section = self.server.sections[section_id]
            if section.level > 1:  # Only non-top-level sections can be orphaned
                orphaned.append(section_id)

        dependencies['orphaned_sections'] = orphaned
        return dependencies

    def validate_structure(self) -> Dict[str, Any]:
        """Validate document structure consistency"""
        issues = []
        warnings = []

        # Check for missing sections
        for section in self.server.sections.values():
            for child in section.children:
                child_id = child.id if hasattr(child, 'id') else str(child)
                if child_id not in self.server.sections:
                    issues.append(f"Missing child section: {child_id} (referenced by {section.id})")

        # Check level consistency (tolerance mode - allow level skips)
        # Only warn if child level is NOT greater than parent level (actual hierarchy violation)
        for section in self.server.sections.values():
            if '.' in section.id:
                parent_id = '.'.join(section.id.split('.')[:-1])
                if parent_id in self.server.sections:
                    parent_section = self.server.sections[parent_id]
                    # Child must be deeper than parent (allow skips like 1→3, but not 3→2)
                    if section.level <= parent_section.level:
                        warnings.append(f"Level hierarchy violation: {section.id} (level {section.level}) should be deeper than parent {parent_id} (level {parent_section.level})")

        # Check for empty sections
        empty_sections = [s.id for s in self.server.sections.values() if not s.content and not s.children]
        if empty_sections:
            warnings.extend([f"Empty section: {sid}" for sid in empty_sections])

        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'warnings': warnings,
            'total_sections': len(self.server.sections),
            'validation_timestamp': datetime.now().isoformat()
        }

    def refresh_index(self) -> Dict[str, Any]:
        """Manually refresh the document index"""
        old_count = len(self.server.sections)

        # Re-discover and re-parse
        self.server._discover_root_files()
        self.server._parse_project()

        new_count = len(self.server.sections)

        return {
            'success': True,
            'old_section_count': old_count,
            'new_section_count': new_count,
            'sections_added': new_count - old_count,
            'timestamp': datetime.now().isoformat()
        }

    def _calculate_relevance(self, section: Section, query: str) -> float:
        """Simple relevance scoring: title matches count twice as much as content matches"""
        title_matches = section.title.lower().count(query)
        content_matches = section.content.lower().count(query)
        return title_matches * 2 + content_matches
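
    # Illustrative score (derived from the formula above): a query that appears once
    # in the title and three times in the content scores 1 * 2 + 3 = 5.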

    def update_section_content(self, path: str, content: str) -> bool:
        """Update section content; returns False only if the section does not exist"""
        if path not in self.server.sections:
            return False

        section = self.server.sections[path]

        # Update in-memory section first
        section.content = content

        # Try to update source file
        for root_file in self.server.root_files:
            if self.server.editor.update_section(section, content, root_file):
                return True

        # Even if file update fails, in-memory update succeeded
        return True

    def insert_section(self, parent_path: str, title: str, content: str, position: str = "append") -> bool:
        """Insert new section"""
        if parent_path not in self.server.sections:
            return False

        parent_section = self.server.sections[parent_path]
        # Find source file for parent section
        for root_file in self.server.root_files:
            if self.server.editor.insert_section(parent_section, title, content, position, root_file):
                return True
        return False

    def _extract_snippet(self, content: str, query: str, context_chars: int = 100) -> str:
        """Extract snippet around query match"""
        content_lower = content.lower()
        pos = content_lower.find(query)
        if pos == -1:
            return content[:context_chars] + '...'

        start = max(0, pos - context_chars // 2)
        end = min(len(content), pos + len(query) + context_chars // 2)
        return content[start:end]
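

# Illustrative usage sketch (assumes an already-constructed MCPDocumentationServer
# instance named `server`; its construction happens outside this module, and the
# variable names below are hypothetical):
#
#     api = DocumentAPI(server)
#     chapters = api.get_structure(start_level=2, limit=10)   # paginated envelope
#     hits = api.search_content("deployment", limit=5)        # ranked, paginated
#     meta = api.get_metadata()                                # project-wide metadata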