SSF Tools - Analyze Module Architecture & Design Patterns

Overview

The analyze module provides security analysis services for entropy calculation and credential detection in files. This module implements PCI SSF 2.3 compliance requirements through protocol-based services that use dependency injection for testability and maintainability.

Architectural Principles

Design Goals

  • Protocol-Based Design: Define clear contracts through protocols for all services (see the sketch after this list)
  • External Tool Integration: Use proven tools like detect-secrets for credential detection
  • Streaming Architecture: Process large files with minimal memory usage
  • Type Safety: Full type annotation coverage with MyPy compliance
  • Dependency Injection: Services with clear separation of concerns
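
A minimal sketch of the protocol-plus-dependency-injection pattern the module follows, using only the standard library. GreetingProtocol, ConsoleGreeter, and run are hypothetical illustrations, not part of kp_ssf_tools:

from typing import Protocol, runtime_checkable

@runtime_checkable
class GreetingProtocol(Protocol):
    """Contract only: any class with a matching greet() satisfies it."""

    def greet(self, name: str) -> str: ...

class ConsoleGreeter:
    """Concrete service; conforms structurally, no inheritance required."""

    def greet(self, name: str) -> str:
        return f"Hello, {name}"

def run(greeter: GreetingProtocol, name: str) -> str:
    # Callers depend on the contract, so tests can inject a stub implementation.
    return greeter.greet(name)

assert isinstance(ConsoleGreeter(), GreetingProtocol)  # enabled by @runtime_checkable
print(run(ConsoleGreeter(), "SSF"))  # Hello, SSF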

Key Benefits

  • Security Compliance: Meet PCI SSF 2.3 requirements for credential detection
  • Performance: Streaming processing handles large files with minimal memory; results can be exported to Excel
  • Maintainability: Protocol-based design enables easy testing and extension
  • Integration: Seamless integration with external security tools
  • User Experience: CLI with progress feedback and detailed reporting

Architecture Overview

graph TD
    subgraph "CLI Layer"
        CLI[analyze.py Commands]
        CLI --> ENT[entropy command]
        CLI --> CRED[credentials command]
    end
    subgraph "Service Layer"
        ENT --> EA[EntropyAnalyzer]
        CRED --> DS[DetectSecretsCredentialService]
        DS --> |subprocess.run| EXT[detect-secrets tool]
    end
    subgraph "Protocol Layer"
        EA -.implements.-> EAP[EntropyAnalyzerProtocol]
        DS -.implements.-> CDP[CredentialDetectionProtocol]
    end
    subgraph "Shared Services"
        FS[FileDiscoveryService]
        RO[RichOutputService]
        TS[TimestampService]
        EXP[ExcelExportService]
    end
    EA --> FS
    EA --> RO
    DS --> FS
    DS --> RO
    DS --> TS
    CLI --> EXP
    subgraph "Container System"
        AC[ApplicationContainer]
        AC --> EA
        AC --> DS
        AC --> FS
        AC --> RO
        AC --> TS
        AC --> EXP
    end

Protocol Definitions

Core Analysis Protocols

The analyze module uses protocol-based design to define clear contracts:

kp_ssf_tools.analyze.services.interfaces.EntropyAnalyzerProtocol

Bases: Protocol

Protocol for Shannon entropy calculation and analysis.

Source code in src\kp_ssf_tools\analyze\services\interfaces.py
@runtime_checkable
class EntropyAnalyzerProtocol(Protocol):
    """Protocol for Shannon entropy calculation and analysis."""

    def calculate_entropy(self, data: bytes) -> float:
        """
        Calculate Shannon entropy for data.

        Args:
            data: Data to analyze

        Returns:
            Shannon entropy in bits per byte (0.0-8.0)

        """
        ...

    def analyze_sliding_window(
        self,
        data: bytes,
        window_size: int,
        step_size: int,
    ) -> list[EntropyRegion]:
        """
        Perform sliding window entropy analysis.

        Args:
            data: Data to analyze
            window_size: Size of analysis window in bytes
            step_size: Step size for sliding window

        Returns:
            List of entropy regions with analysis results

        """
        ...

    def analyze_file_entropy(
        self,
        file_path: Path,
        *,
        analysis_block_size: int,
        step_size: int,
        file_chunk_size: int,
        force_file_type: FileType | None = None,
    ) -> FileAnalysisResult:
        """
        Analyze entropy of a complete file.

        Args:
            file_path: Path to file to analyze
            analysis_block_size: Size of analysis blocks in bytes (from config)
            step_size: Step size for sliding window (from config)
            file_chunk_size: Size of file I/O chunks in bytes (from config)
            force_file_type: Override automatic file type detection

        Returns:
            Complete file analysis result

        """
        ...

Functions

analyze_file_entropy(file_path, *, analysis_block_size, step_size, file_chunk_size, force_file_type=None)

Analyze entropy of a complete file.

Parameters:

  • file_path (Path, required): Path to file to analyze
  • analysis_block_size (int, required): Size of analysis blocks in bytes (from config)
  • step_size (int, required): Step size for sliding window (from config)
  • file_chunk_size (int, required): Size of file I/O chunks in bytes (from config)
  • force_file_type (FileType | None, default None): Override automatic file type detection

Returns:

  • FileAnalysisResult: Complete file analysis result


analyze_sliding_window(data, window_size, step_size)

Perform sliding window entropy analysis.

Parameters:

  • data (bytes, required): Data to analyze
  • window_size (int, required): Size of analysis window in bytes
  • step_size (int, required): Step size for sliding window

Returns:

  • list[EntropyRegion]: List of entropy regions with analysis results


calculate_entropy(data)

Calculate Shannon entropy for data.

Parameters:

  • data (bytes, required): Data to analyze

Returns:

  • float: Shannon entropy in bits per byte (0.0-8.0)


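For intuition, a worked example of the entropy contract above, using a minimal standalone implementation rather than the module's EntropyAnalyzer:

import math
from collections import Counter

def shannon_entropy(data: bytes) -> float:
    """Shannon entropy H(X) = -sum(p(x) * log2(p(x))), in bits per byte."""
    counts = Counter(data)
    n = len(data)
    return -sum((c / n) * math.log2(c / n) for c in counts.values())

assert shannon_entropy(b"\x00" * 64) == 0.0       # single byte value: minimum entropy
assert shannon_entropy(bytes(range(256))) == 8.0  # uniform distribution: maximum entropy
print(shannon_entropy(b"password"))               # 2.75 bits/byte: typical low-entropy text
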
kp_ssf_tools.analyze.services.interfaces.CredentialDetectionProtocol

Bases: Protocol

Protocol for credential detection services that scan for sensitive information.

Source code in src\kp_ssf_tools\analyze\services\interfaces.py
@runtime_checkable
class CredentialDetectionProtocol(Protocol):
    """Protocol for credential detection services that scan for sensitive information."""

    def scan_file(
        self,
        file_path: Path,
        options: CredentialScanOptions | None = None,
    ) -> list[CryptoStructure]:
        """
        Scan a single file for credential patterns.

        Args:
            file_path: Path to file to scan
            options: Optional scanning configuration

        Returns:
            List of detected credential structures

        """
        ...

    def scan_directory(
        self,
        directory_path: Path,
        options: CredentialScanOptions | None = None,
    ) -> dict[Path, list[CryptoStructure]]:
        """
        Scan a directory recursively for credential patterns.

        Args:
            directory_path: Path to directory to scan
            options: Optional scanning configuration

        Returns:
            Dictionary mapping file paths to detected credentials

        """
        ...

    def analyze_files(
        self,
        target_paths: list[Path],
        config: dict[str, dict[str, object]],
        options: CredentialScanOptions | None = None,
    ) -> CredentialAnalysisResult:
        """
        Analyze files for credential patterns.

        Args:
            target_paths: List of paths to analyze
            config: Analysis configuration
            options: Optional scanning configuration

        Returns:
            Analysis result with detected credentials

        """
        ...

    def get_supported_patterns(self) -> list[str]:
        """
        Get list of supported credential patterns.

        Returns:
            List of pattern names/types this detector supports

        """
        ...

Functions

analyze_files(target_paths, config, options=None)

Analyze files for credential patterns.

Parameters:

  • target_paths (list[Path], required): List of paths to analyze
  • config (dict[str, dict[str, object]], required): Analysis configuration
  • options (CredentialScanOptions | None, default None): Optional scanning configuration

Returns:

  • CredentialAnalysisResult: Analysis result with detected credentials


get_supported_patterns()

Get list of supported credential patterns.

Returns:

  • list[str]: List of pattern names/types this detector supports


scan_directory(directory_path, options=None)

Scan a directory recursively for credential patterns.

Parameters:

  • directory_path (Path, required): Path to directory to scan
  • options (CredentialScanOptions | None, default None): Optional scanning configuration

Returns:

  • dict[Path, list[CryptoStructure]]: Dictionary mapping file paths to detected credentials


scan_file(file_path, options=None)

Scan a single file for credential patterns.

Parameters:

  • file_path (Path, required): Path to file to scan
  • options (CredentialScanOptions | None, default None): Optional scanning configuration

Returns:

  • list[CryptoStructure]: List of detected credential structures


Supporting Protocols

kp_ssf_tools.analyze.services.interfaces.FileTypeClassifierProtocol

Bases: Protocol

Protocol for file type detection and classification.

Source code in src\kp_ssf_tools\analyze\services\interfaces.py
@runtime_checkable
class FileTypeClassifierProtocol(Protocol):
    """Protocol for file type detection and classification."""

    def classify_file(self, file_path: Path) -> tuple[FileType, str | None]:
        """
        Classify file type and detect programming language.

        Args:
            file_path: Path to the file to classify

        Returns:
            Tuple of (FileType, programming_language_or_None)

        """
        ...

    def load_file_content(self, file_path: Path) -> bytes:
        """
        Load file content for analysis.

        Args:
            file_path: Path to the file to load

        Returns:
            File content as bytes

        """
        ...

Functions

classify_file(file_path)

Classify file type and detect programming language.

Parameters:

  • file_path (Path, required): Path to the file to classify

Returns:

  • tuple[FileType, str | None]: Tuple of (FileType, programming_language_or_None)


load_file_content(file_path)

Load file content for analysis.

Parameters:

  • file_path (Path, required): Path to the file to load

Returns:

  • bytes: File content as bytes


kp_ssf_tools.analyze.services.interfaces.ThresholdProviderProtocol

Bases: Protocol

Protocol for content-aware entropy threshold management.

Source code in src\kp_ssf_tools\analyze\services\interfaces.py
@runtime_checkable
class ThresholdProviderProtocol(Protocol):
    """Protocol for content-aware entropy threshold management."""

    def get_thresholds(self, file_type: FileType) -> ContentAwareThresholds:
        """
        Get entropy thresholds for specific file type.

        Args:
            file_type: The detected file type

        Returns:
            ContentAwareThresholds model with all threshold values

        """
        ...

    def classify_entropy_level(
        self,
        entropy: float,
        file_type: FileType,
    ) -> EntropyLevel:
        """
        Classify entropy level based on content-aware thresholds.

        Args:
            entropy: Shannon entropy value
            file_type: The detected file type

        Returns:
            Entropy level classification enum

        """
        ...

Functions

classify_entropy_level(entropy, file_type)

Classify entropy level based on content-aware thresholds.

Parameters:

  • entropy (float, required): Shannon entropy value
  • file_type (FileType, required): The detected file type

Returns:

  • EntropyLevel: Entropy level classification enum


get_thresholds(file_type)

Get entropy thresholds for specific file type.

Parameters:

  • file_type (FileType, required): The detected file type

Returns:

  • ContentAwareThresholds: ContentAwareThresholds model with all threshold values


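Because every protocol above is decorated with @runtime_checkable, a container or test can verify candidate services structurally with isinstance. A minimal sketch follows; StubThresholdProvider is hypothetical test code, and note that runtime isinstance checks verify method presence only, not signatures:

from kp_ssf_tools.analyze.services.interfaces import ThresholdProviderProtocol

class StubThresholdProvider:
    """Test double: satisfies the protocol by shape alone, no inheritance."""

    def get_thresholds(self, file_type):  # would return canned thresholds in a real test
        ...

    def classify_entropy_level(self, entropy, file_type):
        ...

assert isinstance(StubThresholdProvider(), ThresholdProviderProtocol)
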
Credential Detection Implementation

DetectSecretsCredentialService

The credential detection service integrates with the industry-standard detect-secrets tool through subprocess execution:

kp_ssf_tools.analyze.services.detect_secrets_service.DetectSecretsCredentialService

Credential detection service using detect-secrets as backend.

Source code in src\kp_ssf_tools\analyze\services\detect_secrets_service.py
class DetectSecretsCredentialService:
    """Credential detection service using detect-secrets as backend."""

    def __init__(
        self,
        rich_output: RichOutputService,
        timestamp_service: TimestampProtocol,
        file_discovery: FileDiscoveryService,
        file_processing: FileProcessingService,
    ) -> None:
        """
        Initialize the detect-secrets credential detection service.

        Args:
            rich_output: Service for displaying progress and results
            timestamp_service: Service for timestamp operations
            file_discovery: Service for discovering files to analyze
            file_processing: Service for file type detection and processing

        """
        self.rich_output: RichOutputService = rich_output
        self.timestamp: TimestampProtocol = timestamp_service
        self.file_discovery: FileDiscoveryService = file_discovery
        self.file_processing: FileProcessingService = file_processing

    def analyze_files(
        self,
        target_paths: list[Path],
        config: dict[str, Any],
        options: CredentialScanOptions,
    ) -> CredentialAnalysisResult:
        """
        Analyze files using detect-secrets and return results in existing format.

        Args:
            target_paths: List of file or directory paths to analyze
            config: Analysis configuration
            options: Scanning options and parameters

        Returns:
            Analysis result containing detected patterns

        """
        self.rich_output.info("Starting detect-secrets credential analysis...")

        # Run detect-secrets scan and get JSON output directly
        secrets_data = self._run_scan(target_paths, config, options)

        # Convert detect-secrets results to our format
        patterns = self._convert_to_patterns(secrets_data, options.context_lines)

        # Extract all processed files from detect-secrets results
        processed_files = [
            Path(file_path) for file_path in secrets_data.get("results", {})
        ]

        # Return single file result or first target if multiple
        primary_target = target_paths[0] if target_paths else Path()

        return CredentialAnalysisResult(
            file_path=primary_target,
            patterns=patterns,
            total_patterns=len(patterns),
            processed_files=processed_files,
        )

    def _run_scan(
        self,
        target_paths: list[Path],
        config: dict[str, Any],
        options: CredentialScanOptions,
    ) -> dict[str, Any]:
        """Run detect-secrets scan and return JSON results."""
        # Build base command for direct JSON output
        cmd = ["detect-secrets", "scan"]

        # Add configuration-based options
        cmd.extend(self._build_config_options(config))

        # Add target paths
        cmd.extend(self._build_target_options(target_paths, options))

        # Execute detect-secrets scan and capture JSON output
        return self._execute_scan_command(cmd)

    def _build_config_options(self, config: dict[str, Any]) -> list[str]:
        """Build configuration options for detect-secrets command."""
        options = []
        credential_config = config.get("credentials", {})

        # Configure entropy limits if specified
        if "entropy_limits" in credential_config:
            limits = credential_config["entropy_limits"]
            if "base64" in limits:
                options.extend(["--base64-limit", str(limits["base64"])])
            if "hex" in limits:
                options.extend(["--hex-limit", str(limits["hex"])])

        # Add exclude patterns if configured
        if "exclude_patterns" in credential_config:
            patterns = credential_config["exclude_patterns"]
            if "files" in patterns:
                options.extend(["--exclude-files", patterns["files"]])
            if "lines" in patterns:
                options.extend(["--exclude-lines", patterns["lines"]])
            if "secrets" in patterns:
                options.extend(["--exclude-secrets", patterns["secrets"]])

        # Add word list if configured
        if "word_list_path" in credential_config:
            word_list_path = Path(credential_config["word_list_path"])
            if word_list_path.exists():
                options.extend(["--word-list", str(word_list_path)])

        return options

    def _build_target_options(
        self,
        target_paths: list[Path],
        options: CredentialScanOptions,
    ) -> list[str]:
        """Build target path options for detect-secrets command."""
        cmd_options = []

        for target_path in target_paths:
            if target_path.is_dir() and not options.recursive:
                # For non-recursive directory scanning
                cmd_options.append("--all-files")
            cmd_options.append(str(target_path))

        return cmd_options

    def _execute_scan_command(self, cmd: list[str]) -> dict[str, Any]:
        """Execute the detect-secrets scan command safely and return JSON results."""
        # Validate command for security - ensure it starts with detect-secrets
        if not cmd or cmd[0] != "detect-secrets":
            error_msg = "Invalid command: must start with 'detect-secrets'"
            self.rich_output.error(error_msg)
            raise ValueError(error_msg)

        try:
            self.rich_output.debug(f"Running: {' '.join(cmd)}")
            # Security: Command is constructed internally with validated components
            result = subprocess.run(  # noqa: S603
                cmd,
                capture_output=True,
                text=True,
                check=False,  # Don't raise on non-zero exit (normal for secrets found)
                cwd=Path.cwd(),
                timeout=300,  # 5 minute timeout for safety
            )

            if result.returncode not in (0, 1):  # 0=no secrets, 1=secrets found
                error_msg = f"detect-secrets failed: {result.stderr}"
                self.rich_output.error(error_msg)
                raise RuntimeError(error_msg)

            self.rich_output.debug(
                f"detect-secrets scan completed with exit code {result.returncode}",
            )

            # Parse JSON output from stdout
            try:
                return json.loads(result.stdout)
            except json.JSONDecodeError as e:
                self.rich_output.error(
                    f"Failed to parse detect-secrets JSON output: {e}",
                )
                return {"results": {}}

        except FileNotFoundError as e:
            error_msg = (
                "detect-secrets not found. Please install: pip install detect-secrets"
            )
            self.rich_output.error(error_msg)
            raise RuntimeError(error_msg) from e
        except subprocess.TimeoutExpired as e:
            error_msg = "detect-secrets scan timed out after 5 minutes"
            self.rich_output.error(error_msg)
            raise RuntimeError(error_msg) from e

    def _parse_baseline(self, baseline_file: Path) -> dict[str, Any]:
        """Parse the detect-secrets baseline JSON file."""
        try:
            with baseline_file.open(encoding="utf-8") as f:
                return json.load(f)
        except (json.JSONDecodeError, FileNotFoundError) as e:
            self.rich_output.error(f"Failed to parse baseline file: {e}")
            return {"results": {}}

    def _convert_to_patterns(
        self,
        secrets_data: dict[str, Any],
        context_lines: int,
    ) -> list[CredentialPattern]:
        """Convert detect-secrets results to CredentialPattern objects."""
        patterns: list[CredentialPattern] = []

        results = secrets_data.get("results", {})

        for file_path_str, file_secrets in results.items():
            file_path = Path(file_path_str)

            for secret in file_secrets:
                detector_type = secret.get("type", "unknown")
                risk_level = self._determine_risk_level_from_detector(
                    detector_type,
                    secret,
                )

                # Extract context lines around the secret
                context_lines_data = self._extract_context(
                    file_path,
                    secret,
                    context_lines,
                )
                context_before = (
                    "\n".join(context_lines_data[:context_lines])
                    if context_lines_data
                    else ""
                )
                context_after = (
                    "\n".join(context_lines_data[context_lines + 1 :])
                    if context_lines_data
                    else ""
                )

                pattern = CredentialPattern(
                    # BinaryLocationMixin fields
                    offset=0,  # detect-secrets doesn't provide byte offset
                    size=len(
                        secret.get("hashed_secret", ""),
                    ),  # Use hash length as approximation
                    confidence=1.0,  # detect-secrets results are high confidence
                    # TextLocationMixin fields
                    line_start=secret.get("line_number", 0),
                    line_end=secret.get("line_number", 0),
                    column_start=None,  # detect-secrets doesn't provide column info
                    column_end=None,
                    # CredentialLocationMixin fields
                    context_before=context_before,
                    context_after=context_after,
                    # DetectedCredential fields
                    pattern_type=detector_type,  # Use detector type directly as pattern type
                    risk_level=risk_level,
                    value=f"[DETECTED:{secret.get('type', 'SECRET')}]",  # detect-secrets only provides hashed values
                    detection_method="detect-secrets",
                    # CredentialPattern specific fields
                    file_path=file_path,  # Include file path in each pattern
                    regex_pattern=None,
                    wordlist_source=None,
                )
                patterns.append(pattern)

        return patterns

    def _determine_risk_level_from_detector(
        self,
        detector_type: str,
        secret: dict[str, Any],
    ) -> CredentialRiskLevel:
        """Determine risk level based on detect-secrets detector type and secret properties."""
        # Check if secret is verified (if available)
        is_verified = secret.get("is_verified", False)

        if is_verified:
            return CredentialRiskLevel.CRITICAL

        # High risk for API keys and private keys
        high_risk_detectors = {
            "AWS Access Key",
            "Azure Storage Account access key",
            "GitHub Token",
            "GitLab Token",
            "OpenAI API Key",
            "Stripe Access Key",
            "Private Key",
            "Discord Bot Token",
            "Mailchimp Access Key",
            "NPM tokens",
            "PyPI upload token",
            "SendGrid API Key",
            "Slack Token",
            "JWT Token",
            "IBM Cloud IAM Key",
            "Telegram Bot Token",
            "Twilio API Key",
        }

        if detector_type in high_risk_detectors:
            return CredentialRiskLevel.HIGH

        # Medium risk for authentication patterns and high entropy strings
        medium_risk_detectors = {
            "Basic Auth Credentials",
            "Keyword",  # Keywords often indicate passwords
        }

        if detector_type in medium_risk_detectors:
            return CredentialRiskLevel.MEDIUM

        # Low risk for entropy-based detectors (less specific)
        low_risk_detectors = {
            "Base64 High Entropy String",
            "Hex High Entropy String",
        }

        if detector_type in low_risk_detectors:
            return CredentialRiskLevel.LOW

        # Default to LOW for unknown detectors
        return CredentialRiskLevel.LOW

    def _extract_context(
        self,
        file_path: Path,
        secret: dict[str, Any],
        context_lines: int,
    ) -> list[str]:
        """Extract context lines around the detected secret."""
        try:
            line_number = secret.get("line_number", 1)
            with file_path.open(encoding="utf-8", errors="ignore") as f:
                lines = f.readlines()

            start_line = max(0, line_number - context_lines - 1)
            end_line = min(len(lines), line_number + context_lines)

            return [line.rstrip() for line in lines[start_line:end_line]]

        except (OSError, UnicodeDecodeError):
            # If we can't read the file, return empty context
            return []

Functions

__init__(rich_output, timestamp_service, file_discovery, file_processing)

Initialize the detect-secrets credential detection service.

Parameters:

  • rich_output (RichOutputService, required): Service for displaying progress and results
  • timestamp_service (TimestampProtocol, required): Service for timestamp operations
  • file_discovery (FileDiscoveryService, required): Service for discovering files to analyze
  • file_processing (FileProcessingService, required): Service for file type detection and processing

analyze_files(target_paths, config, options)

Analyze files using detect-secrets and return results in existing format.

Parameters:

  • target_paths (list[Path], required): List of file or directory paths to analyze
  • config (dict[str, Any], required): Analysis configuration
  • options (CredentialScanOptions, required): Scanning options and parameters

Returns:

  • CredentialAnalysisResult: Analysis result containing detected patterns


Integration Architecture

Credential detection follows this execution flow (see the condensed sketch after the list):

  1. Command Construction: Build detect-secrets scan command with configuration options
  2. Subprocess Execution: Execute detect-secrets with security controls and timeout
  3. JSON Processing: Parse JSON output from detect-secrets
  4. Result Conversion: Transform detect-secrets results to CredentialPattern objects
  5. Excel Export: Stream results to Excel with per-file worksheets
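
A condensed sketch of steps 1-4, mirroring the service code shown earlier; error handling and the full configuration mapping are trimmed, and the "src" target path is illustrative:

import json
import subprocess
from pathlib import Path

# 1. Command construction (entropy limits come from the credentials config)
cmd = ["detect-secrets", "scan", "--base64-limit", "4.5", str(Path("src"))]

# 2. Subprocess execution with the same safety controls as the service
result = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=300)

# 3. JSON processing (exit codes 0 and 1 are both normal outcomes)
secrets_data = json.loads(result.stdout) if result.returncode in (0, 1) else {"results": {}}

# 4. Result conversion: "results" maps file paths to lists of findings
for file_path, findings in secrets_data.get("results", {}).items():
    for secret in findings:
        print(file_path, secret.get("type"), secret.get("line_number"))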

Security Considerations

The subprocess integration implements security measures:

  • Command Validation: Commands must start with detect-secrets
  • Timeout Control: 5-minute timeout prevents hanging processes
  • Error Handling: Explicit failures for a missing detect-secrets executable, scan errors, and unparseable JSON output
  • Input Sanitization: Commands are built internally from validated components and passed as an argument list, never through a shell

Configuration Models

Analysis Configuration

The module uses structured configuration models for type safety:

kp_ssf_tools.analyze.models.configuration.AnalysisConfiguration

Bases: BaseConfiguration

Complete security analysis configuration.

Inherits common output and network settings from BaseConfiguration. Contains analysis-specific configuration options for entropy analysis, wordlist detection, and cryptographic structure detection.

Source code in src\kp_ssf_tools\analyze\models\configuration.py
class AnalysisConfiguration(BaseConfiguration):
    """
    Complete security analysis configuration.

    Inherits common output and network settings from BaseConfiguration.
    Contains analysis-specific configuration options for entropy analysis,
    wordlist detection, and cryptographic structure detection.
    """

    # Entropy-specific settings
    analysis: AnalysisConfig = Field(
        default_factory=AnalysisConfig,
        description="Analysis-specific settings",
    )

    # Content-aware thresholds
    content_aware: ContentAwareConfig = Field(
        default_factory=ContentAwareConfig,
        description="Content-aware analysis settings",
    )

    # Detection settings
    detection: DetectionConfig = Field(
        default_factory=DetectionConfig,
        description="Detection feature toggles",
    )

    # Credential detection
    credentials: CredentialConfig = Field(
        default_factory=CredentialConfig,
        description="Credential detection settings",
    )

    # Statistical analysis
    statistical: StatisticalConfig = Field(
        default_factory=StatisticalConfig,
        description="Statistical analysis settings",
    )

    # Compliance settings
    compliance: ComplianceConfig = Field(
        default_factory=ComplianceConfig,
        description="PCI SSF compliance settings",
    )

    # Reporting settings
    reporting: ReportingConfig = Field(
        default_factory=ReportingConfig,
        description="Report generation settings",
    )

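A hedged usage sketch: constructing the configuration with defaults. It assumes the Pydantic v2 API (model_dump) suggested by the Field usage above, and that BaseConfiguration's common fields also carry defaults:

from kp_ssf_tools.analyze.models.configuration import AnalysisConfiguration

config = AnalysisConfiguration()           # every section falls back to its default_factory
print(type(config.credentials).__name__)   # CredentialConfig
print(sorted(config.model_dump()))         # analysis, compliance, content_aware, credentials, ...
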
Credential Scan Options

kp_ssf_tools.analyze.services.interfaces.CredentialScanOptions

Bases: NamedTuple

Options for credential scanning operations.

Source code in src\kp_ssf_tools\analyze\services\interfaces.py
class CredentialScanOptions(NamedTuple):
    """Options for credential scanning operations."""

    scan_type: str = "comprehensive"  # comprehensive, quick, targeted
    severity_threshold: str = "medium"  # low, medium, high
    include_files: tuple[str, ...] = ()  # Glob patterns for inclusion
    exclude_files: tuple[str, ...] = ()  # Glob patterns for exclusion
    max_file_size: int = 100 * 1024 * 1024  # 100MB default
    confidence_threshold: float = 0.7  # Minimum confidence for reporting
    recursive: bool = True  # Whether to scan recursively
    file_extensions: tuple[str, ...] = ()  # File extensions to scan
    context_lines: int = 3  # Number of context lines around matches
    scan_binary_files: bool = False  # Whether to scan binary files
    max_binary_size_mb: int = 10  # Maximum binary file size in MB

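A short usage sketch: as a NamedTuple the options are immutable, so scan variants are derived with _replace rather than mutation:

from kp_ssf_tools.analyze.services.interfaces import CredentialScanOptions

defaults = CredentialScanOptions()  # comprehensive scan, medium severity threshold
quick = defaults._replace(scan_type="quick", recursive=False, context_lines=1)
print(quick.scan_type, quick.max_file_size)  # quick 104857600
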
Service Implementations

Entropy Analysis Service

The entropy analyzer provides Shannon entropy calculation with content-aware thresholds:

kp_ssf_tools.analyze.services.entropy.analyzer.EntropyAnalyzer

Shannon entropy analyzer with content-aware thresholds and chunk processing.

Implements normalized Shannon entropy calculation with file-type-specific thresholds for PCI SSF 2.3 compliance detection. Uses dependency injection for core services and file processing capabilities.

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
class EntropyAnalyzer:
    """
    Shannon entropy analyzer with content-aware thresholds and chunk processing.

    Implements normalized Shannon entropy calculation with file-type-specific
    thresholds for PCI SSF 2.3 compliance detection. Uses dependency injection
    for core services and file processing capabilities.
    """

    def __init__(  # noqa: PLR0913
        self,
        rich_output: RichOutputProtocol,
        timestamp_service: TimestampProtocol,
        file_validator: FileValidator,
        mime_detector: MimeTypeDetector,
        file_processing: FileProcessingService,
        threshold_manager: ThresholdProviderProtocol,
    ) -> None:
        """
        Initialize entropy analyzer with injected core services.

        Args:
            rich_output: Rich output service for progress reporting and results display
            timestamp_service: Timestamp service for analysis timing
            file_validator: File validation service
            mime_detector: MIME type detection service for file classification
            file_processing: Service for file processing operations
            threshold_manager: Content-aware threshold management service

        """
        self.rich_output: RichOutputProtocol = rich_output
        self.timestamp: TimestampProtocol = timestamp_service
        self.file_validator: FileValidator = file_validator
        self.mime_detector: MimeTypeDetector = mime_detector
        self.file_processing: FileProcessingService = file_processing
        self.threshold_manager: ThresholdProviderProtocol = threshold_manager

    def calculate_shannon_entropy(self, data: bytes) -> float:
        """
        Calculate Shannon entropy for binary data in bits per byte.

        Uses the standard Shannon entropy formula: H(X) = -sum(p(x) * log2(p(x)))
        where p(x) is the probability of byte value x.

        Args:
            data: Binary data to analyze

        Returns:
            Shannon entropy in bits per byte (0.0 to 8.0, where 8.0 is maximum entropy)

        Raises:
            ValueError: If data is empty

        Note:
            - Maximum entropy (8.0): All 256 byte values occur with equal probability
            - Minimum entropy (0.0): Only one byte value occurs
            - Result range [0, 8] matches research-based thresholds in configuration

        """
        if not data:
            msg = "Cannot calculate entropy for empty data"
            raise ValueError(msg)

        # Calculate byte frequency distribution
        byte_counts = [0] * 256
        for byte in data:
            byte_counts[byte] += 1

        # Calculate probabilities and entropy
        data_length = len(data)
        entropy = 0.0

        for count in byte_counts:
            if count > 0:
                probability = count / data_length
                entropy -= probability * math.log2(probability)

        # Return raw entropy in bits per byte (0.0 to 8.0 range)
        return entropy

    def analyze_file_generator(  # noqa: PLR0913
        self,
        file_path: Path,
        *,
        min_risk_level: EntropyLevel = EntropyLevel.MEDIUM_HIGH,
        file_chunk_size: int = 65536,
        analysis_block_size: int = 64,
        step_size: int = 16,
        force_file_type: FileType | None = None,
        include_samples: bool = False,
    ) -> Generator[AnalysisYield]:
        """
        Generate analysis results as they're computed.

        Yields high-risk regions immediately, summary at end.

        Memory efficient streaming analysis - only creates objects for regions
        that meet the risk threshold criteria.

        Args:
            file_path: Path to file to analyze
            min_risk_level: Minimum risk level to yield regions
            file_chunk_size: Size of file I/O chunks in bytes
            analysis_block_size: Size of analysis blocks in bytes
            step_size: Step size for sliding window
            force_file_type: Override automatic file type detection
            include_samples: Whether to include data samples in regions

        Yields:
            AnalysisYield objects containing either:
            - High-risk region data (type='region')
            - Final summary statistics (type='summary')

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If file is empty or unreadable

        """
        # Defensive conversion: ensure min_risk_level is always EntropyLevel
        if isinstance(min_risk_level, str):
            min_risk_level = EntropyLevel(min_risk_level)

        # Record start time for per-file processing duration
        start_time = self.timestamp.now()

        # Validate file exists and is accessible
        if not self.file_validator.validate_file_exists(file_path):
            msg = f"File not found or not accessible: {file_path}"
            raise FileNotFoundError(msg)

        # Detect file type for content-aware analysis
        file_type = force_file_type or self._detect_file_type(file_path)
        self.rich_output.debug(f"Detected file type: {file_type.value}")

        # Create binary streamer for chunk-based processing
        binary_streamer: BinaryStreamerProtocol = (
            self.file_processing.create_binary_streamer(
                file_path,
                chunk_size=file_chunk_size,
            )
        )

        # Check if file is empty
        file_size = binary_streamer.get_file_size()
        if file_size == 0:
            msg = f"Cannot analyze empty file: {file_path}"
            raise ValueError(msg)

        # Initialize counters and state
        global_byte_counts = [0] * 256
        total_bytes = 0
        total_regions = 0
        high_risk_regions = 0
        overlap_buffer = b""
        current_offset = 0

        self.rich_output.debug(
            f"Streaming analysis with threshold {min_risk_level.value}: "
            f"{file_chunk_size}-byte chunks, {analysis_block_size}-byte blocks",
        )

        try:
            # Single-pass file processing with streaming output
            for chunk in binary_streamer.stream_chunks():
                # Update global byte frequency distribution
                for byte in chunk:
                    global_byte_counts[byte] += 1
                    total_bytes += 1

                # Process sliding windows within this chunk
                processing_data = overlap_buffer + chunk
                processing_offset = current_offset - len(overlap_buffer)

                pos = 0
                while pos + analysis_block_size <= len(processing_data):
                    # Extract analysis block
                    block_data = processing_data[pos : pos + analysis_block_size]
                    block_offset = processing_offset + pos

                    # Calculate entropy
                    block_entropy = self.calculate_shannon_entropy(block_data)
                    entropy_level = self._classify_entropy_level(
                        block_entropy,
                        file_type,
                    )
                    total_regions += 1

                    # Only create and yield region if it meets risk threshold
                    if entropy_level.order >= min_risk_level.order:
                        high_risk_regions += 1

                        # Prepare region data for streaming
                        region_data = {
                            "offset": block_offset,
                            "size": len(block_data),
                            "entropy": block_entropy,
                            "level": entropy_level.value,
                            "confidence": self._calculate_confidence(
                                block_entropy,
                                file_type,
                            ),
                        }

                        # Optionally include data sample
                        # Include "step_size + (analysis_block_size // 2)" bytes
                        if include_samples:
                            sample_size = step_size + (analysis_block_size // 2)
                            region_data["data_sample"] = block_data[:sample_size]

                        # Yield immediately - no accumulation
                        yield AnalysisYield(type="region", data=region_data)

                    # Move to next sliding window position
                    pos += step_size

                # Prepare overlap buffer for next chunk
                overlap_buffer = (
                    processing_data[-analysis_block_size:]
                    if len(processing_data) >= analysis_block_size
                    else processing_data
                )
                current_offset += len(chunk)

        except Exception as e:
            self.rich_output.error(f"Error during streaming entropy analysis: {e}")
            raise

        # Calculate overall file entropy from global distribution
        overall_entropy = self._calculate_file_entropy_from_distribution(
            global_byte_counts,
            total_bytes,
        )

        # Yield final summary
        # Detect MIME type and language for summary output
        mime_type = self.get_file_mime_type(file_path) or ""
        language = self.get_file_language(file_path) or ""
        # Calculate per-file processing time
        processing_time = (self.timestamp.now() - start_time).total_seconds()
        yield AnalysisYield(
            type="summary",
            data={
                "overall_entropy": overall_entropy,
                "total_regions": total_regions,
                "high_risk_regions": high_risk_regions,
                "file_size": file_size,
                "min_risk_level": min_risk_level.value,
                "mime_type": mime_type,
                "language": language,
                "processing_time": processing_time,
            },
        )

    def analyze_file_entropy(  # noqa: PLR0913
        self,
        file_path: Path,
        *,
        analysis_block_size: int,
        step_size: int,
        file_chunk_size: int,
        force_file_type: FileType | None = None,
        progress_callback: object | None = None,
    ) -> tuple[float, list[EntropyRegion]]:
        """
        Analyze entropy of a complete file using sliding window approach.

        Args:
            file_path: Path to file to analyze
            analysis_block_size: Size of analysis blocks in bytes (from config)
            step_size: Step size for sliding window (from config)
            file_chunk_size: Size of file I/O chunks in bytes (from config)
            force_file_type: Override automatic file type detection
            progress_callback: Optional callback for progress updates (progress, task_id)

        Returns:
            Tuple of (overall_entropy, entropy_regions)

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If file is empty or unreadable

        """
        # Validate file exists and is accessible
        if not self.file_validator.validate_file_exists(file_path):
            msg = f"File not found or not accessible: {file_path}"
            raise FileNotFoundError(msg)

        start_time = self.timestamp.now()

        # Detect file type for content-aware analysis
        file_type = force_file_type or self._detect_file_type(file_path)
        self.rich_output.debug(f"Detected file type: {file_type.value}")

        # Create binary streamer for chunk-based processing
        binary_streamer: BinaryStreamerProtocol = (
            self.file_processing.create_binary_streamer(
                file_path,
                chunk_size=file_chunk_size,
            )
        )

        # Check if file is empty
        file_size = binary_streamer.get_file_size()
        if file_size == 0:
            msg = f"Cannot analyze empty file: {file_path}"
            raise ValueError(msg)

        # Single-pass analysis: build global byte distribution and process sliding windows
        entropy_regions: list[EntropyRegion] = []
        # Global frequency distribution for true file entropy
        global_byte_counts = [0] * 256
        total_bytes = 0
        region_count = 0
        overlap_buffer = b""
        bytes_processed = 0

        self.rich_output.debug(
            f"Processing file in {file_chunk_size}-byte chunks with {analysis_block_size}-byte analysis blocks",
        )

        try:
            # Single-pass file processing
            for chunk in binary_streamer.stream_chunks():
                # Update global byte frequency distribution
                for byte in chunk:
                    global_byte_counts[byte] += 1
                    total_bytes += 1

                # Update progress
                bytes_processed += len(chunk)
                if progress_callback and callable(progress_callback):
                    progress_callback(bytes_processed, file_size)

                # Process sliding windows within this chunk
                params = SlidingWindowParams(
                    analysis_block_size=analysis_block_size,
                    step_size=step_size,
                    file_type=file_type,
                    total_bytes=total_bytes,
                    current_region_count=region_count,
                )
                regions, region_count, overlap_buffer = (
                    self._process_chunk_sliding_windows(
                        chunk=chunk,
                        overlap_buffer=overlap_buffer,
                        params=params,
                    )
                )
                entropy_regions.extend(regions)

        except Exception as e:
            self.rich_output.error(f"Error during entropy analysis: {e}")
            raise

        # Calculate true file entropy from global byte distribution
        overall_entropy = self._calculate_file_entropy_from_distribution(
            global_byte_counts,
            total_bytes,
        )

        end_time = self.timestamp.now()
        analysis_duration = (end_time - start_time).total_seconds()

        self.rich_output.debug(
            f"Entropy analysis complete in {analysis_duration:.2f}s",
        )

        return overall_entropy, entropy_regions

    def analyze_data_chunk(self, data: bytes, file_type: FileType) -> EntropyRegion:
        """
        Analyze entropy of a single data chunk.

        Args:
            data: Binary data chunk to analyze
            file_type: File type for content-aware classification

        Returns:
            EntropyRegion with analysis results

        """
        entropy = self.calculate_shannon_entropy(data)
        level = self._classify_entropy_level(entropy, file_type)
        confidence = self._calculate_confidence(entropy, file_type)

        return EntropyRegion(
            offset=0,  # Offset would be set by caller
            size=len(data),
            confidence=confidence,
            entropy=entropy,
            level=level,
            data_sample=data[:32],  # First 32 bytes for output
        )

    def get_entropy_threshold(self, file_type: FileType, level: EntropyLevel) -> float:
        """
        Get entropy threshold for a specific file type and level.

        Args:
            file_type: Type of file being analyzed
            level: Entropy level to get threshold for

        Returns:
            Entropy threshold value (0.0 to 1.0)

        """
        thresholds = self.threshold_manager.get_thresholds(file_type)

        # Map EntropyLevel to specific threshold attributes
        level_mapping = {
            EntropyLevel.VERY_LOW: thresholds.very_low_threshold,
            EntropyLevel.LOW: thresholds.low_threshold,
            EntropyLevel.MEDIUM: thresholds.medium_threshold,
            EntropyLevel.MEDIUM_HIGH: thresholds.medium_high_threshold,
            EntropyLevel.HIGH: thresholds.high_threshold,
            EntropyLevel.CRITICAL: 8.0,  # Above max entropy
        }

        return level_mapping.get(level, thresholds.medium_threshold)

    def _detect_file_type(self, file_path: Path) -> FileType:
        """
        Detect file type using MIME detection service.

        Args:
            file_path: Path to file to classify

        Returns:
            Detected FileType

        """
        try:
            mime_type = self.mime_detector.detect_mime_type(file_path)

            if mime_type is None:
                return FileType.UNKNOWN

            # Create mapping for better maintainability
            if mime_type.startswith("text/"):
                return self._classify_text_type(mime_type)
            if mime_type.startswith("application/"):
                return self._classify_application_type(mime_type)
            if mime_type.startswith(("image/", "video/")):
                return FileType.UNKNOWN  # Binary files mapped to UNKNOWN

        except OSError:
            self.rich_output.warning(
                f"Failed to detect file type for {file_path}, using UNKNOWN",
            )

        return FileType.UNKNOWN

    def _classify_text_type(self, mime_type: str) -> FileType:
        """Classify text MIME types."""
        # Map MIME types to specific programming languages
        language_mapping = {
            "python": FileType.PYTHON,
            "javascript": FileType.JAVASCRIPT,
            "java": FileType.JAVA,
            "c++": FileType.CPP,
        }

        for lang, file_type in language_mapping.items():
            if lang in mime_type:
                return file_type

        # Map text MIME types that are definitely documentation
        doc_mime_patterns = {
            "markdown",
            "plain",
            "csv",
            "html",
            "xml",
            "yaml",
            "json",
            "toml",
        }

        mime_lower = mime_type.lower()
        for pattern in doc_mime_patterns:
            if pattern in mime_lower:
                return FileType.DOCUMENTATION

        # For truly unknown text MIME types, return UNKNOWN instead of defaulting to DOCUMENTATION
        return FileType.UNKNOWN

    def _classify_application_type(self, mime_type: str) -> FileType:
        """Classify application MIME types."""
        # Map specific MIME type patterns to file types
        type_mapping = {
            "x-msdos": FileType.WINDOWS_PE,
            "x-msdownload": FileType.WINDOWS_PE,
            "x-sharedlib": FileType.LINUX_ELF,
            "x-object": FileType.LINUX_ELF,
            "x-mach-binary": FileType.MACOS_MACHO,
            "encrypted": FileType.ENCRYPTED,
            "pgp": FileType.ENCRYPTED,
        }

        for pattern, file_type in type_mapping.items():
            if pattern in mime_type:
                return file_type

        return FileType.UNKNOWN

    def _classify_entropy_level(
        self,
        entropy: float,
        file_type: FileType,
    ) -> EntropyLevel:
        """
        Classify entropy level using content-aware thresholds.

        Args:
            entropy: Calculated entropy value (0.0 to 8.0 bits per byte)
            file_type: Type of file for threshold selection

        Returns:
            EntropyLevel classification

        """
        return self.threshold_manager.classify_entropy_level(entropy, file_type)

    def _calculate_confidence(self, entropy: float, file_type: FileType) -> float:
        """
        Calculate confidence score for entropy classification.

        Args:
            entropy: Calculated entropy value
            file_type: File type for context

        Returns:
            Confidence score (0.0 to 1.0)

        """
        thresholds = self.threshold_manager.get_thresholds(file_type)

        # Calculate distance from expected range for this file type
        expected_range = (
            thresholds.low_threshold,
            thresholds.medium_high_threshold,
        )
        expected_center = (expected_range[0] + expected_range[1]) / 2

        # Distance from expected center, normalized
        distance = abs(entropy - expected_center)
        max_distance = max(
            abs(expected_range[0] - expected_center),
            abs(expected_range[1] - expected_center),
        )

        # Higher confidence for values further from expected range
        confidence = (
            min(1.0, distance / max_distance) if max_distance > 0 else 0.5
        )  # Moderate confidence for edge case

        return confidence

    def _process_chunk_sliding_windows(
        self,
        chunk: bytes,
        overlap_buffer: bytes,
        params: SlidingWindowParams,
    ) -> tuple[list[EntropyRegion], int, bytes]:
        """
        Process sliding windows within a chunk for entropy region detection.

        Args:
            chunk: Current data chunk from file
            overlap_buffer: Buffer from previous chunk to handle boundary windows
            params: Sliding window processing parameters

        Returns:
            Tuple of (entropy_regions, updated_region_count, new_overlap_buffer)

        """
        regions: list[EntropyRegion] = []
        region_count = params.current_region_count

        # Combine with overlap buffer from previous chunk
        processing_data = overlap_buffer + chunk
        processing_offset = params.total_bytes - len(processing_data)

        # Process sliding windows within this chunk
        current_pos = 0
        while current_pos + params.analysis_block_size <= len(processing_data):
            # Extract analysis block
            block_data = processing_data[
                current_pos : current_pos + params.analysis_block_size
            ]
            block_offset = processing_offset + current_pos

            # Calculate entropy for this block
            block_entropy = self.calculate_shannon_entropy(block_data)
            region_count += 1

            # Classify entropy level using content-aware thresholds
            entropy_level = self._classify_entropy_level(
                block_entropy,
                params.file_type,
            )

            # Create entropy region (limit data sample to first 32 bytes for output)
            region = EntropyRegion(
                offset=block_offset,
                size=len(block_data),
                confidence=self._calculate_confidence(block_entropy, params.file_type),
                entropy=block_entropy,
                level=entropy_level,
                data_sample=block_data[:32],  # Limit sample size for output
            )

            regions.append(region)

            # Move to next sliding window position
            current_pos += params.step_size

            # Progress reporting for large files
            if region_count % 1000 == 0:
                self.rich_output.debug(
                    f"Processed {region_count} regions (offset: {block_offset})",
                )

        # Prepare overlap buffer for next chunk (last analysis_block_size bytes)
        new_overlap_buffer = (
            processing_data[-params.analysis_block_size :]
            if len(processing_data) >= params.analysis_block_size
            else processing_data
        )

        return regions, region_count, new_overlap_buffer

    def _calculate_file_entropy_from_distribution(
        self,
        byte_counts: list[int],
        total_bytes: int,
    ) -> float:
        """
        Calculate Shannon entropy from global byte frequency distribution.

        Args:
            byte_counts: Array of byte frequency counts (length 256)
            total_bytes: Total number of bytes processed

        Returns:
            Shannon entropy in bits per byte (0.0 to 8.0)

        """
        if total_bytes == 0:
            return 0.0

        entropy = 0.0
        for count in byte_counts:
            if count > 0:
                probability = count / total_bytes
                entropy -= probability * math.log2(probability)

        return entropy

    def get_file_mime_type(self, file_path: Path) -> str | None:
        """
        Get MIME type for a file.

        Args:
            file_path: Path to file

        Returns:
            MIME type string or None if detection fails

        """
        return self.file_processing.detect_mime_type(file_path)

    def get_file_language(self, file_path: Path) -> str | None:
        """
        Get detected programming language for a file.

        Args:
            file_path: Path to file

        Returns:
            Language name string or None if detection fails

        """
        return self.file_processing.detect_language(file_path)

Functions

__init__(rich_output, timestamp_service, file_validator, mime_detector, file_processing, threshold_manager)

Initialize entropy analyzer with injected core services.

Parameters:

  • rich_output (RichOutputProtocol, required): Rich output service for progress reporting and results display
  • timestamp_service (TimestampProtocol, required): Timestamp service for analysis timing
  • file_validator (FileValidator, required): File validation service
  • mime_detector (MimeTypeDetector, required): MIME type detection service for file classification
  • file_processing (FileProcessingService, required): Service for file processing operations
  • threshold_manager (ThresholdProviderProtocol, required): Content-aware threshold management service
Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def __init__(  # noqa: PLR0913
    self,
    rich_output: RichOutputProtocol,
    timestamp_service: TimestampProtocol,
    file_validator: FileValidator,
    mime_detector: MimeTypeDetector,
    file_processing: FileProcessingService,
    threshold_manager: ThresholdProviderProtocol,
) -> None:
    """
    Initialize entropy analyzer with injected core services.

    Args:
        rich_output: Rich output service for progress reporting and results display
        timestamp_service: Timestamp service for analysis timing
        file_validator: File validation service
        mime_detector: MIME type detection service for file classification
        file_processing: Service for file processing operations
        threshold_manager: Content-aware threshold management service

    """
    self.rich_output: RichOutputProtocol = rich_output
    self.timestamp: TimestampProtocol = timestamp_service
    self.file_validator: FileValidator = file_validator
    self.mime_detector: MimeTypeDetector = mime_detector
    self.file_processing: FileProcessingService = file_processing
    self.threshold_manager: ThresholdProviderProtocol = threshold_manager

analyze_data_chunk(data, file_type)

Analyze entropy of a single data chunk.

Parameters:

  • data (bytes, required): Binary data chunk to analyze
  • file_type (FileType, required): File type for content-aware classification

Returns:

  • EntropyRegion: EntropyRegion with analysis results

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def analyze_data_chunk(self, data: bytes, file_type: FileType) -> EntropyRegion:
    """
    Analyze entropy of a single data chunk.

    Args:
        data: Binary data chunk to analyze
        file_type: File type for content-aware classification

    Returns:
        EntropyRegion with analysis results

    """
    entropy = self.calculate_shannon_entropy(data)
    level = self._classify_entropy_level(entropy, file_type)
    confidence = self._calculate_confidence(entropy, file_type)

    return EntropyRegion(
        offset=0,  # Offset would be set by caller
        size=len(data),
        confidence=confidence,
        entropy=entropy,
        level=level,
        data_sample=data[:32],  # First 32 bytes for output
    )

analyze_file_entropy(file_path, *, analysis_block_size, step_size, file_chunk_size, force_file_type=None, progress_callback=None)

Analyze entropy of a complete file using sliding window approach.

Parameters:

  • file_path (Path, required): Path to file to analyze
  • analysis_block_size (int, required): Size of analysis blocks in bytes (from config)
  • step_size (int, required): Step size for sliding window (from config)
  • file_chunk_size (int, required): Size of file I/O chunks in bytes (from config)
  • force_file_type (FileType | None, default None): Override automatic file type detection
  • progress_callback (object | None, default None): Optional callback for progress updates, called as (bytes_processed, file_size)

Returns:

  • tuple[float, list[EntropyRegion]]: Tuple of (overall_entropy, entropy_regions)

Raises:

  • FileNotFoundError: If file doesn't exist
  • ValueError: If file is empty or unreadable

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def analyze_file_entropy(  # noqa: PLR0913
    self,
    file_path: Path,
    *,
    analysis_block_size: int,
    step_size: int,
    file_chunk_size: int,
    force_file_type: FileType | None = None,
    progress_callback: object | None = None,
) -> tuple[float, list[EntropyRegion]]:
    """
    Analyze entropy of a complete file using sliding window approach.

    Args:
        file_path: Path to file to analyze
        analysis_block_size: Size of analysis blocks in bytes (from config)
        step_size: Step size for sliding window (from config)
        file_chunk_size: Size of file I/O chunks in bytes (from config)
        force_file_type: Override automatic file type detection
        progress_callback: Optional callback for progress updates, called as (bytes_processed, file_size)

    Returns:
        Tuple of (overall_entropy, entropy_regions)

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If file is empty or unreadable

    """
    # Validate file exists and is accessible
    if not self.file_validator.validate_file_exists(file_path):
        msg = f"File not found or not accessible: {file_path}"
        raise FileNotFoundError(msg)

    start_time = self.timestamp.now()

    # Detect file type for content-aware analysis
    file_type = force_file_type or self._detect_file_type(file_path)
    self.rich_output.debug(f"Detected file type: {file_type.value}")

    # Create binary streamer for chunk-based processing
    binary_streamer: BinaryStreamerProtocol = (
        self.file_processing.create_binary_streamer(
            file_path,
            chunk_size=file_chunk_size,
        )
    )

    # Check if file is empty
    file_size = binary_streamer.get_file_size()
    if file_size == 0:
        msg = f"Cannot analyze empty file: {file_path}"
        raise ValueError(msg)

    # Single-pass analysis: build global byte distribution and process sliding windows
    entropy_regions: list[EntropyRegion] = []
    # Global frequency distribution for true file entropy
    global_byte_counts = [0] * 256
    total_bytes = 0
    region_count = 0
    overlap_buffer = b""
    bytes_processed = 0

    self.rich_output.debug(
        f"Processing file in {file_chunk_size}-byte chunks with {analysis_block_size}-byte analysis blocks",
    )

    try:
        # Single-pass file processing
        for chunk in binary_streamer.stream_chunks():
            # Update global byte frequency distribution
            for byte in chunk:
                global_byte_counts[byte] += 1
                total_bytes += 1

            # Update progress
            bytes_processed += len(chunk)
            if progress_callback and callable(progress_callback):
                progress_callback(bytes_processed, file_size)

            # Process sliding windows within this chunk
            params = SlidingWindowParams(
                analysis_block_size=analysis_block_size,
                step_size=step_size,
                file_type=file_type,
                total_bytes=total_bytes,
                current_region_count=region_count,
            )
            regions, region_count, overlap_buffer = (
                self._process_chunk_sliding_windows(
                    chunk=chunk,
                    overlap_buffer=overlap_buffer,
                    params=params,
                )
            )
            entropy_regions.extend(regions)

    except Exception as e:
        self.rich_output.error(f"Error during entropy analysis: {e}")
        raise

    # Calculate true file entropy from global byte distribution
    overall_entropy = self._calculate_file_entropy_from_distribution(
        global_byte_counts,
        total_bytes,
    )

    end_time = self.timestamp.now()
    analysis_duration = (end_time - start_time).total_seconds()

    self.rich_output.debug(
        f"Entropy analysis complete in {analysis_duration:.2f}s",
    )

    return overall_entropy, entropy_regions
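
A brief usage sketch follows. The provider lookup container.analysis.entropy_analyzer() is hypothetical (the real wiring may expose the analyzer under a different name), and FileType/EntropyLevel are assumed to be importable from the module's models:

from pathlib import Path

# Hypothetical: resolve the analyzer from a wired ApplicationContainer.
analyzer = container.analysis.entropy_analyzer()

overall_entropy, regions = analyzer.analyze_file_entropy(
    Path("firmware.bin"),
    analysis_block_size=64,   # analysis block size (from config)
    step_size=16,             # sliding-window step (from config)
    file_chunk_size=65536,    # file I/O chunk size (from config)
)

print(f"Overall file entropy: {overall_entropy:.2f} bits/byte")
suspicious = [r for r in regions if r.entropy >= 7.0]
print(f"{len(suspicious)} of {len(regions)} regions at or above 7.0 bits/byte")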

analyze_file_generator(file_path, *, min_risk_level=EntropyLevel.MEDIUM_HIGH, file_chunk_size=65536, analysis_block_size=64, step_size=16, force_file_type=None, include_samples=False)

Generate analysis results as they're computed.

Yields high-risk regions immediately, summary at end.

Memory-efficient streaming analysis: objects are created only for regions that meet the risk threshold criteria.

Parameters:

  • file_path (Path, required): Path to file to analyze
  • min_risk_level (EntropyLevel, default MEDIUM_HIGH): Minimum risk level to yield regions
  • file_chunk_size (int, default 65536): Size of file I/O chunks in bytes
  • analysis_block_size (int, default 64): Size of analysis blocks in bytes
  • step_size (int, default 16): Step size for sliding window
  • force_file_type (FileType | None, default None): Override automatic file type detection
  • include_samples (bool, default False): Whether to include data samples in regions

Yields:

  • Generator[AnalysisYield]: AnalysisYield objects containing either high-risk region data (type='region') or final summary statistics (type='summary')

Raises:

  • FileNotFoundError: If file doesn't exist
  • ValueError: If file is empty or unreadable

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def analyze_file_generator(  # noqa: PLR0913
    self,
    file_path: Path,
    *,
    min_risk_level: EntropyLevel = EntropyLevel.MEDIUM_HIGH,
    file_chunk_size: int = 65536,
    analysis_block_size: int = 64,
    step_size: int = 16,
    force_file_type: FileType | None = None,
    include_samples: bool = False,
) -> Generator[AnalysisYield]:
    """
    Generate analysis results as they're computed.

    Yields high-risk regions immediately, summary at end.

    Memory-efficient streaming analysis: objects are created only for regions
    that meet the risk threshold criteria.

    Args:
        file_path: Path to file to analyze
        min_risk_level: Minimum risk level to yield regions
        file_chunk_size: Size of file I/O chunks in bytes
        analysis_block_size: Size of analysis blocks in bytes
        step_size: Step size for sliding window
        force_file_type: Override automatic file type detection
        include_samples: Whether to include data samples in regions

    Yields:
        AnalysisYield objects containing either:
        - High-risk region data (type='region')
        - Final summary statistics (type='summary')

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If file is empty or unreadable

    """
    # Defensive conversion: ensure min_risk_level is always EntropyLevel
    if isinstance(min_risk_level, str):
        min_risk_level = EntropyLevel(min_risk_level)

    # Record start time for per-file processing duration
    start_time = self.timestamp.now()

    # Validate file exists and is accessible
    if not self.file_validator.validate_file_exists(file_path):
        msg = f"File not found or not accessible: {file_path}"
        raise FileNotFoundError(msg)

    # Detect file type for content-aware analysis
    file_type = force_file_type or self._detect_file_type(file_path)
    self.rich_output.debug(f"Detected file type: {file_type.value}")

    # Create binary streamer for chunk-based processing
    binary_streamer: BinaryStreamerProtocol = (
        self.file_processing.create_binary_streamer(
            file_path,
            chunk_size=file_chunk_size,
        )
    )

    # Check if file is empty
    file_size = binary_streamer.get_file_size()
    if file_size == 0:
        msg = f"Cannot analyze empty file: {file_path}"
        raise ValueError(msg)

    # Initialize counters and state
    global_byte_counts = [0] * 256
    total_bytes = 0
    total_regions = 0
    high_risk_regions = 0
    overlap_buffer = b""
    current_offset = 0

    self.rich_output.debug(
        f"Streaming analysis with threshold {min_risk_level.value}: "
        f"{file_chunk_size}-byte chunks, {analysis_block_size}-byte blocks",
    )

    try:
        # Single-pass file processing with streaming output
        for chunk in binary_streamer.stream_chunks():
            # Update global byte frequency distribution
            for byte in chunk:
                global_byte_counts[byte] += 1
                total_bytes += 1

            # Process sliding windows within this chunk
            processing_data = overlap_buffer + chunk
            processing_offset = current_offset - len(overlap_buffer)

            pos = 0
            while pos + analysis_block_size <= len(processing_data):
                # Extract analysis block
                block_data = processing_data[pos : pos + analysis_block_size]
                block_offset = processing_offset + pos

                # Calculate entropy
                block_entropy = self.calculate_shannon_entropy(block_data)
                entropy_level = self._classify_entropy_level(
                    block_entropy,
                    file_type,
                )
                total_regions += 1

                # Only create and yield region if it meets risk threshold
                if entropy_level.order >= min_risk_level.order:
                    high_risk_regions += 1

                    # Prepare region data for streaming
                    region_data = {
                        "offset": block_offset,
                        "size": len(block_data),
                        "entropy": block_entropy,
                        "level": entropy_level.value,
                        "confidence": self._calculate_confidence(
                            block_entropy,
                            file_type,
                        ),
                    }

                    # Optionally include data sample
                    # Include "step_size + (analysis_block_size // 2)" bytes
                    if include_samples:
                        sample_size = step_size + (analysis_block_size // 2)
                        region_data["data_sample"] = block_data[:sample_size]

                    # Yield immediately - no accumulation
                    yield AnalysisYield(type="region", data=region_data)

                # Move to next sliding window position
                pos += step_size

            # Prepare overlap buffer for next chunk
            overlap_buffer = (
                processing_data[-analysis_block_size:]
                if len(processing_data) >= analysis_block_size
                else processing_data
            )
            current_offset += len(chunk)

    except Exception as e:
        self.rich_output.error(f"Error during streaming entropy analysis: {e}")
        raise

    # Calculate overall file entropy from global distribution
    overall_entropy = self._calculate_file_entropy_from_distribution(
        global_byte_counts,
        total_bytes,
    )

    # Yield final summary
    # Detect MIME type and language for summary output
    mime_type = self.get_file_mime_type(file_path) or ""
    language = self.get_file_language(file_path) or ""
    # Calculate per-file processing time
    processing_time = (self.timestamp.now() - start_time).total_seconds()
    yield AnalysisYield(
        type="summary",
        data={
            "overall_entropy": overall_entropy,
            "total_regions": total_regions,
            "high_risk_regions": high_risk_regions,
            "file_size": file_size,
            "min_risk_level": min_risk_level.value,
            "mime_type": mime_type,
            "language": language,
            "processing_time": processing_time,
        },
    )
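
A consumption sketch for the streaming interface, assuming AnalysisYield exposes type and data attributes (as its constructor calls above suggest) and the same hypothetical container wiring as before:

from pathlib import Path

analyzer = container.analysis.entropy_analyzer()  # hypothetical wiring

for item in analyzer.analyze_file_generator(
    Path("app.bin"),
    min_risk_level=EntropyLevel.HIGH,
):
    if item.type == "region":
        r = item.data
        print(f"offset={r['offset']:#010x}  entropy={r['entropy']:.2f}  level={r['level']}")
    else:  # type == "summary"
        s = item.data
        print(
            f"{s['high_risk_regions']}/{s['total_regions']} regions flagged, "
            f"file entropy {s['overall_entropy']:.2f}",
        )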

calculate_shannon_entropy(data)

Calculate Shannon entropy for binary data in bits per byte.

Uses the standard Shannon entropy formula: H(X) = -sum(p(x) * log2(p(x))) where p(x) is the probability of byte value x.

Parameters:

  • data (bytes, required): Binary data to analyze

Returns:

  • float: Shannon entropy in bits per byte (0.0 to 8.0, where 8.0 is maximum entropy)

Raises:

  • ValueError: If data is empty

Note:

  • Maximum entropy (8.0): All 256 byte values occur with equal probability
  • Minimum entropy (0.0): Only one byte value occurs
  • Result range [0, 8] matches research-based thresholds in configuration
Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def calculate_shannon_entropy(self, data: bytes) -> float:
    """
    Calculate Shannon entropy for binary data in bits per byte.

    Uses the standard Shannon entropy formula: H(X) = -sum(p(x) * log2(p(x)))
    where p(x) is the probability of byte value x.

    Args:
        data: Binary data to analyze

    Returns:
        Shannon entropy in bits per byte (0.0 to 8.0, where 8.0 is maximum entropy)

    Raises:
        ValueError: If data is empty

    Note:
        - Maximum entropy (8.0): All 256 byte values occur with equal probability
        - Minimum entropy (0.0): Only one byte value occurs
        - Result range [0, 8] matches research-based thresholds in configuration

    """
    if not data:
        msg = "Cannot calculate entropy for empty data"
        raise ValueError(msg)

    # Calculate byte frequency distribution
    byte_counts = [0] * 256
    for byte in data:
        byte_counts[byte] += 1

    # Calculate probabilities and entropy
    data_length = len(data)
    entropy = 0.0

    for count in byte_counts:
        if count > 0:
            probability = count / data_length
            entropy -= probability * math.log2(probability)

    # Return raw entropy in bits per byte (0.0 to 8.0 range)
    return entropy
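
For intuition, a standalone sketch of the same formula (independent of the service) reproduces the boundary cases from the note above:

import math

def shannon_entropy(data: bytes) -> float:
    """Same formula as the service: H = -sum(p * log2(p)) over byte values."""
    counts = [0] * 256
    for byte in data:
        counts[byte] += 1
    total = len(data)
    return -sum((c / total) * math.log2(c / total) for c in counts if c > 0)

print(shannon_entropy(b"\x00" * 1024))         # 0.0  (single byte value)
print(shannon_entropy(bytes(range(256)) * 4))  # 8.0  (uniform distribution)
print(shannon_entropy(b"hello world"))         # ~2.85 (typical low-entropy text)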

get_entropy_threshold(file_type, level)

Get entropy threshold for a specific file type and level.

Parameters:

  • file_type (FileType, required): Type of file being analyzed
  • level (EntropyLevel, required): Entropy level to get threshold for

Returns:

  • float: Entropy threshold value in bits per byte (0.0 to 8.0)

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def get_entropy_threshold(self, file_type: FileType, level: EntropyLevel) -> float:
    """
    Get entropy threshold for a specific file type and level.

    Args:
        file_type: Type of file being analyzed
        level: Entropy level to get threshold for

    Returns:
        Entropy threshold value in bits per byte (0.0 to 8.0)

    """
    thresholds = self.threshold_manager.get_thresholds(file_type)

    # Map EntropyLevel to specific threshold attributes
    level_mapping = {
        EntropyLevel.VERY_LOW: thresholds.very_low_threshold,
        EntropyLevel.LOW: thresholds.low_threshold,
        EntropyLevel.MEDIUM: thresholds.medium_threshold,
        EntropyLevel.MEDIUM_HIGH: thresholds.medium_high_threshold,
        EntropyLevel.HIGH: thresholds.high_threshold,
        EntropyLevel.CRITICAL: 8.0,  # Above max entropy
    }

    return level_mapping.get(level, thresholds.medium_threshold)

get_file_language(file_path)

Get detected programming language for a file.

Parameters:

  • file_path (Path, required): Path to file

Returns:

  • str | None: Language name string or None if detection fails

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def get_file_language(self, file_path: Path) -> str | None:
    """
    Get detected programming language for a file.

    Args:
        file_path: Path to file

    Returns:
        Language name string or None if detection fails

    """
    return self.file_processing.detect_language(file_path)

get_file_mime_type(file_path)

Get MIME type for a file.

Parameters:

  • file_path (Path, required): Path to file

Returns:

  • str | None: MIME type string or None if detection fails

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def get_file_mime_type(self, file_path: Path) -> str | None:
    """
    Get MIME type for a file.

    Args:
        file_path: Path to file

    Returns:
        MIME type string or None if detection fails

    """
    return self.file_processing.detect_mime_type(file_path)

File Type Classification

File type detection is integrated into the entropy analyzer service:

kp_ssf_tools.analyze.services.entropy.analyzer.EntropyAnalyzer._detect_file_type(file_path)

Detect file type using MIME detection service.

Parameters:

  • file_path (Path, required): Path to file to classify

Returns:

  • FileType: Detected FileType

Source code in src\kp_ssf_tools\analyze\services\entropy\analyzer.py
def _detect_file_type(self, file_path: Path) -> FileType:
    """
    Detect file type using MIME detection service.

    Args:
        file_path: Path to file to classify

    Returns:
        Detected FileType

    """
    try:
        mime_type = self.mime_detector.detect_mime_type(file_path)

        if mime_type is None:
            return FileType.UNKNOWN

        # Create mapping for better maintainability
        if mime_type.startswith("text/"):
            return self._classify_text_type(mime_type)
        if mime_type.startswith("application/"):
            return self._classify_application_type(mime_type)
        if mime_type.startswith(("image/", "video/")):
            return FileType.UNKNOWN  # Binary files mapped to UNKNOWN

    except OSError:
        self.rich_output.warning(
            f"Failed to detect file type for {file_path}, using UNKNOWN",
        )

    return FileType.UNKNOWN
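
Because classification is MIME-based rather than extension-based, misnamed files still classify by content; when a caller already knows the type, the force_file_type parameter on the public analysis methods bypasses detection entirely. A brief sketch (analyzer wiring as before is hypothetical):

from pathlib import Path

# Skip MIME detection and analyze with Python source thresholds.
overall, regions = analyzer.analyze_file_entropy(
    Path("renamed_script.dat"),
    analysis_block_size=64,
    step_size=16,
    file_chunk_size=65536,
    force_file_type=FileType.PYTHON,
)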

Threshold Management

The threshold service provides content-aware entropy thresholds based on file types:

kp_ssf_tools.analyze.services.threshold_service.ContentAwareThresholdManager

Manages content-aware thresholds for different file types.

Concrete implementation of the ThresholdProviderProtocol.

Source code in src\kp_ssf_tools\analyze\services\threshold_service.py
class ContentAwareThresholdManager:
    """
    Manages content-aware thresholds for different file types.

    Concrete implementation of the ThresholdProviderProtocol.
    """

    def __init__(self) -> None:
        # Cache pre-built models to avoid repeated conversions
        self.threshold_cache: dict[FileType, ContentAwareThresholds] = (
            ContentAwareThresholds.get_default_models()
        )

    def get_thresholds(self, file_type: FileType) -> ContentAwareThresholds:
        if file_type in self.threshold_cache:
            return self.threshold_cache[file_type]

        # Use factory method for unknown types
        return ContentAwareThresholds.for_file_type(file_type)

    def classify_entropy_level(
        self,
        entropy: float,
        file_type: FileType,
    ) -> EntropyLevel:
        """
        Classify entropy level based on content-aware thresholds.

        Args:
            entropy: Shannon entropy value
            file_type: The detected file type

        Returns:
            Entropy level classification enum

        """
        thresholds: ContentAwareThresholds = self.get_thresholds(file_type)

        if entropy <= thresholds.very_low_threshold:
            return EntropyLevel.VERY_LOW
        if entropy <= thresholds.low_threshold:
            return EntropyLevel.LOW
        if entropy <= thresholds.medium_threshold:
            return EntropyLevel.MEDIUM
        if entropy <= thresholds.medium_high_threshold:
            return EntropyLevel.MEDIUM_HIGH
        if entropy <= thresholds.high_threshold:
            return EntropyLevel.HIGH

        # If not any of the others, then it has to be CRITICAL
        return EntropyLevel.CRITICAL

Functions

classify_entropy_level(entropy, file_type)

Classify entropy level based on content-aware thresholds.

Parameters:

  • entropy (float, required): Shannon entropy value
  • file_type (FileType, required): The detected file type

Returns:

  • EntropyLevel: Entropy level classification enum

Source code in src\kp_ssf_tools\analyze\services\threshold_service.py
def classify_entropy_level(
    self,
    entropy: float,
    file_type: FileType,
) -> EntropyLevel:
    """
    Classify entropy level based on content-aware thresholds.

    Args:
        entropy: Shannon entropy value
        file_type: The detected file type

    Returns:
        Entropy level classification enum

    """
    thresholds: ContentAwareThresholds = self.get_thresholds(file_type)

    if entropy <= thresholds.very_low_threshold:
        return EntropyLevel.VERY_LOW
    if entropy <= thresholds.low_threshold:
        return EntropyLevel.LOW
    if entropy <= thresholds.medium_threshold:
        return EntropyLevel.MEDIUM
    if entropy <= thresholds.medium_high_threshold:
        return EntropyLevel.MEDIUM_HIGH
    if entropy <= thresholds.high_threshold:
        return EntropyLevel.HIGH

    # If not any of the others, then it has to be CRITICAL
    return EntropyLevel.CRITICAL
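
ContentAwareThresholdManager takes no constructor arguments, so it can be exercised directly. A short sketch showing that the same entropy value can classify differently per file type (the printed levels depend on the ContentAwareThresholds defaults):

from kp_ssf_tools.analyze.services.threshold_service import ContentAwareThresholdManager

manager = ContentAwareThresholdManager()

# Source code is expected to be low-entropy, so 6.5 bits/byte may already be
# notable there while being unremarkable inside a packed executable.
for file_type in (FileType.PYTHON, FileType.WINDOWS_PE):
    level = manager.classify_entropy_level(6.5, file_type)
    print(f"{file_type.value}: {level.value}")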

Subprocess Integration Pattern

The credential detection service demonstrates secure subprocess integration with external tools. This pattern provides several benefits:

  • Tool Reuse: Leverage proven security tools without reimplementation
  • Security Controls: Implement timeout and validation safeguards
  • Error Handling: Graceful handling of failures in external dependencies
  • Result Processing: Transform external tool output to internal models

Command Construction

The service builds validated commands with configuration options:

kp_ssf_tools.analyze.services.detect_secrets_service.DetectSecretsCredentialService._build_config_options(config)

Build configuration options for detect-secrets command.

Source code in src\kp_ssf_tools\analyze\services\detect_secrets_service.py
def _build_config_options(self, config: dict[str, Any]) -> list[str]:
    """Build configuration options for detect-secrets command."""
    options = []
    credential_config = config.get("credentials", {})

    # Configure entropy limits if specified
    if "entropy_limits" in credential_config:
        limits = credential_config["entropy_limits"]
        if "base64" in limits:
            options.extend(["--base64-limit", str(limits["base64"])])
        if "hex" in limits:
            options.extend(["--hex-limit", str(limits["hex"])])

    # Add exclude patterns if configured
    if "exclude_patterns" in credential_config:
        patterns = credential_config["exclude_patterns"]
        if "files" in patterns:
            options.extend(["--exclude-files", patterns["files"]])
        if "lines" in patterns:
            options.extend(["--exclude-lines", patterns["lines"]])
        if "secrets" in patterns:
            options.extend(["--exclude-secrets", patterns["secrets"]])

    # Add word list if configured
    if "word_list_path" in credential_config:
        word_list_path = Path(credential_config["word_list_path"])
        if word_list_path.exists():
            options.extend(["--word-list", str(word_list_path)])

    return options
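
A sketch of the mapping from a hypothetical credentials config section to the detect-secrets flags produced above; service stands for a constructed DetectSecretsCredentialService, and the private method is called here for illustration only:

config = {
    "credentials": {
        "entropy_limits": {"base64": 4.5, "hex": 3.0},
        "exclude_patterns": {"files": r".*\.lock$"},
    }
}

options = service._build_config_options(config)
assert options == [
    "--base64-limit", "4.5",
    "--hex-limit", "3.0",
    "--exclude-files", r".*\.lock$",
]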

Secure Execution

Command execution includes security controls and error handling:

kp_ssf_tools.analyze.services.detect_secrets_service.DetectSecretsCredentialService._execute_scan_command(cmd)

Execute the detect-secrets scan command safely and return JSON results.

Source code in src\kp_ssf_tools\analyze\services\detect_secrets_service.py
def _execute_scan_command(self, cmd: list[str]) -> dict[str, Any]:
    """Execute the detect-secrets scan command safely and return JSON results."""
    # Validate command for security - ensure it starts with detect-secrets
    if not cmd or cmd[0] != "detect-secrets":
        error_msg = "Invalid command: must start with 'detect-secrets'"
        self.rich_output.error(error_msg)
        raise ValueError(error_msg)

    try:
        self.rich_output.debug(f"Running: {' '.join(cmd)}")
        # Security: Command is constructed internally with validated components
        result = subprocess.run(  # noqa: S603
            cmd,
            capture_output=True,
            text=True,
            check=False,  # Don't raise on non-zero exit (normal for secrets found)
            cwd=Path.cwd(),
            timeout=300,  # 5 minute timeout for safety
        )

        if result.returncode not in (0, 1):  # 0=no secrets, 1=secrets found
            error_msg = f"detect-secrets failed: {result.stderr}"
            self.rich_output.error(error_msg)
            raise RuntimeError(error_msg)

        self.rich_output.debug(
            f"detect-secrets scan completed with exit code {result.returncode}",
        )

        # Parse JSON output from stdout
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError as e:
            self.rich_output.error(
                f"Failed to parse detect-secrets JSON output: {e}",
            )
            return {"results": {}}

    except FileNotFoundError as e:
        error_msg = (
            "detect-secrets not found. Please install: pip install detect-secrets"
        )
        self.rich_output.error(error_msg)
        raise RuntimeError(error_msg) from e
    except subprocess.TimeoutExpired as e:
        error_msg = "detect-secrets scan timed out after 5 minutes"
        self.rich_output.error(error_msg)
        raise RuntimeError(error_msg) from e

Container Integration

The analyze module integrates with the dependency injection container system for service management and configuration.

Application Container

Services are registered in the application container with proper dependency resolution:

kp_ssf_tools.containers.application.ApplicationContainer.analysis (class attribute)

analysis = providers.Container(AnalysisContainer, core=core)
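
A minimal sketch of the wiring this attribute implies, using dependency-injector; the real AnalysisContainer defines more providers, and CoreContainer plus all provider names here are assumptions:

from dependency_injector import containers, providers

class AnalysisContainer(containers.DeclarativeContainer):
    core = providers.DependenciesContainer()  # supplied by the parent container

    threshold_manager = providers.Singleton(ContentAwareThresholdManager)

    entropy_analyzer = providers.Factory(
        EntropyAnalyzer,
        rich_output=core.rich_output,
        timestamp_service=core.timestamp_service,
        file_validator=core.file_validator,
        mime_detector=core.mime_detector,
        file_processing=core.file_processing,
        threshold_manager=threshold_manager,
    )

class ApplicationContainer(containers.DeclarativeContainer):
    core = providers.Container(CoreContainer)  # hypothetical core container
    analysis = providers.Container(AnalysisContainer, core=core)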

Configuration Services

Configuration management uses the core configuration service pattern:

kp_ssf_tools.core.services.config.service.ConfigurationService

Bases: Generic[ConfigT]

Configuration service implementation with dependency injection.

Source code in src\kp_ssf_tools\core\services\config\service.py
class ConfigurationService(Generic[ConfigT]):
    """Configuration service implementation with dependency injection."""

    def __init__(
        self,
        config_model: type[ConfigT],
        rich_output: RichOutputProtocol,
        timestamp_service: TimestampProtocol,
        config_section: str,
    ) -> None:
        """
        Initialize configuration service.

        Args:
            config_model: Pydantic model class for this configuration type
            rich_output: Rich output service for user feedback
            timestamp_service: Timestamp service for configuration metadata
            config_section: Section name in unified config file (e.g., "entropy", "volatility")

        """
        self._config_model: type[ConfigT] = config_model
        self._rich_output: RichOutputProtocol = rich_output
        self._timestamp_service: TimestampProtocol = timestamp_service
        self._config_section: str = config_section

    def load_config(
        self,
        config_path: Path | None = None,
        command_overrides: ConfigOverrides = None,
    ) -> ConfigT:
        """
        Load configuration from unified config file(s) with CLI overrides.

        Args:
            config_path: Path to unified configuration file (None for default search)
            command_overrides: CLI overrides to apply

        Returns:
            Loaded and merged configuration for this service's section

        """
        # Load and merge from multiple config files if no specific path provided
        if config_path is None:
            unified_config_data = self._load_and_merge_multiple_configs()
        elif config_path.exists():
            # Load from single specified file
            unified_config_data = self._load_file(config_path)
        else:
            unified_config_data = {}

        # Merge global and section-specific settings
        merged_config_data = self._merge_global_and_section_config(unified_config_data)

        # Create configuration instance
        try:
            config = self._config_model(**merged_config_data)
            if config_path is None:
                self._rich_output.debug(
                    f"Loaded {self._config_section} configuration from multiple sources with global settings merged",
                )
            else:
                self._rich_output.debug(
                    f"Loaded {self._config_section} configuration with global settings merged from {config_path}",
                )
        except ValidationError as e:
            self._rich_output.error(
                f"Invalid {self._config_section} configuration: {e}",
            )
            config = self.create_default_config(self._config_section)
        except (TypeError, ValueError) as e:
            self._rich_output.error(
                f"Failed to load {self._config_section} configuration: {e}",
            )
            config = self.create_default_config(self._config_section)

        # Apply command-line overrides
        if command_overrides:
            config = self.merge_configurations(config, command_overrides)
            self._rich_output.debug("Applied command-line overrides")

        return config

    def save_config(self, config: ConfigT, config_path: Path) -> None:
        """
        Save configuration to unified config file.

        Args:
            config: Configuration to save
            config_path: Target file path

        """
        # Ensure directory exists
        config_path.parent.mkdir(parents=True, exist_ok=True)

        # Load existing unified config or create new structure
        if config_path.exists():
            try:
                unified_config = self._load_file(config_path)
            except RuntimeError:
                # If file is corrupt, start fresh
                unified_config = {}
        else:
            unified_config = {}

        # Prepare section config
        section_config_dict = config.model_dump()

        # Add timestamp if config has created_at field
        if (
            hasattr(config, "created_at")
            and section_config_dict.get("created_at") is None
        ):
            section_config_dict["created_at"] = self._timestamp_service.format_iso(
                self._timestamp_service.utc_now(),
            )

        # Update the specific section in unified config
        unified_config[self._config_section] = section_config_dict

        # Save unified config as YAML
        try:
            with config_path.open("w", encoding="utf-8") as f:
                yaml.safe_dump(
                    unified_config,
                    f,
                    default_flow_style=False,
                    allow_unicode=True,
                    sort_keys=False,
                )
            self._rich_output.success(
                f"Configuration saved to {config_path} (section: {self._config_section})",
            )
        except Exception as e:
            msg = f"Failed to save configuration to {config_path}: {e}"
            self._rich_output.error(msg)
            raise RuntimeError(msg) from e

    def validate_config(self, config: ConfigT | ConfigDict) -> ValidationResult:
        """
        Validate configuration and return detailed results.

        Args:
            config: Configuration to validate (model instance or dict)

        Returns:
            Validation result with errors, warnings, and deprecated fields

        """
        errors: list[str] = []
        warnings: list[str] = []
        deprecated_fields: list[str] = []

        try:
            # If it's a dict, try to create model instance
            if isinstance(config, dict):
                self._config_model(**config)
                self._rich_output.debug("Configuration validation passed")
            else:
                # Already a model instance, validate by re-creating
                self._config_model(**config.model_dump())
                self._rich_output.debug("Configuration model validation passed")

        except ValidationError as e:
            for error in e.errors():
                field_path = " -> ".join(str(loc) for loc in error["loc"])
                error_msg = f"{field_path}: {error['msg']}"
                errors.append(error_msg)
                self._rich_output.debug(f"Validation error: {error_msg}")

        except TypeError as e:
            errors.append(f"Type error during validation: {e}")
            self._rich_output.error(f"Type error during validation: {e}")

        # Check for deprecated fields (this would be extended based on actual deprecations)
        config_data = config if isinstance(config, dict) else config.model_dump()

        # Add logic here to check for deprecated field names
        # This is a placeholder for future deprecation handling
        deprecated_candidates: list[str] = []  # Add known deprecated fields as needed
        for field in deprecated_candidates:
            if field in config_data:
                deprecated_fields.append(field)
                warnings.append(f"Field '{field}' is deprecated")

        is_valid = len(errors) == 0

        return ValidationResult(
            is_valid=is_valid,
            errors=errors,
            warnings=warnings,
            deprecated_fields=deprecated_fields,
        )

    def create_default_config(self, section: str) -> ConfigT:
        """
        Create default configuration for specific section.

        Args:
            section: Configuration section name (used for metadata)

        Returns:
            Default configuration instance

        """
        try:
            config = self._config_model()
        except Exception as e:
            msg = f"Failed to create default configuration for {section}: {e}"
            self._rich_output.error(msg)
            raise RuntimeError(msg) from e
        else:
            self._rich_output.debug(f"Created default configuration for {section}")
            return config

    def merge_configurations(self, base: ConfigT, overrides: ConfigDict) -> ConfigT:
        """
        Merge configuration with runtime overrides.

        Args:
            base: Base configuration
            overrides: Override values to apply

        Returns:
            Merged configuration

        """
        try:
            # Convert base to dict for merging
            base_dict = base.model_dump()

            # Deep merge the overrides
            merged_dict = self._deep_merge(base_dict, overrides)

            # Create new instance with merged data
            merged_config = self._config_model(**merged_dict)
        except ValidationError as e:
            msg = f"Merged configuration is invalid: {e}"
            self._rich_output.error(msg)
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Failed to merge configurations: {e}"
            self._rich_output.error(msg)
            raise RuntimeError(msg) from e
        else:
            self._rich_output.debug("Successfully merged configurations")
            return merged_config

    def get_config_paths(self) -> list[Path]:
        """
        Get standard configuration file paths for unified ssf-tools config (platform-independent).

        Returns:
            List of paths in priority order (highest to lowest)

        """
        from platformdirs import user_config_dir

        config_filename = "ssf-tools-config.yaml"

        paths = [
            # 1. Current directory (project-specific) - highest priority
            Path.cwd() / config_filename,
            # 2. User config directory (platform-independent) - lower priority
            Path(user_config_dir("ssf_tools", "kirkpatrickprice")) / config_filename,
        ]

        self._rich_output.debug(
            f"Configuration search paths: {[str(p) for p in paths]}",
        )
        return paths

    def discover_config_files(
        self,
        search_paths: list[Path],
    ) -> list[ConfigurationSource]:
        """
        Discover configuration files in search paths.

        Args:
            search_paths: Paths to search for configuration files

        Returns:
            List of discovered configuration sources

        """
        sources: list[ConfigurationSource] = []

        for i, path in enumerate(search_paths):
            if path.exists() and path.is_file():
                try:
                    # Determine scope based on path location
                    if path.parent == Path.cwd():
                        scope = ConfigurationScope.PROJECT
                    else:
                        scope = ConfigurationScope.USER

                    source = ConfigurationSource(
                        path=path,
                        scope=scope,
                        format=ConfigurationFormat.YAML,
                        priority=len(search_paths) - i,  # Higher index = lower priority
                        last_modified=path.stat().st_mtime,
                        is_default=False,
                    )
                    sources.append(source)
                    self._rich_output.debug(
                        f"Discovered config: {path} (scope: {scope})",
                    )

                except (OSError, PermissionError) as e:
                    self._rich_output.warning(
                        f"Could not process config file {path}: {e}",
                    )

        return sources

    def _load_file(self, config_path: Path) -> ConfigDict:
        """
        Load configuration data from file (YAML only).

        Args:
            config_path: Path to configuration file

        Returns:
            Configuration data as dictionary

        Raises:
            RuntimeError: If file cannot be loaded or parsed

        """
        try:
            with config_path.open("r", encoding="utf-8") as f:
                data = yaml.safe_load(f)

            if data is None:
                data = {}

            if not isinstance(data, dict):
                msg = f"Configuration file {config_path} must contain a YAML object, not {type(data).__name__}"
                raise TypeError(msg)

        except yaml.YAMLError as e:
            msg = f"Invalid YAML in {config_path}: {e}"
            self._rich_output.error(msg)
            raise RuntimeError(msg) from e
        except OSError as e:
            msg = f"Failed to load configuration from {config_path}: {e}"
            self._rich_output.error(msg)
            raise RuntimeError(msg) from e
        else:
            self._rich_output.debug(
                f"Loaded {len(data)} configuration items from {config_path}",
            )
            return data

    def _load_and_merge_multiple_configs(self) -> ConfigDict:
        """
        Load and merge configuration data from multiple config files in priority order.

        Returns:
            Merged configuration data from all available config files.
            Higher priority files override lower priority ones.

        """
        config_paths = self.get_config_paths()
        merged_data: ConfigDict = {}

        # Load files in reverse priority order (lowest to highest)
        # so higher priority files override lower priority ones
        for config_path in reversed(config_paths):
            if config_path.exists():
                try:
                    file_data = self._load_file(config_path)
                    # Deep merge this file's data into the accumulated data
                    merged_data = self._deep_merge(merged_data, file_data)
                    self._rich_output.debug(
                        f"Merged configuration from {config_path}",
                    )
                except RuntimeError:
                    # Skip files that can't be loaded (already logged in _load_file)
                    self._rich_output.warning(
                        f"Skipping corrupted config file: {config_path}",
                    )
                    continue

        return merged_data

    def _merge_global_and_section_config(
        self,
        unified_config_data: ConfigDict,
    ) -> ConfigDict:
        """
        Merge global and section-specific configuration data.

        Only merges fields that are compatible with the target model.

        Args:
            unified_config_data: The complete unified config file data

        Returns:
            Merged configuration dictionary with only valid fields
            for this section's configuration model

        """
        from typing import cast

        # Get section-specific settings (if they exist and are a dict)
        section_data_raw = unified_config_data.get(self._config_section, {})
        section_data: ConfigDict = cast(
            "ConfigDict",
            section_data_raw if isinstance(section_data_raw, dict) else {},
        )

        # For global section, return its own data directly
        if self._config_section == "global":
            return section_data

        # For other sections, start with section data
        result = copy.deepcopy(section_data)

        # Get global settings that are compatible with this model
        global_data_raw = unified_config_data.get("global", {})
        global_data: ConfigDict = cast(
            "ConfigDict",
            global_data_raw if isinstance(global_data_raw, dict) else {},
        )

        # Only merge global fields that are actually valid for this model
        # We do this by getting the model's field names and only merging those
        model_fields = set(self._config_model.model_fields.keys())

        for key, value in global_data.items():
            # Only merge if:
            # 1. The target model has this field
            # 2. The section-specific config doesn't already override it
            if key in model_fields and key not in result:
                result[key] = copy.deepcopy(value)

        return result

    def _deep_merge(self, base: ConfigDict, overrides: ConfigDict) -> ConfigDict:
        """
        Deep merge dictionaries.

        Args:
            base: Base dictionary
            overrides: Override dictionary

        Returns:
            Merged dictionary

        """
        result = copy.deepcopy(base)

        for key, value in overrides.items():
            if (
                key in result
                and isinstance(result[key], dict)
                and isinstance(value, dict)
            ):
                # Recursively merge nested dictionaries
                result[key] = self._deep_merge(  # type: ignore[assignment]
                    result[key],  # type: ignore[arg-type]
                    value,  # type: ignore[arg-type]
                )
            else:
                # Override or add new value
                result[key] = copy.deepcopy(value)

        return result
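
A usage sketch follows. The section model below is hypothetical (the real entropy config defines more fields), and rich_output / timestamp_service stand for services resolved from the core container:

from pydantic import BaseModel

class EntropyConfig(BaseModel):
    """Hypothetical section model for illustration only."""
    analysis_block_size: int = 64
    step_size: int = 16
    file_chunk_size: int = 65536

service = ConfigurationService(
    config_model=EntropyConfig,
    rich_output=rich_output,
    timestamp_service=timestamp_service,
    config_section="entropy",
)

# Search and merge ssf-tools-config.yaml files, then apply CLI overrides.
config = service.load_config(command_overrides={"step_size": 8})
print(config.step_size)  # 8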

Functions

__init__(config_model, rich_output, timestamp_service, config_section)

Initialize configuration service.

Parameters:

  • config_model (type[ConfigT], required): Pydantic model class for this configuration type
  • rich_output (RichOutputProtocol, required): Rich output service for user feedback
  • timestamp_service (TimestampProtocol, required): Timestamp service for configuration metadata
  • config_section (str, required): Section name in unified config file (e.g., "entropy", "volatility")
Source code in src\kp_ssf_tools\core\services\config\service.py
def __init__(
    self,
    config_model: type[ConfigT],
    rich_output: RichOutputProtocol,
    timestamp_service: TimestampProtocol,
    config_section: str,
) -> None:
    """
    Initialize configuration service.

    Args:
        config_model: Pydantic model class for this configuration type
        rich_output: Rich output service for user feedback
        timestamp_service: Timestamp service for configuration metadata
        config_section: Section name in unified config file (e.g., "entropy", "volatility")

    """
    self._config_model: type[ConfigT] = config_model
    self._rich_output: RichOutputProtocol = rich_output
    self._timestamp_service: TimestampProtocol = timestamp_service
    self._config_section: str = config_section

create_default_config(section)

Create default configuration for specific section.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| section | str | Configuration section name (used for metadata) | required |

Returns:

| Type | Description |
| --- | --- |
| ConfigT | Default configuration instance |

Source code in src\kp_ssf_tools\core\services\config\service.py
def create_default_config(self, section: str) -> ConfigT:
    """
    Create default configuration for specific section.

    Args:
        section: Configuration section name (used for metadata)

    Returns:
        Default configuration instance

    """
    try:
        config = self._config_model()
    except Exception as e:
        msg = f"Failed to create default configuration for {section}: {e}"
        self._rich_output.error(msg)
        raise RuntimeError(msg) from e
    else:
        self._rich_output.debug(f"Created default configuration for {section}")
        return config

discover_config_files(search_paths)

Discover configuration files in search paths.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| search_paths | list[Path] | Paths to search for configuration files | required |

Returns:

| Type | Description |
| --- | --- |
| list[ConfigurationSource] | List of discovered configuration sources |

Source code in src\kp_ssf_tools\core\services\config\service.py
def discover_config_files(
    self,
    search_paths: list[Path],
) -> list[ConfigurationSource]:
    """
    Discover configuration files in search paths.

    Args:
        search_paths: Paths to search for configuration files

    Returns:
        List of discovered configuration sources

    """
    sources: list[ConfigurationSource] = []

    for i, path in enumerate(search_paths):
        if path.exists() and path.is_file():
            try:
                # Determine scope based on path location
                if path.parent == Path.cwd():
                    scope = ConfigurationScope.PROJECT
                else:
                    scope = ConfigurationScope.USER

                source = ConfigurationSource(
                    path=path,
                    scope=scope,
                    format=ConfigurationFormat.YAML,
                    priority=len(search_paths) - i,  # Higher index = lower priority
                    last_modified=path.stat().st_mtime,
                    is_default=False,
                )
                sources.append(source)
                self._rich_output.debug(
                    f"Discovered config: {path} (scope: {scope})",
                )

            except (OSError, PermissionError) as e:
                self._rich_output.warning(
                    f"Could not process config file {path}: {e}",
                )

    return sources

get_config_paths()

Get standard configuration file paths for unified ssf-tools config (platform-independent).

Returns:

| Type | Description |
| --- | --- |
| list[Path] | List of paths in priority order (highest to lowest) |

Source code in src\kp_ssf_tools\core\services\config\service.py
def get_config_paths(self) -> list[Path]:
    """
    Get standard configuration file paths for unified ssf-tools config (platform-independent).

    Returns:
        List of paths in priority order (highest to lowest)

    """
    from platformdirs import user_config_dir

    config_filename = "ssf-tools-config.yaml"

    paths = [
        # 1. Current directory (project-specific) - highest priority
        Path.cwd() / config_filename,
        # 2. User config directory (platform-independent) - lower priority
        Path(user_config_dir("ssf_tools", "kirkpatrickprice")) / config_filename,
    ]

    self._rich_output.debug(
        f"Configuration search paths: {[str(p) for p in paths]}",
    )
    return paths
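
On Linux, for example, the search order resembles the following (the user-scope directory comes from platformdirs and varies by platform):

./ssf-tools-config.yaml                            # 1. project scope (highest priority)
~/.config/ssf_tools/ssf-tools-config.yaml          # 2. user scope (lower priority)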

load_config(config_path=None, command_overrides=None)

Load configuration from unified config file(s) with CLI overrides.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config_path | Path \| None | Path to unified configuration file (None for default search) | None |
| command_overrides | ConfigOverrides | CLI overrides to apply | None |

Returns:

| Type | Description |
| --- | --- |
| ConfigT | Loaded and merged configuration for this service's section |

Source code in src\kp_ssf_tools\core\services\config\service.py
def load_config(
    self,
    config_path: Path | None = None,
    command_overrides: ConfigOverrides = None,
) -> ConfigT:
    """
    Load configuration from unified config file(s) with CLI overrides.

    Args:
        config_path: Path to unified configuration file (None for default search)
        command_overrides: CLI overrides to apply

    Returns:
        Loaded and merged configuration for this service's section

    """
    # Load and merge from multiple config files if no specific path provided
    if config_path is None:
        unified_config_data = self._load_and_merge_multiple_configs()
    elif config_path.exists():
        # Load from single specified file
        unified_config_data = self._load_file(config_path)
    else:
        unified_config_data = {}

    # Merge global and section-specific settings
    merged_config_data = self._merge_global_and_section_config(unified_config_data)

    # Create configuration instance
    try:
        config = self._config_model(**merged_config_data)
        if config_path is None:
            self._rich_output.debug(
                f"Loaded {self._config_section} configuration from multiple sources with global settings merged",
            )
        else:
            self._rich_output.debug(
                f"Loaded {self._config_section} configuration with global settings merged from {config_path}",
            )
    except ValidationError as e:
        self._rich_output.error(
            f"Invalid {self._config_section} configuration: {e}",
        )
        config = self.create_default_config(self._config_section)
    except (TypeError, ValueError) as e:
        self._rich_output.error(
            f"Failed to load {self._config_section} configuration: {e}",
        )
        config = self.create_default_config(self._config_section)

    # Apply command-line overrides
    if command_overrides:
        config = self.merge_configurations(config, command_overrides)
        self._rich_output.debug("Applied command-line overrides")

    return config
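
To make the precedence concrete (CLI overrides beat section settings, which beat global settings), consider this sketch of a hypothetical unified config; the field names are illustrative rather than the actual schema:

import yaml

# Hypothetical ssf-tools-config.yaml content (field names are illustrative)
unified = yaml.safe_load("""
global:
  verbose: true
entropy:
  step_size: 32
""")

# _merge_global_and_section_config starts from the "entropy" section, then
# copies in each global key that the entropy model declares and the section
# has not already set. CLI overrides, applied last via merge_configurations,
# take precedence over both.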

merge_configurations(base, overrides)

Merge configuration with runtime overrides.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base | ConfigT | Base configuration | required |
| overrides | ConfigDict | Override values to apply | required |

Returns:

| Type | Description |
| --- | --- |
| ConfigT | Merged configuration |

Source code in src\kp_ssf_tools\core\services\config\service.py
def merge_configurations(self, base: ConfigT, overrides: ConfigDict) -> ConfigT:
    """
    Merge configuration with runtime overrides.

    Args:
        base: Base configuration
        overrides: Override values to apply

    Returns:
        Merged configuration

    """
    try:
        # Convert base to dict for merging
        base_dict = base.model_dump()

        # Deep merge the overrides
        merged_dict = self._deep_merge(base_dict, overrides)

        # Create new instance with merged data
        merged_config = self._config_model(**merged_dict)
    except ValidationError as e:
        msg = f"Merged configuration is invalid: {e}"
        self._rich_output.error(msg)
        raise ValueError(msg) from e
    except Exception as e:
        msg = f"Failed to merge configurations: {e}"
        self._rich_output.error(msg)
        raise RuntimeError(msg) from e
    else:
        self._rich_output.debug("Successfully merged configurations")
        return merged_config

save_config(config, config_path)

Save configuration to unified config file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | ConfigT | Configuration to save | required |
| config_path | Path | Target file path | required |

Source code in src\kp_ssf_tools\core\services\config\service.py
def save_config(self, config: ConfigT, config_path: Path) -> None:
    """
    Save configuration to unified config file.

    Args:
        config: Configuration to save
        config_path: Target file path

    """
    # Ensure directory exists
    config_path.parent.mkdir(parents=True, exist_ok=True)

    # Load existing unified config or create new structure
    if config_path.exists():
        try:
            unified_config = self._load_file(config_path)
        except RuntimeError:
            # If file is corrupt, start fresh
            unified_config = {}
    else:
        unified_config = {}

    # Prepare section config
    section_config_dict = config.model_dump()

    # Add timestamp if config has created_at field
    if (
        hasattr(config, "created_at")
        and section_config_dict.get("created_at") is None
    ):
        section_config_dict["created_at"] = self._timestamp_service.format_iso(
            self._timestamp_service.utc_now(),
        )

    # Update the specific section in unified config
    unified_config[self._config_section] = section_config_dict

    # Save unified config as YAML
    try:
        with config_path.open("w", encoding="utf-8") as f:
            yaml.safe_dump(
                unified_config,
                f,
                default_flow_style=False,
                allow_unicode=True,
                sort_keys=False,
            )
        self._rich_output.success(
            f"Configuration saved to {config_path} (section: {self._config_section})",
        )
    except Exception as e:
        msg = f"Failed to save configuration to {config_path}: {e}"
        self._rich_output.error(msg)
        raise RuntimeError(msg) from e
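
Because only this service's section is replaced, repeated saves from different services update the same file without clobbering each other's sections. A hypothetical resulting layout (keys illustrative):

global:
  verbose: false
entropy:
  step_size: 16
  created_at: "2025-01-01T00:00:00+00:00"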

validate_config(config)

Validate configuration and return detailed results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | ConfigT \| ConfigDict | Configuration to validate (model instance or dict) | required |

Returns:

| Type | Description |
| --- | --- |
| ValidationResult | Validation result with errors, warnings, and deprecated fields |

Source code in src\kp_ssf_tools\core\services\config\service.py
def validate_config(self, config: ConfigT | ConfigDict) -> ValidationResult:
    """
    Validate configuration and return detailed results.

    Args:
        config: Configuration to validate (model instance or dict)

    Returns:
        Validation result with errors, warnings, and deprecated fields

    """
    errors: list[str] = []
    warnings: list[str] = []
    deprecated_fields: list[str] = []

    try:
        # If it's a dict, try to create model instance
        if isinstance(config, dict):
            self._config_model(**config)
            self._rich_output.debug("Configuration validation passed")
        else:
            # Already a model instance, validate by re-creating
            self._config_model(**config.model_dump())
            self._rich_output.debug("Configuration model validation passed")

    except ValidationError as e:
        for error in e.errors():
            field_path = " -> ".join(str(loc) for loc in error["loc"])
            error_msg = f"{field_path}: {error['msg']}"
            errors.append(error_msg)
            self._rich_output.debug(f"Validation error: {error_msg}")

    except TypeError as e:
        errors.append(f"Type error during validation: {e}")
        self._rich_output.error(f"Type error during validation: {e}")

    # Check for deprecated fields (this would be extended based on actual deprecations)
    config_data = config if isinstance(config, dict) else config.model_dump()

    # Add logic here to check for deprecated field names
    # This is a placeholder for future deprecation handling
    deprecated_candidates: list[str] = []  # Add known deprecated fields as needed
    for field in deprecated_candidates:
        if field in config_data:
            deprecated_fields.append(field)
            warnings.append(f"Field '{field}' is deprecated")

    is_valid = len(errors) == 0

    return ValidationResult(
        is_valid=is_valid,
        errors=errors,
        warnings=warnings,
        deprecated_fields=deprecated_fields,
    )
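
A caller-side sketch, assuming config_service is an already-constructed ConfigurationService and the failing field is hypothetical:

result = config_service.validate_config({"step_size": -1})  # hypothetical invalid value
if not result.is_valid:
    for error in result.errors:
        print(f"config error: {error}")
for warning in result.warnings:
    print(f"config warning: {warning}")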

CLI Integration

The analyze module exposes two main commands through the CLI interface. Each command provides file analysis with configurable options to suit different security assessment scenarios.

Command Structure

You can access entropy and credential analysis through dedicated CLI commands:

kp_ssf_tools.cli.commands.analyze.entropy(target, risk_threshold, file_block_size, analysis_block_size, step_size, ignore_pattern, *, no_recurse, include_samples, analyzer=Provide[ApplicationContainer.entropy.analyzer], rich_output=Provide[ApplicationContainer.core.rich_output], file_discovery=Provide[ApplicationContainer.core.file_discoverer], global_config_service=Provide[ApplicationContainer.core.global_config_service], entropy_config_service=Provide[ApplicationContainer.core.entropy_config_service], timestamp_service=Provide[ApplicationContainer.core.timestamp])

Analyze entropy of files for PCI SSF 2.3 compliance.

Performs Shannon entropy analysis using content-aware thresholds to detect potentially suspicious patterns in files. Results are streamed directly to Excel with minimal memory usage.

Arguments:

TARGET                  Path to file or directory to analyze

Examples:

# Basic file analysis
ssf_tools analyze entropy sample.bin

# Analyze with higher risk threshold (fewer results)
ssf_tools analyze entropy sample.bin --risk-threshold high

# Analyze with custom block size
ssf_tools analyze entropy sample.bin --analysis-block-size 128

# Override file type detection
ssf_tools analyze entropy app.exe --force-file-type windows_pe

# Analyze directory non-recursively
ssf_tools analyze entropy data/ --no-recurse

Source code in src\kp_ssf_tools\cli\commands\analyze.py
@analyze_group.command("entropy")
@click.argument("target", type=click.Path(exists=True, path_type=Path))
@click.option(
    "--ignore-pattern",
    multiple=True,
    help="Glob pattern(s) to ignore when searching for files (e.g. --ignore-pattern='__pycache__' --ignore-pattern='*.egg-info')",
)
@click.option(
    "--risk-threshold",
    type=click.Choice(
        ["very_low", "low", "medium", "medium_high", "high", "critical"],
        case_sensitive=False,
    ),
    default="medium_high",
    help="Minimum risk level for regions to include in analysis (default: medium_high)",
)
@click.option(
    "--file-block-size",
    type=int,
    help="File I/O block size in bytes (default: 65536)",
)
@click.option(
    "--analysis-block-size",
    type=int,
    help="Analysis block size in bytes (default: 64)",
)
@click.option(
    "--step-size",
    type=int,
    help="Step size for sliding window analysis (default: 16)",
)
@click.option(
    "--no-recurse",
    is_flag=True,
    help="Disable recursive directory analysis (analyze current directory only)",
)
@click.option(
    "--include-samples",
    is_flag=True,
    help="Include data samples in region analysis (increases file size)",
)
@inject
def entropy(  # noqa: PLR0913
    target: Path,
    risk_threshold: str,
    file_block_size: int | None,
    analysis_block_size: int | None,
    step_size: int | None,
    ignore_pattern: tuple[str, ...],
    *,
    no_recurse: bool,
    include_samples: bool,
    analyzer: EntropyAnalyzer = Provide[ApplicationContainer.entropy.analyzer],
    rich_output: RichOutputService = Provide[ApplicationContainer.core.rich_output],
    file_discovery: FileDiscoveryService = Provide[
        ApplicationContainer.core.file_discoverer
    ],
    global_config_service: ConfigurationService[GlobalConfiguration] = Provide[
        ApplicationContainer.core.global_config_service
    ],
    entropy_config_service: ConfigurationService[AnalysisConfiguration] = Provide[
        ApplicationContainer.core.entropy_config_service
    ],
    timestamp_service: TimestampService = Provide[ApplicationContainer.core.timestamp],
) -> None:
    """
    Analyze entropy of files for PCI SSF 2.3 compliance.

    Performs Shannon entropy analysis using content-aware thresholds to detect
    potentially suspicious patterns in files. Results are streamed directly to
    Excel with minimal memory usage.

    **Arguments:**
    ```
    TARGET                  Path to file or directory to analyze
    ```

    **Examples:**
    ```
    # Basic file analysis
    ssf_tools analyze entropy sample.bin

    # Analyze with higher risk threshold (fewer results)
    ssf_tools analyze entropy sample.bin --risk-threshold high

    # Analyze with custom block size
    ssf_tools analyze entropy sample.bin --analysis-block-size 128

    # Override file type detection
    ssf_tools analyze entropy app.exe --force-file-type windows_pe

    # Analyze directory non-recursively
    ssf_tools analyze entropy data/ --no-recurse
    ```
    """
    try:
        # Build configuration overrides and load configs
        cli_overrides = _build_cli_overrides(
            file_block_size,
            analysis_block_size,
            step_size,
        )
        global_config = global_config_service.load_config()
        entropy_config = entropy_config_service.load_config(
            command_overrides=cli_overrides,
        )

        if global_config.output.verbose:
            rich_output.debug("Verbose mode enabled")

        # Setup analysis parameters
        risk_level = EntropyLevel[risk_threshold.upper()]
        timestamp = timestamp_service.format_filename_now()
        output_path = Path(f"entropy-analysis-{timestamp}.xlsx")

        # Discover and filter files to analyze
        files_to_analyze = _discover_and_filter_files(
            target,
            file_discovery,
            no_recurse=no_recurse,
            ignore_pattern=ignore_pattern,
            rich_output=rich_output,
        )
        if not files_to_analyze:
            return

        # Check Excel limits and warn if necessary
        step_size_val = step_size or entropy_config.analysis.step_size
        if _check_excel_limits(
            files_to_analyze,
            risk_level,
            step_size_val,
            rich_output,
        ):
            return

        # Process files
        rich_output.info(
            f"Starting entropy analysis with {risk_level.value} risk threshold",
        )
        rich_output.info(f"Output will be saved to: {output_path}")

        processing_config = ProcessingConfig(
            file_block_size=file_block_size,
            analysis_block_size=analysis_block_size,
            step_size=step_size,
            include_samples=include_samples,
        )

        context = ProcessingContext(
            analyzer=analyzer,
            entropy_config=entropy_config,
            global_config=global_config,
            rich_output=rich_output,
        )

        total_files_analyzed, total_high_risk_regions, total_time = _process_files(
            files_to_analyze,
            output_path,
            risk_level,
            context,
            processing_config,
        )

        # Report final summary
        summary = AnalysisSummary(
            total_files_analyzed=total_files_analyzed,
            total_files=len(files_to_analyze),
            total_high_risk_regions=total_high_risk_regions,
            total_time=total_time,
        )
        _report_summary(output_path, summary, rich_output)

    except Exception as e:
        rich_output.error(f"Analysis failed: {e}")
        if "global_config" in locals() and global_config.output.verbose:
            import traceback

            rich_output.error(traceback.format_exc())
        raise

kp_ssf_tools.cli.commands.analyze.credentials(target, *, recursive, file_extensions, context_lines, scan_binary, max_binary_size, credential_service=Provide[ApplicationContainer.analysis.active_credential_service], rich_output=Provide[ApplicationContainer.core.rich_output], excel_service=Provide[ApplicationContainer.core.excel_export_service], timestamp_service=Provide[ApplicationContainer.core.timestamp], global_config_service=Provide[ApplicationContainer.core.global_config_service], analysis_config_service=Provide[ApplicationContainer.core.entropy_config_service])

Detect credentials in files for PCI SSF 2.3 compliance.

Analyzes files for embedded credentials including usernames, passwords, API keys, and other sensitive information. Uses wordlists from SecLists and regex patterns to identify potential security issues.

Results are automatically exported to Excel with per-file worksheets and a summary sheet. Output filename: analyze-credentials-<timestamp>.xlsx

Arguments:

TARGET                  Path to file or directory to analyze

Examples:

# Basic credential detection
ssf_tools analyze credentials sample.py

# Analyze specific file types only
ssf_tools analyze credentials data/ --file-extensions .py --file-extensions .js

# Include more context around matches
ssf_tools analyze credentials config/ --context-lines 5

# Skip binary files to speed up analysis
ssf_tools analyze credentials project/ --no-scan-binary

Source code in src\kp_ssf_tools\cli\commands\analyze.py
@analyze_group.command("credentials")
@click.argument("target", type=click.Path(exists=True, path_type=Path))
@click.option(
    "--recursive/--no-recursive",
    default=True,
    help="Search directories recursively for files to analyze",
)
@click.option(
    "--file-extensions",
    multiple=True,
    help="File extensions to include (e.g., .py .js .txt). If not specified, all text files are analyzed",
)
@click.option(
    "--context-lines",
    type=int,
    default=3,
    help="Number of context lines to show around matches",
)
@click.option(
    "--scan-binary/--no-scan-binary",
    default=True,
    help="Whether to scan binary files for embedded credentials",
)
@click.option(
    "--max-binary-size",
    type=int,
    default=10,
    help="Maximum size in MB for binary files to scan",
)
@inject
def credentials(  # noqa: PLR0913
    target: Path,
    *,
    recursive: bool,
    file_extensions: tuple[str, ...],
    context_lines: int,
    scan_binary: bool,
    max_binary_size: int,
    credential_service: CredentialDetectionProtocol = Provide[
        ApplicationContainer.analysis.active_credential_service
    ],
    rich_output: RichOutputService = Provide[ApplicationContainer.core.rich_output],
    excel_service: ExcelExportService = Provide[
        ApplicationContainer.core.excel_export_service
    ],
    timestamp_service: TimestampService = Provide[ApplicationContainer.core.timestamp],
    global_config_service: ConfigurationService[GlobalConfiguration] = Provide[
        ApplicationContainer.core.global_config_service
    ],
    analysis_config_service: ConfigurationService[AnalysisConfiguration] = Provide[
        ApplicationContainer.core.entropy_config_service
    ],
) -> None:
    """
    Detect credentials in files for PCI SSF 2.3 compliance.

    Analyzes files for embedded credentials including usernames, passwords,
    API keys, and other sensitive information. Uses wordlists from SecLists
    and regex patterns to identify potential security issues.

    Results are automatically exported to Excel with per-file worksheets
    and a summary sheet. Output filename: analyze-credentials-<timestamp>.xlsx

    **Arguments:**
    ```
    TARGET                  Path to file or directory to analyze
    ```

    **Examples:**
    ```
    # Basic credential detection
    ssf_tools analyze credentials sample.py

    # Analyze specific file types only
    ssf_tools analyze credentials data/ --file-extensions .py --file-extensions .js

    # Include more context around matches
    ssf_tools analyze credentials config/ --context-lines 5

    # Skip binary files to speed up analysis
    ssf_tools analyze credentials project/ --no-scan-binary
    ```
    """
    # Load configurations
    global_config = global_config_service.load_config()
    analysis_config = analysis_config_service.load_config()

    # Set verbose mode if requested
    if global_config.output.verbose:
        rich_output.debug("Verbose mode enabled")

    # Convert file extensions to list
    extensions_list = list(file_extensions) if file_extensions else None

    try:
        # Convert the Pydantic configuration to the dict format expected by the service
        config_dict = {
            "credentials": {
                "enabled": analysis_config.credentials.enabled,
                "cache_duration_hours": analysis_config.credentials.cache_duration_hours,
                "auto_download": analysis_config.credentials.auto_download,
                "wordlist_sources": analysis_config.credentials.wordlist_sources,
            },
        }

        # Perform credential analysis
        scan_options = CredentialScanOptions(
            recursive=recursive,
            file_extensions=tuple(extensions_list) if extensions_list else (),
            context_lines=context_lines,
            scan_binary_files=scan_binary,
            max_binary_size_mb=max_binary_size,
        )

        result = credential_service.analyze_files(
            target_paths=[target],
            config=config_dict,
            options=scan_options,
        )

        # Export to Excel if results found
        if result and result.patterns:
            export_context = ExportContext(
                excel_service=excel_service,
                timestamp_service=timestamp_service,
                rich_output=rich_output,
            )
            _export_credentials_to_excel(result, export_context)

        # Display results in console
        credential_display_limit = 10
        if result and result.patterns:
            rich_output.warning(f"Found {len(result.patterns)} potential credentials")
            for pattern in result.patterns[:credential_display_limit]:  # Show first N
                rich_output.info(
                    f"  {pattern.pattern_type}: {pattern.value[:50]}... "
                    f"(line {pattern.line_start})",
                )
            if len(result.patterns) > credential_display_limit:
                rich_output.info(
                    f"  ... and {len(result.patterns) - credential_display_limit} more",
                )
            rich_output.info("Complete results have been exported to Excel")
        else:
            rich_output.success("No credentials detected in analyzed files")

    except Exception as e:
        rich_output.error(f"Credential analysis failed: {e}")
        if global_config.output.verbose:
            import traceback

            rich_output.error(traceback.format_exc())
        raise

Processing Pipeline

Both commands follow a consistent file discovery and processing pattern; the entropy command's processing loop, shown below, illustrates it:

kp_ssf_tools.cli.commands.analyze._process_files(files_to_analyze, output_path, risk_level, context, processing_config)

Process all files and return (files_analyzed, high_risk_regions, total_time).

Source code in src\kp_ssf_tools\cli\commands\analyze.py
def _process_files(
    files_to_analyze: list[Path],
    output_path: Path,
    risk_level: EntropyLevel,
    context: ProcessingContext,
    processing_config: ProcessingConfig,
) -> tuple[int, int, float]:
    """Process all files and return (files_analyzed, high_risk_regions, total_time)."""
    with StreamingExcelExporter(output_path, risk_level) as exporter:
        start_time = time.time()
        total_files_analyzed = 0
        total_high_risk_regions = 0

        for file_index, file_path in enumerate(files_to_analyze, 1):
            file_display = _format_file_display(file_path)
            context.rich_output.info(
                f"[{file_index}/{len(files_to_analyze)}] Analyzing: {file_display}",
            )

            try:
                # Stream analysis results directly to Excel
                total_regions, high_risk_regions = exporter.process_file_streaming(
                    file_path,
                    context.analyzer,
                    file_chunk_size=processing_config.file_block_size
                    or context.entropy_config.analysis.file_chunk_size,
                    analysis_block_size=processing_config.analysis_block_size
                    or context.entropy_config.analysis.analysis_block_size,
                    step_size=processing_config.step_size
                    or context.entropy_config.analysis.step_size,
                    include_samples=processing_config.include_samples or False,
                )

                total_files_analyzed += 1
                total_high_risk_regions += high_risk_regions

                if context.global_config.output.verbose:
                    context.rich_output.debug(
                        f"  Processed {total_regions:,} regions, "
                        f"found {high_risk_regions:,} high-risk regions",
                    )

            except Exception as e:  # noqa: BLE001
                # Broad exception catch justified: analysis may fail for any file due to I/O, format, or analyzer errors
                context.rich_output.error(f"  Failed to analyze {file_path.name}: {e}")
                if context.global_config.output.verbose:
                    import traceback

                    context.rich_output.error(traceback.format_exc())
                continue

        total_time = time.time() - start_time

        # Check if Excel limit warning should be shown
        if exporter.warned_about_limit:
            context.rich_output.warning(
                "Excel row limit was reached. Some regions may not be included. "
                "Consider using a higher --risk-threshold to reduce output.",
            )

        return total_files_analyzed, total_high_risk_regions, total_time

Data Models

Analysis Results

The module defines structured result models for type safety:

kp_ssf_tools.analyze.models.analysis.EntropyAnalysisResult

Bases: SSFToolsBaseModel

Complete analysis results for all processed files.

Source code in src\kp_ssf_tools\analyze\models\analysis.py
class EntropyAnalysisResult(SSFToolsBaseModel):
    """Complete analysis results for all processed files."""

    # Schema versioning and metadata
    schema_version: str = "1.0.0"  # Schema format version
    tool_version: str  # SSF-Tools version that generated this result
    generation_timestamp: datetime  # When the analysis was performed
    commit_hash: str | None = None  # Git commit hash if available

    # Analysis configuration and results
    input_config: EntropyInputModel
    files_analyzed: int
    total_size: int
    analysis_start: datetime
    analysis_end: datetime
    file_results: list[FileAnalysisResult]
    summary_statistics: dict[str, float]
    high_risk_findings: list[FileAnalysisResult]

kp_ssf_tools.analyze.models.analysis.CredentialAnalysisResult

Bases: SSFToolsBaseModel

Result from credential analysis containing all detected patterns.

Source code in src\kp_ssf_tools\analyze\models\analysis.py
class CredentialAnalysisResult(SSFToolsBaseModel):
    """Result from credential analysis containing all detected patterns."""

    file_path: Path = Field(..., description="Primary file path analyzed")
    patterns: list[CredentialPattern] = Field(
        default_factory=list,
        description="List of detected credential patterns",
    )
    total_patterns: int = Field(default=0, description="Total number of patterns found")
    processed_files: list[Path] = Field(
        default_factory=list,
        description="List of all files that were processed during analysis",
    )
    analysis_metadata: dict[str, str] = Field(
        default_factory=dict,
        description="Additional metadata about the analysis",
    )
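
A minimal construction sketch (values are illustrative):

from pathlib import Path

from kp_ssf_tools.analyze.models.analysis import CredentialAnalysisResult

result = CredentialAnalysisResult(
    file_path=Path("src/app.py"),            # primary file analyzed
    patterns=[],                             # no findings in this example
    total_patterns=0,
    processed_files=[Path("src/app.py")],
    analysis_metadata={"backend": "detect-secrets"},
)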

Pattern Detection

Credential patterns include location and context information:

kp_ssf_tools.analyze.models.analysis.CredentialPattern

Bases: DetectedCredential

A pattern detected by credential analysis.

Source code in src\kp_ssf_tools\analyze\models\analysis.py
class CredentialPattern(DetectedCredential):
    """A pattern detected by credential analysis."""

    # File path where this pattern was detected
    file_path: Path = Field(..., description="Path to file where pattern was detected")

    # Additional fields specific to pattern-based detection
    regex_pattern: str | None = Field(
        default=None,
        description="Regex pattern used for detection",
    )
    wordlist_source: str | None = Field(
        default=None,
        description="Source wordlist used for detection",
    )

Usage Examples

Entropy Analysis

Analyze files for high-entropy regions that may indicate embedded cryptographic material:

# Basic entropy analysis
ssf_tools analyze entropy sample.bin

# Directory analysis with custom threshold
ssf_tools analyze entropy data/ --risk-threshold high

# Custom analysis parameters
ssf_tools analyze entropy large_file.exe --analysis-block-size 128 --step-size 32
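
For reference, Shannon entropy over a byte buffer is H = -Σ p(b) · log2 p(b), where p(b) is the frequency of byte value b, measured in bits per byte (0.0-8.0). A minimal sketch of the core calculation (not the module's implementation, which adds sliding windows and content-aware thresholds):

import math
from collections import Counter

def shannon_entropy(data: bytes) -> float:
    """Shannon entropy in bits per byte: 0.0 (constant data) to 8.0 (uniformly random)."""
    if not data:
        return 0.0
    total = len(data)
    return -sum(
        (count / total) * math.log2(count / total)
        for count in Counter(data).values()
    )

assert shannon_entropy(b"\x00" * 1024) == 0.0
assert shannon_entropy(bytes(range(256))) == 8.0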

Credential Detection

Detect embedded credentials using the detect-secrets backend:

# Basic credential detection
ssf_tools analyze credentials project/

# Specific file types with context
ssf_tools analyze credentials src/ --file-extensions .py --file-extensions .js --context-lines 5

# Skip binary files for faster analysis
ssf_tools analyze credentials config/ --no-scan-binary

Performance Considerations

Entropy Analysis

  • Streaming Processing: Large files processed in chunks to minimize memory usage
  • Content-Aware Thresholds: Reduce false positives through file-type-specific thresholds
  • Configurable Block Sizes: Tune analysis parameters for different file types

Credential Detection

  • External Tool Efficiency: Uses detect-secrets' optimized pattern matching
  • File Type Filtering: Focus analysis on relevant file types
  • Binary File Handling: Optional binary file scanning with size limits

Excel Export

  • Streaming Export: Direct-to-Excel streaming prevents memory exhaustion
  • Row Limit Management: Automatic warnings when approaching Excel limits
  • Worksheet Organization: Per-file worksheets with summary sheet
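
The streaming export described above maps naturally onto a write-only workbook; a minimal sketch using openpyxl (an assumption for illustration; the module's StreamingExcelExporter may be built differently):

from openpyxl import Workbook

wb = Workbook(write_only=True)   # rows are flushed as appended; memory stays flat
ws = wb.create_sheet("sample.bin")
ws.append(["offset", "entropy", "risk_level"])  # header row
ws.append([0, 7.92, "high"])                    # one row per analyzed region
wb.save("entropy-analysis-example.xlsx")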

Testing Patterns

Protocol-Based Testing

The protocol-based design enables testing through mocking:

from pathlib import Path
from unittest.mock import Mock

from kp_ssf_tools.analyze.models.analysis import CredentialAnalysisResult
from kp_ssf_tools.analyze.services.interfaces import CredentialDetectionProtocol

def test_credential_analysis():
    # Mock the credential detection service against its protocol
    mock_service = Mock(spec=CredentialDetectionProtocol)
    mock_service.analyze_files.return_value = CredentialAnalysisResult(
        file_path=Path("test.py"),
        patterns=[],
        total_patterns=0,
        processed_files=[Path("test.py")],
    )

    # Exercise code under test with the mocked service
    result = mock_service.analyze_files([Path("test.py")], {}, None)
    assert result.total_patterns == 0

Subprocess Testing

The subprocess integration requires careful testing with mocked external tools:

from pathlib import Path
from unittest.mock import MagicMock, Mock, patch

# DetectSecretsCredentialService and CredentialScanOptions are imported
# from the analyze module (exact import paths elided here)

@patch("subprocess.run")
def test_detect_secrets_integration(mock_run):
    # Mock detect-secrets output: clean exit with an empty result set
    mock_run.return_value = MagicMock(
        returncode=0,
        stdout='{"results": {}}',
        stderr="",
    )

    # Collaborating services are plain mocks; only the subprocess boundary matters here
    mock_output, mock_timestamp, mock_discovery, mock_processing = (
        Mock(), Mock(), Mock(), Mock(),
    )
    service = DetectSecretsCredentialService(
        mock_output, mock_timestamp, mock_discovery, mock_processing,
    )
    result = service.analyze_files([Path("test.py")], {}, CredentialScanOptions())

    # Verify the external tool was invoked
    mock_run.assert_called_once()
    assert "detect-secrets" in mock_run.call_args[0][0]

Implementation Status

The analyze module currently implements the following capabilities:

  • Shannon Entropy Analysis: Content-aware threshold system with streaming Excel export
  • Credential Detection: Integration with detect-secrets for pattern detection
  • Protocol-Based Design: Architecture supporting multiple analysis types
  • CLI Integration: Command-line interface with progress feedback
  • Container Integration: Dependency injection with configuration management
  • Type Safety: Type annotation coverage with validation
  • Testing Framework: Test patterns for all components

Future Enhancements

Planned improvements include:

  • Additional Detectors: Support for custom credential detection patterns
  • Performance Optimization: Parallel processing for large directory analysis
  • Report Formats: Additional export formats beyond Excel
  • Integration APIs: Programmatic access for external tool integration