Coverage for src/mcp_internal/document_api.py: 92%

287 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-08 05:40 +0000

1""" 

2Document API Module 

3 

4Handles all document structure, sections, metadata, and search operations. 

5""" 

6 

7from typing import Dict, List, Any, Optional 

8from datetime import datetime 

9 

10try: 

11 from src.document_parser import Section 

12 from src.content_editor import ContentEditor 

13 from src.diff_engine import DiffEngine 

14except ImportError: 

15 # Fallback for when run as script without src module in path 

16 from document_parser import Section 

17 from content_editor import ContentEditor 

18 from diff_engine import DiffEngine 

19import os 

20import re 

21 

22 

class DocumentAPI:
    """Facade over the server's parsed documentation: structure, sections,
    metadata, search, and editing operations."""

    def __init__(self, server: 'MCPDocumentationServer'):
        """Store a reference to the owning server.

        Args:
            server: MCPDocumentationServer instance providing the shared
                state (sections, root_files, editor, ...).
        """
        self.server = server

34 

35 def _paginate(self, items: list, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]: 

36 """ 

37 Apply pagination to a list of items 

38 

39 Args: 

40 items: List of items to paginate 

41 limit: Maximum number of items to return (None or 0 = all) 

42 offset: Number of items to skip 

43 

44 Returns: 

45 Dict with 'results' and 'pagination' metadata 

46 """ 

47 total = len(items) 

48 

49 # If limit is None or 0, return all items (backward compatible) 

50 if limit is None or limit == 0: 

51 return { 

52 'results': items, 

53 'pagination': { 

54 'total': total, 

55 'limit': 0, 

56 'offset': 0, 

57 'has_next': False, 

58 'has_previous': False 

59 } 

60 } 

61 

62 # Apply pagination 

63 start = offset 

64 end = offset + limit 

65 paginated_items = items[start:end] 

66 

67 return { 

68 'results': paginated_items, 

69 'pagination': { 

70 'total': total, 

71 'limit': limit, 

72 'offset': offset, 

73 'has_next': end < total, 

74 'has_previous': offset > 0 

75 } 

76 } 

77 

def get_structure(self, start_level: int = 1, parent_id: Optional[str] = None, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
    """Return the sections sitting at one hierarchy level (depth is always 1
    to keep responses below token limits).

    Args:
        start_level: Hierarchy level to report (default: 1).
        parent_id: If given, restrict results to children of this section.
        limit: Maximum results (None/0 = all). When not None, the return
            value is a paginated envelope instead of a plain dict.
        offset: Results to skip when paginating.

    Returns:
        Paginated dict ({'results', 'pagination'}) when limit is given,
        otherwise {section_id: section_info} for backward compatibility.
    """
    candidates = [
        (sid, sec)
        for sid, sec in self.server.sections.items()
        if sec.level == start_level
    ]

    if parent_id:
        prefix = parent_id + '.'
        candidates = [(sid, sec) for sid, sec in candidates if sid.startswith(prefix)]

    # Order: explicit chapter number first (999 when absent), then position
    # within the document, then title.
    def sort_key(entry):
        sid, sec = entry
        numbered = re.match(r'^(\d+)\.', sec.title)
        return (int(numbered.group(1)) if numbered else 999, sec.document_position, sec.title)

    section_list = []
    for sid, sec in sorted(candidates, key=sort_key):
        kids = getattr(sec, 'children', None)
        section_list.append({
            'title': sec.title,
            'level': sec.level,
            'id': sid,
            'children_count': len(kids) if kids else 0,
            'line_start': sec.line_start,
            'line_end': sec.line_end,
            'source_file': sec.source_file,
        })

    if limit is not None:
        return self._paginate(section_list, limit, offset)
    # Legacy shape: keyed by section id.
    return {entry['id']: entry for entry in section_list}

137 

def get_main_chapters(self) -> Dict[str, Any]:
    """Return main chapters for the web interface, honouring arc42 layout."""
    # max_depth is no longer available, so fetch level 1 and level 2
    # separately and merge them for backward compatibility.
    full_structure = {
        **self.get_structure(start_level=1),
        **self.get_structure(start_level=2),
    }

    main_chapters = {}

    for section_id, section in self.server.sections.items():
        if section.level == 2:
            # Numbered arc42 chapters: "1. Introduction", "2. ...", etc.
            numbered = re.match(r'^(\d+)\.', section.title)
            if numbered:
                num = int(numbered.group(1))
                if section_id in full_structure:
                    data = full_structure[section_id].copy()
                    data['chapter_number'] = num
                    main_chapters[f"chapter_{num:02d}"] = data
            elif 'introduction' in section.title.lower() and 'goals' in section.title.lower():
                # "Introduction and Goals" is chapter 1 by arc42 convention.
                if section_id in full_structure:
                    data = full_structure[section_id].copy()
                    data['chapter_number'] = 1
                    main_chapters["chapter_01"] = data
        elif section.level == 1:
            # Other top-level documents sort after the numbered chapters.
            if section_id in full_structure:
                data = full_structure[section_id].copy()
                data['chapter_number'] = 999
                main_chapters[section_id] = data

    # Numbered chapters first, in numeric order (up to 99 supported).
    ordered = {}
    for i in range(1, 100):
        key = f"chapter_{i:02d}"
        if key in main_chapters:
            ordered[key] = main_chapters[key]

    # Remaining documents follow, alphabetically by title.
    extras = sorted(
        ((k, v) for k, v in main_chapters.items() if not k.startswith('chapter_')),
        key=lambda kv: kv[1].get('title', '').lower(),
    )
    ordered.update(extras)

    return ordered

197 

def get_root_files_structure(self) -> Dict[str, Any]:
    """Group the section tree by root file: each top-level file maps to its
    hierarchically nested sections."""
    from pathlib import Path

    def _is_aggregator(file_path: Path) -> bool:
        """True when the file is essentially just a list of include:: lines."""
        try:
            text = file_path.read_text(encoding='utf-8')
            stripped = [ln.strip() for ln in text.split('\n') if ln.strip()]
            # Drop attribute lines (":...") and AsciiDoc comments ("//...").
            body = [ln for ln in stripped
                    if not ln.startswith(':') and not ln.startswith('//')]
            includes = [ln for ln in body if ln.startswith('include::')]
            # Aggregator: has includes and at most 3 non-include lines.
            return len(includes) > 0 and len(body) - len(includes) <= 3
        except Exception:
            return False

    def _sections_of(root_file: Path) -> list:
        """All (id, section) pairs belonging to *root_file*, following
        includes when the file is an aggregator."""
        abs_path = str(root_file)
        rel_path = str(root_file.relative_to(self.server.project_root))
        aggregator = _is_aggregator(root_file)

        collected = []
        for sid, sec in self.server.sections.items():
            if aggregator and Path(sec.source_file) in self.server.included_files:
                # Section comes from a file pulled in by this aggregator.
                collected.append((sid, sec))
            elif sec.source_file in (abs_path, rel_path):
                # Direct section of the root file itself.
                collected.append((sid, sec))
        return collected

    structure = {}

    # Walk root files alphabetically; skip files pulled in via include::.
    for root_file in sorted(self.server.root_files, key=lambda f: f.name):
        if root_file in self.server.included_files:
            continue

        try:
            rel_path = str(root_file.relative_to(self.server.project_root))
        except ValueError:
            rel_path = str(root_file)

        file_sections = _sections_of(root_file)
        if not file_sections:
            continue

        # Nest sections using dot-notation parent ids.
        section_map = {}
        top_level = []
        for sid, sec in file_sections:
            kids = getattr(sec, 'children', None)
            node = {
                'title': sec.title,
                'level': sec.level,
                'id': sid,
                'children_count': len(kids) if kids else 0,
                'line_start': sec.line_start,
                'line_end': sec.line_end,
                'source_file': sec.source_file,
                'children': [],
            }
            section_map[sid] = node

            parent_key = sid.rsplit('.', 1)[0] if '.' in sid else None
            if parent_key is not None and parent_key in section_map:
                section_map[parent_key]['children'].append(node)
            else:
                top_level.append(node)

        # Keep document order (by starting line) at every nesting level.
        top_level.sort(key=lambda node: node['line_start'])

        def _sort_tree(nodes):
            for node in nodes:
                if node['children']:
                    node['children'].sort(key=lambda n: n['line_start'])
                    _sort_tree(node['children'])

        _sort_tree(top_level)

        structure[rel_path] = {
            'filename': root_file.name,
            'path': rel_path,
            'section_count': len(file_sections),
            'sections': top_level,
        }

    return structure

321 

322 def _parse_section_path(self, path: str) -> str: 

323 """Parse section path, converting hash syntax to dot notation 

324  

325 Args: 

326 path: Section path in either format: 

327 - Hash syntax: "file.adoc#section-id" 

328 - Dot notation: "file.section-id"  

329  

330 Returns: 

331 Normalized dot notation path 

332 """ 

333 if '#' not in path: 

334 return path # Return as-is for backward compatibility 

335 

336 # Split on first hash only 

337 file_path, section_id = path.split('#', 1) 

338 

339 # Remove file extension to get base name 

340 from pathlib import Path 

341 file_base = Path(file_path).stem 

342 

343 # Convert to dot notation 

344 return f'{file_base}.{section_id}' 

345 

def get_section(self, path: str) -> Optional[Dict[str, Any]]:
    """Look up one section's content.

    Accepts hash syntax ("file.adoc#section-id") as well as the legacy
    dot notation ("file.section-id"). Returns None for unknown sections.
    """
    key = self._parse_section_path(path)
    section = self.server.sections.get(key)
    if section is None:
        return None

    return {
        'id': section.id,
        'title': section.title,
        'level': section.level,
        'content': section.content,
        # Children may be Section objects or plain id strings.
        'children': [
            child.id if hasattr(child, 'id') else str(child)
            for child in section.children
        ],
    }

366 

def get_sections(self, level: int, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
    """List all sections at *level* with a 200-character content preview.

    Args:
        level: Section level to filter by.
        limit: Maximum results (None/0 = all).
        offset: Results to skip.

    Returns:
        Paginated envelope when limit is given, otherwise a plain list
        (backward compatible).
    """
    matches = []
    for section in self.server.sections.values():
        if section.level != level:
            continue
        preview = section.content
        if len(preview) > 200:
            preview = preview[:200] + '...'
        matches.append({
            'id': section.id,
            'title': section.title,
            'content': preview,
        })

    if limit is not None:
        return self._paginate(matches, limit, offset)
    return matches

393 

def search_content(self, query: str, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
    """Case-insensitive substring search over section titles and bodies.

    Args:
        query: Search string.
        limit: Maximum results (None/0 = all, backward compatible).
        offset: Results to skip.

    Returns:
        Paginated envelope when limit is given, otherwise a relevance-sorted
        list of {'id', 'title', 'relevance', 'snippet'} dicts.
    """
    needle = query.lower()
    hits = []
    for section in self.server.sections.values():
        if needle not in section.title.lower() and needle not in section.content.lower():
            continue
        hits.append({
            'id': section.id,
            'title': section.title,
            'relevance': self._calculate_relevance(section, needle),
            'snippet': self._extract_snippet(section.content, needle),
        })

    # Best matches first; sort is stable so ties keep insertion order.
    hits.sort(key=lambda hit: hit['relevance'], reverse=True)

    if limit is not None:
        return self._paginate(hits, limit, offset)
    return hits

425 

def get_sections_by_level(self, level: int) -> List[Dict[str, Any]]:
    """Return id, title, and full (untruncated) content for every section
    at the given level."""
    return [
        {'id': section.id, 'title': section.title, 'content': section.content}
        for section in self.server.sections.values()
        if section.level == level
    ]

437 

def get_metadata(self, path: Optional[str] = None) -> Dict[str, Any]:
    """Metadata for one section (when *path* is given) or the whole project.

    Section metadata: title, level, word count, child count, content flag.
    Project metadata: section/word totals plus size and mtime per root file.
    Unknown paths yield an {'error': ...} dict rather than raising.
    """
    if path:
        section = self.server.sections.get(path)
        if section is None:
            return {'error': f'Section not found: {path}'}

        words = len(section.content.split()) if section.content else 0
        return {
            'path': path,
            'title': section.title,
            'level': section.level,
            'word_count': words,
            'children_count': len(section.children),
            'has_content': bool(section.content),
        }

    # Project-wide summary.
    file_info = []
    for root_file in self.server.root_files:
        stat = os.stat(root_file)
        file_info.append({
            'file': str(root_file.relative_to(self.server.project_root)),
            'size': stat.st_size,
            'last_modified': datetime.fromtimestamp(stat.st_mtime).isoformat(),
        })

    all_sections = self.server.sections.values()
    return {
        'project_root': str(self.server.project_root),
        'total_sections': len(self.server.sections),
        'total_words': sum(len(s.content.split()) if s.content else 0 for s in all_sections),
        'root_files': file_info,
    }

477 

def get_dependencies(self) -> Dict[str, Any]:
    """Analyse include:: directives, cross-references, and orphaned sections.

    Returns:
        {'includes': {root_file_rel_path: [include targets]},
         'cross_references': [{'from_section', 'to_section',
                               'reference_type', 'valid'}],
         'orphaned_sections': [section_id]}
    """
    dependencies = {
        'includes': {},
        'cross_references': [],
        'orphaned_sections': [],
    }

    # --- include:: directives per root file ------------------------------
    for root_file in self.server.root_files:
        try:
            text = root_file.read_text(encoding='utf-8')
            includes = []
            for line in text.split('\n'):
                if 'include::' in line:
                    # Target sits between "include::" and the attribute
                    # list opener "[".
                    start = line.find('include::') + 9
                    end = line.find('[', start)
                    if end > start:
                        includes.append(line[start:end])
            dependencies['includes'][str(root_file.relative_to(self.server.project_root))] = includes
        except Exception:
            # Unreadable or out-of-tree files are skipped (best effort).
            pass

    # --- cross-references ------------------------------------------------
    # "<<id>>" / "<<id,link text>>" style:
    simple_ref = r'<<([^>,\]]+)(?:,[^>]*)?>>'
    # "xref:id[]" / "xref:id[link text]" style:
    xref_ref = r'xref:([^\[\]]+)\['

    def _normalise(raw: str) -> str:
        """Lower-case, strip punctuation, spaces -> dashes (anchor style)."""
        return re.sub(r'[^\w\s-]', '', raw.lower()).replace(' ', '-')

    def _known(target: str) -> bool:
        """Does any known section id contain or end with *target*?"""
        return any(target in sid or sid.endswith(target)
                   for sid in self.server.sections.keys())

    cross_refs = []
    for section_id, section in self.server.sections.items():
        body = section.content
        if not body:
            continue

        # Process <<>> refs first, then xref: refs (preserves ordering).
        for pattern, ref_type in ((simple_ref, '<<>>'), (xref_ref, 'xref')):
            for match in re.finditer(pattern, body):
                target = match.group(1).strip()
                # xref targets may carry a file prefix: "file.adoc#section".
                if ref_type == 'xref' and '#' in target:
                    target = target.split('#')[1]
                normalised = _normalise(target)
                cross_refs.append({
                    'from_section': section_id,
                    'to_section': normalised,
                    'reference_type': ref_type,
                    'valid': _known(normalised),
                })

    dependencies['cross_references'] = cross_refs

    # --- orphaned sections -----------------------------------------------
    # A section is orphaned when nothing lists it as a child AND it is not
    # a top-level (level 1) section.
    referenced = set()
    for section in self.server.sections.values():
        for child in section.children:
            referenced.add(child if isinstance(child, str) else child.id)

    dependencies['orphaned_sections'] = [
        sid for sid in (set(self.server.sections.keys()) - referenced)
        if self.server.sections[sid].level > 1
    ]
    return dependencies

577 

def validate_structure(self) -> Dict[str, Any]:
    """Check structural consistency of the parsed document tree.

    Missing child sections are hard issues; level-hierarchy violations and
    empty sections are only warnings. Level skips (e.g. 1 -> 3) are
    tolerated, but a child must always be deeper than its parent.
    """
    issues = []
    warnings = []

    # Every referenced child must actually exist in the index.
    for section in self.server.sections.values():
        for child in section.children:
            child_id = child.id if hasattr(child, 'id') else str(child)
            if child_id not in self.server.sections:
                issues.append(f"Missing child section: {child_id} (referenced by {section.id})")

    # Tolerant level check: flag only when the child is at the same level
    # as -- or shallower than -- its dot-notation parent.
    for section in self.server.sections.values():
        if '.' not in section.id:
            continue
        parent_id = section.id.rsplit('.', 1)[0]
        parent = self.server.sections.get(parent_id)
        if parent is not None and section.level <= parent.level:
            warnings.append(f"Level hierarchy violation: {section.id} (level {section.level}) should be deeper than parent {parent_id} (level {parent.level})")

    # Sections with neither content nor children.
    warnings.extend(
        f"Empty section: {s.id}"
        for s in self.server.sections.values()
        if not s.content and not s.children
    )

    return {
        'valid': not issues,
        'issues': issues,
        'warnings': warnings,
        'total_sections': len(self.server.sections),
        'validation_timestamp': datetime.now().isoformat(),
    }

614 

def refresh_index(self) -> Dict[str, Any]:
    """Re-discover root files and re-parse the project, reporting the delta
    in indexed section counts."""
    before = len(self.server.sections)

    self.server._discover_root_files()
    self.server._parse_project()

    after = len(self.server.sections)
    return {
        'success': True,
        'old_section_count': before,
        'new_section_count': after,
        'sections_added': after - before,
        'timestamp': datetime.now().isoformat(),
    }

632 

def _calculate_relevance(self, section: Section, query: str) -> float:
    """Naive relevance score: title hits count double versus body hits.

    NOTE: callers pass *query* already lower-cased (see search_content).
    """
    in_title = section.title.lower().count(query)
    in_body = section.content.lower().count(query)
    return in_title * 2 + in_body

638 

def update_section_content(self, path: str, content: str) -> bool:
    """Replace a section's content, in memory and (best effort) on disk.

    Returns False only when *path* is unknown; a failed file write still
    counts as success because the in-memory copy was updated.
    """
    section = self.server.sections.get(path)
    if section is None:
        return False

    # In-memory update always happens first.
    section.content = content

    # Best effort: persist via the first root file the editor accepts.
    for root_file in self.server.root_files:
        if self.server.editor.update_section(section, content, root_file):
            break

    return True

656 

def insert_section(self, parent_path: str, title: str, content: str, position: str = "append") -> bool:
    """Create a new section under *parent_path* via the content editor.

    Tries each root file in order until the editor reports success; returns
    False when the parent is unknown or no file accepts the insertion.
    """
    parent = self.server.sections.get(parent_path)
    if parent is None:
        return False

    # any() short-circuits on the first successful insertion.
    return any(
        self.server.editor.insert_section(parent, title, content, position, root_file)
        for root_file in self.server.root_files
    )

668 

669 def _extract_snippet(self, content: str, query: str, context_chars: int = 100) -> str: 

670 """Extract snippet around query match""" 

671 content_lower = content.lower() 

672 pos = content_lower.find(query) 

673 if pos == -1: 

674 return content[:context_chars] + '...' 

675 

676 start = max(0, pos - context_chars // 2) 

677 end = min(len(content), pos + len(query) + context_chars // 2) 

678 return content[start:end]