Pure Python library for batch file scanning with configurable filtering, metadata extraction, caching, and extensible provider support.
cjm-file-discovery
Install
pip install cjm_file_discovery
Project Structure
nbs/
├── cache/ (1)
│ └── memory.ipynb # In-memory caching for scan results.
├── core/ (3)
│ ├── config.ipynb # Configuration dataclasses for file scanning including ScanConfig, FilterConfig, and ExtensionMapping.
│ ├── models.ipynb # Core data models for file discovery including FileInfo, FileType, and DirectoryInfo.
│ └── protocols.ipynb # Protocol definitions for extensible file discovery providers.
├── providers/ (1)
│ └── local.ipynb # Local file system discovery provider implementation.
├── scanning/ (2)
│ ├── filters.ipynb # Filter implementations for file discovery.
│ └── scanner.ipynb # High-level FileScanner class with caching and provider support.
└── utils/ (1)
  └── formatting.ipynb # Utility functions for formatting file sizes, timestamps, and other display values.
Total: 8 notebooks across 5 directories
Module Dependencies
graph LR
cache_memory[cache.memory<br/>Memory Cache]
core_config[core.config<br/>Configuration]
core_models[core.models<br/>Models]
core_protocols[core.protocols<br/>Protocols]
providers_local[providers.local<br/>Local Provider]
scanning_filters[scanning.filters<br/>Filters]
scanning_scanner[scanning.scanner<br/>Scanner]
utils_formatting[utils.formatting<br/>Formatting Utilities]
cache_memory --> core_models
core_config --> core_models
core_models --> utils_formatting
core_protocols --> core_config
core_protocols --> core_models
providers_local --> core_config
providers_local --> core_protocols
providers_local --> core_models
providers_local --> utils_formatting
scanning_filters --> core_config
scanning_filters --> core_models
scanning_scanner --> utils_formatting
scanning_scanner --> cache_memory
scanning_scanner --> core_models
scanning_scanner --> scanning_filters
scanning_scanner --> core_config
scanning_scanner --> providers_local
17 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Configuration (config.ipynb)
Configuration dataclasses for file scanning including ScanConfig, FilterConfig, and ExtensionMapping.
Import
from cjm_file_discovery.core.config import (
ExtensionMapping,
FilterConfig,
ScanConfig
)
Classes
@dataclass
class ExtensionMapping:
"Maps file extensions to FileType categories."
audio: List[str] = field(...)
video: List[str] = field(...)
image: List[str] = field(...)
document: List[str] = field(...)
code: List[str] = field(...)
data: List[str] = field(...)
archive: List[str] = field(...)
def get_type(
self,
extension: str # File extension (with or without dot)
) -> FileType: # Corresponding FileType
"Get FileType for an extension."
def build_extension_map(self) -> Dict[str, FileType]: # Mapping of extension to FileType
"Build reverse mapping from extension to FileType."
def get_all_extensions(self) -> Set[str]: # Set of all known extensions
"Get all configured extensions."
@dataclass
class FilterConfig:
"Configuration for filtering files during discovery."
extensions: Optional[List[str]] # Include only these extensions (None = all)
exclude_extensions: Optional[List[str]] # Exclude these extensions
file_types: Optional[List[FileType]] # Include only these types (None = all)
min_size: Optional[int] # Minimum file size (bytes)
max_size: Optional[int] # Maximum file size (bytes)
exclude_patterns: List[str] = field(...) # Glob patterns to exclude
include_hidden: bool = False # Include hidden files/directories
custom_filter: Optional[Callable[[FileInfo], bool]] # Custom filter function
def matches(
self,
file_info: FileInfo, # File to check
extension_mapping: Optional[ExtensionMapping] = None # For type checking
) -> bool: # True if file passes all filters
"Check if a file matches all filter criteria."
@dataclass
class ScanConfig:
"Main configuration for file scanning operations."
directories: List[str] = field(...) # Directories to scan
recursive: bool = True # Scan subdirectories
max_depth: Optional[int] # Maximum recursion depth (None = unlimited)
follow_symlinks: bool = False # Follow symbolic links
filter_config: FilterConfig = field(...)
extension_mapping: ExtensionMapping = field(...)
cache_results: bool = True
cache_duration_seconds: int = 300 # 5 minutes default
max_results: Optional[int] # Maximum files to return (None = unlimited)
sort_by: str = 'name' # name, size, modified, type
sort_descending: bool = False
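As a rough illustration of the reverse lookup that `build_extension_map` and `get_type` describe, the sketch below uses a stand-in class rather than the library's own. The `FileType` members other than `OTHER`, the sample extensions, and the case and leading-dot normalization are assumptions made for the example.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class FileType(str, Enum):
    # Only OTHER appears in the documentation above; AUDIO and IMAGE
    # (and all string values) are hypothetical stand-ins.
    AUDIO = "audio"
    IMAGE = "image"
    OTHER = "other"


@dataclass
class ExtensionMappingSketch:
    """Stand-in for ExtensionMapping with two of its categories."""
    audio: List[str] = field(default_factory=lambda: ["mp3", "wav"])
    image: List[str] = field(default_factory=lambda: ["png", "jpg"])

    def build_extension_map(self) -> Dict[str, FileType]:
        # Invert "category -> extensions" into "extension -> category".
        ext_map: Dict[str, FileType] = {}
        for ext in self.audio:
            ext_map[ext] = FileType.AUDIO
        for ext in self.image:
            ext_map[ext] = FileType.IMAGE
        return ext_map

    def get_type(self, extension: str) -> FileType:
        # Accept the extension with or without a leading dot, in any case.
        key = extension.lower().lstrip(".")
        return self.build_extension_map().get(key, FileType.OTHER)


mapping = ExtensionMappingSketch()
audio_type = mapping.get_type(".MP3")
unknown_type = mapping.get_type("xyz")
```

Unknown extensions fall back to `FileType.OTHER` here, which mirrors the default shown for `FileInfo.file_type`; whether the library does the same is not stated above.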
Filters (filters.ipynb)
Filter implementations for file discovery.
Import
from cjm_file_discovery.scanning.filters import (
filter_files,
sort_files,
limit_files
)
Functions
def filter_files(
files: List[FileInfo], # Files to filter
config: FilterConfig # Filter configuration
) -> List[FileInfo]: # Filtered files
"Filter a list of files using FilterConfig."
def sort_files(
files: List[FileInfo], # Files to sort
sort_by: str = "name", # Sort key: "name", "size", "modified", "type"
descending: bool = False # Sort in descending order
) -> List[FileInfo]: # Sorted files
"Sort a list of files by the specified key."
def limit_files(
files: List[FileInfo], # Files to limit
max_results: Optional[int] # Maximum number of results (None = no limit)
) -> List[FileInfo]: # Limited file list
"Limit the number of files returned."
Formatting Utilities (formatting.ipynb)
Utility functions for formatting file sizes, timestamps, and other display values.
Import
from cjm_file_discovery.utils.formatting import (
format_file_size,
format_timestamp,
matches_glob_patterns,
get_mime_type
)
Functions
def format_file_size(
size_bytes: int # Size in bytes
) -> str: # Human-readable size string (e.g., "15.2 MB")
"Format file size in human-readable format."
def format_timestamp(
timestamp: float # Unix timestamp
) -> str: # Human-readable date string
"Format timestamp to human-readable date with relative time for recent files."
def matches_glob_patterns(
path: str, # File path to check
patterns: List[str] # List of glob patterns to match against
) -> bool: # True if path matches any pattern
"Check if path matches any of the glob patterns."
def get_mime_type(
path: str # File path to check
) -> Optional[str]: # MIME type string or None if unknown
"Determine MIME type for a file based on extension."
Local Provider (local.ipynb)
Local file system discovery provider implementation.
Import
from cjm_file_discovery.providers.local import (
LocalDiscoveryProvider
)
Classes
class LocalDiscoveryProvider:
"Local file system discovery provider."
def name(self) -> str: # Provider identifier
"Provider identifier."
def supports_path(
self,
path: str # Path to check
) -> bool: # True if this is a local path
"Check if this provider can handle the given path."
def get_file_info(
self,
path: str, # Path to file
extension_mapping: Optional[ExtensionMapping] = None # For type detection
) -> Optional[FileInfo]: # FileInfo or None if not found
"Get metadata for a single file."
def scan(
self,
directories: List[str], # Directories to scan
config: ScanConfig # Scan configuration
) -> List[FileInfo]: # List of discovered files
"Scan directories for files."
async def scan_async(
self,
directories: List[str], # Directories to scan
config: ScanConfig # Scan configuration
) -> List[FileInfo]: # List of discovered files
"Async scan for files (runs sync scan in executor)."
Memory Cache (memory.ipynb)
In-memory caching for scan results.
Import
from cjm_file_discovery.cache.memory import (
ScanCache
)
Classes
@dataclass
class ScanCache:
"In-memory cache for scan results with time-based expiration."
duration_seconds: int = 300 # Cache duration (default 5 minutes)
_files: List[FileInfo] = field(...)
_timestamp: Optional[float] = field(...)
def is_valid(self) -> bool: # True if cache is valid and not expired
"Check if cache is valid and not expired."
def get(self) -> Optional[List[FileInfo]]: # Cached files or None if invalid
"Get cached files if valid."
def set(
self,
files: List[FileInfo] # Files to cache
) -> None:
"Update cache with new files."
def clear(self) -> None:
"Clear the cache."
@property
def age_seconds(self) -> Optional[float]: # Age in seconds or None if not set
"Get cache age in seconds."
def file_count(self) -> int: # Number of cached files
"Get number of cached files."
Models (models.ipynb)
Core data models for file discovery including FileInfo, FileType, and DirectoryInfo.
Import
from cjm_file_discovery.core.models import (
FileType,
FileInfo,
DirectoryInfo
)
Classes
class FileType(str, Enum):
"File type categories."
@dataclass
class FileInfo:
"Metadata for a discovered file or directory."
name: str # File name with extension
path: str # Full path (provider-specific format)
is_directory: bool # True for directories
size: Optional[int] # Size in bytes
modified: Optional[float] # Last modified timestamp (Unix)
created: Optional[float] # Creation timestamp (if available)
file_type: FileType = FileType.OTHER # Categorized file type
extension: Optional[str] # File extension (without dot)
mime_type: Optional[str] # MIME type (if determinable)
provider_name: str = 'local' # Source provider identifier
metadata: Dict[str, Any] = field(...) # Provider-specific extras
def size_str(self) -> str: # Human-readable size string
"Human-readable size string (e.g., '15.2 MB')."
def modified_str(self) -> str: # Human-readable modified date
"Human-readable modified date."
@dataclass
class DirectoryInfo:
"Metadata for a directory with optional computed statistics."
path: str # Full directory path
name: str # Directory name
item_count: Optional[int] # Number of direct children
total_size: Optional[int] # Total size of contents (bytes)
file_count: Optional[int] # Number of files (recursive)
directory_count: Optional[int] # Number of subdirectories
def total_size_str(self) -> str: # Human-readable total size
"Human-readable total size string."
Protocols (protocols.ipynb)
Protocol definitions for extensible file discovery providers.
Import
from cjm_file_discovery.core.protocols import (
DiscoveryProvider
)
Classes
@runtime_checkable
class DiscoveryProvider(Protocol):
"Protocol for file discovery backends."
def name(self) -> str: # Unique identifier for this provider
"Unique identifier for this provider (e.g., 'local', 's3')."
def scan(
self,
directories: List[str], # Directories to scan
config: ScanConfig # Scan configuration
) -> List[FileInfo]: # List of discovered files
"Scan directories for files matching config."
async def scan_async(
self,
directories: List[str], # Directories to scan
config: ScanConfig # Scan configuration
) -> List[FileInfo]: # List of discovered files
"Async scan for files."
def get_file_info(
self,
path: str # Path to file
) -> Optional[FileInfo]: # FileInfo or None if not found
"Get metadata for a single file."
def supports_path(
self,
path: str # Path to check
) -> bool: # True if this provider can handle the path
"Check if this provider can handle the given path."
Scanner (scanner.ipynb)
High-level FileScanner class with caching and provider support.
Import
from cjm_file_discovery.scanning.scanner import (
FileScanner
)
Classes
class FileScanner:
"High-level file scanner with caching and provider support."
def __init__(
self,
config: ScanConfig, # Scan configuration
provider: Optional[Any] = None # Discovery provider (defaults to LocalDiscoveryProvider)
):
"Initialize the scanner."
def scan(
self,
force_refresh: bool = False # Force fresh scan, ignoring cache
) -> List[FileInfo]: # List of discovered files
"Scan for files, using cache if valid."
async def scan_async(
self,
force_refresh: bool = False # Force fresh scan, ignoring cache
) -> List[FileInfo]: # List of discovered files
"Async scan for files."
def get_files_by_type(
self,
file_types: List[FileType] # File types to filter by
) -> List[FileInfo]: # Filtered files
"Get files filtered by specific file types."
def clear_cache(self) -> None:
"Clear the scan cache."
@property
def cache_valid(self) -> bool: # True if cache is valid
"Check if cache is valid."
@property
def cached_file_count(self) -> int: # Number of cached files
"Get number of cached files."
def get_summary(self) -> Dict[str, Any]: # Summary statistics
"Get summary statistics for scanned files."
File details
Details for the file cjm_file_discovery-0.0.1.tar.gz.
File metadata
- Download URL: cjm_file_discovery-0.0.1.tar.gz
- Upload date:
- Size: 22.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a8ee62e31b887d48670b6e6353ce01806c759a913e40c4f3e7690bff17ada2ec |
| MD5 | 25fdc6abe6f81785a2aba660c12387ec |
| BLAKE2b-256 | e370837aa2495dc072b69d506e2555a3a077e1451918c6ffbac04249db1c51d1 |
File details
Details for the file cjm_file_discovery-0.0.1-py3-none-any.whl.
File metadata
- Download URL: cjm_file_discovery-0.0.1-py3-none-any.whl
- Upload date:
- Size: 22.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8ad71d2c07116ff806d9ec42338215967c24d16e378c3ceedd19412c4df6faa6 |
| MD5 | f48ababa88d43d6a5a819f6fd9991278 |
| BLAKE2b-256 | 19d759a482197aebb1ed4ca45631ec7355221158c1c93b0a3183f39fa0494152 |