Coverage for src/file_watcher.py: 89%
64 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-08 05:40 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-08 05:40 +0000
1import os
2import time
3from pathlib import Path
4from typing import Dict, Set, Callable
5from threading import Thread, Lock
7class FileWatcher:
8 def __init__(self, project_root: Path, callback: Callable):
9 self.project_root = Path(project_root) if isinstance(project_root, str) else project_root
10 self.callback = callback
11 self.file_mtimes: Dict[str, float] = {}
12 self.watching = False
13 self.watch_thread = None
14 self.lock = Lock()
16 def start(self):
17 """Start watching for file changes"""
18 if self.watching:
19 return
21 self.watching = True
22 self._scan_files()
23 self.watch_thread = Thread(target=self._watch_loop, daemon=True)
24 self.watch_thread.start()
26 def stop(self):
27 """Stop watching for file changes"""
28 self.watching = False
29 if self.watch_thread:
30 self.watch_thread.join(timeout=1)
32 def _scan_files(self):
33 """Scan project for documentation files and record modification times"""
34 patterns = ['**/*.adoc', '**/*.md', '**/*.asciidoc']
36 with self.lock:
37 self.file_mtimes.clear()
38 for pattern in patterns:
39 for file_path in self.project_root.glob(pattern):
40 try:
41 mtime = file_path.stat().st_mtime
42 self.file_mtimes[str(file_path)] = mtime
43 except OSError:
44 pass
46 def _watch_loop(self):
47 """Main watching loop"""
48 while self.watching:
49 try:
50 changed_files = self._check_changes()
51 if changed_files:
52 self.callback(changed_files)
53 time.sleep(1) # Check every second
54 except Exception:
55 pass # Continue watching even on errors
57 def _check_changes(self) -> Set[str]:
58 """Check for file changes and return set of changed files"""
59 changed_files = set()
60 current_files = {}
62 # Scan current files
63 patterns = ['**/*.adoc', '**/*.md', '**/*.asciidoc']
64 for pattern in patterns:
65 for file_path in self.project_root.glob(pattern):
66 try:
67 mtime = file_path.stat().st_mtime
68 current_files[str(file_path)] = mtime
69 except OSError:
70 continue
72 with self.lock:
73 # Check for new or modified files
74 for file_path, mtime in current_files.items():
75 if file_path not in self.file_mtimes or self.file_mtimes[file_path] != mtime:
76 changed_files.add(file_path)
78 # Check for deleted files
79 for file_path in self.file_mtimes:
80 if file_path not in current_files:
81 changed_files.add(file_path)
83 # Update stored modification times
84 self.file_mtimes = current_files
86 return changed_files