Pure Python library for batch file scanning with configurable filtering, metadata extraction, caching, and extensible provider support.

cjm-file-discovery

Install

pip install cjm_file_discovery

Project Structure

nbs/
├── cache/ (1)
│   └── memory.ipynb  # In-memory caching for scan results.
├── core/ (3)
│   ├── config.ipynb     # Configuration dataclasses for file scanning including ScanConfig, FilterConfig, and ExtensionMapping.
│   ├── models.ipynb     # Core data models for file discovery including FileInfo, FileType, and DirectoryInfo.
│   └── protocols.ipynb  # Protocol definitions for extensible file discovery providers.
├── providers/ (1)
│   └── local.ipynb  # Local file system discovery provider implementation.
├── scanning/ (2)
│   ├── filters.ipynb  # Filter implementations for file discovery.
│   └── scanner.ipynb  # High-level FileScanner class with caching and provider support.
└── utils/ (1)
    └── formatting.ipynb  # Utility functions for formatting file sizes, timestamps, and other display values.

Total: 8 notebooks across 5 directories

Module Dependencies

graph LR
    cache_memory[cache.memory<br/>Memory Cache]
    core_config[core.config<br/>Configuration]
    core_models[core.models<br/>Models]
    core_protocols[core.protocols<br/>Protocols]
    providers_local[providers.local<br/>Local Provider]
    scanning_filters[scanning.filters<br/>Filters]
    scanning_scanner[scanning.scanner<br/>Scanner]
    utils_formatting[utils.formatting<br/>Formatting Utilities]

    cache_memory --> core_models
    core_config --> core_models
    core_models --> utils_formatting
    core_protocols --> core_config
    core_protocols --> core_models
    providers_local --> core_config
    providers_local --> core_protocols
    providers_local --> core_models
    providers_local --> utils_formatting
    scanning_filters --> core_config
    scanning_filters --> core_models
    scanning_scanner --> utils_formatting
    scanning_scanner --> cache_memory
    scanning_scanner --> core_models
    scanning_scanner --> scanning_filters
    scanning_scanner --> core_config
    scanning_scanner --> providers_local

17 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Configuration (config.ipynb)

Configuration dataclasses for file scanning including ScanConfig, FilterConfig, and ExtensionMapping.

Import

from cjm_file_discovery.core.config import (
    ExtensionMapping,
    FilterConfig,
    ScanConfig
)

Classes

@dataclass
class ExtensionMapping:
    "Maps file extensions to FileType categories."
    
    audio: List[str] = field(...)
    video: List[str] = field(...)
    image: List[str] = field(...)
    document: List[str] = field(...)
    code: List[str] = field(...)
    data: List[str] = field(...)
    archive: List[str] = field(...)
    
    def get_type(
            self,
            extension: str  # File extension (with or without dot)
        ) -> FileType:  # Corresponding FileType
        "Get FileType for an extension."
    
    def build_extension_map(self) -> Dict[str, FileType]:  # Mapping of extension to FileType
        "Build reverse mapping from extension to FileType."
    
    def get_all_extensions(self) -> Set[str]:  # Set of all known extensions
        "Get all configured extensions."
@dataclass
class FilterConfig:
    "Configuration for filtering files during discovery."
    
    extensions: Optional[List[str]]  # Include only these extensions (None = all)
    exclude_extensions: Optional[List[str]]  # Exclude these extensions
    file_types: Optional[List[FileType]]  # Include only these types (None = all)
    min_size: Optional[int]  # Minimum file size (bytes)
    max_size: Optional[int]  # Maximum file size (bytes)
    exclude_patterns: List[str] = field(...)  # Glob patterns to exclude
    include_hidden: bool = False  # Include hidden files/directories
    custom_filter: Optional[Callable[[FileInfo], bool]]  # Custom filter function
    
    def matches(
            self,
            file_info: FileInfo,  # File to check
            extension_mapping: Optional[ExtensionMapping] = None  # For type checking
        ) -> bool:  # True if file passes all filters
        "Check if a file matches all filter criteria."
@dataclass
class ScanConfig:
    "Main configuration for file scanning operations."
    
    directories: List[str] = field(...)  # Directories to scan
    recursive: bool = True  # Scan subdirectories
    max_depth: Optional[int]  # Maximum recursion depth (None = unlimited)
    follow_symlinks: bool = False  # Follow symbolic links
    filter_config: FilterConfig = field(...)
    extension_mapping: ExtensionMapping = field(...)
    cache_results: bool = True
    cache_duration_seconds: int = 300  # 5 minutes default
    max_results: Optional[int]  # Maximum files to return (None = unlimited)
    sort_by: str = 'name'  # name, size, modified, type
    sort_descending: bool = False
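
The reverse lookup behind `get_type` and `build_extension_map` can be pictured with a small stand-in. This is a minimal sketch, not the library's code: `SketchExtensionMapping`, its two categories, the default extension lists, and the `"other"` fallback are all assumptions made for illustration, and categories are plain strings rather than `FileType` members.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SketchExtensionMapping:
    """Hypothetical stand-in for ExtensionMapping (string categories, two lists only)."""

    # Assumed defaults; the real default extension lists are not shown in this README.
    audio: List[str] = field(default_factory=lambda: ["mp3", "wav"])
    image: List[str] = field(default_factory=lambda: ["png", "jpg"])

    def build_extension_map(self) -> Dict[str, str]:
        """Build a reverse mapping from extension to category name."""
        ext_map: Dict[str, str] = {}
        for category in ("audio", "image"):
            for ext in getattr(self, category):
                ext_map[ext.lower()] = category
        return ext_map

    def get_type(self, extension: str) -> str:
        """Look up a category, accepting the extension with or without a dot."""
        normalized = extension.lower().lstrip(".")
        # "other" is an assumed fallback for unknown extensions.
        return self.build_extension_map().get(normalized, "other")


mapping = SketchExtensionMapping()
print(mapping.get_type(".MP3"))
print(mapping.get_type("txt"))
```

Normalizing case and the leading dot before the lookup is one way to honor the "with or without dot" contract stated for `get_type`.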

Filters (filters.ipynb)

Filter implementations for file discovery.

Import

from cjm_file_discovery.scanning.filters import (
    filter_files,
    sort_files,
    limit_files
)

Functions

def filter_files(
    files: List[FileInfo],      # Files to filter
    config: FilterConfig        # Filter configuration
) -> List[FileInfo]:  # Filtered files
    "Filter a list of files using FilterConfig."
def sort_files(
    files: List[FileInfo],      # Files to sort
    sort_by: str = "name",      # Sort key: "name", "size", "modified", "type"
    descending: bool = False    # Sort in descending order
) -> List[FileInfo]:  # Sorted files
    "Sort a list of files by the specified key."
def limit_files(
    files: List[FileInfo],       # Files to limit
    max_results: Optional[int]   # Maximum number of results (None = no limit)
) -> List[FileInfo]:  # Limited file list
    "Limit the number of files returned."

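The three functions above read naturally as a filter, sort, limit pipeline. The sketch below imitates that flow with stand-ins so it runs without the package installed: `SketchFile`, `sketch_filter`, `sketch_sort`, and `sketch_limit` are hypothetical names, and only an extension filter and a minimum size are modeled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchFile:
    """Hypothetical stand-in for FileInfo with only the fields used below."""
    name: str
    size: int
    extension: str


def sketch_filter(
    files: List[SketchFile],
    extensions: Optional[List[str]] = None,  # None = allow all extensions
    min_size: Optional[int] = None,          # None = no minimum
) -> List[SketchFile]:
    """Keep files whose extension is allowed and whose size meets the minimum."""
    kept = []
    for f in files:
        if extensions is not None and f.extension not in extensions:
            continue
        if min_size is not None and f.size < min_size:
            continue
        kept.append(f)
    return kept


def sketch_sort(files: List[SketchFile], sort_by: str = "name",
                descending: bool = False) -> List[SketchFile]:
    """Sort by an attribute name such as "name" or "size"."""
    return sorted(files, key=lambda f: getattr(f, sort_by), reverse=descending)


def sketch_limit(files: List[SketchFile],
                 max_results: Optional[int]) -> List[SketchFile]:
    """Return at most max_results files (None = no limit)."""
    return files if max_results is None else files[:max_results]


files = [
    SketchFile("b.txt", 200, "txt"),
    SketchFile("a.txt", 50, "txt"),
    SketchFile("c.png", 900, "png"),
    SketchFile("d.txt", 700, "txt"),
]
filtered = sketch_filter(files, extensions=["txt"], min_size=100)
result = sketch_limit(sketch_sort(filtered, "size", descending=True), 1)
print([f.name for f in result])
```

Applying the limit last means it caps the already filtered and sorted list, which matches the order the `ScanConfig` fields (`filter_config`, `sort_by`, `max_results`) suggest, though the scanner's actual ordering of these steps is not confirmed here.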
Formatting Utilities (formatting.ipynb)

Utility functions for formatting file sizes, timestamps, and other display values.

Import

from cjm_file_discovery.utils.formatting import (
    format_file_size,
    format_timestamp,
    matches_glob_patterns,
    get_mime_type
)

Functions

def format_file_size(
    size_bytes: int  # Size in bytes
) -> str:  # Human-readable size string (e.g., "15.2 MB")
    "Format file size in human-readable format."
def format_timestamp(
    timestamp: float  # Unix timestamp
) -> str:  # Human-readable date string
    "Format timestamp to human-readable date with relative time for recent files."
def matches_glob_patterns(
    path: str,  # File path to check
    patterns: List[str]  # List of glob patterns to match against
) -> bool:  # True if path matches any pattern
    "Check if path matches any of the glob patterns."
def get_mime_type(
    path: str  # File path to check
) -> Optional[str]:  # MIME type string or None if unknown
    "Determine MIME type for a file based on extension."

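For a feel of what these helpers do, here is a rough standard-library sketch. The function names are hypothetical, and the 1024-based units, one-decimal rounding, and use of `fnmatch` and `mimetypes` are assumptions rather than the library's confirmed behavior.

```python
import fnmatch
import mimetypes
from typing import List, Optional


def sketch_format_size(size_bytes: int) -> str:
    """Format a byte count using 1024-based units (the unit base is an assumption)."""
    size = float(size_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"


def sketch_matches_glob(path: str, patterns: List[str]) -> bool:
    """Return True if the path matches any of the glob patterns."""
    return any(fnmatch.fnmatch(path, pattern) for pattern in patterns)


def sketch_mime_type(path: str) -> Optional[str]:
    """Guess a MIME type from the file extension, or None if unknown."""
    mime, _encoding = mimetypes.guess_type(path)
    return mime


print(sketch_format_size(1536))
print(sketch_matches_glob("build/module.pyc", ["*.pyc", "*.tmp"]))
print(sketch_mime_type("photo.png"))
```

Note that `fnmatch` lets `*` match path separators, so a pattern like `*.pyc` also matches files in subdirectories; whether `matches_glob_patterns` behaves the same way is not stated in this README.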
Local Provider (local.ipynb)

Local file system discovery provider implementation.

Import

from cjm_file_discovery.providers.local import (
    LocalDiscoveryProvider
)

Classes

class LocalDiscoveryProvider:
    "Local file system discovery provider."
    
    def name(self) -> str:  # Provider identifier
        "Provider identifier."
    
    def supports_path(
            self,
            path: str  # Path to check
        ) -> bool:  # True if this is a local path
        "Check if this provider can handle the given path."
    
    def get_file_info(
            self,
            path: str,  # Path to file
            extension_mapping: Optional[ExtensionMapping] = None  # For type detection
        ) -> Optional[FileInfo]:  # FileInfo or None if not found
        "Get metadata for a single file."
    
    def scan(
            self,
            directories: List[str],  # Directories to scan
            config: ScanConfig       # Scan configuration
        ) -> List[FileInfo]:  # List of discovered files
        "Scan directories for files."
    
    async def scan_async(
            self,
            directories: List[str],  # Directories to scan
            config: ScanConfig       # Scan configuration
        ) -> List[FileInfo]:  # List of discovered files
        "Async scan for files (runs sync scan in executor)."

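The docstring for `scan_async` says it runs the sync scan in an executor. A minimal sketch of that pattern follows, using only the standard library; `sketch_scan` and `sketch_scan_async` are hypothetical stand-ins that return plain paths, and the non-recursive `os.scandir` walk is a simplification, not the provider's actual traversal.

```python
import asyncio
import os
import tempfile
from typing import List


def sketch_scan(directories: List[str]) -> List[str]:
    """Blocking scan: collect paths of files directly inside each directory."""
    found: List[str] = []
    for directory in directories:
        with os.scandir(directory) as entries:
            found.extend(entry.path for entry in entries if entry.is_file())
    return sorted(found)


async def sketch_scan_async(directories: List[str]) -> List[str]:
    """Run the blocking scan in the default thread pool executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sketch_scan, directories)


# Demonstrate against a throwaway directory so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    for file_name in ("a.txt", "b.txt"):
        with open(os.path.join(tmp, file_name), "w") as handle:
            handle.write("x")
    paths = asyncio.run(sketch_scan_async([tmp]))
    names = [os.path.basename(p) for p in paths]

print(names)
```

Offloading to an executor keeps the event loop responsive while the file system is walked, at the cost of the scan itself still being synchronous inside the worker thread.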
Memory Cache (memory.ipynb)

In-memory caching for scan results.

Import

from cjm_file_discovery.cache.memory import (
    ScanCache
)

Classes

@dataclass
class ScanCache:
    "In-memory cache for scan results with time-based expiration."
    
    duration_seconds: int = 300  # Cache duration (default 5 minutes)
    _files: List[FileInfo] = field(...)
    _timestamp: Optional[float] = field(...)
    
    def is_valid(self) -> bool:  # True if cache is valid and not expired
        "Check if cache is valid and not expired."
    
    def get(self) -> Optional[List[FileInfo]]:  # Cached files or None if invalid
        "Get cached files if valid."
    
    def set(
            self,
            files: List[FileInfo]  # Files to cache
        ) -> None:
        "Update cache with new files."
    
    def clear(self) -> None:
        "Clear the cache."
    
    @property
    def age_seconds(self) -> Optional[float]:  # Age in seconds or None if not set
        "Get cache age in seconds."
    
    def file_count(self) -> int:  # Number of cached files
        "Get number of cached files."

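Time-based expiration of this kind can be sketched in a few lines. `SketchCache` below is a hypothetical stand-in that stores plain strings; using `time.monotonic()` as the clock is an assumption, and the library may use a different time source.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchCache:
    """Hypothetical stand-in for ScanCache holding plain strings."""
    duration_seconds: float = 300
    _files: List[str] = field(default_factory=list)
    _timestamp: Optional[float] = None

    def is_valid(self) -> bool:
        """Valid only if something was stored and it has not yet expired."""
        if self._timestamp is None:
            return False
        return (time.monotonic() - self._timestamp) < self.duration_seconds

    def get(self) -> Optional[List[str]]:
        """Return cached entries, or None when empty or expired."""
        return self._files if self.is_valid() else None

    def set(self, files: List[str]) -> None:
        """Store a copy of the entries and record when they were cached."""
        self._files = list(files)
        self._timestamp = time.monotonic()

    def clear(self) -> None:
        """Drop the entries and reset the timestamp."""
        self._files = []
        self._timestamp = None


cache = SketchCache(duration_seconds=300)
print(cache.get())          # nothing stored yet
cache.set(["a.txt", "b.txt"])
print(cache.get())
cache.clear()
print(cache.is_valid())
```

A `duration_seconds` of 0 makes every entry expire immediately, which is a convenient way to exercise the expired path in tests.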
Models (models.ipynb)

Core data models for file discovery including FileInfo, FileType, and DirectoryInfo.

Import

from cjm_file_discovery.core.models import (
    FileType,
    FileInfo,
    DirectoryInfo
)

Classes

class FileType(str, Enum):
    "File type categories."
@dataclass
class FileInfo:
    "Metadata for a discovered file or directory."
    
    name: str  # File name with extension
    path: str  # Full path (provider-specific format)
    is_directory: bool  # True for directories
    size: Optional[int]  # Size in bytes
    modified: Optional[float]  # Last modified timestamp (Unix)
    created: Optional[float]  # Creation timestamp (if available)
    file_type: FileType = FileType.OTHER  # Categorized file type
    extension: Optional[str]  # File extension (without dot)
    mime_type: Optional[str]  # MIME type (if determinable)
    provider_name: str = 'local'  # Source provider identifier
    metadata: Dict[str, Any] = field(...)  # Provider-specific extras
    
    def size_str(self) -> str:  # Human-readable size string
        "Human-readable size string (e.g., '15.2 MB')."
    
    def modified_str(self) -> str:  # Human-readable modified date
        "Human-readable modified date."
@dataclass
class DirectoryInfo:
    "Metadata for a directory with optional computed statistics."
    
    path: str  # Full directory path
    name: str  # Directory name
    item_count: Optional[int]  # Number of direct children
    total_size: Optional[int]  # Total size of contents (bytes)
    file_count: Optional[int]  # Number of files (recursive)
    directory_count: Optional[int]  # Number of subdirectories
    
    def total_size_str(self) -> str:  # Human-readable total size
        "Human-readable total size string."

Protocols (protocols.ipynb)

Protocol definitions for extensible file discovery providers.

Import

from cjm_file_discovery.core.protocols import (
    DiscoveryProvider
)

Classes

@runtime_checkable
class DiscoveryProvider(Protocol):
    "Protocol for file discovery backends."
    
    def name(self) -> str:  # Unique identifier for this provider
        "Unique identifier for this provider (e.g., 'local', 's3')."
    
    def scan(
            self,
            directories: List[str],  # Directories to scan
            config: ScanConfig        # Scan configuration
        ) -> List[FileInfo]:  # List of discovered files
        "Scan directories for files matching config."
    
    async def scan_async(
            self,
            directories: List[str],  # Directories to scan
            config: ScanConfig        # Scan configuration
        ) -> List[FileInfo]:  # List of discovered files
        "Async scan for files."
    
    def get_file_info(
            self,
            path: str  # Path to file
        ) -> Optional[FileInfo]:  # FileInfo or None if not found
        "Get metadata for a single file."
    
    def supports_path(
            self,
            path: str  # Path to check
        ) -> bool:  # True if this provider can handle the path
        "Check if this provider can handle the given path."

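Because the protocol is marked `@runtime_checkable`, any class with the right methods can pass an `isinstance` check without inheriting from it. The sketch below shows that mechanism with a reduced, hypothetical protocol; `SketchProvider`, `InMemoryProvider`, and the `mem://` path scheme are invented for illustration and are not part of the library.

```python
from typing import Dict, List, Protocol, runtime_checkable


@runtime_checkable
class SketchProvider(Protocol):
    """Reduced, hypothetical version of DiscoveryProvider with two methods."""

    def scan(self, directories: List[str]) -> List[str]: ...

    def supports_path(self, path: str) -> bool: ...


class InMemoryProvider:
    """Hypothetical provider serving a fixed listing under a made-up 'mem://' scheme."""

    def __init__(self, listing: Dict[str, List[str]]) -> None:
        self._listing = listing

    def supports_path(self, path: str) -> bool:
        return path.startswith("mem://")

    def scan(self, directories: List[str]) -> List[str]:
        return [p for d in directories for p in self._listing.get(d, [])]


class NotAProvider:
    """Lacks supports_path, so it does not satisfy the protocol."""

    def scan(self, directories: List[str]) -> List[str]:
        return []


provider = InMemoryProvider({"mem://docs": ["mem://docs/readme.md"]})
print(isinstance(provider, SketchProvider))
print(isinstance(NotAProvider(), SketchProvider))
print(provider.scan(["mem://docs"]))
```

A runtime protocol check only verifies that the methods exist, not that their signatures or return types match, so a static type checker is still useful for custom providers.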
Scanner (scanner.ipynb)

High-level FileScanner class with caching and provider support.

Import

from cjm_file_discovery.scanning.scanner import (
    FileScanner
)

Classes

class FileScanner:
    "High-level file scanner with caching and provider support."
    
    def __init__(
            self,
            config: ScanConfig,  # Scan configuration
            provider: Optional[Any] = None  # Discovery provider (defaults to LocalDiscoveryProvider)
        ):
        "Initialize the scanner."
    
    def scan(
            self,
            force_refresh: bool = False  # Force fresh scan, ignoring cache
        ) -> List[FileInfo]:  # List of discovered files
        "Scan for files, using cache if valid."
    
    async def scan_async(
            self,
            force_refresh: bool = False  # Force fresh scan, ignoring cache
        ) -> List[FileInfo]:  # List of discovered files
        "Async scan for files."
    
    def get_files_by_type(
            self,
            file_types: List[FileType]  # File types to filter by
        ) -> List[FileInfo]:  # Filtered files
        "Get files filtered by specific file types."
    
    def clear_cache(self) -> None:
        "Clear the scan cache."
    
    @property
    def cache_valid(self) -> bool:  # True if cache is valid
        "Check if cache is valid."
    
    @property
    def cached_file_count(self) -> int:  # Number of cached files
        "Get number of cached files."
    
    def get_summary(self) -> Dict[str, Any]:  # Summary statistics
        "Get summary statistics for scanned files."

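`get_summary` returns summary statistics for the scanned files. One plausible shape for such a summary, a total size plus a per-type count, is sketched below. `sketch_summary` is a hypothetical helper working on `(type_name, size)` pairs, and the dictionary keys are assumptions, not the keys the library returns.

```python
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple


def sketch_summary(files: List[Tuple[str, Optional[int]]]) -> Dict[str, Any]:
    """Summarize (type_name, size) pairs; sizes of None are skipped in the total."""
    total_size = sum(size for _type_name, size in files if size is not None)
    by_type = Counter(type_name for type_name, _size in files)
    # The key names below are illustrative assumptions.
    return {
        "total_files": len(files),
        "total_size": total_size,
        "by_type": dict(by_type),
    }


summary = sketch_summary([
    ("audio", 1000),
    ("audio", 2500),
    ("image", None),   # size unknown, still counted as a file
])
print(summary)
```

Skipping `None` sizes in the sum keeps files with unknown size from raising a `TypeError` while still counting them in the per-type breakdown.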